Cleanup and refactor some parts of the new zeromq based callback receiver

This commit is contained in:
Matthew Jones
2014-02-18 13:54:28 -05:00
parent 2c694e5e07
commit f6870634c4
3 changed files with 80 additions and 67 deletions

View File

@@ -11,7 +11,7 @@ from multiprocessing import Process
# Django # Django
from django.conf import settings from django.conf import settings
from django.core.management.base import NoArgsCommand, CommandError from django.core.management.base import NoArgsCommand, CommandError
from django.db import transaction from django.db import transaction, DatabaseError
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.utils.dateparse import parse_datetime from django.utils.dateparse import parse_datetime
from django.utils.timezone import now, is_aware, make_aware from django.utils.timezone import now, is_aware, make_aware
@@ -23,57 +23,85 @@ from awx.main.models import *
# ZeroMQ # ZeroMQ
import zmq import zmq
def run_subscriber(consumer_port, queue_port, use_workers=True, worker_count=4):
    '''
    Pull callback event messages from consumer_port and dispatch them.

    When use_workers is True, a pool of worker_count Worker processes is
    started, each connected to queue_port, and every received message is
    pushed onto that queue for a worker to persist.  When use_workers is
    False, each message is processed inline in this process (useful for
    debugging / single-process operation).

    :param consumer_port: zeromq endpoint to bind the PULL socket that
        receives events from the callback plugin (e.g. "tcp://...").
    :param queue_port: zeromq endpoint to bind the PUSH socket that fans
        events out to the worker pool (e.g. "ipc://...").
    :param use_workers: when True, dispatch to the worker pool; when
        False, process events synchronously in this process.
    :param worker_count: number of Worker processes to start when
        use_workers is True (default 4, matching previous behavior).

    This function never returns; it loops forever receiving messages.
    '''
    # Socket receiving events from the callback plugin.
    consumer_context = zmq.Context()
    consumer_subscriber = consumer_context.socket(zmq.PULL)
    consumer_subscriber.bind(consumer_port)

    # Socket distributing events to the worker pool.
    queue_context = zmq.Context()
    queue_publisher = queue_context.socket(zmq.PUSH)
    queue_publisher.bind(queue_port)

    if use_workers:
        workers = []
        for idx in range(worker_count):
            w = Worker(queue_port)
            w.start()
            workers.append(w)

    while True: # Handle signal
        message = consumer_subscriber.recv_json()
        if use_workers:
            queue_publisher.send_json(message)
        else:
            process_job_event(message)
@transaction.commit_on_success
def process_job_event(data):
    '''
    Validate and save one job event dict (received via zeromq) as a JobEvent.

    Messages missing an 'event' value or a 'job_id' key are silently
    discarded.  The 'created' timestamp is parsed into an aware datetime
    when possible, or dropped if unparseable.  All keys other than
    'job_id', 'event', 'event_data' and 'created' are stripped before the
    JobEvent is constructed.  Saving is retried up to 11 times on
    DatabaseError, sleeping 1 second between attempts.
    '''
    event = data.get('event', '')
    if not event or 'job_id' not in data:
        return
    try:
        # Normalize 'created' into an aware datetime; assume UTC when no
        # timezone info is present.
        if not isinstance(data['created'], datetime.datetime):
            data['created'] = parse_datetime(data['created'])
        if not data['created'].tzinfo:
            data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
    except (KeyError, ValueError):
        # Unparseable or missing timestamp: let the model supply it.
        data.pop('created', None)
    if settings.DEBUG:
        print data
    # Whitelist the fields JobEvent accepts; drop everything else.
    for key in data.keys():
        if key not in ('job_id', 'event', 'event_data', 'created'):
            data.pop(key)
    data['play'] = data.get('event_data', {}).get('play', '').strip()
    data['task'] = data.get('event_data', {}).get('task', '').strip()
    for retry_count in xrange(11):
        try:
            if event == 'playbook_on_stats':
                # Flush any pending work before the final stats event so
                # its post-processing sees committed data.
                transaction.commit()
            job_event = JobEvent(**data)
            job_event.save(post_process=True)
            if not event.startswith('runner_'):
                # Non-runner events are committed immediately; runner
                # events are left to the commit_on_success decorator.
                transaction.commit()
            break
        except DatabaseError as e:
            transaction.rollback()
            logger.debug('Database error saving job event, retrying in '
                         '1 second (retry #%d): %s', retry_count + 1, e)
            time.sleep(1)
    else:
        # NOTE(review): after exhausting the loop, retry_count is 10 but
        # 11 attempts were made — the logged count is off by one; confirm
        # intended semantics before changing the message.
        logger.error('Failed to save job event after %d retries.',
                     retry_count)
