diff --git a/awx/main/tasks/receptor.py b/awx/main/tasks/receptor.py
index 597c900eeb..121b5502d9 100644
--- a/awx/main/tasks/receptor.py
+++ b/awx/main/tasks/receptor.py
@@ -49,6 +49,83 @@ class ReceptorConnectionType(Enum):
     STREAMTLS = 2
 
 
+"""
+Translate receptorctl messages that come in over stdout into
+structured messages. Currently, these are error messages.
+"""
+
+
+class ReceptorErrorBase:
+    """Base class for structured receptor errors; records the node and state name."""
+
+    _MESSAGE = 'Receptor Error'
+
+    def __init__(self, node: str = 'N/A', state_name: str = 'N/A'):
+        self.node = node
+        self.state_name = state_name
+
+    def __str__(self):
+        return f"{self.__class__.__name__} '{self._MESSAGE}' on node '{self.node}' with state '{self.state_name}'"
+
+
+class WorkUnitError(ReceptorErrorBase):
+    """Error whose receptorctl message starts with 'unknown work unit '."""
+
+    _MESSAGE = 'unknown work unit '
+
+    def __init__(self, work_unit_id: str, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.work_unit_id = work_unit_id
+
+    def __str__(self):
+        return f"{super().__str__()} work unit id '{self.work_unit_id}'"
+
+
+class WorkUnitCancelError(WorkUnitError):
+    """Unknown work unit reported while cancelling a remote unit."""
+
+    _MESSAGE = 'error cancelling remote unit: unknown work unit '
+
+
+class WorkUnitResultsError(WorkUnitError):
+    """Unknown work unit reported while getting results."""
+
+    _MESSAGE = 'Failed to get results: unknown work unit '
+
+
+class UnknownError(ReceptorErrorBase):
+    """Fallback for messages that match no known prefix; keeps the raw message."""
+
+    _MESSAGE = 'Unknown receptor ctl error'
+
+    def __init__(self, msg, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._MESSAGE = msg
+
+
+class FuzzyError:
+    def __new__(cls, e: RuntimeError, node: str, state_name: str):
+        """
+        Route a receptorctl RuntimeError to the matching structured error.
+
+        Returns an instance of one of the error classes above (never a
+        FuzzyError): the first class whose _MESSAGE prefixes the exception
+        text, or UnknownError when nothing matches.
+        """
+        # A RuntimeError may carry no args or a non-str payload; normalise so
+        # classifying the error can never raise from inside an except block.
+        msg = str(e.args[0]) if e.args else ''
+
+        common_startswith = (WorkUnitCancelError, WorkUnitResultsError, WorkUnitError)
+
+        for klass in common_startswith:
+            if msg.startswith(klass._MESSAGE):
+                work_unit_id = msg[len(klass._MESSAGE) :]
+                return klass(work_unit_id, node=node, state_name=state_name)
+
+        return UnknownError(msg, node=node, state_name=state_name)
+
+
 def read_receptor_config():
     # for K8S deployments, getting a lock is necessary as another process
     # may be re-writing the config at this time
@@ -185,6 +262,7 @@ def run_until_complete(node, timing_data=None, **kwargs):
         timing_data['transmit_timing'] = run_start - transmit_start
     run_timing = 0.0
     stdout = ''
+    state_name = 'local var never set'
 
     try:
         resultfile = receptor_ctl.get_work_results(unit_id)
@@ -205,12 +283,33 @@ def run_until_complete(node, timing_data=None, **kwargs):
         stdout = resultfile.read()
         stdout = str(stdout, encoding='utf-8')
 
+    except RuntimeError as e:
+        receptor_e = FuzzyError(e, node, state_name)
+        if type(receptor_e) in (
+            WorkUnitError,
+            WorkUnitResultsError,
+        ):
+            logger.warning(f'While consuming job results: {receptor_e}')
+        else:
+            raise
     finally:
         if settings.RECEPTOR_RELEASE_WORK:
-            res = receptor_ctl.simple_command(f"work release {unit_id}")
-            if res != {'released': unit_id}:
-                logger.warning(f'Could not confirm release of receptor work unit id {unit_id} from {node}, data: {res}')
+            try:
+                res = receptor_ctl.simple_command(f"work release {unit_id}")
+
+                if res != {'released': unit_id}:
+                    logger.warning(f'Could not confirm release of receptor work unit id {unit_id} from {node}, data: {res}')
+            except RuntimeError as e:
+                receptor_e = FuzzyError(e, node, state_name)
+                if type(receptor_e) in (
+                    WorkUnitError,
+                    WorkUnitCancelError,
+                ):
+                    logger.warning(f"While releasing work: {receptor_e}")
+                else:
+                    logger.error(f"While releasing work: {receptor_e}")
 
+        # Close the connection even when work release is disabled or raised a RuntimeError.
         receptor_ctl.close()
 
     if state_name.lower() == 'failed':
@@ -275,7 +374,7 @@ def _convert_args_to_cli(vargs):
     args = ['cleanup']
     for option in ('exclude_strings', 'remove_images'):
         if vargs.get(option):
-            args.append('--{}={}'.format(option.replace('_', '-'), ' '.join(vargs.get(option))))
+            args.append('--{}="{}"'.format(option.replace('_', '-'), ' '.join(vargs.get(option))))
     for option in ('file_pattern', 'image_prune', 'process_isolation_executable', 'grace_period'):
         if vargs.get(option) is True:
             args.append('--{}'.format(option.replace('_', '-')))
diff --git a/awx/main/tests/unit/utils/test_receptor.py b/awx/main/tests/unit/utils/test_receptor.py
index 0a7e182070..b077e8a5db 100644
--- a/awx/main/tests/unit/utils/test_receptor.py
+++ b/awx/main/tests/unit/utils/test_receptor.py
@@ -3,7 +3,7 @@ from awx.main.tasks.receptor import _convert_args_to_cli
 
 def test_file_cleanup_scenario():
     args = _convert_args_to_cli({'exclude_strings': ['awx_423_', 'awx_582_'], 'file_pattern': '/tmp/awx_*_*'})
-    assert ' '.join(args) == 'cleanup --exclude-strings=awx_423_ awx_582_ --file-pattern=/tmp/awx_*_*'
+    assert ' '.join(args) == 'cleanup --exclude-strings="awx_423_ awx_582_" --file-pattern=/tmp/awx_*_*'
 
 
 def test_image_cleanup_scenario():
@@ -17,5 +17,5 @@ def test_image_cleanup_scenario():
         }
     )
     assert (
-        ' '.join(args) == 'cleanup --remove-images=quay.invalid/foo/bar:latest quay.invalid/foo/bar:devel --image-prune --process-isolation-executable=podman'
+        ' '.join(args) == 'cleanup --remove-images="quay.invalid/foo/bar:latest quay.invalid/foo/bar:devel" --image-prune --process-isolation-executable=podman'
     )