diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 51862c8855..f801e3be58 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -3,6 +3,7 @@ # Python import cStringIO +import datetime import distutils.version import json import logging @@ -23,6 +24,7 @@ from celery import Task # Django from django.conf import settings +from django.utils.timezone import now # AWX from awx.main.models import Job, ProjectUpdate @@ -192,6 +194,35 @@ class BaseTask(Task): return False return True + def post_run_hook(self, instance, **kwargs): + ''' + Hook for actions to run after job/task has completed. + ''' + # Cleanup instances that appear to be stuck. + try: + stuck_task_timeout = int(getattr(settings, 'STUCK_TASK_TIMEOUT', 300)) + except (TypeError, ValueError): + stuck_task_timeout = 0 + if stuck_task_timeout <= 0: + return + # Never less than 30 seconds so we're not messing with active tasks. + stuck_task_timeout = max(stuck_task_timeout, 30) + cutoff = now() - datetime.timedelta(seconds=stuck_task_timeout) + qs = self.model.objects.filter(status__in=('new', 'waiting', 'running')) + qs = qs.filter(modified__lt=cutoff) + for obj in qs: + # If new, created but never started. If waiting or running, the + # modified timestamp should updated regularly, else the task is + # probably stuck. + # If pending, we could be started but celeryd is not running, or + # we're waiting for an open slot in celeryd -- in either case we + # shouldn't necessarily cancel the task. Slim chance that somehow + # the task was started, picked up by celery, but hit an error + # before we could update the status. + obj.status = 'canceled' + obj.result_traceback += '\nCanceled stuck %s.' % unicode(self.model._meta.verbose_name) + obj.save(update_fields=['status', 'result_traceback']) + def run(self, pk, **kwargs): ''' Run the job/task using ansible-playbook and capture its output. @@ -224,6 +255,7 @@ class BaseTask(Task): instance = self.update_model(pk, status=status, result_stdout=stdout, result_traceback=tb, output_replacements=output_replacements) + self.post_run_hook(instance, **kwargs) class RunJob(BaseTask): '''