mirror of
https://github.com/ansible/awx.git
synced 2026-05-19 14:57:39 -02:30
Implement Performance improvements shown in #3492
* Make callback workers tunable * Disable callback worker recycling - doesn't appear to be needed anymore * Tweak pexpect behavior by limiting its read buffer size and search window * Use copy instead of deepcopy for job event callback emitter censor
This commit is contained in:
@@ -25,8 +25,6 @@ from awx.main.socket import Socket
|
|||||||
|
|
||||||
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
|
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
|
||||||
|
|
||||||
WORKERS = 4
|
|
||||||
|
|
||||||
class CallbackReceiver(object):
|
class CallbackReceiver(object):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.parent_mappings = {}
|
self.parent_mappings = {}
|
||||||
@@ -54,7 +52,7 @@ class CallbackReceiver(object):
|
|||||||
|
|
||||||
if use_workers:
|
if use_workers:
|
||||||
connection.close()
|
connection.close()
|
||||||
for idx in range(WORKERS):
|
for idx in range(settings.JOB_EVENT_WORKERS):
|
||||||
queue_actual = Queue(settings.JOB_EVENT_MAX_QUEUE_SIZE)
|
queue_actual = Queue(settings.JOB_EVENT_MAX_QUEUE_SIZE)
|
||||||
w = Process(target=self.callback_worker, args=(queue_actual, idx,))
|
w = Process(target=self.callback_worker, args=(queue_actual, idx,))
|
||||||
w.start()
|
w.start()
|
||||||
@@ -99,7 +97,7 @@ class CallbackReceiver(object):
|
|||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
def write_queue_worker(self, preferred_queue, worker_queues, message):
|
def write_queue_worker(self, preferred_queue, worker_queues, message):
|
||||||
queue_order = sorted(range(WORKERS), cmp=lambda x, y: -1 if x==preferred_queue else 0)
|
queue_order = sorted(range(settings.JOB_EVENT_WORKERS), cmp=lambda x, y: -1 if x==preferred_queue else 0)
|
||||||
for queue_actual in queue_order:
|
for queue_actual in queue_order:
|
||||||
try:
|
try:
|
||||||
worker_actual = worker_queues[queue_actual]
|
worker_actual = worker_queues[queue_actual]
|
||||||
@@ -153,7 +151,7 @@ class CallbackReceiver(object):
|
|||||||
if message['event'] == 'playbook_on_stats':
|
if message['event'] == 'playbook_on_stats':
|
||||||
job_parent_events = {}
|
job_parent_events = {}
|
||||||
|
|
||||||
actual_queue = self.write_queue_worker(total_messages % WORKERS, worker_queues, message)
|
actual_queue = self.write_queue_worker(total_messages % settings.JOB_EVENT_WORKERS, worker_queues, message)
|
||||||
# NOTE: It might be better to recycle the entire callback receiver process if one or more of the queues are too full
|
# NOTE: It might be better to recycle the entire callback receiver process if one or more of the queues are too full
|
||||||
# the drawback is that if we under extremely high load we may be legitimately taking a while to process messages
|
# the drawback is that if we under extremely high load we may be legitimately taking a while to process messages
|
||||||
if actual_queue is None:
|
if actual_queue is None:
|
||||||
@@ -282,7 +280,6 @@ class CallbackReceiver(object):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
def callback_worker(self, queue_actual, idx):
|
def callback_worker(self, queue_actual, idx):
|
||||||
messages_processed = 0
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
message = queue_actual.get(block=True, timeout=1)
|
message = queue_actual.get(block=True, timeout=1)
|
||||||
@@ -292,10 +289,6 @@ class CallbackReceiver(object):
|
|||||||
logger.error("Exception on listen socket, restarting: " + str(e))
|
logger.error("Exception on listen socket, restarting: " + str(e))
|
||||||
break
|
break
|
||||||
self.process_job_event(message)
|
self.process_job_event(message)
|
||||||
messages_processed += 1
|
|
||||||
if messages_processed >= settings.JOB_EVENT_RECYCLE_THRESHOLD:
|
|
||||||
logger.info("Shutting down message receiver")
|
|
||||||
break
|
|
||||||
|
|
||||||
class Command(NoArgsCommand):
|
class Command(NoArgsCommand):
|
||||||
'''
|
'''
|
||||||
|
|||||||
@@ -575,7 +575,7 @@ class BaseTask(Task):
|
|||||||
instance = self.update_model(instance.pk, status='running',
|
instance = self.update_model(instance.pk, status='running',
|
||||||
output_replacements=output_replacements)
|
output_replacements=output_replacements)
|
||||||
while child.isalive():
|
while child.isalive():
|
||||||
result_id = child.expect(expect_list, timeout=pexpect_timeout)
|
result_id = child.expect(expect_list, timeout=pexpect_timeout, maxread=100, searchwindowsize=100)
|
||||||
if result_id in expect_passwords:
|
if result_id in expect_passwords:
|
||||||
child.sendline(expect_passwords[result_id])
|
child.sendline(expect_passwords[result_id])
|
||||||
if logfile_pos != logfile.tell():
|
if logfile_pos != logfile.tell():
|
||||||
|
|||||||
@@ -38,7 +38,7 @@ import os
|
|||||||
import pwd
|
import pwd
|
||||||
import urlparse
|
import urlparse
|
||||||
import re
|
import re
|
||||||
from copy import deepcopy
|
from copy import copy
|
||||||
|
|
||||||
# Requests
|
# Requests
|
||||||
import requests
|
import requests
|
||||||
@@ -228,7 +228,7 @@ class BaseCallbackModule(object):
|
|||||||
|
|
||||||
def _log_event(self, event, **event_data):
|
def _log_event(self, event, **event_data):
|
||||||
if 'res' in event_data:
|
if 'res' in event_data:
|
||||||
event_data['res'] = censor(deepcopy(event_data['res']))
|
event_data['res'] = censor(copy(event_data['res']))
|
||||||
|
|
||||||
if self.callback_consumer_port:
|
if self.callback_consumer_port:
|
||||||
self._post_job_event_queue_msg(event, event_data)
|
self._post_job_event_queue_msg(event, event_data)
|
||||||
|
|||||||
@@ -438,6 +438,9 @@ AWX_TASK_ENV = {}
|
|||||||
# before it recycles
|
# before it recycles
|
||||||
JOB_EVENT_RECYCLE_THRESHOLD = 3000
|
JOB_EVENT_RECYCLE_THRESHOLD = 3000
|
||||||
|
|
||||||
|
# Number of workers used to proecess job events in parallel
|
||||||
|
JOB_EVENT_WORKERS = 4
|
||||||
|
|
||||||
# Maximum number of job events that can be waiting on a single worker queue before
|
# Maximum number of job events that can be waiting on a single worker queue before
|
||||||
# it can be skipped as too busy
|
# it can be skipped as too busy
|
||||||
JOB_EVENT_MAX_QUEUE_SIZE = 100
|
JOB_EVENT_MAX_QUEUE_SIZE = 100
|
||||||
|
|||||||
Reference in New Issue
Block a user