Integrate callback receiver refactoring

* Drop ZMQ as the communication mechanism between job_event_callback and
  callback_receiver
* Setup queue and exchange for callback broker communication
* Refactor event plugin and callback receiver to efficiently handle
  message submission and processing
* Integrate django caching for parent processing
This commit is contained in:
Matthew Jones
2016-09-14 11:42:13 -04:00
parent 799f321760
commit ab395b0009
8 changed files with 171 additions and 293 deletions

View File

@@ -39,37 +39,16 @@ import pwd
import urlparse
import re
from copy import deepcopy
from uuid import uuid4
# Kombu
from kombu import Connection, Exchange, Producer
# Requests
import requests
# ZeroMQ
import zmq
import psutil
# Only use statsd if there's a statsd host in the environment
# otherwise just do a noop.
# NOTE: I've disabled this for the time being until we sort through the venv dependency around this
# if os.environ.get('GRAPHITE_PORT_8125_UDP_ADDR'):
# from statsd import StatsClient
# statsd = StatsClient(host=os.environ['GRAPHITE_PORT_8125_UDP_ADDR'],
# port=8125,
# prefix='tower.job.event_callback',
# maxudpsize=512)
# else:
# from statsd import StatsClient
# class NoStatsClient(StatsClient):
# def __init__(self, *args, **kwargs):
# pass
# def _prepare(self, stat, value, rate):
# pass
# def _send_stat(self, stat, value, rate):
# pass
# def _send(self, *args, **kwargs):
# pass
# statsd = NoStatsClient()
CENSOR_FIELD_WHITELIST = [
'msg',
'failed',
@@ -124,6 +103,7 @@ class TokenAuth(requests.auth.AuthBase):
return request
# TODO: non v2_ events are deprecated and should be purge/refactored out
class BaseCallbackModule(object):
'''
Callback module for logging ansible-playbook job events via the REST API.
@@ -132,12 +112,16 @@ class BaseCallbackModule(object):
def __init__(self):
self.base_url = os.getenv('REST_API_URL', '')
self.auth_token = os.getenv('REST_API_TOKEN', '')
self.callback_consumer_port = os.getenv('CALLBACK_CONSUMER_PORT', '')
self.context = None
self.socket = None
self.callback_connection = os.getenv('CALLBACK_CONNECTION', None)
self.connection_queue = os.getenv('CALLBACK_QUEUE', '')
self.connection = None
self.exchange = None
self._init_logging()
self._init_connection()
self.counter = 0
self.active_playbook = None
self.active_play = None
self.active_task = None
def _init_logging(self):
try:
@@ -158,15 +142,11 @@ class BaseCallbackModule(object):
self.logger.propagate = False
def _init_connection(self):
self.context = None
self.socket = None
self.connection = None
def _start_connection(self):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.setsockopt(zmq.RCVTIMEO, 4000)
self.socket.setsockopt(zmq.LINGER, 2000)
self.socket.connect(self.callback_consumer_port)
self.connection = Connection(self.callback_connection)
self.exchange = Exchange(self.connection_queue, type='direct')
def _post_job_event_queue_msg(self, event, event_data):
self.counter += 1
@@ -176,6 +156,29 @@ class BaseCallbackModule(object):
'counter': self.counter,
'created': datetime.datetime.utcnow().isoformat(),
}
if event in ('playbook_on_play_start',
'playbook_on_stats',
'playbook_on_vars_prompt'):
msg['parent_uuid'] = str(self.active_playbook)
elif event in ('playbook_on_notify',
'playbook_on_setup',
'playbook_on_task_start',
'playbook_on_no_hosts_matched',
'playbook_on_no_hosts_remaining',
'playbook_on_include',
'playbook_on_import_for_host',
'playbook_on_not_import_for_host'):
msg['parent_uuid'] = str(self.active_play)
elif event.startswith('runner_on_') or event.startswith('runner_item_on_'):
msg['parent_uuid'] = str(self.active_task)
else:
msg['parent_uuid'] = ''
if "uuid" in event_data:
msg['uuid'] = str(event_data['uuid'])
else:
msg['uuid'] = ''
if getattr(self, 'job_id', None):
msg['job_id'] = self.job_id
if getattr(self, 'ad_hoc_command_id', None):
@@ -192,11 +195,16 @@ class BaseCallbackModule(object):
self.connection_pid = active_pid
if self.connection_pid != active_pid:
self._init_connection()
if self.context is None:
if self.connection is None:
self._start_connection()
self.socket.send_json(msg)
self.socket.recv()
producer = Producer(self.connection)
producer.publish(msg,
serializer='json',
compression='bzip2',
exchange=self.exchange,
declare=[self.exchange],
routing_key=self.connection_queue)
return
except Exception, e:
self.logger.info('Publish Job Event Exception: %r, retry=%d', e,
@@ -230,7 +238,7 @@ class BaseCallbackModule(object):
if 'res' in event_data:
event_data['res'] = censor(deepcopy(event_data['res']))
if self.callback_consumer_port:
if self.callback_connection:
self._post_job_event_queue_msg(event, event_data)
else:
self._post_rest_api_event(event, event_data)
@@ -416,7 +424,9 @@ class JobCallbackModule(BaseCallbackModule):
def v2_playbook_on_start(self, playbook):
# NOTE: the playbook parameter was added late in Ansible 2.0 development
# so we don't currently utilize but could later.
self.playbook_on_start()
# NOTE: Ansible doesn't generate a UUID for playbook_on_start so we'll do it for them
self.active_playbook = str(uuid4())
self._log_event('playbook_on_start', uuid=self.active_playbook)
def playbook_on_notify(self, host, handler):
self._log_event('playbook_on_notify', host=host, handler=handler)
@@ -446,14 +456,16 @@ class JobCallbackModule(BaseCallbackModule):
is_conditional=is_conditional)
def v2_playbook_on_task_start(self, task, is_conditional):
self._log_event('playbook_on_task_start', task=task,
self.active_task = task._uuid
self._log_event('playbook_on_task_start', task=task, uuid=str(task._uuid),
name=task.get_name(), is_conditional=is_conditional)
def v2_playbook_on_cleanup_task_start(self, task):
# re-using playbook_on_task_start event here for this v2-specific
# event, though we may consider any changes necessary to distinguish
# this from a normal task
self._log_event('playbook_on_task_start', task=task,
self.active_task = task._uuid
self._log_event('playbook_on_task_start', task=task, uuid=str(task._uuid),
name=task.get_name())
def playbook_on_vars_prompt(self, varname, private=True, prompt=None,
@@ -507,7 +519,8 @@ class JobCallbackModule(BaseCallbackModule):
play.name = ','.join(play.hosts)
else:
play.name = play.hosts
self._log_event('playbook_on_play_start', name=play.name,
self.active_play = play._uuid
self._log_event('playbook_on_play_start', name=play.name, uuid=str(play._uuid),
pattern=play.hosts)
def playbook_on_stats(self, stats):