mirror of
https://github.com/ansible/awx.git
synced 2026-04-10 04:29:21 -02:30
Compare commits
12 Commits
AAP-57614-
...
AAP-45980
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2f80a4e30d | ||
|
|
9c2de57235 | ||
|
|
bb2f8cdd01 | ||
|
|
b83019bde6 | ||
|
|
6d94aa84e7 | ||
|
|
7155400efc | ||
|
|
e80ce43f87 | ||
|
|
595e093bbf | ||
|
|
cd7f6f602f | ||
|
|
310dd3e18f | ||
|
|
7c75788b0a | ||
|
|
ab294385ad |
2
Makefile
2
Makefile
@@ -581,7 +581,7 @@ detect-schema-change: genschema
|
||||
|
||||
validate-openapi-schema: genschema
|
||||
@echo "Validating OpenAPI schema from schema.json..."
|
||||
@python3 -c "from openapi_spec_validator import validate; import json; spec = json.load(open('schema.json')); validate(spec); print('✓ OpenAPI Schema is valid!')"
|
||||
@python3 -c "from openapi_spec_validator import validate; import json; spec = json.load(open('schema.json')); validate(spec); print('✓ Schema is valid')"
|
||||
|
||||
docker-compose-clean: awx/projects
|
||||
$(DOCKER_COMPOSE) -f tools/docker-compose/_sources/docker-compose.yml rm -sf
|
||||
|
||||
@@ -122,7 +122,6 @@ from awx.main.scheduler.task_manager_models import TaskManagerModels
|
||||
from awx.main.redact import UriCleaner, REPLACE_STR
|
||||
from awx.main.signals import update_inventory_computed_fields
|
||||
|
||||
|
||||
from awx.main.validators import vars_validate_or_raise
|
||||
|
||||
from awx.api.versioning import reverse
|
||||
@@ -2932,6 +2931,19 @@ class CredentialTypeSerializer(BaseSerializer):
|
||||
field['label'] = _(field['label'])
|
||||
if 'help_text' in field:
|
||||
field['help_text'] = _(field['help_text'])
|
||||
|
||||
# Deep copy inputs to avoid modifying the original model data
|
||||
inputs = value.get('inputs')
|
||||
if not isinstance(inputs, dict):
|
||||
inputs = {}
|
||||
value['inputs'] = copy.deepcopy(inputs)
|
||||
fields = value['inputs'].get('fields', [])
|
||||
if not isinstance(fields, list):
|
||||
fields = []
|
||||
|
||||
# Normalize fields and filter out internal fields
|
||||
value['inputs']['fields'] = [f for f in fields if not f.get('internal')]
|
||||
|
||||
return value
|
||||
|
||||
def filter_field_metadata(self, fields, method):
|
||||
|
||||
@@ -14,6 +14,7 @@ import sys
|
||||
import time
|
||||
from base64 import b64encode
|
||||
from collections import OrderedDict
|
||||
from jwt import decode as _jwt_decode
|
||||
|
||||
from urllib3.exceptions import ConnectTimeoutError
|
||||
|
||||
@@ -58,8 +59,13 @@ from drf_spectacular.utils import extend_schema_view, extend_schema
|
||||
from ansible_base.lib.utils.requests import get_remote_hosts
|
||||
from ansible_base.rbac.models import RoleEvaluation
|
||||
from ansible_base.lib.utils.schema import extend_schema_if_available
|
||||
from ansible_base.lib.workload_identity.controller import AutomationControllerJobScope
|
||||
|
||||
# flags
|
||||
from flags.state import flag_enabled
|
||||
|
||||
# AWX
|
||||
from awx.main.tasks.jobs import retrieve_workload_identity_jwt_with_claims
|
||||
from awx.main.tasks.system import send_notifications, update_inventory_computed_fields
|
||||
from awx.main.access import get_user_queryset
|
||||
from awx.api.generics import (
|
||||
@@ -1595,7 +1601,177 @@ class CredentialCopy(CopyAPIView):
|
||||
resource_purpose = 'copy of a credential'
|
||||
|
||||
|
||||
class CredentialExternalTest(SubDetailAPIView):
|
||||
class OIDCCredentialTestMixin:
|
||||
"""
|
||||
Mixin to add OIDC workload identity token support to credential test endpoints.
|
||||
|
||||
This mixin provides methods to handle OIDC-enabled external credentials that use
|
||||
workload identity tokens for authentication.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def _get_workload_identity_token(job_template: models.JobTemplate, jwt_aud: str) -> str:
|
||||
"""Generate a workload identity token for a job template.
|
||||
|
||||
Args:
|
||||
job_template: The JobTemplate instance to generate claims for
|
||||
jwt_aud: The JWT audience claim value
|
||||
|
||||
Returns:
|
||||
str: The generated JWT token
|
||||
"""
|
||||
claims = {
|
||||
AutomationControllerJobScope.CLAIM_ORGANIZATION_NAME: job_template.organization.name,
|
||||
AutomationControllerJobScope.CLAIM_ORGANIZATION_ID: job_template.organization.id,
|
||||
AutomationControllerJobScope.CLAIM_PROJECT_NAME: job_template.project.name,
|
||||
AutomationControllerJobScope.CLAIM_PROJECT_ID: job_template.project.id,
|
||||
AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_NAME: job_template.name,
|
||||
AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_ID: job_template.id,
|
||||
AutomationControllerJobScope.CLAIM_PLAYBOOK_NAME: job_template.playbook,
|
||||
}
|
||||
return retrieve_workload_identity_jwt_with_claims(
|
||||
claims=claims,
|
||||
audience=jwt_aud,
|
||||
scope=AutomationControllerJobScope.name,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _decode_jwt_payload_for_display(jwt_token):
|
||||
"""Decode JWT payload for display purposes only (signature not verified).
|
||||
|
||||
This is safe because the JWT was just created by AWX and is only decoded
|
||||
to show the user what claims are being sent to the external system.
|
||||
The external system will perform proper signature verification.
|
||||
|
||||
Args:
|
||||
jwt_token: The JWT token to decode
|
||||
|
||||
Returns:
|
||||
dict: The decoded JWT payload
|
||||
"""
|
||||
return _jwt_decode(jwt_token, algorithms=["RS256"], options={"verify_signature": False}) # NOSONAR python:S5659
|
||||
|
||||
def _has_workload_identity_token(self, credential_type_inputs):
|
||||
"""Check if credential type has an internal workload_identity_token field.
|
||||
|
||||
Args:
|
||||
credential_type_inputs: The inputs dict from a credential type
|
||||
|
||||
Returns:
|
||||
bool: True if the credential type has a workload_identity_token field marked as internal
|
||||
"""
|
||||
fields = credential_type_inputs.get('fields', []) if isinstance(credential_type_inputs, dict) else []
|
||||
return any(field.get('internal') and field.get('id') == 'workload_identity_token' for field in fields)
|
||||
|
||||
def _validate_and_get_job_template(self, job_template_id):
|
||||
"""Validate job template ID and return the JobTemplate instance.
|
||||
|
||||
Args:
|
||||
job_template_id: The job template ID from metadata
|
||||
|
||||
Returns:
|
||||
JobTemplate instance
|
||||
|
||||
Raises:
|
||||
ParseError: If job_template_id is invalid or not found
|
||||
"""
|
||||
if job_template_id is None:
|
||||
raise ParseError(_('Job template ID is required.'))
|
||||
|
||||
try:
|
||||
return models.JobTemplate.objects.get(id=int(job_template_id))
|
||||
except ValueError:
|
||||
raise ParseError(_('Job template ID must be an integer.'))
|
||||
except models.JobTemplate.DoesNotExist:
|
||||
raise ParseError(_('Job template with ID %(id)s does not exist.') % {'id': job_template_id})
|
||||
|
||||
def _handle_oidc_credential_test(self, backend_kwargs):
|
||||
"""
|
||||
Handle OIDC workload identity token generation for external credential test endpoints.
|
||||
|
||||
This method should only be called when FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED is enabled
|
||||
and the credential type has a workload_identity_token field.
|
||||
|
||||
Args:
|
||||
backend_kwargs: The kwargs dict to pass to the backend (will be modified in place)
|
||||
|
||||
Returns:
|
||||
dict: Response body containing details with the sent JWT payload
|
||||
|
||||
Raises:
|
||||
PermissionDenied: If user lacks access to the job template (re-raised for 403 response)
|
||||
|
||||
All other exceptions are caught and converted to 400 responses with error details.
|
||||
|
||||
Modifies backend_kwargs in place to add workload_identity_token.
|
||||
"""
|
||||
# Validate job template
|
||||
job_template_id = backend_kwargs.pop('job_template_id', None)
|
||||
job_template = self._validate_and_get_job_template(job_template_id)
|
||||
|
||||
# Check user access
|
||||
if not self.request.user.can_access(models.JobTemplate, 'start', job_template):
|
||||
raise PermissionDenied(_('You do not have access to job template with id: %(id)s.') % {'id': job_template.id})
|
||||
|
||||
# Generate workload identity token
|
||||
jwt_token = self._get_workload_identity_token(job_template, backend_kwargs.pop('jwt_aud', None))
|
||||
backend_kwargs['workload_identity_token'] = jwt_token
|
||||
|
||||
return {'details': {'sent_jwt_payload': self._decode_jwt_payload_for_display(jwt_token)}}
|
||||
|
||||
def _call_backend_with_error_handling(self, plugin, backend_kwargs, response_body):
|
||||
"""Call credential backend and handle errors, adding secret_value to response if OIDC details present."""
|
||||
try:
|
||||
with set_environ(**settings.AWX_TASK_ENV):
|
||||
secret_value = plugin.backend(**backend_kwargs)
|
||||
if 'details' in response_body:
|
||||
response_body['details']['secret_value'] = secret_value
|
||||
return Response(response_body, status=status.HTTP_202_ACCEPTED)
|
||||
except requests.exceptions.HTTPError as exc:
|
||||
message = self._extract_http_error_message(exc)
|
||||
self._add_error_to_response(response_body, message)
|
||||
return Response(response_body, status=status.HTTP_400_BAD_REQUEST)
|
||||
except Exception as exc:
|
||||
message = self._extract_generic_error_message(exc)
|
||||
self._add_error_to_response(response_body, message)
|
||||
return Response(response_body, status=status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
@staticmethod
|
||||
def _extract_http_error_message(exc):
|
||||
"""Extract error message from HTTPError, checking response JSON and text."""
|
||||
message = str(exc)
|
||||
if not hasattr(exc, 'response') or exc.response is None:
|
||||
return message
|
||||
|
||||
try:
|
||||
error_data = exc.response.json()
|
||||
if 'errors' in error_data and error_data['errors']:
|
||||
return ', '.join(error_data['errors'])
|
||||
if 'error' in error_data:
|
||||
return error_data['error']
|
||||
except (ValueError, KeyError):
|
||||
if exc.response.text:
|
||||
return exc.response.text
|
||||
return message
|
||||
|
||||
@staticmethod
|
||||
def _extract_generic_error_message(exc):
|
||||
"""Extract error message from exception, handling ConnectTimeoutError specially."""
|
||||
message = str(exc) if str(exc) else exc.__class__.__name__
|
||||
for arg in getattr(exc, 'args', []):
|
||||
if isinstance(getattr(arg, 'reason', None), ConnectTimeoutError):
|
||||
return str(arg.reason)
|
||||
return message
|
||||
|
||||
@staticmethod
|
||||
def _add_error_to_response(response_body, message):
|
||||
"""Add error message to both 'detail' and 'details.error_message' fields."""
|
||||
response_body['detail'] = message
|
||||
if 'details' in response_body:
|
||||
response_body['details']['error_message'] = message
|
||||
|
||||
|
||||
class CredentialExternalTest(OIDCCredentialTestMixin, SubDetailAPIView):
|
||||
"""
|
||||
Test updates to the input values and metadata of an external credential
|
||||
before saving them.
|
||||
@@ -1622,23 +1798,22 @@ class CredentialExternalTest(SubDetailAPIView):
|
||||
if value != '$encrypted$':
|
||||
backend_kwargs[field_name] = value
|
||||
backend_kwargs.update(request.data.get('metadata', {}))
|
||||
try:
|
||||
with set_environ(**settings.AWX_TASK_ENV):
|
||||
obj.credential_type.plugin.backend(**backend_kwargs)
|
||||
return Response({}, status=status.HTTP_202_ACCEPTED)
|
||||
except requests.exceptions.HTTPError:
|
||||
message = """Test operation is not supported for credential type {}.
|
||||
This endpoint only supports credentials that connect to
|
||||
external secret management systems such as CyberArk, HashiCorp
|
||||
Vault, or cloud-based secret managers.""".format(obj.credential_type.kind)
|
||||
return Response({'detail': message}, status=status.HTTP_400_BAD_REQUEST)
|
||||
except Exception as exc:
|
||||
message = exc.__class__.__name__
|
||||
exc_args = getattr(exc, 'args', [])
|
||||
for a in exc_args:
|
||||
if isinstance(getattr(a, 'reason', None), ConnectTimeoutError):
|
||||
message = str(a.reason)
|
||||
return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
# Handle OIDC workload identity token generation if enabled
|
||||
response_body = {}
|
||||
if flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED') and self._has_workload_identity_token(obj.credential_type.inputs):
|
||||
try:
|
||||
oidc_response_body = self._handle_oidc_credential_test(backend_kwargs)
|
||||
response_body.update(oidc_response_body)
|
||||
except PermissionDenied:
|
||||
raise
|
||||
except Exception as exc:
|
||||
error_message = str(exc.detail) if hasattr(exc, 'detail') else str(exc)
|
||||
response_body['detail'] = error_message
|
||||
response_body['details'] = {'error_message': error_message}
|
||||
return Response(response_body, status=status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
return self._call_backend_with_error_handling(obj.credential_type.plugin, backend_kwargs, response_body)
|
||||
|
||||
|
||||
class CredentialInputSourceDetail(RetrieveUpdateDestroyAPIView):
|
||||
@@ -1668,7 +1843,7 @@ class CredentialInputSourceSubList(SubListCreateAPIView):
|
||||
parent_key = 'target_credential'
|
||||
|
||||
|
||||
class CredentialTypeExternalTest(SubDetailAPIView):
|
||||
class CredentialTypeExternalTest(OIDCCredentialTestMixin, SubDetailAPIView):
|
||||
"""
|
||||
Test a complete set of input values for an external credential before
|
||||
saving it.
|
||||
@@ -1685,19 +1860,22 @@ class CredentialTypeExternalTest(SubDetailAPIView):
|
||||
obj = self.get_object()
|
||||
backend_kwargs = request.data.get('inputs', {})
|
||||
backend_kwargs.update(request.data.get('metadata', {}))
|
||||
try:
|
||||
obj.plugin.backend(**backend_kwargs)
|
||||
return Response({}, status=status.HTTP_202_ACCEPTED)
|
||||
except requests.exceptions.HTTPError as exc:
|
||||
message = 'HTTP {}'.format(exc.response.status_code)
|
||||
return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST)
|
||||
except Exception as exc:
|
||||
message = exc.__class__.__name__
|
||||
args_exc = getattr(exc, 'args', [])
|
||||
for a in args_exc:
|
||||
if isinstance(getattr(a, 'reason', None), ConnectTimeoutError):
|
||||
message = str(a.reason)
|
||||
return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
# Handle OIDC workload identity token generation if enabled
|
||||
response_body = {}
|
||||
if flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED') and self._has_workload_identity_token(obj.inputs):
|
||||
try:
|
||||
oidc_response_body = self._handle_oidc_credential_test(backend_kwargs)
|
||||
response_body.update(oidc_response_body)
|
||||
except PermissionDenied:
|
||||
raise
|
||||
except Exception as exc:
|
||||
error_message = str(exc.detail) if hasattr(exc, 'detail') else str(exc)
|
||||
response_body['detail'] = error_message
|
||||
response_body['details'] = {'error_message': error_message}
|
||||
return Response(response_body, status=status.HTTP_400_BAD_REQUEST)
|
||||
|
||||
return self._call_backend_with_error_handling(obj.plugin, backend_kwargs, response_body)
|
||||
|
||||
|
||||
class HostRelatedSearchMixin(object):
|
||||
|
||||
@@ -409,10 +409,12 @@ class Command(BaseCommand):
|
||||
del_child_group_pks = list(set(db_children_name_pk_map.values()))
|
||||
for offset in range(0, len(del_child_group_pks), self._batch_size):
|
||||
child_group_pks = del_child_group_pks[offset : (offset + self._batch_size)]
|
||||
for db_child in db_children.filter(pk__in=child_group_pks):
|
||||
group_group_count += 1
|
||||
db_group.children.remove(db_child)
|
||||
logger.debug('Group "%s" removed from group "%s"', db_child.name, db_group.name)
|
||||
children_to_remove = list(db_children.filter(pk__in=child_group_pks))
|
||||
if children_to_remove:
|
||||
group_group_count += len(children_to_remove)
|
||||
db_group.children.remove(*children_to_remove)
|
||||
for db_child in children_to_remove:
|
||||
logger.debug('Group "%s" removed from group "%s"', db_child.name, db_group.name)
|
||||
# FIXME: Inventory source group relationships
|
||||
# Delete group/host relationships not present in imported data.
|
||||
db_hosts = db_group.hosts
|
||||
@@ -441,12 +443,12 @@ class Command(BaseCommand):
|
||||
del_host_pks = list(del_host_pks)
|
||||
for offset in range(0, len(del_host_pks), self._batch_size):
|
||||
del_pks = del_host_pks[offset : (offset + self._batch_size)]
|
||||
for db_host in db_hosts.filter(pk__in=del_pks):
|
||||
group_host_count += 1
|
||||
if db_host not in db_group.hosts.all():
|
||||
continue
|
||||
db_group.hosts.remove(db_host)
|
||||
logger.debug('Host "%s" removed from group "%s"', db_host.name, db_group.name)
|
||||
hosts_to_remove = list(db_hosts.filter(pk__in=del_pks))
|
||||
if hosts_to_remove:
|
||||
group_host_count += len(hosts_to_remove)
|
||||
db_group.hosts.remove(*hosts_to_remove)
|
||||
for db_host in hosts_to_remove:
|
||||
logger.debug('Host "%s" removed from group "%s"', db_host.name, db_group.name)
|
||||
if settings.SQL_DEBUG:
|
||||
logger.warning(
|
||||
'group-group and group-host deletions took %d queries for %d relationships',
|
||||
|
||||
@@ -531,6 +531,7 @@ class CredentialType(CommonModelNameNotUnique):
|
||||
existing = ct_class.objects.filter(name=default.name, kind=default.kind).first()
|
||||
if existing is not None:
|
||||
existing.namespace = default.namespace
|
||||
existing.description = getattr(default, 'description', '')
|
||||
existing.inputs = {}
|
||||
existing.injectors = {}
|
||||
existing.save()
|
||||
@@ -570,7 +571,14 @@ class CredentialType(CommonModelNameNotUnique):
|
||||
@classmethod
|
||||
def load_plugin(cls, ns, plugin):
|
||||
# TODO: User "side-loaded" credential custom_injectors isn't supported
|
||||
ManagedCredentialType.registry[ns] = SimpleNamespace(namespace=ns, name=plugin.name, kind='external', inputs=plugin.inputs, backend=plugin.backend)
|
||||
ManagedCredentialType.registry[ns] = SimpleNamespace(
|
||||
namespace=ns,
|
||||
name=plugin.name,
|
||||
kind='external',
|
||||
inputs=plugin.inputs,
|
||||
backend=plugin.backend,
|
||||
description=getattr(plugin, 'plugin_description', ''),
|
||||
)
|
||||
|
||||
def inject_credential(self, credential, env, safe_env, args, private_data_dir, container_root=None):
|
||||
from awx_plugins.interfaces._temporary_private_inject_api import inject_credential
|
||||
@@ -582,7 +590,13 @@ class CredentialTypeHelper:
|
||||
@classmethod
|
||||
def get_creation_params(cls, cred_type):
|
||||
if cred_type.kind == 'external':
|
||||
return dict(namespace=cred_type.namespace, kind=cred_type.kind, name=cred_type.name, managed=True)
|
||||
return {
|
||||
'namespace': cred_type.namespace,
|
||||
'kind': cred_type.kind,
|
||||
'name': cred_type.name,
|
||||
'managed': True,
|
||||
'description': getattr(cred_type, 'description', ''),
|
||||
}
|
||||
return dict(
|
||||
namespace=cred_type.namespace,
|
||||
kind=cred_type.kind,
|
||||
|
||||
@@ -345,7 +345,11 @@ class WorkflowJobNode(WorkflowNodeBase):
|
||||
)
|
||||
data.update(accepted_fields) # missing fields are handled in the scheduler
|
||||
# build ancestor artifacts, save them to node model for later
|
||||
aa_dict = {}
|
||||
# initialize from pre-seeded ancestor_artifacts (set on root nodes of
|
||||
# child workflows via seed_root_ancestor_artifacts to carry artifacts
|
||||
# from the parent workflow); exclude job_slice which is internal
|
||||
# metadata handled separately below
|
||||
aa_dict = {k: v for k, v in self.ancestor_artifacts.items() if k != 'job_slice'} if self.ancestor_artifacts else {}
|
||||
is_root_node = True
|
||||
for parent_node in self.get_parent_nodes():
|
||||
is_root_node = False
|
||||
@@ -366,11 +370,13 @@ class WorkflowJobNode(WorkflowNodeBase):
|
||||
data['survey_passwords'] = password_dict
|
||||
# process extra_vars
|
||||
extra_vars = data.get('extra_vars', {})
|
||||
if ujt_obj and isinstance(ujt_obj, (JobTemplate, WorkflowJobTemplate)):
|
||||
if ujt_obj and isinstance(ujt_obj, JobTemplate):
|
||||
if aa_dict:
|
||||
functional_aa_dict = copy(aa_dict)
|
||||
functional_aa_dict.pop('_ansible_no_log', None)
|
||||
extra_vars.update(functional_aa_dict)
|
||||
elif ujt_obj and isinstance(ujt_obj, WorkflowJobTemplate):
|
||||
pass # artifacts are applied via seed_root_ancestor_artifacts in the task manager
|
||||
|
||||
# Workflow Job extra_vars higher precedence than ancestor artifacts
|
||||
extra_vars.update(wj_special_vars)
|
||||
@@ -734,6 +740,18 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
|
||||
wj = wj.get_workflow_job()
|
||||
return ancestors
|
||||
|
||||
def seed_root_ancestor_artifacts(self, artifacts):
|
||||
"""Apply parent workflow artifacts to root nodes so they propagate
|
||||
through the normal ancestor_artifacts channel instead of being
|
||||
baked into this workflow's extra_vars."""
|
||||
self.workflow_job_nodes.exclude(
|
||||
workflowjobnodes_success__isnull=False,
|
||||
).exclude(
|
||||
workflowjobnodes_failure__isnull=False,
|
||||
).exclude(
|
||||
workflowjobnodes_always__isnull=False,
|
||||
).update(ancestor_artifacts=artifacts)
|
||||
|
||||
def get_effective_artifacts(self, **kwargs):
|
||||
"""
|
||||
For downstream jobs of a workflow nested inside of a workflow,
|
||||
|
||||
@@ -241,6 +241,8 @@ class WorkflowManager(TaskBase):
|
||||
job = spawn_node.unified_job_template.create_unified_job(**kv)
|
||||
spawn_node.job = job
|
||||
spawn_node.save()
|
||||
if spawn_node.ancestor_artifacts and isinstance(spawn_node.unified_job_template, WorkflowJobTemplate):
|
||||
job.seed_root_ancestor_artifacts(spawn_node.ancestor_artifacts)
|
||||
logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk)
|
||||
can_start = True
|
||||
if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate):
|
||||
|
||||
@@ -94,7 +94,7 @@ from flags.state import flag_enabled
|
||||
|
||||
# Workload Identity
|
||||
from ansible_base.lib.workload_identity.controller import AutomationControllerJobScope
|
||||
from ansible_base.resource_registry.workload_identity_client import get_workload_identity_client
|
||||
from awx.main.utils.workload_identity import retrieve_workload_identity_jwt_with_claims
|
||||
|
||||
logger = logging.getLogger('awx.main.tasks.jobs')
|
||||
|
||||
@@ -168,14 +168,12 @@ def retrieve_workload_identity_jwt(
|
||||
Raises:
|
||||
RuntimeError: if the workload identity client is not configured.
|
||||
"""
|
||||
client = get_workload_identity_client()
|
||||
if client is None:
|
||||
raise RuntimeError("Workload identity client is not configured")
|
||||
claims = populate_claims_for_workload(unified_job)
|
||||
kwargs = {"claims": claims, "scope": scope, "audience": audience}
|
||||
if workload_ttl_seconds:
|
||||
kwargs["workload_ttl_seconds"] = workload_ttl_seconds
|
||||
return client.request_workload_jwt(**kwargs).jwt
|
||||
return retrieve_workload_identity_jwt_with_claims(
|
||||
populate_claims_for_workload(unified_job),
|
||||
audience,
|
||||
scope,
|
||||
workload_ttl_seconds,
|
||||
)
|
||||
|
||||
|
||||
def with_path_cleanup(f):
|
||||
@@ -1682,7 +1680,7 @@ class RunProjectUpdate(BaseTask):
|
||||
return params
|
||||
|
||||
def build_credentials_list(self, project_update):
|
||||
if project_update.scm_type == 'insights' and project_update.credential:
|
||||
if project_update.credential:
|
||||
return [project_update.credential]
|
||||
return []
|
||||
|
||||
|
||||
11
awx/main/tests/data/projects/debug/set_stats.yml
Normal file
11
awx/main/tests/data/projects/debug/set_stats.yml
Normal file
@@ -0,0 +1,11 @@
|
||||
---
|
||||
- hosts: all
|
||||
gather_facts: false
|
||||
connection: local
|
||||
tasks:
|
||||
- name: Set artifacts via set_stats
|
||||
ansible.builtin.set_stats:
|
||||
data: "{{ stats_data }}"
|
||||
per_host: false
|
||||
aggregate: false
|
||||
when: stats_data is defined
|
||||
@@ -2,6 +2,7 @@ import json
|
||||
|
||||
import pytest
|
||||
|
||||
from ansible_base.lib.testing.util import feature_flag_enabled
|
||||
from awx.main.models.credential import CredentialType, Credential
|
||||
from awx.api.versioning import reverse
|
||||
|
||||
@@ -159,7 +160,8 @@ def test_create_as_admin(get, post, admin):
|
||||
response = get(reverse('api:credential_type_list'), admin)
|
||||
assert response.data['count'] == 1
|
||||
assert response.data['results'][0]['name'] == 'Custom Credential Type'
|
||||
assert response.data['results'][0]['inputs'] == {}
|
||||
# Serializer normalizes empty inputs to {'fields': []}
|
||||
assert response.data['results'][0]['inputs'] == {'fields': []}
|
||||
assert response.data['results'][0]['injectors'] == {}
|
||||
assert response.data['results'][0]['managed'] is False
|
||||
|
||||
@@ -474,3 +476,98 @@ def test_credential_type_rbac_external_test(post, alice, admin, credentialtype_e
|
||||
data = {'inputs': {}, 'metadata': {}}
|
||||
assert post(url, data, admin).status_code == 202
|
||||
assert post(url, data, alice).status_code == 403
|
||||
|
||||
|
||||
# --- Tests for internal field filtering with None/invalid inputs ---
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_credential_type_with_none_inputs(get, admin):
|
||||
"""Test that credential type with empty inputs dict works correctly."""
|
||||
# Create a credential type with empty dict
|
||||
ct = CredentialType.objects.create(
|
||||
kind='cloud',
|
||||
name='Test Type',
|
||||
managed=False,
|
||||
inputs={}, # Empty dict, not None (DB has NOT NULL constraint)
|
||||
)
|
||||
|
||||
url = reverse('api:credential_type_detail', kwargs={'pk': ct.pk})
|
||||
response = get(url, admin)
|
||||
assert response.status_code == 200
|
||||
# Should have normalized inputs to empty dict
|
||||
assert 'inputs' in response.data
|
||||
assert isinstance(response.data['inputs'], dict)
|
||||
assert response.data['inputs']['fields'] == []
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_credential_type_with_invalid_inputs_type(get, admin):
|
||||
"""Test that credential type with non-dict inputs doesn't cause errors."""
|
||||
# Create a credential type with invalid inputs type
|
||||
ct = CredentialType.objects.create(kind='cloud', name='Test Type', managed=False, inputs={'fields': 'not-a-list'})
|
||||
|
||||
url = reverse('api:credential_type_detail', kwargs={'pk': ct.pk})
|
||||
response = get(url, admin)
|
||||
assert response.status_code == 200
|
||||
# Should gracefully handle invalid fields type
|
||||
assert 'inputs' in response.data
|
||||
assert response.data['inputs']['fields'] == []
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_credential_type_filters_internal_fields(get, admin):
|
||||
"""Test that internal fields are filtered from API responses."""
|
||||
ct = CredentialType.objects.create(
|
||||
kind='cloud',
|
||||
name='Test OIDC Type',
|
||||
managed=False,
|
||||
inputs={
|
||||
'fields': [
|
||||
{'id': 'url', 'label': 'URL', 'type': 'string'},
|
||||
{'id': 'token', 'label': 'Token', 'type': 'string', 'secret': True, 'internal': True},
|
||||
{'id': 'public_field', 'label': 'Public', 'type': 'string'},
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
url = reverse('api:credential_type_detail', kwargs={'pk': ct.pk})
|
||||
with feature_flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'):
|
||||
response = get(url, admin)
|
||||
assert response.status_code == 200
|
||||
|
||||
field_ids = [f['id'] for f in response.data['inputs']['fields']]
|
||||
# Internal field should be filtered out
|
||||
assert 'token' not in field_ids
|
||||
assert 'url' in field_ids
|
||||
assert 'public_field' in field_ids
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_credential_type_list_filters_internal_fields(get, admin):
|
||||
"""Test that internal fields are filtered in list view."""
|
||||
CredentialType.objects.create(
|
||||
kind='cloud',
|
||||
name='Test OIDC Type',
|
||||
managed=False,
|
||||
inputs={
|
||||
'fields': [
|
||||
{'id': 'url', 'label': 'URL', 'type': 'string'},
|
||||
{'id': 'workload_identity_token', 'label': 'Token', 'type': 'string', 'secret': True, 'internal': True},
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
url = reverse('api:credential_type_list')
|
||||
with feature_flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'):
|
||||
response = get(url, admin)
|
||||
assert response.status_code == 200
|
||||
|
||||
# Find our credential type in the results
|
||||
test_ct = next((ct for ct in response.data['results'] if ct['name'] == 'Test OIDC Type'), None)
|
||||
assert test_ct is not None
|
||||
|
||||
field_ids = [f['id'] for f in test_ct['inputs']['fields']]
|
||||
# Internal field should be filtered out
|
||||
assert 'workload_identity_token' not in field_ids
|
||||
assert 'url' in field_ids
|
||||
|
||||
259
awx/main/tests/functional/api/test_oidc_credential_test.py
Normal file
259
awx/main/tests/functional/api/test_oidc_credential_test.py
Normal file
@@ -0,0 +1,259 @@
|
||||
"""
|
||||
Tests for OIDC workload identity credential test endpoints.
|
||||
|
||||
Tests the /api/v2/credentials/<id>/test/ and /api/v2/credential_types/<id>/test/
|
||||
endpoints when used with OIDC-enabled credential types.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from unittest import mock
|
||||
|
||||
from django.test import override_settings
|
||||
|
||||
from awx.main.models import Credential, CredentialType, JobTemplate
|
||||
from awx.api.versioning import reverse
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def job_template(organization, project):
|
||||
"""Job template with organization and project for OIDC JWT generation."""
|
||||
return JobTemplate.objects.create(name='test-jt', organization=organization, project=project, playbook='helloworld.yml')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def oidc_credentialtype():
|
||||
"""Create a credential type with workload_identity_token internal field."""
|
||||
oidc_type_inputs = {
|
||||
'fields': [
|
||||
{'id': 'url', 'label': 'Vault URL', 'type': 'string', 'help_text': 'The Vault server URL.'},
|
||||
{'id': 'auth_path', 'label': 'Auth Path', 'type': 'string', 'help_text': 'JWT auth mount path.'},
|
||||
{'id': 'role_id', 'label': 'Role ID', 'type': 'string', 'help_text': 'Vault role.'},
|
||||
{'id': 'jwt_aud', 'label': 'JWT Audience', 'type': 'string', 'help_text': 'Expected audience.'},
|
||||
{'id': 'workload_identity_token', 'label': 'Workload Identity Token', 'type': 'string', 'secret': True, 'internal': True},
|
||||
],
|
||||
'metadata': [
|
||||
{'id': 'secret_path', 'label': 'Secret Path', 'type': 'string'},
|
||||
{'id': 'job_template_id', 'label': 'Job Template ID', 'type': 'string'},
|
||||
],
|
||||
'required': ['url', 'auth_path', 'role_id'],
|
||||
}
|
||||
|
||||
class MockPlugin(object):
|
||||
def backend(self, **kwargs):
|
||||
# Simulate successful backend call
|
||||
return 'secret'
|
||||
|
||||
with mock.patch('awx.main.models.credential.CredentialType.plugin', new_callable=mock.PropertyMock) as mock_plugin:
|
||||
mock_plugin.return_value = MockPlugin()
|
||||
oidc_type = CredentialType(kind='external', managed=True, namespace='hashivault-kv-oidc', name='HashiCorp Vault KV (OIDC)', inputs=oidc_type_inputs)
|
||||
oidc_type.save()
|
||||
yield oidc_type
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def oidc_credential(oidc_credentialtype):
|
||||
"""Create a credential using the OIDC credential type."""
|
||||
return Credential.objects.create(
|
||||
credential_type=oidc_credentialtype,
|
||||
name='oidc-vault-cred',
|
||||
inputs={'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'},
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_oidc_backend():
|
||||
"""Fixture that mocks OIDC JWT generation and credential backend."""
|
||||
with mock.patch('awx.api.views.retrieve_workload_identity_jwt_with_claims') as mock_jwt, mock.patch('awx.api.views._jwt_decode') as mock_decode, mock.patch(
|
||||
'awx.main.models.credential.CredentialType.plugin', new_callable=mock.PropertyMock
|
||||
) as mock_plugin:
|
||||
|
||||
# Set default return values
|
||||
mock_jwt.return_value = 'fake.jwt.token'
|
||||
mock_decode.return_value = {'iss': 'http://gateway/o', 'aud': 'vault'}
|
||||
|
||||
# Create mock backend
|
||||
mock_backend = mock.MagicMock()
|
||||
mock_backend.backend.return_value = 'secret'
|
||||
mock_plugin.return_value = mock_backend
|
||||
|
||||
# Yield all mocks for test customization
|
||||
yield {
|
||||
'jwt': mock_jwt,
|
||||
'decode': mock_decode,
|
||||
'plugin': mock_plugin,
|
||||
'backend': mock_backend,
|
||||
}
|
||||
|
||||
|
||||
# --- Tests for CredentialExternalTest endpoint ---
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=False)
|
||||
def test_credential_test_without_oidc_feature_flag(post, admin, oidc_credential):
|
||||
"""Test that credential test works without OIDC feature flag enabled."""
|
||||
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
|
||||
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': '1'}}
|
||||
|
||||
with mock.patch('awx.main.models.credential.CredentialType.plugin', new_callable=mock.PropertyMock) as mock_plugin:
|
||||
mock_backend = mock.MagicMock()
|
||||
mock_backend.backend.return_value = 'secret'
|
||||
mock_plugin.return_value = mock_backend
|
||||
|
||||
response = post(url, data, admin)
|
||||
assert response.status_code == 202
|
||||
# Should not contain JWT payload when feature flag is disabled
|
||||
assert 'details' not in response.data or 'sent_jwt_payload' not in response.data.get('details', {})
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@mock.patch('awx.api.views.flag_enabled', return_value=True)
|
||||
@pytest.mark.parametrize(
|
||||
'job_template_id, expected_error',
|
||||
[
|
||||
(None, 'Job template ID is required'),
|
||||
('not-an-integer', 'must be an integer'),
|
||||
('99999', 'does not exist'),
|
||||
],
|
||||
ids=['missing_job_template_id', 'invalid_job_template_id_type', 'nonexistent_job_template_id'],
|
||||
)
|
||||
def test_credential_test_job_template_validation(mock_flag, post, admin, oidc_credential, job_template_id, expected_error):
|
||||
"""Test that invalid job_template_id values return 400 with appropriate error messages."""
|
||||
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
|
||||
data = {'metadata': {'secret_path': 'test/secret'}}
|
||||
if job_template_id is not None:
|
||||
data['metadata']['job_template_id'] = job_template_id
|
||||
|
||||
response = post(url, data, admin)
|
||||
assert response.status_code == 400
|
||||
assert 'details' in response.data
|
||||
assert 'error_message' in response.data['details']
|
||||
assert expected_error in response.data['details']['error_message']
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@mock.patch('awx.api.views.flag_enabled', return_value=True)
|
||||
def test_credential_test_no_access_to_job_template(mock_flag, post, alice, oidc_credential, job_template):
|
||||
"""Test that user without access to job template gets 403."""
|
||||
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
|
||||
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
|
||||
|
||||
# Give alice use permission on credential but not on job template
|
||||
oidc_credential.use_role.members.add(alice)
|
||||
|
||||
response = post(url, data, alice)
|
||||
assert response.status_code == 403
|
||||
assert 'You do not have access to job template' in str(response.data)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@mock.patch('awx.api.views.flag_enabled', return_value=True)
|
||||
def test_credential_test_success_returns_jwt_payload(mock_flag, post, admin, oidc_credential, job_template, mock_oidc_backend):
|
||||
"""Test that successful test returns JWT payload in response."""
|
||||
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
|
||||
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
|
||||
|
||||
# Customize mock for this test
|
||||
mock_oidc_backend['decode'].return_value = {
|
||||
'iss': 'http://gateway/o',
|
||||
'sub': 'system:serviceaccount:default:awx-operator',
|
||||
'aud': 'vault',
|
||||
'job_template_id': job_template.id,
|
||||
}
|
||||
|
||||
response = post(url, data, admin)
|
||||
assert response.status_code == 202
|
||||
assert 'details' in response.data
|
||||
assert 'sent_jwt_payload' in response.data['details']
|
||||
assert response.data['details']['sent_jwt_payload']['job_template_id'] == job_template.id
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@mock.patch('awx.api.views.flag_enabled', return_value=True)
|
||||
def test_credential_test_backend_failure_returns_jwt_and_error(mock_flag, post, admin, oidc_credential, job_template, mock_oidc_backend):
|
||||
"""Test that backend failure still returns JWT payload along with error message."""
|
||||
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
|
||||
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
|
||||
|
||||
# Make backend fail
|
||||
mock_oidc_backend['backend'].backend.side_effect = RuntimeError('Connection failed')
|
||||
|
||||
response = post(url, data, admin)
|
||||
assert response.status_code == 400
|
||||
assert 'details' in response.data
|
||||
# Both JWT payload and error message should be present
|
||||
assert 'sent_jwt_payload' in response.data['details']
|
||||
assert 'error_message' in response.data['details']
|
||||
assert 'Connection failed' in response.data['details']['error_message']
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@mock.patch('awx.api.views.flag_enabled', return_value=True)
|
||||
def test_credential_test_jwt_generation_failure(mock_flag, post, admin, oidc_credential, job_template):
|
||||
"""Test that JWT generation failure returns error without JWT payload."""
|
||||
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
|
||||
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
|
||||
|
||||
with mock.patch('awx.api.views.OIDCCredentialTestMixin._get_workload_identity_token') as mock_jwt:
|
||||
mock_jwt.side_effect = RuntimeError('Failed to generate JWT')
|
||||
|
||||
response = post(url, data, admin)
|
||||
assert response.status_code == 400
|
||||
assert 'details' in response.data
|
||||
assert 'error_message' in response.data['details']
|
||||
assert 'Failed to generate JWT' in response.data['details']['error_message']
|
||||
# No JWT payload when generation fails
|
||||
assert 'sent_jwt_payload' not in response.data['details']
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@mock.patch('awx.api.views.flag_enabled', return_value=True)
|
||||
def test_credential_test_job_template_id_not_passed_to_backend(mock_flag, post, admin, oidc_credential, job_template, mock_oidc_backend):
|
||||
"""Test that job_template_id and jwt_aud are removed from backend_kwargs."""
|
||||
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
|
||||
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
|
||||
|
||||
response = post(url, data, admin)
|
||||
assert response.status_code == 202
|
||||
|
||||
# Check that backend was called without job_template_id or jwt_aud
|
||||
call_kwargs = mock_oidc_backend['backend'].backend.call_args[1]
|
||||
assert 'job_template_id' not in call_kwargs
|
||||
assert 'jwt_aud' not in call_kwargs
|
||||
assert 'workload_identity_token' in call_kwargs
|
||||
|
||||
|
||||
# --- Tests for CredentialTypeExternalTest endpoint ---
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@mock.patch('awx.api.views.flag_enabled', return_value=True)
|
||||
def test_credential_type_test_missing_job_template_id(mock_flag, post, admin, oidc_credentialtype):
|
||||
"""Test that missing job_template_id returns 400 for credential type test endpoint."""
|
||||
url = reverse('api:credential_type_external_test', kwargs={'pk': oidc_credentialtype.pk})
|
||||
data = {
|
||||
'inputs': {'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'},
|
||||
'metadata': {'secret_path': 'test/secret'},
|
||||
}
|
||||
|
||||
response = post(url, data, admin)
|
||||
assert response.status_code == 400
|
||||
assert 'details' in response.data
|
||||
assert 'error_message' in response.data['details']
|
||||
assert 'Job template ID is required' in response.data['details']['error_message']
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@mock.patch('awx.api.views.flag_enabled', return_value=True)
|
||||
def test_credential_type_test_success_returns_jwt_payload(mock_flag, post, admin, oidc_credentialtype, job_template, mock_oidc_backend):
|
||||
"""Test that successful credential type test returns JWT payload."""
|
||||
url = reverse('api:credential_type_external_test', kwargs={'pk': oidc_credentialtype.pk})
|
||||
data = {
|
||||
'inputs': {'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'},
|
||||
'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)},
|
||||
}
|
||||
|
||||
response = post(url, data, admin)
|
||||
assert response.status_code == 202
|
||||
assert 'details' in response.data
|
||||
assert 'sent_jwt_payload' in response.data['details']
|
||||
@@ -305,6 +305,47 @@ class TestINIImports:
|
||||
has_host_group = inventory.groups.get(name='has_a_host')
|
||||
assert has_host_group.hosts.count() == 1
|
||||
|
||||
@mock.patch.object(inventory_import, 'AnsibleInventoryLoader', MockLoader)
|
||||
def test_overwrite_removes_stale_memberships(self, inventory):
|
||||
"""When overwrite is enabled, host-group and group-group memberships
|
||||
that are no longer in the imported data should be removed."""
|
||||
# First import: parent_group has two children, host_group has two hosts
|
||||
inventory_import.AnsibleInventoryLoader._data = {
|
||||
"_meta": {"hostvars": {"host1": {}, "host2": {}}},
|
||||
"all": {"children": ["ungrouped", "parent_group", "child_a", "child_b", "host_group"]},
|
||||
"parent_group": {"children": ["child_a", "child_b"]},
|
||||
"host_group": {"hosts": ["host1", "host2"]},
|
||||
"ungrouped": {"hosts": []},
|
||||
}
|
||||
cmd = inventory_import.Command()
|
||||
cmd.handle(inventory_id=inventory.pk, source=__file__, overwrite=True)
|
||||
|
||||
parent = inventory.groups.get(name='parent_group')
|
||||
assert set(parent.children.values_list('name', flat=True)) == {'child_a', 'child_b'}
|
||||
host_grp = inventory.groups.get(name='host_group')
|
||||
assert set(host_grp.hosts.values_list('name', flat=True)) == {'host1', 'host2'}
|
||||
|
||||
# Second import: child_b removed from parent_group, host2 moved out of host_group
|
||||
inventory_import.AnsibleInventoryLoader._data = {
|
||||
"_meta": {"hostvars": {"host1": {}, "host2": {}}},
|
||||
"all": {"children": ["ungrouped", "parent_group", "child_a", "child_b", "host_group"]},
|
||||
"parent_group": {"children": ["child_a"]},
|
||||
"host_group": {"hosts": ["host1"]},
|
||||
"ungrouped": {"hosts": ["host2"]},
|
||||
}
|
||||
cmd = inventory_import.Command()
|
||||
cmd.handle(inventory_id=inventory.pk, source=__file__, overwrite=True)
|
||||
|
||||
parent.refresh_from_db()
|
||||
host_grp.refresh_from_db()
|
||||
# child_b should be removed from parent_group
|
||||
assert set(parent.children.values_list('name', flat=True)) == {'child_a'}
|
||||
# host2 should be removed from host_group
|
||||
assert set(host_grp.hosts.values_list('name', flat=True)) == {'host1'}
|
||||
# host2 and child_b should still exist in the inventory, just not in those groups
|
||||
assert inventory.hosts.filter(name='host2').exists()
|
||||
assert inventory.groups.filter(name='child_b').exists()
|
||||
|
||||
@mock.patch.object(inventory_import, 'AnsibleInventoryLoader', MockLoader)
|
||||
def test_recursive_group_error(self, inventory):
|
||||
inventory_import.AnsibleInventoryLoader._data = {
|
||||
|
||||
206
awx/main/tests/live/tests/test_nested_workflow_artifacts.py
Normal file
206
awx/main/tests/live/tests/test_nested_workflow_artifacts.py
Normal file
@@ -0,0 +1,206 @@
|
||||
import json
|
||||
import pytest
|
||||
|
||||
from awx.main.tests.live.tests.conftest import wait_for_job
|
||||
|
||||
from awx.main.models import JobTemplate, WorkflowJobTemplate, WorkflowJobTemplateNode
|
||||
|
||||
JT_NAMES = ('artifact-test-first', 'artifact-test-second', 'artifact-test-reader')
|
||||
WFT_NAMES = ('artifact-test-outer-wf', 'artifact-test-inner-wf')
|
||||
|
||||
|
||||
@pytest.mark.django_db(transaction=True)
|
||||
def test_nested_workflow_set_stats_precedence(live_tmp_folder, demo_inv, project_factory, default_org):
|
||||
"""Reproducer for set_stats artifacts from an outer workflow leaking into
|
||||
an inner (child) workflow and overriding the inner workflow's own artifacts.
|
||||
|
||||
Outer WF: [job_first] --success--> [inner_wf]
|
||||
Inner WF: [job_second] --success--> [job_reader]
|
||||
|
||||
job_first sets via set_stats:
|
||||
var1: "outer-only" (only source, should propagate through)
|
||||
var2: "should-be-overridden" (will be overridden by job_second)
|
||||
|
||||
job_second sets via set_stats:
|
||||
var2: "from-inner" (should override outer's value)
|
||||
var3: "inner-only" (only source, should be available)
|
||||
|
||||
job_reader runs debug.yml (no set_stats), we inspect its extra_vars:
|
||||
var1 should be "outer-only" - outer artifacts propagate when uncontested
|
||||
var2 should be "from-inner" - inner artifacts override outer (THE BUG)
|
||||
var3 should be "inner-only" - inner-only artifacts propagate normally
|
||||
"""
|
||||
# Clean up resources from prior runs (delete individually for signals)
|
||||
for name in WFT_NAMES:
|
||||
for wft in WorkflowJobTemplate.objects.filter(name=name):
|
||||
wft.delete()
|
||||
for name in JT_NAMES:
|
||||
for jt in JobTemplate.objects.filter(name=name):
|
||||
jt.delete()
|
||||
|
||||
proj = project_factory(scm_url=f'file://{live_tmp_folder}/debug')
|
||||
if proj.current_job:
|
||||
wait_for_job(proj.current_job)
|
||||
|
||||
# job_first: sets var1 (outer-only) and var2 (to be overridden by inner)
|
||||
jt_first = JobTemplate.objects.create(
|
||||
name='artifact-test-first',
|
||||
project=proj,
|
||||
playbook='set_stats.yml',
|
||||
inventory=demo_inv,
|
||||
extra_vars=json.dumps({'stats_data': {'var1': 'outer-only', 'var2': 'should-be-overridden'}}),
|
||||
)
|
||||
# job_second: overrides var2, introduces var3
|
||||
jt_second = JobTemplate.objects.create(
|
||||
name='artifact-test-second',
|
||||
project=proj,
|
||||
playbook='set_stats.yml',
|
||||
inventory=demo_inv,
|
||||
extra_vars=json.dumps({'stats_data': {'var2': 'from-inner', 'var3': 'inner-only'}}),
|
||||
)
|
||||
# job_reader: just runs, we check what extra_vars it receives
|
||||
jt_reader = JobTemplate.objects.create(
|
||||
name='artifact-test-reader',
|
||||
project=proj,
|
||||
playbook='debug.yml',
|
||||
inventory=demo_inv,
|
||||
)
|
||||
|
||||
# Inner WFT: job_second -> job_reader
|
||||
inner_wft = WorkflowJobTemplate.objects.create(name='artifact-test-inner-wf', organization=default_org)
|
||||
inner_node_1 = WorkflowJobTemplateNode.objects.create(
|
||||
workflow_job_template=inner_wft,
|
||||
unified_job_template=jt_second,
|
||||
identifier='second',
|
||||
)
|
||||
inner_node_2 = WorkflowJobTemplateNode.objects.create(
|
||||
workflow_job_template=inner_wft,
|
||||
unified_job_template=jt_reader,
|
||||
identifier='reader',
|
||||
)
|
||||
inner_node_1.success_nodes.add(inner_node_2)
|
||||
|
||||
# Outer WFT: job_first -> inner_wf
|
||||
outer_wft = WorkflowJobTemplate.objects.create(name='artifact-test-outer-wf', organization=default_org)
|
||||
outer_node_1 = WorkflowJobTemplateNode.objects.create(
|
||||
workflow_job_template=outer_wft,
|
||||
unified_job_template=jt_first,
|
||||
identifier='first',
|
||||
)
|
||||
outer_node_2 = WorkflowJobTemplateNode.objects.create(
|
||||
workflow_job_template=outer_wft,
|
||||
unified_job_template=inner_wft,
|
||||
identifier='inner',
|
||||
)
|
||||
outer_node_1.success_nodes.add(outer_node_2)
|
||||
|
||||
# Launch and wait
|
||||
outer_wfj = outer_wft.create_unified_job()
|
||||
outer_wfj.signal_start()
|
||||
wait_for_job(outer_wfj, running_timeout=120)
|
||||
|
||||
# Find the reader job inside the inner workflow
|
||||
inner_wf_node = outer_wfj.workflow_job_nodes.get(identifier='inner')
|
||||
inner_wfj = inner_wf_node.job
|
||||
assert inner_wfj is not None, 'Inner workflow job was never created'
|
||||
|
||||
# Check that root node of inner WF (job_second) received outer artifacts
|
||||
second_node = inner_wfj.workflow_job_nodes.get(identifier='second')
|
||||
assert second_node.job is not None, 'Second job was never created'
|
||||
second_extra_vars = json.loads(second_node.job.extra_vars)
|
||||
assert second_extra_vars.get('var1') == 'outer-only', (
|
||||
f'Root node var1: expected "outer-only" (outer artifact should be available to root node), '
|
||||
f'got "{second_extra_vars.get("var1")}". '
|
||||
f'Outer artifacts are not reaching root nodes of child workflows.'
|
||||
)
|
||||
|
||||
reader_node = inner_wfj.workflow_job_nodes.get(identifier='reader')
|
||||
assert reader_node.job is not None, 'Reader job was never created'
|
||||
|
||||
reader_extra_vars = json.loads(reader_node.job.extra_vars)
|
||||
|
||||
# var1: only set by outer job_first, no conflict — should propagate through
|
||||
assert reader_extra_vars.get('var1') == 'outer-only', f'var1: expected "outer-only" (uncontested outer artifact), ' f'got "{reader_extra_vars.get("var1")}"'
|
||||
|
||||
# var2: set by outer as "should-be-overridden", then by inner as "from-inner"
|
||||
# Inner workflow's own ancestor artifacts should take precedence
|
||||
assert reader_extra_vars.get('var2') == 'from-inner', (
|
||||
f'var2: expected "from-inner" (inner workflow artifact should override outer), '
|
||||
f'got "{reader_extra_vars.get("var2")}". '
|
||||
f'Outer workflow artifacts are leaking via wj_special_vars. '
|
||||
f'reader node ancestor_artifacts={reader_node.ancestor_artifacts}'
|
||||
)
|
||||
|
||||
# var3: only set by inner job_second — should propagate normally
|
||||
assert reader_extra_vars.get('var3') == 'inner-only', f'var3: expected "inner-only" (inner-only artifact), ' f'got "{reader_extra_vars.get("var3")}"'
|
||||
|
||||
|
||||
@pytest.mark.django_db(transaction=True)
|
||||
def test_workflow_extra_vars_override_artifacts(live_tmp_folder, demo_inv, project_factory, default_org):
|
||||
"""Workflow extra_vars should take precedence over set_stats artifacts
|
||||
within a single (non-nested) workflow.
|
||||
|
||||
WF (extra_vars: my_var="from-wf-extra-vars"):
|
||||
[job_setter] --success--> [job_reader]
|
||||
|
||||
job_setter sets my_var="from-set-stats" via set_stats
|
||||
job_reader should see my_var="from-wf-extra-vars" because workflow
|
||||
extra_vars are higher precedence than ancestor artifacts.
|
||||
"""
|
||||
wft_name = 'artifact-test-wf-extra-vars-precedence'
|
||||
jt_names = ('artifact-test-setter', 'artifact-test-checker')
|
||||
|
||||
for wft in WorkflowJobTemplate.objects.filter(name=wft_name):
|
||||
wft.delete()
|
||||
for name in jt_names:
|
||||
for jt in JobTemplate.objects.filter(name=name):
|
||||
jt.delete()
|
||||
|
||||
proj = project_factory(scm_url=f'file://{live_tmp_folder}/debug')
|
||||
if proj.current_job:
|
||||
wait_for_job(proj.current_job)
|
||||
|
||||
jt_setter = JobTemplate.objects.create(
|
||||
name='artifact-test-setter',
|
||||
project=proj,
|
||||
playbook='set_stats.yml',
|
||||
inventory=demo_inv,
|
||||
extra_vars=json.dumps({'stats_data': {'my_var': 'from-set-stats'}}),
|
||||
)
|
||||
jt_checker = JobTemplate.objects.create(
|
||||
name='artifact-test-checker',
|
||||
project=proj,
|
||||
playbook='debug.yml',
|
||||
inventory=demo_inv,
|
||||
)
|
||||
|
||||
wft = WorkflowJobTemplate.objects.create(
|
||||
name=wft_name,
|
||||
organization=default_org,
|
||||
extra_vars=json.dumps({'my_var': 'from-wf-extra-vars'}),
|
||||
)
|
||||
node_1 = WorkflowJobTemplateNode.objects.create(
|
||||
workflow_job_template=wft,
|
||||
unified_job_template=jt_setter,
|
||||
identifier='setter',
|
||||
)
|
||||
node_2 = WorkflowJobTemplateNode.objects.create(
|
||||
workflow_job_template=wft,
|
||||
unified_job_template=jt_checker,
|
||||
identifier='checker',
|
||||
)
|
||||
node_1.success_nodes.add(node_2)
|
||||
|
||||
wfj = wft.create_unified_job()
|
||||
wfj.signal_start()
|
||||
wait_for_job(wfj, running_timeout=120)
|
||||
|
||||
checker_node = wfj.workflow_job_nodes.get(identifier='checker')
|
||||
assert checker_node.job is not None, 'Checker job was never created'
|
||||
|
||||
checker_extra_vars = json.loads(checker_node.job.extra_vars)
|
||||
assert checker_extra_vars.get('my_var') == 'from-wf-extra-vars', (
|
||||
f'Expected my_var="from-wf-extra-vars" (workflow extra_vars should override artifacts), '
|
||||
f'got my_var="{checker_extra_vars.get("my_var")}". '
|
||||
f'checker node ancestor_artifacts={checker_node.ancestor_artifacts}'
|
||||
)
|
||||
@@ -2,7 +2,11 @@
|
||||
|
||||
import pytest
|
||||
|
||||
from types import SimpleNamespace
|
||||
from unittest import mock
|
||||
|
||||
from awx.main.models import Credential, CredentialType
|
||||
from awx.main.models.credential import CredentialTypeHelper, ManagedCredentialType
|
||||
|
||||
from django.apps import apps
|
||||
|
||||
@@ -78,3 +82,53 @@ def test_credential_context_property_independent_instances():
|
||||
assert cred1.context == {'key1': 'value1'}
|
||||
assert cred2.context == {'key2': 'value2'}
|
||||
assert cred1.context is not cred2.context
|
||||
|
||||
|
||||
def test_load_plugin_passes_description():
|
||||
plugin = SimpleNamespace(name='test_plugin', inputs={'fields': []}, backend=None, plugin_description='A test plugin')
|
||||
CredentialType.load_plugin('test_ns', plugin)
|
||||
entry = ManagedCredentialType.registry['test_ns']
|
||||
assert entry.description == 'A test plugin'
|
||||
del ManagedCredentialType.registry['test_ns']
|
||||
|
||||
|
||||
def test_load_plugin_missing_description():
|
||||
plugin = SimpleNamespace(name='test_plugin', inputs={'fields': []}, backend=None)
|
||||
CredentialType.load_plugin('test_ns', plugin)
|
||||
entry = ManagedCredentialType.registry['test_ns']
|
||||
assert entry.description == ''
|
||||
del ManagedCredentialType.registry['test_ns']
|
||||
|
||||
|
||||
def test_get_creation_params_external_includes_description():
|
||||
cred_type = SimpleNamespace(namespace='test_ns', kind='external', name='Test', description='My description')
|
||||
params = CredentialTypeHelper.get_creation_params(cred_type)
|
||||
assert params['description'] == 'My description'
|
||||
|
||||
|
||||
def test_get_creation_params_external_missing_description():
|
||||
cred_type = SimpleNamespace(namespace='test_ns', kind='external', name='Test')
|
||||
params = CredentialTypeHelper.get_creation_params(cred_type)
|
||||
assert params['description'] == ''
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_setup_tower_managed_defaults_updates_description():
|
||||
registry_entry = SimpleNamespace(
|
||||
namespace='test_ns',
|
||||
kind='external',
|
||||
name='Test Plugin',
|
||||
inputs={'fields': []},
|
||||
backend=None,
|
||||
description='Updated description',
|
||||
)
|
||||
# Create an existing credential type with no description
|
||||
ct = CredentialType.objects.create(name='Test Plugin', kind='external', namespace='old_ns')
|
||||
assert ct.description == ''
|
||||
|
||||
with mock.patch.dict(ManagedCredentialType.registry, {'test_ns': registry_entry}, clear=True):
|
||||
CredentialType._setup_tower_managed_defaults()
|
||||
|
||||
ct.refresh_from_db()
|
||||
assert ct.description == 'Updated description'
|
||||
assert ct.namespace == 'test_ns'
|
||||
|
||||
@@ -473,7 +473,7 @@ def test_populate_claims_for_adhoc_command(workload_attrs, expected_claims):
|
||||
assert claims == expected_claims
|
||||
|
||||
|
||||
@mock.patch('awx.main.tasks.jobs.get_workload_identity_client')
|
||||
@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client')
|
||||
def test_retrieve_workload_identity_jwt_returns_jwt_from_client(mock_get_client):
|
||||
"""retrieve_workload_identity_jwt returns the JWT string from the client."""
|
||||
mock_client = mock.MagicMock()
|
||||
@@ -502,7 +502,7 @@ def test_retrieve_workload_identity_jwt_returns_jwt_from_client(mock_get_client)
|
||||
assert call_kwargs['claims'][AutomationControllerJobScope.CLAIM_JOB_NAME] == 'Test Job'
|
||||
|
||||
|
||||
@mock.patch('awx.main.tasks.jobs.get_workload_identity_client')
|
||||
@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client')
|
||||
def test_retrieve_workload_identity_jwt_passes_audience_and_scope(mock_get_client):
|
||||
"""retrieve_workload_identity_jwt passes audience and scope to the client."""
|
||||
mock_client = mock.MagicMock()
|
||||
@@ -518,7 +518,7 @@ def test_retrieve_workload_identity_jwt_passes_audience_and_scope(mock_get_clien
|
||||
mock_client.request_workload_jwt.assert_called_once_with(claims={'job_id': 1}, scope=scope, audience=audience)
|
||||
|
||||
|
||||
@mock.patch('awx.main.tasks.jobs.get_workload_identity_client')
|
||||
@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client')
|
||||
def test_retrieve_workload_identity_jwt_passes_workload_ttl(mock_get_client):
|
||||
"""retrieve_workload_identity_jwt passes workload_ttl_seconds when provided."""
|
||||
mock_client = mock.Mock()
|
||||
@@ -542,7 +542,7 @@ def test_retrieve_workload_identity_jwt_passes_workload_ttl(mock_get_client):
|
||||
)
|
||||
|
||||
|
||||
@mock.patch('awx.main.tasks.jobs.get_workload_identity_client')
|
||||
@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client')
|
||||
def test_retrieve_workload_identity_jwt_raises_when_client_not_configured(mock_get_client):
|
||||
"""retrieve_workload_identity_jwt raises RuntimeError when client is None."""
|
||||
mock_get_client.return_value = None
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
from ansible_base.resource_registry.workload_identity_client import get_workload_identity_client
|
||||
|
||||
__all__ = ['retrieve_workload_identity_jwt_with_claims']
|
||||
|
||||
|
||||
def retrieve_workload_identity_jwt_with_claims(
|
||||
claims: dict,
|
||||
audience: str,
|
||||
scope: str,
|
||||
workload_ttl_seconds: int | None = None,
|
||||
) -> str:
|
||||
"""Retrieve JWT token from workload claims.
|
||||
Raises:
|
||||
RuntimeError: if the workload identity client is not configured.
|
||||
"""
|
||||
client = get_workload_identity_client()
|
||||
if client is None:
|
||||
raise RuntimeError("Workload identity client is not configured")
|
||||
kwargs = {"claims": claims, "scope": scope, "audience": audience}
|
||||
if workload_ttl_seconds:
|
||||
kwargs["workload_ttl_seconds"] = workload_ttl_seconds
|
||||
return client.request_workload_jwt(**kwargs).jwt
|
||||
|
||||
@@ -34,9 +34,6 @@ def get_urlpatterns(prefix=None):
|
||||
re_path(r'^(?:api/)?500.html$', handle_500),
|
||||
re_path(r'^csp-violation/', handle_csp_violation),
|
||||
re_path(r'^login/', handle_login_redirect),
|
||||
# want api/v2/doesnotexist to return a 404, not match the ui urls,
|
||||
# so use a negative lookahead assertion here
|
||||
re_path(r'^(?!api/).*', include('awx.ui.urls', namespace='ui')),
|
||||
]
|
||||
|
||||
if settings.DYNACONF.is_development_mode:
|
||||
@@ -47,6 +44,12 @@ def get_urlpatterns(prefix=None):
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
# want api/v2/doesnotexist to return a 404, not match the ui urls,
|
||||
# so use a negative lookahead assertion in the pattern below
|
||||
urlpatterns += [
|
||||
re_path(r'^(?!api/).*', include('awx.ui.urls', namespace='ui')),
|
||||
]
|
||||
|
||||
return urlpatterns
|
||||
|
||||
|
||||
|
||||
@@ -276,6 +276,7 @@ options:
|
||||
- ''
|
||||
- 'github'
|
||||
- 'gitlab'
|
||||
- 'bitbucket_dc'
|
||||
webhook_credential:
|
||||
description:
|
||||
- Personal Access Token for posting back the status to the service API
|
||||
@@ -436,7 +437,7 @@ def main():
|
||||
scm_branch=dict(),
|
||||
ask_scm_branch_on_launch=dict(type='bool'),
|
||||
job_slice_count=dict(type='int'),
|
||||
webhook_service=dict(choices=['github', 'gitlab', '']),
|
||||
webhook_service=dict(choices=['github', 'gitlab', 'bitbucket_dc', '']),
|
||||
webhook_credential=dict(),
|
||||
labels=dict(type="list", elements='str'),
|
||||
notification_templates_started=dict(type="list", elements='str'),
|
||||
|
||||
@@ -117,6 +117,7 @@ options:
|
||||
choices:
|
||||
- github
|
||||
- gitlab
|
||||
- bitbucket_dc
|
||||
webhook_credential:
|
||||
description:
|
||||
- Personal Access Token for posting back the status to the service API
|
||||
@@ -828,7 +829,7 @@ def main():
|
||||
ask_inventory_on_launch=dict(type='bool'),
|
||||
ask_scm_branch_on_launch=dict(type='bool'),
|
||||
ask_limit_on_launch=dict(type='bool'),
|
||||
webhook_service=dict(choices=['github', 'gitlab']),
|
||||
webhook_service=dict(choices=['github', 'gitlab', 'bitbucket_dc']),
|
||||
webhook_credential=dict(),
|
||||
labels=dict(type="list", elements='str'),
|
||||
notification_templates_started=dict(type="list", elements='str'),
|
||||
|
||||
124
awx_collection/test/awx/test_webhooks.py
Normal file
124
awx_collection/test/awx/test_webhooks.py
Normal file
@@ -0,0 +1,124 @@
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
|
||||
from awx.main.models import JobTemplate, WorkflowJobTemplate
|
||||
|
||||
|
||||
# The backend supports these webhook services on job/workflow templates
|
||||
# (see awx/main/models/mixins.py). The collection modules must accept all of
|
||||
# them in their argument_spec ``choices`` list. This test guards against the
|
||||
# module's choices drifting from the backend -- see AAP-45980, where
|
||||
# ``bitbucket_dc`` had been supported by the API since migration 0188 but was
|
||||
# still being rejected by the job_template/workflow_job_template modules.
|
||||
WEBHOOK_SERVICES = ['github', 'gitlab', 'bitbucket_dc']
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('webhook_service', WEBHOOK_SERVICES)
|
||||
def test_job_template_accepts_webhook_service(run_module, admin_user, project, inventory, webhook_service):
|
||||
result = run_module(
|
||||
'job_template',
|
||||
{
|
||||
'name': 'foo',
|
||||
'playbook': 'helloworld.yml',
|
||||
'project': project.name,
|
||||
'inventory': inventory.name,
|
||||
'webhook_service': webhook_service,
|
||||
'state': 'present',
|
||||
},
|
||||
admin_user,
|
||||
)
|
||||
|
||||
assert not result.get('failed', False), result.get('msg', result)
|
||||
assert result.get('changed', False), result
|
||||
|
||||
jt = JobTemplate.objects.get(name='foo')
|
||||
assert jt.webhook_service == webhook_service
|
||||
|
||||
# Re-running with the same args must be a no-op (idempotence).
|
||||
result = run_module(
|
||||
'job_template',
|
||||
{
|
||||
'name': 'foo',
|
||||
'playbook': 'helloworld.yml',
|
||||
'project': project.name,
|
||||
'inventory': inventory.name,
|
||||
'webhook_service': webhook_service,
|
||||
'state': 'present',
|
||||
},
|
||||
admin_user,
|
||||
)
|
||||
assert not result.get('failed', False), result.get('msg', result)
|
||||
assert not result.get('changed', True), result
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('webhook_service', WEBHOOK_SERVICES)
|
||||
def test_workflow_job_template_accepts_webhook_service(run_module, admin_user, organization, webhook_service):
|
||||
result = run_module(
|
||||
'workflow_job_template',
|
||||
{
|
||||
'name': 'foo-workflow',
|
||||
'organization': organization.name,
|
||||
'webhook_service': webhook_service,
|
||||
'state': 'present',
|
||||
},
|
||||
admin_user,
|
||||
)
|
||||
|
||||
assert not result.get('failed', False), result.get('msg', result)
|
||||
assert result.get('changed', False), result
|
||||
|
||||
wfjt = WorkflowJobTemplate.objects.get(name='foo-workflow')
|
||||
assert wfjt.webhook_service == webhook_service
|
||||
|
||||
# Re-running with the same args must be a no-op (idempotence).
|
||||
result = run_module(
|
||||
'workflow_job_template',
|
||||
{
|
||||
'name': 'foo-workflow',
|
||||
'organization': organization.name,
|
||||
'webhook_service': webhook_service,
|
||||
'state': 'present',
|
||||
},
|
||||
admin_user,
|
||||
)
|
||||
assert not result.get('failed', False), result.get('msg', result)
|
||||
assert not result.get('changed', True), result
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_job_template_rejects_unknown_webhook_service(run_module, admin_user, project, inventory):
|
||||
result = run_module(
|
||||
'job_template',
|
||||
{
|
||||
'name': 'foo',
|
||||
'playbook': 'helloworld.yml',
|
||||
'project': project.name,
|
||||
'inventory': inventory.name,
|
||||
'webhook_service': 'not_a_real_service',
|
||||
'state': 'present',
|
||||
},
|
||||
admin_user,
|
||||
)
|
||||
assert result.get('failed', False), result
|
||||
assert 'webhook_service' in result.get('msg', '')
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_workflow_job_template_rejects_unknown_webhook_service(run_module, admin_user, organization):
|
||||
result = run_module(
|
||||
'workflow_job_template',
|
||||
{
|
||||
'name': 'foo-workflow',
|
||||
'organization': organization.name,
|
||||
'webhook_service': 'not_a_real_service',
|
||||
'state': 'present',
|
||||
},
|
||||
admin_user,
|
||||
)
|
||||
assert result.get('failed', False), result
|
||||
assert 'webhook_service' in result.get('msg', '')
|
||||
@@ -1,5 +1,5 @@
|
||||
[build-system]
|
||||
requires = ["setuptools>=45", "setuptools_scm[toml]>=6.2"]
|
||||
requires = ["setuptools>=45", "setuptools_scm[toml]>=6.2,<10"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
# Do not uncomment the line below. We need to be able to override the version via a file, and this
|
||||
|
||||
@@ -116,7 +116,7 @@ cython==3.1.3
|
||||
# via -r /awx_devel/requirements/requirements.in
|
||||
daphne==4.2.1
|
||||
# via -r /awx_devel/requirements/requirements.in
|
||||
dispatcherd[pg-notify]==2026.02.26
|
||||
dispatcherd[pg-notify]==2026.3.25
|
||||
# via -r /awx_devel/requirements/requirements.in
|
||||
distro==1.9.0
|
||||
# via -r /awx_devel/requirements/requirements.in
|
||||
|
||||
Reference in New Issue
Block a user