diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 10a9489d0f..90b5672241 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -108,15 +108,7 @@ from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja from awx.main.utils.reload import stop_local_services from awx.main.utils.pglock import advisory_lock from awx.main.utils.handlers import SpecialInventoryHandler -from awx.main.utils.receptor import ( - get_receptor_ctl, - worker_info, - get_conn_type, - get_tls_client, - worker_cleanup, - administrative_workunit_reaper, - RECEPTOR_ACTIVE_STATES, -) +from awx.main.utils.receptor import get_receptor_ctl, worker_info, get_conn_type, get_tls_client, worker_cleanup, administrative_workunit_reaper from awx.main.consumers import emit_channel_notification from awx.main import analytics from awx.conf import settings_registry @@ -655,37 +647,12 @@ def awx_receptor_workunit_reaper(): receptor_ctl = get_receptor_ctl() receptor_work_list = receptor_ctl.simple_command("work list") - # Release work units for inactive jobs unit_ids = [id for id in receptor_work_list] jobs_with_unreleased_receptor_units = UnifiedJob.objects.filter(work_unit_id__in=unit_ids).exclude(status__in=ACTIVE_STATES) for job in jobs_with_unreleased_receptor_units: logger.debug(f"{job.log_format} is not active, reaping receptor work unit {job.work_unit_id}") - try: - receptor_ctl.simple_command(f"work cancel {job.work_unit_id}") - receptor_ctl.simple_command(f"work release {job.work_unit_id}") - except RuntimeError as exc: - if 'unknown work unit' not in str(exc): - logger.exception(f'Unexpected error releasing work unit {job.work_unit_id}') - - # Cancel jobs missing active work units - active_unit_ids = [id for (id, data) in receptor_work_list.items() if data.get('StateName') in RECEPTOR_ACTIVE_STATES] - active_unit_ids.append('') # exclude jobs that have not yet started a receptor work unit - - jobs_without_active_work_unit = UnifiedJob.objects.filter(status__in=ACTIVE_STATES, controller_node=settings.CLUSTER_HOST_ID).exclude( - work_unit_id__in=active_unit_ids - ) - - for job in jobs_without_active_work_unit: - job.cancel_flag = True - job.status = 'error' - if job.work_unit_id in receptor_work_list: - receptor_status = receptor_work_list[job.work_unit_id]['StateName'] - logger.warn(f'Canceling {job.log_format} because of inactive work unit, data:\n{receptor_work_list[job.work_unit_id]}') - job.job_explanation = f'Canceled by reaper because receptor work unit reported a status of {receptor_status}' - else: - logger.warn(f'Canceling {job.log_format} because of missing work unit {job.work_unit_id}') - job.job_explanation = f'Canceled by reaper because receptor work unit could not be found for this job' - job.save(update_fields=['cancel_flag', 'status', 'job_explanation']) + receptor_ctl.simple_command(f"work cancel {job.work_unit_id}") + receptor_ctl.simple_command(f"work release {job.work_unit_id}") administrative_workunit_reaper(receptor_work_list) @@ -3104,10 +3071,7 @@ class AWXReceptorJob: finally: # Make sure to always release the work unit if we established it if self.unit_id is not None and settings.RECEPTOR_RELEASE_WORK: - try: - receptor_ctl.simple_command(f"work release {self.unit_id}") - except RuntimeError: - logger.exception(f"Unable to release work item {self.unit_id} from {self.task.instance.log_format}") + receptor_ctl.simple_command(f"work release {self.unit_id}") @property def sign_work(self): @@ -3164,10 +3128,10 @@ class AWXReceptorJob: unit_status = receptor_ctl.simple_command(f'work status {self.unit_id}') detail = unit_status.get('Detail', None) state_name = unit_status.get('StateName', None) - except RuntimeError: + except RuntimeError as e: detail = '' state_name = '' - logger.exception(f'Unable to retrieve work status for {self.unit_id}') + logger.exception(e) if 'exceeded quota' in detail: logger.warn(detail) @@ -3175,26 +3139,20 @@ class AWXReceptorJob: logger.warn(f"Could not launch pod for {log_name}. Exceeded quota.") self.task.update_model(self.task.instance.pk, status='pending') return - # If ansible-runner ran, but an error occured at runtime, the traceback information # is saved via the status_handler passed in to the processor. if state_name == 'Succeeded': return res - if not (self.task.instance.result_traceback or self.task.instance.job_explanation): + if not self.task.instance.result_traceback: try: resultsock = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True) lines = resultsock.readlines() self.task.instance.result_traceback = b"".join(lines).decode() self.task.instance.save(update_fields=['result_traceback']) except Exception: - resultsock.shutdown(socket.SHUT_RDWR) - resultfile.close() raise RuntimeError(detail) - resultsock.shutdown(socket.SHUT_RDWR) - resultfile.close() - time.sleep(3) return res @@ -3265,11 +3223,8 @@ class AWXReceptorJob: return processor_future.result() if self.task.cancel_callback(): - status = self.task.instance.status - if status == 'running': - status = 'canceled' result = namedtuple('result', ['status', 'rc']) - return result(status, 1) + return result('canceled', 1) time.sleep(1) diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index b17e3b3a00..15aeb86504 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -1990,51 +1990,3 @@ def test_project_update_no_ee(): task.build_env(job, {}) assert 'The project could not sync because there is no Execution Environment' in str(e.value) - - -@mock.patch('awx.main.tasks.administrative_workunit_reaper') -@mock.patch('awx.main.tasks.UnifiedJob.objects.filter') -@mock.patch('awx.main.tasks.get_receptor_ctl') -def test_awx_receptor_workunit_reaper_cancels_work_units_missing_active_jobs( - mock_get_receptor_ctl, mock_uj_objects, mock_administrative_workunit_reaper, mocker -): - mock_receptor_ctl = mocker.MagicMock() - mock_receptor_ctl.simple_command.return_value = { - '4PiSBdGz': {'Detail': 'Running: PID: 134', 'ExtraData': None, 'State': 1, 'StateName': 'Running', 'StdoutSize': 106, 'WorkType': 'ansible-runner'} - } - mock_get_receptor_ctl.return_value = mock_receptor_ctl - - filtered_objects = mocker.MagicMock() - mock_uj_objects.return_value = filtered_objects - - mock_finished_job = mocker.MagicMock(spec=UnifiedJob) - filtered_objects.exclude.side_effect = [[mock_finished_job], []] - - tasks.awx_receptor_workunit_reaper() - assert mock_receptor_ctl.simple_command.called_with('work cancel 4PiSBdGz') - assert mock_receptor_ctl.simple_command.called_with('work release 4PiSBdGz') - - assert mock_administrative_workunit_reaper.called - - -@mock.patch('awx.main.tasks.administrative_workunit_reaper') -@mock.patch('awx.main.tasks.UnifiedJob.objects.filter') -@mock.patch('awx.main.tasks.get_receptor_ctl') -def test_awx_receptor_workunit_reaper_reaps_jobs_missing_work_units(mock_get_receptor_ctl, mock_uj_objects, mock_administrative_workunit_reaper, mocker): - mock_receptor_ctl = mocker.MagicMock() - mock_receptor_ctl.simple_command.return_value = {} - mock_get_receptor_ctl.return_value = mock_receptor_ctl - - filtered_objects = mocker.MagicMock() - mock_uj_objects.return_value = filtered_objects - - mock_job = mocker.MagicMock(spec=UnifiedJob) - filtered_objects.exclude.side_effect = [[], [mock_job]] - - tasks.awx_receptor_workunit_reaper() - assert mock_job.cancel_flag == True - assert mock_job.status == 'error' - assert mock_job.job_explanation == 'Canceled by reaper because receptor work unit could not be found for this job' - assert mock_job.save.called_once_with(update_fields=['cancel_flag', 'status', 'job_explanation']) - - assert mock_administrative_workunit_reaper.called