mirror of
https://github.com/ansible/awx.git
synced 2026-03-13 23:17:32 -02:30
Merge pull request #921 from chrismeyersfsu/fix-handle_work_error-689
Fix handle_work_error()
This commit is contained in:
@@ -1,24 +1,36 @@
|
|||||||
class AwxTaskError(Exception):
|
# Copyright (c) 2018 Ansible by Red Hat
|
||||||
"""Base exception for errors in unified job runs"""
|
# All Rights Reserved.
|
||||||
def __init__(self, task, message=None):
|
|
||||||
|
# Celery does not respect exception type when using a serializer different than pickle;
|
||||||
|
# and awx uses the json serializer
|
||||||
|
# https://github.com/celery/celery/issues/3586
|
||||||
|
|
||||||
|
|
||||||
|
class _AwxTaskError():
|
||||||
|
def build_exception(self, task, message=None):
|
||||||
if message is None:
|
if message is None:
|
||||||
message = "Execution error running {}".format(task.log_format)
|
message = "Execution error running {}".format(task.log_format)
|
||||||
super(AwxTaskError, self).__init__(message)
|
e = Exception(message)
|
||||||
self.task = task
|
e.task = task
|
||||||
|
e.is_awx_task_error = True
|
||||||
|
return e
|
||||||
|
|
||||||
|
def TaskCancel(self, task, rc):
|
||||||
|
"""Canceled flag caused run_pexpect to kill the job run"""
|
||||||
|
message="{} was canceled (rc={})".format(task.log_format, rc)
|
||||||
|
e = self.build_exception(task, message)
|
||||||
|
e.rc = rc
|
||||||
|
e.awx_task_error_type = "TaskCancel"
|
||||||
|
return e
|
||||||
|
|
||||||
|
def TaskError(self, task, rc):
|
||||||
|
"""Userspace error (non-zero exit code) in run_pexpect subprocess"""
|
||||||
|
message = "{} encountered an error (rc={}), please see task stdout for details.".format(task.log_format, rc)
|
||||||
|
e = self.build_exception(task, message)
|
||||||
|
e.rc = rc
|
||||||
|
e.awx_task_error_type = "TaskError"
|
||||||
|
return e
|
||||||
|
|
||||||
|
|
||||||
class TaskCancel(AwxTaskError):
|
AwxTaskError = _AwxTaskError()
|
||||||
"""Canceled flag caused run_pexpect to kill the job run"""
|
|
||||||
def __init__(self, task, rc):
|
|
||||||
super(TaskCancel, self).__init__(
|
|
||||||
task, message="{} was canceled (rc={})".format(task.log_format, rc))
|
|
||||||
self.rc = rc
|
|
||||||
|
|
||||||
|
|
||||||
class TaskError(AwxTaskError):
|
|
||||||
"""Userspace error (non-zero exit code) in run_pexpect subprocess"""
|
|
||||||
def __init__(self, task, rc):
|
|
||||||
super(TaskError, self).__init__(
|
|
||||||
task, message="%s encountered an error (rc=%s), please see task stdout for details.".format(task.log_format, rc))
|
|
||||||
self.rc = rc
|
|
||||||
|
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ from awx import celery_app
|
|||||||
from awx.main.constants import CLOUD_PROVIDERS, PRIVILEGE_ESCALATION_METHODS
|
from awx.main.constants import CLOUD_PROVIDERS, PRIVILEGE_ESCALATION_METHODS
|
||||||
from awx.main.models import * # noqa
|
from awx.main.models import * # noqa
|
||||||
from awx.main.models.unified_jobs import ACTIVE_STATES
|
from awx.main.models.unified_jobs import ACTIVE_STATES
|
||||||
from awx.main.exceptions import AwxTaskError, TaskCancel, TaskError
|
from awx.main.exceptions import AwxTaskError
|
||||||
from awx.main.queue import CallbackQueueDispatcher
|
from awx.main.queue import CallbackQueueDispatcher
|
||||||
from awx.main.expect import run, isolated_manager
|
from awx.main.expect import run, isolated_manager
|
||||||
from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
|
from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url,
|
||||||
@@ -79,7 +79,7 @@ logger = logging.getLogger('awx.main.tasks')
|
|||||||
|
|
||||||
class LogErrorsTask(Task):
|
class LogErrorsTask(Task):
|
||||||
def on_failure(self, exc, task_id, args, kwargs, einfo):
|
def on_failure(self, exc, task_id, args, kwargs, einfo):
|
||||||
if isinstance(exc, AwxTaskError):
|
if getattr(exc, 'is_awx_task_error', False):
|
||||||
# Error caused by user / tracked in job output
|
# Error caused by user / tracked in job output
|
||||||
logger.warning(str(exc))
|
logger.warning(str(exc))
|
||||||
elif isinstance(self, BaseTask):
|
elif isinstance(self, BaseTask):
|
||||||
@@ -361,8 +361,9 @@ def handle_work_success(self, result, task_actual):
|
|||||||
|
|
||||||
|
|
||||||
@shared_task(queue='tower', base=LogErrorsTask)
|
@shared_task(queue='tower', base=LogErrorsTask)
|
||||||
def handle_work_error(request, exc, traceback, task_id, subtasks=None):
|
def handle_work_error(task_id, *args, **kwargs):
|
||||||
logger.debug('Executing error task id %s, subtasks: %s' % (request.id, str(subtasks)))
|
subtasks = kwargs.get('subtasks', None)
|
||||||
|
logger.debug('Executing error task id %s, subtasks: %s' % (task_id, str(subtasks)))
|
||||||
first_instance = None
|
first_instance = None
|
||||||
first_instance_type = ''
|
first_instance_type = ''
|
||||||
if subtasks is not None:
|
if subtasks is not None:
|
||||||
@@ -910,9 +911,9 @@ class BaseTask(LogErrorsTask):
|
|||||||
# Raising an exception will mark the job as 'failed' in celery
|
# Raising an exception will mark the job as 'failed' in celery
|
||||||
# and will stop a task chain from continuing to execute
|
# and will stop a task chain from continuing to execute
|
||||||
if status == 'canceled':
|
if status == 'canceled':
|
||||||
raise TaskCancel(instance, rc)
|
raise AwxTaskError.TaskCancel(instance, rc)
|
||||||
else:
|
else:
|
||||||
raise TaskError(instance, rc)
|
raise AwxTaskError.TaskError(instance, rc)
|
||||||
|
|
||||||
def get_ssh_key_path(self, instance, **kwargs):
|
def get_ssh_key_path(self, instance, **kwargs):
|
||||||
'''
|
'''
|
||||||
|
|||||||
Reference in New Issue
Block a user