diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 15b9b8f483..0e27ba06da 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -8,12 +8,17 @@ import datetime import logging import signal import time -from multiprocessing import Process, Queue -from Queue import Empty as QueueEmpty + +from kombu import Connection, Exchange, Queue +from kombu.mixins import ConsumerMixin +from kombu.log import get_logger +from kombu.utils import kwdict, reprcall +from kombu.utils.debug import setup_logging # Django from django.conf import settings from django.core.management.base import NoArgsCommand +from django.core.cache import cache from django.db import transaction, DatabaseError from django.utils.dateparse import parse_datetime from django.utils.timezone import FixedOffset @@ -25,156 +30,49 @@ from awx.main.socket_queue import Socket logger = logging.getLogger('awx.main.commands.run_callback_receiver') -WORKERS = 4 +class CallbackBrokerWorker(ConsumerMixin): -class CallbackReceiver(object): - def __init__(self): - self.parent_mappings = {} + def __init__(self, connection): + self.connection = connection - def run_subscriber(self, use_workers=True): - def shutdown_handler(active_workers): - def _handler(signum, frame): - try: - for active_worker in active_workers: - active_worker.terminate() - signal.signal(signum, signal.SIG_DFL) - os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it - except Exception: - # TODO: LOG - pass - return _handler + def get_consumers(self, Consumer, channel): + return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE, + Exchange(settings.CALLBACK_QUEUE, type='direct'), + routing_key=settings.CALLBACK_QUEUE)], + accept=['json'], + callbacks=[self.process_task])] - def check_pre_handle(data): - event = data.get('event', '') - if event == 'playbook_on_play_start': - return True - 
return False - - worker_queues = [] - - if use_workers: - connection.close() - for idx in range(WORKERS): - queue_actual = Queue(settings.JOB_EVENT_MAX_QUEUE_SIZE) - w = Process(target=self.callback_worker, args=(queue_actual, idx,)) - w.start() - if settings.DEBUG: - logger.info('Started worker %s' % str(idx)) - worker_queues.append([0, queue_actual, w]) - elif settings.DEBUG: - logger.warn('Started callback receiver (no workers)') - - main_process = Process( - target=self.callback_handler, - args=(use_workers, worker_queues,) - ) - main_process.daemon = True - main_process.start() - - signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in worker_queues] + [main_process])) - signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in worker_queues] + [main_process])) - while True: - workers_changed = False - idx = 0 - for queue_worker in worker_queues: - if not queue_worker[2].is_alive(): - logger.warn("Worker %s was not alive, restarting" % str(queue_worker)) - workers_changed = True - queue_worker[2].join() - w = Process(target=self.callback_worker, args=(queue_worker[1], idx,)) - w.daemon = True - w.start() - signal.signal(signal.SIGINT, shutdown_handler([w])) - signal.signal(signal.SIGTERM, shutdown_handler([w])) - queue_worker[2] = w - idx += 1 - if workers_changed: - signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in worker_queues] + [main_process])) - signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in worker_queues] + [main_process])) - if not main_process.is_alive(): - logger.error("Main process is not alive") - for queue_worker in worker_queues: - queue_worker[2].terminate() - break - time.sleep(0.1) - - def write_queue_worker(self, preferred_queue, worker_queues, message): - queue_order = sorted(range(WORKERS), cmp=lambda x, y: -1 if x==preferred_queue else 0) - for queue_actual in queue_order: - try: - worker_actual = worker_queues[queue_actual] - worker_actual[1].put(message, block=True, timeout=2) - worker_actual[0] += 1 - 
return queue_actual - except Exception: - logger.warn("Could not write to queue %s" % preferred_queue) - continue - return None - - def callback_handler(self, use_workers, worker_queues): - total_messages = 0 - last_parent_events = {} - with Socket('callbacks', 'r') as callbacks: - for message in callbacks.listen(): - total_messages += 1 - if 'ad_hoc_command_id' in message: - self.process_ad_hoc_event(message) - elif not use_workers: - self.process_job_event(message) - else: - job_parent_events = last_parent_events.get(message['job_id'], {}) - if message['event'] in ('playbook_on_play_start', 'playbook_on_stats', 'playbook_on_vars_prompt'): - parent = job_parent_events.get('playbook_on_start', None) - elif message['event'] in ('playbook_on_notify', - 'playbook_on_setup', - 'playbook_on_task_start', - 'playbook_on_no_hosts_matched', - 'playbook_on_no_hosts_remaining', - 'playbook_on_include', - 'playbook_on_import_for_host', - 'playbook_on_not_import_for_host'): - parent = job_parent_events.get('playbook_on_play_start', None) - elif message['event'].startswith('runner_on_') or message['event'].startswith('runner_item_on_'): - list_parents = [] - list_parents.append(job_parent_events.get('playbook_on_setup', None)) - list_parents.append(job_parent_events.get('playbook_on_task_start', None)) - list_parents = sorted(filter(lambda x: x is not None, list_parents), cmp=lambda x, y: y.id - x.id) - parent = list_parents[0] if len(list_parents) > 0 else None - else: - parent = None - if parent is not None: - message['parent'] = parent.id - if 'created' in message: - del(message['created']) - if message['event'] in ('playbook_on_start', 'playbook_on_play_start', - 'playbook_on_setup', 'playbook_on_task_start'): - job_parent_events[message['event']] = self.process_job_event(message) - else: - if message['event'] == 'playbook_on_stats': - job_parent_events = {} - - actual_queue = self.write_queue_worker(total_messages % WORKERS, worker_queues, message) - # NOTE: It might be 
better to recycle the entire callback receiver process if one or more of the queues are too full
-                        # the drawback is that if we under extremely high load we may be legitimately taking a while to process messages
-                        if actual_queue is None:
-                            logger.error("All queues full!")
-                            sys.exit(1)
-                    last_parent_events[message['job_id']] = job_parent_events
-
-    @transaction.atomic
-    def process_job_event(self, data):
-        # Sanity check: Do we need to do anything at all?
-        event = data.get('event', '')
-        parent_id = data.get('parent', None)
-        if not event or 'job_id' not in data:
-            return
+    def process_task(self, body, message):
+        try:
+            if "event" not in body:
+                raise Exception("Payload does not have an event")
+            if "job_id" not in body and "ad_hoc_command_id" not in body:  # ad hoc events carry no job_id
+                raise Exception("Payload does not have a job_id or ad_hoc_command_id")
+            if settings.DEBUG:
+                logger.info("Body: {}".format(body))
+                logger.info("Message: {}".format(message))
+            self.process_job_event(body)
+        except Exception as exc:
+            import traceback
+            traceback.print_exc()
+            logger.error('Callback Task Processor Raised Exception: %r', exc)
+        message.ack()  # always ack so a bad payload is not redelivered forever
 
