From e594a3ee3e5c511c8fc6e3a09ea3de9ac6fd30bf Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Tue, 10 Jan 2017 14:25:41 -0500 Subject: [PATCH 1/3] Implement a worker-based callback receiver system --- .../commands/run_callback_receiver.py | 110 +++++++++++++----- awx/settings/defaults.py | 4 + 2 files changed, 84 insertions(+), 30 deletions(-) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 7b959f6781..79a6503568 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -3,6 +3,10 @@ # Python import logging +from threading import Thread +from Queue import Queue as ThreadQueue +from Queue import Empty as QueueEmpty +from uuid import UUID from kombu import Connection, Exchange, Queue from kombu.mixins import ConsumerMixin @@ -19,9 +23,23 @@ logger = logging.getLogger('awx.main.commands.run_callback_receiver') class CallbackBrokerWorker(ConsumerMixin): - def __init__(self, connection): + def __init__(self, connection, use_workers=True): self.connection = connection self.partial_events = {} + self.worker_queues = [] + self.total_messages = 0 + + if use_workers: + connection.close() + for idx in range(settings.JOB_EVENT_WORKERS): + queue_actual = ThreadQueue(settings.JOB_EVENT_MAX_QUEUE_SIZE) + w = Thread(target=self.callback_worker, args=(queue_actual, idx,)) + w.start() + if settings.DEBUG: + logger.info('Started worker %s' % str(idx)) + self.worker_queues.append([0, queue_actual, w]) + elif settings.DEBUG: + logger.warn('Started callback receiver (no workers)') def get_consumers(self, Consumer, channel): return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE, @@ -31,37 +49,69 @@ class CallbackBrokerWorker(ConsumerMixin): callbacks=[self.process_task])] def process_task(self, body, message): - try: - if 'job_id' not in body and 'ad_hoc_command_id' not in body: - raise Exception('Payload does not have a job_id or ad_hoc_command_id') - if settings.DEBUG: - logger.info('Body: {}'.format(body)) - logger.info('Message: {}'.format(message)) - try: - # If event came directly from callback without counter/stdout, - # save it until the rest of the event arrives. - if 'counter' not in body: - if 'uuid' in body: - self.partial_events[body['uuid']] = body - # If event has counter, try to combine it with any event data - # already received for the same uuid, then create the actual - # job event record. - else: - if 'uuid' in body: - partial_event = self.partial_events.pop(body['uuid'], {}) - body.update(partial_event) - if 'job_id' in body: - JobEvent.create_from_data(**body) - elif 'ad_hoc_command_id' in body: - AdHocCommandEvent.create_from_data(**body) - except DatabaseError as e: - logger.error('Database Error Saving Job Event: {}'.format(e)) - except Exception as exc: - import traceback - traceback.print_exc() - logger.error('Callback Task Processor Raised Exception: %r', exc) + if "uuid" in body: + queue = UUID(body['uuid']).int % settings.JOB_EVENT_WORKERS + else: + queue = self.total_messages % settings.JOB_EVENT_WORKERS + self.write_queue_worker(queue, body) + self.total_messages += 1 message.ack() + def write_queue_worker(self, preferred_queue, body): + queue_order = sorted(range(settings.JOB_EVENT_WORKERS), cmp=lambda x, y: -1 if x==preferred_queue else 0) + for queue_actual in queue_order: + try: + worker_actual = self.worker_queues[queue_actual] + worker_actual[1].put(body, block=True, timeout=5) + worker_actual[0] += 1 + return queue_actual + except Exception: + import traceback + traceback.print_exc() + logger.warn("Could not write to queue %s" % preferred_queue) + continue + return None + + def callback_worker(self, queue_actual, idx): + while True: + try: + body = queue_actual.get(block=True, timeout=1) + except QueueEmpty: + continue + except Exception as e: + logger.error("Exception on worker thread, restarting: " + str(e)) + continue + try: + if 'job_id' not in body and 'ad_hoc_command_id' not in body: + raise Exception('Payload does not have a job_id or ad_hoc_command_id') + if settings.DEBUG: + logger.info('Body: {}'.format(body)) + try: + # If event came directly from callback without counter/stdout, + # save it until the rest of the event arrives. + if 'counter' not in body: + if 'uuid' in body: + self.partial_events[body['uuid']] = body + # If event has counter, try to combine it with any event data + # already received for the same uuid, then create the actual + # job event record. + else: + if 'uuid' in body: + partial_event = self.partial_events.pop(body['uuid'], {}) + body.update(partial_event) + else: + continue + if 'job_id' in body: + JobEvent.create_from_data(**body) + elif 'ad_hoc_command_id' in body: + AdHocCommandEvent.create_from_data(**body) + except DatabaseError as e: + logger.error('Database Error Saving Job Event: {}'.format(e)) + except Exception as exc: + import traceback + traceback.print_exc() + logger.error('Callback Task Processor Raised Exception: %r', exc) + class Command(NoArgsCommand): ''' diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 48338e09d8..009082b4d5 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -163,6 +163,10 @@ MAX_EVENT_RES_DATA = 700000 # Note: This setting may be overridden by database settings. EVENT_STDOUT_MAX_BYTES_DISPLAY = 1024 +JOB_EVENT_WORKERS = 4 + +JOB_EVENT_MAX_QUEUE_SIZE = 5000 + TEMPLATE_CONTEXT_PROCESSORS = ( # NOQA 'django.contrib.auth.context_processors.auth', 'django.core.context_processors.debug', From 1cdeb4d2af1372f840bc4bcacb296ae04f3a0731 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Tue, 17 Jan 2017 15:59:43 -0500 Subject: [PATCH 2/3] Refactor event emitter to not use event partials event emitter now caches to memcache where it is eventually picked up by the stdout event emitter. This obviates event reassembly in the callback receiver. --- awx/lib/tower_display_callback/events.py | 5 ++++- .../commands/run_callback_receiver.py | 22 ++++--------------- awx/main/tasks.py | 4 ++++ requirements/requirements_ansible.in | 1 + requirements/requirements_ansible.txt | 1 + 5 files changed, 14 insertions(+), 19 deletions(-) diff --git a/awx/lib/tower_display_callback/events.py b/awx/lib/tower_display_callback/events.py index c17cf2c7f1..80265f21c6 100644 --- a/awx/lib/tower_display_callback/events.py +++ b/awx/lib/tower_display_callback/events.py @@ -27,6 +27,7 @@ import multiprocessing import os import threading import uuid +import memcache # Kombu from kombu import Connection, Exchange, Producer @@ -100,6 +101,8 @@ class EventContext(object): def __init__(self): self.display_lock = multiprocessing.RLock() self.dispatcher = CallbackQueueEventDispatcher() + cache_actual = os.getenv('CACHE', '127.0.0.1:11211') + self.cache = memcache.Client([cache_actual], debug=0) def add_local(self, **kwargs): if not hasattr(self, '_local'): @@ -201,7 +204,7 @@ class EventContext(object): def dump_begin(self, fileobj): begin_dict = self.get_begin_dict() - self.dispatcher.dispatch(begin_dict) + self.cache.set(":1:ev-{}".format(begin_dict['uuid']), begin_dict) self.dump(fileobj, {'uuid': begin_dict['uuid']}) def dump_end(self, fileobj): diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 79a6503568..89beb7ebd3 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -87,24 +87,10 @@ class CallbackBrokerWorker(ConsumerMixin): if settings.DEBUG: logger.info('Body: {}'.format(body)) try: - # If event came directly from callback without counter/stdout, - # save it until the rest of the event arrives. - if 'counter' not in body: - if 'uuid' in body: - self.partial_events[body['uuid']] = body - # If event has counter, try to combine it with any event data - # already received for the same uuid, then create the actual - # job event record. - else: - if 'uuid' in body: - partial_event = self.partial_events.pop(body['uuid'], {}) - body.update(partial_event) - else: - continue - if 'job_id' in body: - JobEvent.create_from_data(**body) - elif 'ad_hoc_command_id' in body: - AdHocCommandEvent.create_from_data(**body) + if 'job_id' in body: + JobEvent.create_from_data(**body) + elif 'ad_hoc_command_id' in body: + AdHocCommandEvent.create_from_data(**body) except DatabaseError as e: logger.error('Database Error Saving Job Event: {}'.format(e)) except Exception as exc: diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 0f47d080bb..0bd62b7325 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -822,6 +822,7 @@ class RunJob(BaseTask): env['MAX_EVENT_RES'] = str(settings.MAX_EVENT_RES_DATA) env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE env['CALLBACK_CONNECTION'] = settings.BROKER_URL + env['CACHE'] = settings.CACHES['default']['LOCATION'] if 'LOCATION' in settings.CACHES['default'] else '' if getattr(settings, 'JOB_CALLBACK_DEBUG', False): env['JOB_CALLBACK_DEBUG'] = '2' elif settings.DEBUG: @@ -1029,6 +1030,9 @@ class RunJob(BaseTask): def job_event_callback(event_data): event_data.setdefault('job_id', instance.id) + cache_event = cache.get('ev-{}'.format(event_data['uuid']), None) + if cache_event is not None: + event_data.update(cache_event) dispatcher.dispatch(event_data) else: def job_event_callback(event_data): diff --git a/requirements/requirements_ansible.in b/requirements/requirements_ansible.in index 2d381c955b..dfe378c4bf 100644 --- a/requirements/requirements_ansible.in +++ b/requirements/requirements_ansible.in @@ -4,6 +4,7 @@ azure==2.0.0rc6 backports.ssl-match-hostname==3.5.0.1 kombu==3.0.35 boto==2.45.0 +python-memcached==1.58 psphere==0.5.2 psutil==5.0.0 pyvmomi==6.5 diff --git a/requirements/requirements_ansible.txt b/requirements/requirements_ansible.txt index 59a7fe9544..4d20ac165a 100644 --- a/requirements/requirements_ansible.txt +++ b/requirements/requirements_ansible.txt @@ -84,6 +84,7 @@ pyasn1==0.1.9 # via cryptography pycparser==2.17 # via cffi PyJWT==1.4.2 # via adal pyparsing==2.1.10 # via cliff, cmd2, oslo.utils +python-memcached==1.58 python-cinderclient==1.9.0 # via python-openstackclient, shade python-dateutil==2.6.0 # via adal, azure-storage python-designateclient==2.4.0 # via shade From 73f92bba5df5bc942bb928d3074a2cccba980738 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Wed, 18 Jan 2017 12:05:29 -0500 Subject: [PATCH 3/3] Integrate multiprocessing writer worker in callback receiver --- .../commands/run_callback_receiver.py | 32 +++++++++++++++---- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 89beb7ebd3..314650283d 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -3,10 +3,11 @@ # Python import logging -from threading import Thread -from Queue import Queue as ThreadQueue -from Queue import Empty as QueueEmpty +import signal from uuid import UUID +from multiprocessing import Process +from multiprocessing import Queue as MPQueue +from Queue import Empty as QueueEmpty from kombu import Connection, Exchange, Queue from kombu.mixins import ConsumerMixin @@ -14,6 +15,7 @@ from kombu.mixins import ConsumerMixin # Django from django.conf import settings from django.core.management.base import NoArgsCommand +from django.db import connection as django_connection from django.db import DatabaseError # AWX @@ -25,15 +27,28 @@ logger = logging.getLogger('awx.main.commands.run_callback_receiver') class CallbackBrokerWorker(ConsumerMixin): def __init__(self, connection, use_workers=True): self.connection = connection - self.partial_events = {} self.worker_queues = [] self.total_messages = 0 + self.init_workers(use_workers) + + def init_workers(self, use_workers=True): + def shutdown_handler(active_workers): + def _handler(signum, frame): + try: + for active_worker in active_workers: + active_worker.terminate() + signal.signal(signum, signal.SIG_DFL) + os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it + except Exception: + # TODO: LOG + pass + return _handler if use_workers: - connection.close() + django_connection.close() for idx in range(settings.JOB_EVENT_WORKERS): - queue_actual = ThreadQueue(settings.JOB_EVENT_MAX_QUEUE_SIZE) - w = Thread(target=self.callback_worker, args=(queue_actual, idx,)) + queue_actual = MPQueue(settings.JOB_EVENT_MAX_QUEUE_SIZE) + w = Process(target=self.callback_worker, args=(queue_actual, idx,)) w.start() if settings.DEBUG: logger.info('Started worker %s' % str(idx)) @@ -41,6 +56,9 @@ class CallbackBrokerWorker(ConsumerMixin): elif settings.DEBUG: logger.warn('Started callback receiver (no workers)') + signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in self.worker_queues])) + signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in self.worker_queues])) + def get_consumers(self, Consumer, channel): return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE, Exchange(settings.CALLBACK_QUEUE, type='direct'),