Merge pull request #6275 from ryanpetrello/fix-isolated-hostname-in-events

consolidate isolated event handling code into one function

Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
Committed by softwarefactory-project-zuul[bot] on 2020-03-13 16:34:46 +00:00 via GitHub
commit 6b20ffbfdd
2 changed files with 14 additions and 26 deletions
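
In outline, IsolatedManager no longer builds a CallbackQueueDispatcher and stamps events itself; callers now hand it a single event_handler callable (BaseTask.event_handler for job runs, CallbackQueueDispatcher.dispatch for the heartbeat task), and every event read back from the isolated node flows through that one function. A minimal sketch of the new calling convention, condensed from the diff below (the event-discovery loop is abridged, and _new_event_payloads is a hypothetical helper used only for illustration):

    class IsolatedManager(object):
        def __init__(self, event_handler, canceled_callback=None, check_callback=None, pod_manager=None):
            # every event read back from the isolated node is passed to this callable
            self.event_handler = event_handler
            self.canceled_callback = canceled_callback

        def consume_events(self):
            # abridged: the real method walks artifacts/<ident>/job_events and
            # JSON-decodes each new event file before handing it off
            for event_data in self._new_event_payloads():  # hypothetical helper, illustration only
                self.event_handler(event_data)

    # caller side, condensed from BaseTask.run:
    # isolated_manager.IsolatedManager(self.event_handler, ...).run(..., ident=str(self.instance.pk))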

awx/main/isolated/manager.py

@@ -15,7 +15,6 @@ import awx
 from awx.main.utils import (
     get_system_task_capacity
 )
-from awx.main.queue import CallbackQueueDispatcher
 
 logger = logging.getLogger('awx.isolated.manager')
 playbook_logger = logging.getLogger('awx.isolated.manager.playbooks')
@@ -32,12 +31,14 @@ def set_pythonpath(venv_libdir, env):
 class IsolatedManager(object):
 
-    def __init__(self, canceled_callback=None, check_callback=None, pod_manager=None):
+    def __init__(self, event_handler, canceled_callback=None, check_callback=None, pod_manager=None):
         """
+        :param event_handler: a callable used to persist event data from isolated nodes
         :param canceled_callback: a callable - which returns `True` or `False`
                                   - signifying if the job has been prematurely
                                     canceled
         """
+        self.event_handler = event_handler
         self.canceled_callback = canceled_callback
         self.check_callback = check_callback
         self.started_at = None
@@ -208,7 +209,6 @@
         status = 'failed'
         rc = None
         last_check = time.time()
-        dispatcher = CallbackQueueDispatcher()
 
         while status == 'failed':
             canceled = self.canceled_callback() if self.canceled_callback else False
@@ -238,7 +238,7 @@
                 except json.decoder.JSONDecodeError:  # Just in case it's not fully here yet.
                     pass
 
-                self.consume_events(dispatcher)
+                self.consume_events()
 
                 last_check = time.time()
@@ -266,19 +266,18 @@
         # consume events one last time just to be sure we didn't miss anything
         # in the final sync
-        self.consume_events(dispatcher)
+        self.consume_events()
 
         # emit an EOF event
         event_data = {
             'event': 'EOF',
             'final_counter': len(self.handled_events)
         }
-        event_data.setdefault(self.event_data_key, self.instance.id)
-        dispatcher.dispatch(event_data)
+        self.event_handler(event_data)
 
         return status, rc
 
-    def consume_events(self, dispatcher):
+    def consume_events(self):
         # discover new events and ingest them
         events_path = self.path_to('artifacts', self.ident, 'job_events')
@@ -303,15 +302,9 @@
                         # in this scenario, just ignore this event and try it
                         # again on the next sync
                         pass
-                    event_data.setdefault(self.event_data_key, self.instance.id)
-                    dispatcher.dispatch(event_data)
+                    self.event_handler(event_data)
                     self.handled_events.add(event)
 
-                    # handle artifacts
-                    if event_data.get('event_data', {}).get('artifact_data', {}):
-                        self.instance.artifacts = event_data['event_data']['artifact_data']
-                        self.instance.save(update_fields=['artifacts'])
-
     def cleanup(self):
         extravars = {
@@ -400,8 +393,7 @@
         if os.path.exists(private_data_dir):
             shutil.rmtree(private_data_dir)
 
-    def run(self, instance, private_data_dir, playbook, module, module_args,
-            event_data_key, ident=None):
+    def run(self, instance, private_data_dir, playbook, module, module_args, ident=None):
         """
         Run a job on an isolated host.
@@ -412,14 +404,12 @@
         :param playbook: the playbook to run
         :param module: the module to run
         :param module_args: the module args to use
-        :param event_data_key: e.g., job_id, inventory_id, ...
 
         For a completed job run, this function returns (status, rc),
         representing the status and return code of the isolated
         `ansible-playbook` run.
         """
         self.ident = ident
-        self.event_data_key = event_data_key
         self.instance = instance
         self.private_data_dir = private_data_dir
         self.runner_params = self.build_runner_params(
@@ -433,6 +423,5 @@
         else:
             # emit an EOF event
             event_data = {'event': 'EOF', 'final_counter': 0}
-            event_data.setdefault(self.event_data_key, self.instance.id)
-            CallbackQueueDispatcher().dispatch(event_data)
+            self.event_handler(event_data)
 
         return status, rc

awx/main/tasks.py

@@ -489,7 +489,7 @@ def awx_isolated_heartbeat():
     # Slow pass looping over isolated IGs and their isolated instances
     if len(isolated_instance_qs) > 0:
         logger.debug("Managing isolated instances {}.".format(','.join([inst.hostname for inst in isolated_instance_qs])))
-        isolated_manager.IsolatedManager().health_check(isolated_instance_qs)
+        isolated_manager.IsolatedManager(CallbackQueueDispatcher.dispatch).health_check(isolated_instance_qs)
 
 
 @task()
@@ -1162,7 +1162,6 @@
             except json.JSONDecodeError:
                 pass
-        should_write_event = False
         event_data.setdefault(self.event_data_key, self.instance.id)
         self.dispatcher.dispatch(event_data)
         self.event_ct += 1
@@ -1174,7 +1173,7 @@
             self.instance.artifacts = event_data['event_data']['artifact_data']
             self.instance.save(update_fields=['artifacts'])
 
-        return should_write_event
+        return False
 
     def cancel_callback(self):
         '''
@@ -1374,6 +1373,7 @@
                 if not params[v]:
                     del params[v]
 
+        self.dispatcher = CallbackQueueDispatcher()
         if self.instance.is_isolated() or containerized:
             module_args = None
             if 'module_args' in params:
@@ -1388,6 +1388,7 @@
             ansible_runner.utils.dump_artifacts(params)
             isolated_manager_instance = isolated_manager.IsolatedManager(
+                self.event_handler,
                 canceled_callback=lambda: self.update_model(self.instance.pk).cancel_flag,
                 check_callback=self.check_handler,
                 pod_manager=pod_manager
@@ -1397,11 +1398,9 @@
                 params.get('playbook'),
                 params.get('module'),
                 module_args,
-                event_data_key=self.event_data_key,
                 ident=str(self.instance.pk))
             self.event_ct = len(isolated_manager_instance.handled_events)
         else:
-            self.dispatcher = CallbackQueueDispatcher()
             res = ansible_runner.interface.run(**params)
             status = res.status
             rc = res.rc
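
With these changes, BaseTask.event_handler is the one place where an event is stamped with its parent id, dispatched to the callback queue, counted, and mined for artifact data, for both local and isolated runs. A condensed sketch of that consolidated function, assembled from the hunks above (surrounding bookkeeping omitted):

    def event_handler(self, event_data):
        # self.event_data_key is e.g. 'job_id'; self.dispatcher is the
        # CallbackQueueDispatcher now created unconditionally in run()
        event_data.setdefault(self.event_data_key, self.instance.id)
        self.dispatcher.dispatch(event_data)
        self.event_ct += 1

        # artifact data is persisted here rather than in IsolatedManager
        if event_data.get('event_data', {}).get('artifact_data', {}):
            self.instance.artifacts = event_data['event_data']['artifact_data']
            self.instance.save(update_fields=['artifacts'])

        return False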