diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 6b30c2ba53..24e399a554 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -9,6 +9,7 @@ import logging import json import signal import time +from contextlib import closing from optparse import make_option from multiprocessing import Process, Queue @@ -90,62 +91,62 @@ class CallbackReceiver(object): time.sleep(0.1) def callback_handler(self, use_workers, worker_queues): - pubsub = PubSub('callbacks') message_number = 0 total_messages = 0 last_parent_events = {} - for message in pubsub.subscribe(): - total_messages += 1 - if not use_workers: - self.process_job_event(message) - else: - job_parent_events = last_parent_events.get(message['job_id'], {}) - if message['event'] in ('playbook_on_play_start', 'playbook_on_stats', 'playbook_on_vars_prompt'): - parent = job_parent_events.get('playbook_on_start', None) - elif message['event'] in ('playbook_on_notify', 'playbook_on_setup', - 'playbook_on_task_start', - 'playbook_on_no_hosts_matched', - 'playbook_on_no_hosts_remaining', - 'playbook_on_import_for_host', - 'playbook_on_not_import_for_host'): - parent = job_parent_events.get('playbook_on_play_start', None) - elif message['event'].startswith('runner_on_'): - list_parents = [] - list_parents.append(job_parent_events.get('playbook_on_setup', None)) - list_parents.append(job_parent_events.get('playbook_on_task_start', None)) - list_parents = sorted(filter(lambda x: x is not None, list_parents), cmp=lambda x, y: y.id-x.id) - parent = list_parents[0] if len(list_parents) > 0 else None + with closing(PubSub('callbacks')) as callbacks: + for message in callbacks.subscribe(wait=0.1): + total_messages += 1 + if not use_workers: + self.process_job_event(message) else: - parent = None - if parent is not None: - message['parent'] = parent.id - if 'created' in message: - del(message['created']) - if message['event'] in ('playbook_on_start', 'playbook_on_play_start', - 'playbook_on_setup', 'playbook_on_task_start'): - job_parent_events[message['event']] = self.process_job_event(message) - else: - if message['event'] == 'playbook_on_stats': - job_parent_events = {} - queue_actual_worker = worker_queues[total_messages % WORKERS] - queue_actual_worker[0] += 1 - queue_actual_worker[1].put(message) - if queue_actual_worker[0] >= MAX_REQUESTS: - queue_actual_worker[0] = 0 - # print("Recycling worker process") - # queue_actual_worker[2].join() - # connection.close() - # w = Process(target=self.callback_worker, args=(queue_actual_worker[1],)) - # w.daemon = True - # w.start() + job_parent_events = last_parent_events.get(message['job_id'], {}) + if message['event'] in ('playbook_on_play_start', 'playbook_on_stats', 'playbook_on_vars_prompt'): + parent = job_parent_events.get('playbook_on_start', None) + elif message['event'] in ('playbook_on_notify', 'playbook_on_setup', + 'playbook_on_task_start', + 'playbook_on_no_hosts_matched', + 'playbook_on_no_hosts_remaining', + 'playbook_on_import_for_host', + 'playbook_on_not_import_for_host'): + parent = job_parent_events.get('playbook_on_play_start', None) + elif message['event'].startswith('runner_on_'): + list_parents = [] + list_parents.append(job_parent_events.get('playbook_on_setup', None)) + list_parents.append(job_parent_events.get('playbook_on_task_start', None)) + list_parents = sorted(filter(lambda x: x is not None, list_parents), cmp=lambda x, y: y.id-x.id) + parent = list_parents[0] if len(list_parents) > 0 else None + else: + parent = None + if parent is not None: + message['parent'] = parent.id + if 'created' in message: + del(message['created']) + if message['event'] in ('playbook_on_start', 'playbook_on_play_start', + 'playbook_on_setup', 'playbook_on_task_start'): + job_parent_events[message['event']] = self.process_job_event(message) + else: + if message['event'] == 'playbook_on_stats': + job_parent_events = {} + queue_actual_worker = worker_queues[total_messages % WORKERS] + queue_actual_worker[0] += 1 + queue_actual_worker[1].put(message) + if queue_actual_worker[0] >= MAX_REQUESTS: + queue_actual_worker[0] = 0 + # print("Recycling worker process") + # queue_actual_worker[2].join() + # connection.close() + # w = Process(target=self.callback_worker, args=(queue_actual_worker[1],)) + # w.daemon = True + # w.start() - # signal.signal(signal.SIGINT, shutdown_handler([w])) - # signal.signal(signal.SIGTERM, shutdown_handler([w])) + # signal.signal(signal.SIGINT, shutdown_handler([w])) + # signal.signal(signal.SIGTERM, shutdown_handler([w])) - # queue_actual_worker[2] = w - last_parent_events[message['job_id']] = job_parent_events - self.consumer_subscriber.send("1") + # queue_actual_worker[2] = w + last_parent_events[message['job_id']] = job_parent_events + self.consumer_subscriber.send("1") def process_job_event(self, data): # Sanity check: Do we need to do anything at all? diff --git a/awx/main/queue.py b/awx/main/queue.py index d71b4cbdfa..5708d3cbcd 100644 --- a/awx/main/queue.py +++ b/awx/main/queue.py @@ -74,7 +74,7 @@ class PubSub(object): Ideally this should be used with `contextmanager.closing` to ensure well-behavedness: - from contextmanager import closing + from contextlib import closing with closing(PubSub('foobar')) as foobar: for message in foobar.subscribe(wait=0.1): diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index 92e21ba24d..da210f2c70 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -38,6 +38,7 @@ import sys import urllib import urlparse import time +from contextlib import closing # Requests import requests @@ -125,8 +126,8 @@ class CallbackModule(object): self.connection_pid = active_pid # Publish the callback through Redis. - pubsub = PubSub('callbacks') - pubsub.publish(msg) + with closing(PubSub('callbacks')) as callbacks: + callbacks.publish(msg) return except Exception, e: self.logger.info('Publish Exception: %r, retry=%d', e,