Add back in cleanup call

This commit is contained in:
Alan Rominger
2022-07-27 11:50:53 -04:00
parent ccd46a1c0f
commit 6719010050

View File

@@ -378,8 +378,6 @@ class AutoscalePool(WorkerPool):
1. Discover worker processes that exited, and recover messages they 1. Discover worker processes that exited, and recover messages they
were handling. were handling.
2. Clean up unnecessary, idle workers. 2. Clean up unnecessary, idle workers.
3. Check to see if the database says this node is running any tasks
that aren't actually running. If so, reap them.
IMPORTANT: this function is one of the few places in the dispatcher IMPORTANT: this function is one of the few places in the dispatcher
(aside from setting lookups) where we talk to the database. As such, (aside from setting lookups) where we talk to the database. As such,
@@ -465,6 +463,9 @@ class AutoscalePool(WorkerPool):
try: try:
if isinstance(body, dict) and body.get('bind_kwargs'): if isinstance(body, dict) and body.get('bind_kwargs'):
self.add_bind_kwargs(body) self.add_bind_kwargs(body)
# when the cluster heartbeat occurs, clean up internally
if isinstance(body, dict) and 'cluster_node_heartbeat' in body['task']:
self.cleanup()
if self.should_grow: if self.should_grow:
self.up() self.up()
# we don't care about "preferred queue" round robin distribution, just # we don't care about "preferred queue" round robin distribution, just