class Worker(Process): class Worker(Process):
''' '''
Process to validate and store save job events received via zeromq Process to validate and store save job events received via zeromq
''' '''
def __init__(self, port):
self.port = port
def run(self): def run(self):
print("Starting worker") print("Starting worker")
pool_context = zmq.Context() pool_context = zmq.Context()
pool_subscriber = pool_context.socket(zmq.PULL) pool_subscriber = pool_context.socket(zmq.PULL)
pool_subscriber.connect("ipc:///tmp/callback_receiver.ipc") pool_subscriber.connect(self.port)
while True: while True:
message = pool_subscriber.recv_json() message = pool_subscriber.recv_json()
self.process_job_event(message) process_job_event(message)
@transaction.commit_on_success
def process_job_event(self, data):
event = data.get('event', '')
if not event or 'job_id' not in data:
return
try:
if not isinstance(data['created'], datetime.datetime):
data['created'] = parse_datetime(data['created'])
if not data['created'].tzinfo:
data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
except (KeyError, ValueError):
data.pop('created', None)
if settings.DEBUG:
print data
for key in data.keys():
if key not in ('job_id', 'event', 'event_data', 'created'):
data.pop(key)
data['play'] = data.get('event_data', {}).get('play', '').strip()
data['task'] = data.get('event_data', {}).get('task', '').strip()
for retry_count in xrange(11):
try:
if event == 'playbook_on_stats':
transaction.commit()
job_event = JobEvent(**data)
job_event.save(post_process=True)
if not event.startswith('runner_'):
transaction.commit()
break
except DatabaseError as e:
transaction.rollback()
logger.debug('Database error saving job event, retrying in '
'1 second (retry #%d): %s', retry_count + 1, e)
time.sleep(1)
else:
logger.error('Failed to save job event after %d retries.',
retry_count)
class Command(NoArgsCommand): class Command(NoArgsCommand):
''' '''
@@ -98,29 +126,9 @@ class Command(NoArgsCommand):
self.logger.addHandler(handler) self.logger.addHandler(handler)
self.logger.propagate = False self.logger.propagate = False
def run_subscriber(self, port=5556):
consumer_context = zmq.Context()
consumer_subscriber = consumer_context.socket(zmq.PULL)
consumer_subscriber.bind("tcp://127.0.0.1:%s" % str(port))
print("Consumer Listening on tcp://127.0.0.1:%s" % str(port))
queue_context = zmq.Context()
queue_publisher = queue_context.socket(zmq.PUSH)
queue_publisher.bind("ipc:///tmp/callback_receiver.ipc")
print("Publisher Listening on ipc: /tmp/callback_receiver.ip")
workers = []
for idx in range(4):
w = Worker()
w.start()
workers.append(w)
while True: # Handle signal
message = consumer_subscriber.recv_json()
queue_publisher.send_json(message)
def handle_noargs(self, **options): def handle_noargs(self, **options):
self.verbosity = int(options.get('verbosity', 1)) self.verbosity = int(options.get('verbosity', 1))
self.init_logging() self.init_logging()
self.run_subscriber(port=options.get('port')) consumer_port = settings.CALLBACK_CONSUMER_PORT
queue_port = settings.CALLBACK_QUEUE_PORT
run_subscriber(consumer_port, queue_port)

View File

@@ -40,6 +40,9 @@ import urlparse
import requests import requests
# Django
from django.conf import settings
# ZeroMQ # ZeroMQ
import zmq import zmq
@@ -79,7 +82,6 @@ class CallbackModule(object):
self.auth_token = os.getenv('REST_API_TOKEN', '') self.auth_token = os.getenv('REST_API_TOKEN', '')
self.context = None self.context = None
self.socket = None self.socket = None
self.broker_url = True # TODO: Figure this out for unit tests
self._init_logging() self._init_logging()
self._init_connection() self._init_connection()
@@ -132,7 +134,6 @@ class CallbackModule(object):
self._start_connection() self._start_connection()
self.socket.send_json(msg) self.socket.send_json(msg)
self.logger.debug('Publish: %r, retry=%d', msg, retry_count)
return return
except Exception, e: except Exception, e:
self.logger.info('Publish Exception: %r, retry=%d', e, self.logger.info('Publish Exception: %r, retry=%d', e,
@@ -170,7 +171,7 @@ class CallbackModule(object):
task = getattr(getattr(self, 'task', None), 'name', '') task = getattr(getattr(self, 'task', None), 'name', '')
if task and event not in self.EVENTS_WITHOUT_TASK: if task and event not in self.EVENTS_WITHOUT_TASK:
event_data['task'] = task event_data['task'] = task
if self.broker_url: if not settings.CALLBACK_BYPASS_QUEUE:
self._post_job_event_queue_msg(event, event_data) self._post_job_event_queue_msg(event, event_data)
else: else:
self._post_rest_api_event(event, event_data) self._post_rest_api_event(event, event_data)

View File

@@ -345,6 +345,10 @@ if 'devserver' in INSTALLED_APPS:
else: else:
INTERNAL_API_URL = 'http://127.0.0.1:8000' INTERNAL_API_URL = 'http://127.0.0.1:8000'
CALLBACK_CONSUMER_PORT = "tcp://127.0.0.1:5556"
CALLBACK_QUEUE_PORT = "ipc:///tmp/callback_receiver.ipc"
CALLBACK_BYPASS_QUEUE = False
# Logging configuration. # Logging configuration.
LOGGING = { LOGGING = {
'version': 1, 'version': 1,