From 7571df49d576d4cf99ffe09a3b6c007b70471d52 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Tue, 26 Mar 2024 16:38:16 -0400 Subject: [PATCH] Pass --exclude="list of exclude dirs like this" * Previously, the params were passed without quotes and each directory was being interpreted as a separate command line flag. * Added some structure around the error messages returned from receptorctl so we can more easily decide how to handle each case. For example, releasing the cleanup job from receptor doesn't absolutely need to succeed because we have a periodic job that does that. In fact, that is the thing that is making it fail .. but I digress. --- awx/main/tasks/receptor.py | 95 ++++++++++++++++++++-- awx/main/tests/unit/utils/test_receptor.py | 4 +- 2 files changed, 92 insertions(+), 7 deletions(-) diff --git a/awx/main/tasks/receptor.py b/awx/main/tasks/receptor.py index 597c900eeb..121b5502d9 100644 --- a/awx/main/tasks/receptor.py +++ b/awx/main/tasks/receptor.py @@ -49,6 +49,70 @@ class ReceptorConnectionType(Enum): STREAMTLS = 2 +""" +Translate receptorctl messages that come in over stdout into +structured messages. Currently, these are error messages. 
+""" + + +class ReceptorErrorBase: + _MESSAGE = 'Receptor Error' + + def __init__(self, node: str = 'N/A', state_name: str = 'N/A'): + self.node = node + self.state_name = state_name + + def __str__(self): + return f"{self.__class__.__name__} '{self._MESSAGE}' on node '{self.node}' with state '{self.state_name}'" + + +class WorkUnitError(ReceptorErrorBase): + _MESSAGE = 'unknown work unit ' + + def __init__(self, work_unit_id: str, *args, **kwargs): + super().__init__(*args, **kwargs) + self.work_unit_id = work_unit_id + + def __str__(self): + return f"{super().__str__()} work unit id '{self.work_unit_id}'" + + +class WorkUnitCancelError(WorkUnitError): + _MESSAGE = 'error cancelling remote unit: unknown work unit ' + + +class WorkUnitResultsError(WorkUnitError): + _MESSAGE = 'Failed to get results: unknown work unit ' + + +class UnknownError(ReceptorErrorBase): + _MESSAGE = 'Unknown receptor ctl error' + + def __init__(self, msg, *args, **kwargs): + super().__init__(*args, **kwargs) + self._MESSAGE = msg + + +class FuzzyError: + def __new__(self, e: RuntimeError, node: str, state_name: str): + """ + At the time of writing this comment all of the sub-classes detection + is centralized in this parent class. It's like a Router(). + Someone may find it better to push down the error detection logic into + each sub-class. 
+ """ + msg = e.args[0] + + common_startswith = (WorkUnitCancelError, WorkUnitResultsError, WorkUnitError) + + for klass in common_startswith: + if msg.startswith(klass._MESSAGE): + work_unit_id = msg[len(klass._MESSAGE) :] + return klass(work_unit_id, node=node, state_name=state_name) + + return UnknownError(msg, node=node, state_name=state_name) + + def read_receptor_config(): # for K8S deployments, getting a lock is necessary as another process # may be re-writing the config at this time @@ -185,6 +249,7 @@ def run_until_complete(node, timing_data=None, **kwargs): timing_data['transmit_timing'] = run_start - transmit_start run_timing = 0.0 stdout = '' + state_name = 'local var never set' try: resultfile = receptor_ctl.get_work_results(unit_id) @@ -205,13 +270,33 @@ def run_until_complete(node, timing_data=None, **kwargs): stdout = resultfile.read() stdout = str(stdout, encoding='utf-8') + except RuntimeError as e: + receptor_e = FuzzyError(e, node, state_name) + if type(receptor_e) in ( + WorkUnitError, + WorkUnitResultsError, + ): + logger.warning(f'While consuming job results: {receptor_e}') + else: + raise finally: if settings.RECEPTOR_RELEASE_WORK: - res = receptor_ctl.simple_command(f"work release {unit_id}") - if res != {'released': unit_id}: - logger.warning(f'Could not confirm release of receptor work unit id {unit_id} from {node}, data: {res}') + try: + res = receptor_ctl.simple_command(f"work release {unit_id}") - receptor_ctl.close() + if res != {'released': unit_id}: + logger.warning(f'Could not confirm release of receptor work unit id {unit_id} from {node}, data: {res}') + + receptor_ctl.close() + except RuntimeError as e: + receptor_e = FuzzyError(e, node, state_name) + if type(receptor_e) in ( + WorkUnitError, + WorkUnitCancelError, + ): + logger.warning(f"While releasing work: {receptor_e}") + else: + logger.error(f"While releasing work: {receptor_e}") if state_name.lower() == 'failed': work_detail = status.get('Detail', '') @@ -275,7 +360,7 @@ 
def _convert_args_to_cli(vargs): args = ['cleanup'] for option in ('exclude_strings', 'remove_images'): if vargs.get(option): - args.append('--{}={}'.format(option.replace('_', '-'), ' '.join(vargs.get(option)))) + args.append('--{}="{}"'.format(option.replace('_', '-'), ' '.join(vargs.get(option)))) for option in ('file_pattern', 'image_prune', 'process_isolation_executable', 'grace_period'): if vargs.get(option) is True: args.append('--{}'.format(option.replace('_', '-'))) diff --git a/awx/main/tests/unit/utils/test_receptor.py b/awx/main/tests/unit/utils/test_receptor.py index 0a7e182070..b077e8a5db 100644 --- a/awx/main/tests/unit/utils/test_receptor.py +++ b/awx/main/tests/unit/utils/test_receptor.py @@ -3,7 +3,7 @@ from awx.main.tasks.receptor import _convert_args_to_cli def test_file_cleanup_scenario(): args = _convert_args_to_cli({'exclude_strings': ['awx_423_', 'awx_582_'], 'file_pattern': '/tmp/awx_*_*'}) - assert ' '.join(args) == 'cleanup --exclude-strings=awx_423_ awx_582_ --file-pattern=/tmp/awx_*_*' + assert ' '.join(args) == 'cleanup --exclude-strings="awx_423_ awx_582_" --file-pattern=/tmp/awx_*_*' def test_image_cleanup_scenario(): @@ -17,5 +17,5 @@ def test_image_cleanup_scenario(): } ) assert ( - ' '.join(args) == 'cleanup --remove-images=quay.invalid/foo/bar:latest quay.invalid/foo/bar:devel --image-prune --process-isolation-executable=podman' + ' '.join(args) == 'cleanup --remove-images="quay.invalid/foo/bar:latest quay.invalid/foo/bar:devel" --image-prune --process-isolation-executable=podman' )