From 139d8f0ae2f35aa97b1efa25d1dbe25b0cf975f0 Mon Sep 17 00:00:00 2001
From: Hao Liu <44379968+TheRealHaoLiu@users.noreply.github.com>
Date: Mon, 22 Jul 2024 12:32:53 -0400
Subject: [PATCH] Add RECEPTOR_KEEP_WORK_ON_ERROR setting

If RECEPTOR_KEEP_WORK_ON_ERROR is set to true, the receptor work unit of a
job that ended in error will not be automatically released, neither right
after the run nor by the periodic work unit / k8s pod reapers.

Co-Authored-By: Chris Meyers
---
 awx/main/conf.py           | 10 ++++++++++
 awx/main/constants.py      |  1 +
 awx/main/tasks/receptor.py | 10 ++++++----
 awx/main/tasks/system.py   |  9 +++++++--
 awx/settings/defaults.py   |  2 ++
 5 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/awx/main/conf.py b/awx/main/conf.py
index 6af2d7d64c..c450d24250 100644
--- a/awx/main/conf.py
+++ b/awx/main/conf.py
@@ -929,6 +929,16 @@ register(
     category_slug='debug',
 )
 
+register(
+    'RECEPTOR_KEEP_WORK_ON_ERROR',
+    field_class=fields.BooleanField,
+    label=_('Keep receptor work on error'),
+    default=False,
+    help_text=_('Prevent receptor work from being released when an error is detected'),
+    category=_('Debug'),
+    category_slug='debug',
+)
+
 
 def logging_validate(serializer, attrs):
     if not serializer.instance or not hasattr(serializer.instance, 'LOG_AGGREGATOR_HOST') or not hasattr(serializer.instance, 'LOG_AGGREGATOR_TYPE'):
diff --git a/awx/main/constants.py b/awx/main/constants.py
index a12da39938..5610dde101 100644
--- a/awx/main/constants.py
+++ b/awx/main/constants.py
@@ -43,6 +43,7 @@ STANDARD_INVENTORY_UPDATE_ENV = {
 }
 CAN_CANCEL = ('new', 'pending', 'waiting', 'running')
 ACTIVE_STATES = CAN_CANCEL
+ERROR_STATES = ('error',)
 MINIMAL_EVENTS = set(['playbook_on_play_start', 'playbook_on_task_start', 'playbook_on_stats', 'EOF'])
 CENSOR_VALUE = '************'
 ENV_BLOCKLIST = frozenset(
diff --git a/awx/main/tasks/receptor.py b/awx/main/tasks/receptor.py
index 121b5502d9..444aeaa881 100644
--- a/awx/main/tasks/receptor.py
+++ b/awx/main/tasks/receptor.py
@@ -405,10 +405,12 @@ class AWXReceptorJob:
         finally:
             # Make sure to always release the work unit if we established it
             if self.unit_id is not None and settings.RECEPTOR_RELEASE_WORK:
-                try:
-                    receptor_ctl.simple_command(f"work release {self.unit_id}")
-                except Exception:
-                    logger.exception(f"Error releasing work unit {self.unit_id}.")
+                # Keep the unit only when asked to and the run errored (a result without a status counts as an error)
+                if not (settings.RECEPTOR_KEEP_WORK_ON_ERROR and getattr(res, 'status', 'error') == 'error'):
+                    try:
+                        receptor_ctl.simple_command(f"work release {self.unit_id}")
+                    except Exception:
+                        logger.exception(f"Error releasing work unit {self.unit_id}.")
 
     def _run_internal(self, receptor_ctl):
         # Create a socketpair. Where the left side will be used for writing our payload
diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index 2aebe14d20..e0bb0dfc9f 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -54,7 +54,7 @@ from awx.main.models import (
     Job,
     convert_jsonfields,
 )
-from awx.main.constants import ACTIVE_STATES
+from awx.main.constants import ACTIVE_STATES, ERROR_STATES
 from awx.main.dispatch.publish import task
 from awx.main.dispatch import get_task_queuename, reaper
 from awx.main.utils.common import ignore_inventory_computed_fields, ignore_inventory_group_removal
@@ -685,6 +685,8 @@ def awx_receptor_workunit_reaper():
 
     unit_ids = [id for id in receptor_work_list]
     jobs_with_unreleased_receptor_units = UnifiedJob.objects.filter(work_unit_id__in=unit_ids).exclude(status__in=ACTIVE_STATES)
+    if settings.RECEPTOR_KEEP_WORK_ON_ERROR:
+        jobs_with_unreleased_receptor_units = jobs_with_unreleased_receptor_units.exclude(status__in=ERROR_STATES)
     for job in jobs_with_unreleased_receptor_units:
         logger.debug(f"{job.log_format} is not active, reaping receptor work unit {job.work_unit_id}")
         receptor_ctl.simple_command(f"work cancel {job.work_unit_id}")
@@ -704,7 +706,10 @@ def awx_k8s_reaper():
         logger.debug("Checking for orphaned k8s pods for {}.".format(group))
         pods = PodManager.list_active_jobs(group)
         time_cutoff = now() - timedelta(seconds=settings.K8S_POD_REAPER_GRACE_PERIOD)
-        for job in UnifiedJob.objects.filter(pk__in=pods.keys(), finished__lte=time_cutoff).exclude(status__in=ACTIVE_STATES):
+        reap_job_candidates = UnifiedJob.objects.filter(pk__in=pods.keys(), finished__lte=time_cutoff).exclude(status__in=ACTIVE_STATES)
+        if settings.RECEPTOR_KEEP_WORK_ON_ERROR:
+            reap_job_candidates = reap_job_candidates.exclude(status__in=ERROR_STATES)
+        for job in reap_job_candidates:
             logger.debug('{} is no longer active, reaping orphaned k8s pod'.format(job.log_format))
             try:
                 pm = PodManager(job)
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 26e9d0107b..7f40dd9763 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -1009,6 +1009,8 @@ AWX_RUNNER_KEEPALIVE_SECONDS = 0
 
 # Delete completed work units in receptor
 RECEPTOR_RELEASE_WORK = True
+# Keep (do not release) the receptor work unit of a job that ended in error, for debugging
+RECEPTOR_KEEP_WORK_ON_ERROR = False
 
 # K8S only. Use receptor_log_level on AWX spec to set this properly
 RECEPTOR_LOG_LEVEL = 'info'