Merge pull request #12073 from fosterseth/scm_invsrc_project_update

SCM inv source should trigger project update
This commit is contained in:
JST 2022-05-16 09:17:45 -03:00 committed by GitHub
commit 237402068c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 153 additions and 72 deletions

View File

@ -0,0 +1,18 @@
# Generated by Django 3.2.13 on 2022-05-02 21:27
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0161_unifiedjob_host_status_counts'),
]
operations = [
migrations.AlterField(
model_name='unifiedjob',
name='dependent_jobs',
field=models.ManyToManyField(editable=False, related_name='unifiedjob_blocked_jobs', to='main.UnifiedJob'),
),
]

View File

@ -407,41 +407,54 @@ class TaskManagerUnifiedJobMixin(models.Model):
def get_jobs_fail_chain(self):
return []
def dependent_jobs_finished(self):
return True
class TaskManagerJobMixin(TaskManagerUnifiedJobMixin):
class Meta:
abstract = True
def get_jobs_fail_chain(self):
return [self.project_update] if self.project_update else []
def dependent_jobs_finished(self):
for j in self.dependent_jobs.all():
if j.status in ['pending', 'waiting', 'running']:
return False
return True
class TaskManagerUpdateOnLaunchMixin(TaskManagerUnifiedJobMixin):
class Meta:
abstract = True
def get_jobs_fail_chain(self):
return list(self.dependent_jobs.all())
class TaskManagerProjectUpdateMixin(TaskManagerUpdateOnLaunchMixin):
class Meta:
abstract = True
def get_jobs_fail_chain(self):
# project update can be a dependency of an inventory update, in which
# case we need to fail the job that may have spawned the inventory
# update.
# The inventory update will fail, but since it is not running it will
# not cascade fail to the job from the errback logic in apply_async. As
# such we should capture it here.
blocked_jobs = list(self.unifiedjob_blocked_jobs.all().prefetch_related("unifiedjob_blocked_jobs"))
other_tasks = []
for b in blocked_jobs:
other_tasks += list(b.unifiedjob_blocked_jobs.all())
return blocked_jobs + other_tasks
class TaskManagerInventoryUpdateMixin(TaskManagerUpdateOnLaunchMixin):
class Meta:
abstract = True
def get_jobs_fail_chain(self):
blocked_jobs = list(self.unifiedjob_blocked_jobs.all())
other_updates = []
if blocked_jobs:
# blocked_jobs[0] is just a reference to a job that depends on this
# inventory update.
# We can look at the dependencies of this blocked job to find other
# inventory sources that are safe to fail.
# Since the dependencies could also include project updates,
# we need to check for type.
for dep in blocked_jobs[0].dependent_jobs.all():
if type(dep) is type(self) and dep.id != self.id:
other_updates.append(dep)
return blocked_jobs + other_updates
class ExecutionEnvironmentMixin(models.Model):
class Meta:

View File

