Major fixes for job callback receiver processing

* Add logic to the Ansible callback plugin to prevent it from waiting
  forever when submitting events to Tower
* Lower the process recycle threshold for the Tower callback receiver
* Make the recycle threshold configurable
* Properly exit the main callback receiver management process if
  the event receiver process is dead, so we don't leave orphaned worker
  processes behind
* Set a configurable maximum number of messages that can be waiting
  in a worker process queue; once that limit is hit the queue is skipped
  rather than filling up memory on a dead worker process
* Skip over a dead worker process if its queue is full
* Force a restart of the callback receiver if all queues are dead
  (see the fallback-write sketch after the changed-file summary below)
* Roll back the use of transaction.atomic, which appears to be causing
  deadlocks in the worker process; use the old commit_on_success
  mechanism with retry logic instead
* Separate the expected non-blocking queue exception (Queue.Empty) from any
  other type of exception that could be encountered on the queue fetch
  operation
Matthew Jones 2015-03-13 11:11:49 -04:00
parent 6258035ca8
commit 0f5beca9ae
3 changed files with 65 additions and 47 deletions
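
The bullets above about skipping busy or dead workers and force-restarting when every queue is full all rest on one pattern: write to the preferred worker queue with a bounded timeout, fall back to the other workers, and report failure if nothing accepts the message. A minimal standalone sketch of that idea follows (write_with_fallback and NUM_WORKERS are made-up names; the committed write_queue_worker in the diff below carries extra bookkeeping):

# Illustrative sketch only -- not part of the diff below (Python 2, matching the codebase).
from multiprocessing import Queue
from Queue import Full  # a bounded multiprocessing queue raises Queue.Full when put() times out

NUM_WORKERS = 4
queues = [Queue(100) for _ in range(NUM_WORKERS)]  # bounded queues, as with JOB_EVENT_MAX_QUEUE_SIZE

def write_with_fallback(preferred, queues, message):
    # Try the preferred worker first, then every other worker in turn.
    order = [preferred] + [i for i in range(NUM_WORKERS) if i != preferred]
    for idx in order:
        try:
            # A bounded queue plus a put timeout means a dead or overloaded
            # worker costs at most two seconds before we move on.
            queues[idx].put(message, block=True, timeout=2)
            return idx
        except Full:
            continue
    # Every queue is full: tell the caller so it can exit and be restarted.
    return None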

View File

@@ -9,6 +9,7 @@ import logging
import signal
import time
from multiprocessing import Process, Queue
from Queue import Empty as QueueEmpty
# Django
from django.conf import settings
@@ -24,7 +25,6 @@ from awx.main.socket import Socket
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
MAX_REQUESTS = 10000
WORKERS = 4
class CallbackReceiver(object):
@@ -55,8 +55,8 @@ class CallbackReceiver(object):
if use_workers:
connection.close()
for idx in range(WORKERS):
queue_actual = Queue()
w = Process(target=self.callback_worker, args=(queue_actual,))
queue_actual = Queue(settings.JOB_EVENT_MAX_QUEUE_SIZE)
w = Process(target=self.callback_worker, args=(queue_actual, idx,))
w.start()
if settings.DEBUG:
logger.info('Started worker %s' % str(idx))
@@ -75,27 +75,45 @@ class CallbackReceiver(object):
signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in worker_queues] + [main_process]))
while True:
workers_changed = False
idx = 0
for queue_worker in worker_queues:
if not queue_worker[2].is_alive():
logger.warn("Worker %s was not alive, restarting" % str(queue_worker))
workers_changed = True
queue_worker[2].join()
w = Process(target=self.callback_worker, args=(queue_worker[1],))
w = Process(target=self.callback_worker, args=(queue_worker[1], idx,))
w.daemon = True
w.start()
signal.signal(signal.SIGINT, shutdown_handler([w]))
signal.signal(signal.SIGTERM, shutdown_handler([w]))
queue_worker[2] = w
idx += 1
if workers_changed:
signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in worker_queues] + [main_process]))
signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in worker_queues] + [main_process]))
if not main_process.is_alive():
sys.exit(1)
logger.error("Main process is not alive")
for queue_worker in worker_queues:
queue_worker[2].terminate()
break
time.sleep(0.1)
def write_queue_worker(self, preferred_queue, worker_queues, message):
queue_order = sorted(range(WORKERS), cmp=lambda x, y: -1 if x==preferred_queue else 0)
for queue_actual in queue_order:
try:
worker_actual = worker_queues[queue_actual]
worker_queues[queue_actual][1].put(message, block=True, timeout=2)
worker_queues[queue_actual][0] += 1
return queue_actual
except Exception, e:
logger.warn("Could not write to queue %s" % preferred_queue)
continue
return None
def callback_handler(self, use_workers, worker_queues):
total_messages = 0
last_parent_events = {}
with Socket('callbacks', 'r') as callbacks:
for message in callbacks.listen():
total_messages += 1
@@ -131,28 +149,16 @@ class CallbackReceiver(object):
else:
if message['event'] == 'playbook_on_stats':
job_parent_events = {}
queue_actual_worker = worker_queues[total_messages % WORKERS]
queue_actual_worker[0] += 1
try:
queue_actual_worker[1].put(message, block=True, timeout=2)
except Exception:
print("Queue workers went away, continuing...")
continue
if queue_actual_worker[0] >= MAX_REQUESTS:
queue_actual_worker[0] = 0
# print("Recycling worker process")
# queue_actual_worker[2].join()
# connection.close()
# w = Process(target=self.callback_worker, args=(queue_actual_worker[1],))
# w.daemon = True
# w.start()
# signal.signal(signal.SIGINT, shutdown_handler([w]))
# signal.signal(signal.SIGTERM, shutdown_handler([w]))
# queue_actual_worker[2] = w
actual_queue = self.write_queue_worker(total_messages % WORKERS, worker_queues, message)
# NOTE: It might be better to recycle the entire callback receiver process if one or more of the queues are too full
# the drawback is that if we are under extremely high load we may legitimately be taking a while to process messages
if actual_queue is None:
logger.error("All queues full!")
sys.exit(1)
last_parent_events[message['job_id']] = job_parent_events
@transaction.commit_on_success
def process_job_event(self, data):
# Sanity check: Do we need to do anything at all?
event = data.get('event', '')
@@ -194,24 +200,24 @@ class CallbackReceiver(object):
# If we get a database error of some kind, try again.
for retry_count in xrange(11):
try:
with transaction.atomic():
# If we're not in verbose mode, wipe out any module
# arguments.
res = data['event_data'].get('res', {})
if isinstance(res, dict):
i = res.get('invocation', {})
if verbose == 0 and 'module_args' in i:
i['module_args'] = ''
# If we're not in verbose mode, wipe out any module
# arguments.
res = data['event_data'].get('res', {})
if isinstance(res, dict):
i = res.get('invocation', {})
if verbose == 0 and 'module_args' in i:
i['module_args'] = ''
# Create a new JobEvent object.
job_event = JobEvent(**data)
if parent_id is not None:
job_event.parent = JobEvent.objects.get(id=parent_id)
job_event.save(post_process=True)
# Create a new JobEvent object.
job_event = JobEvent(**data)
if parent_id is not None:
job_event.parent = JobEvent.objects.get(id=parent_id)
job_event.save(post_process=True)
# Return the job event object.
return job_event
# Return the job event object.
return job_event
except DatabaseError as e:
transaction.rollback()
# Log the error and try again.
logger.error('Database error saving job event, retrying in '
'1 second (retry #%d): %s', retry_count + 1, e)
@@ -221,16 +227,19 @@ class CallbackReceiver(object):
logger.error('Failed to save job event after %d retries.', retry_count)
return None
def callback_worker(self, queue_actual):
def callback_worker(self, queue_actual, idx):
messages_processed = 0
while True:
try:
message = queue_actual.get(block=True, timeout=1)
except Exception:
except QueueEmpty:
continue
except Exception, e:
logger.error("Exception on listen socket, restarting: " + str(e))
break
self.process_job_event(message)
messages_processed += 1
if messages_processed >= MAX_REQUESTS:
if messages_processed >= settings.JOB_EVENT_RECYCLE_THRESHOLD:
logger.info("Shutting down message receiver")
break
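
Because removed and added lines are interleaved in the hunks above, the resulting save path is easier to see in a condensed form: commit_on_success on the outside, an explicit rollback plus a bounded retry on DatabaseError on the inside. A simplified sketch of that pattern (save_with_retry and its callable argument are illustrative, not the committed code):

# Simplified sketch of the commit_on_success + retry pattern used above
# (illustrative only; not the literal committed code).
import logging
import time
from django.db import transaction, DatabaseError

logger = logging.getLogger(__name__)

@transaction.commit_on_success  # pre-Django-1.6 style transaction handling, as in the commit
def save_with_retry(write_to_db):
    # write_to_db is any callable that performs the database write.
    for retry_count in xrange(11):
        try:
            return write_to_db()
        except DatabaseError as e:
            transaction.rollback()  # clear the failed transaction before retrying
            logger.error('Database error, retrying in 1 second (retry #%d): %s',
                         retry_count + 1, e)
            time.sleep(1)
    logger.error('Giving up after %d retries.', retry_count)
    return None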

View File

@@ -121,6 +121,8 @@ class CallbackModule(object):
def _start_connection(self):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.setsockopt(zmq.RCVTIMEO, 4000)
self.socket.setsockopt(zmq.LINGER, 2000)
self.socket.connect(self.callback_consumer_port)
def _post_job_event_queue_msg(self, event, event_data):
@@ -146,16 +148,15 @@ class CallbackModule(object):
self._init_connection()
if self.context is None:
self._start_connection()
self.socket.send_json(msg)
self.socket.recv()
return
except Exception, e:
self.logger.info('Publish Exception: %r, retry=%d', e,
self.logger.info('Publish Job Event Exception: %r, retry=%d', e,
retry_count, exc_info=True)
# TODO: Maybe recycle connection here?
retry_count += 1
if retry_count >= 3:
raise
break
def _post_rest_api_event(self, event, event_data):
data = json.dumps({

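The two setsockopt calls in the hunk above are what keep the plugin from hanging forever: with RCVTIMEO set, a blocking recv() gives up after 4 seconds and pyzmq raises zmq.Again, and LINGER caps how long undelivered messages keep the socket alive at close. A tiny sketch of that bounded-wait behaviour (the endpoint and payload are placeholders, not the plugin's values):

# Bounded-wait REQ socket sketch (placeholder endpoint and payload, not the plugin code).
import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.setsockopt(zmq.RCVTIMEO, 4000)   # recv() raises zmq.Again after 4s instead of blocking forever
socket.setsockopt(zmq.LINGER, 2000)     # cap how long close() waits on undelivered messages
socket.connect('tcp://127.0.0.1:5556')  # placeholder address

try:
    socket.send_json({'event': 'ping'})  # placeholder payload
    socket.recv()                        # bounded by RCVTIMEO
except zmq.Again:
    pass  # no reply in time; the plugin's retry loop handles this case
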
View File

@@ -317,6 +317,14 @@ ANSIBLE_FORCE_COLOR = True
# the celery task.
AWX_TASK_ENV = {}
# Maximum number of job events processed by the callback receiver worker process
# before it recycles
JOB_EVENT_RECYCLE_THRESHOLD = 3000
# Maximum number of job events that can be waiting on a single worker queue before
# it is skipped as too busy
JOB_EVENT_MAX_QUEUE_SIZE = 100
# Flag to enable/disable updating hosts M2M when saving job events.
CAPTURE_JOB_EVENT_HOSTS = False
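
Both new knobs are ordinary Django settings, so a busy install can tune them without patching this file; a hypothetical local-settings override (the file location and the values are illustrative only, the setting names are the ones added above):

# Hypothetical local settings override -- setting names are real, values illustrative.
JOB_EVENT_RECYCLE_THRESHOLD = 5000  # recycle callback workers less aggressively
JOB_EVENT_MAX_QUEUE_SIZE = 500      # allow deeper per-worker backlogs before skipping a queue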