SCM inv source trigger project update

- scm based inventory sources should launch project updates prior to
running inventory updates for that source.
- fixes scenario where a job is based on projectA, but the inventory
source is based on projectB. Running the job will likely trigger a
sync for projectA, but not projectB.

comments
This commit is contained in:
Seth Foster
2022-04-26 14:17:12 -04:00
parent cfdba959dd
commit 1b662fcca5
6 changed files with 132 additions and 62 deletions

View File

@@ -0,0 +1,18 @@
# Generated by Django 3.2.13 on 2022-05-02 21:27
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0161_unifiedjob_host_status_counts'),
]
operations = [
migrations.AlterField(
model_name='unifiedjob',
name='dependent_jobs',
field=models.ManyToManyField(editable=False, related_name='unifiedjob_blocked_jobs', to='main.UnifiedJob'),
),
]

View File

@@ -415,33 +415,47 @@ class TaskManagerJobMixin(TaskManagerUnifiedJobMixin):
class Meta: class Meta:
abstract = True abstract = True
def get_jobs_fail_chain(self):
return [self.project_update] if self.project_update else []
def dependent_jobs_finished(self): def dependent_jobs_finished(self):
for j in self.dependent_jobs.all(): # if any dependent jobs are pending, waiting, or running, return False
if j.status in ['pending', 'waiting', 'running']: return not any(j.status in ACTIVE_STATES for j in self.dependent_jobs.all())
return False
return True
class TaskManagerUpdateOnLaunchMixin(TaskManagerUnifiedJobMixin): class TaskManagerUpdateOnLaunchMixin(TaskManagerUnifiedJobMixin):
class Meta: class Meta:
abstract = True abstract = True
def get_jobs_fail_chain(self):
return list(self.dependent_jobs.all())
class TaskManagerProjectUpdateMixin(TaskManagerUpdateOnLaunchMixin): class TaskManagerProjectUpdateMixin(TaskManagerUpdateOnLaunchMixin):
class Meta: class Meta:
abstract = True abstract = True
def get_jobs_fail_chain(self):
return list(self.unifiedjob_blocked_jobs.all())
class TaskManagerInventoryUpdateMixin(TaskManagerUpdateOnLaunchMixin): class TaskManagerInventoryUpdateMixin(TaskManagerUpdateOnLaunchMixin):
class Meta: class Meta:
abstract = True abstract = True
def get_jobs_fail_chain(self):
blocked_jobs = list(self.unifiedjob_blocked_jobs.all())
other_updates = []
if blocked_jobs:
# blocked_jobs[0] is just a reference to a job that depends on this
# inventory update.
# We can look at the dependencies of this blocked job to find other
# inventory sources that are safe to fail.
# Since the dependencies could also include project updates,
# we need to check for type.
for dep in blocked_jobs[0].dependent_jobs.all():
if type(dep) is type(self) and dep.id != self.id:
other_updates.append(dep)
return blocked_jobs + other_updates
def dependent_jobs_finished(self):
# if any dependent jobs are pending, waiting, or running, return False
return not any(j.status in ACTIVE_STATES for j in self.dependent_jobs.all())
class ExecutionEnvironmentMixin(models.Model): class ExecutionEnvironmentMixin(models.Model):
class Meta: class Meta:

View File