+    def process_job_event(self, payload):
         # Get the correct "verbose" value from the job.
         # If for any reason there's a problem, just use 0.
+        if 'ad_hoc_command_id' in payload:
+            event_type_key = 'ad_hoc_command_id'
+            event_object_type = AdHocCommand
+        else:
+            event_type_key = 'job_id'
+            event_object_type = Job
+
         try:
-            verbose = Job.objects.get(id=data['job_id']).verbosity
+            verbose = event_object_type.objects.get(id=payload[event_type_key]).verbosity
         except Exception as e:
             verbose = 0
+        # TODO: cache
 
         # Convert the datetime for the job event's creation appropriately,
         # and include a time zone for it.
@@ -182,120 +80,58 @@ class CallbackReceiver(object):
         # In the event of any issue, throw it out, and Django will just save
         # the current time.
         try:
-            if not isinstance(data['created'], datetime.datetime):
-                data['created'] = parse_datetime(data['created'])
-            if not data['created'].tzinfo:
-                data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
+            if not isinstance(payload['created'], datetime.datetime):
+                payload['created'] = parse_datetime(payload['created'])
+            if not payload['created'].tzinfo:
+                payload['created'] = payload['created'].replace(tzinfo=FixedOffset(0))
         except (KeyError, ValueError):
