Merge pull request #4641 from ansible/multi_worker_callbacks

Implement a worker-based callback receiver system
This commit is contained in:
Matthew Jones
2017-01-18 20:45:45 -05:00
committed by GitHub
6 changed files with 95 additions and 29 deletions

View File

@@ -27,6 +27,7 @@ import multiprocessing
import os import os
import threading import threading
import uuid import uuid
import memcache
# Kombu # Kombu
from kombu import Connection, Exchange, Producer from kombu import Connection, Exchange, Producer
@@ -100,6 +101,8 @@ class EventContext(object):
def __init__(self): def __init__(self):
self.display_lock = multiprocessing.RLock() self.display_lock = multiprocessing.RLock()
self.dispatcher = CallbackQueueEventDispatcher() self.dispatcher = CallbackQueueEventDispatcher()
cache_actual = os.getenv('CACHE', '127.0.0.1:11211')
self.cache = memcache.Client([cache_actual], debug=0)
def add_local(self, **kwargs): def add_local(self, **kwargs):
if not hasattr(self, '_local'): if not hasattr(self, '_local'):
@@ -201,7 +204,7 @@ class EventContext(object):
def dump_begin(self, fileobj): def dump_begin(self, fileobj):
begin_dict = self.get_begin_dict() begin_dict = self.get_begin_dict()
self.dispatcher.dispatch(begin_dict) self.cache.set(":1:ev-{}".format(begin_dict['uuid']), begin_dict)
self.dump(fileobj, {'uuid': begin_dict['uuid']}) self.dump(fileobj, {'uuid': begin_dict['uuid']})
def dump_end(self, fileobj): def dump_end(self, fileobj):

View File

@@ -3,6 +3,11 @@
# Python # Python
import logging import logging
import signal
from uuid import UUID
from multiprocessing import Process
from multiprocessing import Queue as MPQueue
from Queue import Empty as QueueEmpty
from kombu import Connection, Exchange, Queue from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin from kombu.mixins import ConsumerMixin
@@ -10,6 +15,7 @@ from kombu.mixins import ConsumerMixin
# Django # Django
from django.conf import settings from django.conf import settings
from django.core.management.base import NoArgsCommand from django.core.management.base import NoArgsCommand
from django.db import connection as django_connection
from django.db import DatabaseError from django.db import DatabaseError
# AWX # AWX
@@ -19,9 +25,39 @@ logger = logging.getLogger('awx.main.commands.run_callback_receiver')
class CallbackBrokerWorker(ConsumerMixin): class CallbackBrokerWorker(ConsumerMixin):
def __init__(self, connection): def __init__(self, connection, use_workers=True):
self.connection = connection self.connection = connection
self.partial_events = {} self.worker_queues = []
self.total_messages = 0
self.init_workers(use_workers)
def init_workers(self, use_workers=True):
def shutdown_handler(active_workers):
def _handler(signum, frame):
try:
for active_worker in active_workers:
active_worker.terminate()
signal.signal(signum, signal.SIG_DFL)
os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it
except Exception:
# TODO: LOG
pass
return _handler
if use_workers:
django_connection.close()
for idx in range(settings.JOB_EVENT_WORKERS):
queue_actual = MPQueue(settings.JOB_EVENT_MAX_QUEUE_SIZE)
w = Process(target=self.callback_worker, args=(queue_actual, idx,))
w.start()
if settings.DEBUG:
logger.info('Started worker %s' % str(idx))
self.worker_queues.append([0, queue_actual, w])
elif settings.DEBUG:
logger.warn('Started callback receiver (no workers)')
signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in self.worker_queues]))
signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in self.worker_queues]))
def get_consumers(self, Consumer, channel): def get_consumers(self, Consumer, channel):
return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE, return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE,
@@ -31,36 +67,54 @@ class CallbackBrokerWorker(ConsumerMixin):
callbacks=[self.process_task])] callbacks=[self.process_task])]
def process_task(self, body, message): def process_task(self, body, message):
try: if "uuid" in body:
if 'job_id' not in body and 'ad_hoc_command_id' not in body: queue = UUID(body['uuid']).int % settings.JOB_EVENT_WORKERS
raise Exception('Payload does not have a job_id or ad_hoc_command_id') else:
if settings.DEBUG: queue = self.total_messages % settings.JOB_EVENT_WORKERS
logger.info('Body: {}'.format(body)) self.write_queue_worker(queue, body)
logger.info('Message: {}'.format(message)) self.total_messages += 1
message.ack()
def write_queue_worker(self, preferred_queue, body):
queue_order = sorted(range(settings.JOB_EVENT_WORKERS), cmp=lambda x, y: -1 if x==preferred_queue else 0)
for queue_actual in queue_order:
try: try:
# If event came directly from callback without counter/stdout, worker_actual = self.worker_queues[queue_actual]
# save it until the rest of the event arrives. worker_actual[1].put(body, block=True, timeout=5)
if 'counter' not in body: worker_actual[0] += 1
if 'uuid' in body: return queue_actual
self.partial_events[body['uuid']] = body except Exception:
# If event has counter, try to combine it with any event data import traceback
# already received for the same uuid, then create the actual traceback.print_exc()
# job event record. logger.warn("Could not write to queue %s" % preferred_queue)
else: continue
if 'uuid' in body: return None
partial_event = self.partial_events.pop(body['uuid'], {})
body.update(partial_event) def callback_worker(self, queue_actual, idx):
while True:
try:
body = queue_actual.get(block=True, timeout=1)
except QueueEmpty:
continue
except Exception as e:
logger.error("Exception on worker thread, restarting: " + str(e))
continue
try:
if 'job_id' not in body and 'ad_hoc_command_id' not in body:
raise Exception('Payload does not have a job_id or ad_hoc_command_id')
if settings.DEBUG:
logger.info('Body: {}'.format(body))
try:
if 'job_id' in body: if 'job_id' in body:
JobEvent.create_from_data(**body) JobEvent.create_from_data(**body)
elif 'ad_hoc_command_id' in body: elif 'ad_hoc_command_id' in body:
AdHocCommandEvent.create_from_data(**body) AdHocCommandEvent.create_from_data(**body)
except DatabaseError as e: except DatabaseError as e:
logger.error('Database Error Saving Job Event: {}'.format(e)) logger.error('Database Error Saving Job Event: {}'.format(e))
except Exception as exc: except Exception as exc:
import traceback import traceback
traceback.print_exc() traceback.print_exc()
logger.error('Callback Task Processor Raised Exception: %r', exc) logger.error('Callback Task Processor Raised Exception: %r', exc)
message.ack()
class Command(NoArgsCommand): class Command(NoArgsCommand):

