mirror of
https://github.com/ansible/awx.git
synced 2026-02-16 18:50:04 -03:30
Add option to use callback queue for job events.
This commit is contained in:
@@ -1,9 +1,19 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved.
|
||||
|
||||
# Python
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
|
||||
# Django
|
||||
from django.conf import settings
|
||||
|
||||
# Kombu
|
||||
from kombu import Connection, Exchange, Producer
|
||||
|
||||
__all__ = ['FifoQueue', 'CallbackQueueDispatcher']
|
||||
|
||||
__all__ = ['FifoQueue']
|
||||
|
||||
# TODO: Figure out wtf to do with this class
|
||||
class FifoQueue(object):
|
||||
@@ -33,3 +43,39 @@ class FifoQueue(object):
|
||||
answer = None
|
||||
if answer:
|
||||
return json.loads(answer)
|
||||
|
||||
|
||||
class CallbackQueueDispatcher(object):
|
||||
|
||||
def __init__(self):
|
||||
self.callback_connection = getattr(settings, 'CALLBACK_CONNECTION', None)
|
||||
self.connection_queue = getattr(settings, 'CALLBACK_QUEUE', '')
|
||||
self.connection = None
|
||||
self.exchange = None
|
||||
self.logger = logging.getLogger('awx.main.queue.CallbackQueueDispatcher')
|
||||
|
||||
def dispatch(self, obj):
|
||||
if not self.callback_connection or not self.connection_queue:
|
||||
return
|
||||
active_pid = os.getpid()
|
||||
for retry_count in xrange(4):
|
||||
try:
|
||||
if not hasattr(self, 'connection_pid'):
|
||||
self.connection_pid = active_pid
|
||||
if self.connection_pid != active_pid:
|
||||
self.connection = None
|
||||
if self.connection is None:
|
||||
self.connection = Connection(self.callback_connection)
|
||||
self.exchange = Exchange(self.connection_queue, type='direct')
|
||||
|
||||
producer = Producer(self.connection)
|
||||
producer.publish(obj,
|
||||
serializer='json',
|
||||
compression='bzip2',
|
||||
exchange=self.exchange,
|
||||
declare=[self.exchange],
|
||||
routing_key=self.connection_queue)
|
||||
return
|
||||
except Exception, e:
|
||||
self.logger.info('Publish Job Event Exception: %r, retry=%d', e,
|
||||
retry_count, exc_info=True)
|
||||
|
||||
Reference in New Issue
Block a user