diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py
new file mode 100644
index 0000000000..643eefab7a
--- /dev/null
+++ b/awx/main/management/commands/run_callback_receiver.py
@@ -0,0 +1,139 @@
+# Copyright (c) 2014 AnsibleWorks, Inc.
+# All Rights Reserved.
+
+# Python
+import datetime
+import logging
+import json
+import time
+from optparse import make_option
+from multiprocessing import Process
+
+# Django
+from django.conf import settings
+from django.core.management.base import NoArgsCommand, CommandError
+from django.db import transaction, DatabaseError
+from django.contrib.auth.models import User
+from django.utils.dateparse import parse_datetime
+from django.utils.timezone import now, is_aware, make_aware
+from django.utils.tzinfo import FixedOffset
+
+# AWX
+from awx.main.models import *
+
+# ZeroMQ
+import zmq
+
+# Module-level logger used by process_job_event().
+logger = logging.getLogger('awx.main.commands.run_callback_receiver')
+
+def run_subscriber(consumer_port, queue_port, use_workers=True):
+
+    consumer_context = zmq.Context()
+    consumer_subscriber = consumer_context.socket(zmq.PULL)
+    consumer_subscriber.bind(consumer_port)
+
+    queue_context = zmq.Context()
+    queue_publisher = queue_context.socket(zmq.PUSH)
+    queue_publisher.bind(queue_port)
+
+    if use_workers:
+        workers = []
+        for idx in range(4):
+            w = Worker(queue_port)
+            w.start()
+            workers.append(w)
+
+    while True:  # TODO: handle signals for clean shutdown
+        message = consumer_subscriber.recv_json()
+        if use_workers:
+            queue_publisher.send_json(message)
+        else:
+            process_job_event(message)
+
+
+@transaction.commit_on_success
+def process_job_event(data):
+    event = data.get('event', '')
+    if not event or 'job_id' not in data:
+        return
+    try:
+        if not isinstance(data['created'], datetime.datetime):
+            data['created'] = parse_datetime(data['created'])
+        if not data['created'].tzinfo:
+            data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
+    except (KeyError, ValueError):
+        data.pop('created', None)
+    if settings.DEBUG:
+        print data
+    for key in data.keys():
+        if key not in ('job_id', 'event', 'event_data', 'created'):
+            data.pop(key)
+    data['play'] = data.get('event_data', {}).get('play', '').strip()
+    data['task'] = data.get('event_data', {}).get('task', '').strip()
+    for retry_count in xrange(11):
+        try:
+            if event == 'playbook_on_stats':
+                transaction.commit()
+            job_event = JobEvent(**data)
+            job_event.save(post_process=True)
+            if not event.startswith('runner_'):
+                transaction.commit()
+            break
+        except DatabaseError as e:
+            transaction.rollback()
+            logger.debug('Database error saving job event, retrying in '
+                         '1 second (retry #%d): %s', retry_count + 1, e)
+            time.sleep(1)
+    else:
+        logger.error('Failed to save job event after %d retries.',
+                     retry_count)
+
+
+class Worker(Process):
+    '''
+    Process to validate and save job events received via ZeroMQ.
+    '''
+
+    def __init__(self, port):
+        super(Worker, self).__init__()
+        self.port = port
+
+    def run(self):
+        print("Starting worker")
+        pool_context = zmq.Context()
+        pool_subscriber = pool_context.socket(zmq.PULL)
+        pool_subscriber.connect(self.port)
+        while True:
+            message = pool_subscriber.recv_json()
+            process_job_event(message)
+
+class Command(NoArgsCommand):
+    '''
+    Job callback receiver (see awx.plugins.callback.job_event_callback).
+    Runs as a management command and receives job save events.  It then hands
+    them off to worker processes (see Worker), which write them to the database.
+    '''
+
+    help = 'Launch the job callback receiver'
+
+    option_list = NoArgsCommand.option_list + (
+        make_option('--port', dest='port', type='int', default=5556,
+                    help='Port to listen for requests on'),)
+
+    def init_logging(self):
+        log_levels = dict(enumerate([logging.ERROR, logging.INFO,
+                                     logging.DEBUG, 0]))
+        self.logger = logging.getLogger('awx.main.commands.run_callback_receiver')
+        self.logger.setLevel(log_levels.get(self.verbosity, 0))
+        handler = logging.StreamHandler()
+        handler.setFormatter(logging.Formatter('%(message)s'))
+        self.logger.addHandler(handler)
+        self.logger.propagate = False
+
+    def handle_noargs(self, **options):
+        self.verbosity = int(options.get('verbosity', 1))
+        self.init_logging()
+        consumer_port = settings.CALLBACK_CONSUMER_PORT
+        queue_port = settings.CALLBACK_QUEUE_PORT
+        run_subscriber(consumer_port, queue_port)
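
The receiver above binds a PULL socket for incoming events and fans them out to four `Worker` processes over a PUSH socket. A quick way to smoke-test it by hand is to emulate the callback plugin and push a single fake event at the consumer port. This is a minimal sketch, assuming the receiver is already running (`manage.py run_callback_receiver`) with the default settings and that a job with the given id exists; both are assumptions for illustration only.

```python
# Minimal smoke test for the receiver: push one fake job event at the
# consumer socket, the same way the callback plugin does. Assumes the
# receiver is running with the default CALLBACK_CONSUMER_PORT and that
# a Job with id=1 exists (hypothetical values).
import datetime
import zmq

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect('tcp://127.0.0.1:5556')  # default CALLBACK_CONSUMER_PORT
socket.send_json({
    'job_id': 1,  # hypothetical job id
    'event': 'playbook_on_start',
    'event_data': {},
    'created': datetime.datetime.utcnow().isoformat(),
})
socket.close()
context.term()
```
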
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index a6f0350083..1e893aaba9 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -494,11 +494,11 @@ class RunJob(BaseTask):
         elif job.status in ('pending', 'waiting'):
             job = self.update_model(job.pk, status='pending')
             # Start another task to process job events.
-            if settings.BROKER_URL.startswith('amqp://'):
-                app = Celery('tasks', broker=settings.BROKER_URL)
-                send_task('awx.main.tasks.save_job_events', kwargs={
-                    'job_id': job.id,
-                }, serializer='json')
+            # if settings.BROKER_URL.startswith('amqp://'):
+            #     app = Celery('tasks', broker=settings.BROKER_URL)
+            #     send_task('awx.main.tasks.save_job_events', kwargs={
+            #         'job_id': job.id,
+            #     }, serializer='json')
             return True
         else:
             return False
@@ -511,20 +511,21 @@ class RunJob(BaseTask):
         # Send a special message to this job's event queue after the job has run
         # to tell the save job events task to end.
         if settings.BROKER_URL.startswith('amqp://'):
-            job_events_exchange = Exchange('job_events', 'direct', durable=True)
-            job_events_queue = Queue('job_events[%d]' % job.id,
-                                     exchange=job_events_exchange,
-                                     routing_key=('job_events[%d]' % job.id),
-                                     auto_delete=True)
-            with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn:
-                with conn.Producer(serializer='json') as producer:
-                    msg = {
-                        'job_id': job.id,
-                        'event': '__complete__'
-                    }
-                    producer.publish(msg, exchange=job_events_exchange,
-                                     routing_key=('job_events[%d]' % job.id),
-                                     declare=[job_events_queue])
+            pass
+            # job_events_exchange = Exchange('job_events', 'direct', durable=True)
+            # job_events_queue = Queue('job_events[%d]' % job.id,
+            #                          exchange=job_events_exchange,
+            #                          routing_key=('job_events[%d]' % job.id),
+            #                          auto_delete=True)
+            # with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn:
+            #     with conn.Producer(serializer='json') as producer:
+            #         msg = {
+            #             'job_id': job.id,
+            #             'event': '__complete__'
+            #         }
+            #         producer.publish(msg, exchange=job_events_exchange,
+            #                          routing_key=('job_events[%d]' % job.id),
+            #                          declare=[job_events_queue])
         # Update job event fields after job has completed (only when using REST
         # API callback).
@@ -532,91 +533,6 @@ class RunJob(BaseTask):
         for job_event in job.job_events.order_by('pk'):
             job_event.save(post_process=True)
 
-
-class SaveJobEvents(Task):
-
-    name = 'awx.main.tasks.save_job_events'
-
-    def process_job_event(self, data, message, events_received=None):
-        if events_received is None:
-            events_received = {}
-        begints = time.time()
-        event = data.get('event', '')
-        if not event or 'job_id' not in data:
-            return
-        try:
-            if not isinstance(data['created'], datetime.datetime):
-                data['created'] = parse_datetime(data['created'])
-            if not data['created'].tzinfo:
-                data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
-        except (KeyError, ValueError):
-            data.pop('created', None)
-        if settings.DEBUG:
-            print data
-        for key in data.keys():
-            if key not in ('job_id', 'event', 'event_data', 'created'):
-                data.pop(key)
-        data['play'] = data.get('event_data', {}).get('play', '').strip()
-        data['task'] = data.get('event_data', {}).get('task', '').strip()
-        duplicate = False
-        if event != '__complete__':
-            for retry_count in xrange(11):
-                try:
-                    # Commit any outstanding events before saving stats.
-                    if event == 'playbook_on_stats':
-                        transaction.commit()
-                    if not JobEvent.objects.filter(**data).exists():
-                        job_event = JobEvent(**data)
-                        job_event.save(post_process=True)
-                        if not event.startswith('runner_'):
-                            transaction.commit()
-                    else:
-                        duplicate = True
-                        if settings.DEBUG:
-                            print 'skipping duplicate job event %r' % data
-                    break
-                except DatabaseError as e:
-                    transaction.rollback()
-                    logger.debug('Database error saving job event, retrying in '
-                                 '1 second (retry #%d): %s', retry_count + 1, e)
-                    time.sleep(1)
-            else:
-                logger.error('Failed to save job event after %d retries.',
-                             retry_count)
-        if not duplicate:
-            if event not in events_received:
-                events_received[event] = 1
-            else:
-                events_received[event] += 1
-        if settings.DEBUG:
-            print 'saved job event in %0.3fs' % (time.time() - begints)
-        message.ack()
-
-    @transaction.commit_on_success
-    def run(self, *args, **kwargs):
-        job_id = kwargs.get('job_id', None)
-        if not job_id:
-            return {}
-
-        events_received = {}
-        process_job_event = functools.partial(self.process_job_event,
-                                              events_received=events_received)
-
-        job_events_exchange = Exchange('job_events', 'direct', durable=True)
-        job_events_queue = Queue('job_events[%d]' % job_id,
-                                 exchange=job_events_exchange,
-                                 routing_key=('job_events[%d]' % job_id),
-                                 auto_delete=True)
-        with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn:
-            with conn.Consumer(job_events_queue, callbacks=[process_job_event]) as consumer:
-                while '__complete__' not in events_received:
-                    conn.drain_events()
-
-        return {
-            'job_id': job_id,
-            'total_events': sum(events_received.values())}
-
-
 class RunProjectUpdate(BaseTask):
 
     name = 'awx.main.tasks.run_project_update'
diff --git a/awx/main/tests/base.py b/awx/main/tests/base.py
index 23f851d6e9..e4ad1b5ade 100644
--- a/awx/main/tests/base.py
+++ b/awx/main/tests/base.py
@@ -10,6 +10,7 @@ import os
 import shutil
 import tempfile
 import time
+from multiprocessing import Process
 
 # PyYAML
 import yaml
@@ -23,6 +24,8 @@ from django.test.client import Client
 # AWX
 from awx.main.models import *
 from awx.main.backend import LDAPSettings
+from awx.main.management.commands.run_callback_receiver import run_subscriber
+
 
 class BaseTestMixin(object):
     '''
@@ -363,6 +366,14 @@ class BaseTestMixin(object):
         for obj in response['results']:
             self.assertTrue(set(obj.keys()) <= set(fields))
 
+    def start_queue(self, consumer_port, queue_port):
+        self.queue_process = Process(target=run_subscriber,
+                                     args=(consumer_port, queue_port, False,))
+        self.queue_process.start()
+
+    def terminate_queue(self):
+        self.queue_process.terminate()
+
 class BaseTest(BaseTestMixin, django.test.TestCase):
     '''
     Base class for unit tests.
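
Because `start_queue()` passes `use_workers=False`, the subscriber loop processes events inline instead of spawning the worker pool, keeping each test down to a single extra process. A hypothetical test case using the new helpers might look like this; the class name is illustrative only.

```python
# Hypothetical example of a test using the new BaseTestMixin helpers to
# run a callback receiver for the duration of the test.
import django.test
from django.conf import settings

from awx.main.tests.base import BaseTestMixin

class ExampleCallbackTest(BaseTestMixin, django.test.TransactionTestCase):

    def setUp(self):
        super(ExampleCallbackTest, self).setUp()
        # Spawn the receiver; events are processed inline (no worker pool).
        self.start_queue(settings.CALLBACK_CONSUMER_PORT,
                         settings.CALLBACK_QUEUE_PORT)

    def tearDown(self):
        super(ExampleCallbackTest, self).tearDown()
        self.terminate_queue()
```
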
diff --git a/awx/main/tests/commands.py b/awx/main/tests/commands.py
index b25d6a04d1..6def182bcd 100644
--- a/awx/main/tests/commands.py
+++ b/awx/main/tests/commands.py
@@ -301,9 +301,11 @@ class CleanupJobsTest(BaseCommandMixin, BaseLiveServerTest):
         self.project = None
         self.credential = None
         settings.INTERNAL_API_URL = self.live_server_url
+        self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
 
     def tearDown(self):
         super(CleanupJobsTest, self).tearDown()
+        self.terminate_queue()
         if self.test_project_path:
             shutil.rmtree(self.test_project_path, True)
 
diff --git a/awx/main/tests/inventory.py b/awx/main/tests/inventory.py
index 481c9f7495..50bc04466a 100644
--- a/awx/main/tests/inventory.py
+++ b/awx/main/tests/inventory.py
@@ -991,7 +991,12 @@ class InventoryUpdatesTest(BaseTransactionTest):
         self.group = self.inventory.groups.create(name='Cloud Group')
         self.inventory2 = self.organization.inventories.create(name='Cloud Inventory 2')
         self.group2 = self.inventory2.groups.create(name='Cloud Group 2')
+        self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
 
+    def tearDown(self):
+        super(InventoryUpdatesTest, self).tearDown()
+        self.terminate_queue()
+
     def update_inventory_source(self, group, **kwargs):
         inventory_source = group.inventory_source
         update_fields = []
diff --git a/awx/main/tests/jobs.py b/awx/main/tests/jobs.py
index 1ccf853888..6f21d3ba5e 100644
--- a/awx/main/tests/jobs.py
+++ b/awx/main/tests/jobs.py
@@ -442,6 +442,12 @@ class BaseJobTestMixin(BaseTestMixin):
     def setUp(self):
         super(BaseJobTestMixin, self).setUp()
         self.populate()
+        #self.start_queue("ipc:///tmp/test_consumer.ipc", "ipc:///tmp/test_queue.ipc")
+        self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
+
+    def tearDown(self):
+        super(BaseJobTestMixin, self).tearDown()
+        self.terminate_queue()
 
 
 class JobTemplateTest(BaseJobTestMixin, django.test.TestCase):
@@ -773,6 +779,7 @@ MIDDLEWARE_CLASSES = filter(lambda x: not x.endswith('TransactionMiddleware'),
 
 @override_settings(CELERY_ALWAYS_EAGER=True,
                    CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
+                   CALLBACK_BYPASS_QUEUE=True,
                    ANSIBLE_TRANSPORT='local',
                    MIDDLEWARE_CLASSES=MIDDLEWARE_CLASSES)
 class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase):
@@ -904,6 +911,9 @@ class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase):
         job = self.job_ops_east_run
         job.start()
 
+        # Wait for events to filter in since we are using a single consumer
+        time.sleep(30)
+
         # Check that the job detail has been updated.
         url = reverse('api:job_detail', args=(job.pk,))
         with self.current_user(self.user_sue):
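
The fixed `time.sleep(30)` above gives the single inline consumer time to drain the event stream. If the flat delay proves slow or flaky, one possible alternative (not part of this patch) is to poll the job's event count until it stops growing, bounded by a timeout; the helper and its timings below are hypothetical.

```python
# Illustrative alternative to a fixed sleep: poll the job's event count
# until it has been stable for a few intervals or the timeout expires.
import time

def wait_for_job_events(job, timeout=30, settle=3, interval=1):
    deadline = time.time() + timeout
    last_count, stable_for = -1, 0
    while time.time() < deadline and stable_for < settle:
        count = job.job_events.count()
        # Count consecutive polls with no new events.
        stable_for = stable_for + 1 if count == last_count else 0
        last_count = count
        time.sleep(interval)
    return last_count
```
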
diff --git a/awx/main/tests/projects.py b/awx/main/tests/projects.py
index f211fb143d..5a3ff9df3e 100644
--- a/awx/main/tests/projects.py
+++ b/awx/main/tests/projects.py
@@ -680,6 +680,11 @@ class ProjectUpdatesTest(BaseTransactionTest):
     def setUp(self):
         super(ProjectUpdatesTest, self).setUp()
         self.setup_users()
+        self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
+
+    def tearDown(self):
+        super(ProjectUpdatesTest, self).tearDown()
+        self.terminate_queue()
 
     def create_project(self, **kwargs):
         cred_fields = ['scm_username', 'scm_password', 'scm_key_data',
diff --git a/awx/main/tests/tasks.py b/awx/main/tests/tasks.py
index 13aa44ec09..8bf31dc434 100644
--- a/awx/main/tests/tasks.py
+++ b/awx/main/tests/tasks.py
@@ -188,12 +188,14 @@ class RunJobTest(BaseCeleryTest):
             return args
         RunJob.build_args = new_build_args
         settings.INTERNAL_API_URL = self.live_server_url
+        self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT)
 
     def tearDown(self):
         super(RunJobTest, self).tearDown()
         if self.test_project_path:
             shutil.rmtree(self.test_project_path, True)
         RunJob.build_args = self.original_build_args
+        self.terminate_queue()
 
     def create_test_credential(self, **kwargs):
         opts = {
diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py
index 4fb6e7120d..1f74042ff9 100644
--- a/awx/plugins/callback/job_event_callback.py
+++ b/awx/plugins/callback/job_event_callback.py
@@ -38,26 +38,13 @@ import sys
 import urllib
 import urlparse
 
-# Requests / Kombu
-try:
-    import requests
-    from kombu import Connection, Exchange, Queue
-except ImportError:
-    # If running from an AWX installation, use the local version of requests if
-    # if cannot be found globally.
-    local_site_packages = os.path.join(os.path.dirname(__file__), '..', '..',
-                                       'lib', 'site-packages')
-    sys.path.insert(0, local_site_packages)
-    import requests
-    from kombu import Connection, Exchange, Queue
+import requests
 
-# Check to see if librabbitmq is installed.
-try:
-    import librabbitmq
-    LIBRABBITMQ_INSTALLED = True
-except ImportError:
-    LIBRABBITMQ_INSTALLED = False
+# Django
+from django.conf import settings
 
+# ZeroMQ
+import zmq
 
 
 class TokenAuth(requests.auth.AuthBase):
@@ -93,14 +80,10 @@ class CallbackModule(object):
         self.job_id = int(os.getenv('JOB_ID'))
         self.base_url = os.getenv('REST_API_URL', '')
         self.auth_token = os.getenv('REST_API_TOKEN', '')
-        self.broker_url = os.getenv('BROKER_URL', '')
+        self.context = None
+        self.socket = None
         self._init_logging()
-        # Since we don't yet have a way to confirm publish when using
-        # librabbitmq, ensure we use pyamqp even if librabbitmq happens to be
-        # installed.
-        if LIBRABBITMQ_INSTALLED:
-            self.logger.info('Forcing use of pyamqp instead of librabbitmq')
-            self.broker_url = self.broker_url.replace('amqp://', 'pyamqp://')
+        self._init_connection()
 
     def _init_logging(self):
         try:
@@ -120,76 +103,42 @@ class CallbackModule(object):
         self.logger.addHandler(handler)
         self.logger.propagate = False
 
-    def __del__(self):
-        self._cleanup_connection()
+    def _init_connection(self):
+        self.context = None
+        self.socket = None
 
-    def _publish_errback(self, exc, interval):
-        self.logger.info('Publish Error: %r', exc)
-
-    def _cleanup_connection(self):
-        if hasattr(self, 'producer'):
-            try:
-                #self.logger.debug('Cleanup Producer: %r', self.producer)
-                self.producer.cancel()
-            except:
-                pass
-            del self.producer
-        if hasattr(self, 'connection'):
-            try:
-                #self.logger.debug('Cleanup Connection: %r', self.connection)
-                self.connection.release()
-            except:
-                pass
-            del self.connection
+    def _start_connection(self):
+        self.context = zmq.Context()
+        self.socket = self.context.socket(zmq.PUSH)
+        self.socket.connect("tcp://127.0.0.1:5556")
 
     def _post_job_event_queue_msg(self, event, event_data):
-        if not hasattr(self, 'job_events_exchange'):
-            self.job_events_exchange = Exchange('job_events', 'direct',
-                                                durable=True)
-        if not hasattr(self, 'job_events_queue'):
-            self.job_events_queue = Queue('job_events[%d]' % self.job_id,
-                                          exchange=self.job_events_exchange,
-                                          routing_key=('job_events[%d]' % self.job_id),
-                                          auto_delete=True)
         msg = {
             'job_id': self.job_id,
            'event': event,
            'event_data': event_data,
            'created': datetime.datetime.utcnow().isoformat(),
         }
+        active_pid = os.getpid()
         if self.job_callback_debug:
             msg.update({
-                'pid': os.getpid(),
+                'pid': active_pid,
             })
         for retry_count in xrange(4):
             try:
                 if not hasattr(self, 'connection_pid'):
-                    self.connection_pid = os.getpid()
-                if self.connection_pid != os.getpid():
-                    self._cleanup_connection()
-                if not hasattr(self, 'connection'):
-                    self.connection = Connection(self.broker_url, transport_options={'confirm_publish': True})
-                    self.logger.debug('New Connection: %r, retry=%d',
-                                      self.connection, retry_count)
-                if not hasattr(self, 'producer'):
-                    channel = self.connection.channel()
-                    self.producer = self.connection.Producer(channel, exchange=self.job_events_exchange, serializer='json')
-                    self.publish = self.connection.ensure(self.producer, self.producer.publish,
-                                                          errback=self._publish_errback,
-                                                          max_retries=3, interval_start=1, interval_step=1, interval_max=10)
-                    self.logger.debug('New Producer: %r, retry=%d',
-                                      self.producer, retry_count)
-                self.logger.debug('Publish: %r, retry=%d', msg, retry_count)
-                self.publish(msg, exchange=self.job_events_exchange,
-                             routing_key=('job_events[%d]' % self.job_id),
-                             declare=[self.job_events_queue])
-                if event == 'playbook_on_stats':
-                    self._cleanup_connection()
+                    self.connection_pid = active_pid
+                if self.connection_pid != active_pid:
+                    self._init_connection()
+                if self.context is None:
+                    self._start_connection()
+
+                self.socket.send_json(msg)
                 return
             except Exception, e:
                 self.logger.info('Publish Exception: %r, retry=%d', e,
                                  retry_count, exc_info=True)
-                self._cleanup_connection()
+                # TODO: Maybe recycle connection here?
                 if retry_count >= 3:
                     raise
 
@@ -222,7 +171,7 @@ class CallbackModule(object):
         task = getattr(getattr(self, 'task', None), 'name', '')
         if task and event not in self.EVENTS_WITHOUT_TASK:
             event_data['task'] = task
-        if self.broker_url:
+        if not settings.CALLBACK_BYPASS_QUEUE:
             self._post_job_event_queue_msg(event, event_data)
         else:
             self._post_rest_api_event(event, event_data)
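
The retry loop above re-establishes the connection whenever the current pid differs from the pid that created it, because a ZeroMQ context cannot safely be shared across a fork (ansible forks a worker process per host). Distilled to its essentials, the pattern looks like the sketch below; `EventSender` and its address are illustrative names, not part of the patch.

```python
# Distilled fork-safety pattern from the callback module above: rebuild
# the PUSH socket whenever the current pid no longer matches the pid
# that created it.
import os
import zmq

class EventSender(object):

    def __init__(self, address):
        self.address = address
        self.context = None
        self.socket = None
        self.connection_pid = None

    def send(self, msg):
        # (Re)connect if never connected, or if we are in a forked child.
        if self.context is None or self.connection_pid != os.getpid():
            self.context = zmq.Context()
            self.socket = self.context.socket(zmq.PUSH)
            self.socket.connect(self.address)
            self.connection_pid = os.getpid()
        self.socket.send_json(msg)
```
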
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index fc3d4f3c50..8380ec740e 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -345,6 +345,10 @@ if 'devserver' in INSTALLED_APPS:
 else:
     INTERNAL_API_URL = 'http://127.0.0.1:8000'
 
+CALLBACK_CONSUMER_PORT = "tcp://127.0.0.1:5556"
+CALLBACK_QUEUE_PORT = "ipc:///tmp/callback_receiver.ipc"
+CALLBACK_BYPASS_QUEUE = False
+
 # Logging configuration.
 LOGGING = {
     'version': 1,
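
The defaults wire the plugin-facing consumer socket over TCP and the receiver's internal worker queue over an IPC socket. Both settings accept any ZeroMQ endpoint string, so a local settings file could, for example, move the worker queue onto TCP as well, or bypass the queue entirely so the callback plugin falls back to posting events to the REST API. The values below are hypothetical overrides for illustration, not defaults.

```python
# Hypothetical local_settings.py overrides for the new callback settings.
CALLBACK_CONSUMER_PORT = "tcp://127.0.0.1:5556"  # where callback plugins connect
CALLBACK_QUEUE_PORT = "tcp://127.0.0.1:5557"     # worker queue over TCP instead of ipc://
CALLBACK_BYPASS_QUEUE = True                     # skip ZeroMQ; POST events to the REST API
```
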