Change data structure so we can conditionally reap waiting jobs

This commit is contained in:
Alan Rominger 2022-08-17 16:00:30 -04:00
parent 7645cc2707
commit e0c59d12c1
No known key found for this signature in database
GPG Key ID: C2D7EAAA12B63559
2 changed files with 13 additions and 8 deletions

View File

@ -441,12 +441,12 @@ class AutoscalePool(WorkerPool):
body.setdefault('kwargs', {})
if 'dispatch_time' in bind_kwargs:
body['kwargs']['dispatch_time'] = tz_now().isoformat()
if 'active_task_ids' in bind_kwargs:
active_task_ids = []
if 'worker_tasks' in bind_kwargs:
worker_tasks = {}
for worker in self.workers:
worker.calculate_managed_tasks()
active_task_ids.extend(list(worker.managed_tasks.keys()))
body['kwargs']['active_task_ids'] = active_task_ids
worker_tasks[worker.pid] = list(worker.managed_tasks.keys())
body['kwargs']['worker_tasks'] = worker_tasks
def up(self):
if self.full:

View File

@ -483,8 +483,8 @@ def inspect_execution_nodes(instance_list):
execution_node_health_check.apply_async([hostname])
@task(queue=get_local_queuename, bind_kwargs=['dispatch_time', 'active_task_ids'])
def cluster_node_heartbeat(dispatch_time=None, active_task_ids=None):
@task(queue=get_local_queuename, bind_kwargs=['dispatch_time', 'worker_tasks'])
def cluster_node_heartbeat(dispatch_time=None, worker_tasks=None):
logger.debug("Cluster node heartbeat task.")
nowtime = now()
instance_list = list(Instance.objects.all())
@ -564,8 +564,13 @@ def cluster_node_heartbeat(dispatch_time=None, active_task_ids=None):
logger.exception('Error marking {} as lost'.format(other_inst.hostname))
# Run local reaper
if active_task_ids is not None:
reaper.reap(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time))
if worker_tasks is not None:
active_task_ids = []
for task_list in worker_tasks.values():
active_task_ids.extend(task_list)
reaper.reap(instance=this_inst, excluded_uuids=active_task_ids)
if max(len(task_list) for task_list in worker_tasks.values()) <= 1:
reaper.reap_waiting(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time))
@task(queue=get_local_queuename)