Complete the new task job dependency system and add post-run error handler

This commit is contained in:
Matthew Jones
2014-01-29 14:04:07 -05:00
parent 2514a3dd9a
commit 47950f28a7
3 changed files with 52 additions and 48 deletions

View File

@@ -23,7 +23,7 @@ import uuid
import pexpect
# Celery
from celery import Task
from celery import Task, task
# Django
from django.conf import settings
@@ -35,12 +35,32 @@ from django.utils.timezone import now
from awx.main.models import Job, JobEvent, ProjectUpdate, InventoryUpdate
from awx.main.utils import get_ansible_version, decrypt_field, update_scm_url
__all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryImport']
__all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryUpdate', 'handle_work_error']
logger = logging.getLogger('awx.main.tasks')
# FIXME: Cleanly cancel task when celery worker is stopped.
@task(bind=True)
def handle_work_error(self, task_id, subtasks=None):
print('Executing error task id %s, subtasks: %s' % (str(self.request.id), str(subtasks)))
if subtasks is not None:
for each_task in subtasks:
if each_task['type'] == 'project_update':
instance = ProjectUpdate.objects.get(id=each_task['id'])
elif each_task['type'] == 'inventory_update':
instance = InventoryUpdate.objects.get(id=each_task['id'])
elif each_task['type': 'job']:
instance = Job.objects.get(id=each_task['id'])
else:
# Unknown task type
break
if instance.celery_task_id != instance.celery_task_id:
instance.status = 'failed'
instance.failed = True
instance.result_traceback = "Previous Task Failed: %s" % str(subtasks)
instance.save()
class BaseTask(Task):
name = None
@@ -205,13 +225,8 @@ class BaseTask(Task):
if logfile_pos != logfile.tell():
logfile_pos = logfile.tell()
last_stdout_update = time.time()
# Update instance status here (also updates modified timestamp, so
# we have a way to know the task is still running, otherwise the
# post_run_hook below would cancel long-running tasks that are
# really still active).
#TODO: Find replacement for cancel flag
#TODO: Something about checking celery status
# instance = self.update_model(instance.pk, status='running')
# if instance.cancel_flag:
# child.close(True)
# canceled = True
@@ -237,40 +252,14 @@ class BaseTask(Task):
return True
def post_run_hook(self, instance, **kwargs):
'''
Hook for actions to run after job/task has completed.
'''
# Cleanup instances that appear to be stuck.
try:
stuck_task_timeout = int(getattr(settings, 'STUCK_TASK_TIMEOUT', 300))
except (TypeError, ValueError):
stuck_task_timeout = 0
if stuck_task_timeout <= 0:
return
# Never less than 30 seconds so we're not messing with active tasks.
stuck_task_timeout = max(stuck_task_timeout, 30)
cutoff = now() - datetime.timedelta(seconds=stuck_task_timeout)
qs = self.model.objects.filter(status__in=('new', 'waiting', 'running'))
qs = qs.filter(modified__lt=cutoff)
for obj in qs:
# If new, created but never started. If waiting or running, the
# modified timestamp should updated regularly, else the task is
# probably stuck.
# If pending, we could be started but celeryd is not running, or
# we're waiting for an open slot in celeryd -- in either case we
# shouldn't necessarily cancel the task. Slim chance that somehow
# the task was started, picked up by celery, but hit an error
# before we could update the status.
obj.status = 'canceled'
obj.result_traceback += '\nCanceled stuck %s.' % unicode(self.model._meta.verbose_name)
obj.save(update_fields=['status', 'result_traceback'])
pass
@transaction.commit_on_success
def run(self, pk, **kwargs):
'''
Run the job/task and capture its output.
'''
instance = self.update_model(pk)
instance = self.update_model(pk, status='pending', celery_task_id=self.request.id)
status, stdout, tb = 'error', '', ''
output_replacements = []
try:
@@ -308,6 +297,10 @@ class BaseTask(Task):
result_traceback=tb,
output_replacements=output_replacements)
self.post_run_hook(instance, **kwargs)
if status != 'successful':
# Raising an exception will mark the job as 'failed' in celery
# and will stop a task chain from continuing to execute
raise Exception("Task %s(pk:%s) encountered an error" % (str(self.model.__class__), str(pk)))
class RunJob(BaseTask):
'''
@@ -434,6 +427,9 @@ class RunJob(BaseTask):
(job.project.local_path, root))
return cwd
def get_idle_timeout(self):
return getattr(settings, 'JOB_RUN_IDLE_TIMEOUT', 300)
def get_password_prompts(self):
d = super(RunJob, self).get_password_prompts()
d[re.compile(r'^Enter passphrase for .*:\s*?$', re.M)] = 'ssh_key_unlock'
@@ -801,6 +797,9 @@ class RunInventoryUpdate(BaseTask):
def build_cwd(self, inventory_update, **kwargs):
return self.get_path_to('..', 'plugins', 'inventory')
def get_idle_timeout(self):
return getattr(settings, 'INVENTORY_UPDATE_IDLE_TIMEOUT', 300)
def pre_run_check(self, inventory_update, **kwargs):
'''
Hook for checking inventory update before running.