Refactor job event collection to build and manage an internal cache of potential parent values, to reduce the number of queries to the database.
This commit is contained in:
Matthew Jones
2014-06-24 10:36:25 -04:00
parent c60fb7a58f
commit 41e1d809b5
3 changed files with 134 additions and 83 deletions

View File

@@ -26,93 +26,141 @@ from awx.main.models import *
# ZeroMQ # ZeroMQ
import zmq import zmq
def run_subscriber(consumer_port, queue_port, use_workers=True): class CallbackReceiver(object):
def shutdown_handler(active_workers):
    """Build a SIGINT/SIGTERM handler that tears down worker processes.

    The returned handler terminates every process in *active_workers*,
    restores the default disposition for the delivered signal, and then
    re-sends the signal to this process so it exits with the conventional
    signal exit status.
    """
    def _handler(signum, frame):
        # Stop every worker before letting the signal take effect.
        for worker in active_workers:
            worker.terminate()
        # Reinstall the default handler, then re-raise the signal against
        # ourselves so it is no longer intercepted.
        signal.signal(signum, signal.SIG_DFL)
        os.kill(os.getpid(), signum)
    return _handler
def check_pre_handle(data):
    """Return True when *data* must be handled in-line rather than queued.

    Only 'playbook_on_play_start' events qualify; everything else may be
    dispatched to the worker pool.
    """
    return data.get('event', '') == 'playbook_on_play_start'
consumer_context = zmq.Context()
consumer_subscriber = consumer_context.socket(zmq.REP)
consumer_subscriber.bind(consumer_port)
queue_context = zmq.Context() def __init__(self):
queue_publisher = queue_context.socket(zmq.PUSH) self.parent_mappings = {}
queue_publisher.bind(queue_port)
if use_workers: def run_subscriber(self, consumer_port, queue_port, use_workers=True):
workers = [] def shutdown_handler(active_workers):
for idx in range(4): def _handler(signum, frame):
w = Process(target=callback_worker, args=(queue_port,)) for active_worker in active_workers:
w.daemon = True active_worker.terminate()
w.start() signal.signal(signum, signal.SIG_DFL)
workers.append(w) os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it
signal.signal(signal.SIGINT, shutdown_handler(workers)) return _handler
signal.signal(signal.SIGTERM, shutdown_handler(workers)) def check_pre_handle(data):
if settings.DEBUG: event = data.get('event', '')
print 'Started callback receiver (4 workers)' if event == 'playbook_on_play_start':
elif settings.DEBUG: return True
print 'Started callback receiver (no workers)' return False
while True: # Handle signal consumer_context = zmq.Context()
message = consumer_subscriber.recv_json() consumer_subscriber = consumer_context.socket(zmq.REP)
if check_pre_handle(message) or not use_workers: consumer_subscriber.bind(consumer_port)
process_job_event(message)
queue_context = zmq.Context()
queue_publisher = queue_context.socket(zmq.PUSH)
queue_publisher.bind(queue_port)
if use_workers:
workers = []
for idx in range(4):
w = Process(target=self.callback_worker, args=(queue_port,))
w.daemon = True
w.start()
workers.append(w)
signal.signal(signal.SIGINT, shutdown_handler(workers))
signal.signal(signal.SIGTERM, shutdown_handler(workers))
if settings.DEBUG:
print 'Started callback receiver (4 workers)'
elif settings.DEBUG:
print 'Started callback receiver (no workers)'
while True: # Handle signal
message = consumer_subscriber.recv_json()
if True: # check_pre_handle(message) or not use_workers:
self.process_job_event(message)
else:
queue_publisher.send_json(message)
consumer_subscriber.send("1")
def process_parent_cache(self, job_id, event_object):
    """Record *event_object* in the per-job parent cache.

    Only event types that can act as parents of later events are cached.
    The cache layout is: self.parent_mappings[job_id][event_type][event_id]
    -> event object.
    """
    cacheable = ('playbook_on_start', 'playbook_on_play_start',
                 'playbook_on_setup', 'playbook_on_task_start')
    if event_object.event not in cacheable:
        return
    # Lazily create the nested dicts for this job / event type.
    job_cache = self.parent_mappings.setdefault(job_id, {})
    type_cache = job_cache.setdefault(event_object.event, {})
    type_cache[event_object.id] = event_object
def find_parent(self, job_id, event_object):
    """Return the cached parent event for *event_object* in job *job_id*.

    Chooses, among the cached events whose type can own this event, the one
    with the highest id (i.e. the most recently created candidate).
    Returns None when the job has no cache entry or no suitable candidate.
    """
    if job_id not in self.parent_mappings:
        return None
    job_cache = self.parent_mappings[job_id]
    # Map this event's type to the set of event types that may parent it.
    search_events = set()
    if event_object.event in ('playbook_on_play_start', 'playbook_on_stats',
                              'playbook_on_vars_prompt'):
        search_events.add('playbook_on_start')
    elif event_object.event in ('playbook_on_notify', 'playbook_on_setup',
                                'playbook_on_task_start',
                                'playbook_on_no_hosts_matched',
                                'playbook_on_no_hosts_remaining',
                                'playbook_on_import_for_host',
                                'playbook_on_not_import_for_host'):
        search_events.add('playbook_on_play_start')
    elif event_object.event.startswith('runner_on_'):
        search_events.add('playbook_on_setup')
        search_events.add('playbook_on_task_start')
    # Collect all cached candidates of the admissible types; dict.get
    # avoids the membership test the original inlined into extend().
    candidates = []
    for event_type in search_events:
        candidates.extend(job_cache.get(event_type, {}).values())
    if candidates:
        # max() replaces the original sorted(..., cmp=...) (Python-2-only
        # and O(n log n)); highest id == nearest preceding parent.
        return max(candidates, key=lambda e: e.id)
    return None
