Compare commits

..

16 Commits

Author SHA1 Message Date
Peter Braun
09690c7918 Merge branch 'devel' into AAP-45980 2026-04-13 14:47:58 +02:00
Seth Foster
b8c9ae73cd Fix OIDC workload identity for inventory sync (#16390)
The cloud credential used by inventory updates was not going through
the OIDC workload identity token flow because it lives outside the
normal _credentials list. This change overrides populate_workload_identity_tokens
in RunInventoryUpdate to pass the cloud credential to the base
implementation via the additional_credentials argument, and patches
get_cloud_credential on the instance so the injector picks up the
credential with OIDC context intact.

Co-authored-by: Alan Rominger <arominge@redhat.com>
Co-authored-by: Dave Mulford <dmulford@redhat.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 16:26:18 -04:00
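The override pattern this commit describes can be sketched in isolation. The class and method names below mirror the commit message, but the bodies are simplified stand-ins, not the actual AWX implementation:

```python
# Hypothetical sketch of the override described above: the subclass routes a
# credential that lives outside _credentials through the base implementation's
# additional_credentials hook.

class BaseTask:
    def __init__(self, credentials):
        self._credentials = credentials

    def populate_workload_identity_tokens(self, additional_credentials=None):
        # Base implementation: operate on the normal credential list,
        # plus any extras a subclass wants included.
        credentials = list(self._credentials)
        if additional_credentials:
            credentials.extend(additional_credentials)
        return [f"token-for-{c}" for c in credentials]


class RunInventoryUpdate(BaseTask):
    def __init__(self, credentials, cloud_credential):
        super().__init__(credentials)
        self.cloud_credential = cloud_credential

    def populate_workload_identity_tokens(self, additional_credentials=None):
        # The cloud credential lives outside _credentials, so pass it to
        # the base implementation as an additional credential.
        extras = list(additional_credentials or [])
        if self.cloud_credential is not None:
            extras.append(self.cloud_credential)
        return super().populate_workload_identity_tokens(additional_credentials=extras)
```

The point of the hook is that the base method stays the single place where token population happens; subclasses only widen its input.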
Stevenson Michel
d71f18fa44 [Devel] Config Endpoint Optimization (#16389)
* Improved performance of the config endpoint by reducing database queries in GET /api/controller/v2/config/
2026-04-09 16:24:03 -04:00
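The optimization has the classic "short-circuit on cheap flags, then one combined query" shape. A toy, Django-free sketch (`org_roles_for` is a stand-in for the real role lookup, not an AWX API):

```python
# Illustrative sketch of the optimization described above: check cheap user
# flags first, then do a single combined role check instead of several
# separate existence queries.

def has_org_access(user, org_roles_for):
    # Superuser / system auditor short-circuits without any role lookup.
    if user.get("is_superuser") or user.get("is_system_auditor"):
        return True
    # One combined check covering all three organization role types.
    return bool(org_roles_for(user["name"]) & {"change", "audit", "add_project"})

# Toy role store standing in for the database.
memberships = {"alice": {"audit"}, "bob": set()}

def lookup(name):
    return memberships.get(name, set())
```

In the real change the combined check is a single SQL query built by OR-ing querysets, which is what cuts the query count.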
Tong He
e82a4246f3 Bind the install bundle to the ansible.receptor collection 2.0.8 version (#16396) 2026-04-09 17:09:26 +02:00
Peter Braun
2f80a4e30d update docs 2026-04-09 09:52:02 +02:00
Peter Braun
9c2de57235 add test 2026-04-09 09:37:56 +02:00
Peter Braun
bb2f8cdd01 support bitbucket_dc webhooks 2026-04-09 09:27:31 +02:00
Daniel Finca
b83019bde6 feat: support for oidc credential /test endpoint (#16370)
Adds support for testing external credentials that use OIDC workload identity tokens.
When FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED is enabled, the /test endpoints return
JWT payload details alongside test results.

- Add OIDC credential test endpoints with job template selection
- Return JWT payload and secret value in test response
- Maintain backward compatibility (detail field for errors)
- Add comprehensive unit and functional tests
- Refactor shared error handling logic

Co-authored-by: Daniel Finca <dfinca@redhat.com>
Co-authored-by: melissalkelly <melissalkelly1@gmail.com>
2026-04-06 15:56:11 -04:00
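Returning the "JWT payload details" mentioned above implies decoding the freshly minted token without verifying its signature (the external system does the real verification). A stdlib-only sketch of that display-only decode — the AWX code itself uses PyJWT for this step:

```python
import base64
import json

def decode_jwt_payload_for_display(token: str) -> dict:
    """Display-only decode: the signature is NOT verified, which is
    acceptable only because the token was just minted locally and the
    receiving system performs the real verification."""
    payload_b64 = token.split(".")[1]
    payload_b64 += "=" * (-len(payload_b64) % 4)  # restore stripped base64url padding
    return json.loads(base64.urlsafe_b64decode(payload_b64))
```

With PyJWT the equivalent is `jwt.decode(token, options={"verify_signature": False})`; either way the claims are surfaced to the user purely for inspection.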
Alan Rominger
6d94aa84e7 Reorder URLs so that Django debug toolbar can work (#16352)
* Reorder URLs so that Django debug toolbar can work

* Move comment with URL move
2026-04-03 10:22:21 -04:00
Alan Rominger
7155400efc AAP-12516 [option 2] Handle nested workflow artifacts via root node ancestor_artifacts (#16381)
* Add new test for artifact precedence upstream node vs outer workflow

* Fix bugs, upstream artifacts come first for precedence

* Track nested artifacts path through ancestor_artifacts on root nodes

* Fix case where first root node did not get the vars

* touchup comment

* Prevent conflict with sliced jobs hack
2026-04-02 15:18:11 -04:00
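The precedence rules these bullets establish reduce to ordered dict merges; a toy sketch (the function name is illustrative, not AWX code):

```python
# Toy illustration of the precedence exercised by the new test: upstream-node
# artifacts win over artifacts inherited from the outer workflow, and the
# workflow job's own extra_vars win over both.

def effective_vars(ancestor_artifacts, upstream_artifacts, extra_vars):
    merged = dict(ancestor_artifacts)   # seeded onto root nodes from the outer workflow
    merged.update(upstream_artifacts)   # upstream node takes precedence
    merged.update(extra_vars)           # workflow extra_vars highest precedence
    return merged
```

Later `dict.update` calls overwrite earlier keys, so merge order alone encodes the precedence.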
melissalkelly
e80ce43f87 Fix workload identity project updates (#16373)
* fix: enable workload identity credentials for project updates

* Add explanatory comment for credential context handling

* Revert build_passwords
2026-03-31 14:48:27 +02:00
Stevenson Michel
595e093bbf [CI-Fix] Pin setuptools_scm<10 to fix api-lint failure (#16376)
Fix CI: Pin setuptools_scm<10 to fix api-lint build failure

setuptools-scm 10.0.5 (with its new vcs-versioning dependency) requires
a [tool.setuptools_scm] or [tool.vcs-versioning] section in pyproject.toml.
AWX intentionally omits this section because it uses a custom version
resolution via setup.cfg (version = attr: awx.get_version). The new major
version of setuptools-scm treats the missing section as a fatal error when
building the sdist in tox's isolated build, causing the linters environment
to fail.

Pinning to <10 restores compatibility with the existing version resolution
strategy.

Failing run: https://github.com/ansible/awx/actions/runs/23744310714
Branch: devel

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 14:37:16 -04:00
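The pin described above is a one-line version constraint; in a requirements or tox deps file it could look like the fragment below (the exact file AWX pins it in is not shown here, so treat the location as an assumption):

```
# Stay below setuptools-scm 10, which treats a missing
# [tool.setuptools_scm] section in pyproject.toml as a fatal error.
setuptools_scm<10
```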
TVo
cd7f6f602f Fix OpenAPI schema validation message mismatch (#16372) 2026-03-25 12:36:10 -06:00
Chris Meyers
310dd3e18f Update dispatcherd to version 2026.3.25 (#16369)
Co-authored-by: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 10:57:01 -04:00
Matthew Sandoval
7c75788b0a AAP-67740 Pass plugin_description through to CredentialType.description (#16364)
* Pass plugin_description through to CredentialType.description

Propagate the plugin_description field from credential plugins into the
CredentialType description when loading and creating managed credential
types, including updates to existing records.

Assisted-by: Claude

* Add unit tests for plugin_description passthrough to CredentialType

Tests cover load_plugin, get_creation_params, and
_setup_tower_managed_defaults handling of the description field.

Assisted-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: PabloHiro <palonso@redhat.com>
2026-03-25 11:03:11 +01:00
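A minimal sketch of the passthrough described here, with `getattr` providing the empty-string default for plugins that define no description (simplified from the real registry; names mirror the commit):

```python
from types import SimpleNamespace

# Simplified registry: the entry carries the plugin's description,
# defaulting to '' when the plugin does not define one.
registry = {}

def load_plugin(ns, plugin):
    registry[ns] = SimpleNamespace(
        namespace=ns,
        name=plugin.name,
        kind='external',
        description=getattr(plugin, 'plugin_description', ''),
    )

plugin = SimpleNamespace(name='conjur', plugin_description='CyberArk Conjur lookup')
load_plugin('conjur', plugin)
```

The same `getattr(..., 'description', '')` shape then propagates the value into CredentialType creation and updates of existing records.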
Peter Braun
ab294385ad fix: avoid delete in loop in inventory import (#16366) 2026-03-24 15:37:59 +00:00
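The fix pattern here is collecting a batch and issuing one bulk remove instead of one removal per loop iteration. A schematic, ORM-free sketch — `FakeRelation` stands in for a Django many-to-many manager, whose `remove()` accepts multiple objects:

```python
# Schematic version of the fix: one .remove(*objs) call for the whole batch
# (one query) instead of one .remove(obj) call per item (one query each).

class FakeRelation:
    def __init__(self, items):
        self.items = set(items)
        self.calls = 0  # counts round-trips, standing in for SQL queries

    def remove(self, *objs):
        self.calls += 1
        self.items -= set(objs)

hosts = FakeRelation({"h1", "h2", "h3"})
to_remove = ["h1", "h2"]
if to_remove:
    hosts.remove(*to_remove)  # single call for the whole batch
```

Per-item removal would have cost one call per host; the batched form costs one call regardless of batch size.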
26 changed files with 1306 additions and 100 deletions

View File

@@ -581,7 +581,7 @@ detect-schema-change: genschema
 validate-openapi-schema: genschema
 	@echo "Validating OpenAPI schema from schema.json..."
-	@python3 -c "from openapi_spec_validator import validate; import json; spec = json.load(open('schema.json')); validate(spec); print('✓ OpenAPI Schema is valid!')"
+	@python3 -c "from openapi_spec_validator import validate; import json; spec = json.load(open('schema.json')); validate(spec); print('✓ Schema is valid')"

 docker-compose-clean: awx/projects
 	$(DOCKER_COMPOSE) -f tools/docker-compose/_sources/docker-compose.yml rm -sf

View File

@@ -122,7 +122,6 @@ from awx.main.scheduler.task_manager_models import TaskManagerModels
 from awx.main.redact import UriCleaner, REPLACE_STR
 from awx.main.signals import update_inventory_computed_fields
 from awx.main.validators import vars_validate_or_raise
-from awx.api.versioning import reverse
@@ -2932,6 +2931,19 @@ class CredentialTypeSerializer(BaseSerializer):
                 field['label'] = _(field['label'])
             if 'help_text' in field:
                 field['help_text'] = _(field['help_text'])
+        # Deep copy inputs to avoid modifying the original model data
+        inputs = value.get('inputs')
+        if not isinstance(inputs, dict):
+            inputs = {}
+        value['inputs'] = copy.deepcopy(inputs)
+        fields = value['inputs'].get('fields', [])
+        if not isinstance(fields, list):
+            fields = []
+        # Normalize fields and filter out internal fields
+        value['inputs']['fields'] = [f for f in fields if not f.get('internal')]
         return value

     def filter_field_metadata(self, fields, method):

View File

@@ -1,4 +1,4 @@
 ---
 collections:
   - name: ansible.receptor
-    version: 2.0.6
+    version: 2.0.8

View File

@@ -14,6 +14,7 @@ import sys
 import time
 from base64 import b64encode
 from collections import OrderedDict
+from jwt import decode as _jwt_decode

 from urllib3.exceptions import ConnectTimeoutError
@@ -58,8 +59,13 @@ from drf_spectacular.utils import extend_schema_view, extend_schema
 from ansible_base.lib.utils.requests import get_remote_hosts
 from ansible_base.rbac.models import RoleEvaluation
 from ansible_base.lib.utils.schema import extend_schema_if_available
+from ansible_base.lib.workload_identity.controller import AutomationControllerJobScope
+
+# flags
+from flags.state import flag_enabled

 # AWX
+from awx.main.tasks.jobs import retrieve_workload_identity_jwt_with_claims
 from awx.main.tasks.system import send_notifications, update_inventory_computed_fields
 from awx.main.access import get_user_queryset
 from awx.api.generics import (
@@ -1595,7 +1601,177 @@ class CredentialCopy(CopyAPIView):
     resource_purpose = 'copy of a credential'

-class CredentialExternalTest(SubDetailAPIView):
+class OIDCCredentialTestMixin:
+    """
+    Mixin to add OIDC workload identity token support to credential test endpoints.
+
+    This mixin provides methods to handle OIDC-enabled external credentials that use
+    workload identity tokens for authentication.
+    """
+
+    @staticmethod
+    def _get_workload_identity_token(job_template: models.JobTemplate, jwt_aud: str) -> str:
+        """Generate a workload identity token for a job template.
+
+        Args:
+            job_template: The JobTemplate instance to generate claims for
+            jwt_aud: The JWT audience claim value
+
+        Returns:
+            str: The generated JWT token
+        """
+        claims = {
+            AutomationControllerJobScope.CLAIM_ORGANIZATION_NAME: job_template.organization.name,
+            AutomationControllerJobScope.CLAIM_ORGANIZATION_ID: job_template.organization.id,
+            AutomationControllerJobScope.CLAIM_PROJECT_NAME: job_template.project.name,
+            AutomationControllerJobScope.CLAIM_PROJECT_ID: job_template.project.id,
+            AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_NAME: job_template.name,
+            AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_ID: job_template.id,
+            AutomationControllerJobScope.CLAIM_PLAYBOOK_NAME: job_template.playbook,
+        }
+        return retrieve_workload_identity_jwt_with_claims(
+            claims=claims,
+            audience=jwt_aud,
+            scope=AutomationControllerJobScope.name,
+        )
+
+    @staticmethod
+    def _decode_jwt_payload_for_display(jwt_token):
+        """Decode JWT payload for display purposes only (signature not verified).
+
+        This is safe because the JWT was just created by AWX and is only decoded
+        to show the user what claims are being sent to the external system.
+        The external system will perform proper signature verification.
+
+        Args:
+            jwt_token: The JWT token to decode
+
+        Returns:
+            dict: The decoded JWT payload
+        """
+        return _jwt_decode(jwt_token, algorithms=["RS256"], options={"verify_signature": False})  # NOSONAR python:S5659
+
+    def _has_workload_identity_token(self, credential_type_inputs):
+        """Check if credential type has an internal workload_identity_token field.
+
+        Args:
+            credential_type_inputs: The inputs dict from a credential type
+
+        Returns:
+            bool: True if the credential type has a workload_identity_token field marked as internal
+        """
+        fields = credential_type_inputs.get('fields', []) if isinstance(credential_type_inputs, dict) else []
+        return any(field.get('internal') and field.get('id') == 'workload_identity_token' for field in fields)
+
+    def _validate_and_get_job_template(self, job_template_id):
+        """Validate job template ID and return the JobTemplate instance.
+
+        Args:
+            job_template_id: The job template ID from metadata
+
+        Returns:
+            JobTemplate instance
+
+        Raises:
+            ParseError: If job_template_id is invalid or not found
+        """
+        if job_template_id is None:
+            raise ParseError(_('Job template ID is required.'))
+        try:
+            return models.JobTemplate.objects.get(id=int(job_template_id))
+        except ValueError:
+            raise ParseError(_('Job template ID must be an integer.'))
+        except models.JobTemplate.DoesNotExist:
+            raise ParseError(_('Job template with ID %(id)s does not exist.') % {'id': job_template_id})
+
+    def _handle_oidc_credential_test(self, backend_kwargs):
+        """
+        Handle OIDC workload identity token generation for external credential test endpoints.
+
+        This method should only be called when FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED is enabled
+        and the credential type has a workload_identity_token field.
+
+        Args:
+            backend_kwargs: The kwargs dict to pass to the backend (will be modified in place)
+
+        Returns:
+            dict: Response body containing details with the sent JWT payload
+
+        Raises:
+            PermissionDenied: If user lacks access to the job template (re-raised for 403 response)
+
+        All other exceptions are caught and converted to 400 responses with error details.
+        Modifies backend_kwargs in place to add workload_identity_token.
+        """
+        # Validate job template
+        job_template_id = backend_kwargs.pop('job_template_id', None)
+        job_template = self._validate_and_get_job_template(job_template_id)
+
+        # Check user access
+        if not self.request.user.can_access(models.JobTemplate, 'start', job_template):
+            raise PermissionDenied(_('You do not have access to job template with id: %(id)s.') % {'id': job_template.id})
+
+        # Generate workload identity token
+        jwt_token = self._get_workload_identity_token(job_template, backend_kwargs.pop('jwt_aud', None))
+        backend_kwargs['workload_identity_token'] = jwt_token
+        return {'details': {'sent_jwt_payload': self._decode_jwt_payload_for_display(jwt_token)}}
+
+    def _call_backend_with_error_handling(self, plugin, backend_kwargs, response_body):
+        """Call credential backend and handle errors, adding secret_value to response if OIDC details present."""
+        try:
+            with set_environ(**settings.AWX_TASK_ENV):
+                secret_value = plugin.backend(**backend_kwargs)
+            if 'details' in response_body:
+                response_body['details']['secret_value'] = secret_value
+            return Response(response_body, status=status.HTTP_202_ACCEPTED)
+        except requests.exceptions.HTTPError as exc:
+            message = self._extract_http_error_message(exc)
+            self._add_error_to_response(response_body, message)
+            return Response(response_body, status=status.HTTP_400_BAD_REQUEST)
+        except Exception as exc:
+            message = self._extract_generic_error_message(exc)
+            self._add_error_to_response(response_body, message)
+            return Response(response_body, status=status.HTTP_400_BAD_REQUEST)
+
+    @staticmethod
+    def _extract_http_error_message(exc):
+        """Extract error message from HTTPError, checking response JSON and text."""
+        message = str(exc)
+        if not hasattr(exc, 'response') or exc.response is None:
+            return message
+        try:
+            error_data = exc.response.json()
+            if 'errors' in error_data and error_data['errors']:
+                return ', '.join(error_data['errors'])
+            if 'error' in error_data:
+                return error_data['error']
+        except (ValueError, KeyError):
+            if exc.response.text:
+                return exc.response.text
+        return message
+
+    @staticmethod
+    def _extract_generic_error_message(exc):
+        """Extract error message from exception, handling ConnectTimeoutError specially."""
+        message = str(exc) if str(exc) else exc.__class__.__name__
+        for arg in getattr(exc, 'args', []):
+            if isinstance(getattr(arg, 'reason', None), ConnectTimeoutError):
+                return str(arg.reason)
+        return message
+
+    @staticmethod
+    def _add_error_to_response(response_body, message):
+        """Add error message to both 'detail' and 'details.error_message' fields."""
+        response_body['detail'] = message
+        if 'details' in response_body:
+            response_body['details']['error_message'] = message
+
+
+class CredentialExternalTest(OIDCCredentialTestMixin, SubDetailAPIView):
     """
     Test updates to the input values and metadata of an external credential
     before saving them.
@@ -1622,23 +1798,22 @@ class CredentialExternalTest(SubDetailAPIView):
             if value != '$encrypted$':
                 backend_kwargs[field_name] = value
         backend_kwargs.update(request.data.get('metadata', {}))
-        try:
-            with set_environ(**settings.AWX_TASK_ENV):
-                obj.credential_type.plugin.backend(**backend_kwargs)
-            return Response({}, status=status.HTTP_202_ACCEPTED)
-        except requests.exceptions.HTTPError:
-            message = """Test operation is not supported for credential type {}.
-                This endpoint only supports credentials that connect to
-                external secret management systems such as CyberArk, HashiCorp
-                Vault, or cloud-based secret managers.""".format(obj.credential_type.kind)
-            return Response({'detail': message}, status=status.HTTP_400_BAD_REQUEST)
-        except Exception as exc:
-            message = exc.__class__.__name__
-            exc_args = getattr(exc, 'args', [])
-            for a in exc_args:
-                if isinstance(getattr(a, 'reason', None), ConnectTimeoutError):
-                    message = str(a.reason)
-            return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST)
+        # Handle OIDC workload identity token generation if enabled
+        response_body = {}
+        if flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED') and self._has_workload_identity_token(obj.credential_type.inputs):
+            try:
+                oidc_response_body = self._handle_oidc_credential_test(backend_kwargs)
+                response_body.update(oidc_response_body)
+            except PermissionDenied:
+                raise
+            except Exception as exc:
+                error_message = str(exc.detail) if hasattr(exc, 'detail') else str(exc)
+                response_body['detail'] = error_message
+                response_body['details'] = {'error_message': error_message}
+                return Response(response_body, status=status.HTTP_400_BAD_REQUEST)
+
+        return self._call_backend_with_error_handling(obj.credential_type.plugin, backend_kwargs, response_body)

 class CredentialInputSourceDetail(RetrieveUpdateDestroyAPIView):
@@ -1668,7 +1843,7 @@ class CredentialInputSourceSubList(SubListCreateAPIView):
     parent_key = 'target_credential'

-class CredentialTypeExternalTest(SubDetailAPIView):
+class CredentialTypeExternalTest(OIDCCredentialTestMixin, SubDetailAPIView):
     """
     Test a complete set of input values for an external credential before
     saving it.
@@ -1685,19 +1860,22 @@ class CredentialTypeExternalTest(SubDetailAPIView):
         obj = self.get_object()
         backend_kwargs = request.data.get('inputs', {})
         backend_kwargs.update(request.data.get('metadata', {}))
-        try:
-            obj.plugin.backend(**backend_kwargs)
-            return Response({}, status=status.HTTP_202_ACCEPTED)
-        except requests.exceptions.HTTPError as exc:
-            message = 'HTTP {}'.format(exc.response.status_code)
-            return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST)
-        except Exception as exc:
-            message = exc.__class__.__name__
-            args_exc = getattr(exc, 'args', [])
-            for a in args_exc:
-                if isinstance(getattr(a, 'reason', None), ConnectTimeoutError):
-                    message = str(a.reason)
-            return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST)
+        # Handle OIDC workload identity token generation if enabled
+        response_body = {}
+        if flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED') and self._has_workload_identity_token(obj.inputs):
+            try:
+                oidc_response_body = self._handle_oidc_credential_test(backend_kwargs)
+                response_body.update(oidc_response_body)
+            except PermissionDenied:
+                raise
+            except Exception as exc:
+                error_message = str(exc.detail) if hasattr(exc, 'detail') else str(exc)
+                response_body['detail'] = error_message
+                response_body['details'] = {'error_message': error_message}
+                return Response(response_body, status=status.HTTP_400_BAD_REQUEST)
+
+        return self._call_backend_with_error_handling(obj.plugin, backend_kwargs, response_body)

 class HostRelatedSearchMixin(object):

View File

@@ -344,13 +344,22 @@ class ApiV2ConfigView(APIView):
             become_methods=PRIVILEGE_ESCALATION_METHODS,
         )

-        if (
-            request.user.is_superuser
-            or request.user.is_system_auditor
-            or Organization.accessible_objects(request.user, 'admin_role').exists()
-            or Organization.accessible_objects(request.user, 'auditor_role').exists()
-            or Organization.accessible_objects(request.user, 'project_admin_role').exists()
-        ):
+        # Check superuser/auditor first
+        if request.user.is_superuser or request.user.is_system_auditor:
+            has_org_access = True
+        else:
+            # Single query checking all three organization role types at once
+            has_org_access = (
+                (
+                    Organization.access_qs(request.user, 'change')
+                    | Organization.access_qs(request.user, 'audit')
+                    | Organization.access_qs(request.user, 'add_project')
+                )
+                .distinct()
+                .exists()
+            )
+
+        if has_org_access:
             data.update(
                 dict(
                     project_base_dir=settings.PROJECTS_ROOT,
@@ -358,8 +367,10 @@
                     custom_virtualenvs=get_custom_venv_choices(),
                 )
             )
-        elif JobTemplate.accessible_objects(request.user, 'admin_role').exists():
-            data['custom_virtualenvs'] = get_custom_venv_choices()
+        else:
+            # Only check JobTemplate access if org check failed
+            if JobTemplate.accessible_objects(request.user, 'admin_role').exists():
+                data['custom_virtualenvs'] = get_custom_venv_choices()
         return Response(data)

View File

@@ -409,10 +409,12 @@ class Command(BaseCommand):
             del_child_group_pks = list(set(db_children_name_pk_map.values()))
             for offset in range(0, len(del_child_group_pks), self._batch_size):
                 child_group_pks = del_child_group_pks[offset : (offset + self._batch_size)]
-                for db_child in db_children.filter(pk__in=child_group_pks):
-                    group_group_count += 1
-                    db_group.children.remove(db_child)
-                    logger.debug('Group "%s" removed from group "%s"', db_child.name, db_group.name)
+                children_to_remove = list(db_children.filter(pk__in=child_group_pks))
+                if children_to_remove:
+                    group_group_count += len(children_to_remove)
+                    db_group.children.remove(*children_to_remove)
+                    for db_child in children_to_remove:
+                        logger.debug('Group "%s" removed from group "%s"', db_child.name, db_group.name)
             # FIXME: Inventory source group relationships
             # Delete group/host relationships not present in imported data.
             db_hosts = db_group.hosts
@@ -441,12 +443,12 @@
             del_host_pks = list(del_host_pks)
             for offset in range(0, len(del_host_pks), self._batch_size):
                 del_pks = del_host_pks[offset : (offset + self._batch_size)]
-                for db_host in db_hosts.filter(pk__in=del_pks):
-                    group_host_count += 1
-                    if db_host not in db_group.hosts.all():
-                        continue
-                    db_group.hosts.remove(db_host)
-                    logger.debug('Host "%s" removed from group "%s"', db_host.name, db_group.name)
+                hosts_to_remove = list(db_hosts.filter(pk__in=del_pks))
+                if hosts_to_remove:
+                    group_host_count += len(hosts_to_remove)
+                    db_group.hosts.remove(*hosts_to_remove)
+                    for db_host in hosts_to_remove:
+                        logger.debug('Host "%s" removed from group "%s"', db_host.name, db_group.name)
         if settings.SQL_DEBUG:
             logger.warning(
                 'group-group and group-host deletions took %d queries for %d relationships',

View File

@@ -531,6 +531,7 @@ class CredentialType(CommonModelNameNotUnique):
             existing = ct_class.objects.filter(name=default.name, kind=default.kind).first()
             if existing is not None:
                 existing.namespace = default.namespace
+                existing.description = getattr(default, 'description', '')
                 existing.inputs = {}
                 existing.injectors = {}
                 existing.save()
@@ -570,7 +571,14 @@
     @classmethod
     def load_plugin(cls, ns, plugin):
         # TODO: User "side-loaded" credential custom_injectors isn't supported
-        ManagedCredentialType.registry[ns] = SimpleNamespace(namespace=ns, name=plugin.name, kind='external', inputs=plugin.inputs, backend=plugin.backend)
+        ManagedCredentialType.registry[ns] = SimpleNamespace(
+            namespace=ns,
+            name=plugin.name,
+            kind='external',
+            inputs=plugin.inputs,
+            backend=plugin.backend,
+            description=getattr(plugin, 'plugin_description', ''),
+        )

     def inject_credential(self, credential, env, safe_env, args, private_data_dir, container_root=None):
         from awx_plugins.interfaces._temporary_private_inject_api import inject_credential
@@ -582,7 +590,13 @@ class CredentialTypeHelper:
     @classmethod
     def get_creation_params(cls, cred_type):
         if cred_type.kind == 'external':
-            return dict(namespace=cred_type.namespace, kind=cred_type.kind, name=cred_type.name, managed=True)
+            return {
+                'namespace': cred_type.namespace,
+                'kind': cred_type.kind,
+                'name': cred_type.name,
+                'managed': True,
+                'description': getattr(cred_type, 'description', ''),
+            }
         return dict(
             namespace=cred_type.namespace,
             kind=cred_type.kind,

View File

@@ -345,7 +345,11 @@ class WorkflowJobNode(WorkflowNodeBase):
         )
         data.update(accepted_fields)  # missing fields are handled in the scheduler
         # build ancestor artifacts, save them to node model for later
-        aa_dict = {}
+        # initialize from pre-seeded ancestor_artifacts (set on root nodes of
+        # child workflows via seed_root_ancestor_artifacts to carry artifacts
+        # from the parent workflow); exclude job_slice which is internal
+        # metadata handled separately below
+        aa_dict = {k: v for k, v in self.ancestor_artifacts.items() if k != 'job_slice'} if self.ancestor_artifacts else {}
         is_root_node = True
         for parent_node in self.get_parent_nodes():
             is_root_node = False
@@ -366,11 +370,13 @@
             data['survey_passwords'] = password_dict
         # process extra_vars
         extra_vars = data.get('extra_vars', {})
-        if ujt_obj and isinstance(ujt_obj, (JobTemplate, WorkflowJobTemplate)):
+        if ujt_obj and isinstance(ujt_obj, JobTemplate):
             if aa_dict:
                 functional_aa_dict = copy(aa_dict)
                 functional_aa_dict.pop('_ansible_no_log', None)
                 extra_vars.update(functional_aa_dict)
+        elif ujt_obj and isinstance(ujt_obj, WorkflowJobTemplate):
+            pass  # artifacts are applied via seed_root_ancestor_artifacts in the task manager
         # Workflow Job extra_vars higher precedence than ancestor artifacts
         extra_vars.update(wj_special_vars)
@@ -734,6 +740,18 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
             wj = wj.get_workflow_job()
         return ancestors

+    def seed_root_ancestor_artifacts(self, artifacts):
+        """Apply parent workflow artifacts to root nodes so they propagate
+        through the normal ancestor_artifacts channel instead of being
+        baked into this workflow's extra_vars."""
+        self.workflow_job_nodes.exclude(
+            workflowjobnodes_success__isnull=False,
+        ).exclude(
+            workflowjobnodes_failure__isnull=False,
+        ).exclude(
+            workflowjobnodes_always__isnull=False,
+        ).update(ancestor_artifacts=artifacts)
+
     def get_effective_artifacts(self, **kwargs):
         """
         For downstream jobs of a workflow nested inside of a workflow,

View File

@@ -241,6 +241,8 @@ class WorkflowManager(TaskBase):
                     job = spawn_node.unified_job_template.create_unified_job(**kv)
                     spawn_node.job = job
                     spawn_node.save()
+                    if spawn_node.ancestor_artifacts and isinstance(spawn_node.unified_job_template, WorkflowJobTemplate):
+                        job.seed_root_ancestor_artifacts(spawn_node.ancestor_artifacts)
                     logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk)
                     can_start = True
                     if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate):

View File

@@ -277,6 +277,7 @@ class RunnerCallback:
     def artifacts_handler(self, artifact_dir):
         success, query_file_contents = try_load_query_file(artifact_dir)
         if success:
+            self.delay_update(event_queries_processed=False)
             collections_info = collect_queries(query_file_contents)
             for collection, data in collections_info.items():
                 version = data['version']
@@ -300,24 +301,6 @@
             else:
                 logger.warning(f'The file {COLLECTION_FILENAME} unexpectedly did not contain ansible_version')

-            # Write event_queries_processed and installed_collections directly
-            # to the DB instead of using delay_update. delay_update defers
-            # writes until the final job status save, but
-            # events_processed_hook (called from both the task runner after
-            # the final save and the callback receiver after the wrapup
-            # event) needs event_queries_processed=False visible in the DB
-            # to dispatch save_indirect_host_entries. The field defaults to
-            # True, so without a direct write the hook would see True and
-            # skip the dispatch. installed_collections is also written
-            # directly so it is available if the callback receiver
-            # dispatches before the final save.
-            from awx.main.models import Job
-
-            db_updates = {'event_queries_processed': False}
-            if 'installed_collections' in query_file_contents:
-                db_updates['installed_collections'] = query_file_contents['installed_collections']
-            Job.objects.filter(id=self.instance.id).update(**db_updates)
         self.artifacts_processed = True

View File

@@ -94,7 +94,7 @@ from flags.state import flag_enabled
# Workload Identity # Workload Identity
from ansible_base.lib.workload_identity.controller import AutomationControllerJobScope from ansible_base.lib.workload_identity.controller import AutomationControllerJobScope
from ansible_base.resource_registry.workload_identity_client import get_workload_identity_client from awx.main.utils.workload_identity import retrieve_workload_identity_jwt_with_claims
logger = logging.getLogger('awx.main.tasks.jobs') logger = logging.getLogger('awx.main.tasks.jobs')
@@ -168,14 +168,12 @@ def retrieve_workload_identity_jwt(
Raises: Raises:
RuntimeError: if the workload identity client is not configured. RuntimeError: if the workload identity client is not configured.
""" """
client = get_workload_identity_client() return retrieve_workload_identity_jwt_with_claims(
if client is None: populate_claims_for_workload(unified_job),
raise RuntimeError("Workload identity client is not configured") audience,
claims = populate_claims_for_workload(unified_job) scope,
kwargs = {"claims": claims, "scope": scope, "audience": audience} workload_ttl_seconds,
if workload_ttl_seconds: )
kwargs["workload_ttl_seconds"] = workload_ttl_seconds
return client.request_workload_jwt(**kwargs).jwt
def with_path_cleanup(f): def with_path_cleanup(f):
@@ -230,16 +228,19 @@ class BaseTask(object):
         # Convert to list to prevent re-evaluation of QuerySet
         return list(credentials_list)

-    def populate_workload_identity_tokens(self):
+    def populate_workload_identity_tokens(self, additional_credentials=None):
         """
         Populate credentials with workload identity tokens.

         Sets the context on Credential objects that have input sources
         using compatible external credential types.
         """
+        credentials = list(self._credentials)
+        if additional_credentials:
+            credentials.extend(additional_credentials)
         credential_input_sources = (
             (credential.context, src)
-            for credential in self._credentials
+            for credential in credentials
             for src in credential.input_sources.all()
             if any(
                 field.get('id') == 'workload_identity_token' and field.get('internal')
@@ -1682,7 +1683,7 @@ class RunProjectUpdate(BaseTask):
         return params

     def build_credentials_list(self, project_update):
-        if project_update.scm_type == 'insights' and project_update.credential:
+        if project_update.credential:
             return [project_update.credential]
         return []
@@ -1865,6 +1866,24 @@ class RunInventoryUpdate(SourceControlMixin, BaseTask):
         # All credentials not used by inventory source injector
         return inventory_update.get_extra_credentials()

+    def populate_workload_identity_tokens(self, additional_credentials=None):
+        """Also generate OIDC tokens for the cloud credential.
+
+        The cloud credential is not in _credentials (it is handled by the
+        inventory source injector), but it may still need a workload identity
+        token generated for it.
+        """
+        cloud_cred = self.instance.get_cloud_credential()
+        creds = list(additional_credentials or [])
+        if cloud_cred:
+            creds.append(cloud_cred)
+        super().populate_workload_identity_tokens(additional_credentials=creds or None)
+        # Override get_cloud_credential on this instance so the injector
+        # uses the credential with OIDC context instead of doing a fresh
+        # DB fetch that would lose it.
+        if cloud_cred and cloud_cred.context:
+            self.instance.get_cloud_credential = lambda: cloud_cred
+
     def build_project_dir(self, inventory_update, private_data_dir):
         source_project = None
         if inventory_update.inventory_source:
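The override at the end of this hunk leans on Python attribute lookup: an attribute bound on the instance shadows the class method, so only this in-flight update sees the enriched credential while other code paths still hit the normal accessor. A minimal sketch with hypothetical stand-in objects:

```python
class InventoryUpdate:
    """Stand-in for the real model (hypothetical); the accessor normally re-fetches from the DB."""

    def get_cloud_credential(self):
        return {'name': 'aws', 'context': None}


update = InventoryUpdate()

# Bind the enriched in-memory credential to this instance only. The lambda
# shadows the class method, so the injector sees the OIDC context instead
# of a fresh fetch that would lose it.
enriched = {'name': 'aws', 'context': {'workload_identity_token': 'jwt...'}}
update.get_cloud_credential = lambda: enriched
```

Because the shadowing is per-instance, other `InventoryUpdate` objects are unaffected.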


@@ -0,0 +1,11 @@
---
- hosts: all
  gather_facts: false
  connection: local
  tasks:
    - name: Set artifacts via set_stats
      ansible.builtin.set_stats:
        data: "{{ stats_data }}"
        per_host: false
        aggregate: false
      when: stats_data is defined


@@ -0,0 +1,84 @@
import pytest
from awx.api.versioning import reverse
from rest_framework import status
from awx.main.models.jobs import JobTemplate
@pytest.mark.django_db
class TestConfigEndpointFields:
def test_base_fields_all_users(self, get, rando):
url = reverse('api:api_v2_config_view')
response = get(url, rando, expect=200)
assert 'time_zone' in response.data
assert 'license_info' in response.data
assert 'version' in response.data
assert 'eula' in response.data
assert 'analytics_status' in response.data
assert 'analytics_collectors' in response.data
assert 'become_methods' in response.data
@pytest.mark.parametrize(
"role_type",
[
"superuser",
"system_auditor",
"org_admin",
"org_auditor",
"org_project_admin",
],
)
def test_privileged_users_conditional_fields(self, get, user, organization, admin, role_type):
url = reverse('api:api_v2_config_view')
if role_type == "superuser":
test_user = admin
elif role_type == "system_auditor":
test_user = user('system-auditor', is_superuser=False)
test_user.is_system_auditor = True
test_user.save()
elif role_type == "org_admin":
test_user = user('org-admin', is_superuser=False)
organization.admin_role.members.add(test_user)
elif role_type == "org_auditor":
test_user = user('org-auditor', is_superuser=False)
organization.auditor_role.members.add(test_user)
elif role_type == "org_project_admin":
test_user = user('org-project-admin', is_superuser=False)
organization.project_admin_role.members.add(test_user)
response = get(url, test_user, expect=200)
assert 'project_base_dir' in response.data
assert 'project_local_paths' in response.data
assert 'custom_virtualenvs' in response.data
def test_job_template_admin_gets_venvs_only(self, get, user, organization, project, inventory):
"""Test that JobTemplate admin without org access gets only custom_virtualenvs"""
jt_admin = user('jt-admin', is_superuser=False)
jt = JobTemplate.objects.create(name='test-jt', organization=organization, project=project, inventory=inventory)
jt.admin_role.members.add(jt_admin)
url = reverse('api:api_v2_config_view')
response = get(url, jt_admin, expect=200)
assert 'custom_virtualenvs' in response.data
assert 'project_base_dir' not in response.data
assert 'project_local_paths' not in response.data
def test_normal_user_no_conditional_fields(self, get, rando):
url = reverse('api:api_v2_config_view')
response = get(url, rando, expect=200)
assert 'project_base_dir' not in response.data
assert 'project_local_paths' not in response.data
assert 'custom_virtualenvs' not in response.data
def test_unauthenticated_denied(self, get):
"""Test that unauthenticated requests are denied"""
url = reverse('api:api_v2_config_view')
response = get(url, None, expect=401)
assert response.status_code == status.HTTP_401_UNAUTHORIZED


@@ -2,6 +2,7 @@ import json

 import pytest

+from ansible_base.lib.testing.util import feature_flag_enabled
 from awx.main.models.credential import CredentialType, Credential
 from awx.api.versioning import reverse

@@ -159,7 +160,8 @@ def test_create_as_admin(get, post, admin):
     response = get(reverse('api:credential_type_list'), admin)
     assert response.data['count'] == 1
     assert response.data['results'][0]['name'] == 'Custom Credential Type'
-    assert response.data['results'][0]['inputs'] == {}
+    # Serializer normalizes empty inputs to {'fields': []}
+    assert response.data['results'][0]['inputs'] == {'fields': []}
     assert response.data['results'][0]['injectors'] == {}
     assert response.data['results'][0]['managed'] is False

@@ -474,3 +476,98 @@ def test_credential_type_rbac_external_test(post, alice, admin, credentialtype_e
     data = {'inputs': {}, 'metadata': {}}
     assert post(url, data, admin).status_code == 202
     assert post(url, data, alice).status_code == 403
# --- Tests for internal field filtering with None/invalid inputs ---
@pytest.mark.django_db
def test_credential_type_with_none_inputs(get, admin):
"""Test that credential type with empty inputs dict works correctly."""
# Create a credential type with empty dict
ct = CredentialType.objects.create(
kind='cloud',
name='Test Type',
managed=False,
inputs={}, # Empty dict, not None (DB has NOT NULL constraint)
)
url = reverse('api:credential_type_detail', kwargs={'pk': ct.pk})
response = get(url, admin)
assert response.status_code == 200
# Should have normalized inputs to empty dict
assert 'inputs' in response.data
assert isinstance(response.data['inputs'], dict)
assert response.data['inputs']['fields'] == []
@pytest.mark.django_db
def test_credential_type_with_invalid_inputs_type(get, admin):
"""Test that credential type with non-dict inputs doesn't cause errors."""
# Create a credential type with invalid inputs type
ct = CredentialType.objects.create(kind='cloud', name='Test Type', managed=False, inputs={'fields': 'not-a-list'})
url = reverse('api:credential_type_detail', kwargs={'pk': ct.pk})
response = get(url, admin)
assert response.status_code == 200
# Should gracefully handle invalid fields type
assert 'inputs' in response.data
assert response.data['inputs']['fields'] == []
@pytest.mark.django_db
def test_credential_type_filters_internal_fields(get, admin):
"""Test that internal fields are filtered from API responses."""
ct = CredentialType.objects.create(
kind='cloud',
name='Test OIDC Type',
managed=False,
inputs={
'fields': [
{'id': 'url', 'label': 'URL', 'type': 'string'},
{'id': 'token', 'label': 'Token', 'type': 'string', 'secret': True, 'internal': True},
{'id': 'public_field', 'label': 'Public', 'type': 'string'},
]
},
)
url = reverse('api:credential_type_detail', kwargs={'pk': ct.pk})
with feature_flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'):
response = get(url, admin)
assert response.status_code == 200
field_ids = [f['id'] for f in response.data['inputs']['fields']]
# Internal field should be filtered out
assert 'token' not in field_ids
assert 'url' in field_ids
assert 'public_field' in field_ids
@pytest.mark.django_db
def test_credential_type_list_filters_internal_fields(get, admin):
"""Test that internal fields are filtered in list view."""
CredentialType.objects.create(
kind='cloud',
name='Test OIDC Type',
managed=False,
inputs={
'fields': [
{'id': 'url', 'label': 'URL', 'type': 'string'},
{'id': 'workload_identity_token', 'label': 'Token', 'type': 'string', 'secret': True, 'internal': True},
]
},
)
url = reverse('api:credential_type_list')
with feature_flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'):
response = get(url, admin)
assert response.status_code == 200
# Find our credential type in the results
test_ct = next((ct for ct in response.data['results'] if ct['name'] == 'Test OIDC Type'), None)
assert test_ct is not None
field_ids = [f['id'] for f in test_ct['inputs']['fields']]
# Internal field should be filtered out
assert 'workload_identity_token' not in field_ids
assert 'url' in field_ids
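The behavior these tests pin down (drop `internal` fields, normalize missing or malformed `fields` to an empty list) can be sketched as a small helper. This is an illustrative reconstruction of the contract the assertions describe, not the actual serializer code:

```python
def filter_internal_fields(inputs):
    # Normalize: anything that is not a dict with a list under 'fields'
    # comes back with 'fields': [], matching the None/invalid-input tests.
    if not isinstance(inputs, dict):
        return {'fields': []}
    fields = inputs.get('fields')
    if not isinstance(fields, list):
        return {**inputs, 'fields': []}
    # Drop fields flagged internal, e.g. workload_identity_token.
    return {**inputs, 'fields': [f for f in fields if not f.get('internal')]}


demo = filter_internal_fields(
    {
        'fields': [
            {'id': 'url'},
            {'id': 'workload_identity_token', 'internal': True},
        ]
    }
)
```

Keeping the filtering tolerant of malformed `inputs` is what lets the detail and list views return 200 instead of erroring on legacy rows.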


@@ -0,0 +1,259 @@
"""
Tests for OIDC workload identity credential test endpoints.
Tests the /api/v2/credentials/<id>/test/ and /api/v2/credential_types/<id>/test/
endpoints when used with OIDC-enabled credential types.
"""
import pytest
from unittest import mock
from django.test import override_settings
from awx.main.models import Credential, CredentialType, JobTemplate
from awx.api.versioning import reverse
@pytest.fixture
def job_template(organization, project):
"""Job template with organization and project for OIDC JWT generation."""
return JobTemplate.objects.create(name='test-jt', organization=organization, project=project, playbook='helloworld.yml')
@pytest.fixture
def oidc_credentialtype():
"""Create a credential type with workload_identity_token internal field."""
oidc_type_inputs = {
'fields': [
{'id': 'url', 'label': 'Vault URL', 'type': 'string', 'help_text': 'The Vault server URL.'},
{'id': 'auth_path', 'label': 'Auth Path', 'type': 'string', 'help_text': 'JWT auth mount path.'},
{'id': 'role_id', 'label': 'Role ID', 'type': 'string', 'help_text': 'Vault role.'},
{'id': 'jwt_aud', 'label': 'JWT Audience', 'type': 'string', 'help_text': 'Expected audience.'},
{'id': 'workload_identity_token', 'label': 'Workload Identity Token', 'type': 'string', 'secret': True, 'internal': True},
],
'metadata': [
{'id': 'secret_path', 'label': 'Secret Path', 'type': 'string'},
{'id': 'job_template_id', 'label': 'Job Template ID', 'type': 'string'},
],
'required': ['url', 'auth_path', 'role_id'],
}
class MockPlugin(object):
def backend(self, **kwargs):
# Simulate successful backend call
return 'secret'
with mock.patch('awx.main.models.credential.CredentialType.plugin', new_callable=mock.PropertyMock) as mock_plugin:
mock_plugin.return_value = MockPlugin()
oidc_type = CredentialType(kind='external', managed=True, namespace='hashivault-kv-oidc', name='HashiCorp Vault KV (OIDC)', inputs=oidc_type_inputs)
oidc_type.save()
yield oidc_type
@pytest.fixture
def oidc_credential(oidc_credentialtype):
"""Create a credential using the OIDC credential type."""
return Credential.objects.create(
credential_type=oidc_credentialtype,
name='oidc-vault-cred',
inputs={'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'},
)
@pytest.fixture
def mock_oidc_backend():
"""Fixture that mocks OIDC JWT generation and credential backend."""
with mock.patch('awx.api.views.retrieve_workload_identity_jwt_with_claims') as mock_jwt, mock.patch('awx.api.views._jwt_decode') as mock_decode, mock.patch(
'awx.main.models.credential.CredentialType.plugin', new_callable=mock.PropertyMock
) as mock_plugin:
# Set default return values
mock_jwt.return_value = 'fake.jwt.token'
mock_decode.return_value = {'iss': 'http://gateway/o', 'aud': 'vault'}
# Create mock backend
mock_backend = mock.MagicMock()
mock_backend.backend.return_value = 'secret'
mock_plugin.return_value = mock_backend
# Yield all mocks for test customization
yield {
'jwt': mock_jwt,
'decode': mock_decode,
'plugin': mock_plugin,
'backend': mock_backend,
}
# --- Tests for CredentialExternalTest endpoint ---
@pytest.mark.django_db
@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=False)
def test_credential_test_without_oidc_feature_flag(post, admin, oidc_credential):
"""Test that credential test works without OIDC feature flag enabled."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': '1'}}
with mock.patch('awx.main.models.credential.CredentialType.plugin', new_callable=mock.PropertyMock) as mock_plugin:
mock_backend = mock.MagicMock()
mock_backend.backend.return_value = 'secret'
mock_plugin.return_value = mock_backend
response = post(url, data, admin)
assert response.status_code == 202
# Should not contain JWT payload when feature flag is disabled
assert 'details' not in response.data or 'sent_jwt_payload' not in response.data.get('details', {})
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
@pytest.mark.parametrize(
'job_template_id, expected_error',
[
(None, 'Job template ID is required'),
('not-an-integer', 'must be an integer'),
('99999', 'does not exist'),
],
ids=['missing_job_template_id', 'invalid_job_template_id_type', 'nonexistent_job_template_id'],
)
def test_credential_test_job_template_validation(mock_flag, post, admin, oidc_credential, job_template_id, expected_error):
"""Test that invalid job_template_id values return 400 with appropriate error messages."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret'}}
if job_template_id is not None:
data['metadata']['job_template_id'] = job_template_id
response = post(url, data, admin)
assert response.status_code == 400
assert 'details' in response.data
assert 'error_message' in response.data['details']
assert expected_error in response.data['details']['error_message']
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_test_no_access_to_job_template(mock_flag, post, alice, oidc_credential, job_template):
"""Test that user without access to job template gets 403."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
# Give alice use permission on credential but not on job template
oidc_credential.use_role.members.add(alice)
response = post(url, data, alice)
assert response.status_code == 403
assert 'You do not have access to job template' in str(response.data)
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_test_success_returns_jwt_payload(mock_flag, post, admin, oidc_credential, job_template, mock_oidc_backend):
"""Test that successful test returns JWT payload in response."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
# Customize mock for this test
mock_oidc_backend['decode'].return_value = {
'iss': 'http://gateway/o',
'sub': 'system:serviceaccount:default:awx-operator',
'aud': 'vault',
'job_template_id': job_template.id,
}
response = post(url, data, admin)
assert response.status_code == 202
assert 'details' in response.data
assert 'sent_jwt_payload' in response.data['details']
assert response.data['details']['sent_jwt_payload']['job_template_id'] == job_template.id
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_test_backend_failure_returns_jwt_and_error(mock_flag, post, admin, oidc_credential, job_template, mock_oidc_backend):
"""Test that backend failure still returns JWT payload along with error message."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
# Make backend fail
mock_oidc_backend['backend'].backend.side_effect = RuntimeError('Connection failed')
response = post(url, data, admin)
assert response.status_code == 400
assert 'details' in response.data
# Both JWT payload and error message should be present
assert 'sent_jwt_payload' in response.data['details']
assert 'error_message' in response.data['details']
assert 'Connection failed' in response.data['details']['error_message']
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_test_jwt_generation_failure(mock_flag, post, admin, oidc_credential, job_template):
"""Test that JWT generation failure returns error without JWT payload."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
with mock.patch('awx.api.views.OIDCCredentialTestMixin._get_workload_identity_token') as mock_jwt:
mock_jwt.side_effect = RuntimeError('Failed to generate JWT')
response = post(url, data, admin)
assert response.status_code == 400
assert 'details' in response.data
assert 'error_message' in response.data['details']
assert 'Failed to generate JWT' in response.data['details']['error_message']
# No JWT payload when generation fails
assert 'sent_jwt_payload' not in response.data['details']
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_test_job_template_id_not_passed_to_backend(mock_flag, post, admin, oidc_credential, job_template, mock_oidc_backend):
"""Test that job_template_id and jwt_aud are removed from backend_kwargs."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
response = post(url, data, admin)
assert response.status_code == 202
# Check that backend was called without job_template_id or jwt_aud
call_kwargs = mock_oidc_backend['backend'].backend.call_args[1]
assert 'job_template_id' not in call_kwargs
assert 'jwt_aud' not in call_kwargs
assert 'workload_identity_token' in call_kwargs
# --- Tests for CredentialTypeExternalTest endpoint ---
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_type_test_missing_job_template_id(mock_flag, post, admin, oidc_credentialtype):
"""Test that missing job_template_id returns 400 for credential type test endpoint."""
url = reverse('api:credential_type_external_test', kwargs={'pk': oidc_credentialtype.pk})
data = {
'inputs': {'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'},
'metadata': {'secret_path': 'test/secret'},
}
response = post(url, data, admin)
assert response.status_code == 400
assert 'details' in response.data
assert 'error_message' in response.data['details']
assert 'Job template ID is required' in response.data['details']['error_message']
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_type_test_success_returns_jwt_payload(mock_flag, post, admin, oidc_credentialtype, job_template, mock_oidc_backend):
"""Test that successful credential type test returns JWT payload."""
url = reverse('api:credential_type_external_test', kwargs={'pk': oidc_credentialtype.pk})
data = {
'inputs': {'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'},
'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)},
}
response = post(url, data, admin)
assert response.status_code == 202
assert 'details' in response.data
assert 'sent_jwt_payload' in response.data['details']


@@ -305,6 +305,47 @@ class TestINIImports:
         has_host_group = inventory.groups.get(name='has_a_host')
         assert has_host_group.hosts.count() == 1

+    @mock.patch.object(inventory_import, 'AnsibleInventoryLoader', MockLoader)
+    def test_overwrite_removes_stale_memberships(self, inventory):
+        """When overwrite is enabled, host-group and group-group memberships
+        that are no longer in the imported data should be removed."""
+        # First import: parent_group has two children, host_group has two hosts
+        inventory_import.AnsibleInventoryLoader._data = {
+            "_meta": {"hostvars": {"host1": {}, "host2": {}}},
+            "all": {"children": ["ungrouped", "parent_group", "child_a", "child_b", "host_group"]},
+            "parent_group": {"children": ["child_a", "child_b"]},
+            "host_group": {"hosts": ["host1", "host2"]},
+            "ungrouped": {"hosts": []},
+        }
+        cmd = inventory_import.Command()
+        cmd.handle(inventory_id=inventory.pk, source=__file__, overwrite=True)
+        parent = inventory.groups.get(name='parent_group')
+        assert set(parent.children.values_list('name', flat=True)) == {'child_a', 'child_b'}
+        host_grp = inventory.groups.get(name='host_group')
+        assert set(host_grp.hosts.values_list('name', flat=True)) == {'host1', 'host2'}
+        # Second import: child_b removed from parent_group, host2 moved out of host_group
+        inventory_import.AnsibleInventoryLoader._data = {
+            "_meta": {"hostvars": {"host1": {}, "host2": {}}},
+            "all": {"children": ["ungrouped", "parent_group", "child_a", "child_b", "host_group"]},
+            "parent_group": {"children": ["child_a"]},
+            "host_group": {"hosts": ["host1"]},
+            "ungrouped": {"hosts": ["host2"]},
+        }
+        cmd = inventory_import.Command()
+        cmd.handle(inventory_id=inventory.pk, source=__file__, overwrite=True)
+        parent.refresh_from_db()
+        host_grp.refresh_from_db()
+        # child_b should be removed from parent_group
+        assert set(parent.children.values_list('name', flat=True)) == {'child_a'}
+        # host2 should be removed from host_group
+        assert set(host_grp.hosts.values_list('name', flat=True)) == {'host1'}
+        # host2 and child_b should still exist in the inventory, just not in those groups
+        assert inventory.hosts.filter(name='host2').exists()
+        assert inventory.groups.filter(name='child_b').exists()
+
     @mock.patch.object(inventory_import, 'AnsibleInventoryLoader', MockLoader)
     def test_recursive_group_error(self, inventory):
         inventory_import.AnsibleInventoryLoader._data = {
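The overwrite semantics the new test exercises amount to a set difference between stored and imported memberships: anything in the database that the new import no longer asserts gets unlinked, while the objects themselves survive. A minimal sketch of that invariant:

```python
# Memberships as (group, member) pairs; entries stored in the DB but
# absent from the fresh import are stale and should be unlinked.
stored = {
    ('parent_group', 'child_a'),
    ('parent_group', 'child_b'),
    ('host_group', 'host1'),
    ('host_group', 'host2'),
}
imported = {('parent_group', 'child_a'), ('host_group', 'host1')}

stale = stored - imported
```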


@@ -0,0 +1,206 @@
import json
import pytest
from awx.main.tests.live.tests.conftest import wait_for_job
from awx.main.models import JobTemplate, WorkflowJobTemplate, WorkflowJobTemplateNode
JT_NAMES = ('artifact-test-first', 'artifact-test-second', 'artifact-test-reader')
WFT_NAMES = ('artifact-test-outer-wf', 'artifact-test-inner-wf')
@pytest.mark.django_db(transaction=True)
def test_nested_workflow_set_stats_precedence(live_tmp_folder, demo_inv, project_factory, default_org):
"""Reproducer for set_stats artifacts from an outer workflow leaking into
an inner (child) workflow and overriding the inner workflow's own artifacts.
Outer WF: [job_first] --success--> [inner_wf]
Inner WF: [job_second] --success--> [job_reader]
job_first sets via set_stats:
var1: "outer-only" (only source, should propagate through)
var2: "should-be-overridden" (will be overridden by job_second)
job_second sets via set_stats:
var2: "from-inner" (should override outer's value)
var3: "inner-only" (only source, should be available)
job_reader runs debug.yml (no set_stats), we inspect its extra_vars:
var1 should be "outer-only" - outer artifacts propagate when uncontested
var2 should be "from-inner" - inner artifacts override outer (THE BUG)
var3 should be "inner-only" - inner-only artifacts propagate normally
"""
# Clean up resources from prior runs (delete individually for signals)
for name in WFT_NAMES:
for wft in WorkflowJobTemplate.objects.filter(name=name):
wft.delete()
for name in JT_NAMES:
for jt in JobTemplate.objects.filter(name=name):
jt.delete()
proj = project_factory(scm_url=f'file://{live_tmp_folder}/debug')
if proj.current_job:
wait_for_job(proj.current_job)
# job_first: sets var1 (outer-only) and var2 (to be overridden by inner)
jt_first = JobTemplate.objects.create(
name='artifact-test-first',
project=proj,
playbook='set_stats.yml',
inventory=demo_inv,
extra_vars=json.dumps({'stats_data': {'var1': 'outer-only', 'var2': 'should-be-overridden'}}),
)
# job_second: overrides var2, introduces var3
jt_second = JobTemplate.objects.create(
name='artifact-test-second',
project=proj,
playbook='set_stats.yml',
inventory=demo_inv,
extra_vars=json.dumps({'stats_data': {'var2': 'from-inner', 'var3': 'inner-only'}}),
)
# job_reader: just runs, we check what extra_vars it receives
jt_reader = JobTemplate.objects.create(
name='artifact-test-reader',
project=proj,
playbook='debug.yml',
inventory=demo_inv,
)
# Inner WFT: job_second -> job_reader
inner_wft = WorkflowJobTemplate.objects.create(name='artifact-test-inner-wf', organization=default_org)
inner_node_1 = WorkflowJobTemplateNode.objects.create(
workflow_job_template=inner_wft,
unified_job_template=jt_second,
identifier='second',
)
inner_node_2 = WorkflowJobTemplateNode.objects.create(
workflow_job_template=inner_wft,
unified_job_template=jt_reader,
identifier='reader',
)
inner_node_1.success_nodes.add(inner_node_2)
# Outer WFT: job_first -> inner_wf
outer_wft = WorkflowJobTemplate.objects.create(name='artifact-test-outer-wf', organization=default_org)
outer_node_1 = WorkflowJobTemplateNode.objects.create(
workflow_job_template=outer_wft,
unified_job_template=jt_first,
identifier='first',
)
outer_node_2 = WorkflowJobTemplateNode.objects.create(
workflow_job_template=outer_wft,
unified_job_template=inner_wft,
identifier='inner',
)
outer_node_1.success_nodes.add(outer_node_2)
# Launch and wait
outer_wfj = outer_wft.create_unified_job()
outer_wfj.signal_start()
wait_for_job(outer_wfj, running_timeout=120)
# Find the reader job inside the inner workflow
inner_wf_node = outer_wfj.workflow_job_nodes.get(identifier='inner')
inner_wfj = inner_wf_node.job
assert inner_wfj is not None, 'Inner workflow job was never created'
# Check that root node of inner WF (job_second) received outer artifacts
second_node = inner_wfj.workflow_job_nodes.get(identifier='second')
assert second_node.job is not None, 'Second job was never created'
second_extra_vars = json.loads(second_node.job.extra_vars)
assert second_extra_vars.get('var1') == 'outer-only', (
f'Root node var1: expected "outer-only" (outer artifact should be available to root node), '
f'got "{second_extra_vars.get("var1")}". '
f'Outer artifacts are not reaching root nodes of child workflows.'
)
reader_node = inner_wfj.workflow_job_nodes.get(identifier='reader')
assert reader_node.job is not None, 'Reader job was never created'
reader_extra_vars = json.loads(reader_node.job.extra_vars)
# var1: only set by outer job_first, no conflict — should propagate through
assert reader_extra_vars.get('var1') == 'outer-only', f'var1: expected "outer-only" (uncontested outer artifact), ' f'got "{reader_extra_vars.get("var1")}"'
# var2: set by outer as "should-be-overridden", then by inner as "from-inner"
# Inner workflow's own ancestor artifacts should take precedence
assert reader_extra_vars.get('var2') == 'from-inner', (
f'var2: expected "from-inner" (inner workflow artifact should override outer), '
f'got "{reader_extra_vars.get("var2")}". '
f'Outer workflow artifacts are leaking via wj_special_vars. '
f'reader node ancestor_artifacts={reader_node.ancestor_artifacts}'
)
# var3: only set by inner job_second — should propagate normally
assert reader_extra_vars.get('var3') == 'inner-only', f'var3: expected "inner-only" (inner-only artifact), ' f'got "{reader_extra_vars.get("var3")}"'
@pytest.mark.django_db(transaction=True)
def test_workflow_extra_vars_override_artifacts(live_tmp_folder, demo_inv, project_factory, default_org):
"""Workflow extra_vars should take precedence over set_stats artifacts
within a single (non-nested) workflow.
WF (extra_vars: my_var="from-wf-extra-vars"):
[job_setter] --success--> [job_reader]
job_setter sets my_var="from-set-stats" via set_stats
job_reader should see my_var="from-wf-extra-vars" because workflow
extra_vars are higher precedence than ancestor artifacts.
"""
wft_name = 'artifact-test-wf-extra-vars-precedence'
jt_names = ('artifact-test-setter', 'artifact-test-checker')
for wft in WorkflowJobTemplate.objects.filter(name=wft_name):
wft.delete()
for name in jt_names:
for jt in JobTemplate.objects.filter(name=name):
jt.delete()
proj = project_factory(scm_url=f'file://{live_tmp_folder}/debug')
if proj.current_job:
wait_for_job(proj.current_job)
jt_setter = JobTemplate.objects.create(
name='artifact-test-setter',
project=proj,
playbook='set_stats.yml',
inventory=demo_inv,
extra_vars=json.dumps({'stats_data': {'my_var': 'from-set-stats'}}),
)
jt_checker = JobTemplate.objects.create(
name='artifact-test-checker',
project=proj,
playbook='debug.yml',
inventory=demo_inv,
)
wft = WorkflowJobTemplate.objects.create(
name=wft_name,
organization=default_org,
extra_vars=json.dumps({'my_var': 'from-wf-extra-vars'}),
)
node_1 = WorkflowJobTemplateNode.objects.create(
workflow_job_template=wft,
unified_job_template=jt_setter,
identifier='setter',
)
node_2 = WorkflowJobTemplateNode.objects.create(
workflow_job_template=wft,
unified_job_template=jt_checker,
identifier='checker',
)
node_1.success_nodes.add(node_2)
wfj = wft.create_unified_job()
wfj.signal_start()
wait_for_job(wfj, running_timeout=120)
checker_node = wfj.workflow_job_nodes.get(identifier='checker')
assert checker_node.job is not None, 'Checker job was never created'
checker_extra_vars = json.loads(checker_node.job.extra_vars)
assert checker_extra_vars.get('my_var') == 'from-wf-extra-vars', (
f'Expected my_var="from-wf-extra-vars" (workflow extra_vars should override artifacts), '
f'got my_var="{checker_extra_vars.get("my_var")}". '
f'checker node ancestor_artifacts={checker_node.ancestor_artifacts}'
)
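The precedence both workflow tests assert follows plain dict-merge ordering: uncontested outer artifacts flow through, inner-workflow artifacts override outer ones, and workflow `extra_vars` override any artifact. A sketch of that layering (values taken from the tests above):

```python
outer_artifacts = {'var1': 'outer-only', 'var2': 'should-be-overridden'}
inner_artifacts = {'var2': 'from-inner', 'var3': 'inner-only'}
setter_artifacts = {'my_var': 'from-set-stats'}
workflow_extra_vars = {'my_var': 'from-wf-extra-vars'}

# Later merges win: inner artifacts beat outer ones, and workflow
# extra_vars beat every artifact layer.
merged = {**outer_artifacts, **inner_artifacts, **setter_artifacts, **workflow_extra_vars}
```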


@@ -2,7 +2,11 @@
 import pytest
+from types import SimpleNamespace
+from unittest import mock
 from awx.main.models import Credential, CredentialType
+from awx.main.models.credential import CredentialTypeHelper, ManagedCredentialType
 from django.apps import apps
@@ -78,3 +82,53 @@ def test_credential_context_property_independent_instances():
     assert cred1.context == {'key1': 'value1'}
     assert cred2.context == {'key2': 'value2'}
     assert cred1.context is not cred2.context
+
+
+def test_load_plugin_passes_description():
+    plugin = SimpleNamespace(name='test_plugin', inputs={'fields': []}, backend=None, plugin_description='A test plugin')
+    CredentialType.load_plugin('test_ns', plugin)
+    entry = ManagedCredentialType.registry['test_ns']
+    assert entry.description == 'A test plugin'
+    del ManagedCredentialType.registry['test_ns']
+
+
+def test_load_plugin_missing_description():
+    plugin = SimpleNamespace(name='test_plugin', inputs={'fields': []}, backend=None)
+    CredentialType.load_plugin('test_ns', plugin)
+    entry = ManagedCredentialType.registry['test_ns']
+    assert entry.description == ''
+    del ManagedCredentialType.registry['test_ns']
+
+
+def test_get_creation_params_external_includes_description():
+    cred_type = SimpleNamespace(namespace='test_ns', kind='external', name='Test', description='My description')
+    params = CredentialTypeHelper.get_creation_params(cred_type)
+    assert params['description'] == 'My description'
+
+
+def test_get_creation_params_external_missing_description():
+    cred_type = SimpleNamespace(namespace='test_ns', kind='external', name='Test')
+    params = CredentialTypeHelper.get_creation_params(cred_type)
+    assert params['description'] == ''
+
+
+@pytest.mark.django_db
+def test_setup_tower_managed_defaults_updates_description():
+    registry_entry = SimpleNamespace(
+        namespace='test_ns',
+        kind='external',
+        name='Test Plugin',
+        inputs={'fields': []},
+        backend=None,
+        description='Updated description',
+    )
+    # Create an existing credential type with no description
+    ct = CredentialType.objects.create(name='Test Plugin', kind='external', namespace='old_ns')
+    assert ct.description == ''
+    with mock.patch.dict(ManagedCredentialType.registry, {'test_ns': registry_entry}, clear=True):
+        CredentialType._setup_tower_managed_defaults()
+    ct.refresh_from_db()
+    assert ct.description == 'Updated description'
+    assert ct.namespace == 'test_ns'

View File

@@ -473,7 +473,7 @@ def test_populate_claims_for_adhoc_command(workload_attrs, expected_claims):
     assert claims == expected_claims
 
 
-@mock.patch('awx.main.tasks.jobs.get_workload_identity_client')
+@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client')
 def test_retrieve_workload_identity_jwt_returns_jwt_from_client(mock_get_client):
     """retrieve_workload_identity_jwt returns the JWT string from the client."""
     mock_client = mock.MagicMock()
@@ -502,7 +502,7 @@ def test_retrieve_workload_identity_jwt_returns_jwt_from_client(mock_get_client)
     assert call_kwargs['claims'][AutomationControllerJobScope.CLAIM_JOB_NAME] == 'Test Job'
 
 
-@mock.patch('awx.main.tasks.jobs.get_workload_identity_client')
+@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client')
 def test_retrieve_workload_identity_jwt_passes_audience_and_scope(mock_get_client):
     """retrieve_workload_identity_jwt passes audience and scope to the client."""
     mock_client = mock.MagicMock()
@@ -518,7 +518,7 @@ def test_retrieve_workload_identity_jwt_passes_audience_and_scope(mock_get_clien
     mock_client.request_workload_jwt.assert_called_once_with(claims={'job_id': 1}, scope=scope, audience=audience)
 
 
-@mock.patch('awx.main.tasks.jobs.get_workload_identity_client')
+@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client')
 def test_retrieve_workload_identity_jwt_passes_workload_ttl(mock_get_client):
     """retrieve_workload_identity_jwt passes workload_ttl_seconds when provided."""
     mock_client = mock.Mock()
@@ -542,7 +542,7 @@ def test_retrieve_workload_identity_jwt_passes_workload_ttl(mock_get_client):
     )
 
 
-@mock.patch('awx.main.tasks.jobs.get_workload_identity_client')
+@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client')
 def test_retrieve_workload_identity_jwt_raises_when_client_not_configured(mock_get_client):
     """retrieve_workload_identity_jwt raises RuntimeError when client is None."""
     mock_get_client.return_value = None
@@ -590,3 +590,67 @@ def test_populate_workload_identity_tokens_passes_get_instance_timeout_to_client
         scope=AutomationControllerJobScope.name,
         workload_ttl_seconds=expected_ttl,
     )
+
+
+class TestRunInventoryUpdatePopulateWorkloadIdentityTokens:
+    """Tests for RunInventoryUpdate.populate_workload_identity_tokens."""
+
+    def test_cloud_credential_passed_as_additional_credential(self):
+        """The cloud credential is forwarded to super().populate_workload_identity_tokens via additional_credentials."""
+        cloud_cred = mock.MagicMock(name='cloud_cred')
+        cloud_cred.context = {}
+        task = jobs.RunInventoryUpdate()
+        task.instance = mock.MagicMock()
+        task.instance.get_cloud_credential.return_value = cloud_cred
+        task._credentials = []
+        with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
+            task.populate_workload_identity_tokens()
+        mock_super.assert_called_once_with(additional_credentials=[cloud_cred])
+
+    def test_no_cloud_credential_calls_super_with_none(self):
+        """When there is no cloud credential, super() is called with additional_credentials=None."""
+        task = jobs.RunInventoryUpdate()
+        task.instance = mock.MagicMock()
+        task.instance.get_cloud_credential.return_value = None
+        task._credentials = []
+        with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
+            task.populate_workload_identity_tokens()
+        mock_super.assert_called_once_with(additional_credentials=None)
+
+    def test_additional_credentials_combined_with_cloud_credential(self):
+        """Caller-supplied additional_credentials are combined with the cloud credential."""
+        cloud_cred = mock.MagicMock(name='cloud_cred')
+        cloud_cred.context = {}
+        extra_cred = mock.MagicMock(name='extra_cred')
+        task = jobs.RunInventoryUpdate()
+        task.instance = mock.MagicMock()
+        task.instance.get_cloud_credential.return_value = cloud_cred
+        task._credentials = []
+        with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
+            task.populate_workload_identity_tokens(additional_credentials=[extra_cred])
+        mock_super.assert_called_once_with(additional_credentials=[extra_cred, cloud_cred])
+
+    def test_cloud_credential_override_after_context_set(self):
+        """After OIDC processing, get_cloud_credential is overridden on the instance when context is populated."""
+        cloud_cred = mock.MagicMock(name='cloud_cred')
+        # Simulate that super().populate_workload_identity_tokens populates context
+        cloud_cred.context = {'workload_identity_token': 'eyJ.test.jwt'}
+        task = jobs.RunInventoryUpdate()
+        task.instance = mock.MagicMock()
+        task.instance.get_cloud_credential.return_value = cloud_cred
+        task._credentials = []
+        with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens'):
+            task.populate_workload_identity_tokens()
+        # The instance's get_cloud_credential should now return the same object with context
+        assert task.instance.get_cloud_credential() is cloud_cred
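The pattern these tests exercise can be sketched without Django or mocks: a subclass appends one extra credential before delegating to the base implementation. `BaseTask` and `InventoryTask` below are simplified stand-ins for illustration, not the actual AWX classes.

```python
# Stand-in sketch of the override pattern: the subclass folds its cloud
# credential into additional_credentials, then calls the base method.
class BaseTask:
    def __init__(self):
        self.seen = None  # records what the base implementation received

    def populate_workload_identity_tokens(self, additional_credentials=None):
        self.seen = additional_credentials


class InventoryTask(BaseTask):
    def __init__(self, cloud_credential):
        super().__init__()
        self.cloud_credential = cloud_credential

    def populate_workload_identity_tokens(self, additional_credentials=None):
        extra = list(additional_credentials or [])
        if self.cloud_credential is not None:
            extra.append(self.cloud_credential)
        super().populate_workload_identity_tokens(additional_credentials=extra or None)


task = InventoryTask(cloud_credential='cloud-cred')
task.populate_workload_identity_tokens(additional_credentials=['extra-cred'])
print(task.seen)  # ['extra-cred', 'cloud-cred']
```

With no cloud credential and no caller-supplied list, the base method receives `None`, matching the second test above.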

View File

@@ -0,0 +1,22 @@
+from ansible_base.resource_registry.workload_identity_client import get_workload_identity_client
+
+__all__ = ['retrieve_workload_identity_jwt_with_claims']
+
+
+def retrieve_workload_identity_jwt_with_claims(
+    claims: dict,
+    audience: str,
+    scope: str,
+    workload_ttl_seconds: int | None = None,
+) -> str:
+    """Retrieve JWT token from workload claims.
+
+    Raises:
+        RuntimeError: if the workload identity client is not configured.
+    """
+    client = get_workload_identity_client()
+    if client is None:
+        raise RuntimeError("Workload identity client is not configured")
+    kwargs = {"claims": claims, "scope": scope, "audience": audience}
+    if workload_ttl_seconds:
+        kwargs["workload_ttl_seconds"] = workload_ttl_seconds
+    return client.request_workload_jwt(**kwargs).jwt
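The helper's optional-TTL handling can be exercised with a stand-in client. `StubClient` and `StubResult` below are hypothetical test doubles, not the real `ansible_base` client; note that the truthiness check means a TTL of `0` is treated the same as `None` and omitted from the request.

```python
# Minimal mimic of the kwargs-building pattern in the helper above.
# StubClient/StubResult are hypothetical; only the pattern is real.
class StubResult:
    def __init__(self, jwt):
        self.jwt = jwt


class StubClient:
    def __init__(self):
        self.last_kwargs = None  # captures what the helper sent

    def request_workload_jwt(self, **kwargs):
        self.last_kwargs = kwargs
        return StubResult('header.payload.signature')


def retrieve_jwt(client, claims, audience, scope, workload_ttl_seconds=None):
    kwargs = {'claims': claims, 'scope': scope, 'audience': audience}
    # Truthiness check: ttl of 0 (or None) is simply not forwarded.
    if workload_ttl_seconds:
        kwargs['workload_ttl_seconds'] = workload_ttl_seconds
    return client.request_workload_jwt(**kwargs).jwt


client = StubClient()
jwt = retrieve_jwt(client, {'job_id': 1}, audience='aud', scope='controller')
print(jwt)  # header.payload.signature
print('workload_ttl_seconds' in client.last_kwargs)  # False
```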

View File

@@ -34,9 +34,6 @@ def get_urlpatterns(prefix=None):
         re_path(r'^(?:api/)?500.html$', handle_500),
         re_path(r'^csp-violation/', handle_csp_violation),
         re_path(r'^login/', handle_login_redirect),
-        # want api/v2/doesnotexist to return a 404, not match the ui urls,
-        # so use a negative lookahead assertion here
-        re_path(r'^(?!api/).*', include('awx.ui.urls', namespace='ui')),
     ]
     if settings.DYNACONF.is_development_mode:
@@ -47,6 +44,12 @@ def get_urlpatterns(prefix=None):
         except ImportError:
             pass
+    # want api/v2/doesnotexist to return a 404, not match the ui urls,
+    # so use a negative lookahead assertion in the pattern below
+    urlpatterns += [
+        re_path(r'^(?!api/).*', include('awx.ui.urls', namespace='ui')),
+    ]
     return urlpatterns

View File

@@ -276,6 +276,7 @@ options:
         - ''
         - 'github'
         - 'gitlab'
+        - 'bitbucket_dc'
     webhook_credential:
       description:
         - Personal Access Token for posting back the status to the service API
@@ -436,7 +437,7 @@ def main():
         scm_branch=dict(),
         ask_scm_branch_on_launch=dict(type='bool'),
         job_slice_count=dict(type='int'),
-        webhook_service=dict(choices=['github', 'gitlab', '']),
+        webhook_service=dict(choices=['github', 'gitlab', 'bitbucket_dc', '']),
         webhook_credential=dict(),
         labels=dict(type="list", elements='str'),
         notification_templates_started=dict(type="list", elements='str'),

View File

@@ -117,6 +117,7 @@ options:
       choices:
         - github
         - gitlab
+        - bitbucket_dc
     webhook_credential:
       description:
         - Personal Access Token for posting back the status to the service API
@@ -828,7 +829,7 @@ def main():
         ask_inventory_on_launch=dict(type='bool'),
         ask_scm_branch_on_launch=dict(type='bool'),
         ask_limit_on_launch=dict(type='bool'),
-        webhook_service=dict(choices=['github', 'gitlab']),
+        webhook_service=dict(choices=['github', 'gitlab', 'bitbucket_dc']),
         webhook_credential=dict(),
         labels=dict(type="list", elements='str'),
         notification_templates_started=dict(type="list", elements='str'),

View File

@@ -0,0 +1,124 @@
+from __future__ import absolute_import, division, print_function
+
+__metaclass__ = type
+
+import pytest
+
+from awx.main.models import JobTemplate, WorkflowJobTemplate
+
+# The backend supports these webhook services on job/workflow templates
+# (see awx/main/models/mixins.py). The collection modules must accept all of
+# them in their argument_spec ``choices`` list. This test guards against the
+# module's choices drifting from the backend -- see AAP-45980, where
+# ``bitbucket_dc`` had been supported by the API since migration 0188 but was
+# still being rejected by the job_template/workflow_job_template modules.
+WEBHOOK_SERVICES = ['github', 'gitlab', 'bitbucket_dc']
+
+
+@pytest.mark.django_db
+@pytest.mark.parametrize('webhook_service', WEBHOOK_SERVICES)
+def test_job_template_accepts_webhook_service(run_module, admin_user, project, inventory, webhook_service):
+    result = run_module(
+        'job_template',
+        {
+            'name': 'foo',
+            'playbook': 'helloworld.yml',
+            'project': project.name,
+            'inventory': inventory.name,
+            'webhook_service': webhook_service,
+            'state': 'present',
+        },
+        admin_user,
+    )
+    assert not result.get('failed', False), result.get('msg', result)
+    assert result.get('changed', False), result
+
+    jt = JobTemplate.objects.get(name='foo')
+    assert jt.webhook_service == webhook_service
+
+    # Re-running with the same args must be a no-op (idempotence).
+    result = run_module(
+        'job_template',
+        {
+            'name': 'foo',
+            'playbook': 'helloworld.yml',
+            'project': project.name,
+            'inventory': inventory.name,
+            'webhook_service': webhook_service,
+            'state': 'present',
+        },
+        admin_user,
+    )
+    assert not result.get('failed', False), result.get('msg', result)
+    assert not result.get('changed', True), result
+
+
+@pytest.mark.django_db
+@pytest.mark.parametrize('webhook_service', WEBHOOK_SERVICES)
+def test_workflow_job_template_accepts_webhook_service(run_module, admin_user, organization, webhook_service):
+    result = run_module(
+        'workflow_job_template',
+        {
+            'name': 'foo-workflow',
+            'organization': organization.name,
+            'webhook_service': webhook_service,
+            'state': 'present',
+        },
+        admin_user,
+    )
+    assert not result.get('failed', False), result.get('msg', result)
+    assert result.get('changed', False), result
+
+    wfjt = WorkflowJobTemplate.objects.get(name='foo-workflow')
+    assert wfjt.webhook_service == webhook_service
+
+    # Re-running with the same args must be a no-op (idempotence).
+    result = run_module(
+        'workflow_job_template',
+        {
+            'name': 'foo-workflow',
+            'organization': organization.name,
+            'webhook_service': webhook_service,
+            'state': 'present',
+        },
+        admin_user,
+    )
+    assert not result.get('failed', False), result.get('msg', result)
+    assert not result.get('changed', True), result
+
+
+@pytest.mark.django_db
+def test_job_template_rejects_unknown_webhook_service(run_module, admin_user, project, inventory):
+    result = run_module(
+        'job_template',
+        {
+            'name': 'foo',
+            'playbook': 'helloworld.yml',
+            'project': project.name,
+            'inventory': inventory.name,
+            'webhook_service': 'not_a_real_service',
+            'state': 'present',
+        },
+        admin_user,
+    )
+    assert result.get('failed', False), result
+    assert 'webhook_service' in result.get('msg', '')
+
+
+@pytest.mark.django_db
+def test_workflow_job_template_rejects_unknown_webhook_service(run_module, admin_user, organization):
+    result = run_module(
+        'workflow_job_template',
+        {
+            'name': 'foo-workflow',
+            'organization': organization.name,
+            'webhook_service': 'not_a_real_service',
+            'state': 'present',
+        },
+        admin_user,
+    )
+    assert result.get('failed', False), result
+    assert 'webhook_service' in result.get('msg', '')
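The rejection the last two tests expect comes from `AnsibleModule`'s standard `choices` validation in `argument_spec`. A minimal stand-in (not the real `ansible.module_utils` code) mirrors the observable behavior:

```python
# Hypothetical stand-in for argument_spec choices checking; AnsibleModule
# performs the real validation, this only mirrors the failure message shape.
WEBHOOK_CHOICES = ['github', 'gitlab', 'bitbucket_dc']


def check_webhook_service(value):
    if value not in WEBHOOK_CHOICES:
        raise ValueError(
            'value of webhook_service must be one of: %s, got: %s'
            % (', '.join(WEBHOOK_CHOICES), value)
        )
    return value


print(check_webhook_service('bitbucket_dc'))  # bitbucket_dc
```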

View File

@@ -1,5 +1,5 @@
 [build-system]
-requires = ["setuptools>=45", "setuptools_scm[toml]>=6.2"]
+requires = ["setuptools>=45", "setuptools_scm[toml]>=6.2,<10"]
 build-backend = "setuptools.build_meta"
 
 # Do not uncomment the line below. We need to be able to override the version via a file, and this

View File

@@ -116,7 +116,7 @@ cython==3.1.3
     # via -r /awx_devel/requirements/requirements.in
 daphne==4.2.1
     # via -r /awx_devel/requirements/requirements.in
-dispatcherd[pg-notify]==2026.02.26
+dispatcherd[pg-notify]==2026.3.25
     # via -r /awx_devel/requirements/requirements.in
 distro==1.9.0
     # via -r /awx_devel/requirements/requirements.in