diff --git a/awx/main/tasks.py b/awx/main/tasks.py index dc58c14622..10a9489d0f 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -655,6 +655,7 @@ def awx_receptor_workunit_reaper(): receptor_ctl = get_receptor_ctl() receptor_work_list = receptor_ctl.simple_command("work list") + # Release work units for inactive jobs unit_ids = [id for id in receptor_work_list] jobs_with_unreleased_receptor_units = UnifiedJob.objects.filter(work_unit_id__in=unit_ids).exclude(status__in=ACTIVE_STATES) for job in jobs_with_unreleased_receptor_units: @@ -666,6 +667,7 @@ def awx_receptor_workunit_reaper(): if 'unknown work unit' not in str(exc): logger.exception(f'Unexpected error releasing work unit {job.work_unit_id}') + # Cancel jobs missing active work units active_unit_ids = [id for (id, data) in receptor_work_list.items() if data.get('StateName') in RECEPTOR_ACTIVE_STATES] active_unit_ids.append('') # exclude jobs that have not yet started a receptor work unit diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 15aeb86504..b17e3b3a00 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -1990,3 +1990,51 @@ def test_project_update_no_ee(): task.build_env(job, {}) assert 'The project could not sync because there is no Execution Environment' in str(e.value) + + +@mock.patch('awx.main.tasks.administrative_workunit_reaper') +@mock.patch('awx.main.tasks.UnifiedJob.objects.filter') +@mock.patch('awx.main.tasks.get_receptor_ctl') +def test_awx_receptor_workunit_reaper_cancels_work_units_missing_active_jobs( + mock_get_receptor_ctl, mock_uj_objects, mock_administrative_workunit_reaper, mocker +): + mock_receptor_ctl = mocker.MagicMock() + mock_receptor_ctl.simple_command.return_value = { + '4PiSBdGz': {'Detail': 'Running: PID: 134', 'ExtraData': None, 'State': 1, 'StateName': 'Running', 'StdoutSize': 106, 'WorkType': 'ansible-runner'} + } + mock_get_receptor_ctl.return_value = mock_receptor_ctl + + filtered_objects = mocker.MagicMock() + mock_uj_objects.return_value = filtered_objects + + mock_finished_job = mocker.MagicMock(spec=UnifiedJob) + filtered_objects.exclude.side_effect = [[mock_finished_job], []] + + tasks.awx_receptor_workunit_reaper() + assert mock_receptor_ctl.simple_command.called_with('work cancel 4PiSBdGz') + assert mock_receptor_ctl.simple_command.called_with('work release 4PiSBdGz') + + assert mock_administrative_workunit_reaper.called + + +@mock.patch('awx.main.tasks.administrative_workunit_reaper') +@mock.patch('awx.main.tasks.UnifiedJob.objects.filter') +@mock.patch('awx.main.tasks.get_receptor_ctl') +def test_awx_receptor_workunit_reaper_reaps_jobs_missing_work_units(mock_get_receptor_ctl, mock_uj_objects, mock_administrative_workunit_reaper, mocker): + mock_receptor_ctl = mocker.MagicMock() + mock_receptor_ctl.simple_command.return_value = {} + mock_get_receptor_ctl.return_value = mock_receptor_ctl + + filtered_objects = mocker.MagicMock() + mock_uj_objects.return_value = filtered_objects + + mock_job = mocker.MagicMock(spec=UnifiedJob) + filtered_objects.exclude.side_effect = [[], [mock_job]] + + tasks.awx_receptor_workunit_reaper() + assert mock_job.cancel_flag == True + assert mock_job.status == 'error' + assert mock_job.job_explanation == 'Canceled by reaper because receptor work unit could not be found for this job' + assert mock_job.save.called_once_with(update_fields=['cancel_flag', 'status', 'job_explanation']) + + assert mock_administrative_workunit_reaper.called