From 1beaccb9c919c52acfb3cac9536d8d7a0fd6f855 Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Wed, 20 Sep 2017 13:13:10 -0400 Subject: [PATCH 1/5] move TaskManager out of init --- awx/main/scheduler/{__init__.py => task_manager.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename awx/main/scheduler/{__init__.py => task_manager.py} (100%) diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/task_manager.py similarity index 100% rename from awx/main/scheduler/__init__.py rename to awx/main/scheduler/task_manager.py From 11b2bc33fe08e116347c21cc8ae5baa6e9352bb8 Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Wed, 20 Sep 2017 13:14:01 -0400 Subject: [PATCH 2/5] add scheduler module __init__ --- awx/main/scheduler/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 awx/main/scheduler/__init__.py diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py new file mode 100644 index 0000000000..87e635fc01 --- /dev/null +++ b/awx/main/scheduler/__init__.py @@ -0,0 +1,4 @@ +# Copyright (c) 2017 Ansible, Inc. +# + +from awx.main.scheduler.task_manager import TaskManager # noqa From 2889df80131bd1322c53caddcc6be54cd2a1f334 Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Wed, 20 Sep 2017 13:15:02 -0400 Subject: [PATCH 3/5] ensure project sync/inv updates are added to the dependencies --- awx/main/scheduler/task_manager.py | 63 ++++++++++++++++++++++++------ 1 file changed, 50 insertions(+), 13 deletions(-) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 549f1546f3..5652e59d99 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -17,8 +17,19 @@ from django.db.models import Q from django.contrib.contenttypes.models import ContentType # AWX -from awx.main.models import * # noqa -#from awx.main.scheduler.dag_simple import SimpleDAG +from awx.main.models import ( + AdHocCommand, + Instance, + InstanceGroup, + InventorySource, + InventoryUpdate, + Job, + Project, + ProjectUpdate, + SystemJob, + UnifiedJob, + WorkflowJob, +) from awx.main.scheduler.dag_workflow import WorkflowDAG from awx.main.utils.pglock import advisory_lock from awx.main.utils import get_type_for_model @@ -294,16 +305,23 @@ class TaskManager(): # Add task + all deps except self dep.dependent_jobs.add(*([task] + filter(lambda d: d != dep, dependencies))) + def get_latest_inventory_update(self, inventory_source): + latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created") + if not latest_inventory_update.exists(): + return None + return latest_inventory_update.first() + def should_update_inventory_source(self, job, inventory_source): now = tz_now() # Already processed dependencies for this job if job.dependent_jobs.all(): return False - latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created") - if not latest_inventory_update.exists(): + + latest_inventory_update = self.get_latest_inventory_update(inventory_source) + + if latest_inventory_update is None: return True - latest_inventory_update = latest_inventory_update.first() ''' If there's already a inventory update utilizing this job that's about to run then we don't need to create one @@ -319,14 +337,22 @@ class TaskManager(): return True return False + def get_latest_project_update(self, job): + latest_project_update = ProjectUpdate.objects.filter(project=job.project, job_type='check').order_by("-created") + if not latest_project_update.exists(): + return None + return latest_project_update.first() + def should_update_related_project(self, job): now = tz_now() if job.dependent_jobs.all(): return False - latest_project_update = ProjectUpdate.objects.filter(project=job.project, job_type='check').order_by("-created") - if not latest_project_update.exists(): + + latest_project_update = self.get_latest_project_update(job) + + if latest_project_update is None: return True - latest_project_update = latest_project_update.first() + if latest_project_update.status in ['failed', 'canceled']: return True @@ -358,16 +384,27 @@ class TaskManager(): dependencies = [] if type(task) is Job: # TODO: Can remove task.project None check after scan-job-default-playbook is removed - if task.project is not None and task.project.scm_update_on_launch is True and \ - self.should_update_related_project(task): - project_task = self.create_project_update(task) - dependencies.append(project_task) - # Inventory created 2 seconds behind job + if task.project is not None and task.project.scm_update_on_launch is True: + if self.should_update_related_project(task): + project_task = self.create_project_update(task) + dependencies.append(project_task) + else: + latest_project_update = self.get_latest_project_update(task) + if latest_project_update.status in ['waiting', 'pending', 'running']: + dependencies.append(latest_project_update) + + # Inventory created 2 seconds behind job if task.launch_type != 'callback': for inventory_source in [invsrc for invsrc in self.all_inventory_sources if invsrc.inventory == task.inventory]: + if not inventory_source.update_on_launch: + continue if self.should_update_inventory_source(task, inventory_source): inventory_task = self.create_inventory_update(task, inventory_source) dependencies.append(inventory_task) + else: + latest_inventory_update = self.get_latest_inventory_update(inventory_source) + if latest_inventory_update.status in ['waiting', 'pending', 'running']: + dependencies.append(latest_inventory_update) if len(dependencies) > 0: self.capture_chain_failure_dependencies(task, dependencies) From 0f4523fabfddf94bedf40d8284706a691a9d276b Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Wed, 20 Sep 2017 13:15:54 -0400 Subject: [PATCH 4/5] Fix up unit and functional tests --- .../task_management/test_rampart_groups.py | 18 ++++-------------- awx/main/tests/unit/test_task_manager.py | 4 ++-- 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/awx/main/tests/functional/task_management/test_rampart_groups.py b/awx/main/tests/functional/task_management/test_rampart_groups.py index c81556e091..f4c6ba95bf 100644 --- a/awx/main/tests/functional/task_management/test_rampart_groups.py +++ b/awx/main/tests/functional/task_management/test_rampart_groups.py @@ -69,20 +69,10 @@ def test_multi_group_with_shared_dependency(instance_factory, default_instance_g pu.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j1, ig1, []) - j1.finished = j1.created + timedelta(seconds=2) - j1.status = "successful" - j1.save() - with mock.patch("awx.main.scheduler.TaskManager.start_task"): - TaskManager().schedule() - pu = p.project_updates.last() - TaskManager.start_task.assert_called_once_with(pu, default_instance_group, [j2]) - pu.finished = pu.created + timedelta(seconds=1) - pu.status = "successful" - pu.save() - with mock.patch("awx.main.scheduler.TaskManager.start_task"): - TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j2, ig2, []) + + TaskManager.start_task.assert_any_call(j1, ig1, []) + TaskManager.start_task.assert_any_call(j2, ig2, []) + assert TaskManager.start_task.call_count == 2 @pytest.mark.django_db diff --git a/awx/main/tests/unit/test_task_manager.py b/awx/main/tests/unit/test_task_manager.py index 1937b7b5ca..9e8066c8a1 100644 --- a/awx/main/tests/unit/test_task_manager.py +++ b/awx/main/tests/unit/test_task_manager.py @@ -22,7 +22,7 @@ class TestCleanupInconsistentCeleryTasks(): @mock.patch.object(TaskManager, 'get_running_tasks', return_value=({'host1': [Job(id=2), Job(id=3),]}, [])) @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[]) @mock.patch.object(Instance.objects, 'filter', return_value=mock.MagicMock(first=lambda: None)) - @mock.patch('awx.main.scheduler.logger') + @mock.patch('awx.main.scheduler.task_manager.logger') def test_instance_does_not_exist(self, logger_mock, *args): logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked")) tm = TaskManager() @@ -38,7 +38,7 @@ class TestCleanupInconsistentCeleryTasks(): @mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {'host1': []})) @mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[]) @mock.patch.object(TaskManager, 'get_running_tasks') - @mock.patch('awx.main.scheduler.logger') + @mock.patch('awx.main.scheduler.task_manager.logger') def test_save_failed(self, logger_mock, get_running_tasks, *args): logger_mock.error = mock.MagicMock() job = Job(id=2, modified=tz_now(), status='running', celery_task_id='blah', execution_node='host1') From 095a93d89585b2be33c0db331582f37d23aabfc0 Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Wed, 20 Sep 2017 13:57:01 -0400 Subject: [PATCH 5/5] remove duplicated get_latest calls --- awx/main/scheduler/task_manager.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 5652e59d99..dde05aade7 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -311,15 +311,13 @@ class TaskManager(): return None return latest_inventory_update.first() - def should_update_inventory_source(self, job, inventory_source): + def should_update_inventory_source(self, job, latest_inventory_update): now = tz_now() # Already processed dependencies for this job if job.dependent_jobs.all(): return False - latest_inventory_update = self.get_latest_inventory_update(inventory_source) - if latest_inventory_update is None: return True ''' @@ -343,13 +341,11 @@ class TaskManager(): return None return latest_project_update.first() - def should_update_related_project(self, job): + def should_update_related_project(self, job, latest_project_update): now = tz_now() if job.dependent_jobs.all(): return False - latest_project_update = self.get_latest_project_update(job) - if latest_project_update is None: return True @@ -385,11 +381,11 @@ class TaskManager(): if type(task) is Job: # TODO: Can remove task.project None check after scan-job-default-playbook is removed if task.project is not None and task.project.scm_update_on_launch is True: - if self.should_update_related_project(task): + latest_project_update = self.get_latest_project_update(task) + if self.should_update_related_project(task, latest_project_update): project_task = self.create_project_update(task) dependencies.append(project_task) else: - latest_project_update = self.get_latest_project_update(task) if latest_project_update.status in ['waiting', 'pending', 'running']: dependencies.append(latest_project_update) @@ -398,11 +394,11 @@ class TaskManager(): for inventory_source in [invsrc for invsrc in self.all_inventory_sources if invsrc.inventory == task.inventory]: if not inventory_source.update_on_launch: continue - if self.should_update_inventory_source(task, inventory_source): + latest_inventory_update = self.get_latest_inventory_update(inventory_source) + if self.should_update_inventory_source(task, latest_inventory_update): inventory_task = self.create_inventory_update(task, inventory_source) dependencies.append(inventory_task) else: - latest_inventory_update = self.get_latest_inventory_update(inventory_source) if latest_inventory_update.status in ['waiting', 'pending', 'running']: dependencies.append(latest_inventory_update)