Mirror of https://github.com/ansible/awx.git
(last synced 2026-03-24 20:35:02 -02:30)
Delay update of artifacts and error fields until final job save (#11832)
* Delay update of artifacts until final job save
  Save tracebacks from receptor module to callback object
  Move receptor traceback check up to be more logical
  Use new mock_me fixture to avoid DB call with me method
  Update the special runner message to the delay_update pattern
* Move special runner message into post-processing of callback fields
This commit is contained in:
@@ -13,7 +13,7 @@ from django.utils.functional import cached_property
|
|||||||
|
|
||||||
# AWX
|
# AWX
|
||||||
from awx.main.redact import UriCleaner
|
from awx.main.redact import UriCleaner
|
||||||
from awx.main.constants import MINIMAL_EVENTS
|
from awx.main.constants import MINIMAL_EVENTS, ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE
|
||||||
from awx.main.utils.update_model import update_model
|
from awx.main.utils.update_model import update_model
|
||||||
from awx.main.queue import CallbackQueueDispatcher
|
from awx.main.queue import CallbackQueueDispatcher
|
||||||
|
|
||||||
@@ -33,6 +33,7 @@ class RunnerCallback:
|
|||||||
self.model = model
|
self.model = model
|
||||||
self.update_attempts = int(settings.DISPATCHER_DB_DOWNTOWN_TOLLERANCE / 5)
|
self.update_attempts = int(settings.DISPATCHER_DB_DOWNTOWN_TOLLERANCE / 5)
|
||||||
self.wrapup_event_dispatched = False
|
self.wrapup_event_dispatched = False
|
||||||
|
self.extra_update_fields = {}
|
||||||
|
|
||||||
def update_model(self, pk, _attempt=0, **updates):
|
def update_model(self, pk, _attempt=0, **updates):
|
||||||
return update_model(self.model, pk, _attempt=0, _max_attempts=self.update_attempts, **updates)
|
return update_model(self.model, pk, _attempt=0, _max_attempts=self.update_attempts, **updates)
|
||||||
@@ -45,6 +46,26 @@ class RunnerCallback:
|
|||||||
def event_data_key(self):
|
def event_data_key(self):
|
||||||
return self.instance.event_class.JOB_REFERENCE
|
return self.instance.event_class.JOB_REFERENCE
|
||||||
|
|
||||||
|
def delay_update(self, skip_if_already_set=False, **kwargs):
    """Stash fields that should be saved along with the job status change.

    Nothing is written to the database here; values are collected in
    ``self.extra_update_fields`` for a later save.

    :param skip_if_already_set: when true, a key that has already been
        stashed is left untouched.
    :param kwargs: field names mapped to the values to stash.

    ``job_explanation`` and ``result_traceback`` are cumulative: a new
    message is appended on its own line instead of replacing the earlier
    one, unless its text already appears in what was stashed.  Every other
    field is simply overwritten by the latest value.
    """
    # Fields for which we do not want to lose any prior information.
    cumulative_fields = ('job_explanation', 'result_traceback')

    for key, value in kwargs.items():
        if key not in self.extra_update_fields:
            self.extra_update_fields[key] = value
            continue

        if skip_if_already_set:
            continue

        if key not in cumulative_fields:
            self.extra_update_fields[key] = value
            continue

        existing = self.extra_update_fields[key]
        if existing is None or existing == '':
            # Nothing worth preserving: avoids a TypeError on the membership
            # test below (None) and a stray leading newline (empty string).
            self.extra_update_fields[key] = value
            continue

        existing_text = str(existing)
        new_text = str(value)
        if new_text in existing_text:
            continue  # if already set, avoid duplicating messages

        self.extra_update_fields[key] = '\n'.join([existing_text, new_text])
|
||||||
|
|
||||||
|
def get_delayed_update_fields(self):
    """Return finalized dict of all fields that should be saved along with the job status change.

    Records the event count under ``emitted_events`` and, when the stashed
    traceback contains the 'got an unexpected keyword argument' marker,
    stashes ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE as an additional traceback
    entry before returning.  The returned object is the live
    ``self.extra_update_fields`` dict, not a copy.
    """
    stashed = self.extra_update_fields
    stashed['emitted_events'] = self.event_ct

    traceback_text = stashed.get('result_traceback', '')
    needs_runner_hint = 'got an unexpected keyword argument' in traceback_text
    if needs_runner_hint:
        self.delay_update(result_traceback=ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE)

    return self.extra_update_fields
|
||||||
|
|
||||||
def event_handler(self, event_data):
|
def event_handler(self, event_data):
|
||||||
#
|
#
|
||||||
# ⚠️ D-D-D-DANGER ZONE ⚠️
|
# ⚠️ D-D-D-DANGER ZONE ⚠️
|
||||||
@@ -149,8 +170,7 @@ class RunnerCallback:
|
|||||||
Handle artifacts
|
Handle artifacts
|
||||||
'''
|
'''
|
||||||
if event_data.get('event_data', {}).get('artifact_data', {}):
|
if event_data.get('event_data', {}).get('artifact_data', {}):
|
||||||
self.instance.artifacts = event_data['event_data']['artifact_data']
|
self.delay_update(artifacts=event_data['event_data']['artifact_data'])
|
||||||
self.instance.save(update_fields=['artifacts'])
|
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@@ -218,10 +238,7 @@ class RunnerCallback:
|
|||||||
elif status_data['status'] == 'error':
|
elif status_data['status'] == 'error':
|
||||||
result_traceback = status_data.get('result_traceback', None)
|
result_traceback = status_data.get('result_traceback', None)
|
||||||
if result_traceback:
|
if result_traceback:
|
||||||
from awx.main.signals import disable_activity_stream # Circular import
|
self.delay_update(result_traceback=result_traceback)
|
||||||
|
|
||||||
with disable_activity_stream():
|
|
||||||
self.instance = self.update_model(self.instance.pk, result_traceback=result_traceback)
|
|
||||||
|
|
||||||
|
|
||||||
class RunnerCallbackForProjectUpdate(RunnerCallback):
|
class RunnerCallbackForProjectUpdate(RunnerCallback):
|
||||||
|
|||||||
@@ -40,7 +40,6 @@ from awx.main.constants import (
|
|||||||
JOB_FOLDER_PREFIX,
|
JOB_FOLDER_PREFIX,
|
||||||
MAX_ISOLATED_PATH_COLON_DELIMITER,
|
MAX_ISOLATED_PATH_COLON_DELIMITER,
|
||||||
CONTAINER_VOLUMES_MOUNT_TYPES,
|
CONTAINER_VOLUMES_MOUNT_TYPES,
|
||||||
ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE,
|
|
||||||
)
|
)
|
||||||
from awx.main.models import (
|
from awx.main.models import (
|
||||||
Instance,
|
Instance,
|
||||||
@@ -411,7 +410,6 @@ class BaseTask(object):
|
|||||||
self.instance = self.update_model(pk, status='running', start_args='') # blank field to remove encrypted passwords
|
self.instance = self.update_model(pk, status='running', start_args='') # blank field to remove encrypted passwords
|
||||||
self.instance.websocket_emit_status("running")
|
self.instance.websocket_emit_status("running")
|
||||||
status, rc = 'error', None
|
status, rc = 'error', None
|
||||||
extra_update_fields = {}
|
|
||||||
fact_modification_times = {}
|
fact_modification_times = {}
|
||||||
self.runner_callback.event_ct = 0
|
self.runner_callback.event_ct = 0
|
||||||
|
|
||||||
@@ -546,18 +544,14 @@ class BaseTask(object):
|
|||||||
rc = res.rc
|
rc = res.rc
|
||||||
|
|
||||||
if status in ('timeout', 'error'):
|
if status in ('timeout', 'error'):
|
||||||
job_explanation = f"Job terminated due to {status}"
|
self.runner_callback.delay_update(skip_if_already_set=True, job_explanation=f"Job terminated due to {status}")
|
||||||
self.instance.job_explanation = self.instance.job_explanation or job_explanation
|
|
||||||
if status == 'timeout':
|
if status == 'timeout':
|
||||||
status = 'failed'
|
status = 'failed'
|
||||||
|
|
||||||
extra_update_fields['job_explanation'] = self.instance.job_explanation
|
|
||||||
|
|
||||||
except ReceptorNodeNotFound as exc:
|
except ReceptorNodeNotFound as exc:
|
||||||
extra_update_fields['job_explanation'] = str(exc)
|
self.runner_callback.delay_update(job_explanation=str(exc))
|
||||||
except Exception:
|
except Exception:
|
||||||
# this could catch programming or file system errors
|
# this could catch programming or file system errors
|
||||||
extra_update_fields['result_traceback'] = traceback.format_exc()
|
self.runner_callback.delay_update(result_traceback=traceback.format_exc())
|
||||||
logger.exception('%s Exception occurred while running task', self.instance.log_format)
|
logger.exception('%s Exception occurred while running task', self.instance.log_format)
|
||||||
finally:
|
finally:
|
||||||
logger.debug('%s finished running, producing %s events.', self.instance.log_format, self.runner_callback.event_ct)
|
logger.debug('%s finished running, producing %s events.', self.instance.log_format, self.runner_callback.event_ct)
|
||||||
@@ -567,18 +561,14 @@ class BaseTask(object):
|
|||||||
except PostRunError as exc:
|
except PostRunError as exc:
|
||||||
if status == 'successful':
|
if status == 'successful':
|
||||||
status = exc.status
|
status = exc.status
|
||||||
extra_update_fields['job_explanation'] = exc.args[0]
|
self.runner_callback.delay_update(job_explanation=exc.args[0])
|
||||||
if exc.tb:
|
if exc.tb:
|
||||||
extra_update_fields['result_traceback'] = exc.tb
|
self.runner_callback.delay_update(result_traceback=exc.tb)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception('{} Post run hook errored.'.format(self.instance.log_format))
|
logger.exception('{} Post run hook errored.'.format(self.instance.log_format))
|
||||||
|
|
||||||
# We really shouldn't get into this one but just in case....
|
|
||||||
if 'got an unexpected keyword argument' in extra_update_fields.get('result_traceback', ''):
|
|
||||||
extra_update_fields['result_traceback'] = "{}\n\n{}".format(extra_update_fields['result_traceback'], ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE)
|
|
||||||
|
|
||||||
self.instance = self.update_model(pk)
|
self.instance = self.update_model(pk)
|
||||||
self.instance = self.update_model(pk, status=status, emitted_events=self.runner_callback.event_ct, select_for_update=True, **extra_update_fields)
|
self.instance = self.update_model(pk, status=status, select_for_update=True, **self.runner_callback.get_delayed_update_fields())
|
||||||
|
|
||||||
# Field host_status_counts is used as a metric to check if event processing is finished
|
# Field host_status_counts is used as a metric to check if event processing is finished
|
||||||
# we send notifications if it is, if not, callback receiver will send them
|
# we send notifications if it is, if not, callback receiver will send them
|
||||||
|
|||||||
@@ -24,10 +24,7 @@ from awx.main.utils.common import (
|
|||||||
parse_yaml_or_json,
|
parse_yaml_or_json,
|
||||||
cleanup_new_process,
|
cleanup_new_process,
|
||||||
)
|
)
|
||||||
from awx.main.constants import (
|
from awx.main.constants import MAX_ISOLATED_PATH_COLON_DELIMITER
|
||||||
MAX_ISOLATED_PATH_COLON_DELIMITER,
|
|
||||||
ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Receptorctl
|
# Receptorctl
|
||||||
from receptorctl.socket_interface import ReceptorControl
|
from receptorctl.socket_interface import ReceptorControl
|
||||||
@@ -350,6 +347,11 @@ class AWXReceptorJob:
|
|||||||
resultsock.shutdown(socket.SHUT_RDWR)
|
resultsock.shutdown(socket.SHUT_RDWR)
|
||||||
resultfile.close()
|
resultfile.close()
|
||||||
elif res.status == 'error':
|
elif res.status == 'error':
|
||||||
|
# If ansible-runner ran, but an error occurred at runtime, the traceback information
|
||||||
|
# is saved via the status_handler passed in to the processor.
|
||||||
|
if 'result_traceback' in self.task.runner_callback.extra_update_fields:
|
||||||
|
return res
|
||||||
|
|
||||||
try:
|
try:
|
||||||
unit_status = receptor_ctl.simple_command(f'work status {self.unit_id}')
|
unit_status = receptor_ctl.simple_command(f'work status {self.unit_id}')
|
||||||
detail = unit_status.get('Detail', None)
|
detail = unit_status.get('Detail', None)
|
||||||
@@ -365,28 +367,19 @@ class AWXReceptorJob:
|
|||||||
logger.warning(f"Could not launch pod for {log_name}. Exceeded quota.")
|
logger.warning(f"Could not launch pod for {log_name}. Exceeded quota.")
|
||||||
self.task.update_model(self.task.instance.pk, status='pending')
|
self.task.update_model(self.task.instance.pk, status='pending')
|
||||||
return
|
return
|
||||||
# If ansible-runner ran, but an error occured at runtime, the traceback information
|
|
||||||
# is saved via the status_handler passed in to the processor.
|
|
||||||
if state_name == 'Succeeded':
|
|
||||||
return res
|
|
||||||
|
|
||||||
if not self.task.instance.result_traceback:
|
try:
|
||||||
try:
|
resultsock = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True)
|
||||||
resultsock = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True)
|
lines = resultsock.readlines()
|
||||||
lines = resultsock.readlines()
|
receptor_output = b"".join(lines).decode()
|
||||||
receptor_output = b"".join(lines).decode()
|
if receptor_output:
|
||||||
if receptor_output:
|
self.task.runner_callback.delay_update(result_traceback=receptor_output)
|
||||||
self.task.instance.result_traceback = receptor_output
|
elif detail:
|
||||||
if 'got an unexpected keyword argument' in receptor_output:
|
self.task.runner_callback.delay_update(result_traceback=detail)
|
||||||
self.task.instance.result_traceback = "{}\n\n{}".format(receptor_output, ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE)
|
else:
|
||||||
self.task.instance.save(update_fields=['result_traceback'])
|
logger.warning(f'No result details or output from {self.task.instance.log_format}, status:\n{state_name}')
|
||||||
elif detail:
|
except Exception:
|
||||||
self.task.instance.result_traceback = detail
|
raise RuntimeError(detail)
|
||||||
self.task.instance.save(update_fields=['result_traceback'])
|
|
||||||
else:
|
|
||||||
logger.warning(f'No result details or output from {self.task.instance.log_format}, status:\n{state_name}')
|
|
||||||
except Exception:
|
|
||||||
raise RuntimeError(detail)
|
|
||||||
|
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
|||||||
52
awx/main/tests/unit/tasks/test_runner_callback.py
Normal file
52
awx/main/tests/unit/tasks/test_runner_callback.py
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
from awx.main.tasks.callback import RunnerCallback
|
||||||
|
from awx.main.constants import ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE
|
||||||
|
|
||||||
|
from django.utils.translation import ugettext_lazy as _
|
||||||
|
|
||||||
|
|
||||||
|
def test_delay_update(mock_me):
    """Ordinary fields are overwritten by later calls and returned with the event count."""
    callback = RunnerCallback()

    # First value is stored as-is.
    callback.delay_update(foo='bar')
    assert callback.extra_update_fields == {'foo': 'bar'}

    # A later value for the same ordinary key replaces the earlier one.
    callback.delay_update(foo='foobar')
    assert callback.extra_update_fields == {'foo': 'foobar'}

    # The finalized dict carries every stashed key plus emitted_events.
    callback.delay_update(bar='foo')
    finalized = callback.get_delayed_update_fields()
    assert finalized == {'foo': 'foobar', 'bar': 'foo', 'emitted_events': 0}
