From 613f949e6edb8031937387f403f16b01615f34ae Mon Sep 17 00:00:00 2001 From: Jim Ladd Date: Fri, 15 Oct 2021 16:45:21 -0700 Subject: [PATCH 01/10] when releasing receptor work, do so in try/except --- awx/main/tasks.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 73344b066d..e2468cd8f1 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -3063,7 +3063,13 @@ class AWXReceptorJob: finally: # Make sure to always release the work unit if we established it if self.unit_id is not None and settings.RECEPTOR_RELEASE_WORK: - receptor_ctl.simple_command(f"work release {self.unit_id}") + try: + receptor_ctl.simple_command(f"work release {self.unit_id}") + except RuntimeError: + logger.exception(f"Unable to release work item {self.unit_id}") + # If an error occured without the job itself failing, it could be a broken instance + if self.work_type == 'ansible-runner' and ((res is None) or (getattr(res, 'rc', None) is None)): + execution_node_health_check.delay(self.task.instance.execution_node) @property def sign_work(self): From 430b6826bfcd4c736095c7e4ccfc5fc2e9e4a0c0 Mon Sep 17 00:00:00 2001 From: Jim Ladd Date: Fri, 15 Oct 2021 16:46:15 -0700 Subject: [PATCH 02/10] update exception log message to be descriptive .. instead of surfacing exception --- awx/main/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index e2468cd8f1..6a083f69e2 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -3129,7 +3129,7 @@ class AWXReceptorJob: except RuntimeError as e: detail = '' state_name = '' - logger.exception(e) + logger.exception(f'Unable to retrieve work status for {self.unit_id}') if 'exceeded quota' in detail: logger.warn(detail) From fdae7a3a0e69e693bf374d16c0c404f50a91ba10 Mon Sep 17 00:00:00 2001 From: Jim Ladd Date: Fri, 15 Oct 2021 16:49:14 -0700 Subject: [PATCH 03/10] cancel job if receptor no longer knows about the work item --- awx/main/tasks.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 6a083f69e2..6c9ae0d9cf 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -3137,6 +3137,11 @@ class AWXReceptorJob: logger.warn(f"Could not launch pod for {log_name}. Exceeded quota.") self.task.update_model(self.task.instance.pk, status='pending') return + + # if we did not exceed the quota, continue with shutting down the job + resultsock.shutdown(socket.SHUT_RDWR) + resultfile.close() + # If ansible-runner ran, but an error occured at runtime, the traceback information # is saved via the status_handler passed in to the processor. if state_name == 'Succeeded': @@ -3216,10 +3221,21 @@ class AWXReceptorJob: @cleanup_new_process def cancel_watcher(self, processor_future): + receptor_ctl = get_receptor_ctl() while True: if processor_future.done(): return processor_future.result() + # cancel job if receptor no longer knows about work item + try: + unit_status = receptor_ctl.simple_command(f'work status {self.unit_id}') + except RuntimeError as e: + self.task.instance.result_traceback = traceback.format_exc() + self.task.instance.save(update_fields=['result_traceback']) + + result = namedtuple('result', ['status', 'rc']) + return result('error', 1) + if self.task.cancel_callback(): result = namedtuple('result', ['status', 'rc']) return result('canceled', 1) From 516fe60a16a44f505affad6147ad6e6e7c110bfb Mon Sep 17 00:00:00 2001 From: Jim Ladd Date: Fri, 15 Oct 2021 17:17:10 -0700 Subject: [PATCH 04/10] lint --- awx/main/tasks.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 6c9ae0d9cf..0cb42ac858 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -3126,7 +3126,7 @@ class AWXReceptorJob: unit_status = receptor_ctl.simple_command(f'work status {self.unit_id}') detail = unit_status.get('Detail', None) state_name = unit_status.get('StateName', None) - except RuntimeError as e: + except RuntimeError: detail = '' state_name = '' logger.exception(f'Unable to retrieve work status for {self.unit_id}') @@ -3228,8 +3228,8 @@ class AWXReceptorJob: # cancel job if receptor no longer knows about work item try: - unit_status = receptor_ctl.simple_command(f'work status {self.unit_id}') - except RuntimeError as e: + receptor_ctl.simple_command(f'work status {self.unit_id}') + except RuntimeError: self.task.instance.result_traceback = traceback.format_exc() self.task.instance.save(update_fields=['result_traceback']) From 20d9e491125f664cf027ec9acdcf325650e0484a Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 18 Oct 2021 15:26:07 -0400 Subject: [PATCH 05/10] Alternative for reaping lost jobs, in work unit reaper --- awx/main/tasks.py | 55 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 0cb42ac858..595ab0a0e8 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -108,7 +108,15 @@ from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja from awx.main.utils.reload import stop_local_services from awx.main.utils.pglock import advisory_lock from awx.main.utils.handlers import SpecialInventoryHandler -from awx.main.utils.receptor import get_receptor_ctl, worker_info, get_conn_type, get_tls_client, worker_cleanup, administrative_workunit_reaper +from awx.main.utils.receptor import ( + get_receptor_ctl, + worker_info, + get_conn_type, + get_tls_client, + worker_cleanup, + administrative_workunit_reaper, + RECEPTOR_ACTIVE_STATES, +) from awx.main.consumers import emit_channel_notification from awx.main import analytics from awx.conf import settings_registry @@ -647,8 +655,31 @@ def awx_receptor_workunit_reaper(): jobs_with_unreleased_receptor_units = UnifiedJob.objects.filter(work_unit_id__in=unit_ids).exclude(status__in=ACTIVE_STATES) for job in jobs_with_unreleased_receptor_units: logger.debug(f"{job.log_format} is not active, reaping receptor work unit {job.work_unit_id}") - receptor_ctl.simple_command(f"work cancel {job.work_unit_id}") - receptor_ctl.simple_command(f"work release {job.work_unit_id}") + try: + receptor_ctl.simple_command(f"work cancel {job.work_unit_id}") + receptor_ctl.simple_command(f"work release {job.work_unit_id}") + except RuntimeError as exc: + if 'unknown work unit' not in str(exc): + logger.exception(f'Unexpected error releasing work unit {job.work_unit_id}') + + active_unit_ids = [id for (id, data) in receptor_work_list.items() if data.get('StateName') in RECEPTOR_ACTIVE_STATES] + + jobs_without_active_work_unit = UnifiedJob.objects.filter(status__in=ACTIVE_STATES, controller_node=settings.CLUSTER_HOST_ID).exclude( + work_unit_id__in=active_unit_ids + ) + + for job in jobs_without_active_work_unit: + # TODO: skip if events came in recently + job.cancel_flag = True + job.status = 'error' + if job.work_unit_id in receptor_work_list: + receptor_status = receptor_work_list[job.work_unit_id]['StateName'] + logger.warn(f'Canceling {job.log_format} because of inactive work unit, data:\n{receptor_work_list[job.work_unit_id]}') + job.job_explanation = f'Canceled by reaper because receptor work unit reported a status of {receptor_status}' + else: + logger.warn(f'Canceling {job.log_format} because of missing work unit {job.work_unit_id}') + job.job_explanation = f'Canceled by reaper because receptor work unit could not be found for this job' + job.save(update_fields=['cancel_flag', 'status', 'job_explanation']) administrative_workunit_reaper(receptor_work_list) @@ -3066,7 +3097,7 @@ class AWXReceptorJob: try: receptor_ctl.simple_command(f"work release {self.unit_id}") except RuntimeError: - logger.exception(f"Unable to release work item {self.unit_id}") + logger.exception(f"Unable to release work item {self.unit_id} from {self.task.instance.log_format}") # If an error occured without the job itself failing, it could be a broken instance if self.work_type == 'ansible-runner' and ((res is None) or (getattr(res, 'rc', None) is None)): execution_node_health_check.delay(self.task.instance.execution_node) @@ -3221,24 +3252,16 @@ class AWXReceptorJob: @cleanup_new_process def cancel_watcher(self, processor_future): - receptor_ctl = get_receptor_ctl() while True: if processor_future.done(): return processor_future.result() - # cancel job if receptor no longer knows about work item - try: - receptor_ctl.simple_command(f'work status {self.unit_id}') - except RuntimeError: - self.task.instance.result_traceback = traceback.format_exc() - self.task.instance.save(update_fields=['result_traceback']) - - result = namedtuple('result', ['status', 'rc']) - return result('error', 1) - if self.task.cancel_callback(): + status = self.task.instance.status + if status == 'running': + status = 'canceled' result = namedtuple('result', ['status', 'rc']) - return result('canceled', 1) + return result(status, 1) time.sleep(1) From d27b5a7e7e61ee89d4c3f627e9185b6dcf85c38e Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 18 Oct 2021 15:32:06 -0400 Subject: [PATCH 06/10] Avoid resultsock shutdown before reading from it --- awx/main/tasks.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 595ab0a0e8..d325b12b51 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -3169,10 +3169,6 @@ class AWXReceptorJob: self.task.update_model(self.task.instance.pk, status='pending') return - # if we did not exceed the quota, continue with shutting down the job - resultsock.shutdown(socket.SHUT_RDWR) - resultfile.close() - # If ansible-runner ran, but an error occured at runtime, the traceback information # is saved via the status_handler passed in to the processor. if state_name == 'Succeeded': @@ -3185,8 +3181,13 @@ class AWXReceptorJob: self.task.instance.result_traceback = b"".join(lines).decode() self.task.instance.save(update_fields=['result_traceback']) except Exception: + resultsock.shutdown(socket.SHUT_RDWR) + resultfile.close() raise RuntimeError(detail) + resultsock.shutdown(socket.SHUT_RDWR) + resultfile.close() + time.sleep(3) return res From 081ce23b7df1e922c0d045abf33549b81467b8ac Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 18 Oct 2021 15:35:23 -0400 Subject: [PATCH 07/10] Avoid reaping tentative jobs --- awx/main/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index d325b12b51..4a4090aec6 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -663,13 +663,13 @@ def awx_receptor_workunit_reaper(): logger.exception(f'Unexpected error releasing work unit {job.work_unit_id}') active_unit_ids = [id for (id, data) in receptor_work_list.items() if data.get('StateName') in RECEPTOR_ACTIVE_STATES] + active_unit_ids.append('') # exclude jobs that have not yet started a receptor work unit jobs_without_active_work_unit = UnifiedJob.objects.filter(status__in=ACTIVE_STATES, controller_node=settings.CLUSTER_HOST_ID).exclude( work_unit_id__in=active_unit_ids ) for job in jobs_without_active_work_unit: - # TODO: skip if events came in recently job.cancel_flag = True job.status = 'error' if job.work_unit_id in receptor_work_list: From 89bee9d5121dbd00760cc035bb16c08a7a67f91f Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 18 Oct 2021 16:07:36 -0400 Subject: [PATCH 08/10] Avoid extra check if we have job_explanation --- awx/main/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 4a4090aec6..b15c125bb4 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -3174,7 +3174,7 @@ class AWXReceptorJob: if state_name == 'Succeeded': return res - if not self.task.instance.result_traceback: + if not (self.task.instance.result_traceback or self.task.instance.job_explanation): try: resultsock = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True) lines = resultsock.readlines() From a0e07d3644d2e4f51f521d4c0e867480aacc137e Mon Sep 17 00:00:00 2001 From: Jim Ladd Date: Wed, 20 Oct 2021 15:13:01 -0700 Subject: [PATCH 09/10] drop lines picked up during merge resolution --- awx/main/tasks.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index b15c125bb4..35b43d57d0 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -3098,9 +3098,6 @@ class AWXReceptorJob: receptor_ctl.simple_command(f"work release {self.unit_id}") except RuntimeError: logger.exception(f"Unable to release work item {self.unit_id} from {self.task.instance.log_format}") - # If an error occured without the job itself failing, it could be a broken instance - if self.work_type == 'ansible-runner' and ((res is None) or (getattr(res, 'rc', None) is None)): - execution_node_health_check.delay(self.task.instance.execution_node) @property def sign_work(self): From 69f093c66263b8db3125ebb4e863deaec4fb5e87 Mon Sep 17 00:00:00 2001 From: Jim Ladd Date: Wed, 20 Oct 2021 15:50:56 -0700 Subject: [PATCH 10/10] add unit tests --- awx/main/tasks.py | 2 ++ awx/main/tests/unit/test_tasks.py | 48 +++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 35b43d57d0..ca3e0bb330 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -651,6 +651,7 @@ def awx_receptor_workunit_reaper(): receptor_ctl = get_receptor_ctl() receptor_work_list = receptor_ctl.simple_command("work list") + # Release work units for inactive jobs unit_ids = [id for id in receptor_work_list] jobs_with_unreleased_receptor_units = UnifiedJob.objects.filter(work_unit_id__in=unit_ids).exclude(status__in=ACTIVE_STATES) for job in jobs_with_unreleased_receptor_units: @@ -662,6 +663,7 @@ def awx_receptor_workunit_reaper(): if 'unknown work unit' not in str(exc): logger.exception(f'Unexpected error releasing work unit {job.work_unit_id}') + # Cancel jobs missing active work units active_unit_ids = [id for (id, data) in receptor_work_list.items() if data.get('StateName') in RECEPTOR_ACTIVE_STATES] active_unit_ids.append('') # exclude jobs that have not yet started a receptor work unit diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 15aeb86504..b17e3b3a00 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -1990,3 +1990,51 @@ def test_project_update_no_ee(): task.build_env(job, {}) assert 'The project could not sync because there is no Execution Environment' in str(e.value) + + +@mock.patch('awx.main.tasks.administrative_workunit_reaper') +@mock.patch('awx.main.tasks.UnifiedJob.objects.filter') +@mock.patch('awx.main.tasks.get_receptor_ctl') +def test_awx_receptor_workunit_reaper_cancels_work_units_missing_active_jobs( + mock_get_receptor_ctl, mock_uj_objects, mock_administrative_workunit_reaper, mocker +): + mock_receptor_ctl = mocker.MagicMock() + mock_receptor_ctl.simple_command.return_value = { + '4PiSBdGz': {'Detail': 'Running: PID: 134', 'ExtraData': None, 'State': 1, 'StateName': 'Running', 'StdoutSize': 106, 'WorkType': 'ansible-runner'} + } + mock_get_receptor_ctl.return_value = mock_receptor_ctl + + filtered_objects = mocker.MagicMock() + mock_uj_objects.return_value = filtered_objects + + mock_finished_job = mocker.MagicMock(spec=UnifiedJob) + filtered_objects.exclude.side_effect = [[mock_finished_job], []] + + tasks.awx_receptor_workunit_reaper() + assert mock_receptor_ctl.simple_command.called_with('work cancel 4PiSBdGz') + assert mock_receptor_ctl.simple_command.called_with('work release 4PiSBdGz') + + assert mock_administrative_workunit_reaper.called + + +@mock.patch('awx.main.tasks.administrative_workunit_reaper') +@mock.patch('awx.main.tasks.UnifiedJob.objects.filter') +@mock.patch('awx.main.tasks.get_receptor_ctl') +def test_awx_receptor_workunit_reaper_reaps_jobs_missing_work_units(mock_get_receptor_ctl, mock_uj_objects, mock_administrative_workunit_reaper, mocker): + mock_receptor_ctl = mocker.MagicMock() + mock_receptor_ctl.simple_command.return_value = {} + mock_get_receptor_ctl.return_value = mock_receptor_ctl + + filtered_objects = mocker.MagicMock() + mock_uj_objects.return_value = filtered_objects + + mock_job = mocker.MagicMock(spec=UnifiedJob) + filtered_objects.exclude.side_effect = [[], [mock_job]] + + tasks.awx_receptor_workunit_reaper() + assert mock_job.cancel_flag == True + assert mock_job.status == 'error' + assert mock_job.job_explanation == 'Canceled by reaper because receptor work unit could not be found for this job' + assert mock_job.save.called_once_with(update_fields=['cancel_flag', 'status', 'job_explanation']) + + assert mock_administrative_workunit_reaper.called