From 47950f28a722233f6644dbf1b2ed58ba99684f06 Mon Sep 17 00:00:00 2001
From: Matthew Jones
Date: Wed, 29 Jan 2014 14:04:07 -0500
Subject: [PATCH] Complete the new task job dependency system and add post-run
 error handler

---
 awx/main/models/base.py |  2 ++
 awx/main/models/jobs.py | 33 ++++++++++++----------
 awx/main/tasks.py       | 69 ++++++++++++++++++++---------------------
 3 files changed, 54 insertions(+), 50 deletions(-)

diff --git a/awx/main/models/base.py b/awx/main/models/base.py
index 0182cee98d..02ae73519d 100644
--- a/awx/main/models/base.py
+++ b/awx/main/models/base.py
@@ -367,6 +367,8 @@ class CommonTask(PrimordialModel):
         return []
 
     def start_signature(self, **kwargs):
+        from awx.main.tasks import handle_work_error
+
         task_class = self._get_task_class()
         if not self.can_start:
             return False
diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py
index d0ac720b43..5deabe4f4d 100644
--- a/awx/main/models/jobs.py
+++ b/awx/main/models/jobs.py
@@ -335,6 +335,7 @@ class Job(CommonTask):
         return self._get_hosts(job_host_summaries__processed__gt=0)
 
     def start(self, **kwargs):
+        from awx.main.tasks import handle_work_error
         task_class = self._get_task_class()
         if not self.can_start:
             return False
@@ -348,6 +349,7 @@ class Job(CommonTask):
         transaction.commit()
 
         runnable_tasks = []
+        run_tasks = []
         inventory_updates_actual = []
         project_update_actual = None
 
@@ -355,28 +357,29 @@ class Job(CommonTask):
         inventory = self.inventory
         is_qs = inventory.inventory_sources.filter(active=True, update_on_launch=True)
         if project.scm_update_on_launch:
-            # TODO: We assume these return a tuple but not on error
-            project_update, project_update_sig = project.update_signature()
-            if not project_update:
+            project_update_details = project.update_signature()
+            if not project_update_details:
                 # TODO: Set error here
                 pass
             else:
-                project_update_actual = project_update
-                # TODO: append a callback to gather the status?
-                runnable_tasks.append(project_update_sig)
-                # TODO: need to add celery task id to proj update instance
+                runnable_tasks.append({'obj': project_update_details[0],
+                                       'sig': project_update_details[1],
+                                       'type': 'project_update'})
         if is_qs.count():
             for inventory_source in is_qs:
-                # TODO: We assume these return a tuple but not on error
-                inventory_update, inventory_update_sig = inventory_source.update_signature()
-                if not inventory_update:
+                inventory_update_details = inventory_source.update_signature()
+                if not inventory_update_details:
                     # TODO: Set error here
                     pass
                 else:
-                    inventory_updates_actual.append(inventory_update)
-                    runnable_tasks.append(inventory_update_sig)
-        job_actual = task_class().si(self.pk, **opts)
-        runnable_tasks.append(job_actual)
+                    runnable_tasks.append({'obj': inventory_update_details[0],
+                                           'sig': inventory_update_details[1],
+                                           'type': 'inventory_update'})
+        thisjob = {'type': 'job', 'id': self.id}
+        for idx in xrange(len(runnable_tasks)):
+            dependent_tasks = [{'type': r['type'], 'id': r['obj'].id} for r in runnable_tasks[idx:]] + [thisjob]
+            run_tasks.append(runnable_tasks[idx]['sig'].set(link_error=handle_work_error.s(subtasks=dependent_tasks)))
+        run_tasks.append(task_class().si(self.pk, **opts).set(link_error=handle_work_error.s(subtasks=[thisjob])))
         print runnable_tasks
-        res = chain(runnable_tasks)()
+        res = chain(run_tasks)()
         return True
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 2b31fe8151..04227572f9 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -23,7 +23,7 @@ import uuid
 import pexpect
 
 # Celery
-from celery import Task
+from celery import Task, task
 
 # Django
 from django.conf import settings
@@ -35,12 +35,32 @@ from django.utils.timezone import now
 from awx.main.models import Job, JobEvent, ProjectUpdate, InventoryUpdate
 from awx.main.utils import get_ansible_version, decrypt_field, update_scm_url
 
-__all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryImport']
+__all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryUpdate', 'handle_work_error']
 
 logger = logging.getLogger('awx.main.tasks')
 
 # FIXME: Cleanly cancel task when celery worker is stopped.
 
+@task(bind=True)
+def handle_work_error(self, task_id, subtasks=None):
+    print('Executing error task id %s, subtasks: %s' % (str(self.request.id), str(subtasks)))
+    if subtasks is not None:
+        for each_task in subtasks:
+            if each_task['type'] == 'project_update':
+                instance = ProjectUpdate.objects.get(id=each_task['id'])
+            elif each_task['type'] == 'inventory_update':
+                instance = InventoryUpdate.objects.get(id=each_task['id'])
+            elif each_task['type'] == 'job':
+                instance = Job.objects.get(id=each_task['id'])
+            else:
+                # Unknown task type
+                break
+            if instance.celery_task_id != task_id:
+                instance.status = 'failed'
+                instance.failed = True
+                instance.result_traceback = "Previous Task Failed: %s" % str(subtasks)
+                instance.save()
+
 class BaseTask(Task):
 
     name = None
@@ -205,13 +225,8 @@ class BaseTask(Task):
             if logfile_pos != logfile.tell():
                 logfile_pos = logfile.tell()
                 last_stdout_update = time.time()
-            # Update instance status here (also updates modified timestamp, so
-            # we have a way to know the task is still running, otherwise the
-            # post_run_hook below would cancel long-running tasks that are
-            # really still active).
             #TODO: Find replacement for cancel flag
             #TODO: Something about checking celery status
-            # instance = self.update_model(instance.pk, status='running')
             # if instance.cancel_flag:
             #     child.close(True)
             #     canceled = True
@@ -237,40 +252,14 @@ class BaseTask(Task):
         return True
 
     def post_run_hook(self, instance, **kwargs):
-        '''
-        Hook for actions to run after job/task has completed.
-        '''
-        # Cleanup instances that appear to be stuck.
-        try:
-            stuck_task_timeout = int(getattr(settings, 'STUCK_TASK_TIMEOUT', 300))
-        except (TypeError, ValueError):
-            stuck_task_timeout = 0
-        if stuck_task_timeout <= 0:
-            return
-        # Never less than 30 seconds so we're not messing with active tasks.
-        stuck_task_timeout = max(stuck_task_timeout, 30)
-        cutoff = now() - datetime.timedelta(seconds=stuck_task_timeout)
-        qs = self.model.objects.filter(status__in=('new', 'waiting', 'running'))
-        qs = qs.filter(modified__lt=cutoff)
-        for obj in qs:
-            # If new, created but never started. If waiting or running, the
-            # modified timestamp should updated regularly, else the task is
-            # probably stuck.
-            # If pending, we could be started but celeryd is not running, or
-            # we're waiting for an open slot in celeryd -- in either case we
-            # shouldn't necessarily cancel the task. Slim chance that somehow
-            # the task was started, picked up by celery, but hit an error
-            # before we could update the status.
-            obj.status = 'canceled'
-            obj.result_traceback += '\nCanceled stuck %s.' % unicode(self.model._meta.verbose_name)
-            obj.save(update_fields=['status', 'result_traceback'])
+        pass
 
     @transaction.commit_on_success
     def run(self, pk, **kwargs):
         '''
         Run the job/task and capture its output.
         '''
-        instance = self.update_model(pk)
+        instance = self.update_model(pk, status='pending', celery_task_id=self.request.id)
         status, stdout, tb = 'error', '', ''
         output_replacements = []
         try:
@@ -308,6 +297,10 @@ class BaseTask(Task):
                                      result_traceback=tb,
                                      output_replacements=output_replacements)
         self.post_run_hook(instance, **kwargs)
+        if status != 'successful':
+            # Raising an exception will mark the job as 'failed' in celery
+            # and will stop a task chain from continuing to execute
+            raise Exception("Task %s(pk:%s) encountered an error" % (self.model.__name__, str(pk)))
 
 class RunJob(BaseTask):
     '''
@@ -434,6 +427,9 @@ class RunJob(BaseTask):
                           (job.project.local_path, root))
         return cwd
 
+    def get_idle_timeout(self):
+        return getattr(settings, 'JOB_RUN_IDLE_TIMEOUT', 300)
+
     def get_password_prompts(self):
         d = super(RunJob, self).get_password_prompts()
         d[re.compile(r'^Enter passphrase for .*:\s*?$', re.M)] = 'ssh_key_unlock'
@@ -801,6 +797,9 @@ class RunInventoryUpdate(BaseTask):
     def build_cwd(self, inventory_update, **kwargs):
         return self.get_path_to('..', 'plugins', 'inventory')
 
+    def get_idle_timeout(self):
+        return getattr(settings, 'INVENTORY_UPDATE_IDLE_TIMEOUT', 300)
+
     def pre_run_check(self, inventory_update, **kwargs):
         '''
         Hook for checking inventory update before running.
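
Note on the dependency/error-handling pattern above: each dependency signature in the chain
gets a link_error callback pointing at handle_work_error, carrying the list of work units
queued behind it (including the job itself), so when one link raises, everything still
waiting is marked 'failed' instead of sitting in 'pending'. The standalone sketch below
mirrors that pattern outside of AWX; the task names, the WORK dict, and the in-memory
broker are illustrative assumptions only, not part of the patch.

    # sketch.py -- illustrative only; mirrors the link_error chaining used in the patch
    from celery import Celery, chain

    app = Celery('sketch', broker='memory://')

    # Stand-in for the ProjectUpdate/InventoryUpdate/Job status fields.
    WORK = {'project_update': 'pending', 'inventory_update': 'pending', 'job': 'pending'}

    @app.task
    def run_unit(name, fail=False):
        # Stand-in for RunProjectUpdate/RunInventoryUpdate/RunJob.run(); raising here
        # aborts the chain and fires the link_error callbacks attached to this signature.
        if fail:
            raise Exception('%s failed' % name)
        WORK[name] = 'successful'
        return name

    @app.task(bind=True)
    def on_error(self, task_id, subtasks=None):
        # Counterpart of handle_work_error: mark every unit that never got to run as
        # failed so nothing is left stuck in 'pending'.
        for each in (subtasks or []):
            if WORK.get(each['name']) == 'pending':
                WORK[each['name']] = 'failed'

    def start():
        units = ['project_update', 'inventory_update', 'job']
        sigs = []
        for idx, name in enumerate(units):
            # Each link's error handler knows about itself and everything queued after it.
            dependents = [{'name': n} for n in units[idx:]]
            sigs.append(run_unit.si(name, fail=(name == 'inventory_update'))
                        .set(link_error=on_error.s(subtasks=dependents)))
        # Dispatch the whole chain; a running worker is needed to actually execute it.
        return chain(sigs)()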