From ab5cc2e69c68a1fa763d181ec15b986af78e531d Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Thu, 27 Jul 2023 15:42:29 -0400 Subject: [PATCH] Simplifications for DependencyManager (#13533) --- awx/main/scheduler/task_manager.py | 200 ++++++------------ .../task_management/test_scheduler.py | 52 ++--- docs/task_manager_system.md | 22 +- 3 files changed, 102 insertions(+), 172 deletions(-) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 2a6e8fc776..95d47b04c3 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -25,7 +25,6 @@ from awx.main.models import ( InventoryUpdate, Job, Project, - ProjectUpdate, UnifiedJob, WorkflowApproval, WorkflowJob, @@ -281,184 +280,115 @@ class WorkflowManager(TaskBase): class DependencyManager(TaskBase): def __init__(self): super().__init__(prefix="dependency_manager") + self.all_projects = {} + self.all_inventory_sources = {} - def create_project_update(self, task, project_id=None): - if project_id is None: - project_id = task.project_id - project_task = Project.objects.get(id=project_id).create_project_update(_eager_fields=dict(launch_type='dependency')) - - # Project created 1 seconds behind - project_task.created = task.created - timedelta(seconds=1) - project_task.status = 'pending' - project_task.save() - logger.debug('Spawned {} as dependency of {}'.format(project_task.log_format, task.log_format)) - return project_task - - def create_inventory_update(self, task, inventory_source_task): - inventory_task = InventorySource.objects.get(id=inventory_source_task.id).create_inventory_update(_eager_fields=dict(launch_type='dependency')) - - inventory_task.created = task.created - timedelta(seconds=2) - inventory_task.status = 'pending' - inventory_task.save() - logger.debug('Spawned {} as dependency of {}'.format(inventory_task.log_format, task.log_format)) - - return inventory_task - - def add_dependencies(self, task, dependencies): - with 
disable_activity_stream(): - task.dependent_jobs.add(*dependencies) - - def get_inventory_source_tasks(self): + def cache_projects_and_sources(self, task_list): + project_ids = set() inventory_ids = set() - for task in self.all_tasks: + for task in task_list: if isinstance(task, Job): - inventory_ids.add(task.inventory_id) - self.all_inventory_sources = [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)] + if task.project_id: + project_ids.add(task.project_id) + if task.inventory_id: + inventory_ids.add(task.inventory_id) + elif isinstance(task, InventoryUpdate): + if task.inventory_source and task.inventory_source.source_project_id: + project_ids.add(task.inventory_source.source_project_id) - def get_latest_inventory_update(self, inventory_source): - latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created") - if not latest_inventory_update.exists(): - return None - return latest_inventory_update.first() + for proj in Project.objects.filter(id__in=project_ids, scm_update_on_launch=True): + self.all_projects[proj.id] = proj - def should_update_inventory_source(self, job, latest_inventory_update): - now = tz_now() + for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True): + self.all_inventory_sources.setdefault(invsrc.inventory_id, []) + self.all_inventory_sources[invsrc.inventory_id].append(invsrc) - if latest_inventory_update is None: + @staticmethod + def should_update_again(update, cache_timeout): + ''' + If it has never updated, we need to update + If there is already an update in progress then we do not need to create a new one + If the last update failed, we always need to try and update again + If current time is more than cache_timeout after last update, then we need a new one + ''' + if (update is None) or (update.status in ['failed', 'canceled', 'error']): return True - ''' - If there's already a
inventory update utilizing this job that's about to run - then we don't need to create one - ''' - if latest_inventory_update.status in ['waiting', 'pending', 'running']: + if update.status in ['waiting', 'pending', 'running']: return False - timeout_seconds = timedelta(seconds=latest_inventory_update.inventory_source.update_cache_timeout) - if (latest_inventory_update.finished + timeout_seconds) < now: - return True - if latest_inventory_update.inventory_source.update_on_launch is True and latest_inventory_update.status in ['failed', 'canceled', 'error']: - return True - return False + return bool(((update.finished + timedelta(seconds=cache_timeout))) < tz_now()) - def get_latest_project_update(self, project_id): - latest_project_update = ProjectUpdate.objects.filter(project=project_id, job_type='check').order_by("-created") - if not latest_project_update.exists(): - return None - return latest_project_update.first() - - def should_update_related_project(self, job, latest_project_update): - now = tz_now() - - if latest_project_update is None: - return True - - if latest_project_update.status in ['failed', 'canceled']: - return True - - ''' - If there's already a project update utilizing this job that's about to run - then we don't need to create one - ''' - if latest_project_update.status in ['waiting', 'pending', 'running']: - return False - - ''' - If the latest project update has a created time == job_created_time-1 - then consider the project update found. This is so we don't enter an infinite loop - of updating the project when cache timeout is 0. 
- ''' - if ( - latest_project_update.project.scm_update_cache_timeout == 0 - and latest_project_update.launch_type == 'dependency' - and latest_project_update.created == job.created - timedelta(seconds=1) - ): - return False - ''' - Normal Cache Timeout Logic - ''' - timeout_seconds = timedelta(seconds=latest_project_update.project.scm_update_cache_timeout) - if (latest_project_update.finished + timeout_seconds) < now: - return True - return False + def get_or_create_project_update(self, project_id): + project = self.all_projects.get(project_id, None) + if project is not None: + latest_project_update = project.project_updates.filter(job_type='check').order_by("-created").first() + if self.should_update_again(latest_project_update, project.scm_update_cache_timeout): + project_task = project.create_project_update(_eager_fields=dict(launch_type='dependency')) + project_task.signal_start() + return [project_task] + else: + return [latest_project_update] + return [] def gen_dep_for_job(self, task): - created_dependencies = [] - dependencies = [] - # TODO: Can remove task.project None check after scan-job-default-playbook is removed - if task.project is not None and task.project.scm_update_on_launch is True: - latest_project_update = self.get_latest_project_update(task.project_id) - if self.should_update_related_project(task, latest_project_update): - latest_project_update = self.create_project_update(task) - created_dependencies.append(latest_project_update) - dependencies.append(latest_project_update) + dependencies = self.get_or_create_project_update(task.project_id) - # Inventory created 2 seconds behind job try: start_args = json.loads(decrypt_field(task, field_name="start_args")) except ValueError: start_args = dict() - # generator for inventory sources related to this task - task_inv_sources = (invsrc for invsrc in self.all_inventory_sources if invsrc.inventory_id == task.inventory_id) - for inventory_source in task_inv_sources: + # generator for update-on-launch 
inventory sources related to this task + for inventory_source in self.all_inventory_sources.get(task.inventory_id, []): if "inventory_sources_already_updated" in start_args and inventory_source.id in start_args['inventory_sources_already_updated']: continue - if not inventory_source.update_on_launch: - continue - latest_inventory_update = self.get_latest_inventory_update(inventory_source) - if self.should_update_inventory_source(task, latest_inventory_update): - inventory_task = self.create_inventory_update(task, inventory_source) - created_dependencies.append(inventory_task) + latest_inventory_update = inventory_source.inventory_updates.order_by("-created").first() + if self.should_update_again(latest_inventory_update, inventory_source.update_cache_timeout): + inventory_task = inventory_source.create_inventory_update(_eager_fields=dict(launch_type='dependency')) + inventory_task.signal_start() dependencies.append(inventory_task) else: dependencies.append(latest_inventory_update) - if dependencies: - self.add_dependencies(task, dependencies) - - return created_dependencies + return dependencies def gen_dep_for_inventory_update(self, inventory_task): - created_dependencies = [] if inventory_task.source == "scm": invsrc = inventory_task.inventory_source - if not invsrc.source_project.scm_update_on_launch: - return created_dependencies - - latest_src_project_update = self.get_latest_project_update(invsrc.source_project_id) - if self.should_update_related_project(inventory_task, latest_src_project_update): - latest_src_project_update = self.create_project_update(inventory_task, project_id=invsrc.source_project_id) - created_dependencies.append(latest_src_project_update) - self.add_dependencies(inventory_task, [latest_src_project_update]) - latest_src_project_update.scm_inventory_updates.add(inventory_task) - return created_dependencies + if invsrc: + return self.get_or_create_project_update(invsrc.source_project_id) + return [] @timeit def generate_dependencies(self, 
undeped_tasks): - created_dependencies = [] + dependencies = [] + self.cache_projects_and_sources(undeped_tasks) for task in undeped_tasks: task.log_lifecycle("acknowledged") if type(task) is Job: - created_dependencies += self.gen_dep_for_job(task) + job_deps = self.gen_dep_for_job(task) elif type(task) is InventoryUpdate: - created_dependencies += self.gen_dep_for_inventory_update(task) + job_deps = self.gen_dep_for_inventory_update(task) else: continue + if job_deps: + dependencies += job_deps + with disable_activity_stream(): + task.dependent_jobs.add(*dependencies) + logger.debug(f'Linked {[dep.log_format for dep in dependencies]} as dependencies of {task.log_format}') + UnifiedJob.objects.filter(pk__in=[task.pk for task in undeped_tasks]).update(dependencies_processed=True) - return created_dependencies - - def process_tasks(self): - deps = self.generate_dependencies(self.all_tasks) - self.generate_dependencies(deps) - self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(self.all_tasks) + len(deps)) + return dependencies @timeit def _schedule(self): self.get_tasks(dict(status__in=["pending"], dependencies_processed=False)) if len(self.all_tasks) > 0: - self.get_inventory_source_tasks() - self.process_tasks() + deps = self.generate_dependencies(self.all_tasks) + undeped_deps = [dep for dep in deps if dep.dependencies_processed is False] + self.generate_dependencies(undeped_deps) + self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(self.all_tasks) + len(undeped_deps)) ScheduleTaskManager().schedule() diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py index 42d144d5cc..743609c760 100644 --- a/awx/main/tests/functional/task_management/test_scheduler.py +++ b/awx/main/tests/functional/task_management/test_scheduler.py @@ -331,15 +331,13 @@ def test_single_job_dependencies_project_launch(controlplane_instance_group, job p.save(skip_update=True) with 
mock.patch("awx.main.scheduler.TaskManager.start_task"): dm = DependencyManager() - with mock.patch.object(DependencyManager, "create_project_update", wraps=dm.create_project_update) as mock_pu: - dm.schedule() - mock_pu.assert_called_once_with(j) - pu = [x for x in p.project_updates.all()] - assert len(pu) == 1 - TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(pu[0], controlplane_instance_group, instance) - pu[0].status = "successful" - pu[0].save() + dm.schedule() + pu = [x for x in p.project_updates.all()] + assert len(pu) == 1 + TaskManager().schedule() + TaskManager.start_task.assert_called_once_with(pu[0], controlplane_instance_group, instance) + pu[0].status = "successful" + pu[0].save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, instance) @@ -359,15 +357,14 @@ def test_single_job_dependencies_inventory_update_launch(controlplane_instance_g i.inventory_sources.add(ii) with mock.patch("awx.main.scheduler.TaskManager.start_task"): dm = DependencyManager() - with mock.patch.object(DependencyManager, "create_inventory_update", wraps=dm.create_inventory_update) as mock_iu: - dm.schedule() - mock_iu.assert_called_once_with(j, ii) - iu = [x for x in ii.inventory_updates.all()] - assert len(iu) == 1 - TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(iu[0], controlplane_instance_group, instance) - iu[0].status = "successful" - iu[0].save() + dm.schedule() + assert ii.inventory_updates.count() == 1 + iu = [x for x in ii.inventory_updates.all()] + assert len(iu) == 1 + TaskManager().schedule() + TaskManager.start_task.assert_called_once_with(iu[0], controlplane_instance_group, instance) + iu[0].status = "successful" + iu[0].save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, 
instance) @@ -382,11 +379,11 @@ def test_inventory_update_launches_project_update(controlplane_instance_group, s iu = ii.create_inventory_update() iu.status = "pending" iu.save() + assert project.project_updates.count() == 0 with mock.patch("awx.main.scheduler.TaskManager.start_task"): dm = DependencyManager() - with mock.patch.object(DependencyManager, "create_project_update", wraps=dm.create_project_update) as mock_pu: - dm.schedule() - mock_pu.assert_called_with(iu, project_id=project.id) + dm.schedule() + assert project.project_updates.count() == 1 @pytest.mark.django_db @@ -407,9 +404,8 @@ def test_job_dependency_with_already_updated(controlplane_instance_group, job_te j.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): dm = DependencyManager() - with mock.patch.object(DependencyManager, "create_inventory_update", wraps=dm.create_inventory_update) as mock_iu: - dm.schedule() - mock_iu.assert_not_called() + dm.schedule() + assert ii.inventory_updates.count() == 0 with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, instance) @@ -442,7 +438,9 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa TaskManager().schedule() pu = p.project_updates.first() iu = ii.inventory_updates.first() - TaskManager.start_task.assert_has_calls([mock.call(iu, controlplane_instance_group, instance), mock.call(pu, controlplane_instance_group, instance)]) + TaskManager.start_task.assert_has_calls( + [mock.call(iu, controlplane_instance_group, instance), mock.call(pu, controlplane_instance_group, instance)], any_order=True + ) pu.status = "successful" pu.finished = pu.created + timedelta(seconds=1) pu.save() @@ -451,7 +449,9 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa iu.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - 
TaskManager.start_task.assert_has_calls([mock.call(j1, controlplane_instance_group, instance), mock.call(j2, controlplane_instance_group, instance)]) + TaskManager.start_task.assert_has_calls( + [mock.call(j1, controlplane_instance_group, instance), mock.call(j2, controlplane_instance_group, instance)], any_order=True + ) pu = [x for x in p.project_updates.all()] iu = [x for x in ii.inventory_updates.all()] assert len(pu) == 1 diff --git a/docs/task_manager_system.md b/docs/task_manager_system.md index 4d39e554c3..9d7a0fed43 100644 --- a/docs/task_manager_system.md +++ b/docs/task_manager_system.md @@ -18,7 +18,7 @@ Independent tasks are run in order of creation time, earliest first. Tasks with ## Dependency Manager Responsible for looking at each pending task and determining whether it should create a dependency for that task. -For example, if `update_on_launch` is enabled of a task, a project update will be created as a dependency of that task. The Dependency Manager is responsible for creating that project update. +For example, if `scm_update_on_launch` is enabled for the project a task uses, a project update will be created as a dependency of that task. The Dependency Manager is responsible for creating that project update. Dependencies can also have their own dependencies, for example, @@ -47,17 +47,17 @@ Dependencies can also have their own dependencies, for example, ### Dependency Manager Steps 1. Get pending tasks (parent tasks) that have `dependencies_processed = False` -2. Create project update if +2. As an optimization, cache related projects and inventory sources +3. Create project or inventory update for related project or inventory source if a. not already created - b. last project update outside of cache timeout window -3. Create inventory source update if - a. not already created - b. last inventory source update outside of cache timeout window -4. Check and create dependencies for these newly created dependencies - a.
inventory source updates can have a project update dependency -5. All dependencies are linked to the parent task via the `dependent_jobs` field + b. last update failed + c. last update outside of cache timeout window + d. (inventory updates only) the inventory source is not listed in `inventory_sources_already_updated` in the job's `start_args` +4. All dependencies (new or old) are linked to the parent task via the `dependent_jobs` field a. This allows us to cancel the parent task if the dependency fails or is canceled -6. Update the parent tasks with `dependencies_processed = True` +5. Update the parent tasks with `dependencies_processed = True` +6. Check and create dependencies for these newly created dependencies + a. inventory source updates can have a project update dependency ## Task Manager @@ -110,7 +110,7 @@ Special note -- the workflow manager is not scheduled to run periodically *direc Empirically, the periodic task manager has been effective in the past and will continue to be relied upon with the added event-triggered `schedule()`. -### Bulk Reschedule +### Bulk Reschedule Typically, each manager runs asynchronously via the dispatcher system. Dispatcher tasks take resources, so it is important to not schedule tasks unnecessarily. We also need a mechanism to run the manager *after* an atomic transaction block.