diff --git a/awx/main/exceptions.py b/awx/main/exceptions.py index cc2dda4968..5de5782244 100644 --- a/awx/main/exceptions.py +++ b/awx/main/exceptions.py @@ -1,24 +1,36 @@ -class AwxTaskError(Exception): - """Base exception for errors in unified job runs""" - def __init__(self, task, message=None): +# Copyright (c) 2018 Ansible by Red Hat +# All Rights Reserved. + +# Celery does not respect exception type when using a serializer different than pickle; +# and awx uses the json serializer +# https://github.com/celery/celery/issues/3586 + + +class _AwxTaskError(): + def build_exception(self, task, message=None): if message is None: message = "Execution error running {}".format(task.log_format) - super(AwxTaskError, self).__init__(message) - self.task = task - - -class TaskCancel(AwxTaskError): - """Canceled flag caused run_pexpect to kill the job run""" - def __init__(self, task, rc): - super(TaskCancel, self).__init__( - task, message="{} was canceled (rc={})".format(task.log_format, rc)) - self.rc = rc + e = Exception(message) + e.task = task + e.is_awx_task_error = True + return e + + def TaskCancel(self, task, rc): + """Canceled flag caused run_pexpect to kill the job run""" + message="{} was canceled (rc={})".format(task.log_format, rc) + e = self.build_exception(task, message) + e.rc = rc + e.awx_task_error_type = "TaskCancel" + return e + + def TaskError(self, task, rc): + """Userspace error (non-zero exit code) in run_pexpect subprocess""" + message = "{} encountered an error (rc={}), please see task stdout for details.".format(task.log_format, rc) + e = self.build_exception(task, message) + e.rc = rc + e.awx_task_error_type = "TaskError" + return e -class TaskError(AwxTaskError): - """Userspace error (non-zero exit code) in run_pexpect subprocess""" - def __init__(self, task, rc): - super(TaskError, self).__init__( - task, message="%s encountered an error (rc=%s), please see task stdout for details.".format(task.log_format, rc)) - self.rc = rc +AwxTaskError = _AwxTaskError() diff --git a/awx/main/tasks.py b/awx/main/tasks.py index d744fc7cae..b5e5a2c712 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -48,7 +48,7 @@ from awx import celery_app from awx.main.constants import CLOUD_PROVIDERS, PRIVILEGE_ESCALATION_METHODS from awx.main.models import * # noqa from awx.main.models.unified_jobs import ACTIVE_STATES -from awx.main.exceptions import AwxTaskError, TaskCancel, TaskError +from awx.main.exceptions import AwxTaskError from awx.main.queue import CallbackQueueDispatcher from awx.main.expect import run, isolated_manager from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, @@ -79,7 +79,7 @@ logger = logging.getLogger('awx.main.tasks') class LogErrorsTask(Task): def on_failure(self, exc, task_id, args, kwargs, einfo): - if isinstance(exc, AwxTaskError): + if getattr(exc, 'is_awx_task_error', False): # Error caused by user / tracked in job output logger.warning(str(exc)) elif isinstance(self, BaseTask): @@ -361,8 +361,9 @@ def handle_work_success(self, result, task_actual): @shared_task(queue='tower', base=LogErrorsTask) -def handle_work_error(request, exc, traceback, task_id, subtasks=None): - logger.debug('Executing error task id %s, subtasks: %s' % (request.id, str(subtasks))) +def handle_work_error(task_id, *args, **kwargs): + subtasks = kwargs.get('subtasks', None) + logger.debug('Executing error task id %s, subtasks: %s' % (task_id, str(subtasks))) first_instance = None first_instance_type = '' if subtasks is not None: @@ -910,9 +911,9 @@ class BaseTask(LogErrorsTask): # Raising an exception will mark the job as 'failed' in celery # and will stop a task chain from continuing to execute if status == 'canceled': - raise TaskCancel(instance, rc) + raise AwxTaskError.TaskCancel(instance, rc) else: - raise TaskError(instance, rc) + raise AwxTaskError.TaskError(instance, rc) def get_ssh_key_path(self, instance, **kwargs): '''