diff --git a/awx/main/models/base.py b/awx/main/models/base.py index 7baf81e93b..45cabc5a83 100644 --- a/awx/main/models/base.py +++ b/awx/main/models/base.py @@ -11,6 +11,7 @@ import os.path import yaml # Django +from django.conf import settings from django.db import models from django.db import transaction from django.core.exceptions import ValidationError @@ -405,10 +406,47 @@ class CommonTask(PrimordialModel): def can_cancel(self): return bool(self.status in ('pending', 'waiting', 'running')) + def _force_cancel(self): + # Update the status to 'canceled' if we can detect that the job + # really isn't running (i.e. celery has crashed or forcefully + # killed the worker). + task_statuses = ('STARTED', 'SUCCESS', 'FAILED', 'RETRY', 'REVOKED') + try: + taskmeta = self.celery_task + if not taskmeta or taskmeta.status not in task_statuses: + return + from celery import current_app + i = current_app.control.inspect() + for v in (i.active() or {}).values(): + if taskmeta.task_id in [x['id'] for x in v]: + return + for v in (i.reserved() or {}).values(): + if taskmeta.task_id in [x['id'] for x in v]: + return + for v in (i.revoked() or {}).values(): + if taskmeta.task_id in [x['id'] for x in v]: + return + for v in (i.scheduled() or {}).values(): + if taskmeta.task_id in [x['id'] for x in v]: + return + instance = self.__class__.objects.get(pk=self.pk) + if instance.can_cancel: + instance.status = 'canceled' + update_fields = ['status'] + if not instance.result_traceback: + instance.result_traceback = 'Forced cancel' + update_fields.append('result_traceback') + instance.save(update_fields=update_fields) + except: # FIXME: Log this exception! + if settings.DEBUG: + raise + def cancel(self): - # FIXME: Force cancel! if self.can_cancel: if not self.cancel_flag: self.cancel_flag = True self.save(update_fields=['cancel_flag']) + if settings.BROKER_URL.startswith('amqp://'): + self._force_cancel() return self.cancel_flag + diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 1981c79722..d198a406e0 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -294,7 +294,14 @@ class BaseTask(Task): output_replacements = [] try: if not self.pre_run_check(instance, **kwargs): - return + if hasattr(settings, 'CELERY_UNIT_TEST'): + return + else: + # Stop the task chain and prevent starting the job if it has + # already been canceled. + instance = self.update_model(pk) + status = instance.status + raise RuntimeError('not starting %s task' % instance.status) instance = self.update_model(pk, status='running') kwargs['private_data_file'] = self.build_private_data_file(instance, **kwargs) kwargs['passwords'] = self.build_passwords(instance, **kwargs) @@ -312,7 +319,8 @@ class BaseTask(Task): job_cwd=cwd, job_env=safe_env, result_stdout_file=stdout_filename) status, stdout = self.run_pexpect(instance, args, cwd, env, kwargs['passwords'], stdout_handle) except Exception: - tb = traceback.format_exc() + if status != 'canceled': + tb = traceback.format_exc() finally: if kwargs.get('private_data_file', ''): try: @@ -330,7 +338,10 @@ class BaseTask(Task): if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'): # Raising an exception will mark the job as 'failed' in celery # and will stop a task chain from continuing to execute - raise Exception("Task %s(pk:%s) encountered an error" % (str(self.model.__class__), str(pk))) + if status == 'canceled': + raise Exception("Task %s(pk:%s) was canceled" % (str(self.model.__class__), str(pk))) + else: + raise Exception("Task %s(pk:%s) encountered an error" % (str(self.model.__class__), str(pk))) class RunJob(BaseTask): ''' @@ -477,7 +488,10 @@ class RunJob(BaseTask): ''' Hook for checking job before running. ''' - if job.status in ('pending', 'waiting'): + if job.cancel_flag: + job = self.update_model(job.pk, status='canceled') + return False + elif job.status in ('pending', 'waiting'): job = self.update_model(job.pk, status='pending') # Start another task to process job events. if settings.BROKER_URL.startswith('amqp://'): @@ -486,9 +500,6 @@ class RunJob(BaseTask): 'job_id': job.id, }, serializer='json') return True - elif job.cancel_flag: - job = self.update_model(job.pk, status='canceled') - return False else: return False