diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py
index 7c156bfbeb..841b587d8e 100644
--- a/awx/main/dispatch/pool.py
+++ b/awx/main/dispatch/pool.py
@@ -387,6 +387,8 @@ class AutoscalePool(WorkerPool):
                                 reaper.reap_job(j, 'failed')
                         except Exception:
                             logger.exception('failed to reap job UUID {}'.format(w.current_task['uuid']))
+                    else:
+                        logger.warning(f'Worker was told to quit but has not, pid={w.pid}')
                 orphaned.extend(w.orphaned_tasks)
                 self.workers.remove(w)
             elif w.idle and len(self.workers) > self.min_workers:
@@ -450,9 +452,6 @@ class AutoscalePool(WorkerPool):
         try:
             if isinstance(body, dict) and body.get('bind_kwargs'):
                 self.add_bind_kwargs(body)
-            # when the cluster heartbeat occurs, clean up internally
-            if isinstance(body, dict) and 'cluster_node_heartbeat' in body['task']:
-                self.cleanup()
             if self.should_grow:
                 self.up()
             # we don't care about "preferred queue" round robin distribution, just
diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py
index ff05d9e846..25664d9737 100644
--- a/awx/main/dispatch/worker/base.py
+++ b/awx/main/dispatch/worker/base.py
@@ -114,7 +114,6 @@ class AWXConsumerBase(object):
             queue = 0
         self.pool.write(queue, body)
         self.total_messages += 1
-        self.record_statistics()
 
     @log_excess_runtime(logger)
     def record_statistics(self):
@@ -156,6 +155,16 @@ class AWXConsumerPG(AWXConsumerBase):
         # if no successful loops have ran since startup, then we should fail right away
         self.pg_is_down = True  # set so that we fail if we get database errors on startup
         self.pg_down_time = time.time() - self.pg_max_wait  # allow no grace period
+        self.last_cleanup = time.time()
+
+    def run_periodic_tasks(self):
+        self.record_statistics()  # maintains time buffer in method
+
+        if time.time() - self.last_cleanup > 60:  # same as cluster_node_heartbeat
+            # NOTE: if we run out of database connections, it is important to still run cleanup
+            # so that we scale down workers and free up connections
+            self.pool.cleanup()
+            self.last_cleanup = time.time()
 
     def run(self, *args, **kwargs):
         super(AWXConsumerPG, self).run(*args, **kwargs)
@@ -171,8 +180,10 @@ class AWXConsumerPG(AWXConsumerBase):
                     if init is False:
                         self.worker.on_start()
                         init = True
-                    for e in conn.events():
-                        self.process_task(json.loads(e.payload))
+                    for e in conn.events(yield_timeouts=True):
+                        if e is not None:
+                            self.process_task(json.loads(e.payload))
+                        self.run_periodic_tasks()
                     self.pg_is_down = False
                 if self.should_stop:
                     return
@@ -229,6 +240,8 @@ class BaseWorker(object):
                     # so we can establish a new connection
                     conn.close_if_unusable_or_obsolete()
                     self.perform_work(body, *args)
+            except Exception:
+                logger.exception(f'Unhandled exception in perform_work in worker pid={os.getpid()}')
             finally:
                 if 'uuid' in body:
                     uuid = body['uuid']
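
Note on the pattern: the core of this change is that the consumer loop now drives its own housekeeping on a timer (via conn.events(yield_timeouts=True) yielding None when no message arrives) instead of piggybacking cleanup on the cluster_node_heartbeat task message. The sketch below is only a minimal illustration of that timer-driven loop under assumed names; events_with_timeouts and PeriodicConsumer are invented for this example and are not AWX APIs.

    import queue
    import time


    def events_with_timeouts(q, timeout=5.0):
        # Yield each message from the queue, or None when the wait times out,
        # so the caller regains control even when no events arrive.
        while True:
            try:
                yield q.get(timeout=timeout)
            except queue.Empty:
                yield None


    class PeriodicConsumer:
        CLEANUP_INTERVAL = 60  # seconds; mirrors the 60s cadence used in the patch

        def __init__(self):
            self.last_cleanup = time.time()

        def run_periodic_tasks(self):
            # Runs on every loop iteration, including timeout (None) iterations.
            if time.time() - self.last_cleanup > self.CLEANUP_INTERVAL:
                print('running cleanup')  # stand-in for pool.cleanup()
                self.last_cleanup = time.time()

        def run(self, q):
            for event in events_with_timeouts(q):
                if event is not None:
                    print(f'processing {event}')  # stand-in for process_task()
                self.run_periodic_tasks()

With this shape, PeriodicConsumer().run(queue.Queue()) would process messages as they arrive and still run cleanup roughly once a minute even if the queue stays empty, which is the behavior the patch is after.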