From 452744b67e02823879e722fe574984a2d760ed60 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Tue, 3 May 2022 14:42:50 -0400 Subject: [PATCH] Delay update of artifacts and error fields until final job save (#11832) * Delay update of artifacts until final job save Save tracebacks from receptor module to callback object Move receptor traceback check up to be more logical Use new mock_me fixture to avoid DB call with me method Update the special runner message to the delay_update pattern * Move special runner message into post-processing of callback fields --- awx/main/tasks/callback.py | 31 ++++++++--- awx/main/tasks/jobs.py | 22 +++----- awx/main/tasks/receptor.py | 43 +++++++-------- .../tests/unit/tasks/test_runner_callback.py | 52 +++++++++++++++++++ 4 files changed, 100 insertions(+), 48 deletions(-) create mode 100644 awx/main/tests/unit/tasks/test_runner_callback.py diff --git a/awx/main/tasks/callback.py b/awx/main/tasks/callback.py index 529f9fc278..fa37055ac2 100644 --- a/awx/main/tasks/callback.py +++ b/awx/main/tasks/callback.py @@ -13,7 +13,7 @@ from django.utils.functional import cached_property # AWX from awx.main.redact import UriCleaner -from awx.main.constants import MINIMAL_EVENTS +from awx.main.constants import MINIMAL_EVENTS, ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE from awx.main.utils.update_model import update_model from awx.main.queue import CallbackQueueDispatcher @@ -33,6 +33,7 @@ class RunnerCallback: self.model = model self.update_attempts = int(settings.DISPATCHER_DB_DOWNTOWN_TOLLERANCE / 5) self.wrapup_event_dispatched = False + self.extra_update_fields = {} def update_model(self, pk, _attempt=0, **updates): return update_model(self.model, pk, _attempt=0, _max_attempts=self.update_attempts, **updates) @@ -45,6 +46,26 @@ class RunnerCallback: def event_data_key(self): return self.instance.event_class.JOB_REFERENCE + def delay_update(self, skip_if_already_set=False, **kwargs): + """Stash fields that should be saved along with the job status change""" + for key, value in kwargs.items(): + if key in self.extra_update_fields and skip_if_already_set: + continue + elif key in self.extra_update_fields and key in ('job_explanation', 'result_traceback'): + if str(value) in self.extra_update_fields.get(key, ''): + continue # if already set, avoid duplicating messages + # In the case of these fields, we do not want to lose any prior information, so combine values + self.extra_update_fields[key] = '\n'.join([str(self.extra_update_fields[key]), str(value)]) + else: + self.extra_update_fields[key] = value + + def get_delayed_update_fields(self): + """Return finalized dict of all fields that should be saved along with the job status change""" + self.extra_update_fields['emitted_events'] = self.event_ct + if 'got an unexpected keyword argument' in self.extra_update_fields.get('result_traceback', ''): + self.delay_update(result_traceback=ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE) + return self.extra_update_fields + def event_handler(self, event_data): # # ⚠️ D-D-D-DANGER ZONE ⚠️ @@ -149,8 +170,7 @@ class RunnerCallback: Handle artifacts ''' if event_data.get('event_data', {}).get('artifact_data', {}): - self.instance.artifacts = event_data['event_data']['artifact_data'] - self.instance.save(update_fields=['artifacts']) + self.delay_update(artifacts=event_data['event_data']['artifact_data']) return False @@ -218,10 +238,7 @@ class RunnerCallback: elif status_data['status'] == 'error': result_traceback = status_data.get('result_traceback', None) if result_traceback: - from awx.main.signals import disable_activity_stream # Circular import - - with disable_activity_stream(): - self.instance = self.update_model(self.instance.pk, result_traceback=result_traceback) + self.delay_update(result_traceback=result_traceback) class RunnerCallbackForProjectUpdate(RunnerCallback): diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py index 0ce5c42200..63b07d1d8a 100644 --- a/awx/main/tasks/jobs.py +++ b/awx/main/tasks/jobs.py @@ -40,7 +40,6 @@ from awx.main.constants import ( JOB_FOLDER_PREFIX, MAX_ISOLATED_PATH_COLON_DELIMITER, CONTAINER_VOLUMES_MOUNT_TYPES, - ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE, ) from awx.main.models import ( Instance, @@ -411,7 +410,6 @@ class BaseTask(object): self.instance = self.update_model(pk, status='running', start_args='') # blank field to remove encrypted passwords self.instance.websocket_emit_status("running") status, rc = 'error', None - extra_update_fields = {} fact_modification_times = {} self.runner_callback.event_ct = 0 @@ -546,18 +544,14 @@ class BaseTask(object): rc = res.rc if status in ('timeout', 'error'): - job_explanation = f"Job terminated due to {status}" - self.instance.job_explanation = self.instance.job_explanation or job_explanation + self.runner_callback.delay_update(skip_if_already_set=True, job_explanation=f"Job terminated due to {status}") if status == 'timeout': status = 'failed' - - extra_update_fields['job_explanation'] = self.instance.job_explanation - except ReceptorNodeNotFound as exc: - extra_update_fields['job_explanation'] = str(exc) + self.runner_callback.delay_update(job_explanation=str(exc)) except Exception: # this could catch programming or file system errors - extra_update_fields['result_traceback'] = traceback.format_exc() + self.runner_callback.delay_update(result_traceback=traceback.format_exc()) logger.exception('%s Exception occurred while running task', self.instance.log_format) finally: logger.debug('%s finished running, producing %s events.', self.instance.log_format, self.runner_callback.event_ct) @@ -567,18 +561,14 @@ class BaseTask(object): except PostRunError as exc: if status == 'successful': status = exc.status - extra_update_fields['job_explanation'] = exc.args[0] + self.runner_callback.delay_update(job_explanation=exc.args[0]) if exc.tb: - extra_update_fields['result_traceback'] = exc.tb + self.runner_callback.delay_update(result_traceback=exc.tb) except Exception: logger.exception('{} Post run hook errored.'.format(self.instance.log_format)) - # We really shouldn't get into this one but just in case.... - if 'got an unexpected keyword argument' in extra_update_fields.get('result_traceback', ''): - extra_update_fields['result_traceback'] = "{}\n\n{}".format(extra_update_fields['result_traceback'], ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE) - self.instance = self.update_model(pk) - self.instance = self.update_model(pk, status=status, emitted_events=self.runner_callback.event_ct, select_for_update=True, **extra_update_fields) + self.instance = self.update_model(pk, status=status, select_for_update=True, **self.runner_callback.get_delayed_update_fields()) # Field host_status_counts is used as a metric to check if event processing is finished # we send notifications if it is, if not, callback receiver will send them diff --git a/awx/main/tasks/receptor.py b/awx/main/tasks/receptor.py index a6fe39f2e7..2d180d7ad9 100644 --- a/awx/main/tasks/receptor.py +++ b/awx/main/tasks/receptor.py @@ -24,10 +24,7 @@ from awx.main.utils.common import ( parse_yaml_or_json, cleanup_new_process, ) -from awx.main.constants import ( - MAX_ISOLATED_PATH_COLON_DELIMITER, - ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE, -) +from awx.main.constants import MAX_ISOLATED_PATH_COLON_DELIMITER # Receptorctl from receptorctl.socket_interface import ReceptorControl @@ -350,6 +347,11 @@ class AWXReceptorJob: resultsock.shutdown(socket.SHUT_RDWR) resultfile.close() elif res.status == 'error': + # If ansible-runner ran, but an error occured at runtime, the traceback information + # is saved via the status_handler passed in to the processor. + if 'result_traceback' in self.task.runner_callback.extra_update_fields: + return res + try: unit_status = receptor_ctl.simple_command(f'work status {self.unit_id}') detail = unit_status.get('Detail', None) @@ -365,28 +367,19 @@ class AWXReceptorJob: logger.warning(f"Could not launch pod for {log_name}. Exceeded quota.") self.task.update_model(self.task.instance.pk, status='pending') return - # If ansible-runner ran, but an error occured at runtime, the traceback information - # is saved via the status_handler passed in to the processor. - if state_name == 'Succeeded': - return res - if not self.task.instance.result_traceback: - try: - resultsock = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True) - lines = resultsock.readlines() - receptor_output = b"".join(lines).decode() - if receptor_output: - self.task.instance.result_traceback = receptor_output - if 'got an unexpected keyword argument' in receptor_output: - self.task.instance.result_traceback = "{}\n\n{}".format(receptor_output, ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE) - self.task.instance.save(update_fields=['result_traceback']) - elif detail: - self.task.instance.result_traceback = detail - self.task.instance.save(update_fields=['result_traceback']) - else: - logger.warning(f'No result details or output from {self.task.instance.log_format}, status:\n{state_name}') - except Exception: - raise RuntimeError(detail) + try: + resultsock = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True) + lines = resultsock.readlines() + receptor_output = b"".join(lines).decode() + if receptor_output: + self.task.runner_callback.delay_update(result_traceback=receptor_output) + elif detail: + self.task.runner_callback.delay_update(result_traceback=detail) + else: + logger.warning(f'No result details or output from {self.task.instance.log_format}, status:\n{state_name}') + except Exception: + raise RuntimeError(detail) return res diff --git a/awx/main/tests/unit/tasks/test_runner_callback.py b/awx/main/tests/unit/tasks/test_runner_callback.py new file mode 100644 index 0000000000..f24a92c6c8 --- /dev/null +++ b/awx/main/tests/unit/tasks/test_runner_callback.py @@ -0,0 +1,52 @@ +from awx.main.tasks.callback import RunnerCallback +from awx.main.constants import ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE + +from django.utils.translation import ugettext_lazy as _ + + +def test_delay_update(mock_me): + rc = RunnerCallback() + rc.delay_update(foo='bar') + assert rc.extra_update_fields == {'foo': 'bar'} + rc.delay_update(foo='foobar') + assert rc.extra_update_fields == {'foo': 'foobar'} + rc.delay_update(bar='foo') + assert rc.get_delayed_update_fields() == {'foo': 'foobar', 'bar': 'foo', 'emitted_events': 0} + + +def test_delay_update_skip_if_set(mock_me): + rc = RunnerCallback() + rc.delay_update(foo='bar', skip_if_already_set=True) + assert rc.extra_update_fields == {'foo': 'bar'} + rc.delay_update(foo='foobar', skip_if_already_set=True) + assert rc.extra_update_fields == {'foo': 'bar'} + + +def test_delay_update_failure_fields(mock_me): + rc = RunnerCallback() + rc.delay_update(job_explanation='1') + rc.delay_update(job_explanation=_('2')) + assert rc.extra_update_fields == {'job_explanation': '1\n2'} + rc.delay_update(result_traceback='1') + rc.delay_update(result_traceback=_('2')) + rc.delay_update(result_traceback=_('3'), skip_if_already_set=True) + assert rc.extra_update_fields == {'job_explanation': '1\n2', 'result_traceback': '1\n2'} + + +def test_duplicate_updates(mock_me): + rc = RunnerCallback() + rc.delay_update(job_explanation='really long summary...') + rc.delay_update(job_explanation='really long summary...') + rc.delay_update(job_explanation='really long summary...') + assert rc.extra_update_fields == {'job_explanation': 'really long summary...'} + + +def test_special_ansible_runner_message(mock_me): + rc = RunnerCallback() + rc.delay_update(result_traceback='Traceback:\ngot an unexpected keyword argument\nFile: foo.py') + rc.delay_update(result_traceback='Traceback:\ngot an unexpected keyword argument\nFile: bar.py') + assert rc.get_delayed_update_fields().get('result_traceback') == ( + 'Traceback:\ngot an unexpected keyword argument\nFile: foo.py\n' + 'Traceback:\ngot an unexpected keyword argument\nFile: bar.py\n' + f'{ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE}' + )