@transaction.commit_on_success def check_purge_parent_cache(self, job_id, event_object):
def process_job_event(data): if event_object.event == 'playbook_on_stats':
event = data.get('event', '') del(self.parent_mappings[job_id])
if not event or 'job_id' not in data:
return @transaction.commit_on_success
try: def process_job_event(self, data):
if not isinstance(data['created'], datetime.datetime): event = data.get('event', '')
data['created'] = parse_datetime(data['created']) if not event or 'job_id' not in data:
if not data['created'].tzinfo: return
data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
except (KeyError, ValueError):
data.pop('created', None)
if settings.DEBUG:
print data
for key in data.keys():
if key not in ('job_id', 'event', 'event_data', 'created'):
data.pop(key)
for retry_count in xrange(11):
try: try:
if event == 'playbook_on_stats': if not isinstance(data['created'], datetime.datetime):
transaction.commit() data['created'] = parse_datetime(data['created'])
job_event = JobEvent(**data) if not data['created'].tzinfo:
job_event.save(post_process=True) data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
if not event.startswith('runner_'): except (KeyError, ValueError):
transaction.commit() data.pop('created', None)
break if settings.DEBUG:
except DatabaseError as e: print data
transaction.rollback() for key in data.keys():
print('Database error saving job event, retrying in ' if key not in ('job_id', 'event', 'event_data', 'created'):
'1 second (retry #%d): %s', retry_count + 1, e) data.pop(key)
time.sleep(1) for retry_count in xrange(11):
else: try:
print('Failed to save job event after %d retries.', if event == 'playbook_on_stats':
retry_count) transaction.commit()
job_event = JobEvent(**data)
job_event.parent = self.find_parent(data['job_id'], job_event)
job_event.save(post_process=True)
self.process_parent_cache(data['job_id'], job_event)
self.check_purge_parent_cache(data['job_id'], job_event)
if not event.startswith('runner_'):
transaction.commit()
break
except DatabaseError as e:
transaction.rollback()
print('Database error saving job event, retrying in '
'1 second (retry #%d): %s', retry_count + 1, e)
time.sleep(1)
else:
print('Failed to save job event after %d retries.',
retry_count)
def callback_worker(port): def callback_worker(self, port):
pool_context = zmq.Context() pool_context = zmq.Context()
pool_subscriber = pool_context.socket(zmq.PULL) pool_subscriber = pool_context.socket(zmq.PULL)
pool_subscriber.connect(port) pool_subscriber.connect(port)
while True: while True:
message = pool_subscriber.recv_json() message = pool_subscriber.recv_json()
process_job_event(message) self.process_job_event(message)
class Command(NoArgsCommand): class Command(NoArgsCommand):
''' '''
@@ -142,8 +190,9 @@ class Command(NoArgsCommand):
self.init_logging() self.init_logging()
consumer_port = settings.CALLBACK_CONSUMER_PORT consumer_port = settings.CALLBACK_CONSUMER_PORT
queue_port = settings.CALLBACK_QUEUE_PORT queue_port = settings.CALLBACK_QUEUE_PORT
cr = CallbackReceiver()
try: try:
run_subscriber(consumer_port, queue_port) cr.run_subscriber(consumer_port, queue_port)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass

View File

@@ -675,7 +675,8 @@ class JobEvent(CreatedModifiedModel):
update_fields.append('host_id') update_fields.append('host_id')
except (IndexError, AttributeError): except (IndexError, AttributeError):
pass pass
self.parent = self._find_parent() if self.parent is None:
self.parent = self._find_parent()
if 'parent' not in update_fields: if 'parent' not in update_fields:
update_fields.append('parent') update_fields.append('parent')
super(JobEvent, self).save(*args, **kwargs) super(JobEvent, self).save(*args, **kwargs)

View File

@@ -26,7 +26,7 @@ from django.test.client import Client
# AWX # AWX
from awx.main.models import * from awx.main.models import *
from awx.main.backend import LDAPSettings from awx.main.backend import LDAPSettings
from awx.main.management.commands.run_callback_receiver import run_subscriber from awx.main.management.commands.run_callback_receiver import CallbackReceiver
from awx.main.management.commands.run_task_system import run_taskmanager from awx.main.management.commands.run_task_system import run_taskmanager
from awx.main.utils import get_ansible_version from awx.main.utils import get_ansible_version
from awx.main.task_engine import TaskEngager as LicenseWriter from awx.main.task_engine import TaskEngager as LicenseWriter
@@ -453,7 +453,8 @@ class BaseTestMixin(object):
self.taskmanager_process.terminate() self.taskmanager_process.terminate()
def start_queue(self, consumer_port, queue_port): def start_queue(self, consumer_port, queue_port):
self.queue_process = Process(target=run_subscriber, receiver = CallbackReceiver()
self.queue_process = Process(target=receiver.run_subscriber,
args=(consumer_port, queue_port, False,)) args=(consumer_port, queue_port, False,))
self.queue_process.start() self.queue_process.start()