diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index 6a65df95de..527b4b52fb 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -441,12 +441,12 @@ class AutoscalePool(WorkerPool): body.setdefault('kwargs', {}) if 'dispatch_time' in bind_kwargs: body['kwargs']['dispatch_time'] = tz_now().isoformat() - if 'active_task_ids' in bind_kwargs: - active_task_ids = [] + if 'worker_tasks' in bind_kwargs: + worker_tasks = {} for worker in self.workers: worker.calculate_managed_tasks() - active_task_ids.extend(list(worker.managed_tasks.keys())) - body['kwargs']['active_task_ids'] = active_task_ids + worker_tasks[worker.pid] = list(worker.managed_tasks.keys()) + body['kwargs']['worker_tasks'] = worker_tasks def up(self): if self.full: diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index 598b3a680f..d4f067115e 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -483,8 +483,8 @@ def inspect_execution_nodes(instance_list): execution_node_health_check.apply_async([hostname]) -@task(queue=get_local_queuename, bind_kwargs=['dispatch_time', 'active_task_ids']) -def cluster_node_heartbeat(dispatch_time=None, active_task_ids=None): +@task(queue=get_local_queuename, bind_kwargs=['dispatch_time', 'worker_tasks']) +def cluster_node_heartbeat(dispatch_time=None, worker_tasks=None): logger.debug("Cluster node heartbeat task.") nowtime = now() instance_list = list(Instance.objects.all()) @@ -564,8 +564,13 @@ def cluster_node_heartbeat(dispatch_time=None, active_task_ids=None): logger.exception('Error marking {} as lost'.format(other_inst.hostname)) # Run local reaper - if active_task_ids is not None: - reaper.reap(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time)) + if worker_tasks is not None: + active_task_ids = [] + for task_list in worker_tasks.values(): + active_task_ids.extend(task_list) + reaper.reap(instance=this_inst, excluded_uuids=active_task_ids) + if max(len(task_list) for task_list in worker_tasks.values()) <= 1: + reaper.reap_waiting(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time)) @task(queue=get_local_queuename)