diff --git a/awx/lib/tower_display_callback/events.py b/awx/lib/tower_display_callback/events.py
index f46d5ae96f..a419b33e85 100644
--- a/awx/lib/tower_display_callback/events.py
+++ b/awx/lib/tower_display_callback/events.py
@@ -27,6 +27,7 @@ import multiprocessing
 import os
 import threading
 import uuid
+import memcache
 
 # Kombu
 from kombu import Connection, Exchange, Producer
@@ -100,6 +101,8 @@ class EventContext(object):
     def __init__(self):
         self.display_lock = multiprocessing.RLock()
         self.dispatcher = CallbackQueueEventDispatcher()
+        cache_actual = os.getenv('CACHE', '127.0.0.1:11211')
+        self.cache = memcache.Client([cache_actual], debug=0)
 
     def add_local(self, **kwargs):
         if not hasattr(self, '_local'):
@@ -201,7 +204,10 @@ class EventContext(object):
 
     def dump_begin(self, fileobj):
         begin_dict = self.get_begin_dict()
-        self.dispatcher.dispatch(begin_dict)
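+        # Stash the opening event payload in memcache instead of dispatching
+        # it; the ':1:' prefix mirrors the key-version prefix Django's cache
+        # framework prepends, so RunJob can read this entry back via cache.get().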
+        self.cache.set(":1:ev-{}".format(begin_dict['uuid']), begin_dict)
         self.dump(fileobj, {'uuid': begin_dict['uuid']})
 
     def dump_end(self, fileobj):
diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py
index 7b959f6781..314650283d 100644
--- a/awx/main/management/commands/run_callback_receiver.py
+++ b/awx/main/management/commands/run_callback_receiver.py
@@ -3,6 +3,12 @@
 
 # Python
 import logging
+import os
+import signal
+from uuid import UUID
+from multiprocessing import Process
+from multiprocessing import Queue as MPQueue
+from Queue import Empty as QueueEmpty
 
 from kombu import Connection, Exchange, Queue
 from kombu.mixins import ConsumerMixin
@@ -10,6 +16,7 @@ from kombu.mixins import ConsumerMixin
 # Django
 from django.conf import settings
 from django.core.management.base import NoArgsCommand
+from django.db import connection as django_connection
 from django.db import DatabaseError
 
 # AWX
@@ -19,9 +26,39 @@ logger = logging.getLogger('awx.main.commands.run_callback_receiver')
 
 class CallbackBrokerWorker(ConsumerMixin):
 
-    def __init__(self, connection):
+    def __init__(self, connection, use_workers=True):
         self.connection = connection
-        self.partial_events = {}
+        self.worker_queues = []
+        self.total_messages = 0
+        self.init_workers(use_workers)
+
+    def init_workers(self, use_workers=True):
+        def shutdown_handler(active_workers):
+            def _handler(signum, frame):
+                try:
+                    for active_worker in active_workers:
+                        active_worker.terminate()
+                    signal.signal(signum, signal.SIG_DFL)
+                    os.kill(os.getpid(), signum)  # Rethrow signal, this time without catching it
+                except Exception:
+                    # TODO: LOG
+                    pass
+            return _handler
+
+        if use_workers:
+            django_connection.close()
+            for idx in range(settings.JOB_EVENT_WORKERS):
+                queue_actual = MPQueue(settings.JOB_EVENT_MAX_QUEUE_SIZE)
+                w = Process(target=self.callback_worker, args=(queue_actual, idx,))
+                w.start()
+                if settings.DEBUG:
+                    logger.info('Started worker %s' % str(idx))
+                self.worker_queues.append([0, queue_actual, w])
+        elif settings.DEBUG:
+            logger.warn('Started callback receiver (no workers)')
+
+        signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in self.worker_queues]))
+        signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in self.worker_queues]))
 
     def get_consumers(self, Consumer, channel):
         return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE,
@@ -31,36 +68,57 @@ class CallbackBrokerWorker(ConsumerMixin):
                          callbacks=[self.process_task])]
 
     def process_task(self, body, message):
-        try:
-            if 'job_id' not in body and 'ad_hoc_command_id' not in body:
-                raise Exception('Payload does not have a job_id or ad_hoc_command_id')
-            if settings.DEBUG:
-                logger.info('Body: {}'.format(body))
-                logger.info('Message: {}'.format(message))
+        if "uuid" in body:
+            queue = UUID(body['uuid']).int % settings.JOB_EVENT_WORKERS
+        else:
+            queue = self.total_messages % settings.JOB_EVENT_WORKERS
+        self.write_queue_worker(queue, body)
+        self.total_messages += 1
+        message.ack()
+
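+    # Write the event to a worker queue: queue_order sorts the preferred
+    # worker to the front, and the others are tried in turn if a queue stays
+    # full for 5 seconds; returns the index written to, or None on failure.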
+    def write_queue_worker(self, preferred_queue, body):
+        queue_order = sorted(range(settings.JOB_EVENT_WORKERS), cmp=lambda x, y: -1 if x==preferred_queue else 0)
+        for queue_actual in queue_order:
             try:
-                # If event came directly from callback without counter/stdout,
-                # save it until the rest of the event arrives.
-                if 'counter' not in body:
-                    if 'uuid' in body:
-                        self.partial_events[body['uuid']] = body
-                # If event has counter, try to combine it with any event data
-                # already received for the same uuid, then create the actual
-                # job event record.
-                else:
-                    if 'uuid' in body:
-                        partial_event = self.partial_events.pop(body['uuid'], {})
-                        body.update(partial_event)
+                worker_actual = self.worker_queues[queue_actual]
+                worker_actual[1].put(body, block=True, timeout=5)
+                worker_actual[0] += 1
+                return queue_actual
+            except Exception:
+                import traceback
+                traceback.print_exc()
+                logger.warn("Could not write to queue %s" % queue_actual)
+                continue
+        return None
+
+    def callback_worker(self, queue_actual, idx):
+        while True:
+            try:
+                body = queue_actual.get(block=True, timeout=1)
+            except QueueEmpty:
+                continue
+            except Exception as e:
+                logger.error("Exception on worker thread, restarting: " + str(e))
+                continue
+            try:
+                if 'job_id' not in body and 'ad_hoc_command_id' not in body:
+                    raise Exception('Payload does not have a job_id or ad_hoc_command_id')
+                if settings.DEBUG:
+                    logger.info('Body: {}'.format(body))
+                try:
                     if 'job_id' in body:
                         JobEvent.create_from_data(**body)
                     elif 'ad_hoc_command_id' in body:
                         AdHocCommandEvent.create_from_data(**body)
-            except DatabaseError as e:
-                logger.error('Database Error Saving Job Event: {}'.format(e))
-        except Exception as exc:
-            import traceback
-            traceback.print_exc()
-            logger.error('Callback Task Processor Raised Exception: %r', exc)
-        message.ack()
+                except DatabaseError as e:
+                    logger.error('Database Error Saving Job Event: {}'.format(e))
+            except Exception as exc:
+                import traceback
+                traceback.print_exc()
+                logger.error('Callback Task Processor Raised Exception: %r', exc)
 
 
 class Command(NoArgsCommand):
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index d670aec8ef..18ccf6471c 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -862,6 +862,7 @@
         env['MAX_EVENT_RES'] = str(settings.MAX_EVENT_RES_DATA)
         env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE
         env['CALLBACK_CONNECTION'] = settings.BROKER_URL
+        env['CACHE'] = settings.CACHES['default']['LOCATION'] if 'LOCATION' in settings.CACHES['default'] else ''
         if getattr(settings, 'JOB_CALLBACK_DEBUG', False):
             env['JOB_CALLBACK_DEBUG'] = '2'
         elif settings.DEBUG:
@@ -1069,6 +1070,9 @@
 
             def job_event_callback(event_data):
                 event_data.setdefault('job_id', instance.id)
+                cache_event = cache.get('ev-{}'.format(event_data['uuid']), None)
+                if cache_event is not None:
+                    event_data.update(cache_event)
                 dispatcher.dispatch(event_data)
         else:
             def job_event_callback(event_data):
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 2ae0f6829e..4563692271 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -163,6 +163,10 @@ MAX_EVENT_RES_DATA = 700000
 # Note: This setting may be overridden by database settings.
 EVENT_STDOUT_MAX_BYTES_DISPLAY = 1024
 
+JOB_EVENT_WORKERS = 4
+
+JOB_EVENT_MAX_QUEUE_SIZE = 5000
+
 # Disallow sending session cookies over insecure connections
 SESSION_COOKIE_SECURE = True
 
@@ -172,7 +176,6 @@ CSRF_COOKIE_SECURE = True
 
 # Limit CSRF cookies to browser sessions
 CSRF_COOKIE_AGE = None
-
 TEMPLATE_CONTEXT_PROCESSORS = (  # NOQA
     'django.contrib.auth.context_processors.auth',
     'django.core.context_processors.debug',
diff --git a/requirements/requirements_ansible.in b/requirements/requirements_ansible.in
index 5497b2a7b1..671eaf1496 100644
--- a/requirements/requirements_ansible.in
+++ b/requirements/requirements_ansible.in
@@ -4,6 +4,7 @@ azure==2.0.0rc6
 backports.ssl-match-hostname==3.5.0.1
 kombu==3.0.35
 boto==2.45.0
+python-memcached==1.58
 psphere==0.5.2
 psutil==5.0.0
 pyvmomi==6.5
diff --git a/requirements/requirements_ansible.txt b/requirements/requirements_ansible.txt
index fef057d56b..687378c9aa 100644
--- a/requirements/requirements_ansible.txt
+++ b/requirements/requirements_ansible.txt
@@ -87,6 +87,7 @@ pycparser==2.17  # via cffi
 PyJWT==1.4.2  # via adal
 pykerberos==1.1.13  # via requests-kerberos
 pyparsing==2.1.10  # via cliff, cmd2, oslo.utils
+python-memcached==1.58
 python-cinderclient==1.9.0  # via python-openstackclient, shade
 python-dateutil==2.6.0  # via adal, azure-storage
 python-designateclient==2.4.0  # via shade