diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index f17b2b4c55..162805bd25 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -930,7 +930,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique def start_celery_task(self, opts, error_callback, success_callback): task_class = self._get_task_class() - task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback) + async_result = task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback) + return async_result.id def start(self, error_callback, success_callback, **kwargs): ''' diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index a48ca3ad23..f45a1344cd 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -60,8 +60,7 @@ class TaskManager(): ''' Tasks that are running and SHOULD have a celery task. ''' - def get_running_tasks(self): - status_list = ('running',) + def get_running_tasks(self, status_list=('running',)): jobs = JobDict.filter_partial(status=status_list) inventory_updates = InventoryUpdateDict.filter_partial(status=status_list) @@ -216,15 +215,15 @@ class TaskManager(): else: if type(job_obj) is WorkflowJob: job_obj.status = 'running' - - job_obj.save() + else: + celery_task_id = job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler) + job_obj.celery_task_id = celery_task_id self.consume_capacity(task) + job_obj.save() def post_commit(): job_obj.websocket_emit_status(job_obj.status) - if job_obj.status != 'failed': - job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler) connection.on_commit(post_commit) diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index 6e169224b7..a6c7b3f9e1 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -52,7 +52,7 @@ def run_fail_inconsistent_running_jobs(): # TODO: Failed to contact celery. We should surface this. return None - all_running_sorted_tasks = scheduler.get_running_tasks() + all_running_sorted_tasks = scheduler.get_running_tasks(status_list=('running', 'waiting',)) scheduler.process_celery_tasks(active_tasks, all_running_sorted_tasks) except DatabaseError: return diff --git a/awx/main/tasks.py b/awx/main/tasks.py index ce4c9bc5bd..defa9efffe 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -665,7 +665,7 @@ class BaseTask(Task): ''' Run the job/task and capture its output. ''' - instance = self.update_model(pk, status='running', celery_task_id='' if self.request.id is None else self.request.id) + instance = self.update_model(pk, status='running') instance.websocket_emit_status("running") status, rc, tb = 'error', None, ''