diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 31afe69d32..a9e1e631df 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -353,6 +353,10 @@ class UnifiedJobTypeStringMixin(object): def _underscore_to_camel(cls, word): return ''.join(x.capitalize() or '_' for x in word.split('_')) + @classmethod + def _camel_to_underscore(cls, word): + return re.sub('(?!^)([A-Z]+)', r'_\1', word).lower() + @classmethod def _model_type(cls, job_type): # Django >= 1.9 @@ -371,6 +375,9 @@ class UnifiedJobTypeStringMixin(object): return None return model.objects.get(id=job_id) + def model_to_str(self): + return UnifiedJobTypeStringMixin._camel_to_underscore(self.__class__.__name__) + class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique, UnifiedJobTypeStringMixin): ''' diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 8569fb5cfc..a50a4177ac 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -166,6 +166,9 @@ class TaskManager(): return (active_task_queues, active_tasks) + def get_dependent_jobs_for_inv_and_proj_update(self, job_obj): + return [{'type': j.model_to_str(), 'id': j.id} for j in job_obj.dependent_jobs.all()] + def start_task(self, task, dependent_tasks=[]): from awx.main.tasks import handle_work_error, handle_work_success @@ -179,6 +182,17 @@ class TaskManager(): success_handler = handle_work_success.s(task_actual=task_actual) job_obj = task.get_full() + ''' + This is to account for when there isn't enough capacity to execute all + dependent jobs (i.e. proj or inv update) within the same schedule() + call. + + Subsequent calls to schedule() need to reconstruct the proj or inv + update -> job fail logic dependency. The call below reconstructs that + failure dependency. 
+ ''' + if len(dependencies) == 0: + dependencies = self.get_dependent_jobs_for_inv_and_proj_update(job_obj) job_obj.status = 'waiting' (start_status, opts) = job_obj.pre_start() @@ -230,10 +244,32 @@ class TaskManager(): return inventory_task + ''' + Since we are dealing with partial objects we don't get to take advantage + of Django to resolve the type of related Many to Many field dependent_job. + + Hence the potential double query in this method. + ''' + def get_related_dependent_jobs_as_patials(self, job_ids): + dependent_partial_jobs = [] + for id in job_ids: + if ProjectUpdate.objects.filter(id=id).exists(): + dependent_partial_jobs.append(ProjectUpdateDict({"id": id}).refresh_partial()) + elif InventoryUpdate.objects.filter(id=id).exists(): + dependent_partial_jobs.append(InventoryUpdateDict({"id": id}).refresh_partial()) + return dependent_partial_jobs + + def capture_chain_failure_dependencies(self, task, dependencies): + for dep in dependencies: + dep_obj = task.get_full() + dep_obj.dependent_jobs.add(task['id']) + dep_obj.save() + def generate_dependencies(self, task): dependencies = [] # TODO: What if the project is null ? 
if type(task) is JobDict: + if task['project__scm_update_on_launch'] is True and \ self.graph.should_update_related_project(task): project_task = self.create_project_update(task) @@ -248,6 +284,8 @@ class TaskManager(): if self.graph.should_update_related_inventory_source(task, inventory_source_task['id']): inventory_task = self.create_inventory_update(task, inventory_source_task) dependencies.append(inventory_task) + + self.capture_chain_failure_dependencies(task, dependencies) return dependencies def process_latest_project_updates(self, latest_project_updates): diff --git a/awx/main/scheduler/partial.py b/awx/main/scheduler/partial.py index d16634f369..03f07005dd 100644 --- a/awx/main/scheduler/partial.py +++ b/awx/main/scheduler/partial.py @@ -1,6 +1,7 @@ # Python import json +import itertools # AWX from awx.main.utils import decrypt_field_value @@ -61,13 +62,36 @@ class PartialModelDict(object): def task_impact(self): raise RuntimeError("Inherit and implement me") + @classmethod + def merge_values(cls, values): + grouped_results = itertools.groupby(values, key=lambda value: value['id']) + + merged_values = [] + for k, g in grouped_results: + print k + groups = list(g) + merged_value = {} + for group in groups: + for key, val in group.iteritems(): + if not merged_value.get(key): + merged_value[key] = val + elif val != merged_value[key]: + if isinstance(merged_value[key], list): + if val not in merged_value[key]: + merged_value[key].append(val) + else: + old_val = merged_value[key] + merged_value[key] = [old_val, val] + merged_values.append(merged_value) + return merged_values + class JobDict(PartialModelDict): FIELDS = ( 'id', 'status', 'job_template_id', 'inventory_id', 'project_id', 'launch_type', 'limit', 'allow_simultaneous', 'created', 'job_type', 'celery_task_id', 'project__scm_update_on_launch', - 'forks', 'start_args', + 'forks', 'start_args', 'dependent_jobs__id', ) model = Job @@ -85,6 +109,14 @@ class JobDict(PartialModelDict): start_args = start_args 
or {} return start_args.get('inventory_sources_already_updated', []) + @classmethod + def filter_partial(cls, status=[]): + kv = { + 'status__in': status + } + merged = PartialModelDict.merge_values(cls.model.objects.filter(**kv).values(*cls.get_db_values())) + return [cls(o) for o in merged] + class ProjectUpdateDict(PartialModelDict): FIELDS = ( @@ -134,7 +166,8 @@ class InventoryUpdateDict(PartialModelDict): #'inventory_source__update_on_launch', #'inventory_source__update_cache_timeout', FIELDS = ( - 'id', 'status', 'created', 'celery_task_id', 'inventory_source_id', 'inventory_source__inventory_id', + 'id', 'status', 'created', 'celery_task_id', 'inventory_source_id', + 'inventory_source__inventory_id', ) model = InventoryUpdate @@ -217,7 +250,7 @@ class SystemJobDict(PartialModelDict): class AdHocCommandDict(PartialModelDict): FIELDS = ( - 'id', 'created', 'status', 'inventory_id', + 'id', 'created', 'status', 'inventory_id', 'dependent_jobs__id', ) model = AdHocCommand diff --git a/awx/main/tests/unit/scheduler/conftest.py b/awx/main/tests/unit/scheduler/conftest.py index f04ba12a0b..40e221d0cc 100644 --- a/awx/main/tests/unit/scheduler/conftest.py +++ b/awx/main/tests/unit/scheduler/conftest.py @@ -36,6 +36,7 @@ def scheduler_factory(mocker, epoch): def no_create_project_update(task): raise RuntimeError("create_project_update should not be called") + mocker.patch.object(sched, 'capture_chain_failure_dependencies') mocker.patch.object(sched, 'get_tasks', return_value=tasks) mocker.patch.object(sched, 'get_running_workflow_jobs', return_value=[]) mocker.patch.object(sched, 'get_inventory_source_tasks', return_value=inventory_sources) diff --git a/awx/main/tests/unit/scheduler/test_scheduler_inventory_update.py b/awx/main/tests/unit/scheduler/test_scheduler_inventory_update.py index 5e49eec729..e337d0fd9c 100644 --- a/awx/main/tests/unit/scheduler/test_scheduler_inventory_update.py +++ b/awx/main/tests/unit/scheduler/test_scheduler_inventory_update.py @@ -87,3 
+87,21 @@ class TestCreateDependentInventoryUpdate(): scheduler._schedule() scheduler.start_task.assert_called_with(waiting_inventory_update, [pending_job]) + + +class TestCaptureChainFailureDependencies(): + @pytest.fixture + def inventory_id_sources(self, inventory_source_factory): + return [ + (1, [inventory_source_factory(id=1)]), + ] + + def test(self, scheduler_factory, pending_job, waiting_inventory_update, inventory_id_sources): + scheduler = scheduler_factory(tasks=[pending_job], + create_inventory_update=waiting_inventory_update, + inventory_sources=inventory_id_sources) + + scheduler._schedule() + + scheduler.capture_chain_failure_dependencies.assert_called_with(pending_job, [waiting_inventory_update]) +