From 9b4b2167b31dd4f957e3920b8018257cef557dcf Mon Sep 17 00:00:00 2001
From: Seth Foster
Date: Thu, 6 Feb 2020 11:47:33 -0500
Subject: [PATCH] TaskManager process dependencies only once

This adds a boolean "dependencies_processed" field to the UnifiedJob
model. The default value is False. Once the task manager generates
dependencies for a task, the flag is set and dependencies are not
generated again on subsequent runs.

The changes also remove .process_dependencies(), as this method
repeats the same code as .process_pending_tasks() and is not needed:
once dependencies are generated, they are handled by
.process_pending_tasks() like any other pending task.

Adds a unit test that should catch regressions for this fix.
---
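
As a review aid (not part of the commit message or the diff), a minimal
runnable sketch of the new flow; Task, generate_dependencies and
schedule here are simplified stand-ins for UnifiedJob,
TaskManager.generate_dependencies and TaskManager._schedule, not code
from this patch:

    class Task:
        def __init__(self, needs_update=False):
            self.needs_update = needs_update
            self.dependencies_processed = False  # mirrors the new model field

    def generate_dependencies(undeped_tasks):
        created = []
        for task in undeped_tasks:
            if task.needs_update:
                # stand-in for create_project_update() / create_inventory_update()
                created.append(Task())
        # the bulk flag update is what prevents duplicate dependencies
        # on later scheduler cycles
        for task in undeped_tasks:
            task.dependencies_processed = True
        return created

    def schedule(pending_tasks):
        undeped = [t for t in pending_tasks if not t.dependencies_processed]
        dependencies = generate_dependencies(undeped)
        # dependencies flow through the same pending-task path as any
        # other task; there is no separate process_dependencies() step
        return dependencies + pending_tasks

    job = Task(needs_update=True)
    assert len(schedule([job])) == 2  # first cycle creates one dependency
    assert len(schedule([job])) == 1  # second cycle creates none
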
 ..._v370_unifiedjob_dependencies_processed.py | 18 ++++
 awx/main/models/unified_jobs.py               |  5 +
 awx/main/scheduler/task_manager.py            | 92 ++++---------------
 .../task_management/test_rampart_groups.py    |  4 +-
 .../task_management/test_scheduler.py         | 38 +++++++-
 5 files changed, 76 insertions(+), 81 deletions(-)
 create mode 100644 awx/main/migrations/0108_v370_unifiedjob_dependencies_processed.py

diff --git a/awx/main/migrations/0108_v370_unifiedjob_dependencies_processed.py b/awx/main/migrations/0108_v370_unifiedjob_dependencies_processed.py
new file mode 100644
index 0000000000..6c10b11083
--- /dev/null
+++ b/awx/main/migrations/0108_v370_unifiedjob_dependencies_processed.py
@@ -0,0 +1,18 @@
+# Generated by Django 2.2.8 on 2020-02-06 16:43
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('main', '0107_v370_workflow_convergence_api_toggle'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='unifiedjob',
+            name='dependencies_processed',
+            field=models.BooleanField(default=False, editable=False, help_text='If True, the task manager has already processed potential dependencies for this job.'),
+        ),
+    ]
diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index 1617014540..67883744de 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -623,6 +623,11 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
         editable=False,
         help_text=_("The date and time the job was queued for starting."),
     )
+    dependencies_processed = models.BooleanField(
+        default=False,
+        editable=False,
+        help_text=_("If True, the task manager has already processed potential dependencies for this job.")
+    )
     finished = models.DateTimeField(
         null=True,
         default=None,
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 32e8a7defb..9cd0aa1db9 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -23,6 +23,7 @@ from awx.main.models import (
     Project,
     ProjectUpdate,
     SystemJob,
+    UnifiedJob,
     WorkflowApproval,
     WorkflowJob,
     WorkflowJobTemplate
@@ -74,21 +75,6 @@ class TaskManager():
                           key=lambda task: task.created)
         return all_tasks
 
-
-    def get_latest_project_update_tasks(self, all_sorted_tasks):
-        project_ids = set()
-        for task in all_sorted_tasks:
-            if isinstance(task, Job):
-                project_ids.add(task.project_id)
-        return ProjectUpdate.objects.filter(id__in=project_ids)
-
-    def get_latest_inventory_update_tasks(self, all_sorted_tasks):
-        inventory_ids = set()
-        for task in all_sorted_tasks:
-            if isinstance(task, Job):
-                inventory_ids.add(task.inventory_id)
-        return InventoryUpdate.objects.filter(id__in=inventory_ids)
-
     def get_running_workflow_jobs(self):
         graph_workflow_jobs = [wf for wf in
                                WorkflowJob.objects.filter(status='running')]
@@ -200,9 +186,6 @@ class TaskManager():
             schedule_task_manager()
         return result
 
-    def get_dependent_jobs_for_inv_and_proj_update(self, job_obj):
-        return [{'type': j.model_to_str(), 'id': j.id} for j in job_obj.dependent_jobs.all()]
-
     def start_task(self, task, rampart_group, dependent_tasks=None, instance=None):
         from awx.main.tasks import handle_work_error, handle_work_success
 
@@ -364,10 +347,6 @@ class TaskManager():
     def should_update_inventory_source(self, job, latest_inventory_update):
         now = tz_now()
 
-        # Already processed dependencies for this job
-        if job.dependent_jobs.all():
-            return False
-
         if latest_inventory_update is None:
             return True
         '''
@@ -393,8 +372,6 @@ class TaskManager():
     def should_update_related_project(self, job, latest_project_update):
         now = tz_now()
 
-        if job.dependent_jobs.all():
-            return False
 
         if latest_project_update is None:
             return True
@@ -426,18 +403,21 @@ class TaskManager():
             return True
         return False
 
-    def generate_dependencies(self, task):
-        dependencies = []
-        if type(task) is Job:
+    def generate_dependencies(self, undeped_tasks):
+        created_dependencies = []
+        for task in undeped_tasks:
+            dependencies = []
+            if not type(task) is Job:
+                continue
             # TODO: Can remove task.project None check after scan-job-default-playbook is removed
             if task.project is not None and task.project.scm_update_on_launch is True:
                 latest_project_update = self.get_latest_project_update(task)
                 if self.should_update_related_project(task, latest_project_update):
                     project_task = self.create_project_update(task)
+                    created_dependencies.append(project_task)
                     dependencies.append(project_task)
                 else:
-                    if latest_project_update.status in ['waiting', 'pending', 'running']:
-                        dependencies.append(latest_project_update)
+                    dependencies.append(latest_project_update)
 
             # Inventory created 2 seconds behind job
             try:
@@ -452,56 +432,20 @@ class TaskManager():
                 latest_inventory_update = self.get_latest_inventory_update(inventory_source)
                 if self.should_update_inventory_source(task, latest_inventory_update):
                     inventory_task = self.create_inventory_update(task, inventory_source)
+                    created_dependencies.append(inventory_task)
                     dependencies.append(inventory_task)
                 else:
-                    if latest_inventory_update.status in ['waiting', 'pending', 'running']:
-                        dependencies.append(latest_inventory_update)
+                    dependencies.append(latest_inventory_update)
 
             if len(dependencies) > 0:
                 self.capture_chain_failure_dependencies(task, dependencies)
-        return dependencies
 
-    def process_dependencies(self, dependent_task, dependency_tasks):
-        for task in dependency_tasks:
-            if self.is_job_blocked(task):
-                logger.debug("Dependent {} is blocked from running".format(task.log_format))
-                continue
-            preferred_instance_groups = task.preferred_instance_groups
-            found_acceptable_queue = False
-            idle_instance_that_fits = None
-            for rampart_group in preferred_instance_groups:
-                if idle_instance_that_fits is None:
-                    idle_instance_that_fits = rampart_group.find_largest_idle_instance()
-                if not rampart_group.is_containerized and self.get_remaining_capacity(rampart_group.name) <= 0:
-                    logger.debug("Skipping group {} capacity <= 0".format(rampart_group.name))
-                    continue
-
-                execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task)
-                if execution_instance:
-                    logger.debug("Starting dependent {} in group {} instance {}".format(
-                        task.log_format, rampart_group.name, execution_instance.hostname))
-                elif not execution_instance and idle_instance_that_fits:
-                    if not rampart_group.is_containerized:
-                        execution_instance = idle_instance_that_fits
-                        logger.debug("Starting dependent {} in group {} on idle instance {}".format(
-                            task.log_format, rampart_group.name, execution_instance.hostname))
-                if execution_instance or rampart_group.is_containerized:
-                    self.graph[rampart_group.name]['graph'].add_job(task)
-                    tasks_to_fail = [t for t in dependency_tasks if t != task]
-                    tasks_to_fail += [dependent_task]
-                    self.start_task(task, rampart_group, tasks_to_fail, execution_instance)
-                    found_acceptable_queue = True
-                    break
-                else:
-                    logger.debug("No instance available in group {} to run job {} w/ capacity requirement {}".format(
-                        rampart_group.name, task.log_format, task.task_impact))
-            if not found_acceptable_queue:
-                logger.debug("Dependent {} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format))
+        UnifiedJob.objects.filter(pk__in=[task.pk for task in undeped_tasks]).update(dependencies_processed=True)
+        return created_dependencies
 
     def process_pending_tasks(self, pending_tasks):
         running_workflow_templates = set([wf.unified_job_template_id for wf in self.get_running_workflow_jobs()])
         for task in pending_tasks:
-            self.process_dependencies(task, self.generate_dependencies(task))
             if self.is_job_blocked(task):
                 logger.debug("{} is blocked from running".format(task.log_format))
                 continue
@@ -574,13 +518,6 @@ class TaskManager():
     def calculate_capacity_consumed(self, tasks):
         self.graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph)
 
-    def would_exceed_capacity(self, task, instance_group):
-        current_capacity = self.graph[instance_group]['consumed_capacity']
-        capacity_total = self.graph[instance_group]['capacity_total']
-        if current_capacity == 0:
-            return False
-        return (task.task_impact + current_capacity > capacity_total)
-
     def consume_capacity(self, task, instance_group):
         logger.debug('{} consumed {} capacity units from {} with prior total of {}'.format(
             task.log_format, task.task_impact, instance_group,
@@ -598,6 +535,9 @@ class TaskManager():
         self.process_running_tasks(running_tasks)
 
         pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending']
+        undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed]
+        dependencies = self.generate_dependencies(undeped_tasks)
+        self.process_pending_tasks(dependencies)
         self.process_pending_tasks(pending_tasks)
 
     def _schedule(self):
diff --git a/awx/main/tests/functional/task_management/test_rampart_groups.py b/awx/main/tests/functional/task_management/test_rampart_groups.py
index b459241f55..b763ef5ca3 100644
--- a/awx/main/tests/functional/task_management/test_rampart_groups.py
+++ b/awx/main/tests/functional/task_management/test_rampart_groups.py
@@ -67,7 +67,7 @@ def test_multi_group_with_shared_dependency(instance_factory, default_instance_g
 
         pu = p.project_updates.first()
         TaskManager.start_task.assert_called_once_with(pu, default_instance_group,
-                                                       [j1],
+                                                       [j1, j2],
                                                        default_instance_group.instances.all()[0])
         pu.finished = pu.created + timedelta(seconds=1)
         pu.status = "successful"
@@ -193,7 +193,7 @@ def test_instance_group_basic_policies(instance_factory, instance_group_factory)
     ig2 = InstanceGroup.objects.get(id=ig2.id)
     ig3 = InstanceGroup.objects.get(id=ig3.id)
     assert len(ig0.instances.all()) == 1
-    assert i0 in ig0.instances.all()
+    assert i0 in ig0.instances.all()
     assert len(InstanceGroup.objects.get(id=ig1.id).instances.all()) == 2
     assert i1 in ig1.instances.all()
     assert i2 in ig1.instances.all()
diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py
index 1ad3d7e035..9ba1e068c9 100644
--- a/awx/main/tests/functional/task_management/test_scheduler.py
+++ b/awx/main/tests/functional/task_management/test_scheduler.py
@@ -6,7 +6,7 @@ from datetime import timedelta
 from awx.main.scheduler import TaskManager
 from awx.main.scheduler.dependency_graph import DependencyGraph
 from awx.main.utils import encrypt_field
-from awx.main.models import WorkflowJobTemplate, JobTemplate
+from awx.main.models import WorkflowJobTemplate, JobTemplate, Job
 
 
 @pytest.mark.django_db
@@ -307,8 +307,8 @@ def test_shared_dependencies_launch(default_instance_group, job_template_factory
         TaskManager().schedule()
         pu = p.project_updates.first()
         iu = ii.inventory_updates.first()
-        TaskManager.start_task.assert_has_calls([mock.call(pu, default_instance_group, [iu, j1], instance),
-                                                 mock.call(iu, default_instance_group, [pu, j1], instance)])
+        TaskManager.start_task.assert_has_calls([mock.call(iu, default_instance_group, [j1, j2, pu], instance),
+                                                 mock.call(pu, default_instance_group, [j1, j2, iu], instance)])
         pu.status = "successful"
         pu.finished = pu.created + timedelta(seconds=1)
         pu.save()
@@ -383,3 +383,35 @@ def test_job_not_blocking_inventory_update(default_instance_group, job_template_
         dependency_graph = DependencyGraph(None)
         dependency_graph.add_job(job)
         assert not dependency_graph.is_job_blocked(inventory_update)
+
+
+@pytest.mark.django_db
+def test_generate_dependencies_only_once(job_template_factory):
+    objects = job_template_factory('jt', organization='org1')
+
+    job = objects.job_template.create_job()
+    job.status = "pending"
+    job.name = "job_gen_dep"
+    job.save()
+
+
+    with mock.patch("awx.main.scheduler.TaskManager.start_task"):
+        # job starts with dependencies_processed as False
+        assert not job.dependencies_processed
+        # run one cycle of ._schedule() to generate dependencies
+        TaskManager()._schedule()
+
+        # make sure dependencies_processed is now True
+        job = Job.objects.filter(name="job_gen_dep")[0]
+        assert job.dependencies_processed
+
+        # Run ._schedule() again, but make sure .generate_dependencies() is
+        # not called with job in the argument list
+        tm = TaskManager()
+        tm.generate_dependencies = mock.MagicMock()
+        tm._schedule()
+
+        # .call_args is a tuple, (positional_args, kwargs), so [0][0] is
+        # the first positional arg, i.e. the first argument passed to
+        # .generate_dependencies()
+        assert tm.generate_dependencies.call_args[0][0] == []
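
A note on the final assertion (illustration only, not part of the
patch): mock records the arguments of the most recent call on
.call_args as an (args, kwargs) pair, so .call_args[0][0] is the first
positional argument. A standalone example using only the stdlib:

    from unittest import mock

    m = mock.MagicMock()
    m([])                           # most recent call: one positional arg
    assert m.call_args[0][0] == []  # args tuple, then first positional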