From 8e61d17cb4e8c958da6bb2ccb6aef1986668c31b Mon Sep 17 00:00:00 2001 From: Chris Church Date: Wed, 26 Feb 2014 14:11:59 -0500 Subject: [PATCH] Updated zeromq callback to receive callback port via environment variable. Changed production, development and unit tests to all use different ports/queues for the callback. --- .../commands/run_callback_receiver.py | 11 +++++++++- awx/main/tasks.py | 10 ++------- awx/main/tests/base.py | 21 +++++++++++++------ awx/main/tests/jobs.py | 6 +++--- awx/main/tests/tasks.py | 3 ++- awx/plugins/callback/job_event_callback.py | 10 ++++----- awx/settings/defaults.py | 2 +- awx/settings/development.py | 5 +++++ 8 files changed, 43 insertions(+), 25 deletions(-) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index f514684477..8bcb6f14e3 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -50,7 +50,12 @@ def run_subscriber(consumer_port, queue_port, use_workers=True): w.daemon = True w.start() workers.append(w) + signal.signal(signal.SIGINT, shutdown_handler(workers)) signal.signal(signal.SIGTERM, shutdown_handler(workers)) + if settings.DEBUG: + print 'Started callback receiver (4 workers)' + elif settings.DEBUG: + print 'Started callback receiver (no workers)' while True: # Handle signal message = consumer_subscriber.recv_json() @@ -134,4 +139,8 @@ class Command(NoArgsCommand): self.init_logging() consumer_port = settings.CALLBACK_CONSUMER_PORT queue_port = settings.CALLBACK_QUEUE_PORT - run_subscriber(consumer_port, queue_port) + try: + run_subscriber(consumer_port, queue_port) + except KeyboardInterrupt: + pass + diff --git a/awx/main/tasks.py b/awx/main/tasks.py index bb0f7275d3..f6c8c83edf 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -388,8 +388,7 @@ class RunJob(BaseTask): env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_dir env['REST_API_URL'] = settings.INTERNAL_API_URL env['REST_API_TOKEN'] = job.task_auth_token or '' - if settings.BROKER_URL.startswith('amqp://'): - env['BROKER_URL'] = settings.BROKER_URL + env['CALLBACK_CONSUMER_PORT'] = settings.CALLBACK_CONSUMER_PORT if getattr(settings, 'JOB_CALLBACK_DEBUG', False): env['JOB_CALLBACK_DEBUG'] = '2' elif settings.DEBUG: @@ -502,14 +501,9 @@ class RunJob(BaseTask): Hook for actions to run after job/task has completed. ''' super(RunJob, self).post_run_hook(job, **kwargs) - # Send a special message to this job's event queue after the job has run - # to tell the save job events task to end. - if settings.BROKER_URL.startswith('amqp://'): - pass - # Update job event fields after job has completed (only when using REST # API callback). - else: + if not settings.CALLBACK_CONSUMER_PORT: for job_event in job.job_events.order_by('pk'): job_event.save(post_process=True) diff --git a/awx/main/tests/base.py b/awx/main/tests/base.py index e4ad1b5ade..aa82d04062 100644 --- a/awx/main/tests/base.py +++ b/awx/main/tests/base.py @@ -7,6 +7,7 @@ import contextlib import datetime import json import os +import random import shutil import tempfile import time @@ -51,11 +52,15 @@ class BaseTestMixin(object): # commands that run from tests. for opt in ('ENGINE', 'NAME', 'USER', 'PASSWORD', 'HOST', 'PORT'): os.environ['AWX_TEST_DATABASE_%s' % opt] = settings.DATABASES['default'][opt] - # For now, prevent tests from trying to use celery for job event - # callbacks. - if settings.BROKER_URL.startswith('amqp://'): - settings.BROKER_URL = 'django://' + # Set flag so that task chain works with unit tests. settings.CELERY_UNIT_TEST = True + # Create unique random consumer and queue ports for zeromq callback. + if settings.CALLBACK_CONSUMER_PORT: + callback_port = random.randint(55700, 55799) + settings.CALLBACK_CONSUMER_PORT = 'tcp://127.0.0.1:%d' % callback_port + callback_queue_path = '/tmp/callback_receiver_test_%d.ipc' % callback_port + self._temp_project_dirs.append(callback_queue_path) + settings.CALLBACK_QUEUE_PORT = 'ipc://%s' % callback_queue_path # Make temp job status directory for unit tests. job_status_dir = tempfile.mkdtemp() self._temp_project_dirs.append(job_status_dir) @@ -66,7 +71,10 @@ class BaseTestMixin(object): super(BaseTestMixin, self).tearDown() for project_dir in self._temp_project_dirs: if os.path.exists(project_dir): - shutil.rmtree(project_dir, True) + if os.path.isdir(project_dir): + shutil.rmtree(project_dir, True) + else: + os.remove(project_dir) # Restore previous settings after each test. settings._wrapped = self._wrapped @@ -372,7 +380,8 @@ class BaseTestMixin(object): self.queue_process.start() def terminate_queue(self): - self.queue_process.terminate() + if hasattr(self, 'queue_process'): + self.queue_process.terminate() class BaseTest(BaseTestMixin, django.test.TestCase): ''' diff --git a/awx/main/tests/jobs.py b/awx/main/tests/jobs.py index b638528700..4e84c2b310 100644 --- a/awx/main/tests/jobs.py +++ b/awx/main/tests/jobs.py @@ -442,8 +442,8 @@ class BaseJobTestMixin(BaseTestMixin): def setUp(self): super(BaseJobTestMixin, self).setUp() self.populate() - #self.start_queue("ipc:///tmp/test_consumer.ipc", "ipc:///tmp/test_queue.ipc") - self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT) + if settings.CALLBACK_CONSUMER_PORT: + self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT) def tearDown(self): super(BaseJobTestMixin, self).tearDown() @@ -779,7 +779,7 @@ MIDDLEWARE_CLASSES = filter(lambda x: not x.endswith('TransactionMiddleware'), @override_settings(CELERY_ALWAYS_EAGER=True, CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, - CALLBACK_BYPASS_QUEUE=True, + CALLBACK_CONSUMER_PORT='', ANSIBLE_TRANSPORT='local', MIDDLEWARE_CLASSES=MIDDLEWARE_CLASSES) class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase): diff --git a/awx/main/tests/tasks.py b/awx/main/tests/tasks.py index 8bf31dc434..240c17b02c 100644 --- a/awx/main/tests/tasks.py +++ b/awx/main/tests/tasks.py @@ -188,7 +188,8 @@ class RunJobTest(BaseCeleryTest): return args RunJob.build_args = new_build_args settings.INTERNAL_API_URL = self.live_server_url - self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT) + if settings.CALLBACK_CONSUMER_PORT: + self.start_queue(settings.CALLBACK_CONSUMER_PORT, settings.CALLBACK_QUEUE_PORT) def tearDown(self): super(RunJobTest, self).tearDown() diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index 93a671d899..860d736701 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -39,14 +39,13 @@ import urllib import urlparse import time +# Requests import requests -# Django -from django.conf import settings - # ZeroMQ import zmq + class TokenAuth(requests.auth.AuthBase): def __init__(self, token): @@ -81,6 +80,7 @@ class CallbackModule(object): self.job_id = int(os.getenv('JOB_ID')) self.base_url = os.getenv('REST_API_URL', '') self.auth_token = os.getenv('REST_API_TOKEN', '') + self.callback_consumer_port = os.getenv('CALLBACK_CONSUMER_PORT', '') self.context = None self.socket = None self._init_logging() @@ -111,7 +111,7 @@ class CallbackModule(object): def _start_connection(self): self.context = zmq.Context() self.socket = self.context.socket(zmq.REQ) - self.socket.connect("tcp://127.0.0.1:5556") + self.socket.connect(self.callback_consumer_port) def _post_job_event_queue_msg(self, event, event_data): msg = { @@ -173,7 +173,7 @@ class CallbackModule(object): task = getattr(getattr(self, 'task', None), 'name', '') if task and event not in self.EVENTS_WITHOUT_TASK: event_data['task'] = task - if not settings.CALLBACK_BYPASS_QUEUE: + if self.callback_consumer_port: self._post_job_event_queue_msg(event, event_data) else: self._post_rest_api_event(event, event_data) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 8380ec740e..e83e465561 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -345,9 +345,9 @@ if 'devserver' in INSTALLED_APPS: else: INTERNAL_API_URL = 'http://127.0.0.1:8000' +# ZeroMQ callback settings. CALLBACK_CONSUMER_PORT = "tcp://127.0.0.1:5556" CALLBACK_QUEUE_PORT = "ipc:///tmp/callback_receiver.ipc" -CALLBACK_BYPASS_QUEUE = False # Logging configuration. LOGGING = { diff --git a/awx/settings/development.py b/awx/settings/development.py index a19b0e8a88..5210e6f71c 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ -17,6 +17,11 @@ from defaults import * if 'celeryd' in sys.argv: SQL_DEBUG = False +# Use a different callback consumer/queue for development, to avoid a conflict +# if there is also a nightly install running on the development machine. +CALLBACK_CONSUMER_PORT = "tcp://127.0.0.1:5557" +CALLBACK_QUEUE_PORT = "ipc:///tmp/callback_receiver_dev.ipc" + # If any local_*.py files are present in awx/settings/, use them to override # default settings for development. If not present, we can still run using # only the defaults.