Simplifications for DependencyManager (#13533)

This commit is contained in:
Alan Rominger
2023-07-27 15:42:29 -04:00
committed by GitHub
parent 5a63533967
commit ab5cc2e69c
3 changed files with 102 additions and 172 deletions

View File

@@ -25,7 +25,6 @@ from awx.main.models import (
InventoryUpdate, InventoryUpdate,
Job, Job,
Project, Project,
ProjectUpdate,
UnifiedJob, UnifiedJob,
WorkflowApproval, WorkflowApproval,
WorkflowJob, WorkflowJob,
@@ -281,184 +280,115 @@ class WorkflowManager(TaskBase):
class DependencyManager(TaskBase): class DependencyManager(TaskBase):
def __init__(self): def __init__(self):
super().__init__(prefix="dependency_manager") super().__init__(prefix="dependency_manager")
self.all_projects = {}
self.all_inventory_sources = {}
def create_project_update(self, task, project_id=None): def cache_projects_and_sources(self, task_list):
if project_id is None: project_ids = set()
project_id = task.project_id
project_task = Project.objects.get(id=project_id).create_project_update(_eager_fields=dict(launch_type='dependency'))
# Project created 1 seconds behind
project_task.created = task.created - timedelta(seconds=1)
project_task.status = 'pending'
project_task.save()
logger.debug('Spawned {} as dependency of {}'.format(project_task.log_format, task.log_format))
return project_task
def create_inventory_update(self, task, inventory_source_task):
inventory_task = InventorySource.objects.get(id=inventory_source_task.id).create_inventory_update(_eager_fields=dict(launch_type='dependency'))
inventory_task.created = task.created - timedelta(seconds=2)
inventory_task.status = 'pending'
inventory_task.save()
logger.debug('Spawned {} as dependency of {}'.format(inventory_task.log_format, task.log_format))
return inventory_task
def add_dependencies(self, task, dependencies):
with disable_activity_stream():
task.dependent_jobs.add(*dependencies)
def get_inventory_source_tasks(self):
inventory_ids = set() inventory_ids = set()
for task in self.all_tasks: for task in task_list:
if isinstance(task, Job): if isinstance(task, Job):
inventory_ids.add(task.inventory_id) if task.project_id:
self.all_inventory_sources = [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)] project_ids.add(task.project_id)
if task.inventory_id:
inventory_ids.add(task.inventory_id)
elif isinstance(task, InventoryUpdate):
if task.inventory_source and task.inventory_source.source_project_id:
project_ids.add(task.inventory_source.source_project_id)
def get_latest_inventory_update(self, inventory_source): for proj in Project.objects.filter(id__in=project_ids, scm_update_on_launch=True):
latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created") self.all_projects[proj.id] = proj
if not latest_inventory_update.exists():
return None
return latest_inventory_update.first()
def should_update_inventory_source(self, job, latest_inventory_update): for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True):
now = tz_now() self.all_inventory_sources.setdefault(invsrc.inventory_id, [])
self.all_inventory_sources[invsrc.inventory_id].append(invsrc)
if latest_inventory_update is None: @staticmethod
def should_update_again(update, cache_timeout):
'''
If it has never updated, we need to update
If there is already an update in progress then we do not need to a new create one
If the last update failed, we always need to try and update again
If current time is more than cache_timeout after last update, then we need a new one
'''
if (update is None) or (update.status in ['failed', 'canceled', 'error']):
return True return True
''' if update.status in ['waiting', 'pending', 'running']:
If there's already a inventory update utilizing this job that's about to run
then we don't need to create one
'''
if latest_inventory_update.status in ['waiting', 'pending', 'running']:
return False return False
timeout_seconds = timedelta(seconds=latest_inventory_update.inventory_source.update_cache_timeout) return bool(((update.finished + timedelta(seconds=cache_timeout))) < tz_now())
if (latest_inventory_update.finished + timeout_seconds) < now:
return True
if latest_inventory_update.inventory_source.update_on_launch is True and latest_inventory_update.status in ['failed', 'canceled', 'error']:
return True
return False
def get_latest_project_update(self, project_id): def get_or_create_project_update(self, project_id):
latest_project_update = ProjectUpdate.objects.filter(project=project_id, job_type='check').order_by("-created") project = self.all_projects.get(project_id, None)
if not latest_project_update.exists(): if project is not None:
return None latest_project_update = project.project_updates.filter(job_type='check').order_by("-created").first()
return latest_project_update.first() if self.should_update_again(latest_project_update, project.scm_update_cache_timeout):
project_task = project.create_project_update(_eager_fields=dict(launch_type='dependency'))
def should_update_related_project(self, job, latest_project_update): project_task.signal_start()
now = tz_now() return [project_task]
else:
if latest_project_update is None: return [latest_project_update]
return True return []
if latest_project_update.status in ['failed', 'canceled']:
return True
'''
If there's already a project update utilizing this job that's about to run
then we don't need to create one
'''
if latest_project_update.status in ['waiting', 'pending', 'running']:
return False
'''
If the latest project update has a created time == job_created_time-1
then consider the project update found. This is so we don't enter an infinite loop
of updating the project when cache timeout is 0.
'''
if (
latest_project_update.project.scm_update_cache_timeout == 0
and latest_project_update.launch_type == 'dependency'
and latest_project_update.created == job.created - timedelta(seconds=1)
):
return False
'''
Normal Cache Timeout Logic
'''
timeout_seconds = timedelta(seconds=latest_project_update.project.scm_update_cache_timeout)
if (latest_project_update.finished + timeout_seconds) < now:
return True
return False
def gen_dep_for_job(self, task): def gen_dep_for_job(self, task):
created_dependencies = [] dependencies = self.get_or_create_project_update(task.project_id)
dependencies = []
# TODO: Can remove task.project None check after scan-job-default-playbook is removed
if task.project is not None and task.project.scm_update_on_launch is True:
latest_project_update = self.get_latest_project_update(task.project_id)
if self.should_update_related_project(task, latest_project_update):
latest_project_update = self.create_project_update(task)
created_dependencies.append(latest_project_update)
dependencies.append(latest_project_update)
# Inventory created 2 seconds behind job
try: try:
start_args = json.loads(decrypt_field(task, field_name="start_args")) start_args = json.loads(decrypt_field(task, field_name="start_args"))
except ValueError: except ValueError:
start_args = dict() start_args = dict()
# generator for inventory sources related to this task # generator for update-on-launch inventory sources related to this task
task_inv_sources = (invsrc for invsrc in self.all_inventory_sources if invsrc.inventory_id == task.inventory_id) for inventory_source in self.all_inventory_sources.get(task.inventory_id, []):
for inventory_source in task_inv_sources:
if "inventory_sources_already_updated" in start_args and inventory_source.id in start_args['inventory_sources_already_updated']: if "inventory_sources_already_updated" in start_args and inventory_source.id in start_args['inventory_sources_already_updated']:
continue continue
if not inventory_source.update_on_launch: latest_inventory_update = inventory_source.inventory_updates.order_by("-created").first()
continue if self.should_update_again(latest_inventory_update, inventory_source.update_cache_timeout):
latest_inventory_update = self.get_latest_inventory_update(inventory_source) inventory_task = inventory_source.create_inventory_update(_eager_fields=dict(launch_type='dependency'))
if self.should_update_inventory_source(task, latest_inventory_update): inventory_task.signal_start()
inventory_task = self.create_inventory_update(task, inventory_source)
created_dependencies.append(inventory_task)
dependencies.append(inventory_task) dependencies.append(inventory_task)
else: else:
dependencies.append(latest_inventory_update) dependencies.append(latest_inventory_update)
if dependencies: return dependencies
self.add_dependencies(task, dependencies)
return created_dependencies
def gen_dep_for_inventory_update(self, inventory_task): def gen_dep_for_inventory_update(self, inventory_task):
created_dependencies = []
if inventory_task.source == "scm": if inventory_task.source == "scm":
invsrc = inventory_task.inventory_source invsrc = inventory_task.inventory_source
if not invsrc.source_project.scm_update_on_launch: if invsrc:
return created_dependencies return self.get_or_create_project_update(invsrc.source_project_id)
return []
latest_src_project_update = self.get_latest_project_update(invsrc.source_project_id)
if self.should_update_related_project(inventory_task, latest_src_project_update):
latest_src_project_update = self.create_project_update(inventory_task, project_id=invsrc.source_project_id)
created_dependencies.append(latest_src_project_update)
self.add_dependencies(inventory_task, [latest_src_project_update])
latest_src_project_update.scm_inventory_updates.add(inventory_task)
return created_dependencies
@timeit @timeit
def generate_dependencies(self, undeped_tasks): def generate_dependencies(self, undeped_tasks):
created_dependencies = [] dependencies = []
self.cache_projects_and_sources(undeped_tasks)
for task in undeped_tasks: for task in undeped_tasks:
task.log_lifecycle("acknowledged") task.log_lifecycle("acknowledged")
if type(task) is Job: if type(task) is Job:
created_dependencies += self.gen_dep_for_job(task) job_deps = self.gen_dep_for_job(task)
elif type(task) is InventoryUpdate: elif type(task) is InventoryUpdate:
created_dependencies += self.gen_dep_for_inventory_update(task) job_deps = self.gen_dep_for_inventory_update(task)
else: else:
continue continue
if job_deps:
dependencies += job_deps
with disable_activity_stream():
task.dependent_jobs.add(*dependencies)
logger.debug(f'Linked {[dep.log_format for dep in dependencies]} as dependencies of {task.log_format}')
UnifiedJob.objects.filter(pk__in=[task.pk for task in undeped_tasks]).update(dependencies_processed=True) UnifiedJob.objects.filter(pk__in=[task.pk for task in undeped_tasks]).update(dependencies_processed=True)
return created_dependencies return dependencies
def process_tasks(self):
deps = self.generate_dependencies(self.all_tasks)
self.generate_dependencies(deps)
self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(self.all_tasks) + len(deps))
@timeit @timeit
def _schedule(self): def _schedule(self):
self.get_tasks(dict(status__in=["pending"], dependencies_processed=False)) self.get_tasks(dict(status__in=["pending"], dependencies_processed=False))
if len(self.all_tasks) > 0: if len(self.all_tasks) > 0:
self.get_inventory_source_tasks() deps = self.generate_dependencies(self.all_tasks)
self.process_tasks() undeped_deps = [dep for dep in deps if dep.dependencies_processed is False]
self.generate_dependencies(undeped_deps)
self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(self.all_tasks) + len(undeped_deps))
ScheduleTaskManager().schedule() ScheduleTaskManager().schedule()