@@ -575,7 +575,8 @@ class UnifiedJob(
dependent_jobs = models.ManyToManyField( dependent_jobs = models.ManyToManyField(
'self', 'self',
editable=False, editable=False,
related_name='%(class)s_blocked_jobs+', related_name='%(class)s_blocked_jobs',
symmetrical=False,
) )
execution_node = models.TextField( execution_node = models.TextField(
blank=True, blank=True,

View File

@@ -281,8 +281,10 @@ class TaskManager:
for task in running_tasks: for task in running_tasks:
self.dependency_graph.add_job(task) self.dependency_graph.add_job(task)
def create_project_update(self, task): def create_project_update(self, task, project_id=None):
project_task = Project.objects.get(id=task.project_id).create_project_update(_eager_fields=dict(launch_type='dependency')) if project_id is None:
project_id = task.project_id
project_task = Project.objects.get(id=project_id).create_project_update(_eager_fields=dict(launch_type='dependency'))
# Project created 1 seconds behind # Project created 1 seconds behind
project_task.created = task.created - timedelta(seconds=1) project_task.created = task.created - timedelta(seconds=1)
@@ -302,14 +304,10 @@ class TaskManager:
# self.process_inventory_sources(inventory_sources) # self.process_inventory_sources(inventory_sources)
return inventory_task return inventory_task
def capture_chain_failure_dependencies(self, task, dependencies): def add_dependencies(self, task, dependencies):
with disable_activity_stream(): with disable_activity_stream():
task.dependent_jobs.add(*dependencies) task.dependent_jobs.add(*dependencies)
for dep in dependencies:
# Add task + all deps except self
dep.dependent_jobs.add(*([task] + [d for d in dependencies if d != dep]))
def get_latest_inventory_update(self, inventory_source): def get_latest_inventory_update(self, inventory_source):
latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created") latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created")
if not latest_inventory_update.exists(): if not latest_inventory_update.exists():
@@ -335,8 +333,8 @@ class TaskManager:
return True return True
return False return False
def get_latest_project_update(self, job): def get_latest_project_update(self, project_id):
latest_project_update = ProjectUpdate.objects.filter(project=job.project, job_type='check').order_by("-created") latest_project_update = ProjectUpdate.objects.filter(project=project_id, job_type='check').order_by("-created")
if not latest_project_update.exists(): if not latest_project_update.exists():
return None return None
return latest_project_update.first() return latest_project_update.first()
@@ -376,16 +374,12 @@ class TaskManager:
return True return True
return False return False
def generate_dependencies(self, undeped_tasks): def gen_dep_for_job(self, task):
created_dependencies = [] created_dependencies = []
for task in undeped_tasks:
task.log_lifecycle("acknowledged")
dependencies = [] dependencies = []
if not type(task) is Job:
continue
# TODO: Can remove task.project None check after scan-job-default-playbook is removed # TODO: Can remove task.project None check after scan-job-default-playbook is removed
if task.project is not None and task.project.scm_update_on_launch is True: if task.project is not None and task.project.scm_update_on_launch is True:
latest_project_update = self.get_latest_project_update(task) latest_project_update = self.get_latest_project_update(task.project_id)
if self.should_update_related_project(task, latest_project_update): if self.should_update_related_project(task, latest_project_update):
project_task = self.create_project_update(task) project_task = self.create_project_update(task)
created_dependencies.append(project_task) created_dependencies.append(project_task)
@@ -398,7 +392,9 @@ class TaskManager:
start_args = json.loads(decrypt_field(task, field_name="start_args")) start_args = json.loads(decrypt_field(task, field_name="start_args"))
except ValueError: except ValueError:
start_args = dict() start_args = dict()
for inventory_source in [invsrc for invsrc in self.all_inventory_sources if invsrc.inventory == task.inventory]: # generator for inventory sources related to this task
task_inv_sources = (invsrc for invsrc in self.all_inventory_sources if invsrc.inventory_id == task.inventory_id)
for inventory_source in task_inv_sources:
if "inventory_sources_already_updated" in start_args and inventory_source.id in start_args['inventory_sources_already_updated']: if "inventory_sources_already_updated" in start_args and inventory_source.id in start_args['inventory_sources_already_updated']:
continue continue
if not inventory_source.update_on_launch: if not inventory_source.update_on_launch:
@@ -411,10 +407,38 @@ class TaskManager:
else: else:
dependencies.append(latest_inventory_update) dependencies.append(latest_inventory_update)
if len(dependencies) > 0: if dependencies:
self.capture_chain_failure_dependencies(task, dependencies) self.add_dependencies(task, dependencies)
return created_dependencies
def gen_dep_for_inventory_update(self, inventory_task):
created_dependencies = []
if inventory_task.source == "scm":
invsrc = inventory_task.inventory_source
if not invsrc.source_project.scm_update_on_launch:
return created_dependencies
latest_src_project_update = self.get_latest_project_update(invsrc.source_project_id)
if self.should_update_related_project(inventory_task, latest_src_project_update):
latest_src_project_update = self.create_project_update(inventory_task, project_id=invsrc.source_project_id)
created_dependencies.append(latest_src_project_update)
self.add_dependencies(inventory_task, [latest_src_project_update])
return created_dependencies
def generate_dependencies(self, undeped_tasks):
created_dependencies = []
for task in undeped_tasks:
task.log_lifecycle("acknowledged")
if type(task) is Job:
created_dependencies += self.gen_dep_for_job(task)
elif type(task) is InventoryUpdate:
created_dependencies += self.gen_dep_for_inventory_update(task)
else:
continue
UnifiedJob.objects.filter(pk__in=[task.pk for task in undeped_tasks]).update(dependencies_processed=True) UnifiedJob.objects.filter(pk__in=[task.pk for task in undeped_tasks]).update(dependencies_processed=True)
return created_dependencies return created_dependencies
def process_pending_tasks(self, pending_tasks): def process_pending_tasks(self, pending_tasks):
@@ -572,6 +596,8 @@ class TaskManager:
pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending'] pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending']
undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed] undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed]
dependencies = self.generate_dependencies(undeped_tasks) dependencies = self.generate_dependencies(undeped_tasks)
deps_of_deps = self.generate_dependencies(dependencies)
dependencies += deps_of_deps
self.process_pending_tasks(dependencies) self.process_pending_tasks(dependencies)
self.process_pending_tasks(pending_tasks) self.process_pending_tasks(pending_tasks)