@ -575,7 +575,8 @@ class UnifiedJob(
dependent_jobs = models.ManyToManyField(
'self',
editable=False,
related_name='%(class)s_blocked_jobs+',
related_name='%(class)s_blocked_jobs',
symmetrical=False,
)
execution_node = models.TextField(
blank=True,

View File

@ -26,7 +26,7 @@ class DependencyGraph(object):
# The reason for tracking both inventory and inventory sources:
# Consider InvA, which has two sources, InvSource1, InvSource2.
# JobB might depend on InvA, which launches two updates, one for each source.
# To determine if JobB can run, we can just check InvA, which is marked in
# To determine if JobB can run, we can just check InvA, which is marked in
# INVENTORY_UPDATES, instead of having to check for both entries in
# INVENTORY_SOURCE_UPDATES.
self.data[self.INVENTORY_UPDATES] = {}

View File

@ -34,6 +34,7 @@ from awx.main.utils.pglock import advisory_lock
from awx.main.utils import get_type_for_model, task_manager_bulk_reschedule, schedule_task_manager
from awx.main.utils.common import create_partition
from awx.main.signals import disable_activity_stream
from awx.main.constants import ACTIVE_STATES
from awx.main.scheduler.dependency_graph import DependencyGraph
from awx.main.scheduler.task_manager_models import TaskManagerInstances
from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups
@ -79,10 +80,23 @@ class TaskManager:
if blocked_by:
return blocked_by
if not task.dependent_jobs_finished():
blocked_by = task.dependent_jobs.first()
if blocked_by:
return blocked_by
for dep in task.dependent_jobs.all():
if dep.status in ACTIVE_STATES:
return dep
# if we detect a failed or error dependency, go ahead and fail this
# task. The errback on the dependency takes some time to trigger,
# and we don't want the task to enter running state if its
# dependency has failed or errored.
elif dep.status in ("error", "failed"):
task.status = 'failed'
task.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
get_type_for_model(type(dep)),
dep.name,
dep.id,
)
task.save(update_fields=['status', 'job_explanation'])
task.websocket_emit_status('failed')
return dep
return None
@ -281,8 +295,10 @@ class TaskManager:
for task in running_tasks:
self.dependency_graph.add_job(task)
def create_project_update(self, task):
project_task = Project.objects.get(id=task.project_id).create_project_update(_eager_fields=dict(launch_type='dependency'))
def create_project_update(self, task, project_id=None):
if project_id is None:
project_id = task.project_id
project_task = Project.objects.get(id=project_id).create_project_update(_eager_fields=dict(launch_type='dependency'))
# Project created 1 seconds behind
project_task.created = task.created - timedelta(seconds=1)
@ -302,14 +318,10 @@ class TaskManager:
# self.process_inventory_sources(inventory_sources)
return inventory_task
def capture_chain_failure_dependencies(self, task, dependencies):
def add_dependencies(self, task, dependencies):
with disable_activity_stream():
task.dependent_jobs.add(*dependencies)
for dep in dependencies:
# Add task + all deps except self
dep.dependent_jobs.add(*([task] + [d for d in dependencies if d != dep]))
def get_latest_inventory_update(self, inventory_source):
latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created")
if not latest_inventory_update.exists():
@ -335,8 +347,8 @@ class TaskManager:
return True
return False
def get_latest_project_update(self, job):
latest_project_update = ProjectUpdate.objects.filter(project=job.project, job_type='check').order_by("-created")
def get_latest_project_update(self, project_id):
latest_project_update = ProjectUpdate.objects.filter(project=project_id, job_type='check').order_by("-created")
if not latest_project_update.exists():
return None
return latest_project_update.first()
@ -376,45 +388,69 @@ class TaskManager:
return True
return False
def gen_dep_for_job(self, task):
created_dependencies = []
dependencies = []
# TODO: Can remove task.project None check after scan-job-default-playbook is removed
if task.project is not None and task.project.scm_update_on_launch is True:
latest_project_update = self.get_latest_project_update(task.project_id)
if self.should_update_related_project(task, latest_project_update):
latest_project_update = self.create_project_update(task)
created_dependencies.append(latest_project_update)
dependencies.append(latest_project_update)
# Inventory created 2 seconds behind job
try:
start_args = json.loads(decrypt_field(task, field_name="start_args"))
except ValueError:
start_args = dict()
# generator for inventory sources related to this task
task_inv_sources = (invsrc for invsrc in self.all_inventory_sources if invsrc.inventory_id == task.inventory_id)
for inventory_source in task_inv_sources:
if "inventory_sources_already_updated" in start_args and inventory_source.id in start_args['inventory_sources_already_updated']:
continue
if not inventory_source.update_on_launch:
continue
latest_inventory_update = self.get_latest_inventory_update(inventory_source)
if self.should_update_inventory_source(task, latest_inventory_update):
inventory_task = self.create_inventory_update(task, inventory_source)
created_dependencies.append(inventory_task)
dependencies.append(inventory_task)
else:
dependencies.append(latest_inventory_update)
if dependencies:
self.add_dependencies(task, dependencies)
return created_dependencies
def gen_dep_for_inventory_update(self, inventory_task):
created_dependencies = []
if inventory_task.source == "scm":
invsrc = inventory_task.inventory_source
if not invsrc.source_project.scm_update_on_launch:
return created_dependencies
latest_src_project_update = self.get_latest_project_update(invsrc.source_project_id)
if self.should_update_related_project(inventory_task, latest_src_project_update):
latest_src_project_update = self.create_project_update(inventory_task, project_id=invsrc.source_project_id)
created_dependencies.append(latest_src_project_update)
self.add_dependencies(inventory_task, [latest_src_project_update])
latest_src_project_update.scm_inventory_updates.add(inventory_task)
return created_dependencies
def generate_dependencies(self, undeped_tasks):
created_dependencies = []
for task in undeped_tasks:
task.log_lifecycle("acknowledged")
dependencies = []
if not type(task) is Job:
if type(task) is Job:
created_dependencies += self.gen_dep_for_job(task)
elif type(task) is InventoryUpdate:
created_dependencies += self.gen_dep_for_inventory_update(task)
else:
continue
# TODO: Can remove task.project None check after scan-job-default-playbook is removed
if task.project is not None and task.project.scm_update_on_launch is True:
latest_project_update = self.get_latest_project_update(task)
if self.should_update_related_project(task, latest_project_update):
project_task = self.create_project_update(task)
created_dependencies.append(project_task)
dependencies.append(project_task)
else:
dependencies.append(latest_project_update)
# Inventory created 2 seconds behind job
try:
start_args = json.loads(decrypt_field(task, field_name="start_args"))
except ValueError:
start_args = dict()
for inventory_source in [invsrc for invsrc in self.all_inventory_sources if invsrc.inventory == task.inventory]:
if "inventory_sources_already_updated" in start_args and inventory_source.id in start_args['inventory_sources_already_updated']:
continue
if not inventory_source.update_on_launch:
continue
latest_inventory_update = self.get_latest_inventory_update(inventory_source)
if self.should_update_inventory_source(task, latest_inventory_update):
inventory_task = self.create_inventory_update(task, inventory_source)
created_dependencies.append(inventory_task)
dependencies.append(inventory_task)
else:
dependencies.append(latest_inventory_update)
if len(dependencies) > 0:
self.capture_chain_failure_dependencies(task, dependencies)
UnifiedJob.objects.filter(pk__in=[task.pk for task in undeped_tasks]).update(dependencies_processed=True)
return created_dependencies
def process_pending_tasks(self, pending_tasks):
@ -572,6 +608,8 @@ class TaskManager:
pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending']
undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed]
dependencies = self.generate_dependencies(undeped_tasks)
deps_of_deps = self.generate_dependencies(dependencies)
dependencies += deps_of_deps
self.process_pending_tasks(dependencies)
self.process_pending_tasks(pending_tasks)

