Add RECEPTOR_KEEP_WORK_ON_ERROR setting

If RECEPTOR_KEEP_WORK_ON_ERROR is set to true receptor work unit will not be automatically released

Co-Authored-By: Chris Meyers <chrismeyersfsu@users.noreply.github.com>
This commit is contained in:
Hao Liu 2024-07-22 12:32:53 -04:00
parent 7691365aea
commit 139d8f0ae2
5 changed files with 24 additions and 6 deletions

View File

@ -929,6 +929,16 @@ register(
category_slug='debug',
)
register(
'RECEPTOR_KEEP_WORK_ON_ERROR',
field_class=fields.BooleanField,
label=_('Keep receptor work on error'),
default=False,
help_text=_('Prevent receptor work from being released on when error is detected'),
category=('Debug'),
category_slug='debug',
)
def logging_validate(serializer, attrs):
if not serializer.instance or not hasattr(serializer.instance, 'LOG_AGGREGATOR_HOST') or not hasattr(serializer.instance, 'LOG_AGGREGATOR_TYPE'):

View File

@ -43,6 +43,7 @@ STANDARD_INVENTORY_UPDATE_ENV = {
}
CAN_CANCEL = ('new', 'pending', 'waiting', 'running')
ACTIVE_STATES = CAN_CANCEL
ERROR_STATES = ('error',)
MINIMAL_EVENTS = set(['playbook_on_play_start', 'playbook_on_task_start', 'playbook_on_stats', 'EOF'])
CENSOR_VALUE = '************'
ENV_BLOCKLIST = frozenset(

View File

@ -405,10 +405,11 @@ class AWXReceptorJob:
finally:
# Make sure to always release the work unit if we established it
if self.unit_id is not None and settings.RECEPTOR_RELEASE_WORK:
try:
receptor_ctl.simple_command(f"work release {self.unit_id}")
except Exception:
logger.exception(f"Error releasing work unit {self.unit_id}.")
if settings.RECPETOR_KEEP_WORK_ON_ERROR and getattr(res, 'status', 'error') == 'error':
try:
receptor_ctl.simple_command(f"work release {self.unit_id}")
except Exception:
logger.exception(f"Error releasing work unit {self.unit_id}.")
def _run_internal(self, receptor_ctl):
# Create a socketpair. Where the left side will be used for writing our payload

View File

@ -54,7 +54,7 @@ from awx.main.models import (
Job,
convert_jsonfields,
)
from awx.main.constants import ACTIVE_STATES
from awx.main.constants import ACTIVE_STATES, ERROR_STATES
from awx.main.dispatch.publish import task
from awx.main.dispatch import get_task_queuename, reaper
from awx.main.utils.common import ignore_inventory_computed_fields, ignore_inventory_group_removal
@ -685,6 +685,8 @@ def awx_receptor_workunit_reaper():
unit_ids = [id for id in receptor_work_list]
jobs_with_unreleased_receptor_units = UnifiedJob.objects.filter(work_unit_id__in=unit_ids).exclude(status__in=ACTIVE_STATES)
if settings.RECEPTOR_KEEP_WORK_ON_ERROR:
jobs_with_unreleased_receptor_units = jobs_with_unreleased_receptor_units.exclude(status__in=ERROR_STATES)
for job in jobs_with_unreleased_receptor_units:
logger.debug(f"{job.log_format} is not active, reaping receptor work unit {job.work_unit_id}")
receptor_ctl.simple_command(f"work cancel {job.work_unit_id}")
@ -704,7 +706,10 @@ def awx_k8s_reaper():
logger.debug("Checking for orphaned k8s pods for {}.".format(group))
pods = PodManager.list_active_jobs(group)
time_cutoff = now() - timedelta(seconds=settings.K8S_POD_REAPER_GRACE_PERIOD)
for job in UnifiedJob.objects.filter(pk__in=pods.keys(), finished__lte=time_cutoff).exclude(status__in=ACTIVE_STATES):
reap_job_candidates = UnifiedJob.objects.filter(pk__in=pods.keys(), finished__lte=time_cutoff).exclude(status__in=ACTIVE_STATES)
if settings.RECEPTOR_KEEP_WORK_ON_ERROR:
reap_job_candidates = reap_job_candidates.exclude(status__in=ERROR_STATES)
for job in reap_job_candidates:
logger.debug('{} is no longer active, reaping orphaned k8s pod'.format(job.log_format))
try:
pm = PodManager(job)

View File

@ -1009,6 +1009,7 @@ AWX_RUNNER_KEEPALIVE_SECONDS = 0
# Delete completed work units in receptor
RECEPTOR_RELEASE_WORK = True
RECPETOR_KEEP_WORK_ON_ERROR = False
# K8S only. Use receptor_log_level on AWX spec to set this properly
RECEPTOR_LOG_LEVEL = 'info'