Revert "Only fetch fields we need in task manager"

This reverts commit 868e811b3f.

Turns out this does not play well with polymorphic models.

Will try again with .defer()
This commit is contained in:
Elijah DeLee
2022-04-14 11:21:07 -04:00
parent 778862fe51
commit e24fc43a45

View File

@@ -68,7 +68,8 @@ class TaskManager:
"""
Init AFTER we know this instance of the task manager will run because the lock is acquired.
"""
instances = Instance.objects.filter(hostname__isnull=False, enabled=True).exclude(node_type='hop').only('node_type', 'capacity', 'hostname', 'enabled')
instances = Instance.objects.filter(hostname__isnull=False, enabled=True).exclude(node_type='hop')
self.real_instances = {i.hostname: i for i in instances}
self.controlplane_ig = None
self.dependency_graph = DependencyGraph()
@@ -84,12 +85,11 @@ class TaskManager:
]
instances_by_hostname = {i.hostname: i for i in instances_partial}
self.instances_by_hostname = instances_by_hostname
# updates remaining capacity value based on currently running and waiting tasks
Instance.update_remaining_capacity(instances_by_hostname, all_sorted_tasks)
for rampart_group in InstanceGroup.objects.prefetch_related('instances').only('name', 'instances'):
for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
if rampart_group.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME:
self.controlplane_ig = rampart_group
self.graph[rampart_group.name] = dict(
@@ -114,23 +114,16 @@ class TaskManager:
return None
def get_tasks(self, status_list=('pending', 'waiting', 'running')):
common_needed_values = ['celery_task_id', 'controller_node', 'created', 'execution_node', 'instance_group', 'job_explanation', 'name', 'pk', 'status']
inv_needed_values = ['inventory_source']
jobs = [j for j in Job.objects.filter(status__in=status_list).prefetch_related('instance_group').only(*common_needed_values)]
jobs = [j for j in Job.objects.filter(status__in=status_list).prefetch_related('instance_group')]
inventory_updates_qs = (
InventoryUpdate.objects.filter(status__in=status_list)
.exclude(source='file')
.prefetch_related('inventory_source', 'instance_group')
.only(*(common_needed_values + inv_needed_values))
InventoryUpdate.objects.filter(status__in=status_list).exclude(source='file').prefetch_related('inventory_source', 'instance_group')
)
inventory_updates = [i for i in inventory_updates_qs]
# Notice the job_type='check': we want to prevent implicit project updates from blocking our jobs.
project_updates = [
p for p in ProjectUpdate.objects.filter(status__in=status_list, job_type='check').prefetch_related('instance_group').only(*common_needed_values)
]
system_jobs = [s for s in SystemJob.objects.filter(status__in=status_list).prefetch_related('instance_group').only(*common_needed_values)]
ad_hoc_commands = [a for a in AdHocCommand.objects.filter(status__in=status_list).prefetch_related('instance_group').only(*common_needed_values)]
workflow_jobs = [w for w in WorkflowJob.objects.filter(status__in=status_list).only(*common_needed_values)]
project_updates = [p for p in ProjectUpdate.objects.filter(status__in=status_list, job_type='check').prefetch_related('instance_group')]
system_jobs = [s for s in SystemJob.objects.filter(status__in=status_list).prefetch_related('instance_group')]
ad_hoc_commands = [a for a in AdHocCommand.objects.filter(status__in=status_list).prefetch_related('instance_group')]
workflow_jobs = [w for w in WorkflowJob.objects.filter(status__in=status_list)]
all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs, key=lambda task: task.created)
return all_tasks
@@ -500,7 +493,7 @@ class TaskManager:
task.execution_node = control_instance.hostname
control_instance.remaining_capacity = max(0, control_instance.remaining_capacity - control_impact)
self.dependency_graph.add_job(task)
execution_instance = self.instances_by_hostname[control_instance.hostname].obj
execution_instance = self.real_instances[control_instance.hostname]
task.log_lifecycle("controller_node_chosen")
task.log_lifecycle("execution_node_chosen")
self.start_task(task, self.controlplane_ig, task.get_jobs_fail_chain(), execution_instance)
@@ -540,7 +533,7 @@ class TaskManager:
task.log_format, rampart_group.name, execution_instance.hostname, execution_instance.remaining_capacity
)
)
execution_instance = self.instances_by_hostname[execution_instance.hostname].obj
execution_instance = self.real_instances[execution_instance.hostname]
self.dependency_graph.add_job(task)
self.start_task(task, rampart_group, task.get_jobs_fail_chain(), execution_instance)
found_acceptable_queue = True