diff --git a/awx/main/migrations/0108_v370_unifiedjob_dependencies_processed.py b/awx/main/migrations/0108_v370_unifiedjob_dependencies_processed.py new file mode 100644 index 0000000000..6c10b11083 --- /dev/null +++ b/awx/main/migrations/0108_v370_unifiedjob_dependencies_processed.py @@ -0,0 +1,18 @@ +# Generated by Django 2.2.8 on 2020-02-06 16:43 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0107_v370_workflow_convergence_api_toggle'), + ] + + operations = [ + migrations.AddField( + model_name='unifiedjob', + name='dependencies_processed', + field=models.BooleanField(default=False, editable=False, help_text='If True, the task manager has already processed potential dependencies for this job.'), + ), + ] diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 1617014540..67883744de 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -623,6 +623,11 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique editable=False, help_text=_("The date and time the job was queued for starting."), ) + dependencies_processed = models.BooleanField( + default=False, + editable=False, + help_text=_("If True, the task manager has already processed potential dependencies for this job.") + ) finished = models.DateTimeField( null=True, default=None, diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 32e8a7defb..9cd0aa1db9 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -23,6 +23,7 @@ from awx.main.models import ( Project, ProjectUpdate, SystemJob, + UnifiedJob, WorkflowApproval, WorkflowJob, WorkflowJobTemplate @@ -74,21 +75,6 @@ class TaskManager(): key=lambda task: task.created) return all_tasks - - def get_latest_project_update_tasks(self, all_sorted_tasks): - project_ids = set() - for task in all_sorted_tasks: - if isinstance(task, Job): - project_ids.add(task.project_id) - return ProjectUpdate.objects.filter(id__in=project_ids) - - def get_latest_inventory_update_tasks(self, all_sorted_tasks): - inventory_ids = set() - for task in all_sorted_tasks: - if isinstance(task, Job): - inventory_ids.add(task.inventory_id) - return InventoryUpdate.objects.filter(id__in=inventory_ids) - def get_running_workflow_jobs(self): graph_workflow_jobs = [wf for wf in WorkflowJob.objects.filter(status='running')] @@ -200,9 +186,6 @@ class TaskManager(): schedule_task_manager() return result - def get_dependent_jobs_for_inv_and_proj_update(self, job_obj): - return [{'type': j.model_to_str(), 'id': j.id} for j in job_obj.dependent_jobs.all()] - def start_task(self, task, rampart_group, dependent_tasks=None, instance=None): from awx.main.tasks import handle_work_error, handle_work_success @@ -364,10 +347,6 @@ class TaskManager(): def should_update_inventory_source(self, job, latest_inventory_update): now = tz_now() - # Already processed dependencies for this job - if job.dependent_jobs.all(): - return False - if latest_inventory_update is None: return True ''' @@ -393,8 +372,6 @@ class TaskManager(): def should_update_related_project(self, job, latest_project_update): now = tz_now() - if job.dependent_jobs.all(): - return False if latest_project_update is None: return True @@ -426,18 +403,21 @@ class TaskManager(): return True return False - def generate_dependencies(self, task): - dependencies = [] - if type(task) is Job: + def generate_dependencies(self, undeped_tasks): + created_dependencies = [] + for task in undeped_tasks: + dependencies = [] + if not type(task) is Job: + continue # TODO: Can remove task.project None check after scan-job-default-playbook is removed if task.project is not None and task.project.scm_update_on_launch is True: latest_project_update = self.get_latest_project_update(task) if self.should_update_related_project(task, latest_project_update): project_task = self.create_project_update(task) + created_dependencies.append(project_task) dependencies.append(project_task) else: - if latest_project_update.status in ['waiting', 'pending', 'running']: - dependencies.append(latest_project_update) + dependencies.append(latest_project_update) # Inventory created 2 seconds behind job try: @@ -452,56 +432,20 @@ class TaskManager(): latest_inventory_update = self.get_latest_inventory_update(inventory_source) if self.should_update_inventory_source(task, latest_inventory_update): inventory_task = self.create_inventory_update(task, inventory_source) + created_dependencies.append(inventory_task) dependencies.append(inventory_task) else: - if latest_inventory_update.status in ['waiting', 'pending', 'running']: - dependencies.append(latest_inventory_update) + dependencies.append(latest_inventory_update) if len(dependencies) > 0: self.capture_chain_failure_dependencies(task, dependencies) - return dependencies - def process_dependencies(self, dependent_task, dependency_tasks): - for task in dependency_tasks: - if self.is_job_blocked(task): - logger.debug("Dependent {} is blocked from running".format(task.log_format)) - continue - preferred_instance_groups = task.preferred_instance_groups - found_acceptable_queue = False - idle_instance_that_fits = None - for rampart_group in preferred_instance_groups: - if idle_instance_that_fits is None: - idle_instance_that_fits = rampart_group.find_largest_idle_instance() - if not rampart_group.is_containerized and self.get_remaining_capacity(rampart_group.name) <= 0: - logger.debug("Skipping group {} capacity <= 0".format(rampart_group.name)) - continue - - execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task) - if execution_instance: - logger.debug("Starting dependent {} in group {} instance {}".format( - task.log_format, rampart_group.name, execution_instance.hostname)) - elif not execution_instance and idle_instance_that_fits: - if not rampart_group.is_containerized: - execution_instance = idle_instance_that_fits - logger.debug("Starting dependent {} in group {} on idle instance {}".format( - task.log_format, rampart_group.name, execution_instance.hostname)) - if execution_instance or rampart_group.is_containerized: - self.graph[rampart_group.name]['graph'].add_job(task) - tasks_to_fail = [t for t in dependency_tasks if t != task] - tasks_to_fail += [dependent_task] - self.start_task(task, rampart_group, tasks_to_fail, execution_instance) - found_acceptable_queue = True - break - else: - logger.debug("No instance available in group {} to run job {} w/ capacity requirement {}".format( - rampart_group.name, task.log_format, task.task_impact)) - if not found_acceptable_queue: - logger.debug("Dependent {} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format)) + UnifiedJob.objects.filter(pk__in = [task.pk for task in undeped_tasks]).update(dependencies_processed=True) + return created_dependencies def process_pending_tasks(self, pending_tasks): running_workflow_templates = set([wf.unified_job_template_id for wf in self.get_running_workflow_jobs()]) for task in pending_tasks: - self.process_dependencies(task, self.generate_dependencies(task)) if self.is_job_blocked(task): logger.debug("{} is blocked from running".format(task.log_format)) continue @@ -574,13 +518,6 @@ class TaskManager(): def calculate_capacity_consumed(self, tasks): self.graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph) - def would_exceed_capacity(self, task, instance_group): - current_capacity = self.graph[instance_group]['consumed_capacity'] - capacity_total = self.graph[instance_group]['capacity_total'] - if current_capacity == 0: - return False - return (task.task_impact + current_capacity > capacity_total) - def consume_capacity(self, task, instance_group): logger.debug('{} consumed {} capacity units from {} with prior total of {}'.format( task.log_format, task.task_impact, instance_group, @@ -598,6 +535,9 @@ class TaskManager(): self.process_running_tasks(running_tasks) pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending'] + undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed] + dependencies = self.generate_dependencies(undeped_tasks) + self.process_pending_tasks(dependencies) self.process_pending_tasks(pending_tasks) def _schedule(self): diff --git a/awx/main/tests/functional/task_management/test_rampart_groups.py b/awx/main/tests/functional/task_management/test_rampart_groups.py index b459241f55..b763ef5ca3 100644 --- a/awx/main/tests/functional/task_management/test_rampart_groups.py +++ b/awx/main/tests/functional/task_management/test_rampart_groups.py @@ -67,7 +67,7 @@ def test_multi_group_with_shared_dependency(instance_factory, default_instance_g pu = p.project_updates.first() TaskManager.start_task.assert_called_once_with(pu, default_instance_group, - [j1], + [j1,j2], default_instance_group.instances.all()[0]) pu.finished = pu.created + timedelta(seconds=1) pu.status = "successful" @@ -193,7 +193,7 @@ def test_instance_group_basic_policies(instance_factory, instance_group_factory) ig2 = InstanceGroup.objects.get(id=ig2.id) ig3 = InstanceGroup.objects.get(id=ig3.id) assert len(ig0.instances.all()) == 1 - assert i0 in ig0.instances.all() + assert i0 in ig0.instances.all() assert len(InstanceGroup.objects.get(id=ig1.id).instances.all()) == 2 assert i1 in ig1.instances.all() assert i2 in ig1.instances.all() diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py index 1ad3d7e035..9ba1e068c9 100644 --- a/awx/main/tests/functional/task_management/test_scheduler.py +++ b/awx/main/tests/functional/task_management/test_scheduler.py @@ -6,7 +6,7 @@ from datetime import timedelta from awx.main.scheduler import TaskManager from awx.main.scheduler.dependency_graph import DependencyGraph from awx.main.utils import encrypt_field -from awx.main.models import WorkflowJobTemplate, JobTemplate +from awx.main.models import WorkflowJobTemplate, JobTemplate, Job @pytest.mark.django_db @@ -307,8 +307,8 @@ def test_shared_dependencies_launch(default_instance_group, job_template_factory TaskManager().schedule() pu = p.project_updates.first() iu = ii.inventory_updates.first() - TaskManager.start_task.assert_has_calls([mock.call(pu, default_instance_group, [iu, j1], instance), - mock.call(iu, default_instance_group, [pu, j1], instance)]) + TaskManager.start_task.assert_has_calls([mock.call(iu, default_instance_group, [j1, j2, pu], instance), + mock.call(pu, default_instance_group, [j1, j2, iu], instance)]) pu.status = "successful" pu.finished = pu.created + timedelta(seconds=1) pu.save() @@ -383,3 +383,35 @@ def test_job_not_blocking_inventory_update(default_instance_group, job_template_ dependency_graph = DependencyGraph(None) dependency_graph.add_job(job) assert not dependency_graph.is_job_blocked(inventory_update) + + +@pytest.mark.django_db +def test_generate_dependencies_only_once(job_template_factory): + objects = job_template_factory('jt', organization='org1') + + job = objects.job_template.create_job() + job.status = "pending" + job.name = "job_gen_dep" + job.save() + + + with mock.patch("awx.main.scheduler.TaskManager.start_task"): + # job starts with dependencies_processed as False + assert not job.dependencies_processed + # run one cycle of ._schedule() to generate dependencies + TaskManager()._schedule() + + # make sure dependencies_processed is now True + job = Job.objects.filter(name="job_gen_dep")[0] + assert job.dependencies_processed + + # Run ._schedule() again, but make sure .generate_dependencies() is not + # called with job in the argument list + tm = TaskManager() + tm.generate_dependencies = mock.MagicMock() + tm._schedule() + + # .call_args is tuple, (positional_args, kwargs), [0][0] then is + # the first positional arg, i.e. the first argument of + # .generate_dependencies() + assert tm.generate_dependencies.call_args[0][0] == []