From 1b662fcca53e99b26c18e5cfa54bcf839d681606 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Tue, 26 Apr 2022 14:17:12 -0400 Subject: [PATCH 1/3] SCM inv source trigger project update - scm based inventory sources should launch project updates prior to running inventory updates for that source. - fixes scenario where a job is based on projectA, but the inventory source is based on projectB. Running the job will likely trigger a sync for projectA, but not projectB. comments --- .../0162_alter_unifiedjob_dependent_jobs.py | 18 +++ awx/main/models/mixins.py | 34 ++++-- awx/main/models/unified_jobs.py | 3 +- awx/main/scheduler/dependency_graph.py | 2 +- awx/main/scheduler/task_manager.py | 110 +++++++++++------- .../task_management/test_scheduler.py | 27 +++-- 6 files changed, 132 insertions(+), 62 deletions(-) create mode 100644 awx/main/migrations/0162_alter_unifiedjob_dependent_jobs.py diff --git a/awx/main/migrations/0162_alter_unifiedjob_dependent_jobs.py b/awx/main/migrations/0162_alter_unifiedjob_dependent_jobs.py new file mode 100644 index 0000000000..e8f7c3c4c5 --- /dev/null +++ b/awx/main/migrations/0162_alter_unifiedjob_dependent_jobs.py @@ -0,0 +1,18 @@ +# Generated by Django 3.2.13 on 2022-05-02 21:27 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0161_unifiedjob_host_status_counts'), + ] + + operations = [ + migrations.AlterField( + model_name='unifiedjob', + name='dependent_jobs', + field=models.ManyToManyField(editable=False, related_name='unifiedjob_blocked_jobs', to='main.UnifiedJob'), + ), + ] diff --git a/awx/main/models/mixins.py b/awx/main/models/mixins.py index 8396585bb5..d6c5e4e980 100644 --- a/awx/main/models/mixins.py +++ b/awx/main/models/mixins.py @@ -415,33 +415,47 @@ class TaskManagerJobMixin(TaskManagerUnifiedJobMixin): class Meta: abstract = True - def get_jobs_fail_chain(self): - return [self.project_update] if self.project_update else [] - def dependent_jobs_finished(self): - for j in self.dependent_jobs.all(): - if j.status in ['pending', 'waiting', 'running']: - return False - return True + # if any dependent jobs are pending, waiting, or running, return False + return not any(j.status in ACTIVE_STATES for j in self.dependent_jobs.all()) class TaskManagerUpdateOnLaunchMixin(TaskManagerUnifiedJobMixin): class Meta: abstract = True - def get_jobs_fail_chain(self): - return list(self.dependent_jobs.all()) - class TaskManagerProjectUpdateMixin(TaskManagerUpdateOnLaunchMixin): class Meta: abstract = True + def get_jobs_fail_chain(self): + return list(self.unifiedjob_blocked_jobs.all()) + class TaskManagerInventoryUpdateMixin(TaskManagerUpdateOnLaunchMixin): class Meta: abstract = True + def get_jobs_fail_chain(self): + blocked_jobs = list(self.unifiedjob_blocked_jobs.all()) + other_updates = [] + if blocked_jobs: + # blocked_jobs[0] is just a reference to a job that depends on this + # inventory update. + # We can look at the dependencies of this blocked job to find other + # inventory sources that are safe to fail. + # Since the dependencies could also include project updates, + # we need to check for type. + for dep in blocked_jobs[0].dependent_jobs.all(): + if type(dep) is type(self) and dep.id != self.id: + other_updates.append(dep) + return blocked_jobs + other_updates + + def dependent_jobs_finished(self): + # if any dependent jobs are pending, waiting, or running, return False + return not any(j.status in ACTIVE_STATES for j in self.dependent_jobs.all()) + class ExecutionEnvironmentMixin(models.Model): class Meta: diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index c924a12f6d..c47c42969a 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -575,7 +575,8 @@ class UnifiedJob( dependent_jobs = models.ManyToManyField( 'self', editable=False, - related_name='%(class)s_blocked_jobs+', + related_name='%(class)s_blocked_jobs', + symmetrical=False, ) execution_node = models.TextField( blank=True, diff --git a/awx/main/scheduler/dependency_graph.py b/awx/main/scheduler/dependency_graph.py index b336568bb2..48d2bd2971 100644 --- a/awx/main/scheduler/dependency_graph.py +++ b/awx/main/scheduler/dependency_graph.py @@ -26,7 +26,7 @@ class DependencyGraph(object): # The reason for tracking both inventory and inventory sources: # Consider InvA, which has two sources, InvSource1, InvSource2. # JobB might depend on InvA, which launches two updates, one for each source. - # To determine if JobB can run, we can just check InvA, which is marked in + # To determine if JobB can run, we can just check InvA, which is marked in # INVENTORY_UPDATES, instead of having to check for both entries in # INVENTORY_SOURCE_UPDATES. self.data[self.INVENTORY_UPDATES] = {} diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 05520f50d6..8de9cf3546 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -281,8 +281,10 @@ class TaskManager: for task in running_tasks: self.dependency_graph.add_job(task) - def create_project_update(self, task): - project_task = Project.objects.get(id=task.project_id).create_project_update(_eager_fields=dict(launch_type='dependency')) + def create_project_update(self, task, project_id=None): + if project_id is None: + project_id = task.project_id + project_task = Project.objects.get(id=project_id).create_project_update(_eager_fields=dict(launch_type='dependency')) # Project created 1 seconds behind project_task.created = task.created - timedelta(seconds=1) @@ -302,14 +304,10 @@ class TaskManager: # self.process_inventory_sources(inventory_sources) return inventory_task - def capture_chain_failure_dependencies(self, task, dependencies): + def add_dependencies(self, task, dependencies): with disable_activity_stream(): task.dependent_jobs.add(*dependencies) - for dep in dependencies: - # Add task + all deps except self - dep.dependent_jobs.add(*([task] + [d for d in dependencies if d != dep])) - def get_latest_inventory_update(self, inventory_source): latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created") if not latest_inventory_update.exists(): @@ -335,8 +333,8 @@ class TaskManager: return True return False - def get_latest_project_update(self, job): - latest_project_update = ProjectUpdate.objects.filter(project=job.project, job_type='check').order_by("-created") + def get_latest_project_update(self, project_id): + latest_project_update = ProjectUpdate.objects.filter(project=project_id, job_type='check').order_by("-created") if not latest_project_update.exists(): return None return latest_project_update.first() @@ -376,45 +374,71 @@ class TaskManager: return True return False + def gen_dep_for_job(self, task): + created_dependencies = [] + dependencies = [] + # TODO: Can remove task.project None check after scan-job-default-playbook is removed + if task.project is not None and task.project.scm_update_on_launch is True: + latest_project_update = self.get_latest_project_update(task.project_id) + if self.should_update_related_project(task, latest_project_update): + project_task = self.create_project_update(task) + created_dependencies.append(project_task) + dependencies.append(project_task) + else: + dependencies.append(latest_project_update) + + # Inventory created 2 seconds behind job + try: + start_args = json.loads(decrypt_field(task, field_name="start_args")) + except ValueError: + start_args = dict() + # generator for inventory sources related to this task + task_inv_sources = (invsrc for invsrc in self.all_inventory_sources if invsrc.inventory_id == task.inventory_id) + for inventory_source in task_inv_sources: + if "inventory_sources_already_updated" in start_args and inventory_source.id in start_args['inventory_sources_already_updated']: + continue + if not inventory_source.update_on_launch: + continue + latest_inventory_update = self.get_latest_inventory_update(inventory_source) + if self.should_update_inventory_source(task, latest_inventory_update): + inventory_task = self.create_inventory_update(task, inventory_source) + created_dependencies.append(inventory_task) + dependencies.append(inventory_task) + else: + dependencies.append(latest_inventory_update) + + if dependencies: + self.add_dependencies(task, dependencies) + + return created_dependencies + + def gen_dep_for_inventory_update(self, inventory_task): + created_dependencies = [] + if inventory_task.source == "scm": + invsrc = inventory_task.inventory_source + if not invsrc.source_project.scm_update_on_launch: + return created_dependencies + + latest_src_project_update = self.get_latest_project_update(invsrc.source_project_id) + if self.should_update_related_project(inventory_task, latest_src_project_update): + latest_src_project_update = self.create_project_update(inventory_task, project_id=invsrc.source_project_id) + created_dependencies.append(latest_src_project_update) + self.add_dependencies(inventory_task, [latest_src_project_update]) + + return created_dependencies + def generate_dependencies(self, undeped_tasks): created_dependencies = [] for task in undeped_tasks: task.log_lifecycle("acknowledged") - dependencies = [] - if not type(task) is Job: + if type(task) is Job: + created_dependencies += self.gen_dep_for_job(task) + elif type(task) is InventoryUpdate: + created_dependencies += self.gen_dep_for_inventory_update(task) + else: continue - # TODO: Can remove task.project None check after scan-job-default-playbook is removed - if task.project is not None and task.project.scm_update_on_launch is True: - latest_project_update = self.get_latest_project_update(task) - if self.should_update_related_project(task, latest_project_update): - project_task = self.create_project_update(task) - created_dependencies.append(project_task) - dependencies.append(project_task) - else: - dependencies.append(latest_project_update) - - # Inventory created 2 seconds behind job - try: - start_args = json.loads(decrypt_field(task, field_name="start_args")) - except ValueError: - start_args = dict() - for inventory_source in [invsrc for invsrc in self.all_inventory_sources if invsrc.inventory == task.inventory]: - if "inventory_sources_already_updated" in start_args and inventory_source.id in start_args['inventory_sources_already_updated']: - continue - if not inventory_source.update_on_launch: - continue - latest_inventory_update = self.get_latest_inventory_update(inventory_source) - if self.should_update_inventory_source(task, latest_inventory_update): - inventory_task = self.create_inventory_update(task, inventory_source) - created_dependencies.append(inventory_task) - dependencies.append(inventory_task) - else: - dependencies.append(latest_inventory_update) - - if len(dependencies) > 0: - self.capture_chain_failure_dependencies(task, dependencies) - UnifiedJob.objects.filter(pk__in=[task.pk for task in undeped_tasks]).update(dependencies_processed=True) + return created_dependencies def process_pending_tasks(self, pending_tasks): @@ -572,6 +596,8 @@ class TaskManager: pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending'] undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed] dependencies = self.generate_dependencies(undeped_tasks) + deps_of_deps = self.generate_dependencies(dependencies) + dependencies += deps_of_deps self.process_pending_tasks(dependencies) self.process_pending_tasks(pending_tasks) diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py index 9ceda70eed..c9194c8b87 100644 --- a/awx/main/tests/functional/task_management/test_scheduler.py +++ b/awx/main/tests/functional/task_management/test_scheduler.py @@ -324,6 +324,22 @@ def test_single_job_dependencies_inventory_update_launch(controlplane_instance_g TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance) +@pytest.mark.django_db +def test_inventory_update_launches_project_update(controlplane_instance_group, scm_inventory_source): + ii = scm_inventory_source + project = scm_inventory_source.source_project + project.scm_update_on_launch = True + project.save() + iu = ii.create_inventory_update() + iu.status = "pending" + iu.save() + with mock.patch("awx.main.scheduler.TaskManager.start_task"): + tm = TaskManager() + with mock.patch.object(TaskManager, "create_project_update", wraps=tm.create_project_update) as mock_pu: + tm.schedule() + mock_pu.assert_called_with(iu, project_id=project.id) + + @pytest.mark.django_db def test_job_dependency_with_already_updated(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory): objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"]) @@ -382,7 +398,7 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa pu = p.project_updates.first() iu = ii.inventory_updates.first() TaskManager.start_task.assert_has_calls( - [mock.call(iu, controlplane_instance_group, [j1, j2, pu], instance), mock.call(pu, controlplane_instance_group, [j1, j2, iu], instance)] + [mock.call(iu, controlplane_instance_group, [j1, j2], instance), mock.call(pu, controlplane_instance_group, [j1, j2], instance)] ) pu.status = "successful" pu.finished = pu.created + timedelta(seconds=1) @@ -464,7 +480,6 @@ def test_generate_dependencies_only_once(job_template_factory): job.status = "pending" job.name = "job_gen_dep" job.save() - with mock.patch("awx.main.scheduler.TaskManager.start_task"): # job starts with dependencies_processed as False assert not job.dependencies_processed @@ -478,10 +493,6 @@ def test_generate_dependencies_only_once(job_template_factory): # Run ._schedule() again, but make sure .generate_dependencies() is not # called with job in the argument list tm = TaskManager() - tm.generate_dependencies = mock.MagicMock() + tm.generate_dependencies = mock.MagicMock(return_value=[]) tm._schedule() - - # .call_args is tuple, (positional_args, kwargs), [0][0] then is - # the first positional arg, i.e. the first argument of - # .generate_dependencies() - assert tm.generate_dependencies.call_args[0][0] == [] + tm.generate_dependencies.assert_has_calls([mock.call([]), mock.call([])]) From 0ae9fe3624a3225ef63ea75f0aa36a48ee2cd884 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Tue, 10 May 2022 17:19:07 -0400 Subject: [PATCH 2/3] if dependency fails, fail job in task manager --- awx/main/models/mixins.py | 23 +++++++++++------------ awx/main/scheduler/task_manager.py | 22 ++++++++++++++++++---- awx/main/tasks/system.py | 2 +- 3 files changed, 30 insertions(+), 17 deletions(-) diff --git a/awx/main/models/mixins.py b/awx/main/models/mixins.py index d6c5e4e980..34e05fa818 100644 --- a/awx/main/models/mixins.py +++ b/awx/main/models/mixins.py @@ -407,18 +407,11 @@ class TaskManagerUnifiedJobMixin(models.Model): def get_jobs_fail_chain(self): return [] - def dependent_jobs_finished(self): - return True - class TaskManagerJobMixin(TaskManagerUnifiedJobMixin): class Meta: abstract = True - def dependent_jobs_finished(self): - # if any dependent jobs are pending, waiting, or running, return False - return not any(j.status in ACTIVE_STATES for j in self.dependent_jobs.all()) - class TaskManagerUpdateOnLaunchMixin(TaskManagerUnifiedJobMixin): class Meta: @@ -430,7 +423,17 @@ class TaskManagerProjectUpdateMixin(TaskManagerUpdateOnLaunchMixin): abstract = True def get_jobs_fail_chain(self): - return list(self.unifiedjob_blocked_jobs.all()) + # project update can be a dependency of an inventory update, in which + # case we need to fail the job that may have spawned the inventory + # update. + # The inventory update will fail, but since it is not running it will + # not cascade fail to the job from the errback logic in apply_async. As + # such we should capture it here. + blocked_jobs = list(self.unifiedjob_blocked_jobs.all().prefetch_related("unifiedjob_blocked_jobs")) + other_tasks = [] + for b in blocked_jobs: + other_tasks += list(b.unifiedjob_blocked_jobs.all()) + return blocked_jobs + other_tasks class TaskManagerInventoryUpdateMixin(TaskManagerUpdateOnLaunchMixin): @@ -452,10 +455,6 @@ class TaskManagerInventoryUpdateMixin(TaskManagerUpdateOnLaunchMixin): other_updates.append(dep) return blocked_jobs + other_updates - def dependent_jobs_finished(self): - # if any dependent jobs are pending, waiting, or running, return False - return not any(j.status in ACTIVE_STATES for j in self.dependent_jobs.all()) - class ExecutionEnvironmentMixin(models.Model): class Meta: diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 8de9cf3546..3d80973840 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -34,6 +34,7 @@ from awx.main.utils.pglock import advisory_lock from awx.main.utils import get_type_for_model, task_manager_bulk_reschedule, schedule_task_manager from awx.main.utils.common import create_partition from awx.main.signals import disable_activity_stream +from awx.main.constants import ACTIVE_STATES from awx.main.scheduler.dependency_graph import DependencyGraph from awx.main.scheduler.task_manager_models import TaskManagerInstances from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups @@ -79,10 +80,23 @@ class TaskManager: if blocked_by: return blocked_by - if not task.dependent_jobs_finished(): - blocked_by = task.dependent_jobs.first() - if blocked_by: - return blocked_by + for dep in task.dependent_jobs.all(): + if dep.status in ACTIVE_STATES: + return dep + # if we detect a failed or error dependency, go ahead and fail this + # task. The errback on the dependency takes some time to trigger, + # and we don't want the task to enter running state if its + # dependency has failed or errored. + elif dep.status in ("error", "failed"): + task.status = 'failed' + task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % ( + get_type_for_model(type(dep)), + dep.name, + dep.id, + ) + task.save(update_fields=['status', 'job_explanation']) + task.websocket_emit_status('failed') + return dep return None diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index 274ff546f1..8c698609a5 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -695,7 +695,7 @@ def handle_work_error(task_id, *args, **kwargs): first_instance = instance first_instance_type = each_task['type'] - if instance.celery_task_id != task_id and not instance.cancel_flag and not instance.status == 'successful': + if instance.celery_task_id != task_id and not instance.cancel_flag and not instance.status in ('successful', 'failed'): instance.status = 'failed' instance.failed = True if not instance.job_explanation: From eba4a3f1c244564aabd8039019cb996646883fe3 Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Thu, 12 May 2022 15:21:17 -0400 Subject: [PATCH 3/3] in case we fail a job in task manager, we need to add the project update to the inventoryupdate.source_project field --- awx/main/scheduler/task_manager.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 3d80973840..6fa200fcd6 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -395,11 +395,9 @@ class TaskManager: if task.project is not None and task.project.scm_update_on_launch is True: latest_project_update = self.get_latest_project_update(task.project_id) if self.should_update_related_project(task, latest_project_update): - project_task = self.create_project_update(task) - created_dependencies.append(project_task) - dependencies.append(project_task) - else: - dependencies.append(latest_project_update) + latest_project_update = self.create_project_update(task) + created_dependencies.append(latest_project_update) + dependencies.append(latest_project_update) # Inventory created 2 seconds behind job try: @@ -438,7 +436,7 @@ class TaskManager: latest_src_project_update = self.create_project_update(inventory_task, project_id=invsrc.source_project_id) created_dependencies.append(latest_src_project_update) self.add_dependencies(inventory_task, [latest_src_project_update]) - + latest_src_project_update.scm_inventory_updates.add(inventory_task) return created_dependencies def generate_dependencies(self, undeped_tasks):