From 56d42100476955a2c42ab156c75844f84e390783 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Wed, 14 Sep 2016 16:06:13 -0400 Subject: [PATCH] Implement Performance improvements shown in #3492 * Make callback workers tunable * Disable callback worker recycling - doesn't appear to be needed anymore * Tweak pexpect behavior by limiting its read buffer size and search window * Use copy instead of deepcopy for job event callback emitter censor --- .../management/commands/run_callback_receiver.py | 13 +++---------- awx/main/tasks.py | 2 +- awx/plugins/callback/job_event_callback.py | 4 ++-- awx/settings/defaults.py | 3 +++ 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index e6080fa419..c31b3dffe2 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -25,8 +25,6 @@ from awx.main.socket import Socket logger = logging.getLogger('awx.main.commands.run_callback_receiver') -WORKERS = 4 - class CallbackReceiver(object): def __init__(self): self.parent_mappings = {} @@ -54,7 +52,7 @@ class CallbackReceiver(object): if use_workers: connection.close() - for idx in range(WORKERS): + for idx in range(settings.JOB_EVENT_WORKERS): queue_actual = Queue(settings.JOB_EVENT_MAX_QUEUE_SIZE) w = Process(target=self.callback_worker, args=(queue_actual, idx,)) w.start() @@ -99,7 +97,7 @@ class CallbackReceiver(object): time.sleep(0.1) def write_queue_worker(self, preferred_queue, worker_queues, message): - queue_order = sorted(range(WORKERS), cmp=lambda x, y: -1 if x==preferred_queue else 0) + queue_order = sorted(range(settings.JOB_EVENT_WORKERS), cmp=lambda x, y: -1 if x==preferred_queue else 0) for queue_actual in queue_order: try: worker_actual = worker_queues[queue_actual] @@ -153,7 +151,7 @@ class CallbackReceiver(object): if message['event'] == 'playbook_on_stats': job_parent_events 
= {} - actual_queue = self.write_queue_worker(total_messages % WORKERS, worker_queues, message) + actual_queue = self.write_queue_worker(total_messages % settings.JOB_EVENT_WORKERS, worker_queues, message) # NOTE: It might be better to recycle the entire callback receiver process if one or more of the queues are too full # the drawback is that if we under extremely high load we may be legitimately taking a while to process messages if actual_queue is None: @@ -282,7 +280,6 @@ class CallbackReceiver(object): return None def callback_worker(self, queue_actual, idx): - messages_processed = 0 while True: try: message = queue_actual.get(block=True, timeout=1) @@ -292,10 +289,6 @@ class CallbackReceiver(object): logger.error("Exception on listen socket, restarting: " + str(e)) break self.process_job_event(message) - messages_processed += 1 - if messages_processed >= settings.JOB_EVENT_RECYCLE_THRESHOLD: - logger.info("Shutting down message receiver") - break class Command(NoArgsCommand): ''' diff --git a/awx/main/tasks.py b/awx/main/tasks.py index c99f043e1a..20edd955ed 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -575,7 +575,7 @@ class BaseTask(Task): instance = self.update_model(instance.pk, status='running', output_replacements=output_replacements) while child.isalive(): - result_id = child.expect(expect_list, timeout=pexpect_timeout) + result_id = child.expect(expect_list, timeout=pexpect_timeout, maxread=100, searchwindowsize=100) if result_id in expect_passwords: child.sendline(expect_passwords[result_id]) if logfile_pos != logfile.tell(): diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index a9c5b712ed..2049edc4b8 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -38,7 +38,7 @@ import os import pwd import urlparse import re -from copy import deepcopy +from copy import copy # Requests import requests @@ -228,7 +228,7 @@ class 
BaseCallbackModule(object): def _log_event(self, event, **event_data): if 'res' in event_data: - event_data['res'] = censor(deepcopy(event_data['res'])) + event_data['res'] = censor(copy(event_data['res'])) if self.callback_consumer_port: self._post_job_event_queue_msg(event, event_data) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 2998d15bb7..e2995d0dbd 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -438,6 +438,9 @@ AWX_TASK_ENV = {} # before it recycles JOB_EVENT_RECYCLE_THRESHOLD = 3000 +# Number of workers used to process job events in parallel +JOB_EVENT_WORKERS = 4 + # Maximum number of job events that can be waiting on a single worker queue before # it can be skipped as too busy JOB_EVENT_MAX_QUEUE_SIZE = 100