mirror of
https://github.com/ansible/awx.git
synced 2026-03-22 19:35:02 -02:30
Fix dispatcher connection deadlock w scheduler and cleanup
This commit is contained in:
@@ -387,6 +387,8 @@ class AutoscalePool(WorkerPool):
|
|||||||
reaper.reap_job(j, 'failed')
|
reaper.reap_job(j, 'failed')
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception('failed to reap job UUID {}'.format(w.current_task['uuid']))
|
logger.exception('failed to reap job UUID {}'.format(w.current_task['uuid']))
|
||||||
|
else:
|
||||||
|
logger.warning(f'Worker was told to quit but has not, pid={w.pid}')
|
||||||
orphaned.extend(w.orphaned_tasks)
|
orphaned.extend(w.orphaned_tasks)
|
||||||
self.workers.remove(w)
|
self.workers.remove(w)
|
||||||
elif w.idle and len(self.workers) > self.min_workers:
|
elif w.idle and len(self.workers) > self.min_workers:
|
||||||
@@ -450,9 +452,6 @@ class AutoscalePool(WorkerPool):
|
|||||||
try:
|
try:
|
||||||
if isinstance(body, dict) and body.get('bind_kwargs'):
|
if isinstance(body, dict) and body.get('bind_kwargs'):
|
||||||
self.add_bind_kwargs(body)
|
self.add_bind_kwargs(body)
|
||||||
# when the cluster heartbeat occurs, clean up internally
|
|
||||||
if isinstance(body, dict) and 'cluster_node_heartbeat' in body['task']:
|
|
||||||
self.cleanup()
|
|
||||||
if self.should_grow:
|
if self.should_grow:
|
||||||
self.up()
|
self.up()
|
||||||
# we don't care about "preferred queue" round robin distribution, just
|
# we don't care about "preferred queue" round robin distribution, just
|
||||||
|
|||||||
@@ -114,7 +114,6 @@ class AWXConsumerBase(object):
|
|||||||
queue = 0
|
queue = 0
|
||||||
self.pool.write(queue, body)
|
self.pool.write(queue, body)
|
||||||
self.total_messages += 1
|
self.total_messages += 1
|
||||||
self.record_statistics()
|
|
||||||
|
|
||||||
@log_excess_runtime(logger)
|
@log_excess_runtime(logger)
|
||||||
def record_statistics(self):
|
def record_statistics(self):
|
||||||
@@ -156,6 +155,16 @@ class AWXConsumerPG(AWXConsumerBase):
|
|||||||
# if no successful loops have ran since startup, then we should fail right away
|
# if no successful loops have ran since startup, then we should fail right away
|
||||||
self.pg_is_down = True # set so that we fail if we get database errors on startup
|
self.pg_is_down = True # set so that we fail if we get database errors on startup
|
||||||
self.pg_down_time = time.time() - self.pg_max_wait # allow no grace period
|
self.pg_down_time = time.time() - self.pg_max_wait # allow no grace period
|
||||||
|
self.last_cleanup = time.time()
|
||||||
|
|
||||||
|
def run_periodic_tasks(self):
|
||||||
|
self.record_statistics() # maintains time buffer in method
|
||||||
|
|
||||||
|
if time.time() - self.last_cleanup > 60: # same as cluster_node_heartbeat
|
||||||
|
# NOTE: if we run out of database connections, it is important to still run cleanup
|
||||||
|
# so that we scale down workers and free up connections
|
||||||
|
self.pool.cleanup()
|
||||||
|
self.last_cleanup = time.time()
|
||||||
|
|
||||||
def run(self, *args, **kwargs):
|
def run(self, *args, **kwargs):
|
||||||
super(AWXConsumerPG, self).run(*args, **kwargs)
|
super(AWXConsumerPG, self).run(*args, **kwargs)
|
||||||
@@ -171,8 +180,10 @@ class AWXConsumerPG(AWXConsumerBase):
|
|||||||
if init is False:
|
if init is False:
|
||||||
self.worker.on_start()
|
self.worker.on_start()
|
||||||
init = True
|
init = True
|
||||||
for e in conn.events():
|
for e in conn.events(yield_timeouts=True):
|
||||||
self.process_task(json.loads(e.payload))
|
if e is not None:
|
||||||
|
self.process_task(json.loads(e.payload))
|
||||||
|
self.run_periodic_tasks()
|
||||||
self.pg_is_down = False
|
self.pg_is_down = False
|
||||||
if self.should_stop:
|
if self.should_stop:
|
||||||
return
|
return
|
||||||
@@ -229,6 +240,8 @@ class BaseWorker(object):
|
|||||||
# so we can establish a new connection
|
# so we can establish a new connection
|
||||||
conn.close_if_unusable_or_obsolete()
|
conn.close_if_unusable_or_obsolete()
|
||||||
self.perform_work(body, *args)
|
self.perform_work(body, *args)
|
||||||
|
except Exception:
|
||||||
|
logger.exception(f'Unhandled exception in perform_work in worker pid={os.getpid()}')
|
||||||
finally:
|
finally:
|
||||||
if 'uuid' in body:
|
if 'uuid' in body:
|
||||||
uuid = body['uuid']
|
uuid = body['uuid']
|
||||||
|
|||||||
Reference in New Issue
Block a user