-            data.pop('created', None)
+            payload.pop('created', None)
 
-        # Print the data to stdout if we're in DEBUG mode.
-        if settings.DEBUG:
-            print(data)
+        event_uuid = payload.get("uuid", '')
+        parent_event_uuid = payload.get("parent_uuid", '')
 
         # Sanity check: Don't honor keys that we don't recognize.
-        for key in data.keys():
-            if key not in ('job_id', 'event', 'event_data',
-                           'created', 'counter'):
-                data.pop(key)
+        for key in payload.keys():
+            if key not in (event_type_key, 'event', 'event_data',
+                           'created', 'counter', 'uuid'):
+                payload.pop(key)
 
-        # Save any modifications to the job event to the database.
-        # If we get a database error of some kind, bail out.
         try:
             # If we're not in verbose mode, wipe out any module
             # arguments.
-            res = data['event_data'].get('res', {})
+            res = payload['event_data'].get('res', {})
             if isinstance(res, dict):
                 i = res.get('invocation', {})
                 if verbose == 0 and 'module_args' in i:
                     i['module_args'] = ''
 
-            # Create a new JobEvent object.
-            job_event = JobEvent(**data)
-            if parent_id is not None:
-                job_event.parent = JobEvent.objects.get(id=parent_id)
-            job_event.save(post_process=True)
-
-            # Retrun the job event object.
-            return job_event
+            if 'ad_hoc_command_id' in payload:
+                AdHocCommandEvent.objects.create(**payload)  # NOTE(review): payload still carries 'uuid'; confirm the model accepts it
+                return
+
+            j = JobEvent(**payload)
+            if payload['event'] == 'playbook_on_start':
+                j.save()
+                cache.set("{}_{}".format(payload['job_id'], event_uuid), j.id, 300)
+                return
+            else:
+                if parent_event_uuid:
+                    parent_id = cache.get("{}_{}".format(payload['job_id'], parent_event_uuid), None)
+                    if parent_id is None:
+                        parent_id_obj = JobEvent.objects.filter(uuid=parent_event_uuid, job_id=payload['job_id'])
+                        if parent_id_obj.exists(): #Problematic if not there, means the parent hasn't been written yet... TODO
+                            j.parent_id = parent_id_obj[0].id
+                            logger.debug("Setting cache: %s_%s with value %s", payload['job_id'], parent_event_uuid, j.parent_id)
+                            cache.set("{}_{}".format(payload['job_id'], parent_event_uuid), j.parent_id, 300)
+                    else:
+                        logger.debug("Cache hit")
+                        j.parent_id = parent_id
+                j.save()
+                if event_uuid:
+                    cache.set("{}_{}".format(payload['job_id'], event_uuid), j.id, 300)
         except DatabaseError as e:
-            # Log the error and bail out.
-            logger.error('Database error saving job event: %s', e)
-            return None
+            logger.error("Database Error Saving Job Event: {}".format(e))
 
