Files
awx/awx/main/exceptions.py
Hao Liu 21c463c0dd [Feature] Pre job run OPA policy enforcement (#15947)
Co-authored-by: Jiří Jeřábek (Jiri Jerabek) <Jerabekjirka@email.cz>
Co-authored-by: Alexander Saprykin <cutwatercore@gmail.com>
2025-05-07 14:51:23 +00:00

50 lines
1.4 KiB
Python

# Copyright (c) 2018 Ansible by Red Hat
# All Rights Reserved.
class _AwxTaskError:
    """Factory for plain ``Exception`` objects tagged as AWX task errors.

    None of the methods raise. Each one returns an ``Exception`` instance
    carrying extra attributes (``task`` and ``is_awx_task_error``; the
    rc-based builders also set ``rc`` and ``awx_task_error_type``) and leaves
    raising to the caller.
    """

    def build_exception(self, task, message=None):
        """Return an ``Exception`` tagged with ``task``.

        Args:
            task: Object exposing a ``log_format`` attribute. It is stored on
                the returned exception as ``task``.
            message: Exception text. When ``None``, a generic
                "Execution error running ..." message is built from
                ``task.log_format``.

        Returns:
            A base ``Exception`` with ``task`` set and
            ``is_awx_task_error`` set to ``True``.
        """
        if message is None:
            message = "Execution error running {}".format(task.log_format)
        e = Exception(message)
        e.task = task
        e.is_awx_task_error = True
        return e

    def _build_rc_exception(self, task, rc, message, error_type):
        """Build a tagged exception that also records ``rc`` and ``error_type``."""
        # Go through build_exception so an overriding subclass still applies.
        e = self.build_exception(task, message)
        e.rc = rc
        e.awx_task_error_type = error_type
        return e

    def TaskCancel(self, task, rc):
        """Canceled flag caused run_pexpect to kill the job run

        Returns an exception with ``rc`` set and ``awx_task_error_type``
        equal to ``"TaskCancel"``.
        """
        message = "{} was canceled (rc={})".format(task.log_format, rc)
        return self._build_rc_exception(task, rc, message, "TaskCancel")

    def TaskError(self, task, rc):
        """Userspace error (non-zero exit code) in run_pexpect subprocess

        Returns an exception with ``rc`` set and ``awx_task_error_type``
        equal to ``"TaskError"``.
        """
        message = "{} encountered an error (rc={}), please see task stdout for details.".format(task.log_format, rc)
        return self._build_rc_exception(task, rc, message, "TaskError")


# Shared module-level instance, so the builders can be called as
# ``AwxTaskError.TaskError(task, rc)`` / ``AwxTaskError.TaskCancel(task, rc)``.
AwxTaskError = _AwxTaskError()
class PostRunError(Exception):
    """Exception that carries a ``status`` and a ``tb`` string with its message.

    Args:
        msg: Exception message, passed to ``Exception.__init__``.
        status: Stored as ``self.status``; defaults to ``'failed'``.
            NOTE(review): presumably the job status to record -- confirm
            against callers.
        tb: Stored as ``self.tb``; defaults to ``''``.
            NOTE(review): presumably traceback text -- confirm against callers.
    """

    def __init__(self, msg, status='failed', tb=''):
        self.status = status
        self.tb = tb
        # Only msg goes into args; status and tb live as instance attributes.
        super(PostRunError, self).__init__(msg)
class PolicyEvaluationError(Exception):
    """Exception that carries a ``status`` and a ``tb`` string with its message.

    Args:
        msg: Exception message, passed to ``Exception.__init__``.
        status: Stored as ``self.status``; defaults to ``'failed'``.
            NOTE(review): presumably the job status to record -- confirm
            against callers.
        tb: Stored as ``self.tb``; defaults to ``''``.
            NOTE(review): presumably traceback text -- confirm against callers.
    """

    def __init__(self, msg, status='failed', tb=''):
        self.status = status
        self.tb = tb
        # Only msg goes into args; status and tb live as instance attributes.
        super(PolicyEvaluationError, self).__init__(msg)
class ReceptorNodeNotFound(RuntimeError):
    """``RuntimeError`` subclass that adds no behavior of its own.

    NOTE(review): by its name, presumably raised when a Receptor node cannot
    be located -- confirm against callers.
    """