diff --git a/awx/main/migrations/0162_alter_unifiedjob_dependent_jobs.py b/awx/main/migrations/0162_alter_unifiedjob_dependent_jobs.py
new file mode 100644
index 0000000000..e8f7c3c4c5
--- /dev/null
+++ b/awx/main/migrations/0162_alter_unifiedjob_dependent_jobs.py
@@ -0,0 +1,18 @@
+# Generated by Django 3.2.13 on 2022-05-02 21:27
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('main', '0161_unifiedjob_host_status_counts'),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name='unifiedjob',
+            name='dependent_jobs',
+            field=models.ManyToManyField(editable=False, related_name='unifiedjob_blocked_jobs', to='main.UnifiedJob'),
+        ),
+    ]
diff --git a/awx/main/models/mixins.py b/awx/main/models/mixins.py
index 8396585bb5..d6c5e4e980 100644
--- a/awx/main/models/mixins.py
+++ b/awx/main/models/mixins.py
@@ -415,33 +415,47 @@ class TaskManagerJobMixin(TaskManagerUnifiedJobMixin):
     class Meta:
         abstract = True
 
-    def get_jobs_fail_chain(self):
-        return [self.project_update] if self.project_update else []
-
     def dependent_jobs_finished(self):
-        for j in self.dependent_jobs.all():
-            if j.status in ['pending', 'waiting', 'running']:
-                return False
-        return True
+        # if any dependent jobs are pending, waiting, or running, return False
+        return not any(j.status in ACTIVE_STATES for j in self.dependent_jobs.all())
 
 
 class TaskManagerUpdateOnLaunchMixin(TaskManagerUnifiedJobMixin):
     class Meta:
         abstract = True
 
-    def get_jobs_fail_chain(self):
-        return list(self.dependent_jobs.all())
-
 
 class TaskManagerProjectUpdateMixin(TaskManagerUpdateOnLaunchMixin):
     class Meta:
         abstract = True
 
+    def get_jobs_fail_chain(self):
+        return list(self.unifiedjob_blocked_jobs.all())
+
 
 class TaskManagerInventoryUpdateMixin(TaskManagerUpdateOnLaunchMixin):
     class Meta:
         abstract = True
 
+    def get_jobs_fail_chain(self):
+        blocked_jobs = list(self.unifiedjob_blocked_jobs.all())
+        other_updates = []
+        if blocked_jobs:
+            # blocked_jobs[0] is just a reference to a job that depends on this
+            # inventory update.
+            # We can look at the dependencies of this blocked job to find other
+            # inventory sources that are safe to fail.
+            # Since the dependencies could also include project updates,
+            # we need to check for type.
+            for dep in blocked_jobs[0].dependent_jobs.all():
+                if type(dep) is type(self) and dep.id != self.id:
+                    other_updates.append(dep)
+        return blocked_jobs + other_updates
+
+    def dependent_jobs_finished(self):
+        # if any dependent jobs are pending, waiting, or running, return False
+        return not any(j.status in ACTIVE_STATES for j in self.dependent_jobs.all())
+
 
 class ExecutionEnvironmentMixin(models.Model):
     class Meta:
diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index c924a12f6d..c47c42969a 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -575,7 +575,8 @@ class UnifiedJob(
     dependent_jobs = models.ManyToManyField(
         'self',
         editable=False,
-        related_name='%(class)s_blocked_jobs+',
+        related_name='%(class)s_blocked_jobs',
+        symmetrical=False,
     )
     execution_node = models.TextField(
         blank=True,
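The heart of this change is the `dependent_jobs` field above: `symmetrical=False` makes the self-referential M2M one-directional, and dropping the trailing `+` from `related_name` makes Django generate a reverse accessor. A minimal sketch of the resulting semantics (a standalone toy model, not the real `UnifiedJob`):

    from django.db import models

    class UnifiedJob(models.Model):
        # symmetrical=False: "A depends on B" no longer implies "B depends on A".
        # Without the trailing '+', Django exposes the reverse accessor
        # `unifiedjob_blocked_jobs`, which the new get_jobs_fail_chain() uses.
        dependent_jobs = models.ManyToManyField(
            'self',
            editable=False,
            related_name='%(class)s_blocked_jobs',
            symmetrical=False,
        )

    # job.dependent_jobs.all()              -> the updates this job waits on
    # update.unifiedjob_blocked_jobs.all()  -> the jobs this update is blocking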
diff --git a/awx/main/scheduler/dependency_graph.py b/awx/main/scheduler/dependency_graph.py
index b336568bb2..48d2bd2971 100644
--- a/awx/main/scheduler/dependency_graph.py
+++ b/awx/main/scheduler/dependency_graph.py
@@ -26,7 +26,7 @@ class DependencyGraph(object):
         # The reason for tracking both inventory and inventory sources:
         # Consider InvA, which has two sources, InvSource1, InvSource2.
         # JobB might depend on InvA, which launches two updates, one for each source.
-        # To determine if JobB can run, we can just check InvA, which is marked in 
+        # To determine if JobB can run, we can just check InvA, which is marked in
         # INVENTORY_UPDATES, instead of having to check for both entries in
         # INVENTORY_SOURCE_UPDATES.
         self.data[self.INVENTORY_UPDATES] = {}
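The comment restored above describes the graph's two-level bookkeeping; a condensed illustration of that idea (class, attribute, and method names here are illustrative stand-ins, not the real `DependencyGraph` API):

    class InventoryBookkeeping:
        # stand-in for the two dicts DependencyGraph maintains
        def __init__(self):
            self.inventory_updates = {}         # keyed by inventory id
            self.inventory_source_updates = {}  # keyed by inventory source id

        def add_inventory_update(self, update):
            # mark the specific source *and* the inventory it belongs to
            self.inventory_source_updates[update.inventory_source_id] = True
            self.inventory_updates[update.inventory_source.inventory_id] = True

        def job_blocked_by_inventory(self, job):
            # one lookup on the inventory covers every source beneath it,
            # e.g. JobB -> InvA instead of JobB -> {InvSource1, InvSource2}
            return self.inventory_updates.get(job.inventory_id, False)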
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 05520f50d6..8de9cf3546 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -281,8 +281,10 @@ class TaskManager:
         for task in running_tasks:
             self.dependency_graph.add_job(task)
 
-    def create_project_update(self, task):
-        project_task = Project.objects.get(id=task.project_id).create_project_update(_eager_fields=dict(launch_type='dependency'))
+    def create_project_update(self, task, project_id=None):
+        if project_id is None:
+            project_id = task.project_id
+        project_task = Project.objects.get(id=project_id).create_project_update(_eager_fields=dict(launch_type='dependency'))
 
         # Project created 1 seconds behind
         project_task.created = task.created - timedelta(seconds=1)
@@ -302,14 +304,10 @@
         # self.process_inventory_sources(inventory_sources)
         return inventory_task
 
-    def capture_chain_failure_dependencies(self, task, dependencies):
+    def add_dependencies(self, task, dependencies):
         with disable_activity_stream():
             task.dependent_jobs.add(*dependencies)
 
-        for dep in dependencies:
-            # Add task + all deps except self
-            dep.dependent_jobs.add(*([task] + [d for d in dependencies if d != dep]))
-
     def get_latest_inventory_update(self, inventory_source):
         latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created")
         if not latest_inventory_update.exists():
@@ -335,8 +333,8 @@
                 return True
         return False
 
-    def get_latest_project_update(self, job):
-        latest_project_update = ProjectUpdate.objects.filter(project=job.project, job_type='check').order_by("-created")
+    def get_latest_project_update(self, project_id):
+        latest_project_update = ProjectUpdate.objects.filter(project=project_id, job_type='check').order_by("-created")
         if not latest_project_update.exists():
             return None
         return latest_project_update.first()
@@ -376,45 +374,71 @@
                 return True
         return False
 
+    def gen_dep_for_job(self, task):
+        created_dependencies = []
+        dependencies = []
+        # TODO: Can remove task.project None check after scan-job-default-playbook is removed
+        if task.project is not None and task.project.scm_update_on_launch is True:
+            latest_project_update = self.get_latest_project_update(task.project_id)
+            if self.should_update_related_project(task, latest_project_update):
+                project_task = self.create_project_update(task)
+                created_dependencies.append(project_task)
+                dependencies.append(project_task)
+            else:
+                dependencies.append(latest_project_update)
+
+        # Inventory created 2 seconds behind job
+        try:
+            start_args = json.loads(decrypt_field(task, field_name="start_args"))
+        except ValueError:
+            start_args = dict()
+        # generator for inventory sources related to this task
+        task_inv_sources = (invsrc for invsrc in self.all_inventory_sources if invsrc.inventory_id == task.inventory_id)
+        for inventory_source in task_inv_sources:
+            if "inventory_sources_already_updated" in start_args and inventory_source.id in start_args['inventory_sources_already_updated']:
+                continue
+            if not inventory_source.update_on_launch:
+                continue
+            latest_inventory_update = self.get_latest_inventory_update(inventory_source)
+            if self.should_update_inventory_source(task, latest_inventory_update):
+                inventory_task = self.create_inventory_update(task, inventory_source)
+                created_dependencies.append(inventory_task)
+                dependencies.append(inventory_task)
+            else:
+                dependencies.append(latest_inventory_update)
+
+        if dependencies:
+            self.add_dependencies(task, dependencies)
+
+        return created_dependencies
+
+    def gen_dep_for_inventory_update(self, inventory_task):
+        created_dependencies = []
+        if inventory_task.source == "scm":
+            invsrc = inventory_task.inventory_source
+            if not invsrc.source_project.scm_update_on_launch:
+                return created_dependencies
+
+            latest_src_project_update = self.get_latest_project_update(invsrc.source_project_id)
+            if self.should_update_related_project(inventory_task, latest_src_project_update):
+                latest_src_project_update = self.create_project_update(inventory_task, project_id=invsrc.source_project_id)
+                created_dependencies.append(latest_src_project_update)
+            self.add_dependencies(inventory_task, [latest_src_project_update])
+
+        return created_dependencies
+
     def generate_dependencies(self, undeped_tasks):
         created_dependencies = []
         for task in undeped_tasks:
             task.log_lifecycle("acknowledged")
-            dependencies = []
-            if not type(task) is Job:
+            if type(task) is Job:
+                created_dependencies += self.gen_dep_for_job(task)
+            elif type(task) is InventoryUpdate:
+                created_dependencies += self.gen_dep_for_inventory_update(task)
+            else:
                 continue
-            # TODO: Can remove task.project None check after scan-job-default-playbook is removed
-            if task.project is not None and task.project.scm_update_on_launch is True:
-                latest_project_update = self.get_latest_project_update(task)
-                if self.should_update_related_project(task, latest_project_update):
-                    project_task = self.create_project_update(task)
-                    created_dependencies.append(project_task)
-                    dependencies.append(project_task)
-                else:
-                    dependencies.append(latest_project_update)
-
-            # Inventory created 2 seconds behind job
-            try:
-                start_args = json.loads(decrypt_field(task, field_name="start_args"))
-            except ValueError:
-                start_args = dict()
-            for inventory_source in [invsrc for invsrc in self.all_inventory_sources if invsrc.inventory == task.inventory]:
-                if "inventory_sources_already_updated" in start_args and inventory_source.id in start_args['inventory_sources_already_updated']:
-                    continue
-                if not inventory_source.update_on_launch:
-                    continue
-                latest_inventory_update = self.get_latest_inventory_update(inventory_source)
-                if self.should_update_inventory_source(task, latest_inventory_update):
-                    inventory_task = self.create_inventory_update(task, inventory_source)
-                    created_dependencies.append(inventory_task)
-                    dependencies.append(inventory_task)
-                else:
-                    dependencies.append(latest_inventory_update)
-
-            if len(dependencies) > 0:
-                self.capture_chain_failure_dependencies(task, dependencies)
-
         UnifiedJob.objects.filter(pk__in=[task.pk for task in undeped_tasks]).update(dependencies_processed=True)
+
         return created_dependencies
 
     def process_pending_tasks(self, pending_tasks):
@@ -572,6 +596,8 @@
         pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending']
         undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed]
         dependencies = self.generate_dependencies(undeped_tasks)
+        deps_of_deps = self.generate_dependencies(dependencies)
+        dependencies += deps_of_deps
        self.process_pending_tasks(dependencies)
         self.process_pending_tasks(pending_tasks)
 
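The new `deps_of_deps` pass in `_schedule()` is what lets an SCM inventory update pull in its source project update. Two passes always suffice because the dependency chain is at most two levels deep; a sketch of the flow, using the names from the hunk above:

    # pass 1: pending jobs generate first-level dependencies
    #         Job -> [InventoryUpdate, ProjectUpdate]   (gen_dep_for_job)
    # pass 2: those dependencies generate their own
    #         InventoryUpdate -> [ProjectUpdate]        (gen_dep_for_inventory_update)
    # a third pass would be a no-op: project updates never generate dependencies
    dependencies = self.generate_dependencies(undeped_tasks)
    dependencies += self.generate_dependencies(dependencies)
    self.process_pending_tasks(dependencies)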
diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py
index 9ceda70eed..c9194c8b87 100644
--- a/awx/main/tests/functional/task_management/test_scheduler.py
+++ b/awx/main/tests/functional/task_management/test_scheduler.py
@@ -324,6 +324,22 @@ def test_single_job_dependencies_inventory_update_launch(controlplane_instance_g
     TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance)
 
 
+@pytest.mark.django_db
+def test_inventory_update_launches_project_update(controlplane_instance_group, scm_inventory_source):
+    ii = scm_inventory_source
+    project = scm_inventory_source.source_project
+    project.scm_update_on_launch = True
+    project.save()
+    iu = ii.create_inventory_update()
+    iu.status = "pending"
+    iu.save()
+    with mock.patch("awx.main.scheduler.TaskManager.start_task"):
+        tm = TaskManager()
+        with mock.patch.object(TaskManager, "create_project_update", wraps=tm.create_project_update) as mock_pu:
+            tm.schedule()
+            mock_pu.assert_called_with(iu, project_id=project.id)
+
+
 @pytest.mark.django_db
 def test_job_dependency_with_already_updated(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory):
     objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"])
@@ -382,7 +398,7 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa
     pu = p.project_updates.first()
     iu = ii.inventory_updates.first()
     TaskManager.start_task.assert_has_calls(
-        [mock.call(iu, controlplane_instance_group, [j1, j2, pu], instance), mock.call(pu, controlplane_instance_group, [j1, j2, iu], instance)]
+        [mock.call(iu, controlplane_instance_group, [j1, j2], instance), mock.call(pu, controlplane_instance_group, [j1, j2], instance)]
     )
     pu.status = "successful"
     pu.finished = pu.created + timedelta(seconds=1)
@@ -464,7 +480,6 @@ def test_generate_dependencies_only_once(job_template_factory):
     job.status = "pending"
     job.name = "job_gen_dep"
     job.save()
-
     with mock.patch("awx.main.scheduler.TaskManager.start_task"):
         # job starts with dependencies_processed as False
         assert not job.dependencies_processed
@@ -478,10 +493,6 @@
         # Run ._schedule() again, but make sure .generate_dependencies() is not
         # called with job in the argument list
         tm = TaskManager()
-        tm.generate_dependencies = mock.MagicMock()
+        tm.generate_dependencies = mock.MagicMock(return_value=[])
         tm._schedule()
-
-        # .call_args is tuple, (positional_args, kwargs), [0][0] then is
-        # the first positional arg, i.e. the first argument of
-        # .generate_dependencies()
-        assert tm.generate_dependencies.call_args[0][0] == []
+        tm.generate_dependencies.assert_has_calls([mock.call([]), mock.call([])])
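For reviewers unfamiliar with it, the new test relies on `mock`'s `wraps=` option: the patched attribute records its calls like any `MagicMock`, but still delegates to the real method, so the scheduler genuinely creates the project update under test. The pattern in isolation (reusing `iu` and `project` from the test above):

    from unittest import mock

    tm = TaskManager()
    with mock.patch.object(TaskManager, "create_project_update", wraps=tm.create_project_update) as mock_pu:
        tm.schedule()  # real scheduling runs; the wrapped method still executes
        mock_pu.assert_called_with(iu, project_id=project.id)  # ...and the call is recorded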