diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index 162805bd25..f17b2b4c55 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -930,8 +930,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
 
     def start_celery_task(self, opts, error_callback, success_callback):
         task_class = self._get_task_class()
-        async_result = task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback)
-        return async_result.id
+        task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback)
 
     def start(self, error_callback, success_callback, **kwargs):
         '''
diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py
index f45a1344cd..a48ca3ad23 100644
--- a/awx/main/scheduler/__init__.py
+++ b/awx/main/scheduler/__init__.py
@@ -60,7 +60,8 @@ class TaskManager():
     '''
     Tasks that are running and SHOULD have a celery task.
     '''
-    def get_running_tasks(self, status_list=('running',)):
+    def get_running_tasks(self):
+        status_list = ('running',)
         jobs = JobDict.filter_partial(status=status_list)
         inventory_updates = InventoryUpdateDict.filter_partial(status=status_list)
 
@@ -215,15 +216,15 @@ class TaskManager():
         else:
             if type(job_obj) is WorkflowJob:
                 job_obj.status = 'running'
-            else:
-                celery_task_id = job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler)
-                job_obj.celery_task_id = celery_task_id
+
+        job_obj.save()
 
         self.consume_capacity(task)
-        job_obj.save()
 
         def post_commit():
             job_obj.websocket_emit_status(job_obj.status)
+            if job_obj.status != 'failed':
+                job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler)
 
         connection.on_commit(post_commit)
 
diff --git a/awx/main/scheduler/dependency_graph.py b/awx/main/scheduler/dependency_graph.py
index 61f08c4241..a94a158335 100644
--- a/awx/main/scheduler/dependency_graph.py
+++ b/awx/main/scheduler/dependency_graph.py
@@ -83,6 +83,11 @@ class DependencyGraph(object):
     '''
     def should_update_related_project(self, job):
         now = self.get_now()
+
+        # Already processed dependencies for this job
+        if job.data['dependent_jobs__id'] is not None:
+            return False
+
         latest_project_update = self.data[self.LATEST_PROJECT_UPDATES].get(job['project_id'], None)
         if not latest_project_update:
             return True
diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py
index a6c7b3f9e1..6e169224b7 100644
--- a/awx/main/scheduler/tasks.py
+++ b/awx/main/scheduler/tasks.py
@@ -52,7 +52,7 @@ def run_fail_inconsistent_running_jobs():
             # TODO: Failed to contact celery. We should surface this.
             return None
 
-        all_running_sorted_tasks = scheduler.get_running_tasks(status_list=('running', 'waiting',))
+        all_running_sorted_tasks = scheduler.get_running_tasks()
         scheduler.process_celery_tasks(active_tasks, all_running_sorted_tasks)
     except DatabaseError:
         return
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index defa9efffe..ce4c9bc5bd 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -665,7 +665,7 @@ class BaseTask(Task):
         '''
         Run the job/task and capture its output.
         '''
-        instance = self.update_model(pk, status='running')
+        instance = self.update_model(pk, status='running', celery_task_id='' if self.request.id is None else self.request.id)
         instance.websocket_emit_status("running")
         status, rc, tb = 'error', None, ''
 
diff --git a/awx/main/tests/unit/scheduler/test_dependency_graph.py b/awx/main/tests/unit/scheduler/test_dependency_graph.py
index c1c92411ea..ca74ec9bf7 100644
--- a/awx/main/tests/unit/scheduler/test_dependency_graph.py
+++ b/awx/main/tests/unit/scheduler/test_dependency_graph.py
@@ -17,8 +17,10 @@ def graph():
 
 
 @pytest.fixture
-def job():
-    return dict(project_id=1)
+def job(job_factory):
+    j = job_factory()
+    j.project_id = 1
+    return j
 
 
 @pytest.fixture
@@ -36,13 +38,11 @@
 
 
 @pytest.fixture
-def last_dependent_project(graph):
+def last_dependent_project(graph, job):
     now = tz_now()
 
-    job = {
-        'project_id': 1,
-        'created': now,
-    }
+    job['project_id'] = 1
+    job['created'] = now
 
     pu = ProjectUpdateDict(dict(id=1, project_id=1,
                                 status='waiting',
                                 project__scm_update_cache_timeout=0,
                                 launch_type='dependency',
@@ -57,10 +57,8 @@ def timedout_project_update(graph, job):
     now = tz_now()
 
-    job = {
-        'project_id': 1,
-        'created': now,
-    }
+    job['project_id'] = 1
+    job['created'] = now
 
     pu = ProjectUpdateDict(dict(id=1, project_id=1,
                                 status='successful',
                                 project__scm_update_cache_timeout=10,
                                 launch_type='dependency',
@@ -76,10 +74,8 @@ def not_timedout_project_update(graph, job):
     now = tz_now()
 
-    job = {
-        'project_id': 1,
-        'created': now,
-    }
+    job['project_id'] = 1
+    job['created'] = now
 
     pu = ProjectUpdateDict(dict(id=1, project_id=1,
                                 status='successful',
                                 project__scm_update_cache_timeout=3600,
                                 launch_type='dependency',