View File

@ -695,7 +695,7 @@ def handle_work_error(task_id, *args, **kwargs):
first_instance = instance
first_instance_type = each_task['type']
if instance.celery_task_id != task_id and not instance.cancel_flag and not instance.status == 'successful':
if instance.celery_task_id != task_id and not instance.cancel_flag and not instance.status in ('successful', 'failed'):
instance.status = 'failed'
instance.failed = True
if not instance.job_explanation:

View File

@ -324,6 +324,22 @@ def test_single_job_dependencies_inventory_update_launch(controlplane_instance_g
TaskManager.start_task.assert_called_once_with(j, controlplane_instance_group, [], instance)
@pytest.mark.django_db
def test_inventory_update_launches_project_update(controlplane_instance_group, scm_inventory_source):
ii = scm_inventory_source
project = scm_inventory_source.source_project
project.scm_update_on_launch = True
project.save()
iu = ii.create_inventory_update()
iu.status = "pending"
iu.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
tm = TaskManager()
with mock.patch.object(TaskManager, "create_project_update", wraps=tm.create_project_update) as mock_pu:
tm.schedule()
mock_pu.assert_called_with(iu, project_id=project.id)
@pytest.mark.django_db
def test_job_dependency_with_already_updated(controlplane_instance_group, job_template_factory, mocker, inventory_source_factory):
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"])
@ -382,7 +398,7 @@ def test_shared_dependencies_launch(controlplane_instance_group, job_template_fa
pu = p.project_updates.first()
iu = ii.inventory_updates.first()
TaskManager.start_task.assert_has_calls(
[mock.call(iu, controlplane_instance_group, [j1, j2, pu], instance), mock.call(pu, controlplane_instance_group, [j1, j2, iu], instance)]
[mock.call(iu, controlplane_instance_group, [j1, j2], instance), mock.call(pu, controlplane_instance_group, [j1, j2], instance)]
)
pu.status = "successful"
pu.finished = pu.created + timedelta(seconds=1)
@ -464,7 +480,6 @@ def test_generate_dependencies_only_once(job_template_factory):
job.status = "pending"
job.name = "job_gen_dep"
job.save()
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
# job starts with dependencies_processed as False
assert not job.dependencies_processed
@ -478,10 +493,6 @@ def test_generate_dependencies_only_once(job_template_factory):
# Run ._schedule() again, but make sure .generate_dependencies() is not
# called with job in the argument list
tm = TaskManager()
tm.generate_dependencies = mock.MagicMock()
tm.generate_dependencies = mock.MagicMock(return_value=[])
tm._schedule()
# .call_args is tuple, (positional_args, kwargs), [0][0] then is
# the first positional arg, i.e. the first argument of
# .generate_dependencies()
assert tm.generate_dependencies.call_args[0][0] == []
tm.generate_dependencies.assert_has_calls([mock.call([]), mock.call([])])