mirror of
https://github.com/ansible/awx.git
synced 2026-01-18 13:11:19 -03:30
Merge pull request #5787 from fosterseth/tm_processed_field
Improve task manager performance for task dependencies Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
This commit is contained in:
commit
ce5c4359ee
@ -0,0 +1,18 @@
|
||||
# Generated by Django 2.2.8 on 2020-02-06 16:43
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('main', '0107_v370_workflow_convergence_api_toggle'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='unifiedjob',
|
||||
name='dependencies_processed',
|
||||
field=models.BooleanField(default=False, editable=False, help_text='If True, the task manager has already processed potential dependencies for this job.'),
|
||||
),
|
||||
]
|
||||
@ -623,6 +623,11 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
||||
editable=False,
|
||||
help_text=_("The date and time the job was queued for starting."),
|
||||
)
|
||||
dependencies_processed = models.BooleanField(
|
||||
default=False,
|
||||
editable=False,
|
||||
help_text=_("If True, the task manager has already processed potential dependencies for this job.")
|
||||
)
|
||||
finished = models.DateTimeField(
|
||||
null=True,
|
||||
default=None,
|
||||
|
||||
@ -23,6 +23,7 @@ from awx.main.models import (
|
||||
Project,
|
||||
ProjectUpdate,
|
||||
SystemJob,
|
||||
UnifiedJob,
|
||||
WorkflowApproval,
|
||||
WorkflowJob,
|
||||
WorkflowJobTemplate
|
||||
@ -74,21 +75,6 @@ class TaskManager():
|
||||
key=lambda task: task.created)
|
||||
return all_tasks
|
||||
|
||||
|
||||
def get_latest_project_update_tasks(self, all_sorted_tasks):
|
||||
project_ids = set()
|
||||
for task in all_sorted_tasks:
|
||||
if isinstance(task, Job):
|
||||
project_ids.add(task.project_id)
|
||||
return ProjectUpdate.objects.filter(id__in=project_ids)
|
||||
|
||||
def get_latest_inventory_update_tasks(self, all_sorted_tasks):
|
||||
inventory_ids = set()
|
||||
for task in all_sorted_tasks:
|
||||
if isinstance(task, Job):
|
||||
inventory_ids.add(task.inventory_id)
|
||||
return InventoryUpdate.objects.filter(id__in=inventory_ids)
|
||||
|
||||
def get_running_workflow_jobs(self):
|
||||
graph_workflow_jobs = [wf for wf in
|
||||
WorkflowJob.objects.filter(status='running')]
|
||||
@ -200,9 +186,6 @@ class TaskManager():
|
||||
schedule_task_manager()
|
||||
return result
|
||||
|
||||
def get_dependent_jobs_for_inv_and_proj_update(self, job_obj):
|
||||
return [{'type': j.model_to_str(), 'id': j.id} for j in job_obj.dependent_jobs.all()]
|
||||
|
||||
def start_task(self, task, rampart_group, dependent_tasks=None, instance=None):
|
||||
from awx.main.tasks import handle_work_error, handle_work_success
|
||||
|
||||
@ -364,10 +347,6 @@ class TaskManager():
|
||||
def should_update_inventory_source(self, job, latest_inventory_update):
|
||||
now = tz_now()
|
||||
|
||||
# Already processed dependencies for this job
|
||||
if job.dependent_jobs.all():
|
||||
return False
|
||||
|
||||
if latest_inventory_update is None:
|
||||
return True
|
||||
'''
|
||||
@ -393,8 +372,6 @@ class TaskManager():
|
||||
|
||||
def should_update_related_project(self, job, latest_project_update):
|
||||
now = tz_now()
|
||||
if job.dependent_jobs.all():
|
||||
return False
|
||||
|
||||
if latest_project_update is None:
|
||||
return True
|
||||
@ -426,18 +403,21 @@ class TaskManager():
|
||||
return True
|
||||
return False
|
||||
|
||||
def generate_dependencies(self, task):
|
||||
dependencies = []
|
||||
if type(task) is Job:
|
||||
def generate_dependencies(self, undeped_tasks):
|
||||
created_dependencies = []
|
||||
for task in undeped_tasks:
|
||||
dependencies = []
|
||||
if not type(task) is Job:
|
||||
continue
|
||||
# TODO: Can remove task.project None check after scan-job-default-playbook is removed
|
||||
if task.project is not None and task.project.scm_update_on_launch is True:
|
||||
latest_project_update = self.get_latest_project_update(task)
|
||||
if self.should_update_related_project(task, latest_project_update):
|
||||
project_task = self.create_project_update(task)
|
||||
created_dependencies.append(project_task)
|
||||
dependencies.append(project_task)
|
||||
else:
|
||||
if latest_project_update.status in ['waiting', 'pending', 'running']:
|
||||
dependencies.append(latest_project_update)
|
||||
dependencies.append(latest_project_update)
|
||||
|
||||
# Inventory created 2 seconds behind job
|
||||
try:
|
||||
@ -452,56 +432,20 @@ class TaskManager():
|
||||
latest_inventory_update = self.get_latest_inventory_update(inventory_source)
|
||||
if self.should_update_inventory_source(task, latest_inventory_update):
|
||||
inventory_task = self.create_inventory_update(task, inventory_source)
|
||||
created_dependencies.append(inventory_task)
|
||||
dependencies.append(inventory_task)
|
||||
else:
|
||||
if latest_inventory_update.status in ['waiting', 'pending', 'running']:
|
||||
dependencies.append(latest_inventory_update)
|
||||
dependencies.append(latest_inventory_update)
|
||||
|
||||
if len(dependencies) > 0:
|
||||
self.capture_chain_failure_dependencies(task, dependencies)
|
||||
return dependencies
|
||||
|
||||
def process_dependencies(self, dependent_task, dependency_tasks):
|
||||
for task in dependency_tasks:
|
||||
if self.is_job_blocked(task):
|
||||
logger.debug("Dependent {} is blocked from running".format(task.log_format))
|
||||
continue
|
||||
preferred_instance_groups = task.preferred_instance_groups
|
||||
found_acceptable_queue = False
|
||||
idle_instance_that_fits = None
|
||||
for rampart_group in preferred_instance_groups:
|
||||
if idle_instance_that_fits is None:
|
||||
idle_instance_that_fits = rampart_group.find_largest_idle_instance()
|
||||
if not rampart_group.is_containerized and self.get_remaining_capacity(rampart_group.name) <= 0:
|
||||
logger.debug("Skipping group {} capacity <= 0".format(rampart_group.name))
|
||||
continue
|
||||
|
||||
execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task)
|
||||
if execution_instance:
|
||||
logger.debug("Starting dependent {} in group {} instance {}".format(
|
||||
task.log_format, rampart_group.name, execution_instance.hostname))
|
||||
elif not execution_instance and idle_instance_that_fits:
|
||||
if not rampart_group.is_containerized:
|
||||
execution_instance = idle_instance_that_fits
|
||||
logger.debug("Starting dependent {} in group {} on idle instance {}".format(
|
||||
task.log_format, rampart_group.name, execution_instance.hostname))
|
||||
if execution_instance or rampart_group.is_containerized:
|
||||
self.graph[rampart_group.name]['graph'].add_job(task)
|
||||
tasks_to_fail = [t for t in dependency_tasks if t != task]
|
||||
tasks_to_fail += [dependent_task]
|
||||
self.start_task(task, rampart_group, tasks_to_fail, execution_instance)
|
||||
found_acceptable_queue = True
|
||||
break
|
||||
else:
|
||||
logger.debug("No instance available in group {} to run job {} w/ capacity requirement {}".format(
|
||||
rampart_group.name, task.log_format, task.task_impact))
|
||||
if not found_acceptable_queue:
|
||||
logger.debug("Dependent {} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format))
|
||||
UnifiedJob.objects.filter(pk__in = [task.pk for task in undeped_tasks]).update(dependencies_processed=True)
|
||||
return created_dependencies
|
||||
|
||||
def process_pending_tasks(self, pending_tasks):
|
||||
running_workflow_templates = set([wf.unified_job_template_id for wf in self.get_running_workflow_jobs()])
|
||||
for task in pending_tasks:
|
||||
self.process_dependencies(task, self.generate_dependencies(task))
|
||||
if self.is_job_blocked(task):
|
||||
logger.debug("{} is blocked from running".format(task.log_format))
|
||||
continue
|
||||
@ -574,13 +518,6 @@ class TaskManager():
|
||||
def calculate_capacity_consumed(self, tasks):
|
||||
self.graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph)
|
||||
|
||||
def would_exceed_capacity(self, task, instance_group):
|
||||
current_capacity = self.graph[instance_group]['consumed_capacity']
|
||||
capacity_total = self.graph[instance_group]['capacity_total']
|
||||
if current_capacity == 0:
|
||||
return False
|
||||
return (task.task_impact + current_capacity > capacity_total)
|
||||
|
||||
def consume_capacity(self, task, instance_group):
|
||||
logger.debug('{} consumed {} capacity units from {} with prior total of {}'.format(
|
||||
task.log_format, task.task_impact, instance_group,
|
||||
@ -598,6 +535,9 @@ class TaskManager():
|
||||
self.process_running_tasks(running_tasks)
|
||||
|
||||
pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending']
|
||||
undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed]
|
||||
dependencies = self.generate_dependencies(undeped_tasks)
|
||||
self.process_pending_tasks(dependencies)
|
||||
self.process_pending_tasks(pending_tasks)
|
||||
|
||||
def _schedule(self):
|
||||
|
||||
@ -67,7 +67,7 @@ def test_multi_group_with_shared_dependency(instance_factory, default_instance_g
|
||||
pu = p.project_updates.first()
|
||||
TaskManager.start_task.assert_called_once_with(pu,
|
||||
default_instance_group,
|
||||
[j1],
|
||||
[j1,j2],
|
||||
default_instance_group.instances.all()[0])
|
||||
pu.finished = pu.created + timedelta(seconds=1)
|
||||
pu.status = "successful"
|
||||
@ -193,7 +193,7 @@ def test_instance_group_basic_policies(instance_factory, instance_group_factory)
|
||||
ig2 = InstanceGroup.objects.get(id=ig2.id)
|
||||
ig3 = InstanceGroup.objects.get(id=ig3.id)
|
||||
assert len(ig0.instances.all()) == 1
|
||||
assert i0 in ig0.instances.all()
|
||||
assert i0 in ig0.instances.all()
|
||||
assert len(InstanceGroup.objects.get(id=ig1.id).instances.all()) == 2
|
||||
assert i1 in ig1.instances.all()
|
||||
assert i2 in ig1.instances.all()
|
||||
|
||||
@ -6,7 +6,7 @@ from datetime import timedelta
|
||||
from awx.main.scheduler import TaskManager
|
||||
from awx.main.scheduler.dependency_graph import DependencyGraph
|
||||
from awx.main.utils import encrypt_field
|
||||
from awx.main.models import WorkflowJobTemplate, JobTemplate
|
||||
from awx.main.models import WorkflowJobTemplate, JobTemplate, Job
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@ -307,8 +307,8 @@ def test_shared_dependencies_launch(default_instance_group, job_template_factory
|
||||
TaskManager().schedule()
|
||||
pu = p.project_updates.first()
|
||||
iu = ii.inventory_updates.first()
|
||||
TaskManager.start_task.assert_has_calls([mock.call(pu, default_instance_group, [iu, j1], instance),
|
||||
mock.call(iu, default_instance_group, [pu, j1], instance)])
|
||||
TaskManager.start_task.assert_has_calls([mock.call(iu, default_instance_group, [j1, j2, pu], instance),
|
||||
mock.call(pu, default_instance_group, [j1, j2, iu], instance)])
|
||||
pu.status = "successful"
|
||||
pu.finished = pu.created + timedelta(seconds=1)
|
||||
pu.save()
|
||||
@ -383,3 +383,35 @@ def test_job_not_blocking_inventory_update(default_instance_group, job_template_
|
||||
dependency_graph = DependencyGraph(None)
|
||||
dependency_graph.add_job(job)
|
||||
assert not dependency_graph.is_job_blocked(inventory_update)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_generate_dependencies_only_once(job_template_factory):
|
||||
objects = job_template_factory('jt', organization='org1')
|
||||
|
||||
job = objects.job_template.create_job()
|
||||
job.status = "pending"
|
||||
job.name = "job_gen_dep"
|
||||
job.save()
|
||||
|
||||
|
||||
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
|
||||
# job starts with dependencies_processed as False
|
||||
assert not job.dependencies_processed
|
||||
# run one cycle of ._schedule() to generate dependencies
|
||||
TaskManager()._schedule()
|
||||
|
||||
# make sure dependencies_processed is now True
|
||||
job = Job.objects.filter(name="job_gen_dep")[0]
|
||||
assert job.dependencies_processed
|
||||
|
||||
# Run ._schedule() again, but make sure .generate_dependencies() is not
|
||||
# called with job in the argument list
|
||||
tm = TaskManager()
|
||||
tm.generate_dependencies = mock.MagicMock()
|
||||
tm._schedule()
|
||||
|
||||
# .call_args is tuple, (positional_args, kwargs), [0][0] then is
|
||||
# the first positional arg, i.e. the first argument of
|
||||
# .generate_dependencies()
|
||||
assert tm.generate_dependencies.call_args[0][0] == []
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user