From 66e9e2310c85af07e1690a3148299a20df5e4639 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Tue, 11 Feb 2014 05:40:31 -0500 Subject: [PATCH 1/6] Initial 0mq implementation --- .../commands/run_callback_receiver.py | 105 ++++++++++++++++++ awx/main/tasks.py | 39 +++---- awx/plugins/callback/job_event_callback.py | 92 +++------------ 3 files changed, 140 insertions(+), 96 deletions(-) create mode 100644 awx/main/management/commands/run_callback_receiver.py diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py new file mode 100644 index 0000000000..2d9a21a8da --- /dev/null +++ b/awx/main/management/commands/run_callback_receiver.py @@ -0,0 +1,105 @@ +# Copyright (c) 2014 AnsibleWorks, Inc. +# All Rights Reserved. + +# Python +import datetime +import logging +import json +from optparse import make_option + +# Django +from django.conf import settings +from django.core.management.base import NoArgsCommand, CommandError +from django.db import transaction +from django.contrib.auth.models import User +from django.utils.dateparse import parse_datetime +from django.utils.timezone import now, is_aware, make_aware +from django.utils.tzinfo import FixedOffset + +# AWX +from awx.main.models import * + +# ZeroMQ +import zmq + +class Command(NoArgsCommand): + ''' + Management command to run the job callback receiver + ''' + + help = 'Launch the job callback receiver' + + option_list = NoArgsCommand.option_list + ( + make_option('--port', dest='port', type='int', default=5556, + help='Port to listen for requests on'),) + + def init_logging(self): + log_levels = dict(enumerate([logging.ERROR, logging.INFO, + logging.DEBUG, 0])) + self.logger = logging.getLogger('awx.main.commands.run_callback_receiver') + self.logger.setLevel(log_levels.get(self.verbosity, 0)) + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter('%(message)s')) + self.logger.addHandler(handler) + self.logger.propagate = False + + @transaction.commit_on_success + def process_job_event(self, data): + print("Received data: %s" % str(data)) + event = data.get('event', '') + if not event or 'job_id' not in data: + return + try: + if not isinstance(data['created'], datetime.datetime): + data['created'] = parse_datetime(data['created']) + if not data['created'].tzinfo: + data['created'] = data['created'].replace(tzinfo=FixedOffset(0)) + except (KeyError, ValueError): + data.pop('created', None) + if settings.DEBUG: + print data + for key in data.keys(): + if key not in ('job_id', 'event', 'event_data', 'created'): + data.pop(key) + data['play'] = data.get('event_data', {}).get('play', '').strip() + data['task'] = data.get('event_data', {}).get('task', '').strip() + for retry_count in xrange(11): + try: + if event == 'playbook_on_stats': + transaction.commit() + if not JobEvent.objects.filter(**data).exists(): + job_event = JobEvent(**data) + job_event.save(post_process=True) + if not event.startswith('runner_'): + transaction.commit() + else: + duplicate = True + if settings.DEBUG: + print 'skipping duplicate job event %r' % data + break + except DatabaseError as e: + transaction.rollback() + logger.debug('Database error saving job event, retrying in ' + '1 second (retry #%d): %s', retry_count + 1, e) + time.sleep(1) + else: + logger.error('Failed to save job event after %d retries.', + retry_count) + + def run_subscriber(self, port=5556): + print("Starting ZMQ Context") + context = zmq.Context() + subscriber = context.socket(zmq.REP) + print("Starting connection") + subscriber.bind("tcp://127.0.0.1:%s" % str(port)) + print("Listening on tcp://127.0.0.1:%s" % str(port)) + while True: # Handle signal + message = subscriber.recv() + subscriber.send("1") + data = json.loads(message) + self.process_job_event(data) + + def handle_noargs(self, **options): + self.verbosity = int(options.get('verbosity', 1)) + self.init_logging() + self.run_subscriber() diff --git a/awx/main/tasks.py b/awx/main/tasks.py index a6f0350083..1cafa4463a 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -494,11 +494,11 @@ class RunJob(BaseTask): elif job.status in ('pending', 'waiting'): job = self.update_model(job.pk, status='pending') # Start another task to process job events. - if settings.BROKER_URL.startswith('amqp://'): - app = Celery('tasks', broker=settings.BROKER_URL) - send_task('awx.main.tasks.save_job_events', kwargs={ - 'job_id': job.id, - }, serializer='json') + # if settings.BROKER_URL.startswith('amqp://'): + # app = Celery('tasks', broker=settings.BROKER_URL) + # send_task('awx.main.tasks.save_job_events', kwargs={ + # 'job_id': job.id, + # }, serializer='json') return True else: return False @@ -511,20 +511,21 @@ class RunJob(BaseTask): # Send a special message to this job's event queue after the job has run # to tell the save job events task to end. if settings.BROKER_URL.startswith('amqp://'): - job_events_exchange = Exchange('job_events', 'direct', durable=True) - job_events_queue = Queue('job_events[%d]' % job.id, - exchange=job_events_exchange, - routing_key=('job_events[%d]' % job.id), - auto_delete=True) - with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn: - with conn.Producer(serializer='json') as producer: - msg = { - 'job_id': job.id, - 'event': '__complete__' - } - producer.publish(msg, exchange=job_events_exchange, - routing_key=('job_events[%d]' % job.id), - declare=[job_events_queue]) + pass + # job_events_exchange = Exchange('job_events', 'direct', durable=True) + # job_events_queue = Queue('job_events[%d]' % job.id, + # exchange=job_events_exchange, + # routing_key=('job_events[%d]' % job.id), + # auto_delete=True) + # with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn: + # with conn.Producer(serializer='json') as producer: + # msg = { + # 'job_id': job.id, + # 'event': '__complete__' + # } + # producer.publish(msg, exchange=job_events_exchange, + # routing_key=('job_events[%d]' % job.id), + # declare=[job_events_queue]) # Update job event fields after job has completed (only when using REST # API callback). diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index 4fb6e7120d..2782142b28 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -38,26 +38,10 @@ import sys import urllib import urlparse -# Requests / Kombu -try: - import requests - from kombu import Connection, Exchange, Queue -except ImportError: - # If running from an AWX installation, use the local version of requests if - # if cannot be found globally. - local_site_packages = os.path.join(os.path.dirname(__file__), '..', '..', - 'lib', 'site-packages') - sys.path.insert(0, local_site_packages) - import requests - from kombu import Connection, Exchange, Queue - -# Check to see if librabbitmq is installed. -try: - import librabbitmq - LIBRABBITMQ_INSTALLED = True -except ImportError: - LIBRABBITMQ_INSTALLED = False +import requests +# ZeroMQ +import zmq class TokenAuth(requests.auth.AuthBase): @@ -93,14 +77,11 @@ class CallbackModule(object): self.job_id = int(os.getenv('JOB_ID')) self.base_url = os.getenv('REST_API_URL', '') self.auth_token = os.getenv('REST_API_TOKEN', '') - self.broker_url = os.getenv('BROKER_URL', '') + self.context = None + self.socket = None + self.broker_url = True # TODO: Figure this out for unit tests self._init_logging() - # Since we don't yet have a way to confirm publish when using - # librabbitmq, ensure we use pyamqp even if librabbitmq happens to be - # installed. - if LIBRABBITMQ_INSTALLED: - self.logger.info('Forcing use of pyamqp instead of librabbitmq') - self.broker_url = self.broker_url.replace('amqp://', 'pyamqp://') + self._init_connection() def _init_logging(self): try: @@ -120,37 +101,12 @@ class CallbackModule(object): self.logger.addHandler(handler) self.logger.propagate = False - def __del__(self): - self._cleanup_connection() - - def _publish_errback(self, exc, interval): - self.logger.info('Publish Error: %r', exc) - - def _cleanup_connection(self): - if hasattr(self, 'producer'): - try: - #self.logger.debug('Cleanup Producer: %r', self.producer) - self.producer.cancel() - except: - pass - del self.producer - if hasattr(self, 'connection'): - try: - #self.logger.debug('Cleanup Connection: %r', self.connection) - self.connection.release() - except: - pass - del self.connection + def _init_connection(self): + self.context = zmq.Context() + self.socket = self.context.socket(zmq.REQ) + self.socket.connect("tcp://127.0.0.1:5556") def _post_job_event_queue_msg(self, event, event_data): - if not hasattr(self, 'job_events_exchange'): - self.job_events_exchange = Exchange('job_events', 'direct', - durable=True) - if not hasattr(self, 'job_events_queue'): - self.job_events_queue = Queue('job_events[%d]' % self.job_id, - exchange=self.job_events_exchange, - routing_key=('job_events[%d]' % self.job_id), - auto_delete=True) msg = { 'job_id': self.job_id, 'event': event, @@ -163,33 +119,15 @@ class CallbackModule(object): }) for retry_count in xrange(4): try: - if not hasattr(self, 'connection_pid'): - self.connection_pid = os.getpid() - if self.connection_pid != os.getpid(): - self._cleanup_connection() - if not hasattr(self, 'connection'): - self.connection = Connection(self.broker_url, transport_options={'confirm_publish': True}) - self.logger.debug('New Connection: %r, retry=%d', - self.connection, retry_count) - if not hasattr(self, 'producer'): - channel = self.connection.channel() - self.producer = self.connection.Producer(channel, exchange=self.job_events_exchange, serializer='json') - self.publish = self.connection.ensure(self.producer, self.producer.publish, - errback=self._publish_errback, - max_retries=3, interval_start=1, interval_step=1, interval_max=10) - self.logger.debug('New Producer: %r, retry=%d', - self.producer, retry_count) + self.socket.send(json.dumps(msg)) self.logger.debug('Publish: %r, retry=%d', msg, retry_count) - self.publish(msg, exchange=self.job_events_exchange, - routing_key=('job_events[%d]' % self.job_id), - declare=[self.job_events_queue]) - if event == 'playbook_on_stats': - self._cleanup_connection() + reply = self.socket.recv() + print("Received reply: " + str(reply)) return except Exception, e: self.logger.info('Publish Exception: %r, retry=%d', e, retry_count, exc_info=True) - self._cleanup_connection() + # TODO: Maybe recycle connection here? if retry_count >= 3: raise From 063380304ac7c913f04349e930ffb45586cc06d2 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Tue, 11 Feb 2014 05:56:51 -0500 Subject: [PATCH 2/6] Manage the zeromq connection per-pid --- awx/plugins/callback/job_event_callback.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index 2782142b28..7fc1e7465d 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -102,6 +102,10 @@ class CallbackModule(object): self.logger.propagate = False def _init_connection(self): + self.context = None + self.socket = None + + def _start_connection(self): self.context = zmq.Context() self.socket = self.context.socket(zmq.REQ) self.socket.connect("tcp://127.0.0.1:5556") @@ -119,10 +123,16 @@ class CallbackModule(object): }) for retry_count in xrange(4): try: + if not hasattr(self, 'connection_pid'): + self.connection_pid = os.getpid() + if self.connection_pid != os.getpid(): + self._init_connection() + if self.context is None: + self._start_connection() + self.socket.send(json.dumps(msg)) self.logger.debug('Publish: %r, retry=%d', msg, retry_count) reply = self.socket.recv() - print("Received reply: " + str(reply)) return except Exception, e: self.logger.info('Publish Exception: %r, retry=%d', e, From 770947d18dd5a8fe67816ec6a57bd2207d4ce9cf Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Wed, 12 Feb 2014 16:09:57 -0500 Subject: [PATCH 3/6] Pull results off zeromq and distribute to workers --- .../commands/run_callback_receiver.py | 84 ++++++++++++------- awx/plugins/callback/job_event_callback.py | 5 +- 2 files changed, 55 insertions(+), 34 deletions(-) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 2d9a21a8da..ff84bf3701 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -6,6 +6,7 @@ import datetime import logging import json from optparse import make_option +from multiprocessing import Process # Django from django.conf import settings @@ -22,26 +23,16 @@ from awx.main.models import * # ZeroMQ import zmq -class Command(NoArgsCommand): - ''' - Management command to run the job callback receiver - ''' +class Worker(Process): - help = 'Launch the job callback receiver' - - option_list = NoArgsCommand.option_list + ( - make_option('--port', dest='port', type='int', default=5556, - help='Port to listen for requests on'),) - - def init_logging(self): - log_levels = dict(enumerate([logging.ERROR, logging.INFO, - logging.DEBUG, 0])) - self.logger = logging.getLogger('awx.main.commands.run_callback_receiver') - self.logger.setLevel(log_levels.get(self.verbosity, 0)) - handler = logging.StreamHandler() - handler.setFormatter(logging.Formatter('%(message)s')) - self.logger.addHandler(handler) - self.logger.propagate = False + def run(self): + print("Starting worker") + pool_context = zmq.Context() + pool_subscriber = pool_context.socket(zmq.PULL) + pool_subscriber.connect("ipc:///tmp/callback_receiver.ipc") + while True: + message = pool_subscriber.recv_json() + self.process_job_event(message) @transaction.commit_on_success def process_job_event(self, data): @@ -79,25 +70,56 @@ class Command(NoArgsCommand): break except DatabaseError as e: transaction.rollback() - logger.debug('Database error saving job event, retrying in ' - '1 second (retry #%d): %s', retry_count + 1, e) + # logger.debug('Database error saving job event, retrying in ' + # '1 second (retry #%d): %s', retry_count + 1, e) time.sleep(1) else: logger.error('Failed to save job event after %d retries.', retry_count) + +class Command(NoArgsCommand): + ''' + Management command to run the job callback receiver + ''' + + help = 'Launch the job callback receiver' + + option_list = NoArgsCommand.option_list + ( + make_option('--port', dest='port', type='int', default=5556, + help='Port to listen for requests on'),) + + def init_logging(self): + log_levels = dict(enumerate([logging.ERROR, logging.INFO, + logging.DEBUG, 0])) + self.logger = logging.getLogger('awx.main.commands.run_callback_receiver') + self.logger.setLevel(log_levels.get(self.verbosity, 0)) + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter('%(message)s')) + self.logger.addHandler(handler) + self.logger.propagate = False + def run_subscriber(self, port=5556): - print("Starting ZMQ Context") - context = zmq.Context() - subscriber = context.socket(zmq.REP) - print("Starting connection") - subscriber.bind("tcp://127.0.0.1:%s" % str(port)) - print("Listening on tcp://127.0.0.1:%s" % str(port)) + + consumer_context = zmq.Context() + consumer_subscriber = consumer_context.socket(zmq.PULL) + consumer_subscriber.bind("tcp://127.0.0.1:%s" % str(port)) + print("Consumer Listening on tcp://127.0.0.1:%s" % str(port)) + + queue_context = zmq.Context() + queue_publisher = queue_context.socket(zmq.PUSH) + queue_publisher.bind("ipc:///tmp/callback_receiver.ipc") + print("Publisher Listening on ipc: /tmp/callback_receiver.ip") + + workers = [] + for idx in range(4): + w = Worker() + w.start() + workers.append(w) + while True: # Handle signal - message = subscriber.recv() - subscriber.send("1") - data = json.loads(message) - self.process_job_event(data) + message = consumer_subscriber.recv_json() + queue_publisher.send_json(message) def handle_noargs(self, **options): self.verbosity = int(options.get('verbosity', 1)) diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index 7fc1e7465d..4c33ed8345 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -107,7 +107,7 @@ class CallbackModule(object): def _start_connection(self): self.context = zmq.Context() - self.socket = self.context.socket(zmq.REQ) + self.socket = self.context.socket(zmq.PUSH) self.socket.connect("tcp://127.0.0.1:5556") def _post_job_event_queue_msg(self, event, event_data): @@ -130,9 +130,8 @@ class CallbackModule(object): if self.context is None: self._start_connection() - self.socket.send(json.dumps(msg)) + self.socket.send_json(msg) self.logger.debug('Publish: %r, retry=%d', msg, retry_count) - reply = self.socket.recv() return except Exception, e: self.logger.info('Publish Exception: %r, retry=%d', e, From 2c694e5e07e15b9242d1fa413ecbd7e95a3d16cf Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Mon, 17 Feb 2014 11:11:48 -0500 Subject: [PATCH 4/6] Some cleanup and documentation for zeromq implementation --- .../commands/run_callback_receiver.py | 29 +++---- awx/main/tasks.py | 85 ------------------- awx/plugins/callback/job_event_callback.py | 7 +- 3 files changed, 18 insertions(+), 103 deletions(-) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index ff84bf3701..014d9dac61 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -24,6 +24,9 @@ from awx.main.models import * import zmq class Worker(Process): + ''' + Process to validate and store save job events received via zeromq + ''' def run(self): print("Starting worker") @@ -36,7 +39,6 @@ class Worker(Process): @transaction.commit_on_success def process_job_event(self, data): - print("Received data: %s" % str(data)) event = data.get('event', '') if not event or 'job_id' not in data: return @@ -58,20 +60,15 @@ class Worker(Process): try: if event == 'playbook_on_stats': transaction.commit() - if not JobEvent.objects.filter(**data).exists(): - job_event = JobEvent(**data) - job_event.save(post_process=True) - if not event.startswith('runner_'): - transaction.commit() - else: - duplicate = True - if settings.DEBUG: - print 'skipping duplicate job event %r' % data + job_event = JobEvent(**data) + job_event.save(post_process=True) + if not event.startswith('runner_'): + transaction.commit() break except DatabaseError as e: transaction.rollback() - # logger.debug('Database error saving job event, retrying in ' - # '1 second (retry #%d): %s', retry_count + 1, e) + logger.debug('Database error saving job event, retrying in ' + '1 second (retry #%d): %s', retry_count + 1, e) time.sleep(1) else: logger.error('Failed to save job event after %d retries.', @@ -80,9 +77,11 @@ class Worker(Process): class Command(NoArgsCommand): ''' - Management command to run the job callback receiver + Save Job Callback receiver (see awx.plugins.callbacks.job_event_callback) + Runs as a management command and receives job save events. It then hands + them off to worker processors (see Worker) which writes them to the database ''' - + help = 'Launch the job callback receiver' option_list = NoArgsCommand.option_list + ( @@ -124,4 +123,4 @@ class Command(NoArgsCommand): def handle_noargs(self, **options): self.verbosity = int(options.get('verbosity', 1)) self.init_logging() - self.run_subscriber() + self.run_subscriber(port=options.get('port')) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 1cafa4463a..1e893aaba9 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -533,91 +533,6 @@ class RunJob(BaseTask): for job_event in job.job_events.order_by('pk'): job_event.save(post_process=True) - -class SaveJobEvents(Task): - - name = 'awx.main.tasks.save_job_events' - - def process_job_event(self, data, message, events_received=None): - if events_received is None: - events_received = {} - begints = time.time() - event = data.get('event', '') - if not event or 'job_id' not in data: - return - try: - if not isinstance(data['created'], datetime.datetime): - data['created'] = parse_datetime(data['created']) - if not data['created'].tzinfo: - data['created'] = data['created'].replace(tzinfo=FixedOffset(0)) - except (KeyError, ValueError): - data.pop('created', None) - if settings.DEBUG: - print data - for key in data.keys(): - if key not in ('job_id', 'event', 'event_data', 'created'): - data.pop(key) - data['play'] = data.get('event_data', {}).get('play', '').strip() - data['task'] = data.get('event_data', {}).get('task', '').strip() - duplicate = False - if event != '__complete__': - for retry_count in xrange(11): - try: - # Commit any outstanding events before saving stats. - if event == 'playbook_on_stats': - transaction.commit() - if not JobEvent.objects.filter(**data).exists(): - job_event = JobEvent(**data) - job_event.save(post_process=True) - if not event.startswith('runner_'): - transaction.commit() - else: - duplicate = True - if settings.DEBUG: - print 'skipping duplicate job event %r' % data - break - except DatabaseError as e: - transaction.rollback() - logger.debug('Database error saving job event, retrying in ' - '1 second (retry #%d): %s', retry_count + 1, e) - time.sleep(1) - else: - logger.error('Failed to save job event after %d retries.', - retry_count) - if not duplicate: - if event not in events_received: - events_received[event] = 1 - else: - events_received[event] += 1 - if settings.DEBUG: - print 'saved job event in %0.3fs' % (time.time() - begints) - message.ack() - - @transaction.commit_on_success - def run(self, *args, **kwargs): - job_id = kwargs.get('job_id', None) - if not job_id: - return {} - - events_received = {} - process_job_event = functools.partial(self.process_job_event, - events_received=events_received) - - job_events_exchange = Exchange('job_events', 'direct', durable=True) - job_events_queue = Queue('job_events[%d]' % job_id, - exchange=job_events_exchange, - routing_key=('job_events[%d]' % job_id), - auto_delete=True) - with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn: - with conn.Consumer(job_events_queue, callbacks=[process_job_event]) as consumer: - while '__complete__' not in events_received: - conn.drain_events() - - return { - 'job_id': job_id, - 'total_events': sum(events_received.values())} - - class RunProjectUpdate(BaseTask): name = 'awx.main.tasks.run_project_update' diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index 4c33ed8345..38c299706f 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -117,15 +117,16 @@ class CallbackModule(object): 'event_data': event_data, 'created': datetime.datetime.utcnow().isoformat(), } + active_pid = os.getpid() if self.job_callback_debug: msg.update({ - 'pid': os.getpid(), + 'pid': active_pid, }) for retry_count in xrange(4): try: if not hasattr(self, 'connection_pid'): - self.connection_pid = os.getpid() - if self.connection_pid != os.getpid(): + self.connection_pid = active_pid + if self.connection_pid != active_pid: self._init_connection() if self.context is None: self._start_connection() From f6870634c46fcac4f985edeaba327c38f0c00647 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Tue, 18 Feb 2014 13:54:28 -0500 Subject: [PATCH 5/6] Cleanup and refactor some parts of thew new zeromq based callback receiver --- .../commands/run_callback_receiver.py | 136 +++++++++--------- awx/plugins/callback/job_event_callback.py | 7 +- awx/settings/defaults.py | 4 + 3 files changed, 80 insertions(+), 67 deletions(-) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 014d9dac61..643eefab7a 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -11,7 +11,7 @@ from multiprocessing import Process # Django from django.conf import settings from django.core.management.base import NoArgsCommand, CommandError -from django.db import transaction +from django.db import transaction, DatabaseError from django.contrib.auth.models import User from django.utils.dateparse import parse_datetime from django.utils.timezone import now, is_aware, make_aware @@ -23,57 +23,85 @@ from awx.main.models import * # ZeroMQ import zmq +def run_subscriber(consumer_port, queue_port, use_workers=True): + + consumer_context = zmq.Context() + consumer_subscriber = consumer_context.socket(zmq.PULL) + consumer_subscriber.bind(consumer_port) + + queue_context = zmq.Context() + queue_publisher = queue_context.socket(zmq.PUSH) + queue_publisher.bind(queue_port) + + if use_workers: + workers = [] + for idx in range(4): + w = Worker(queue_port) + w.start() + workers.append(w) + + while True: # Handle signal + message = consumer_subscriber.recv_json() + if use_workers: + queue_publisher.send_json(message) + else: + process_job_event(message) + + +@transaction.commit_on_success +def process_job_event(data): + event = data.get('event', '') + if not event or 'job_id' not in data: + return + try: + if not isinstance(data['created'], datetime.datetime): + data['created'] = parse_datetime(data['created']) + if not data['created'].tzinfo: + data['created'] = data['created'].replace(tzinfo=FixedOffset(0)) + except (KeyError, ValueError): + data.pop('created', None) + if settings.DEBUG: + print data + for key in data.keys(): + if key not in ('job_id', 'event', 'event_data', 'created'): + data.pop(key) + data['play'] = data.get('event_data', {}).get('play', '').strip() + data['task'] = data.get('event_data', {}).get('task', '').strip() + for retry_count in xrange(11): + try: + if event == 'playbook_on_stats': + transaction.commit() + job_event = JobEvent(**data) + job_event.save(post_process=True) + if not event.startswith('runner_'): + transaction.commit() + break + except DatabaseError as e: + transaction.rollback() + logger.debug('Database error saving job event, retrying in ' + '1 second (retry #%d): %s', retry_count + 1, e) + time.sleep(1) + else: + logger.error('Failed to save job event after %d retries.', + retry_count) + + class Worker(Process): ''' Process to validate and store save job events received via zeromq ''' + def __init__(self, port): + self.port = port + def run(self): print("Starting worker") pool_context = zmq.Context() pool_subscriber = pool_context.socket(zmq.PULL) - pool_subscriber.connect("ipc:///tmp/callback_receiver.ipc") + pool_subscriber.connect(self.port) while True: message = pool_subscriber.recv_json() - self.process_job_event(message) - - @transaction.commit_on_success - def process_job_event(self, data): - event = data.get('event', '') - if not event or 'job_id' not in data: - return - try: - if not isinstance(data['created'], datetime.datetime): - data['created'] = parse_datetime(data['created']) - if not data['created'].tzinfo: - data['created'] = data['created'].replace(tzinfo=FixedOffset(0)) - except (KeyError, ValueError): - data.pop('created', None) - if settings.DEBUG: - print data - for key in data.keys(): - if key not in ('job_id', 'event', 'event_data', 'created'): - data.pop(key) - data['play'] = data.get('event_data', {}).get('play', '').strip() - data['task'] = data.get('event_data', {}).get('task', '').strip() - for retry_count in xrange(11): - try: - if event == 'playbook_on_stats': - transaction.commit() - job_event = JobEvent(**data) - job_event.save(post_process=True) - if not event.startswith('runner_'): - transaction.commit() - break - except DatabaseError as e: - transaction.rollback() - logger.debug('Database error saving job event, retrying in ' - '1 second (retry #%d): %s', retry_count + 1, e) - time.sleep(1) - else: - logger.error('Failed to save job event after %d retries.', - retry_count) - + process_job_event(message) class Command(NoArgsCommand): ''' @@ -98,29 +126,9 @@ class Command(NoArgsCommand): self.logger.addHandler(handler) self.logger.propagate = False - def run_subscriber(self, port=5556): - - consumer_context = zmq.Context() - consumer_subscriber = consumer_context.socket(zmq.PULL) - consumer_subscriber.bind("tcp://127.0.0.1:%s" % str(port)) - print("Consumer Listening on tcp://127.0.0.1:%s" % str(port)) - - queue_context = zmq.Context() - queue_publisher = queue_context.socket(zmq.PUSH) - queue_publisher.bind("ipc:///tmp/callback_receiver.ipc") - print("Publisher Listening on ipc: /tmp/callback_receiver.ip") - - workers = [] - for idx in range(4): - w = Worker() - w.start() - workers.append(w) - - while True: # Handle signal - message = consumer_subscriber.recv_json() - queue_publisher.send_json(message) - def handle_noargs(self, **options): self.verbosity = int(options.get('verbosity', 1)) self.init_logging() - self.run_subscriber(port=options.get('port')) + consumer_port = settings.CALLBACK_CONSUMER_PORT + queue_port = settings.CALLBACK_QUEUE_PORT + run_subscriber(consumer_port, queue_port) diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index 38c299706f..1f74042ff9 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -40,6 +40,9 @@ import urlparse import requests +# Django +from django.conf import settings + # ZeroMQ import zmq @@ -79,7 +82,6 @@ class CallbackModule(object): self.auth_token = os.getenv('REST_API_TOKEN', '') self.context = None self.socket = None - self.broker_url = True # TODO: Figure this out for unit tests self._init_logging() self._init_connection() @@ -132,7 +134,6 @@ class CallbackModule(object): self._start_connection() self.socket.send_json(msg) - self.logger.debug('Publish: %r, retry=%d', msg, retry_count) return except Exception, e: self.logger.info('Publish Exception: %r, retry=%d', e, @@ -170,7 +171,7 @@ class CallbackModule(object): task = getattr(getattr(self, 'task', None), 'name', '') if task and event not in self.EVENTS_WITHOUT_TASK: event_data['task'] = task - if self.broker_url: + if not settings.CALLBACK_BYPASS_QUEUE: self._post_job_event_queue_msg(event, event_data) else: self._post_rest_api_event(event, event_data) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index fc3d4f3c50..8380ec740e 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -345,6 +345,10 @@ if 'devserver' in INSTALLED_APPS: else: INTERNAL_API_URL = 'http://127.0.0.1:8000' +CALLBACK_CONSUMER_PORT = "tcp://127.0.0.1:5556" +CALLBACK_QUEUE_PORT = "ipc:///tmp/callback_receiver.ipc" +CALLBACK_BYPASS_QUEUE = False + # Logging configuration. LOGGING = { 'version': 1, From 11cfacd57ad844ba603a4edf109f6fabc0269e8a Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Tue, 18 Feb 2014 13:54:56 -0500 Subject: [PATCH 6/6] Update unit tests to manage zeromq based tasks --- awx/main/tests/base.py | 11 +++++++++++ awx/main/tests/commands.py | 2 ++ awx/main/tests/inventory.py | 5 +++++ awx/main/tests/jobs.py | 10 ++++++++++ awx/main/tests/projects.py | 5 +++++ awx/main/tests/tasks.py | 2 ++ 6 files changed, 35 insertions(+) diff --git a/awx/main/tests/base.py b/awx/main/tests/base.py index 23f851d6e9..e4ad1b5ade 100644 --- a/awx/main/tests/base.py +++ b/awx/main/tests/base.py @@ -10,6 +10,7 @@ import os import shutil import tempfile import time +from multiprocessing import Process # PyYAML import yaml @@ -23,6 +24,8 @@ from django.test.client import Client # AWX from awx.main.models import * from awx.main.backend import LDAPSettings +from awx.main.management.commands.run_callback_receiver import run_subscriber + class BaseTestMixin(object): ''' @@ -363,6 +366,14 @@ class BaseTestMixin(object): for obj in response['results']: self.assertTrue(set(obj.keys()) <= set(fields)) + def start_queue(self, consumer_port, queue_port): + self.queue_process = Process(target=run_subscriber, + args=(consumer_port, queue_port, False,)) + self.queue_process.start() + + def terminate_queue(self): + self.queue_process.terminate() + class BaseTest(BaseTestMixin, django.test.TestCase): ''' Base class for unit tests. diff --git a/awx/main/tests/commands.py b/awx/main/tests/commands.py index b25d6a04d1..6def182bcd 100644 --- a/awx/main/tests/commands.py +++ b/awx/main/tests/commands.py @@ -301,9 +301,11 @@ class CleanupJobsTest(BaseCommandMixin, BaseLiveServerTest): self.project = None self.credential = None settings.INTERNAL_API_URL = self.live_server_url + self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT) def tearDown(self): super(CleanupJobsTest, self).tearDown() + self.terminate_queue() if self.test_project_path: shutil.rmtree(self.test_project_path, True) diff --git a/awx/main/tests/inventory.py b/awx/main/tests/inventory.py index 481c9f7495..50bc04466a 100644 --- a/awx/main/tests/inventory.py +++ b/awx/main/tests/inventory.py @@ -991,7 +991,12 @@ class InventoryUpdatesTest(BaseTransactionTest): self.group = self.inventory.groups.create(name='Cloud Group') self.inventory2 = self.organization.inventories.create(name='Cloud Inventory 2') self.group2 = self.inventory2.groups.create(name='Cloud Group 2') + self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT) + def tearDown(self): + super(InventoryUpdatesTest, self).tearDown() + self.terminate_queue() + def update_inventory_source(self, group, **kwargs): inventory_source = group.inventory_source update_fields = [] diff --git a/awx/main/tests/jobs.py b/awx/main/tests/jobs.py index 1ccf853888..6f21d3ba5e 100644 --- a/awx/main/tests/jobs.py +++ b/awx/main/tests/jobs.py @@ -442,6 +442,12 @@ class BaseJobTestMixin(BaseTestMixin): def setUp(self): super(BaseJobTestMixin, self).setUp() self.populate() + #self.start_queue("ipc:///tmp/test_consumer.ipc", "ipc:///tmp/test_queue.ipc") + self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT) + + def tearDown(self): + super(BaseJobTestMixin, self).tearDown() + self.terminate_queue() class JobTemplateTest(BaseJobTestMixin, django.test.TestCase): @@ -773,6 +779,7 @@ MIDDLEWARE_CLASSES = filter(lambda x: not x.endswith('TransactionMiddleware'), @override_settings(CELERY_ALWAYS_EAGER=True, CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, + CALLBACK_BYPASS_QUEUE=True, ANSIBLE_TRANSPORT='local', MIDDLEWARE_CLASSES=MIDDLEWARE_CLASSES) class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase): @@ -904,6 +911,9 @@ class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase): job = self.job_ops_east_run job.start() + # Wait for events to filter in since we are using a single consumer + time.sleep(30) + # Check that the job detail has been updated. url = reverse('api:job_detail', args=(job.pk,)) with self.current_user(self.user_sue): diff --git a/awx/main/tests/projects.py b/awx/main/tests/projects.py index f211fb143d..5a3ff9df3e 100644 --- a/awx/main/tests/projects.py +++ b/awx/main/tests/projects.py @@ -680,6 +680,11 @@ class ProjectUpdatesTest(BaseTransactionTest): def setUp(self): super(ProjectUpdatesTest, self).setUp() self.setup_users() + self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT) + + def tearDown(self): + super(ProjectUpdatesTest, self).tearDown() + self.terminate_queue() def create_project(self, **kwargs): cred_fields = ['scm_username', 'scm_password', 'scm_key_data', diff --git a/awx/main/tests/tasks.py b/awx/main/tests/tasks.py index 13aa44ec09..8bf31dc434 100644 --- a/awx/main/tests/tasks.py +++ b/awx/main/tests/tasks.py @@ -188,12 +188,14 @@ class RunJobTest(BaseCeleryTest): return args RunJob.build_args = new_build_args settings.INTERNAL_API_URL = self.live_server_url + self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT) def tearDown(self): super(RunJobTest, self).tearDown() if self.test_project_path: shutil.rmtree(self.test_project_path, True) RunJob.build_args = self.original_build_args + self.terminate_queue() def create_test_credential(self, **kwargs): opts = {