Some cleanup and documentation for zeromq implementation

This commit is contained in:
Matthew Jones
2014-02-17 11:11:48 -05:00
parent 770947d18d
commit 2c694e5e07
3 changed files with 18 additions and 103 deletions

View File

@@ -24,6 +24,9 @@ from awx.main.models import *
import zmq import zmq
class Worker(Process): class Worker(Process):
'''
Process that validates and saves job events received via zeromq
'''
def run(self): def run(self):
print("Starting worker") print("Starting worker")
@@ -36,7 +39,6 @@ class Worker(Process):
@transaction.commit_on_success @transaction.commit_on_success
def process_job_event(self, data): def process_job_event(self, data):
print("Received data: %s" % str(data))
event = data.get('event', '') event = data.get('event', '')
if not event or 'job_id' not in data: if not event or 'job_id' not in data:
return return
@@ -58,20 +60,15 @@ class Worker(Process):
try: try:
if event == 'playbook_on_stats': if event == 'playbook_on_stats':
transaction.commit() transaction.commit()
if not JobEvent.objects.filter(**data).exists(): job_event = JobEvent(**data)
job_event = JobEvent(**data) job_event.save(post_process=True)
job_event.save(post_process=True) if not event.startswith('runner_'):
if not event.startswith('runner_'): transaction.commit()
transaction.commit()
else:
duplicate = True
if settings.DEBUG:
print 'skipping duplicate job event %r' % data
break break
except DatabaseError as e: except DatabaseError as e:
transaction.rollback() transaction.rollback()
# logger.debug('Database error saving job event, retrying in ' logger.debug('Database error saving job event, retrying in '
# '1 second (retry #%d): %s', retry_count + 1, e) '1 second (retry #%d): %s', retry_count + 1, e)
time.sleep(1) time.sleep(1)
else: else:
logger.error('Failed to save job event after %d retries.', logger.error('Failed to save job event after %d retries.',
@@ -80,9 +77,11 @@ class Worker(Process):
class Command(NoArgsCommand): class Command(NoArgsCommand):
''' '''
Management command to run the job callback receiver Save Job Callback receiver (see awx.plugins.callbacks.job_event_callback)
Runs as a management command and receives job save events. It then hands
them off to worker processes (see Worker), which write them to the database.
''' '''
help = 'Launch the job callback receiver' help = 'Launch the job callback receiver'
option_list = NoArgsCommand.option_list + ( option_list = NoArgsCommand.option_list + (
@@ -124,4 +123,4 @@ class Command(NoArgsCommand):
def handle_noargs(self, **options): def handle_noargs(self, **options):
self.verbosity = int(options.get('verbosity', 1)) self.verbosity = int(options.get('verbosity', 1))
self.init_logging() self.init_logging()
self.run_subscriber() self.run_subscriber(port=options.get('port'))

View File

@@ -533,91 +533,6 @@ class RunJob(BaseTask):
for job_event in job.job_events.order_by('pk'): for job_event in job.job_events.order_by('pk'):
job_event.save(post_process=True) job_event.save(post_process=True)
# Task (named 'awx.main.tasks.save_job_events') that consumes the job events
# published for a single job on a broker queue and stores them as JobEvent rows.
# NOTE(review): leading indentation is not visible in this view, so any
# statement nesting described in the comments below is inferred from syntax --
# confirm against the real file.
class SaveJobEvents(Task):
name = 'awx.main.tasks.save_job_events'
# Consumer callback for one message: normalizes `data`, saves it as a JobEvent
# unless an identical row already exists, tallies the event name in
# `events_received`, and acknowledges `message`.
def process_job_event(self, data, message, events_received=None):
# Fall back to a private counter when the caller does not supply one.
if events_received is None:
events_received = {}
# Start time, used only for the debug timing printed further down.
begints = time.time()
event = data.get('event', '')
# Ignore payloads without an event name or a job_id.
# NOTE(review): this return happens before message.ack() is reached.
if not event or 'job_id' not in data:
return
# Make 'created' a datetime: parse it when it is not already one and attach
# FixedOffset(0) when it carries no tzinfo. A missing or unparseable value
# (KeyError / ValueError) just drops the field.
# NOTE(review): if parse_datetime is Django's, it returns None for text that
# is not well formatted, which would raise an uncaught AttributeError on the
# tzinfo check -- confirm.
try:
if not isinstance(data['created'], datetime.datetime):
data['created'] = parse_datetime(data['created'])
if not data['created'].tzinfo:
data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
except (KeyError, ValueError):
data.pop('created', None)
if settings.DEBUG:
print data
# Keep only the four fields named below; every other key is discarded.
for key in data.keys():
if key not in ('job_id', 'event', 'event_data', 'created'):
data.pop(key)
# Copy the play and task names out of event_data (empty string when absent),
# stripped of surrounding whitespace.
data['play'] = data.get('event_data', {}).get('play', '').strip()
data['task'] = data.get('event_data', {}).get('task', '').strip()
duplicate = False
# '__complete__' is the marker run() waits for; it skips the save loop below.
if event != '__complete__':
# Up to 11 attempts: a DatabaseError rolls back, sleeps one second and retries.
for retry_count in xrange(11):
try:
# Commit any outstanding events before saving stats.
if event == 'playbook_on_stats':
transaction.commit()
# Save only when no JobEvent with exactly these field values exists yet.
if not JobEvent.objects.filter(**data).exists():
job_event = JobEvent(**data)
job_event.save(post_process=True)
# Events whose name starts with 'runner_' are not committed here.
if not event.startswith('runner_'):
transaction.commit()
else:
duplicate = True
if settings.DEBUG:
print 'skipping duplicate job event %r' % data
break
except DatabaseError as e:
transaction.rollback()
logger.debug('Database error saving job event, retrying in '
'1 second (retry #%d): %s', retry_count + 1, e)
time.sleep(1)
# Presumably the for-loop's else: reached only when no attempt hit `break`,
# i.e. every attempt raised DatabaseError -- confirm nesting.
else:
logger.error('Failed to save job event after %d retries.',
retry_count)
# Tally the event by name unless it was flagged as a duplicate.
if not duplicate:
if event not in events_received:
events_received[event] = 1
else:
events_received[event] += 1
if settings.DEBUG:
print 'saved job event in %0.3fs' % (time.time() - begints)
message.ack()
# Task entry point: reads the `job_id` keyword argument, consumes that job's
# event queue until a '__complete__' event has been tallied, and returns a
# dict with 'job_id' and 'total_events'. Returns {} when job_id is missing or
# falsy.
@transaction.commit_on_success
def run(self, *args, **kwargs):
job_id = kwargs.get('job_id', None)
if not job_id:
return {}
events_received = {}
# Bind the shared counter so the callback updates events_received in place.
process_job_event = functools.partial(self.process_job_event,
events_received=events_received)
# Durable direct exchange 'job_events' with a per-job auto-delete queue whose
# name and routing key are both 'job_events[<job_id>]'.
job_events_exchange = Exchange('job_events', 'direct', durable=True)
job_events_queue = Queue('job_events[%d]' % job_id,
exchange=job_events_exchange,
routing_key=('job_events[%d]' % job_id),
auto_delete=True)
with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn:
with conn.Consumer(job_events_queue, callbacks=[process_job_event]) as consumer:
# Keep draining until the callback has tallied a '__complete__' event.
while '__complete__' not in events_received:
conn.drain_events()
# total_events sums every per-event-name count held in events_received.
return {
'job_id': job_id,
'total_events': sum(events_received.values())}
class RunProjectUpdate(BaseTask): class RunProjectUpdate(BaseTask):
name = 'awx.main.tasks.run_project_update' name = 'awx.main.tasks.run_project_update'

View File

@@ -117,15 +117,16 @@ class CallbackModule(object):
'event_data': event_data, 'event_data': event_data,
'created': datetime.datetime.utcnow().isoformat(), 'created': datetime.datetime.utcnow().isoformat(),
} }
active_pid = os.getpid()
if self.job_callback_debug: if self.job_callback_debug:
msg.update({ msg.update({
'pid': os.getpid(), 'pid': active_pid,
}) })
for retry_count in xrange(4): for retry_count in xrange(4):
try: try:
if not hasattr(self, 'connection_pid'): if not hasattr(self, 'connection_pid'):
self.connection_pid = os.getpid() self.connection_pid = active_pid
if self.connection_pid != os.getpid(): if self.connection_pid != active_pid:
self._init_connection() self._init_connection()
if self.context is None: if self.context is None:
self._start_connection() self._start_connection()