From eb6c58682d1fb8bf05224e6309bb0f2378937568 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Mon, 18 Oct 2021 15:26:07 -0400 Subject: [PATCH] Alternative for reaping lost jobs, in work unit reaper --- awx/main/tasks.py | 55 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 327a1db1a8..fd032653e2 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -108,7 +108,15 @@ from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja from awx.main.utils.reload import stop_local_services from awx.main.utils.pglock import advisory_lock from awx.main.utils.handlers import SpecialInventoryHandler -from awx.main.utils.receptor import get_receptor_ctl, worker_info, get_conn_type, get_tls_client, worker_cleanup, administrative_workunit_reaper +from awx.main.utils.receptor import ( + get_receptor_ctl, + worker_info, + get_conn_type, + get_tls_client, + worker_cleanup, + administrative_workunit_reaper, + RECEPTOR_ACTIVE_STATES, +) from awx.main.consumers import emit_channel_notification from awx.main import analytics from awx.conf import settings_registry @@ -651,8 +659,31 @@ def awx_receptor_workunit_reaper(): jobs_with_unreleased_receptor_units = UnifiedJob.objects.filter(work_unit_id__in=unit_ids).exclude(status__in=ACTIVE_STATES) for job in jobs_with_unreleased_receptor_units: logger.debug(f"{job.log_format} is not active, reaping receptor work unit {job.work_unit_id}") - receptor_ctl.simple_command(f"work cancel {job.work_unit_id}") - receptor_ctl.simple_command(f"work release {job.work_unit_id}") + try: + receptor_ctl.simple_command(f"work cancel {job.work_unit_id}") + receptor_ctl.simple_command(f"work release {job.work_unit_id}") + except RuntimeError as exc: + if 'unknown work unit' not in str(exc): + logger.exception(f'Unexpected error releasing work unit {job.work_unit_id}') + + active_unit_ids = [id for (id, data) in receptor_work_list.items() if data.get('StateName') in RECEPTOR_ACTIVE_STATES] + + jobs_without_active_work_unit = UnifiedJob.objects.filter(status__in=ACTIVE_STATES, controller_node=settings.CLUSTER_HOST_ID).exclude( + work_unit_id__in=active_unit_ids + ) + + for job in jobs_without_active_work_unit: + # TODO: skip if events came in recently + job.cancel_flag = True + job.status = 'error' + if job.work_unit_id in receptor_work_list: + receptor_status = receptor_work_list[job.work_unit_id]['StateName'] + logger.warn(f'Canceling {job.log_format} because of inactive work unit, data:\n{receptor_work_list[job.work_unit_id]}') + job.job_explanation = f'Canceled by reaper because receptor work unit reported a status of {receptor_status}' + else: + logger.warn(f'Canceling {job.log_format} because of missing work unit {job.work_unit_id}') + job.job_explanation = f'Canceled by reaper because receptor work unit could not be found for this job' + job.save(update_fields=['cancel_flag', 'status', 'job_explanation']) administrative_workunit_reaper(receptor_work_list) @@ -3074,7 +3105,7 @@ class AWXReceptorJob: try: receptor_ctl.simple_command(f"work release {self.unit_id}") except RuntimeError: - logger.exception(f"Unable to release work item {self.unit_id}") + logger.exception(f"Unable to release work item {self.unit_id} from {self.task.instance.log_format}") # If an error occured without the job itself failing, it could be a broken instance if self.work_type == 'ansible-runner' and ((res is None) or (getattr(res, 'rc', None) is None)): execution_node_health_check.delay(self.task.instance.execution_node) @@ -3229,24 +3260,16 @@ class AWXReceptorJob: @cleanup_new_process def cancel_watcher(self, processor_future): - receptor_ctl = get_receptor_ctl() while True: if processor_future.done(): return processor_future.result() - # cancel job if receptor no longer knows about work item - try: - receptor_ctl.simple_command(f'work status {self.unit_id}') - except RuntimeError: - self.task.instance.result_traceback = traceback.format_exc() - self.task.instance.save(update_fields=['result_traceback']) - - result = namedtuple('result', ['status', 'rc']) - return result('error', 1) - if self.task.cancel_callback(): + status = self.task.instance.status + if status == 'running': + status = 'canceled' result = namedtuple('result', ['status', 'rc']) - return result('canceled', 1) + return result(status, 1) time.sleep(1)