From 66e9e2310c85af07e1690a3148299a20df5e4639 Mon Sep 17 00:00:00 2001
From: Matthew Jones
Date: Tue, 11 Feb 2014 05:40:31 -0500
Subject: [PATCH] Initial 0mq implementation

---
 .../commands/run_callback_receiver.py       | 105 ++++++++++++++++++
 awx/main/tasks.py                           |  39 +++----
 awx/plugins/callback/job_event_callback.py  |  92 +++------------
 3 files changed, 140 insertions(+), 96 deletions(-)
 create mode 100644 awx/main/management/commands/run_callback_receiver.py

diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py
new file mode 100644
index 0000000000..2d9a21a8da
--- /dev/null
+++ b/awx/main/management/commands/run_callback_receiver.py
@@ -0,0 +1,105 @@
+# Copyright (c) 2014 AnsibleWorks, Inc.
+# All Rights Reserved.
+
+# Python
+import datetime
+import logging
+import json
+import time
+from optparse import make_option
+
+# Django
+from django.conf import settings
+from django.core.management.base import NoArgsCommand, CommandError
+from django.db import transaction, DatabaseError
+from django.contrib.auth.models import User
+from django.utils.dateparse import parse_datetime
+from django.utils.timezone import now, is_aware, make_aware
+from django.utils.tzinfo import FixedOffset
+
+# AWX
+from awx.main.models import *
+
+# ZeroMQ
+import zmq
+
+class Command(NoArgsCommand):
+    '''
+    Management command to run the job callback receiver
+    '''
+
+    help = 'Launch the job callback receiver'
+
+    option_list = NoArgsCommand.option_list + (
+        make_option('--port', dest='port', type='int', default=5556,
+                    help='Port to listen for requests on'),)
+
+    def init_logging(self):
+        log_levels = dict(enumerate([logging.ERROR, logging.INFO,
+                                     logging.DEBUG, 0]))
+        self.logger = logging.getLogger('awx.main.commands.run_callback_receiver')
+        self.logger.setLevel(log_levels.get(self.verbosity, 0))
+        handler = logging.StreamHandler()
+        handler.setFormatter(logging.Formatter('%(message)s'))
+        self.logger.addHandler(handler)
+        self.logger.propagate = False
+
+    @transaction.commit_on_success
+    def process_job_event(self, data):
+        print("Received data: %s" % str(data))
+        event = data.get('event', '')
+        if not event or 'job_id' not in data:
+            return
+        try:
+            if not isinstance(data['created'], datetime.datetime):
+                data['created'] = parse_datetime(data['created'])
+            if not data['created'].tzinfo:
+                data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
+        except (KeyError, ValueError):
+            data.pop('created', None)
+        if settings.DEBUG:
+            print(data)
+        for key in data.keys():
+            if key not in ('job_id', 'event', 'event_data', 'created'):
+                data.pop(key)
+        data['play'] = data.get('event_data', {}).get('play', '').strip()
+        data['task'] = data.get('event_data', {}).get('task', '').strip()
+        for retry_count in xrange(11):
+            try:
+                if event == 'playbook_on_stats':
+                    transaction.commit()
+                if not JobEvent.objects.filter(**data).exists():
+                    job_event = JobEvent(**data)
+                    job_event.save(post_process=True)
+                    if not event.startswith('runner_'):
+                        transaction.commit()
+                else:
+                    if settings.DEBUG:
+                        print('skipping duplicate job event %r' % data)
+                break
+            except DatabaseError as e:
+                transaction.rollback()
+                self.logger.debug('Database error saving job event, retrying in '
+                                  '1 second (retry #%d): %s', retry_count + 1, e)
+                time.sleep(1)
+        else:
+            self.logger.error('Failed to save job event after %d retries.',
+                              retry_count)
+
+    def run_subscriber(self, port=5556):
+        print("Starting ZMQ Context")
+        context = zmq.Context()
+        subscriber = context.socket(zmq.REP)
+        print("Starting connection")
+        subscriber.bind("tcp://127.0.0.1:%s" % str(port))
+        print("Listening on tcp://127.0.0.1:%s" % str(port))
+        while True:  # TODO: handle shutdown signals gracefully
+            message = subscriber.recv()
+            subscriber.send("1")
+            data = json.loads(message)
+            self.process_job_event(data)
+
+    def handle_noargs(self, **options):
+        self.verbosity = int(options.get('verbosity', 1))
+        self.init_logging()
+        self.run_subscriber(port=options.get('port', 5556))
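The new management command is the reply half of a ZeroMQ REQ/REP pair: run_subscriber() binds a REP socket, blocks on recv(), acknowledges every request with the literal string "1", and hands the decoded JSON to process_job_event(). It would be started as an ordinary Django management command, e.g. `python manage.py run_callback_receiver --port 5556`. For illustration only (the payload values below are made up; process_job_event() requires only job_id and event), a client that speaks this protocol reduces to:

    import json
    import zmq

    context = zmq.Context()
    sock = context.socket(zmq.REQ)
    sock.connect("tcp://127.0.0.1:5556")   # the receiver's default port

    # REQ/REP enforces strict send/recv alternation, mirroring the
    # receiver's recv()/send("1") loop above.
    sock.send(json.dumps({'job_id': 1, 'event': 'playbook_on_start',
                          'event_data': {}}))
    assert sock.recv() == "1"              # the receiver always acks with "1"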
connection") + subscriber.bind("tcp://127.0.0.1:%s" % str(port)) + print("Listening on tcp://127.0.0.1:%s" % str(port)) + while True: # Handle signal + message = subscriber.recv() + subscriber.send("1") + data = json.loads(message) + self.process_job_event(data) + + def handle_noargs(self, **options): + self.verbosity = int(options.get('verbosity', 1)) + self.init_logging() + self.run_subscriber() diff --git a/awx/main/tasks.py b/awx/main/tasks.py index a6f0350083..1cafa4463a 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -494,11 +494,11 @@ class RunJob(BaseTask): elif job.status in ('pending', 'waiting'): job = self.update_model(job.pk, status='pending') # Start another task to process job events. - if settings.BROKER_URL.startswith('amqp://'): - app = Celery('tasks', broker=settings.BROKER_URL) - send_task('awx.main.tasks.save_job_events', kwargs={ - 'job_id': job.id, - }, serializer='json') + # if settings.BROKER_URL.startswith('amqp://'): + # app = Celery('tasks', broker=settings.BROKER_URL) + # send_task('awx.main.tasks.save_job_events', kwargs={ + # 'job_id': job.id, + # }, serializer='json') return True else: return False @@ -511,20 +511,21 @@ class RunJob(BaseTask): # Send a special message to this job's event queue after the job has run # to tell the save job events task to end. if settings.BROKER_URL.startswith('amqp://'): - job_events_exchange = Exchange('job_events', 'direct', durable=True) - job_events_queue = Queue('job_events[%d]' % job.id, - exchange=job_events_exchange, - routing_key=('job_events[%d]' % job.id), - auto_delete=True) - with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn: - with conn.Producer(serializer='json') as producer: - msg = { - 'job_id': job.id, - 'event': '__complete__' - } - producer.publish(msg, exchange=job_events_exchange, - routing_key=('job_events[%d]' % job.id), - declare=[job_events_queue]) + pass + # job_events_exchange = Exchange('job_events', 'direct', durable=True) + # job_events_queue = Queue('job_events[%d]' % job.id, + # exchange=job_events_exchange, + # routing_key=('job_events[%d]' % job.id), + # auto_delete=True) + # with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn: + # with conn.Producer(serializer='json') as producer: + # msg = { + # 'job_id': job.id, + # 'event': '__complete__' + # } + # producer.publish(msg, exchange=job_events_exchange, + # routing_key=('job_events[%d]' % job.id), + # declare=[job_events_queue]) # Update job event fields after job has completed (only when using REST # API callback). diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index 4fb6e7120d..2782142b28 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -38,26 +38,10 @@ import sys import urllib import urlparse -# Requests / Kombu -try: - import requests - from kombu import Connection, Exchange, Queue -except ImportError: - # If running from an AWX installation, use the local version of requests if - # if cannot be found globally. - local_site_packages = os.path.join(os.path.dirname(__file__), '..', '..', - 'lib', 'site-packages') - sys.path.insert(0, local_site_packages) - import requests - from kombu import Connection, Exchange, Queue - -# Check to see if librabbitmq is installed. 
diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py
index 4fb6e7120d..2782142b28 100644
--- a/awx/plugins/callback/job_event_callback.py
+++ b/awx/plugins/callback/job_event_callback.py
@@ -38,26 +38,10 @@
 import sys
 import urllib
 import urlparse
 
-# Requests / Kombu
-try:
-    import requests
-    from kombu import Connection, Exchange, Queue
-except ImportError:
-    # If running from an AWX installation, use the local version of requests if
-    # if cannot be found globally.
-    local_site_packages = os.path.join(os.path.dirname(__file__), '..', '..',
-                                       'lib', 'site-packages')
-    sys.path.insert(0, local_site_packages)
-    import requests
-    from kombu import Connection, Exchange, Queue
-
-# Check to see if librabbitmq is installed.
-try:
-    import librabbitmq
-    LIBRABBITMQ_INSTALLED = True
-except ImportError:
-    LIBRABBITMQ_INSTALLED = False
+import requests
 
+# ZeroMQ
+import zmq
 
 class TokenAuth(requests.auth.AuthBase):
@@ -93,14 +77,11 @@
         self.job_id = int(os.getenv('JOB_ID'))
         self.base_url = os.getenv('REST_API_URL', '')
         self.auth_token = os.getenv('REST_API_TOKEN', '')
-        self.broker_url = os.getenv('BROKER_URL', '')
+        self.context = None
+        self.socket = None
+        self.broker_url = True # TODO: Figure this out for unit tests
         self._init_logging()
-        # Since we don't yet have a way to confirm publish when using
-        # librabbitmq, ensure we use pyamqp even if librabbitmq happens to be
-        # installed.
-        if LIBRABBITMQ_INSTALLED:
-            self.logger.info('Forcing use of pyamqp instead of librabbitmq')
-            self.broker_url = self.broker_url.replace('amqp://', 'pyamqp://')
+        self._init_connection()
 
     def _init_logging(self):
         try:
@@ -120,37 +101,12 @@
         self.logger.addHandler(handler)
         self.logger.propagate = False
 
-    def __del__(self):
-        self._cleanup_connection()
-
-    def _publish_errback(self, exc, interval):
-        self.logger.info('Publish Error: %r', exc)
-
-    def _cleanup_connection(self):
-        if hasattr(self, 'producer'):
-            try:
-                #self.logger.debug('Cleanup Producer: %r', self.producer)
-                self.producer.cancel()
-            except:
-                pass
-            del self.producer
-        if hasattr(self, 'connection'):
-            try:
-                #self.logger.debug('Cleanup Connection: %r', self.connection)
-                self.connection.release()
-            except:
-                pass
-            del self.connection
+    def _init_connection(self):
+        self.context = zmq.Context()
+        self.socket = self.context.socket(zmq.REQ)
+        # NOTE: hardcoded; must match the receiver's --port (default 5556)
+        self.socket.connect("tcp://127.0.0.1:5556")
 
     def _post_job_event_queue_msg(self, event, event_data):
-        if not hasattr(self, 'job_events_exchange'):
-            self.job_events_exchange = Exchange('job_events', 'direct',
-                                                durable=True)
-        if not hasattr(self, 'job_events_queue'):
-            self.job_events_queue = Queue('job_events[%d]' % self.job_id,
-                                          exchange=self.job_events_exchange,
-                                          routing_key=('job_events[%d]' % self.job_id),
-                                          auto_delete=True)
         msg = {
             'job_id': self.job_id,
             'event': event,
@@ -163,33 +119,15 @@
         })
         for retry_count in xrange(4):
             try:
-                if not hasattr(self, 'connection_pid'):
-                    self.connection_pid = os.getpid()
-                if self.connection_pid != os.getpid():
-                    self._cleanup_connection()
-                if not hasattr(self, 'connection'):
-                    self.connection = Connection(self.broker_url, transport_options={'confirm_publish': True})
-                    self.logger.debug('New Connection: %r, retry=%d',
-                                      self.connection, retry_count)
-                if not hasattr(self, 'producer'):
-                    channel = self.connection.channel()
-                    self.producer = self.connection.Producer(channel, exchange=self.job_events_exchange, serializer='json')
-                    self.publish = self.connection.ensure(self.producer, self.producer.publish,
-                                                          errback=self._publish_errback,
-                                                          max_retries=3, interval_start=1, interval_step=1, interval_max=10)
-                    self.logger.debug('New Producer: %r, retry=%d',
-                                      self.producer, retry_count)
+                self.socket.send(json.dumps(msg))
                 self.logger.debug('Publish: %r, retry=%d', msg, retry_count)
-                self.publish(msg, exchange=self.job_events_exchange,
-                             routing_key=('job_events[%d]' % self.job_id),
-                             declare=[self.job_events_queue])
-                if event == 'playbook_on_stats':
-                    self._cleanup_connection()
+                reply = self.socket.recv()
+                print("Received reply: " + str(reply))
                 return
             except Exception, e:
                 self.logger.info('Publish Exception: %r, retry=%d', e,
                                  retry_count, exc_info=True)
-                self._cleanup_connection()
+                # TODO: Maybe recycle connection here?
                 if retry_count >= 3:
                     raise
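The remaining TODO is load-bearing: a REQ socket that fails mid-cycle (after send() but before a successful recv()) is left in the wrong state and refuses further sends, so retrying on the same socket can fail forever. "Recycling" the connection would mean closing the socket without lingering and dialing again, roughly as in this sketch (_recycle_connection is illustrative, not part of the patch):

    def _recycle_connection(self):
        # Discard the stuck REQ socket; LINGER 0 drops any unsent message.
        self.socket.setsockopt(zmq.LINGER, 0)
        self.socket.close()
        # Reconnect with a fresh socket in the clean ready-to-send state.
        self.socket = self.context.socket(zmq.REQ)
        self.socket.connect("tcp://127.0.0.1:5556")

Newer libzmq releases (4.0+) also expose zmq.REQ_RELAXED, which relaxes the strict send/recv alternation and makes this recycling dance unnecessary.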