From f6870634c46fcac4f985edeaba327c38f0c00647 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Tue, 18 Feb 2014 13:54:28 -0500 Subject: [PATCH] Cleanup and refactor some parts of thew new zeromq based callback receiver --- .../commands/run_callback_receiver.py | 136 +++++++++--------- awx/plugins/callback/job_event_callback.py | 7 +- awx/settings/defaults.py | 4 + 3 files changed, 80 insertions(+), 67 deletions(-) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 014d9dac61..643eefab7a 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -11,7 +11,7 @@ from multiprocessing import Process # Django from django.conf import settings from django.core.management.base import NoArgsCommand, CommandError -from django.db import transaction +from django.db import transaction, DatabaseError from django.contrib.auth.models import User from django.utils.dateparse import parse_datetime from django.utils.timezone import now, is_aware, make_aware @@ -23,57 +23,85 @@ from awx.main.models import * # ZeroMQ import zmq +def run_subscriber(consumer_port, queue_port, use_workers=True): + + consumer_context = zmq.Context() + consumer_subscriber = consumer_context.socket(zmq.PULL) + consumer_subscriber.bind(consumer_port) + + queue_context = zmq.Context() + queue_publisher = queue_context.socket(zmq.PUSH) + queue_publisher.bind(queue_port) + + if use_workers: + workers = [] + for idx in range(4): + w = Worker(queue_port) + w.start() + workers.append(w) + + while True: # Handle signal + message = consumer_subscriber.recv_json() + if use_workers: + queue_publisher.send_json(message) + else: + process_job_event(message) + + +@transaction.commit_on_success +def process_job_event(data): + event = data.get('event', '') + if not event or 'job_id' not in data: + return + try: + if not isinstance(data['created'], datetime.datetime): + data['created'] = parse_datetime(data['created']) + if not data['created'].tzinfo: + data['created'] = data['created'].replace(tzinfo=FixedOffset(0)) + except (KeyError, ValueError): + data.pop('created', None) + if settings.DEBUG: + print data + for key in data.keys(): + if key not in ('job_id', 'event', 'event_data', 'created'): + data.pop(key) + data['play'] = data.get('event_data', {}).get('play', '').strip() + data['task'] = data.get('event_data', {}).get('task', '').strip() + for retry_count in xrange(11): + try: + if event == 'playbook_on_stats': + transaction.commit() + job_event = JobEvent(**data) + job_event.save(post_process=True) + if not event.startswith('runner_'): + transaction.commit() + break + except DatabaseError as e: + transaction.rollback() + logger.debug('Database error saving job event, retrying in ' + '1 second (retry #%d): %s', retry_count + 1, e) + time.sleep(1) + else: + logger.error('Failed to save job event after %d retries.', + retry_count) + + class Worker(Process): ''' Process to validate and store save job events received via zeromq ''' + def __init__(self, port): + self.port = port + def run(self): print("Starting worker") pool_context = zmq.Context() pool_subscriber = pool_context.socket(zmq.PULL) - pool_subscriber.connect("ipc:///tmp/callback_receiver.ipc") + pool_subscriber.connect(self.port) while True: message = pool_subscriber.recv_json() - self.process_job_event(message) - - @transaction.commit_on_success - def process_job_event(self, data): - event = data.get('event', '') - if not event or 'job_id' not in data: - return - try: - if not isinstance(data['created'], datetime.datetime): - data['created'] = parse_datetime(data['created']) - if not data['created'].tzinfo: - data['created'] = data['created'].replace(tzinfo=FixedOffset(0)) - except (KeyError, ValueError): - data.pop('created', None) - if settings.DEBUG: - print data - for key in data.keys(): - if key not in ('job_id', 'event', 'event_data', 'created'): - data.pop(key) - data['play'] = data.get('event_data', {}).get('play', '').strip() - data['task'] = data.get('event_data', {}).get('task', '').strip() - for retry_count in xrange(11): - try: - if event == 'playbook_on_stats': - transaction.commit() - job_event = JobEvent(**data) - job_event.save(post_process=True) - if not event.startswith('runner_'): - transaction.commit() - break - except DatabaseError as e: - transaction.rollback() - logger.debug('Database error saving job event, retrying in ' - '1 second (retry #%d): %s', retry_count + 1, e) - time.sleep(1) - else: - logger.error('Failed to save job event after %d retries.', - retry_count) - + process_job_event(message) class Command(NoArgsCommand): ''' @@ -98,29 +126,9 @@ class Command(NoArgsCommand): self.logger.addHandler(handler) self.logger.propagate = False - def run_subscriber(self, port=5556): - - consumer_context = zmq.Context() - consumer_subscriber = consumer_context.socket(zmq.PULL) - consumer_subscriber.bind("tcp://127.0.0.1:%s" % str(port)) - print("Consumer Listening on tcp://127.0.0.1:%s" % str(port)) - - queue_context = zmq.Context() - queue_publisher = queue_context.socket(zmq.PUSH) - queue_publisher.bind("ipc:///tmp/callback_receiver.ipc") - print("Publisher Listening on ipc: /tmp/callback_receiver.ip") - - workers = [] - for idx in range(4): - w = Worker() - w.start() - workers.append(w) - - while True: # Handle signal - message = consumer_subscriber.recv_json() - queue_publisher.send_json(message) - def handle_noargs(self, **options): self.verbosity = int(options.get('verbosity', 1)) self.init_logging() - self.run_subscriber(port=options.get('port')) + consumer_port = settings.CALLBACK_CONSUMER_PORT + queue_port = settings.CALLBACK_QUEUE_PORT + run_subscriber(consumer_port, queue_port) diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index 38c299706f..1f74042ff9 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -40,6 +40,9 @@ import urlparse import requests +# Django +from django.conf import settings + # ZeroMQ import zmq @@ -79,7 +82,6 @@ class CallbackModule(object): self.auth_token = os.getenv('REST_API_TOKEN', '') self.context = None self.socket = None - self.broker_url = True # TODO: Figure this out for unit tests self._init_logging() self._init_connection() @@ -132,7 +134,6 @@ class CallbackModule(object): self._start_connection() self.socket.send_json(msg) - self.logger.debug('Publish: %r, retry=%d', msg, retry_count) return except Exception, e: self.logger.info('Publish Exception: %r, retry=%d', e, @@ -170,7 +171,7 @@ class CallbackModule(object): task = getattr(getattr(self, 'task', None), 'name', '') if task and event not in self.EVENTS_WITHOUT_TASK: event_data['task'] = task - if self.broker_url: + if not settings.CALLBACK_BYPASS_QUEUE: self._post_job_event_queue_msg(event, event_data) else: self._post_rest_api_event(event, event_data) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index fc3d4f3c50..8380ec740e 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -345,6 +345,10 @@ if 'devserver' in INSTALLED_APPS: else: INTERNAL_API_URL = 'http://127.0.0.1:8000' +CALLBACK_CONSUMER_PORT = "tcp://127.0.0.1:5556" +CALLBACK_QUEUE_PORT = "ipc:///tmp/callback_receiver.ipc" +CALLBACK_BYPASS_QUEUE = False + # Logging configuration. LOGGING = { 'version': 1,