diff --git a/awx/lib/tower_display_callback/events.py b/awx/lib/tower_display_callback/events.py index c17cf2c7f1..80265f21c6 100644 --- a/awx/lib/tower_display_callback/events.py +++ b/awx/lib/tower_display_callback/events.py @@ -27,6 +27,7 @@ import multiprocessing import os import threading import uuid +import memcache # Kombu from kombu import Connection, Exchange, Producer @@ -100,6 +101,8 @@ class EventContext(object): def __init__(self): self.display_lock = multiprocessing.RLock() self.dispatcher = CallbackQueueEventDispatcher() + cache_actual = os.getenv('CACHE', '127.0.0.1:11211') + self.cache = memcache.Client([cache_actual], debug=0) def add_local(self, **kwargs): if not hasattr(self, '_local'): @@ -201,7 +204,7 @@ class EventContext(object): def dump_begin(self, fileobj): begin_dict = self.get_begin_dict() - self.dispatcher.dispatch(begin_dict) + self.cache.set(":1:ev-{}".format(begin_dict['uuid']), begin_dict) self.dump(fileobj, {'uuid': begin_dict['uuid']}) def dump_end(self, fileobj): diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 79a6503568..89beb7ebd3 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -87,24 +87,10 @@ class CallbackBrokerWorker(ConsumerMixin): if settings.DEBUG: logger.info('Body: {}'.format(body)) try: - # If event came directly from callback without counter/stdout, - # save it until the rest of the event arrives. - if 'counter' not in body: - if 'uuid' in body: - self.partial_events[body['uuid']] = body - # If event has counter, try to combine it with any event data - # already received for the same uuid, then create the actual - # job event record. - else: - if 'uuid' in body: - partial_event = self.partial_events.pop(body['uuid'], {}) - body.update(partial_event) - else: - continue - if 'job_id' in body: - JobEvent.create_from_data(**body) - elif 'ad_hoc_command_id' in body: - AdHocCommandEvent.create_from_data(**body) + if 'job_id' in body: + JobEvent.create_from_data(**body) + elif 'ad_hoc_command_id' in body: + AdHocCommandEvent.create_from_data(**body) except DatabaseError as e: logger.error('Database Error Saving Job Event: {}'.format(e)) except Exception as exc: diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 0f47d080bb..0bd62b7325 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -822,6 +822,7 @@ class RunJob(BaseTask): env['MAX_EVENT_RES'] = str(settings.MAX_EVENT_RES_DATA) env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE env['CALLBACK_CONNECTION'] = settings.BROKER_URL + env['CACHE'] = settings.CACHES['default']['LOCATION'] if 'LOCATION' in settings.CACHES['default'] else '' if getattr(settings, 'JOB_CALLBACK_DEBUG', False): env['JOB_CALLBACK_DEBUG'] = '2' elif settings.DEBUG: @@ -1029,6 +1030,9 @@ class RunJob(BaseTask): def job_event_callback(event_data): event_data.setdefault('job_id', instance.id) + cache_event = cache.get('ev-{}'.format(event_data['uuid']), None) + if cache_event is not None: + event_data.update(cache_event) dispatcher.dispatch(event_data) else: def job_event_callback(event_data): diff --git a/requirements/requirements_ansible.in b/requirements/requirements_ansible.in index 2d381c955b..dfe378c4bf 100644 --- a/requirements/requirements_ansible.in +++ b/requirements/requirements_ansible.in @@ -4,6 +4,7 @@ azure==2.0.0rc6 backports.ssl-match-hostname==3.5.0.1 kombu==3.0.35 boto==2.45.0 +python-memcached==1.58 psphere==0.5.2 psutil==5.0.0 pyvmomi==6.5 diff --git a/requirements/requirements_ansible.txt b/requirements/requirements_ansible.txt index 59a7fe9544..4d20ac165a 100644 --- a/requirements/requirements_ansible.txt +++ b/requirements/requirements_ansible.txt @@ -84,6 +84,7 @@ pyasn1==0.1.9 # via cryptography pycparser==2.17 # via cffi PyJWT==1.4.2 # via adal pyparsing==2.1.10 # via cliff, cmd2, oslo.utils +python-memcached==1.58 python-cinderclient==1.9.0 # via python-openstackclient, shade python-dateutil==2.6.0 # via adal, azure-storage python-designateclient==2.4.0 # via shade