Shifting job callbacks to ZeroMQ.

This commit is contained in:
Luke Sneeringer
2014-10-29 13:51:47 -05:00
parent 99381f11fa
commit 8d90650134
3 changed files with 17 additions and 39 deletions

View File

@@ -24,19 +24,17 @@ from django.db import connection
# AWX # AWX
from awx.main.models import * from awx.main.models import *
from awx.main.queue import PubSub
# ZeroMQ
import zmq
MAX_REQUESTS = 10000 MAX_REQUESTS = 10000
WORKERS = 4 WORKERS = 4
class CallbackReceiver(object):
class CallbackReceiver(object):
def __init__(self): def __init__(self):
self.parent_mappings = {} self.parent_mappings = {}
def run_subscriber(self, consumer_port, queue_port, use_workers=True): def run_subscriber(self, use_workers=True):
def shutdown_handler(active_workers): def shutdown_handler(active_workers):
def _handler(signum, frame): def _handler(signum, frame):
for active_worker in active_workers: for active_worker in active_workers:
@@ -67,7 +65,10 @@ class CallbackReceiver(object):
elif settings.DEBUG: elif settings.DEBUG:
print 'Started callback receiver (no workers)' print 'Started callback receiver (no workers)'
main_process = Process(target=self.callback_handler, args=(use_workers, consumer_port, worker_queues,)) main_process = Process(
target=self.callback_handler,
args=(use_workers, worker_queues,),
)
main_process.daemon = True main_process.daemon = True
main_process.start() main_process.start()
@@ -88,16 +89,12 @@ class CallbackReceiver(object):
sys.exit(1) sys.exit(1)
time.sleep(0.1) time.sleep(0.1)
def callback_handler(self, use_workers, consumer_port, worker_queues): def callback_handler(self, use_workers, worker_queues):
message_number = 0 message_number = 0
total_messages = 0 total_messages = 0
last_parent_events = {} last_parent_events = {}
self.consumer_context = zmq.Context()
self.consumer_subscriber = self.consumer_context.socket(zmq.REP)
self.consumer_subscriber.bind(consumer_port)
while True: # Handle signal for message in pubsub.subscribe('callbacks'):
message = self.consumer_subscriber.recv_json()
total_messages += 1 total_messages += 1
if not use_workers: if not use_workers:
self.process_job_event(message) self.process_job_event(message)
@@ -232,14 +229,9 @@ class Command(NoArgsCommand):
Save Job Callback receiver (see awx.plugins.callbacks.job_event_callback) Save Job Callback receiver (see awx.plugins.callbacks.job_event_callback)
Runs as a management command and receives job save events. It then hands Runs as a management command and receives job save events. It then hands
them off to worker processors (see Worker) which writes them to the database them off to worker processors (see Worker) which writes them to the database
''' '''
help = 'Launch the job callback receiver' help = 'Launch the job callback receiver'
option_list = NoArgsCommand.option_list + (
make_option('--port', dest='port', type='int', default=5556,
help='Port to listen for requests on'),)
def init_logging(self): def init_logging(self):
log_levels = dict(enumerate([logging.ERROR, logging.INFO, log_levels = dict(enumerate([logging.ERROR, logging.INFO,
logging.DEBUG, 0])) logging.DEBUG, 0]))
@@ -253,11 +245,9 @@ class Command(NoArgsCommand):
def handle_noargs(self, **options): def handle_noargs(self, **options):
self.verbosity = int(options.get('verbosity', 1)) self.verbosity = int(options.get('verbosity', 1))
self.init_logging() self.init_logging()
consumer_port = settings.CALLBACK_CONSUMER_PORT
queue_port = settings.CALLBACK_QUEUE_PORT
cr = CallbackReceiver() cr = CallbackReceiver()
try: try:
cr.run_subscriber(consumer_port, queue_port) cr.run_subscriber()
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass

View File

@@ -76,7 +76,7 @@ class PubSub(object):
from contextmanager import closing from contextmanager import closing
with closing(PubSub('foobar')) as foobar: with closing(PubSub('foobar')) as foobar:
for message in foobar.listen(wait=0.1): for message in foobar.subscribe(wait=0.1):
<deal with message> <deal with message>
""" """
self._queue_name = queue_name self._queue_name = queue_name

View File

@@ -42,9 +42,8 @@ import time
# Requests # Requests
import requests import requests
# ZeroMQ # Tower
import zmq from awx.main.queue import PubSub
class TokenAuth(requests.auth.AuthBase): class TokenAuth(requests.auth.AuthBase):
@@ -80,9 +79,6 @@ class CallbackModule(object):
self.job_id = int(os.getenv('JOB_ID')) self.job_id = int(os.getenv('JOB_ID'))
self.base_url = os.getenv('REST_API_URL', '') self.base_url = os.getenv('REST_API_URL', '')
self.auth_token = os.getenv('REST_API_TOKEN', '') self.auth_token = os.getenv('REST_API_TOKEN', '')
self.callback_consumer_port = os.getenv('CALLBACK_CONSUMER_PORT', '')
self.context = None
self.socket = None
self._init_logging() self._init_logging()
self._init_connection() self._init_connection()
self.counter = 0 self.counter = 0
@@ -109,11 +105,6 @@ class CallbackModule(object):
self.context = None self.context = None
self.socket = None self.socket = None
def _start_connection(self):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.connect(self.callback_consumer_port)
def _post_job_event_queue_msg(self, event, event_data): def _post_job_event_queue_msg(self, event, event_data):
self.counter += 1 self.counter += 1
msg = { msg = {
@@ -132,13 +123,10 @@ class CallbackModule(object):
try: try:
if not hasattr(self, 'connection_pid'): if not hasattr(self, 'connection_pid'):
self.connection_pid = active_pid self.connection_pid = active_pid
if self.connection_pid != active_pid:
self._init_connection()
if self.context is None:
self._start_connection()
self.socket.send_json(msg) # Publish the callback through Redis.
self.socket.recv() pubsub = PubSub('callbacks')
pubsub.publish(msg)
return return
except Exception, e: except Exception, e:
self.logger.info('Publish Exception: %r, retry=%d', e, self.logger.info('Publish Exception: %r, retry=%d', e,