diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 9d7c956360..dc2fe76e01 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -161,6 +161,25 @@ class Instance(HasPolicyEditsMixin, BaseModel): def remaining_capacity(self): return self.capacity - self.consumed_capacity + @staticmethod + def update_remaining_capacity(instances, jobs): + """Takes a mapping of hostname to SimpleNamespace instance-like objects and a list of jobs. + + Computes remaining capacity for all the instances based on currently running and waiting jobs: each such job subtracts its task_impact from its execution node (node types 'hybrid' and 'execution') and settings.AWX_CONTROL_NODE_TASK_IMPACT from its controller node (node types 'hybrid' and 'control'). Jobs in any other status are ignored, and a node is skipped when it is missing from the mapping or has a different node type. + + No return value; updates the "remaining_capacity" field on the SimpleNamespace instance-like objects in place. + For use in the task manager to avoid refetching jobs from the database. + """ + for job in jobs: + if job.status not in ['waiting', 'running']: + continue + control_instance = instances.get(job.controller_node, '') + execution_instance = instances.get(job.execution_node, '') + if execution_instance and execution_instance.node_type in ('hybrid', 'execution'): + instances[job.execution_node].remaining_capacity -= job.task_impact + if control_instance and control_instance.node_type in ('hybrid', 'control'): + instances[job.controller_node].remaining_capacity -= settings.AWX_CONTROL_NODE_TASK_IMPACT + @property def jobs_running(self): return UnifiedJob.objects.filter( diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 62dace5968..2301f7ec00 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -64,7 +64,7 @@ class TaskManager: self.time_delta_job_explanation = timedelta(seconds=30) - def after_lock_init(self): + def after_lock_init(self, all_sorted_tasks): """ Init AFTER we know this instance of the task manager will run because the lock is acquired. 
""" @@ -77,7 +77,7 @@ class TaskManager: SimpleNamespace( obj=instance, node_type=instance.node_type, - remaining_capacity=instance.remaining_capacity, + remaining_capacity=instance.capacity, # Updated with Instance.update_remaining_capacity by looking at all active tasks capacity=instance.capacity, hostname=instance.hostname, ) @@ -86,6 +86,9 @@ class TaskManager: instances_by_hostname = {i.hostname: i for i in instances_partial} + # updates remaining capacity value based on currently running and waiting tasks + Instance.update_remaining_capacity(instances_by_hostname, all_sorted_tasks) + for rampart_group in InstanceGroup.objects.prefetch_related('instances'): if rampart_group.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME: self.controlplane_ig = rampart_group @@ -603,7 +606,7 @@ class TaskManager: finished_wfjs = [] all_sorted_tasks = self.get_tasks() - self.after_lock_init() + self.after_lock_init(all_sorted_tasks) if len(all_sorted_tasks) > 0: # TODO: Deal with