From cc7c2957cffdfe38e8c5a3970d1db3acd8e1ae3e Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Mon, 5 Dec 2016 15:30:21 -0500 Subject: [PATCH] always chain failures * When inv and proj updates trigger from a JT run, if either update fails then the job template should get marked failed. Before this commit, the job template would get marked failed ONLY if there was enough capacity to run all the associated updates within the same schedule() call. If, instead, the associated updates were ran in another schedule() call, the failure chain was lost. This changeset fixes that by saving the necessary data in the dependent_jobs relationship so that the failure is always chained. --- awx/main/models/unified_jobs.py | 7 ++++ awx/main/scheduler/__init__.py | 38 ++++++++++++++++++ awx/main/scheduler/partial.py | 39 +++++++++++++++++-- awx/main/tests/unit/scheduler/conftest.py | 1 + .../test_scheduler_inventory_update.py | 18 +++++++++ 5 files changed, 100 insertions(+), 3 deletions(-) diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 31afe69d32..a9e1e631df 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -353,6 +353,10 @@ class UnifiedJobTypeStringMixin(object): def _underscore_to_camel(cls, word): return ''.join(x.capitalize() or '_' for x in word.split('_')) + @classmethod + def _camel_to_underscore(cls, word): + return re.sub('(?!^)([A-Z]+)', r'_\1', word).lower() + @classmethod def _model_type(cls, job_type): # Django >= 1.9 @@ -371,6 +375,9 @@ class UnifiedJobTypeStringMixin(object): return None return model.objects.get(id=job_id) + def model_to_str(self): + return UnifiedJobTypeStringMixin._camel_to_underscore(self.__class__.__name__) + class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique, UnifiedJobTypeStringMixin): ''' diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 8569fb5cfc..a50a4177ac 100644 --- a/awx/main/scheduler/__init__.py +++ 
b/awx/main/scheduler/__init__.py @@ -166,6 +169,9 @@ class TaskManager(): return (active_task_queues, active_tasks) + def get_dependent_jobs_for_inv_and_proj_update(self, job_obj): + return [{'type': j.model_to_str(), 'id': j.id} for j in job_obj.dependent_jobs.all()] + def start_task(self, task, dependent_tasks=[]): from awx.main.tasks import handle_work_error, handle_work_success @@ -179,6 +182,17 @@ class TaskManager(): success_handler = handle_work_success.s(task_actual=task_actual) job_obj = task.get_full() + ''' + This is to account for when there isn't enough capacity to execute all + dependent jobs (i.e. proj or inv update) within the same schedule() + call. + + Subsequent calls to schedule() need to reconstruct the proj or inv + update -> job fail logic dependency. The below call reconstructs that + failure dependency. + ''' + if len(dependencies) == 0: + dependencies = self.get_dependent_jobs_for_inv_and_proj_update(job_obj) job_obj.status = 'waiting' (start_status, opts) = job_obj.pre_start() @@ -230,10 +244,32 @@ class TaskManager(): return inventory_task + ''' + Since we are dealing with partial objects we don't get to take advantage + of Django to resolve the type of related Many to Many field dependent_jobs. + + Hence the potential double query in this method. + ''' + def get_related_dependent_jobs_as_patials(self, job_ids): + dependent_partial_jobs = [] + for id in job_ids: + if ProjectUpdate.objects.filter(id=id).exists(): + dependent_partial_jobs.append(ProjectUpdateDict({"id": id}).refresh_partial()) + elif InventoryUpdate.objects.filter(id=id).exists(): + dependent_partial_jobs.append(InventoryUpdateDict({"id": id}).refresh_partial()) + return dependent_partial_jobs + + def capture_chain_failure_dependencies(self, task, dependencies): + for dep in dependencies: + dep_obj = task.get_full() + dep_obj.dependent_jobs.add(task['id']) + dep_obj.save() + def generate_dependencies(self, task): dependencies = [] # TODO: What if the project is null ? 
if type(task) is JobDict: + if task['project__scm_update_on_launch'] is True and \ self.graph.should_update_related_project(task): project_task = self.create_project_update(task) @@ -248,6 +284,8 @@ class TaskManager(): if self.graph.should_update_related_inventory_source(task, inventory_source_task['id']): inventory_task = self.create_inventory_update(task, inventory_source_task) dependencies.append(inventory_task) + + self.capture_chain_failure_dependencies(task, dependencies) return dependencies def process_latest_project_updates(self, latest_project_updates): diff --git a/awx/main/scheduler/partial.py b/awx/main/scheduler/partial.py index d16634f369..03f07005dd 100644 --- a/awx/main/scheduler/partial.py +++ b/awx/main/scheduler/partial.py @@ -1,6 +1,7 @@ # Python import json +import itertools # AWX from awx.main.utils import decrypt_field_value @@ -61,13 +62,36 @@ class PartialModelDict(object): def task_impact(self): raise RuntimeError("Inherit and implement me") + @classmethod + def merge_values(cls, values): + grouped_results = itertools.groupby(values, key=lambda value: value['id']) + + merged_values = [] + for k, g in grouped_results: + print k + groups = list(g) + merged_value = {} + for group in groups: + for key, val in group.iteritems(): + if not merged_value.get(key): + merged_value[key] = val + elif val != merged_value[key]: + if isinstance(merged_value[key], list): + if val not in merged_value[key]: + merged_value[key].append(val) + else: + old_val = merged_value[key] + merged_value[key] = [old_val, val] + merged_values.append(merged_value) + return merged_values + class JobDict(PartialModelDict): FIELDS = ( 'id', 'status', 'job_template_id', 'inventory_id', 'project_id', 'launch_type', 'limit', 'allow_simultaneous', 'created', 'job_type', 'celery_task_id', 'project__scm_update_on_launch', - 'forks', 'start_args', + 'forks', 'start_args', 'dependent_jobs__id', ) model = Job @@ -85,6 +109,14 @@ class JobDict(PartialModelDict): start_args = start_args 
or {} return start_args.get('inventory_sources_already_updated', []) + @classmethod + def filter_partial(cls, status=[]): + kv = { + 'status__in': status + } + merged = PartialModelDict.merge_values(cls.model.objects.filter(**kv).values(*cls.get_db_values())) + return [cls(o) for o in merged] + class ProjectUpdateDict(PartialModelDict): FIELDS = ( @@ -134,7 +166,8 @@ class InventoryUpdateDict(PartialModelDict): #'inventory_source__update_on_launch', #'inventory_source__update_cache_timeout', FIELDS = ( - 'id', 'status', 'created', 'celery_task_id', 'inventory_source_id', 'inventory_source__inventory_id', + 'id', 'status', 'created', 'celery_task_id', 'inventory_source_id', + 'inventory_source__inventory_id', ) model = InventoryUpdate @@ -217,7 +250,7 @@ class SystemJobDict(PartialModelDict): class AdHocCommandDict(PartialModelDict): FIELDS = ( - 'id', 'created', 'status', 'inventory_id', + 'id', 'created', 'status', 'inventory_id', 'dependent_jobs__id', ) model = AdHocCommand diff --git a/awx/main/tests/unit/scheduler/conftest.py b/awx/main/tests/unit/scheduler/conftest.py index f04ba12a0b..40e221d0cc 100644 --- a/awx/main/tests/unit/scheduler/conftest.py +++ b/awx/main/tests/unit/scheduler/conftest.py @@ -36,6 +36,7 @@ def scheduler_factory(mocker, epoch): def no_create_project_update(task): raise RuntimeError("create_project_update should not be called") + mocker.patch.object(sched, 'capture_chain_failure_dependencies') mocker.patch.object(sched, 'get_tasks', return_value=tasks) mocker.patch.object(sched, 'get_running_workflow_jobs', return_value=[]) mocker.patch.object(sched, 'get_inventory_source_tasks', return_value=inventory_sources) diff --git a/awx/main/tests/unit/scheduler/test_scheduler_inventory_update.py b/awx/main/tests/unit/scheduler/test_scheduler_inventory_update.py index 5e49eec729..e337d0fd9c 100644 --- a/awx/main/tests/unit/scheduler/test_scheduler_inventory_update.py +++ b/awx/main/tests/unit/scheduler/test_scheduler_inventory_update.py @@ -87,3 
+87,21 @@ class TestCreateDependentInventoryUpdate(): scheduler._schedule() scheduler.start_task.assert_called_with(waiting_inventory_update, [pending_job]) + + +class TestCaptureChainFailureDependencies(): + @pytest.fixture + def inventory_id_sources(self, inventory_source_factory): + return [ + (1, [inventory_source_factory(id=1)]), + ] + + def test(self, scheduler_factory, pending_job, waiting_inventory_update, inventory_id_sources): + scheduler = scheduler_factory(tasks=[pending_job], + create_inventory_update=waiting_inventory_update, + inventory_sources=inventory_id_sources) + + scheduler._schedule() + + scheduler.capture_chain_failure_dependencies.assert_called_with(pending_job, [waiting_inventory_update]) +