AC-507 Allow forcefully marking a job canceled if we can detect it isn't running via Celery, make sure a job canceled before it is started doesn't attempt to run at all, raise an exception for canceled jobs to interrupt the task chain.

This commit is contained in:
Chris Church 2014-02-09 01:49:00 -05:00
parent 2423c9a63c
commit 818f235f72
2 changed files with 57 additions and 8 deletions

View File

@ -11,6 +11,7 @@ import os.path
import yaml
# Django
from django.conf import settings
from django.db import models
from django.db import transaction
from django.core.exceptions import ValidationError
@ -405,10 +406,47 @@ class CommonTask(PrimordialModel):
def can_cancel(self):
return bool(self.status in ('pending', 'waiting', 'running'))
def _force_cancel(self):
# Update the status to 'canceled' if we can detect that the job
# really isn't running (i.e. celery has crashed or forcefully
# killed the worker).
task_statuses = ('STARTED', 'SUCCESS', 'FAILED', 'RETRY', 'REVOKED')
try:
taskmeta = self.celery_task
if not taskmeta or taskmeta.status not in task_statuses:
return
from celery import current_app
i = current_app.control.inspect()
for v in (i.active() or {}).values():
if taskmeta.task_id in [x['id'] for x in v]:
return
for v in (i.reserved() or {}).values():
if taskmeta.task_id in [x['id'] for x in v]:
return
for v in (i.revoked() or {}).values():
if taskmeta.task_id in [x['id'] for x in v]:
return
for v in (i.scheduled() or {}).values():
if taskmeta.task_id in [x['id'] for x in v]:
return
instance = self.__class__.objects.get(pk=self.pk)
if instance.can_cancel:
instance.status = 'canceled'
update_fields = ['status']
if not instance.result_traceback:
instance.result_traceback = 'Forced cancel'
update_fields.append('result_traceback')
instance.save(update_fields=update_fields)
except: # FIXME: Log this exception!
if settings.DEBUG:
raise
def cancel(self):
# FIXME: Force cancel!
if self.can_cancel:
if not self.cancel_flag:
self.cancel_flag = True
self.save(update_fields=['cancel_flag'])
if settings.BROKER_URL.startswith('amqp://'):
self._force_cancel()
return self.cancel_flag

View File

@ -294,7 +294,14 @@ class BaseTask(Task):
output_replacements = []
try:
if not self.pre_run_check(instance, **kwargs):
return
if hasattr(settings, 'CELERY_UNIT_TEST'):
return
else:
# Stop the task chain and prevent starting the job if it has
# already been canceled.
instance = self.update_model(pk)
status = instance.status
raise RuntimeError('not starting %s task' % instance.status)
instance = self.update_model(pk, status='running')
kwargs['private_data_file'] = self.build_private_data_file(instance, **kwargs)
kwargs['passwords'] = self.build_passwords(instance, **kwargs)
@ -312,7 +319,8 @@ class BaseTask(Task):
job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_filename)
status, stdout = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle)
except Exception:
tb = traceback.format_exc()
if status != 'canceled':
tb = traceback.format_exc()
finally:
if kwargs.get('private_data_file', ''):
try:
@ -330,7 +338,10 @@ class BaseTask(Task):
if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
# Raising an exception will mark the job as 'failed' in celery
# and will stop a task chain from continuing to execute
raise Exception("Task %s(pk:%s) encountered an error" % (str(self.model.__class__), str(pk)))
if status == 'canceled':
raise Exception("Task %s(pk:%s) was canceled" % (str(self.model.__class__), str(pk)))
else:
raise Exception("Task %s(pk:%s) encountered an error" % (str(self.model.__class__), str(pk)))
class RunJob(BaseTask):
'''
@ -477,7 +488,10 @@ class RunJob(BaseTask):
'''
Hook for checking job before running.
'''
if job.status in ('pending', 'waiting'):
if job.cancel_flag:
job = self.update_model(job.pk, status='canceled')
return False
elif job.status in ('pending', 'waiting'):
job = self.update_model(job.pk, status='pending')
# Start another task to process job events.
if settings.BROKER_URL.startswith('amqp://'):
@ -486,9 +500,6 @@ class RunJob(BaseTask):
'job_id': job.id,
}, serializer='json')
return True
elif job.cancel_flag:
job = self.update_model(job.pk, status='canceled')
return False
else:
return False