View File

@@ -331,15 +331,13 @@ def test_single_job_dependencies_project_launch(controlplane_instance_group, job
p.save(skip_update=True) p.save(skip_update=True)
with mock.patch("awx.main.scheduler.TaskManager.start_task"): with mock.patch("awx.main.scheduler.TaskManager.start_task"):
dm = DependencyManager() dm = DependencyManager()
with mock.patch.object(DependencyManager, "create_project_update", wraps=dm.create_project_update) as mock_pu: dm.schedule()
dm.schedule() pu = [x for x in p.project_updates.all()]
mock_pu.assert_called_once_with(j) assert len(pu) == 1
pu = [x for x in p.project_updates.all()] TaskManager().schedule()
assert len(pu) == 1 TaskManager.start_task.assert_called_once_with(pu[0], controlplane_instance_group, instance)
TaskManager().schedule() pu[0].status = "successful"
TaskManager.start_task.assert_called_once_with(pu[0], controlplane_instance_group, instance) pu[0].save()
pu[0].status = "successful"
pu[0].save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"): with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule() TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, instance) TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, instance)
@@ -359,15 +357,14 @@ def test_single_job_dependencies_inventory_update_launch(controlplane_instance_g
i.inventory_sources.add(ii) i.inventory_sources.add(ii)
with mock.patch("awx.main.scheduler.TaskManager.start_task"): with mock.patch("awx.main.scheduler.TaskManager.start_task"):
dm = DependencyManager() dm = DependencyManager()
with mock.patch.object(DependencyManager, "create_inventory_update", wraps=dm.create_inventory_update) as mock_iu: dm.schedule()
dm.schedule() assert ii.inventory_updates.count() == 1
mock_iu.assert_called_once_with(j, ii) iu = [x for x in ii.inventory_updates.all()]
iu = [x for x in ii.inventory_updates.all()] assert len(iu) == 1
assert len(iu) == 1 TaskManager().schedule()
TaskManager().schedule() TaskManager.start_task.assert_called_once_with(iu[0], controlplane_instance_group, instance)
TaskManager.start_task.assert_called_once_with(iu[0], controlplane_instance_group, instance) iu[0].status = "successful"
iu[0].status = "successful" iu[0].save()
iu[0].save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"): with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule() TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, instance) TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, instance)
@@ -382,11 +379,11 @@ def test_inventory_update_launches_project_update(controlplane_instance_group, s
iu = ii.create_inventory_update() iu = ii.create_inventory_update()
iu.status = "pending" iu.status = "pending"
iu.save() iu.save()
assert project.project_updates.count() == 0
with mock.patch("awx.main.scheduler.TaskManager.start_task"): with mock.patch("awx.main.scheduler.TaskManager.start_task"):
dm = DependencyManager() dm = DependencyManager()
with mock.patch.object(DependencyManager, "create_project_update", wraps=dm.create_project_update) as mock_pu: dm.schedule()
dm.schedule() assert project.project_updates.count() == 1
mock_pu.assert_called_with(iu, project_id=project.id)
@pytest.mark.django_db @pytest.mark.django_db
@@ -407,9 +404,8 @@ def test_job_dependency_with_already_updated(controlplane_instance_group, job_te
j.save() j.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"): with mock.patch("awx.main.scheduler.TaskManager.start_task"):
dm = DependencyManager() dm = DependencyManager()
with mock.patch.object(DependencyManager, "create_inventory_update", wraps=dm.create_inventory_update) as mock_iu: dm.schedule()
dm.schedule() assert ii.inventory_updates.count() == 0
mock_iu.assert_not_called()
with mock.patch("awx.main.scheduler.TaskManager.start_task"): with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule() TaskManager().schedule()
TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, instance) TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, instance)
@@ -442,7 +438,9 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa
TaskManager().schedule() TaskManager().schedule()
pu = p.project_updates.first() pu = p.project_updates.first()
iu = ii.inventory_updates.first() iu = ii.inventory_updates.first()
TaskManager.start_task.assert_has_calls([mock.call(iu, controlplane_instance_group, instance), mock.call(pu, controlplane_instance_group, instance)]) TaskManager.start_task.assert_has_calls(
[mock.call(iu, controlplane_instance_group, instance), mock.call(pu, controlplane_instance_group, instance)], any_order=True
)
pu.status = "successful" pu.status = "successful"
pu.finished = pu.created + timedelta(seconds=1) pu.finished = pu.created + timedelta(seconds=1)
pu.save() pu.save()
@@ -451,7 +449,9 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa
iu.save() iu.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"): with mock.patch("awx.main.scheduler.TaskManager.start_task"):
TaskManager().schedule() TaskManager().schedule()
TaskManager.start_task.assert_has_calls([mock.call(j1, controlplane_instance_group, instance), mock.call(j2, controlplane_instance_group, instance)]) TaskManager.start_task.assert_has_calls(
[mock.call(j1, controlplane_instance_group, instance), mock.call(j2, controlplane_instance_group, instance)], any_order=True
)
pu = [x for x in p.project_updates.all()] pu = [x for x in p.project_updates.all()]
iu = [x for x in ii.inventory_updates.all()] iu = [x for x in ii.inventory_updates.all()]
assert len(pu) == 1 assert len(pu) == 1

View File

@@ -18,7 +18,7 @@ Independent tasks are run in order of creation time, earliest first. Tasks with
## Dependency Manager ## Dependency Manager
Responsible for looking at each pending task and determining whether it should create a dependency for that task. Responsible for looking at each pending task and determining whether it should create a dependency for that task.
For example, if `update_on_launch` is enabled of a task, a project update will be created as a dependency of that task. The Dependency Manager is responsible for creating that project update. For example, if `scm_update_on_launch` is enabled for the project a task uses, a project update will be created as a dependency of that task. The Dependency Manager is responsible for creating that project update.
Dependencies can also have their own dependencies, for example, Dependencies can also have their own dependencies, for example,
@@ -47,17 +47,17 @@ Dependencies can also have their own dependencies, for example,
### Dependency Manager Steps ### Dependency Manager Steps
1. Get pending tasks (parent tasks) that have `dependencies_processed = False` 1. Get pending tasks (parent tasks) that have `dependencies_processed = False`
2. Create project update if 2. As optimization, cache related projects and inventory sources
3. Create project or inventory update for related project or inventory source if
a. not already created a. not already created
b. last project update outside of cache timeout window b. last update failed
3. Create inventory source update if c. last project update outside of cache timeout window
a. not already created d. some extra logic applies to inventory update creation
b. last inventory source update outside of cache timeout window 4. All dependencies (new or old) are linked to the parent task via the `dependent_jobs` field
4. Check and create dependencies for these newly created dependencies
a. inventory source updates can have a project update dependency
5. All dependencies are linked to the parent task via the `dependent_jobs` field
a. This allows us to cancel the parent task if the dependency fails or is canceled a. This allows us to cancel the parent task if the dependency fails or is canceled
6. Update the parent tasks with `dependencies_processed = True` 5. Update the parent tasks with `dependencies_processed = True`
6. Check and create dependencies for these newly created dependencies
a. inventory source updates can have a project update dependency
## Task Manager ## Task Manager