calculate remaining capacity in static method

this is to avoid additional queries when we allready have all
the active jobs fetched in the task manager
This commit is contained in:
Elijah DeLee
2022-04-12 11:17:51 -04:00
parent 49051c4aaf
commit 2e9974133a
2 changed files with 25 additions and 3 deletions

View File

@@ -161,6 +161,25 @@ class Instance(HasPolicyEditsMixin, BaseModel):
def remaining_capacity(self): def remaining_capacity(self):
return self.capacity - self.consumed_capacity return self.capacity - self.consumed_capacity
@staticmethod
def update_remaining_capacity(instances, jobs):
"""Takes mapping of hostname to SimpleNamespace instance like objects and a list of jobs.
Computes remaining capacity for all the instances based on currently running and waiting jobs.
No return value, updates the "remaining_capacity" field on the SimpleNamespace instance like object in place.
For use in the task manager to avoid refetching jobs from the database.
"""
for job in jobs:
if job.status not in ['waiting', 'running']:
continue
control_instance = instances.get(job.controller_node, '')
execution_instance = instances.get(job.execution_node, '')
if execution_instance and execution_instance.node_type in ('hybrid', 'execution'):
instances[job.execution_node].remaining_capacity -= job.task_impact
if control_instance and control_instance.node_type in ('hybrid', 'control'):
instances[job.controller_node].remaining_capacity -= settings.AWX_CONTROL_NODE_TASK_IMPACT
@property @property
def jobs_running(self): def jobs_running(self):
return UnifiedJob.objects.filter( return UnifiedJob.objects.filter(

View File

@@ -64,7 +64,7 @@ class TaskManager:
self.time_delta_job_explanation = timedelta(seconds=30) self.time_delta_job_explanation = timedelta(seconds=30)
def after_lock_init(self): def after_lock_init(self, all_sorted_tasks):
""" """
Init AFTER we know this instance of the task manager will run because the lock is acquired. Init AFTER we know this instance of the task manager will run because the lock is acquired.
""" """
@@ -77,7 +77,7 @@ class TaskManager:
SimpleNamespace( SimpleNamespace(
obj=instance, obj=instance,
node_type=instance.node_type, node_type=instance.node_type,
remaining_capacity=instance.remaining_capacity, remaining_capacity=instance.capacity, # Updated with Instance.update_remaining_capacity by looking at all active tasks
capacity=instance.capacity, capacity=instance.capacity,
hostname=instance.hostname, hostname=instance.hostname,
) )
@@ -86,6 +86,9 @@ class TaskManager:
instances_by_hostname = {i.hostname: i for i in instances_partial} instances_by_hostname = {i.hostname: i for i in instances_partial}
# updates remaining capacity value based on currently running and waiting tasks
Instance.update_remaining_capacity(instances_by_hostname, all_sorted_tasks)
for rampart_group in InstanceGroup.objects.prefetch_related('instances'): for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
if rampart_group.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME: if rampart_group.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME:
self.controlplane_ig = rampart_group self.controlplane_ig = rampart_group
@@ -603,7 +606,7 @@ class TaskManager:
finished_wfjs = [] finished_wfjs = []
all_sorted_tasks = self.get_tasks() all_sorted_tasks = self.get_tasks()
self.after_lock_init() self.after_lock_init(all_sorted_tasks)
if len(all_sorted_tasks) > 0: if len(all_sorted_tasks) > 0:
# TODO: Deal with # TODO: Deal with