View File

@@ -862,6 +862,7 @@ class RunJob(BaseTask):
env['MAX_EVENT_RES'] = str(settings.MAX_EVENT_RES_DATA) env['MAX_EVENT_RES'] = str(settings.MAX_EVENT_RES_DATA)
env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE
env['CALLBACK_CONNECTION'] = settings.BROKER_URL env['CALLBACK_CONNECTION'] = settings.BROKER_URL
env['CACHE'] = settings.CACHES['default']['LOCATION'] if 'LOCATION' in settings.CACHES['default'] else ''
if getattr(settings, 'JOB_CALLBACK_DEBUG', False): if getattr(settings, 'JOB_CALLBACK_DEBUG', False):
env['JOB_CALLBACK_DEBUG'] = '2' env['JOB_CALLBACK_DEBUG'] = '2'
elif settings.DEBUG: elif settings.DEBUG:
@@ -1069,6 +1070,9 @@ class RunJob(BaseTask):
def job_event_callback(event_data): def job_event_callback(event_data):
event_data.setdefault('job_id', instance.id) event_data.setdefault('job_id', instance.id)
cache_event = cache.get('ev-{}'.format(event_data['uuid']), None)
if cache_event is not None:
event_data.update(cache_event)
dispatcher.dispatch(event_data) dispatcher.dispatch(event_data)
else: else:
def job_event_callback(event_data): def job_event_callback(event_data):

View File

@@ -163,6 +163,10 @@ MAX_EVENT_RES_DATA = 700000
# Note: This setting may be overridden by database settings. # Note: This setting may be overridden by database settings.
EVENT_STDOUT_MAX_BYTES_DISPLAY = 1024 EVENT_STDOUT_MAX_BYTES_DISPLAY = 1024
JOB_EVENT_WORKERS = 4
JOB_EVENT_MAX_QUEUE_SIZE = 5000
# Disallow sending session cookies over insecure connections # Disallow sending session cookies over insecure connections
SESSION_COOKIE_SECURE = True SESSION_COOKIE_SECURE = True
@@ -172,7 +176,6 @@ CSRF_COOKIE_SECURE = True
# Limit CSRF cookies to browser sessions # Limit CSRF cookies to browser sessions
CSRF_COOKIE_AGE = None CSRF_COOKIE_AGE = None
TEMPLATE_CONTEXT_PROCESSORS = ( # NOQA TEMPLATE_CONTEXT_PROCESSORS = ( # NOQA
'django.contrib.auth.context_processors.auth', 'django.contrib.auth.context_processors.auth',
'django.core.context_processors.debug', 'django.core.context_processors.debug',

View File

@@ -4,6 +4,7 @@ azure==2.0.0rc6
backports.ssl-match-hostname==3.5.0.1 backports.ssl-match-hostname==3.5.0.1
kombu==3.0.35 kombu==3.0.35
boto==2.45.0 boto==2.45.0
python-memcached==1.58
psphere==0.5.2 psphere==0.5.2
psutil==5.0.0 psutil==5.0.0
pyvmomi==6.5 pyvmomi==6.5

View File

@@ -87,6 +87,7 @@ pycparser==2.17 # via cffi
PyJWT==1.4.2 # via adal PyJWT==1.4.2 # via adal
pykerberos==1.1.13 # via requests-kerberos pykerberos==1.1.13 # via requests-kerberos
pyparsing==2.1.10 # via cliff, cmd2, oslo.utils pyparsing==2.1.10 # via cliff, cmd2, oslo.utils
python-memcached==1.58
python-cinderclient==1.9.0 # via python-openstackclient, shade python-cinderclient==1.9.0 # via python-openstackclient, shade
python-dateutil==2.6.0 # via adal, azure-storage python-dateutil==2.6.0 # via adal, azure-storage
python-designateclient==2.4.0 # via shade python-designateclient==2.4.0 # via shade