View File

@@ -324,6 +324,22 @@ def test_single_job_dependencies_inventory_update_launch(controlplane_instance_g
TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance) TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance)
@pytest.mark.django_db
def test_inventory_update_launches_project_update(controlplane_instance_group, scm_inventory_source):
ii = scm_inventory_source
project = scm_inventory_source.source_project
project.scm_update_on_launch = True
project.save()
iu = ii.create_inventory_update()
iu.status = "pending"
iu.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
tm = TaskManager()
with mock.patch.object(TaskManager, "create_project_update", wraps=tm.create_project_update) as mock_pu:
tm.schedule()
mock_pu.assert_called_with(iu, project_id=project.id)
@pytest.mark.django_db @pytest.mark.django_db
def test_job_dependency_with_already_updated(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory): def test_job_dependency_with_already_updated(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory):
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"]) objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"])
@@ -382,7 +398,7 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa
pu = p.project_updates.first() pu = p.project_updates.first()
iu = ii.inventory_updates.first() iu = ii.inventory_updates.first()
TaskManager.start_task.assert_has_calls( TaskManager.start_task.assert_has_calls(
[mock.call(iu, controlplane_instance_group, [j1, j2, pu], instance), mock.call(pu, controlplane_instance_group, [j1, j2, iu], instance)] [mock.call(iu, controlplane_instance_group, [j1, j2], instance), mock.call(pu, controlplane_instance_group, [j1, j2], instance)]
) )
pu.status = "successful" pu.status = "successful"
pu.finished = pu.created + timedelta(seconds=1) pu.finished = pu.created + timedelta(seconds=1)
@@ -464,7 +480,6 @@ def test_generate_dependencies_only_once(job_template_factory):
job.status = "pending" job.status = "pending"
job.name = "job_gen_dep" job.name = "job_gen_dep"
job.save() job.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"): with mock.patch("awx.main.scheduler.TaskManager.start_task"):
# job starts with dependencies_processed as False # job starts with dependencies_processed as False
assert not job.dependencies_processed assert not job.dependencies_processed
@@ -478,10 +493,6 @@ def test_generate_dependencies_only_once(job_template_factory):
# Run ._schedule() again, but make sure .generate_dependencies() is not # Run ._schedule() again, but make sure .generate_dependencies() is not
# called with job in the argument list # called with job in the argument list
tm = TaskManager() tm = TaskManager()
tm.generate_dependencies = mock.MagicMock() tm.generate_dependencies = mock.MagicMock(return_value=[])
tm._schedule() tm._schedule()
tm.generate_dependencies.assert_has_calls([mock.call([]), mock.call([])])
# .call_args is tuple, (positional_args, kwargs), [0][0] then is
# the first positional arg, i.e. the first argument of
# .generate_dependencies()
assert tm.generate_dependencies.call_args[0][0] == []