From 31faca2b4f3de37502c1025200cb2b9e6404e61b Mon Sep 17 00:00:00 2001 From: Chris Church Date: Fri, 28 Oct 2016 22:32:49 -0400 Subject: [PATCH] Add option to use callback queue for job events. --- awx/main/queue.py | 48 ++++++++++++++++++++++++++++++++++++++++++++++- awx/main/tasks.py | 27 ++++++++++++++++++++------ 2 files changed, 68 insertions(+), 7 deletions(-) diff --git a/awx/main/queue.py b/awx/main/queue.py index b0b8d0374e..bfb487441f 100644 --- a/awx/main/queue.py +++ b/awx/main/queue.py @@ -1,9 +1,19 @@ # Copyright (c) 2015 Ansible, Inc. # All Rights Reserved. +# Python import json +import logging +import os + +# Django +from django.conf import settings + +# Kombu +from kombu import Connection, Exchange, Producer + +__all__ = ['FifoQueue', 'CallbackQueueDispatcher'] -__all__ = ['FifoQueue'] # TODO: Figure out wtf to do with this class class FifoQueue(object): @@ -33,3 +43,39 @@ class FifoQueue(object): answer = None if answer: return json.loads(answer) + + +class CallbackQueueDispatcher(object): + + def __init__(self): + self.callback_connection = getattr(settings, 'CALLBACK_CONNECTION', None) + self.connection_queue = getattr(settings, 'CALLBACK_QUEUE', '') + self.connection = None + self.exchange = None + self.logger = logging.getLogger('awx.main.queue.CallbackQueueDispatcher') + + def dispatch(self, obj): + if not self.callback_connection or not self.connection_queue: + return + active_pid = os.getpid() + for retry_count in xrange(4): + try: + if not hasattr(self, 'connection_pid'): + self.connection_pid = active_pid + if self.connection_pid != active_pid: + self.connection = None + if self.connection is None: + self.connection = Connection(self.callback_connection) + self.exchange = Exchange(self.connection_queue, type='direct') + + producer = Producer(self.connection) + producer.publish(obj, + serializer='json', + compression='bzip2', + exchange=self.exchange, + declare=[self.exchange], + routing_key=self.connection_queue) + return + except Exception, e: + self.logger.info('Publish Job Event Exception: %r, retry=%d', e, + retry_count, exc_info=True) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index cd785ef96a..35858577a7 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -47,6 +47,7 @@ from django.contrib.auth.models import User from awx.main.constants import CLOUD_PROVIDERS from awx.main.models import * # noqa from awx.main.models import UnifiedJob +from awx.main.queue import CallbackQueueDispatcher from awx.main.task_engine import TaskEnhancer from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, check_proot_installed, build_proot_temp_dir, wrap_args_with_proot, @@ -991,10 +992,17 @@ class RunJob(BaseTask): Wrap stdout file object to capture events. ''' stdout_handle = super(RunJob, self).get_stdout_handle(instance) + + if getattr(settings, 'USE_CALLBACK_QUEUE', False): + dispatcher = CallbackQueueDispatcher() - def job_event_callback(event_data): - event_data.setdefault('job_id', instance.id) - JobEvent.create_from_data(**event_data) + def job_event_callback(event_data): + event_data.setdefault('job_id', instance.id) + dispatcher.dispatch(event_data) + else: + def job_event_callback(event_data): + event_data.setdefault('job_id', instance.id) + JobEvent.create_from_data(**event_data) return OutputEventFilter(stdout_handle, job_event_callback) @@ -1719,9 +1727,16 @@ class RunAdHocCommand(BaseTask): ''' stdout_handle = super(RunAdHocCommand, self).get_stdout_handle(instance) - def ad_hoc_command_event_callback(event_data): - event_data.setdefault('ad_hoc_command_id', instance.id) - AdHocCommandEvent.create_from_data(**event_data) + if getattr(settings, 'USE_CALLBACK_QUEUE', False): + dispatcher = CallbackQueueDispatcher() + + def ad_hoc_command_event_callback(event_data): + event_data.setdefault('ad_hoc_command_id', instance.id) + dispatcher.dispatch(event_data) + else: + def ad_hoc_command_event_callback(event_data): + event_data.setdefault('ad_hoc_command_id', instance.id) + AdHocCommandEvent.create_from_data(**event_data) return OutputEventFilter(stdout_handle, ad_hoc_command_event_callback)