mirror of
https://github.com/ansible/awx.git
synced 2026-03-04 10:11:05 -03:30
Reimplement parallel job event processing to increase the speed of
playbook runs within tower
This commit is contained in:
@@ -28,6 +28,7 @@ from awx.main.models import *
|
|||||||
import zmq
|
import zmq
|
||||||
|
|
||||||
MAX_REQUESTS = 20000
|
MAX_REQUESTS = 20000
|
||||||
|
WORKERS = 4
|
||||||
|
|
||||||
class CallbackReceiver(object):
|
class CallbackReceiver(object):
|
||||||
|
|
||||||
@@ -52,85 +53,85 @@ class CallbackReceiver(object):
|
|||||||
consumer_subscriber = consumer_context.socket(zmq.REP)
|
consumer_subscriber = consumer_context.socket(zmq.REP)
|
||||||
consumer_subscriber.bind(consumer_port)
|
consumer_subscriber.bind(consumer_port)
|
||||||
|
|
||||||
queue_context = zmq.Context()
|
worker_queues = []
|
||||||
queue_publisher = queue_context.socket(zmq.PUSH)
|
|
||||||
queue_publisher.bind(queue_port)
|
|
||||||
|
|
||||||
if use_workers:
|
if use_workers:
|
||||||
w = Process(target=self.callback_worker, args=(queue_port,))
|
|
||||||
w.daemon = True
|
|
||||||
w.start()
|
|
||||||
|
|
||||||
signal.signal(signal.SIGINT, shutdown_handler([w]))
|
for idx in range(WORKERS):
|
||||||
signal.signal(signal.SIGTERM, shutdown_handler([w]))
|
queue_port_actual = queue_port + "-%s" % str(idx)
|
||||||
if settings.DEBUG:
|
queue_context = zmq.Context()
|
||||||
print 'Started worker'
|
queue_publisher = queue_context.socket(zmq.PUSH)
|
||||||
|
queue_publisher.bind(queue_port_actual)
|
||||||
|
|
||||||
|
w = Process(target=self.callback_worker, args=(queue_port_actual,))
|
||||||
|
w.daemon = True
|
||||||
|
w.start()
|
||||||
|
|
||||||
|
signal.signal(signal.SIGINT, shutdown_handler([w]))
|
||||||
|
signal.signal(signal.SIGTERM, shutdown_handler([w]))
|
||||||
|
if settings.DEBUG:
|
||||||
|
print 'Started worker %s' % str(idx)
|
||||||
|
worker_queues.append([0, queue_publisher, w])
|
||||||
elif settings.DEBUG:
|
elif settings.DEBUG:
|
||||||
print 'Started callback receiver (no workers)'
|
print 'Started callback receiver (no workers)'
|
||||||
|
|
||||||
message_number = 0
|
message_number = 0
|
||||||
|
total_messages = 0
|
||||||
|
|
||||||
|
last_parent_events = {}
|
||||||
|
|
||||||
while True: # Handle signal
|
while True: # Handle signal
|
||||||
message = consumer_subscriber.recv_json()
|
message = consumer_subscriber.recv_json()
|
||||||
message_number += 1
|
total_messages += 1
|
||||||
if not use_workers:
|
if not use_workers:
|
||||||
self.process_job_event(message)
|
self.process_job_event(message)
|
||||||
else:
|
else:
|
||||||
queue_publisher.send_json(message)
|
job_parent_events = last_parent_events.get(message['job_id'], {})
|
||||||
if message_number >= MAX_REQUESTS:
|
if message['event'] in ('playbook_on_play_start', 'playbook_on_stats', 'playbook_on_vars_prompt'):
|
||||||
message_number = 0
|
parent = job_parent_events.get('playbook_on_start', None)
|
||||||
print("Recycling worker process")
|
elif message['event'] in ('playbook_on_notify', 'playbook_on_setup',
|
||||||
w.join()
|
'playbook_on_task_start',
|
||||||
w = Process(target=self.callback_worker, args=(queue_port,))
|
'playbook_on_no_hosts_matched',
|
||||||
w.daemon = True
|
'playbook_on_no_hosts_remaining',
|
||||||
w.start()
|
'playbook_on_import_for_host',
|
||||||
|
'playbook_on_not_import_for_host'):
|
||||||
|
parent = job_parent_events.get('playbook_on_play_start', None)
|
||||||
|
elif message['event'].startswith('runner_on_'):
|
||||||
|
list_parents = []
|
||||||
|
list_parents.append(job_parent_events.get('playbook_on_setup', None))
|
||||||
|
list_parents.append(job_parent_events.get('playbook_on_task_start', None))
|
||||||
|
list_parents = sorted(filter(lambda x: x is not None, list_parents), cmp=lambda x, y: y.id-x.id)
|
||||||
|
parent = list_parents[0] if len(list_parents) > 0 else None
|
||||||
|
else:
|
||||||
|
parent = None
|
||||||
|
if parent is not None:
|
||||||
|
message['parent'] = parent.id
|
||||||
|
if 'created' in message:
|
||||||
|
del(message['created'])
|
||||||
|
if message['event'] in ('playbook_on_start', 'playbook_on_play_start',
|
||||||
|
'playbook_on_setup', 'playbook_on_task_start'):
|
||||||
|
job_parent_events[message['event']] = self.process_job_event(message)
|
||||||
|
else:
|
||||||
|
if message['event'] == 'playbook_on_stats':
|
||||||
|
job_parent_events = {}
|
||||||
|
queue_actual = worker_queues[total_messages % WORKERS]
|
||||||
|
queue_actual[0] += 1
|
||||||
|
queue_actual[1].send_json(message)
|
||||||
|
if queue_actual[0] >= MAX_REQUESTS:
|
||||||
|
queue_actual[0] = 0
|
||||||
|
print("Recycling worker process")
|
||||||
|
queue_actual[2].join()
|
||||||
|
w = Process(target=self.callback_worker, args=(queue_port + "-%s" % str(total_messages % WORKERS),))
|
||||||
|
w.daemon = True
|
||||||
|
w.start()
|
||||||
|
queue_actual[2] = w
|
||||||
|
last_parent_events[message['job_id']] = job_parent_events
|
||||||
consumer_subscriber.send("1")
|
consumer_subscriber.send("1")
|
||||||
|
|
||||||
# NOTE: This cache doesn't work too terribly well but it can help prevent database queries
|
|
||||||
# we may want to use something like memcached here instead
|
|
||||||
def process_parent_cache(self, job_id, event_object):
|
|
||||||
if event_object.event not in ('playbook_on_start', 'playbook_on_play_start', 'playbook_on_setup', 'playbook_on_task_start'):
|
|
||||||
return
|
|
||||||
if job_id not in self.parent_mappings:
|
|
||||||
self.parent_mappings[job_id] = {}
|
|
||||||
if event_object.event not in self.parent_mappings[job_id]:
|
|
||||||
self.parent_mappings[job_id][event_object.event] = {event_object.id: event_object}
|
|
||||||
else:
|
|
||||||
self.parent_mappings[job_id][event_object.event][event_object.id] = event_object
|
|
||||||
|
|
||||||
def find_parent(self, job_id, event_object):
|
|
||||||
if job_id not in self.parent_mappings:
|
|
||||||
return None
|
|
||||||
job_parent_mappings = self.parent_mappings[job_id]
|
|
||||||
search_events = set()
|
|
||||||
if event_object.event in ('playbook_on_play_start', 'playbook_on_stats',
|
|
||||||
'playbook_on_vars_prompt'):
|
|
||||||
search_events.add('playbook_on_start')
|
|
||||||
elif event_object.event in ('playbook_on_notify', 'playbook_on_setup',
|
|
||||||
'playbook_on_task_start',
|
|
||||||
'playbook_on_no_hosts_matched',
|
|
||||||
'playbook_on_no_hosts_remaining',
|
|
||||||
'playbook_on_import_for_host',
|
|
||||||
'playbook_on_not_import_for_host'):
|
|
||||||
search_events.add('playbook_on_play_start')
|
|
||||||
elif event_object.event.startswith('runner_on_'):
|
|
||||||
search_events.add('playbook_on_setup')
|
|
||||||
search_events.add('playbook_on_task_start')
|
|
||||||
potential_events = []
|
|
||||||
for event_type in search_events:
|
|
||||||
potential_events.extend([e for e in self.parent_mappings[job_id][event_type].values()] if \
|
|
||||||
event_type in self.parent_mappings[job_id] else [])
|
|
||||||
potential_events = sorted(potential_events, cmp=lambda x,y: y.id-x.id)
|
|
||||||
if len(potential_events) > 0:
|
|
||||||
return potential_events[0]
|
|
||||||
return None
|
|
||||||
|
|
||||||
def check_purge_parent_cache(self, job_id, event_object):
|
|
||||||
if event_object.event == 'playbook_on_stats':
|
|
||||||
del(self.parent_mappings[job_id])
|
|
||||||
|
|
||||||
@transaction.commit_on_success
|
@transaction.commit_on_success
|
||||||
def process_job_event(self, data):
|
def process_job_event(self, data):
|
||||||
event = data.get('event', '')
|
event = data.get('event', '')
|
||||||
|
parent_id = data.get('parent', None)
|
||||||
if not event or 'job_id' not in data:
|
if not event or 'job_id' not in data:
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
@@ -150,13 +151,10 @@ class CallbackReceiver(object):
|
|||||||
if event == 'playbook_on_stats':
|
if event == 'playbook_on_stats':
|
||||||
transaction.commit()
|
transaction.commit()
|
||||||
job_event = JobEvent(**data)
|
job_event = JobEvent(**data)
|
||||||
job_event.parent = self.find_parent(data['job_id'], job_event)
|
if parent_id is not None:
|
||||||
|
job_event.parent = JobEvent.objects.get(id=parent_id)
|
||||||
job_event.save(post_process=True)
|
job_event.save(post_process=True)
|
||||||
self.process_parent_cache(data['job_id'], job_event)
|
return job_event
|
||||||
self.check_purge_parent_cache(data['job_id'], job_event)
|
|
||||||
if not event.startswith('runner_'):
|
|
||||||
transaction.commit()
|
|
||||||
break
|
|
||||||
except DatabaseError as e:
|
except DatabaseError as e:
|
||||||
transaction.rollback()
|
transaction.rollback()
|
||||||
print('Database error saving job event, retrying in '
|
print('Database error saving job event, retrying in '
|
||||||
@@ -165,6 +163,7 @@ class CallbackReceiver(object):
|
|||||||
else:
|
else:
|
||||||
print('Failed to save job event after %d retries.',
|
print('Failed to save job event after %d retries.',
|
||||||
retry_count)
|
retry_count)
|
||||||
|
return None
|
||||||
|
|
||||||
def callback_worker(self, port):
|
def callback_worker(self, port):
|
||||||
messages_processed = 0
|
messages_processed = 0
|
||||||
|
|||||||
Reference in New Issue
Block a user