mirror of
https://github.com/ansible/awx.git
synced 2026-01-22 15:08:03 -03:30
Merge pull request #5553 from chrismeyersfsu/fix-waiting_blocked
just like we fail running tasks fail waiting tasks
This commit is contained in:
commit
d020c81047
@ -930,7 +930,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
||||
|
||||
def start_celery_task(self, opts, error_callback, success_callback):
|
||||
task_class = self._get_task_class()
|
||||
task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback)
|
||||
async_result = task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback)
|
||||
return async_result.id
|
||||
|
||||
def start(self, error_callback, success_callback, **kwargs):
|
||||
'''
|
||||
|
||||
@ -60,8 +60,7 @@ class TaskManager():
|
||||
'''
|
||||
Tasks that are running and SHOULD have a celery task.
|
||||
'''
|
||||
def get_running_tasks(self):
|
||||
status_list = ('running',)
|
||||
def get_running_tasks(self, status_list=('running',)):
|
||||
|
||||
jobs = JobDict.filter_partial(status=status_list)
|
||||
inventory_updates = InventoryUpdateDict.filter_partial(status=status_list)
|
||||
@ -216,15 +215,15 @@ class TaskManager():
|
||||
else:
|
||||
if type(job_obj) is WorkflowJob:
|
||||
job_obj.status = 'running'
|
||||
|
||||
job_obj.save()
|
||||
else:
|
||||
celery_task_id = job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler)
|
||||
job_obj.celery_task_id = celery_task_id
|
||||
|
||||
self.consume_capacity(task)
|
||||
job_obj.save()
|
||||
|
||||
def post_commit():
|
||||
job_obj.websocket_emit_status(job_obj.status)
|
||||
if job_obj.status != 'failed':
|
||||
job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler)
|
||||
|
||||
connection.on_commit(post_commit)
|
||||
|
||||
|
||||
@ -52,7 +52,7 @@ def run_fail_inconsistent_running_jobs():
|
||||
# TODO: Failed to contact celery. We should surface this.
|
||||
return None
|
||||
|
||||
all_running_sorted_tasks = scheduler.get_running_tasks()
|
||||
all_running_sorted_tasks = scheduler.get_running_tasks(status_list=('running', 'waiting',))
|
||||
scheduler.process_celery_tasks(active_tasks, all_running_sorted_tasks)
|
||||
except DatabaseError:
|
||||
return
|
||||
|
||||
@ -665,7 +665,7 @@ class BaseTask(Task):
|
||||
'''
|
||||
Run the job/task and capture its output.
|
||||
'''
|
||||
instance = self.update_model(pk, status='running', celery_task_id='' if self.request.id is None else self.request.id)
|
||||
instance = self.update_model(pk, status='running')
|
||||
|
||||
instance.websocket_emit_status("running")
|
||||
status, rc, tb = 'error', None, ''
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user