-    @transaction.atomic
-    def process_ad_hoc_event(self, data):
-        # Sanity check: Do we need to do anything at all?
-        event = data.get('event', '')
-        if not event or 'ad_hoc_command_id' not in data:
-            return
-
-        # Get the correct "verbose" value from the job.
-        # If for any reason there's a problem, just use 0.
-        try:
-            verbose = AdHocCommand.objects.get(id=data['ad_hoc_command_id']).verbosity
-        except Exception as e:
-            verbose = 0
-
-        # Convert the datetime for the job event's creation appropriately,
-        # and include a time zone for it.
-        #
-        # In the event of any issue, throw it out, and Django will just save
-        # the current time.
- try: - if not isinstance(data['created'], datetime.datetime): - data['created'] = parse_datetime(data['created']) - if not data['created'].tzinfo: - data['created'] = data['created'].replace(tzinfo=FixedOffset(0)) - except (KeyError, ValueError): - data.pop('created', None) - - # Print the data to stdout if we're in DEBUG mode. - if settings.DEBUG: - print(data) - - # Sanity check: Don't honor keys that we don't recognize. - for key in data.keys(): - if key not in ('ad_hoc_command_id', 'event', 'event_data', - 'created', 'counter'): - data.pop(key) - - # Save any modifications to the ad hoc command event to the database. - # If we get a database error of some kind, bail out. - try: - # If we're not in verbose mode, wipe out any module - # arguments. FIXME: Needed for adhoc? - res = data['event_data'].get('res', {}) - if isinstance(res, dict): - i = res.get('invocation', {}) - if verbose == 0 and 'module_args' in i: - i['module_args'] = '' - - # Create a new AdHocCommandEvent object. - ad_hoc_command_event = AdHocCommandEvent.objects.create(**data) - - # Retrun the ad hoc comamnd event object. - return ad_hoc_command_event - except DatabaseError as e: - # Log the error and bail out. 
- logger.error('Database error saving ad hoc command event: %s', e) - return None - - def callback_worker(self, queue_actual, idx): - messages_processed = 0 - while True: - try: - message = queue_actual.get(block=True, timeout=1) - except QueueEmpty: - continue - except Exception as e: - logger.error("Exception on listen socket, restarting: " + str(e)) - break - self.process_job_event(message) - messages_processed += 1 - if messages_processed >= settings.JOB_EVENT_RECYCLE_THRESHOLD: - logger.info("Shutting down message receiver") - break class Command(NoArgsCommand): ''' @@ -306,9 +142,10 @@ class Command(NoArgsCommand): help = 'Launch the job callback receiver' def handle_noargs(self, **options): - cr = CallbackReceiver() - try: - cr.run_subscriber() - except KeyboardInterrupt: - pass + with Connection(settings.BROKER_URL) as conn: + try: + worker = CallbackBrokerWorker(conn) + worker.run() + except KeyboardInterrupt: + print('Terminating Callback Receiver') diff --git a/awx/main/migrations/0034_v310_jobevent_uuid.py b/awx/main/migrations/0034_v310_jobevent_uuid.py new file mode 100644 index 0000000000..4feade28ef --- /dev/null +++ b/awx/main/migrations/0034_v310_jobevent_uuid.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0033_v310_modify_ha_instance'), + ] + + operations = [ + migrations.AddField( + model_name='jobevent', + name='uuid', + field=models.CharField(default=b'', max_length=1024, editable=False), + ), + ] diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index c233269ce9..ddff3f9343 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -950,6 +950,11 @@ class JobEvent(CreatedModifiedModel): default=False, editable=False, ) + uuid = models.CharField( + max_length=1024, + default='', + editable=False, + ) host = models.ForeignKey( 'Host', 
related_name='job_events_as_primary_host', diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 806a819e3e..b4617f505d 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -794,7 +794,8 @@ class RunJob(BaseTask): env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_path env['REST_API_URL'] = settings.INTERNAL_API_URL env['REST_API_TOKEN'] = job.task_auth_token or '' - env['CALLBACK_CONSUMER_PORT'] = str(settings.CALLBACK_CONSUMER_PORT) + env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE + env['CALLBACK_CONNECTION'] = settings.BROKER_URL if getattr(settings, 'JOB_CALLBACK_DEBUG', False): env['JOB_CALLBACK_DEBUG'] = '2' elif settings.DEBUG: diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index a9c5b712ed..abec176b2f 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -39,37 +39,16 @@ import pwd import urlparse import re from copy import deepcopy +from uuid import uuid4 + +# Kombu +from kombu import Connection, Exchange, Producer # Requests import requests -# ZeroMQ -import zmq - import psutil -# Only use statsd if there's a statsd host in the environment -# otherwise just do a noop. 
-# NOTE: I've disabled this for the time being until we sort through the venv dependency around this -# if os.environ.get('GRAPHITE_PORT_8125_UDP_ADDR'): -# from statsd import StatsClient -# statsd = StatsClient(host=os.environ['GRAPHITE_PORT_8125_UDP_ADDR'], -# port=8125, -# prefix='tower.job.event_callback', -# maxudpsize=512) -# else: -# from statsd import StatsClient -# class NoStatsClient(StatsClient): -# def __init__(self, *args, **kwargs): -# pass -# def _prepare(self, stat, value, rate): -# pass -# def _send_stat(self, stat, value, rate): -# pass -# def _send(self, *args, **kwargs): -# pass -# statsd = NoStatsClient() - CENSOR_FIELD_WHITELIST = [ 'msg', 'failed', @@ -124,6 +103,7 @@ class TokenAuth(requests.auth.AuthBase): return request +# TODO: non v2_ events are deprecated and should be purge/refactored out class BaseCallbackModule(object): ''' Callback module for logging ansible-playbook job events via the REST API. @@ -132,12 +112,16 @@ class BaseCallbackModule(object): def __init__(self): self.base_url = os.getenv('REST_API_URL', '') self.auth_token = os.getenv('REST_API_TOKEN', '') - self.callback_consumer_port = os.getenv('CALLBACK_CONSUMER_PORT', '') - self.context = None - self.socket = None + self.callback_connection = os.getenv('CALLBACK_CONNECTION', None) + self.connection_queue = os.getenv('CALLBACK_QUEUE', '') + self.connection = None + self.exchange = None self._init_logging() self._init_connection() self.counter = 0 + self.active_playbook = None + self.active_play = None + self.active_task = None def _init_logging(self): try: @@ -158,15 +142,11 @@ class BaseCallbackModule(object): self.logger.propagate = False def _init_connection(self): - self.context = None - self.socket = None + self.connection = None def _start_connection(self): - self.context = zmq.Context() - self.socket = self.context.socket(zmq.REQ) - self.socket.setsockopt(zmq.RCVTIMEO, 4000) - self.socket.setsockopt(zmq.LINGER, 2000) - 
self.socket.connect(self.callback_consumer_port) + self.connection = Connection(self.callback_connection) + self.exchange = Exchange(self.connection_queue, type='direct') def _post_job_event_queue_msg(self, event, event_data): self.counter += 1 @@ -176,6 +156,29 @@ class BaseCallbackModule(object): 'counter': self.counter, 'created': datetime.datetime.utcnow().isoformat(), } + if event in ('playbook_on_play_start', + 'playbook_on_stats', + 'playbook_on_vars_prompt'): + msg['parent_uuid'] = str(self.active_playbook) + elif event in ('playbook_on_notify', + 'playbook_on_setup', + 'playbook_on_task_start', + 'playbook_on_no_hosts_matched', + 'playbook_on_no_hosts_remaining', + 'playbook_on_include', + 'playbook_on_import_for_host', + 'playbook_on_not_import_for_host'): + msg['parent_uuid'] = str(self.active_play) + elif event.startswith('runner_on_') or event.startswith('runner_item_on_'): + msg['parent_uuid'] = str(self.active_task) + else: + msg['parent_uuid'] = '' + + if "uuid" in event_data: + msg['uuid'] = str(event_data['uuid']) + else: + msg['uuid'] = '' + if getattr(self, 'job_id', None): msg['job_id'] = self.job_id if getattr(self, 'ad_hoc_command_id', None): @@ -192,11 +195,16 @@ class BaseCallbackModule(object): self.connection_pid = active_pid if self.connection_pid != active_pid: self._init_connection() - if self.context is None: + if self.connection is None: self._start_connection() - self.socket.send_json(msg) - self.socket.recv() + producer = Producer(self.connection) + producer.publish(msg, + serializer='json', + compression='bzip2', + exchange=self.exchange, + declare=[self.exchange], + routing_key=self.connection_queue) return except Exception, e: self.logger.info('Publish Job Event Exception: %r, retry=%d', e, @@ -230,7 +238,7 @@ class BaseCallbackModule(object): if 'res' in event_data: event_data['res'] = censor(deepcopy(event_data['res'])) - if self.callback_consumer_port: + if self.callback_connection: self._post_job_event_queue_msg(event, 
event_data) else: self._post_rest_api_event(event, event_data) @@ -416,7 +424,9 @@ class JobCallbackModule(BaseCallbackModule): def v2_playbook_on_start(self, playbook): # NOTE: the playbook parameter was added late in Ansible 2.0 development # so we don't currently utilize but could later. - self.playbook_on_start() + # NOTE: Ansible doesn't generate a UUID for playbook_on_start so we'll do it for them + self.active_playbook = str(uuid4()) + self._log_event('playbook_on_start', uuid=self.active_playbook) def playbook_on_notify(self, host, handler): self._log_event('playbook_on_notify', host=host, handler=handler) @@ -446,14 +456,16 @@ class JobCallbackModule(BaseCallbackModule): is_conditional=is_conditional) def v2_playbook_on_task_start(self, task, is_conditional): - self._log_event('playbook_on_task_start', task=task, + self.active_task = task._uuid + self._log_event('playbook_on_task_start', task=task, uuid=str(task._uuid), name=task.get_name(), is_conditional=is_conditional) def v2_playbook_on_cleanup_task_start(self, task): # re-using playbook_on_task_start event here for this v2-specific # event, though we may consider any changes necessary to distinguish # this from a normal task - self._log_event('playbook_on_task_start', task=task, + self.active_task = task._uuid + self._log_event('playbook_on_task_start', task=task, uuid=str(task._uuid), name=task.get_name()) def playbook_on_vars_prompt(self, varname, private=True, prompt=None, @@ -507,7 +519,8 @@ class JobCallbackModule(BaseCallbackModule): play.name = ','.join(play.hosts) else: play.name = play.hosts - self._log_event('playbook_on_play_start', name=play.name, + self.active_play = play._uuid + self._log_event('playbook_on_play_start', name=play.name, uuid=str(play._uuid), pattern=play.hosts) def playbook_on_stats(self, stats): diff --git a/awx/settings/development.py b/awx/settings/development.py index 4c727e0bdc..722f397900 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ 
-28,6 +28,8 @@ if 'celeryd' in sys.argv: CALLBACK_CONSUMER_PORT = "tcp://127.0.0.1:5557" CALLBACK_QUEUE_PORT = "ipc:///tmp/callback_receiver_dev.ipc" +CALLBACK_QUEUE = "callback_tasks" + # Enable PROOT for tower-qa integration tests AWX_PROOT_ENABLED = True diff --git a/requirements/requirements.txt b/requirements/requirements.txt index fb5872f572..433ae22e00 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -50,7 +50,7 @@ jsonpatch==1.12 jsonpointer==1.10 jsonschema==2.5.1 keyring==4.1 -kombu==3.0.30 +kombu==3.0.35 apache-libcloud==0.20.1 lxml==3.4.4 Markdown==2.4.1 diff --git a/requirements/requirements_ansible.txt b/requirements/requirements_ansible.txt index b35cb6fcbb..fe9fe45aed 100644 --- a/requirements/requirements_ansible.txt +++ b/requirements/requirements_ansible.txt @@ -25,6 +25,7 @@ jsonpatch==1.12 jsonpointer==1.10 jsonschema==2.5.1 keyring==4.1 +kombu==3.0.35 lxml==3.4.4 mock==1.0.1 monotonic==0.6