From f8818730d414ae837f4b01d197131c1875d22e1c Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Thu, 12 Mar 2020 12:34:55 -0400 Subject: [PATCH] consolidate isolated event handling code into one function make the non-isolated *and* isolated event handling share the same function so we don't regress on behavior between the two --- awx/main/isolated/manager.py | 31 ++++++++++--------------------- awx/main/tasks.py | 9 ++++----- 2 files changed, 14 insertions(+), 26 deletions(-) diff --git a/awx/main/isolated/manager.py b/awx/main/isolated/manager.py index afa7eb57be..043f3c8d5e 100644 --- a/awx/main/isolated/manager.py +++ b/awx/main/isolated/manager.py @@ -15,7 +15,6 @@ import awx from awx.main.utils import ( get_system_task_capacity ) -from awx.main.queue import CallbackQueueDispatcher logger = logging.getLogger('awx.isolated.manager') playbook_logger = logging.getLogger('awx.isolated.manager.playbooks') @@ -32,12 +31,14 @@ def set_pythonpath(venv_libdir, env): class IsolatedManager(object): - def __init__(self, canceled_callback=None, check_callback=None, pod_manager=None): + def __init__(self, event_handler, canceled_callback=None, check_callback=None, pod_manager=None): """ + :param event_handler: a callable used to persist event data from isolated nodes :param canceled_callback: a callable - which returns `True` or `False` - signifying if the job has been prematurely canceled """ + self.event_handler = event_handler self.canceled_callback = canceled_callback self.check_callback = check_callback self.started_at = None @@ -208,7 +209,6 @@ class IsolatedManager(object): status = 'failed' rc = None last_check = time.time() - dispatcher = CallbackQueueDispatcher() while status == 'failed': canceled = self.canceled_callback() if self.canceled_callback else False @@ -238,7 +238,7 @@ class IsolatedManager(object): except json.decoder.JSONDecodeError: # Just in case it's not fully here yet. pass - self.consume_events(dispatcher) + self.consume_events() last_check = time.time() @@ -266,19 +266,18 @@ class IsolatedManager(object): # consume events one last time just to be sure we didn't miss anything # in the final sync - self.consume_events(dispatcher) + self.consume_events() # emit an EOF event event_data = { 'event': 'EOF', 'final_counter': len(self.handled_events) } - event_data.setdefault(self.event_data_key, self.instance.id) - dispatcher.dispatch(event_data) + self.event_handler(event_data) return status, rc - def consume_events(self, dispatcher): + def consume_events(self): # discover new events and ingest them events_path = self.path_to('artifacts', self.ident, 'job_events') @@ -303,15 +302,9 @@ class IsolatedManager(object): # in this scenario, just ignore this event and try it # again on the next sync pass - event_data.setdefault(self.event_data_key, self.instance.id) - dispatcher.dispatch(event_data) + self.event_handler(event_data) self.handled_events.add(event) - # handle artifacts - if event_data.get('event_data', {}).get('artifact_data', {}): - self.instance.artifacts = event_data['event_data']['artifact_data'] - self.instance.save(update_fields=['artifacts']) - def cleanup(self): extravars = { @@ -400,8 +393,7 @@ class IsolatedManager(object): if os.path.exists(private_data_dir): shutil.rmtree(private_data_dir) - def run(self, instance, private_data_dir, playbook, module, module_args, - event_data_key, ident=None): + def run(self, instance, private_data_dir, playbook, module, module_args, ident=None): """ Run a job on an isolated host. @@ -412,14 +404,12 @@ class IsolatedManager(object): :param playbook: the playbook to run :param module: the module to run :param module_args: the module args to use - :param event_data_key: e.g., job_id, inventory_id, ... For a completed job run, this function returns (status, rc), representing the status and return code of the isolated `ansible-playbook` run. """ self.ident = ident - self.event_data_key = event_data_key self.instance = instance self.private_data_dir = private_data_dir self.runner_params = self.build_runner_params( @@ -433,6 +423,5 @@ class IsolatedManager(object): else: # emit an EOF event event_data = {'event': 'EOF', 'final_counter': 0} - event_data.setdefault(self.event_data_key, self.instance.id) - CallbackQueueDispatcher().dispatch(event_data) + self.event_handler(event_data) return status, rc diff --git a/awx/main/tasks.py b/awx/main/tasks.py index bd92b6a124..292ae0f17e 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -489,7 +489,7 @@ def awx_isolated_heartbeat(): # Slow pass looping over isolated IGs and their isolated instances if len(isolated_instance_qs) > 0: logger.debug("Managing isolated instances {}.".format(','.join([inst.hostname for inst in isolated_instance_qs]))) - isolated_manager.IsolatedManager().health_check(isolated_instance_qs) + isolated_manager.IsolatedManager(CallbackQueueDispatcher.dispatch).health_check(isolated_instance_qs) @task() @@ -1162,7 +1162,6 @@ class BaseTask(object): except json.JSONDecodeError: pass - should_write_event = False event_data.setdefault(self.event_data_key, self.instance.id) self.dispatcher.dispatch(event_data) self.event_ct += 1 @@ -1174,7 +1173,7 @@ class BaseTask(object): self.instance.artifacts = event_data['event_data']['artifact_data'] self.instance.save(update_fields=['artifacts']) - return should_write_event + return False def cancel_callback(self): ''' @@ -1374,6 +1373,7 @@ class BaseTask(object): if not params[v]: del params[v] + self.dispatcher = CallbackQueueDispatcher() if self.instance.is_isolated() or containerized: module_args = None if 'module_args' in params: @@ -1388,6 +1388,7 @@ class BaseTask(object): ansible_runner.utils.dump_artifacts(params) isolated_manager_instance = isolated_manager.IsolatedManager( + self.event_handler, canceled_callback=lambda: self.update_model(self.instance.pk).cancel_flag, check_callback=self.check_handler, pod_manager=pod_manager @@ -1397,11 +1398,9 @@ class BaseTask(object): params.get('playbook'), params.get('module'), module_args, - event_data_key=self.event_data_key, ident=str(self.instance.pk)) self.event_ct = len(isolated_manager_instance.handled_events) else: - self.dispatcher = CallbackQueueDispatcher() res = ansible_runner.interface.run(**params) status = res.status rc = res.rc