just like we fail running tasks fail waiting tasks

* Associate the celery_id with the job at the earliest point possible.
This ensures that a waiting job has a celery id. Thus, we are free to
fail waiting jobs that don't have a celery id.
This commit is contained in:
Chris Meyers
2017-02-24 12:07:04 -05:00
parent 3b489977a0
commit 903d0472f0
4 changed files with 9 additions and 9 deletions

View File

@@ -930,7 +930,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
def start_celery_task(self, opts, error_callback, success_callback): def start_celery_task(self, opts, error_callback, success_callback):
task_class = self._get_task_class() task_class = self._get_task_class()
task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback) async_result = task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback)
return async_result.id
def start(self, error_callback, success_callback, **kwargs): def start(self, error_callback, success_callback, **kwargs):
''' '''

View File

@@ -60,8 +60,7 @@ class TaskManager():
''' '''
Tasks that are running and SHOULD have a celery task. Tasks that are running and SHOULD have a celery task.
''' '''
def get_running_tasks(self): def get_running_tasks(self, status_list=('running',)):
status_list = ('running',)
jobs = JobDict.filter_partial(status=status_list) jobs = JobDict.filter_partial(status=status_list)
inventory_updates = InventoryUpdateDict.filter_partial(status=status_list) inventory_updates = InventoryUpdateDict.filter_partial(status=status_list)
@@ -216,15 +215,15 @@ class TaskManager():
else: else:
if type(job_obj) is WorkflowJob: if type(job_obj) is WorkflowJob:
job_obj.status = 'running' job_obj.status = 'running'
else:
job_obj.save() celery_task_id = job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler)
job_obj.celery_task_id = celery_task_id
self.consume_capacity(task) self.consume_capacity(task)
job_obj.save()
def post_commit(): def post_commit():
job_obj.websocket_emit_status(job_obj.status) job_obj.websocket_emit_status(job_obj.status)
if job_obj.status != 'failed':
job_obj.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler)
connection.on_commit(post_commit) connection.on_commit(post_commit)

View File

@@ -52,7 +52,7 @@ def run_fail_inconsistent_running_jobs():
# TODO: Failed to contact celery. We should surface this. # TODO: Failed to contact celery. We should surface this.
return None return None
all_running_sorted_tasks = scheduler.get_running_tasks() all_running_sorted_tasks = scheduler.get_running_tasks(status_list=('running', 'waiting',))
scheduler.process_celery_tasks(active_tasks, all_running_sorted_tasks) scheduler.process_celery_tasks(active_tasks, all_running_sorted_tasks)
except DatabaseError: except DatabaseError:
return return

View File

@@ -665,7 +665,7 @@ class BaseTask(Task):
''' '''
Run the job/task and capture its output. Run the job/task and capture its output.
''' '''
instance = self.update_model(pk, status='running', celery_task_id='' if self.request.id is None else self.request.id) instance = self.update_model(pk, status='running')
instance.websocket_emit_status("running") instance.websocket_emit_status("running")
status, rc, tb = 'error', None, '' status, rc, tb = 'error', None, ''