mirror of
https://github.com/ansible/awx.git
synced 2026-05-17 22:37:41 -02:30
Initial 0mq implementation
This commit is contained in:
105
awx/main/management/commands/run_callback_receiver.py
Normal file
105
awx/main/management/commands/run_callback_receiver.py
Normal file
@@ -0,0 +1,105 @@
|
|||||||
|
# Copyright (c) 2014 AnsibleWorks, Inc.
|
||||||
|
# All Rights Reserved.
|
||||||
|
|
||||||
|
# Python
|
||||||
|
import datetime
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
from optparse import make_option
|
||||||
|
|
||||||
|
# Django
|
||||||
|
from django.conf import settings
|
||||||
|
from django.core.management.base import NoArgsCommand, CommandError
|
||||||
|
from django.db import transaction
|
||||||
|
from django.contrib.auth.models import User
|
||||||
|
from django.utils.dateparse import parse_datetime
|
||||||
|
from django.utils.timezone import now, is_aware, make_aware
|
||||||
|
from django.utils.tzinfo import FixedOffset
|
||||||
|
|
||||||
|
# AWX
|
||||||
|
from awx.main.models import *
|
||||||
|
|
||||||
|
# ZeroMQ
|
||||||
|
import zmq
|
||||||
|
|
||||||
|
class Command(NoArgsCommand):
|
||||||
|
'''
|
||||||
|
Management command to run the job callback receiver
|
||||||
|
'''
|
||||||
|
|
||||||
|
help = 'Launch the job callback receiver'
|
||||||
|
|
||||||
|
option_list = NoArgsCommand.option_list + (
|
||||||
|
make_option('--port', dest='port', type='int', default=5556,
|
||||||
|
help='Port to listen for requests on'),)
|
||||||
|
|
||||||
|
def init_logging(self):
|
||||||
|
log_levels = dict(enumerate([logging.ERROR, logging.INFO,
|
||||||
|
logging.DEBUG, 0]))
|
||||||
|
self.logger = logging.getLogger('awx.main.commands.run_callback_receiver')
|
||||||
|
self.logger.setLevel(log_levels.get(self.verbosity, 0))
|
||||||
|
handler = logging.StreamHandler()
|
||||||
|
handler.setFormatter(logging.Formatter('%(message)s'))
|
||||||
|
self.logger.addHandler(handler)
|
||||||
|
self.logger.propagate = False
|
||||||
|
|
||||||
|
@transaction.commit_on_success
|
||||||
|
def process_job_event(self, data):
|
||||||
|
print("Received data: %s" % str(data))
|
||||||
|
event = data.get('event', '')
|
||||||
|
if not event or 'job_id' not in data:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
if not isinstance(data['created'], datetime.datetime):
|
||||||
|
data['created'] = parse_datetime(data['created'])
|
||||||
|
if not data['created'].tzinfo:
|
||||||
|
data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
|
||||||
|
except (KeyError, ValueError):
|
||||||
|
data.pop('created', None)
|
||||||
|
if settings.DEBUG:
|
||||||
|
print data
|
||||||
|
for key in data.keys():
|
||||||
|
if key not in ('job_id', 'event', 'event_data', 'created'):
|
||||||
|
data.pop(key)
|
||||||
|
data['play'] = data.get('event_data', {}).get('play', '').strip()
|
||||||
|
data['task'] = data.get('event_data', {}).get('task', '').strip()
|
||||||
|
for retry_count in xrange(11):
|
||||||
|
try:
|
||||||
|
if event == 'playbook_on_stats':
|
||||||
|
transaction.commit()
|
||||||
|
if not JobEvent.objects.filter(**data).exists():
|
||||||
|
job_event = JobEvent(**data)
|
||||||
|
job_event.save(post_process=True)
|
||||||
|
if not event.startswith('runner_'):
|
||||||
|
transaction.commit()
|
||||||
|
else:
|
||||||
|
duplicate = True
|
||||||
|
if settings.DEBUG:
|
||||||
|
print 'skipping duplicate job event %r' % data
|
||||||
|
break
|
||||||
|
except DatabaseError as e:
|
||||||
|
transaction.rollback()
|
||||||
|
logger.debug('Database error saving job event, retrying in '
|
||||||
|
'1 second (retry #%d): %s', retry_count + 1, e)
|
||||||
|
time.sleep(1)
|
||||||
|
else:
|
||||||
|
logger.error('Failed to save job event after %d retries.',
|
||||||
|
retry_count)
|
||||||
|
|
||||||
|
def run_subscriber(self, port=5556):
|
||||||
|
print("Starting ZMQ Context")
|
||||||
|
context = zmq.Context()
|
||||||
|
subscriber = context.socket(zmq.REP)
|
||||||
|
print("Starting connection")
|
||||||
|
subscriber.bind("tcp://127.0.0.1:%s" % str(port))
|
||||||
|
print("Listening on tcp://127.0.0.1:%s" % str(port))
|
||||||
|
while True: # Handle signal
|
||||||
|
message = subscriber.recv()
|
||||||
|
subscriber.send("1")
|
||||||
|
data = json.loads(message)
|
||||||
|
self.process_job_event(data)
|
||||||
|
|
||||||
|
def handle_noargs(self, **options):
|
||||||
|
self.verbosity = int(options.get('verbosity', 1))
|
||||||
|
self.init_logging()
|
||||||
|
self.run_subscriber()
|
||||||
@@ -494,11 +494,11 @@ class RunJob(BaseTask):
|
|||||||
elif job.status in ('pending', 'waiting'):
|
elif job.status in ('pending', 'waiting'):
|
||||||
job = self.update_model(job.pk, status='pending')
|
job = self.update_model(job.pk, status='pending')
|
||||||
# Start another task to process job events.
|
# Start another task to process job events.
|
||||||
if settings.BROKER_URL.startswith('amqp://'):
|
# if settings.BROKER_URL.startswith('amqp://'):
|
||||||
app = Celery('tasks', broker=settings.BROKER_URL)
|
# app = Celery('tasks', broker=settings.BROKER_URL)
|
||||||
send_task('awx.main.tasks.save_job_events', kwargs={
|
# send_task('awx.main.tasks.save_job_events', kwargs={
|
||||||
'job_id': job.id,
|
# 'job_id': job.id,
|
||||||
}, serializer='json')
|
# }, serializer='json')
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
@@ -511,20 +511,21 @@ class RunJob(BaseTask):
|
|||||||
# Send a special message to this job's event queue after the job has run
|
# Send a special message to this job's event queue after the job has run
|
||||||
# to tell the save job events task to end.
|
# to tell the save job events task to end.
|
||||||
if settings.BROKER_URL.startswith('amqp://'):
|
if settings.BROKER_URL.startswith('amqp://'):
|
||||||
job_events_exchange = Exchange('job_events', 'direct', durable=True)
|
pass
|
||||||
job_events_queue = Queue('job_events[%d]' % job.id,
|
# job_events_exchange = Exchange('job_events', 'direct', durable=True)
|
||||||
exchange=job_events_exchange,
|
# job_events_queue = Queue('job_events[%d]' % job.id,
|
||||||
routing_key=('job_events[%d]' % job.id),
|
# exchange=job_events_exchange,
|
||||||
auto_delete=True)
|
# routing_key=('job_events[%d]' % job.id),
|
||||||
with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn:
|
# auto_delete=True)
|
||||||
with conn.Producer(serializer='json') as producer:
|
# with Connection(settings.BROKER_URL, transport_options={'confirm_publish': True}) as conn:
|
||||||
msg = {
|
# with conn.Producer(serializer='json') as producer:
|
||||||
'job_id': job.id,
|
# msg = {
|
||||||
'event': '__complete__'
|
# 'job_id': job.id,
|
||||||
}
|
# 'event': '__complete__'
|
||||||
producer.publish(msg, exchange=job_events_exchange,
|
# }
|
||||||
routing_key=('job_events[%d]' % job.id),
|
# producer.publish(msg, exchange=job_events_exchange,
|
||||||
declare=[job_events_queue])
|
# routing_key=('job_events[%d]' % job.id),
|
||||||
|
# declare=[job_events_queue])
|
||||||
|
|
||||||
# Update job event fields after job has completed (only when using REST
|
# Update job event fields after job has completed (only when using REST
|
||||||
# API callback).
|
# API callback).
|
||||||
|
|||||||
@@ -38,26 +38,10 @@ import sys
|
|||||||
import urllib
|
import urllib
|
||||||
import urlparse
|
import urlparse
|
||||||
|
|
||||||
# Requests / Kombu
|
import requests
|
||||||
try:
|
|
||||||
import requests
|
|
||||||
from kombu import Connection, Exchange, Queue
|
|
||||||
except ImportError:
|
|
||||||
# If running from an AWX installation, use the local version of requests if
|
|
||||||
# if cannot be found globally.
|
|
||||||
local_site_packages = os.path.join(os.path.dirname(__file__), '..', '..',
|
|
||||||
'lib', 'site-packages')
|
|
||||||
sys.path.insert(0, local_site_packages)
|
|
||||||
import requests
|
|
||||||
from kombu import Connection, Exchange, Queue
|
|
||||||
|
|
||||||
# Check to see if librabbitmq is installed.
|
|
||||||
try:
|
|
||||||
import librabbitmq
|
|
||||||
LIBRABBITMQ_INSTALLED = True
|
|
||||||
except ImportError:
|
|
||||||
LIBRABBITMQ_INSTALLED = False
|
|
||||||
|
|
||||||
|
# ZeroMQ
|
||||||
|
import zmq
|
||||||
|
|
||||||
class TokenAuth(requests.auth.AuthBase):
|
class TokenAuth(requests.auth.AuthBase):
|
||||||
|
|
||||||
@@ -93,14 +77,11 @@ class CallbackModule(object):
|
|||||||
self.job_id = int(os.getenv('JOB_ID'))
|
self.job_id = int(os.getenv('JOB_ID'))
|
||||||
self.base_url = os.getenv('REST_API_URL', '')
|
self.base_url = os.getenv('REST_API_URL', '')
|
||||||
self.auth_token = os.getenv('REST_API_TOKEN', '')
|
self.auth_token = os.getenv('REST_API_TOKEN', '')
|
||||||
self.broker_url = os.getenv('BROKER_URL', '')
|
self.context = None
|
||||||
|
self.socket = None
|
||||||
|
self.broker_url = True # TODO: Figure this out for unit tests
|
||||||
self._init_logging()
|
self._init_logging()
|
||||||
# Since we don't yet have a way to confirm publish when using
|
self._init_connection()
|
||||||
# librabbitmq, ensure we use pyamqp even if librabbitmq happens to be
|
|
||||||
# installed.
|
|
||||||
if LIBRABBITMQ_INSTALLED:
|
|
||||||
self.logger.info('Forcing use of pyamqp instead of librabbitmq')
|
|
||||||
self.broker_url = self.broker_url.replace('amqp://', 'pyamqp://')
|
|
||||||
|
|
||||||
def _init_logging(self):
|
def _init_logging(self):
|
||||||
try:
|
try:
|
||||||
@@ -120,37 +101,12 @@ class CallbackModule(object):
|
|||||||
self.logger.addHandler(handler)
|
self.logger.addHandler(handler)
|
||||||
self.logger.propagate = False
|
self.logger.propagate = False
|
||||||
|
|
||||||
def __del__(self):
|
def _init_connection(self):
|
||||||
self._cleanup_connection()
|
self.context = zmq.Context()
|
||||||
|
self.socket = self.context.socket(zmq.REQ)
|
||||||
def _publish_errback(self, exc, interval):
|
self.socket.connect("tcp://127.0.0.1:5556")
|
||||||
self.logger.info('Publish Error: %r', exc)
|
|
||||||
|
|
||||||
def _cleanup_connection(self):
|
|
||||||
if hasattr(self, 'producer'):
|
|
||||||
try:
|
|
||||||
#self.logger.debug('Cleanup Producer: %r', self.producer)
|
|
||||||
self.producer.cancel()
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
del self.producer
|
|
||||||
if hasattr(self, 'connection'):
|
|
||||||
try:
|
|
||||||
#self.logger.debug('Cleanup Connection: %r', self.connection)
|
|
||||||
self.connection.release()
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
del self.connection
|
|
||||||
|
|
||||||
def _post_job_event_queue_msg(self, event, event_data):
|
def _post_job_event_queue_msg(self, event, event_data):
|
||||||
if not hasattr(self, 'job_events_exchange'):
|
|
||||||
self.job_events_exchange = Exchange('job_events', 'direct',
|
|
||||||
durable=True)
|
|
||||||
if not hasattr(self, 'job_events_queue'):
|
|
||||||
self.job_events_queue = Queue('job_events[%d]' % self.job_id,
|
|
||||||
exchange=self.job_events_exchange,
|
|
||||||
routing_key=('job_events[%d]' % self.job_id),
|
|
||||||
auto_delete=True)
|
|
||||||
msg = {
|
msg = {
|
||||||
'job_id': self.job_id,
|
'job_id': self.job_id,
|
||||||
'event': event,
|
'event': event,
|
||||||
@@ -163,33 +119,15 @@ class CallbackModule(object):
|
|||||||
})
|
})
|
||||||
for retry_count in xrange(4):
|
for retry_count in xrange(4):
|
||||||
try:
|
try:
|
||||||
if not hasattr(self, 'connection_pid'):
|
self.socket.send(json.dumps(msg))
|
||||||
self.connection_pid = os.getpid()
|
|
||||||
if self.connection_pid != os.getpid():
|
|
||||||
self._cleanup_connection()
|
|
||||||
if not hasattr(self, 'connection'):
|
|
||||||
self.connection = Connection(self.broker_url, transport_options={'confirm_publish': True})
|
|
||||||
self.logger.debug('New Connection: %r, retry=%d',
|
|
||||||
self.connection, retry_count)
|
|
||||||
if not hasattr(self, 'producer'):
|
|
||||||
channel = self.connection.channel()
|
|
||||||
self.producer = self.connection.Producer(channel, exchange=self.job_events_exchange, serializer='json')
|
|
||||||
self.publish = self.connection.ensure(self.producer, self.producer.publish,
|
|
||||||
errback=self._publish_errback,
|
|
||||||
max_retries=3, interval_start=1, interval_step=1, interval_max=10)
|
|
||||||
self.logger.debug('New Producer: %r, retry=%d',
|
|
||||||
self.producer, retry_count)
|
|
||||||
self.logger.debug('Publish: %r, retry=%d', msg, retry_count)
|
self.logger.debug('Publish: %r, retry=%d', msg, retry_count)
|
||||||
self.publish(msg, exchange=self.job_events_exchange,
|
reply = self.socket.recv()
|
||||||
routing_key=('job_events[%d]' % self.job_id),
|
print("Received reply: " + str(reply))
|
||||||
declare=[self.job_events_queue])
|
|
||||||
if event == 'playbook_on_stats':
|
|
||||||
self._cleanup_connection()
|
|
||||||
return
|
return
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
self.logger.info('Publish Exception: %r, retry=%d', e,
|
self.logger.info('Publish Exception: %r, retry=%d', e,
|
||||||
retry_count, exc_info=True)
|
retry_count, exc_info=True)
|
||||||
self._cleanup_connection()
|
# TODO: Maybe recycle connection here?
|
||||||
if retry_count >= 3:
|
if retry_count >= 3:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user