TaskManager process dependencies only once

This adds a boolean "dependencies_processed" field to UnifiedJob
model. The default value is False. Once the task manager generates
dependencies for this task, it will not generate them again on
subsequent runs.

The changes also remove .process_dependencies(), as this method repeats
the same code as .process_pending_tasks(), and is not needed. Once
dependencies are generated, they are handled at .process_pending_tasks().

Adds a unit test that should catch regressions for this fix.
This commit is contained in:
Seth Foster
2020-02-06 11:47:33 -05:00
parent 30354dbcd0
commit 9b4b2167b3
5 changed files with 76 additions and 81 deletions

View File

@@ -23,6 +23,7 @@ from awx.main.models import (
Project,
ProjectUpdate,
SystemJob,
UnifiedJob,
WorkflowApproval,
WorkflowJob,
WorkflowJobTemplate
@@ -74,21 +75,6 @@ class TaskManager():
key=lambda task: task.created)
return all_tasks
def get_latest_project_update_tasks(self, all_sorted_tasks):
project_ids = set()
for task in all_sorted_tasks:
if isinstance(task, Job):
project_ids.add(task.project_id)
return ProjectUpdate.objects.filter(id__in=project_ids)
def get_latest_inventory_update_tasks(self, all_sorted_tasks):
inventory_ids = set()
for task in all_sorted_tasks:
if isinstance(task, Job):
inventory_ids.add(task.inventory_id)
return InventoryUpdate.objects.filter(id__in=inventory_ids)
def get_running_workflow_jobs(self):
graph_workflow_jobs = [wf for wf in
WorkflowJob.objects.filter(status='running')]
@@ -200,9 +186,6 @@ class TaskManager():
schedule_task_manager()
return result
def get_dependent_jobs_for_inv_and_proj_update(self, job_obj):
return [{'type': j.model_to_str(), 'id': j.id} for j in job_obj.dependent_jobs.all()]
def start_task(self, task, rampart_group, dependent_tasks=None, instance=None):
from awx.main.tasks import handle_work_error, handle_work_success
@@ -364,10 +347,6 @@ class TaskManager():
def should_update_inventory_source(self, job, latest_inventory_update):
now = tz_now()
# Already processed dependencies for this job
if job.dependent_jobs.all():
return False
if latest_inventory_update is None:
return True
'''
@@ -393,8 +372,6 @@ class TaskManager():
def should_update_related_project(self, job, latest_project_update):
now = tz_now()
if job.dependent_jobs.all():
return False
if latest_project_update is None:
return True
@@ -426,18 +403,21 @@ class TaskManager():
return True
return False
def generate_dependencies(self, task):
dependencies = []
if type(task) is Job:
def generate_dependencies(self, undeped_tasks):
created_dependencies = []
for task in undeped_tasks:
dependencies = []
if not type(task) is Job:
continue
# TODO: Can remove task.project None check after scan-job-default-playbook is removed
if task.project is not None and task.project.scm_update_on_launch is True:
latest_project_update = self.get_latest_project_update(task)
if self.should_update_related_project(task, latest_project_update):
project_task = self.create_project_update(task)
created_dependencies.append(project_task)
dependencies.append(project_task)
else:
if latest_project_update.status in ['waiting', 'pending', 'running']:
dependencies.append(latest_project_update)
dependencies.append(latest_project_update)
# Inventory created 2 seconds behind job
try:
@@ -452,56 +432,20 @@ class TaskManager():
latest_inventory_update = self.get_latest_inventory_update(inventory_source)
if self.should_update_inventory_source(task, latest_inventory_update):
inventory_task = self.create_inventory_update(task, inventory_source)
created_dependencies.append(inventory_task)
dependencies.append(inventory_task)
else:
if latest_inventory_update.status in ['waiting', 'pending', 'running']:
dependencies.append(latest_inventory_update)
dependencies.append(latest_inventory_update)
if len(dependencies) > 0:
self.capture_chain_failure_dependencies(task, dependencies)
return dependencies
def process_dependencies(self, dependent_task, dependency_tasks):
for task in dependency_tasks:
if self.is_job_blocked(task):
logger.debug("Dependent {} is blocked from running".format(task.log_format))
continue
preferred_instance_groups = task.preferred_instance_groups
found_acceptable_queue = False
idle_instance_that_fits = None
for rampart_group in preferred_instance_groups:
if idle_instance_that_fits is None:
idle_instance_that_fits = rampart_group.find_largest_idle_instance()
if not rampart_group.is_containerized and self.get_remaining_capacity(rampart_group.name) <= 0:
logger.debug("Skipping group {} capacity <= 0".format(rampart_group.name))
continue
execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task)
if execution_instance:
logger.debug("Starting dependent {} in group {} instance {}".format(
task.log_format, rampart_group.name, execution_instance.hostname))
elif not execution_instance and idle_instance_that_fits:
if not rampart_group.is_containerized:
execution_instance = idle_instance_that_fits
logger.debug("Starting dependent {} in group {} on idle instance {}".format(
task.log_format, rampart_group.name, execution_instance.hostname))
if execution_instance or rampart_group.is_containerized:
self.graph[rampart_group.name]['graph'].add_job(task)
tasks_to_fail = [t for t in dependency_tasks if t != task]
tasks_to_fail += [dependent_task]
self.start_task(task, rampart_group, tasks_to_fail, execution_instance)
found_acceptable_queue = True
break
else:
logger.debug("No instance available in group {} to run job {} w/ capacity requirement {}".format(
rampart_group.name, task.log_format, task.task_impact))
if not found_acceptable_queue:
logger.debug("Dependent {} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format))
UnifiedJob.objects.filter(pk__in = [task.pk for task in undeped_tasks]).update(dependencies_processed=True)
return created_dependencies
def process_pending_tasks(self, pending_tasks):
running_workflow_templates = set([wf.unified_job_template_id for wf in self.get_running_workflow_jobs()])
for task in pending_tasks:
self.process_dependencies(task, self.generate_dependencies(task))
if self.is_job_blocked(task):
logger.debug("{} is blocked from running".format(task.log_format))
continue
@@ -574,13 +518,6 @@ class TaskManager():
def calculate_capacity_consumed(self, tasks):
self.graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph)
def would_exceed_capacity(self, task, instance_group):
current_capacity = self.graph[instance_group]['consumed_capacity']
capacity_total = self.graph[instance_group]['capacity_total']
if current_capacity == 0:
return False
return (task.task_impact + current_capacity > capacity_total)
def consume_capacity(self, task, instance_group):
logger.debug('{} consumed {} capacity units from {} with prior total of {}'.format(
task.log_format, task.task_impact, instance_group,
@@ -598,6 +535,9 @@ class TaskManager():
self.process_running_tasks(running_tasks)
pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending']
undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed]
dependencies = self.generate_dependencies(undeped_tasks)
self.process_pending_tasks(dependencies)
self.process_pending_tasks(pending_tasks)
def _schedule(self):