consolidate isolated event handling code into one function

make the non-isolated *and* isolated event handling share the same
function so we don't regress on behavior between the two
This commit is contained in:
Ryan Petrello
2020-03-12 12:34:55 -04:00
parent 7120e92078
commit f8818730d4
2 changed files with 14 additions and 26 deletions

View File

@@ -15,7 +15,6 @@ import awx
from awx.main.utils import (
get_system_task_capacity
)
from awx.main.queue import CallbackQueueDispatcher
logger = logging.getLogger('awx.isolated.manager')
playbook_logger = logging.getLogger('awx.isolated.manager.playbooks')
@@ -32,12 +31,14 @@ def set_pythonpath(venv_libdir, env):
class IsolatedManager(object):
def __init__(self, canceled_callback=None, check_callback=None, pod_manager=None):
def __init__(self, event_handler, canceled_callback=None, check_callback=None, pod_manager=None):
"""
:param event_handler: a callable used to persist event data from isolated nodes
:param canceled_callback: a callable - which returns `True` or `False`
- signifying if the job has been prematurely
canceled
"""
self.event_handler = event_handler
self.canceled_callback = canceled_callback
self.check_callback = check_callback
self.started_at = None
@@ -208,7 +209,6 @@ class IsolatedManager(object):
status = 'failed'
rc = None
last_check = time.time()
dispatcher = CallbackQueueDispatcher()
while status == 'failed':
canceled = self.canceled_callback() if self.canceled_callback else False
@@ -238,7 +238,7 @@ class IsolatedManager(object):
except json.decoder.JSONDecodeError: # Just in case it's not fully here yet.
pass
self.consume_events(dispatcher)
self.consume_events()
last_check = time.time()
@@ -266,19 +266,18 @@ class IsolatedManager(object):
# consume events one last time just to be sure we didn't miss anything
# in the final sync
self.consume_events(dispatcher)
self.consume_events()
# emit an EOF event
event_data = {
'event': 'EOF',
'final_counter': len(self.handled_events)
}
event_data.setdefault(self.event_data_key, self.instance.id)
dispatcher.dispatch(event_data)
self.event_handler(event_data)
return status, rc
def consume_events(self, dispatcher):
def consume_events(self):
# discover new events and ingest them
events_path = self.path_to('artifacts', self.ident, 'job_events')
@@ -303,15 +302,9 @@ class IsolatedManager(object):
# in this scenario, just ignore this event and try it
# again on the next sync
pass
event_data.setdefault(self.event_data_key, self.instance.id)
dispatcher.dispatch(event_data)
self.event_handler(event_data)
self.handled_events.add(event)
# handle artifacts
if event_data.get('event_data', {}).get('artifact_data', {}):
self.instance.artifacts = event_data['event_data']['artifact_data']
self.instance.save(update_fields=['artifacts'])
def cleanup(self):
extravars = {
@@ -400,8 +393,7 @@ class IsolatedManager(object):
if os.path.exists(private_data_dir):
shutil.rmtree(private_data_dir)
def run(self, instance, private_data_dir, playbook, module, module_args,
event_data_key, ident=None):
def run(self, instance, private_data_dir, playbook, module, module_args, ident=None):
"""
Run a job on an isolated host.
@@ -412,14 +404,12 @@ class IsolatedManager(object):
:param playbook: the playbook to run
:param module: the module to run
:param module_args: the module args to use
:param event_data_key: e.g., job_id, inventory_id, ...
For a completed job run, this function returns (status, rc),
representing the status and return code of the isolated
`ansible-playbook` run.
"""
self.ident = ident
self.event_data_key = event_data_key
self.instance = instance
self.private_data_dir = private_data_dir
self.runner_params = self.build_runner_params(
@@ -433,6 +423,5 @@ class IsolatedManager(object):
else:
# emit an EOF event
event_data = {'event': 'EOF', 'final_counter': 0}
event_data.setdefault(self.event_data_key, self.instance.id)
CallbackQueueDispatcher().dispatch(event_data)
self.event_handler(event_data)
return status, rc