From 41e1d809b5d21e14eb5e9ae7657805bfa7c94cc9 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Tue, 24 Jun 2014 10:36:25 -0400 Subject: [PATCH] Refactor job event collection to build and manage an internal cache of potential parent values to reduce the number of queries to the database --- .../commands/run_callback_receiver.py | 209 +++++++++++------- awx/main/models/jobs.py | 3 +- awx/main/tests/base.py | 5 +- 3 files changed, 134 insertions(+), 83 deletions(-) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index d6957ba0a1..b5e9a90fe0 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -26,93 +26,141 @@ from awx.main.models import * # ZeroMQ import zmq -def run_subscriber(consumer_port, queue_port, use_workers=True): - def shutdown_handler(active_workers): - def _handler(signum, frame): - for active_worker in active_workers: - active_worker.terminate() - signal.signal(signum, signal.SIG_DFL) - os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it - return _handler - def check_pre_handle(data): - event = data.get('event', '') - if event == 'playbook_on_play_start': - return True - return False - - consumer_context = zmq.Context() - consumer_subscriber = consumer_context.socket(zmq.REP) - consumer_subscriber.bind(consumer_port) +class CallbackReceiver(object): - queue_context = zmq.Context() - queue_publisher = queue_context.socket(zmq.PUSH) - queue_publisher.bind(queue_port) + def __init__(self): + self.parent_mappings = {} - if use_workers: - workers = [] - for idx in range(4): - w = Process(target=callback_worker, args=(queue_port,)) - w.daemon = True - w.start() - workers.append(w) - signal.signal(signal.SIGINT, shutdown_handler(workers)) - signal.signal(signal.SIGTERM, shutdown_handler(workers)) - if settings.DEBUG: - print 'Started callback receiver (4 workers)' - elif settings.DEBUG: - print 'Started callback receiver (no workers)' + def run_subscriber(self, consumer_port, queue_port, use_workers=True): + def shutdown_handler(active_workers): + def _handler(signum, frame): + for active_worker in active_workers: + active_worker.terminate() + signal.signal(signum, signal.SIG_DFL) + os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it + return _handler + def check_pre_handle(data): + event = data.get('event', '') + if event == 'playbook_on_play_start': + return True + return False - while True: # Handle signal - message = consumer_subscriber.recv_json() - if check_pre_handle(message) or not use_workers: - process_job_event(message) + consumer_context = zmq.Context() + consumer_subscriber = consumer_context.socket(zmq.REP) + consumer_subscriber.bind(consumer_port) + + queue_context = zmq.Context() + queue_publisher = queue_context.socket(zmq.PUSH) + queue_publisher.bind(queue_port) + + if use_workers: + workers = [] + for idx in range(4): + w = Process(target=self.callback_worker, args=(queue_port,)) + w.daemon = True + w.start() + workers.append(w) + signal.signal(signal.SIGINT, shutdown_handler(workers)) + signal.signal(signal.SIGTERM, shutdown_handler(workers)) + if settings.DEBUG: + print 'Started callback receiver (4 workers)' + elif settings.DEBUG: + print 'Started callback receiver (no workers)' + + while True: # Handle signal + message = consumer_subscriber.recv_json() + if True: # check_pre_handle(message) or not use_workers: + self.process_job_event(message) + else: + queue_publisher.send_json(message) + consumer_subscriber.send("1") + + def process_parent_cache(self, job_id, event_object): + if event_object.event not in ('playbook_on_start', 'playbook_on_play_start', 'playbook_on_setup', 'playbook_on_task_start'): + return + if job_id not in self.parent_mappings: + self.parent_mappings[job_id] = {} + if event_object.event not in self.parent_mappings[job_id]: + self.parent_mappings[job_id][event_object.event] = {event_object.id: event_object} else: - queue_publisher.send_json(message) - consumer_subscriber.send("1") + self.parent_mappings[job_id][event_object.event][event_object.id] = event_object + def find_parent(self, job_id, event_object): + if job_id not in self.parent_mappings: + return None + job_parent_mappings = self.parent_mappings[job_id] + search_events = set() + if event_object.event in ('playbook_on_play_start', 'playbook_on_stats', + 'playbook_on_vars_prompt'): + search_events.add('playbook_on_start') + elif event_object.event in ('playbook_on_notify', 'playbook_on_setup', + 'playbook_on_task_start', + 'playbook_on_no_hosts_matched', + 'playbook_on_no_hosts_remaining', + 'playbook_on_import_for_host', + 'playbook_on_not_import_for_host'): + search_events.add('playbook_on_play_start') + elif event_object.event.startswith('runner_on_'): + search_events.add('playbook_on_setup') + search_events.add('playbook_on_task_start') + potential_events = [] + for event_type in search_events: + potential_events.extend([e for e in self.parent_mappings[job_id][event_type].values()] if \ + event_type in self.parent_mappings[job_id] else []) + potential_events = sorted(potential_events, cmp=lambda x,y: y.id-x.id) + if len(potential_events) > 0: + return potential_events[0] + return None -@transaction.commit_on_success -def process_job_event(data): - event = data.get('event', '') - if not event or 'job_id' not in data: - return - try: - if not isinstance(data['created'], datetime.datetime): - data['created'] = parse_datetime(data['created']) - if not data['created'].tzinfo: - data['created'] = data['created'].replace(tzinfo=FixedOffset(0)) - except (KeyError, ValueError): - data.pop('created', None) - if settings.DEBUG: - print data - for key in data.keys(): - if key not in ('job_id', 'event', 'event_data', 'created'): - data.pop(key) - for retry_count in xrange(11): + def check_purge_parent_cache(self, job_id, event_object): + if event_object.event == 'playbook_on_stats': + del(self.parent_mappings[job_id]) + + @transaction.commit_on_success + def process_job_event(self, data): + event = data.get('event', '') + if not event or 'job_id' not in data: + return try: - if event == 'playbook_on_stats': - transaction.commit() - job_event = JobEvent(**data) - job_event.save(post_process=True) - if not event.startswith('runner_'): - transaction.commit() - break - except DatabaseError as e: - transaction.rollback() - print('Database error saving job event, retrying in ' - '1 second (retry #%d): %s', retry_count + 1, e) - time.sleep(1) - else: - print('Failed to save job event after %d retries.', - retry_count) + if not isinstance(data['created'], datetime.datetime): + data['created'] = parse_datetime(data['created']) + if not data['created'].tzinfo: + data['created'] = data['created'].replace(tzinfo=FixedOffset(0)) + except (KeyError, ValueError): + data.pop('created', None) + if settings.DEBUG: + print data + for key in data.keys(): + if key not in ('job_id', 'event', 'event_data', 'created'): + data.pop(key) + for retry_count in xrange(11): + try: + if event == 'playbook_on_stats': + transaction.commit() + job_event = JobEvent(**data) + job_event.parent = self.find_parent(data['job_id'], job_event) + job_event.save(post_process=True) + self.process_parent_cache(data['job_id'], job_event) + self.check_purge_parent_cache(data['job_id'], job_event) + if not event.startswith('runner_'): + transaction.commit() + break + except DatabaseError as e: + transaction.rollback() + print('Database error saving job event, retrying in ' + '1 second (retry #%d): %s', retry_count + 1, e) + time.sleep(1) + else: + print('Failed to save job event after %d retries.', + retry_count) -def callback_worker(port): - pool_context = zmq.Context() - pool_subscriber = pool_context.socket(zmq.PULL) - pool_subscriber.connect(port) - while True: - message = pool_subscriber.recv_json() - process_job_event(message) + def callback_worker(self, port): + pool_context = zmq.Context() + pool_subscriber = pool_context.socket(zmq.PULL) + pool_subscriber.connect(port) + while True: + message = pool_subscriber.recv_json() + self.process_job_event(message) class Command(NoArgsCommand): ''' @@ -142,8 +190,9 @@ class Command(NoArgsCommand): self.init_logging() consumer_port = settings.CALLBACK_CONSUMER_PORT queue_port = settings.CALLBACK_QUEUE_PORT + cr = CallbackReceiver() try: - run_subscriber(consumer_port, queue_port) + cr.run_subscriber(consumer_port, queue_port) except KeyboardInterrupt: pass diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 077141a05b..bdb97e5ae0 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -675,7 +675,8 @@ class JobEvent(CreatedModifiedModel): update_fields.append('host_id') except (IndexError, AttributeError): pass - self.parent = self._find_parent() + if self.parent is None: + self.parent = self._find_parent() if 'parent' not in update_fields: update_fields.append('parent') super(JobEvent, self).save(*args, **kwargs) diff --git a/awx/main/tests/base.py b/awx/main/tests/base.py index afeaef4354..df04a16d77 100644 --- a/awx/main/tests/base.py +++ b/awx/main/tests/base.py @@ -26,7 +26,7 @@ from django.test.client import Client # AWX from awx.main.models import * from awx.main.backend import LDAPSettings -from awx.main.management.commands.run_callback_receiver import run_subscriber +from awx.main.management.commands.run_callback_receiver import CallbackReceiver from awx.main.management.commands.run_task_system import run_taskmanager from awx.main.utils import get_ansible_version from awx.main.task_engine import TaskEngager as LicenseWriter @@ -453,7 +453,8 @@ class BaseTestMixin(object): self.taskmanager_process.terminate() def start_queue(self, consumer_port, queue_port): - self.queue_process = Process(target=run_subscriber, + receiver = CallbackReceiver() + self.queue_process = Process(target=receiver.run_subscriber, args=(consumer_port, queue_port, False,)) self.queue_process.start()