|
||||||
|
|
||||||
|
|
||||||
|
def test_delay_update_skip_if_set(mock_me):
    """With skip_if_already_set=True the first stashed value wins."""
    callback = RunnerCallback()

    # Key not stashed yet, so the value is stored even with the flag set.
    callback.delay_update(foo='bar', skip_if_already_set=True)
    assert callback.extra_update_fields == {'foo': 'bar'}

    # Key already stashed, so the second value is ignored.
    callback.delay_update(foo='foobar', skip_if_already_set=True)
    assert callback.extra_update_fields == {'foo': 'bar'}
|
||||||
|
|
||||||
|
|
||||||
|
def test_delay_update_failure_fields(mock_me):
    """job_explanation and result_traceback accumulate values, newline separated."""
    callback = RunnerCallback()

    # Plain and lazily-translated strings are combined into one explanation.
    callback.delay_update(job_explanation='1')
    callback.delay_update(job_explanation=_('2'))
    assert callback.extra_update_fields == {'job_explanation': '1\n2'}

    # Same accumulation for the traceback; the third call is dropped because
    # the key is already stashed and skip_if_already_set is requested.
    callback.delay_update(result_traceback='1')
    callback.delay_update(result_traceback=_('2'))
    callback.delay_update(result_traceback=_('3'), skip_if_already_set=True)
    assert callback.extra_update_fields == {'job_explanation': '1\n2', 'result_traceback': '1\n2'}
|
||||||
|
|
||||||
|
|
||||||
|
def test_duplicate_updates(mock_me):
    """Stashing the same explanation repeatedly keeps a single copy."""
    callback = RunnerCallback()

    # Three identical updates in a row.
    for _attempt in range(3):
        callback.delay_update(job_explanation='really long summary...')

    assert callback.extra_update_fields == {'job_explanation': 'really long summary...'}
|
||||||
|
|
||||||
|
|
||||||
|
def test_special_ansible_runner_message(mock_me):
    """The runner-needs-update hint is appended after the accumulated tracebacks."""
    callback = RunnerCallback()

    # Two distinct tracebacks that both contain the marker phrase.
    callback.delay_update(result_traceback='Traceback:\ngot an unexpected keyword argument\nFile: foo.py')
    callback.delay_update(result_traceback='Traceback:\ngot an unexpected keyword argument\nFile: bar.py')

    finalized_traceback = callback.get_delayed_update_fields().get('result_traceback')
    assert finalized_traceback == (
        'Traceback:\ngot an unexpected keyword argument\nFile: foo.py\n'
        'Traceback:\ngot an unexpected keyword argument\nFile: bar.py\n'
        f'{ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE}'
    )
|
||||||
Reference in New Issue
Block a user