Add job to dependency graph in start_task

We always add the job to the dependency graph right before calling start_task.
Reduce the complexity of operating correctly by doing this inside start_task
itself, because whenever you call start_task, the job needs to be added to the
dependency graph.
This commit is contained in:
Elijah DeLee
2022-07-11 10:32:16 -04:00
committed by Seth Foster
parent bf89093fac
commit 7776a81e22

View File

@@ -529,6 +529,7 @@ class TaskManager(TaskBase):
@timeit @timeit
def start_task(self, task, instance_group, dependent_tasks=None, instance=None): def start_task(self, task, instance_group, dependent_tasks=None, instance=None):
self.dependency_graph.add_job(task)
self.subsystem_metrics.inc(f"{self.prefix}_tasks_started", 1) self.subsystem_metrics.inc(f"{self.prefix}_tasks_started", 1)
self.start_task_limit -= 1 self.start_task_limit -= 1
if self.start_task_limit == 0: if self.start_task_limit == 0:
@@ -618,7 +619,6 @@ class TaskManager(TaskBase):
if isinstance(task, WorkflowJob): if isinstance(task, WorkflowJob):
# Previously we were tracking allow_simultaneous blocking both here and in DependencyGraph. # Previously we were tracking allow_simultaneous blocking both here and in DependencyGraph.
# Double check that using just the DependencyGraph works for Workflows and Sliced Jobs. # Double check that using just the DependencyGraph works for Workflows and Sliced Jobs.
self.dependency_graph.add_job(task)
self.start_task(task, None, task.get_jobs_fail_chain(), None) self.start_task(task, None, task.get_jobs_fail_chain(), None)
continue continue
@@ -644,7 +644,6 @@ class TaskManager(TaskBase):
if task.capacity_type == 'control': if task.capacity_type == 'control':
task.execution_node = control_instance.hostname task.execution_node = control_instance.hostname
control_instance.consume_capacity(control_impact) control_instance.consume_capacity(control_impact)
self.dependency_graph.add_job(task)
execution_instance = self.instances[control_instance.hostname].obj execution_instance = self.instances[control_instance.hostname].obj
task.log_lifecycle("controller_node_chosen") task.log_lifecycle("controller_node_chosen")
task.log_lifecycle("execution_node_chosen") task.log_lifecycle("execution_node_chosen")
@@ -654,7 +653,6 @@ class TaskManager(TaskBase):
for instance_group in preferred_instance_groups: for instance_group in preferred_instance_groups:
if instance_group.is_container_group: if instance_group.is_container_group:
self.dependency_graph.add_job(task)
self.start_task(task, instance_group, task.get_jobs_fail_chain(), None) self.start_task(task, instance_group, task.get_jobs_fail_chain(), None)
found_acceptable_queue = True found_acceptable_queue = True
break break
@@ -686,7 +684,6 @@ class TaskManager(TaskBase):
) )
) )
execution_instance = self.instances[execution_instance.hostname].obj execution_instance = self.instances[execution_instance.hostname].obj
self.dependency_graph.add_job(task)
self.start_task(task, instance_group, task.get_jobs_fail_chain(), execution_instance) self.start_task(task, instance_group, task.get_jobs_fail_chain(), execution_instance)
found_acceptable_queue = True found_acceptable_queue = True
break break