Compare commits


25 Commits

Author SHA1 Message Date
Peter Braun
5e05a40621 Merge branch 'devel' into AAP-18302 2026-04-27 14:59:35 +02:00
Peter Braun
f3b7d442c3 feat: add test to ensure credential secret values are not returned (#16434) 2026-04-27 12:50:51 +00:00
Peter Braun
f4e322ef28 fix: constructed inventories no longer increase the host count 2026-04-27 11:33:24 +02:00
Dirk Julich
376f964a40 [devel backport] AAP-41742: Fix workflow node update failing when JT has unprompted labels (#16426)
* AAP-41742: Fix workflow node update failing when JT has unprompted labels

PATCH extra_data on a workflow node fails with
{"labels":["Field is not configured to prompt on launch."]}
when the node has labels associated but the JT has
ask_labels_on_launch=False.

The serializer was passing all persisted M2M state from prompts_dict()
to _accept_or_ignore_job_kwargs() on every PATCH, re-validating
unchanged fields. Fix scopes validation to only the fields in the
request; full re-validation still occurs when unified_job_template
is being changed.

* Capture attrs keys before _build_mock_obj mutates them

_build_mock_obj() pops pseudo-fields (limit, scm_branch, job_tags,
etc.) from attrs. Computing requested_prompt_fields after the pop
would miss those fields and skip their ask_on_launch validation.

* Include survey_passwords when validating extra_vars prompts

prompts_dict() emits survey_passwords alongside extra_vars.
_accept_or_ignore_job_kwargs uses it to decrypt encrypted survey
values before validation. Without it, encrypted password blobs
are validated as-is against the survey spec.

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-24 16:17:04 +02:00
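The scoping logic this commit describes can be sketched as a small stand-alone function (illustrative names only; the serializer's real API differs):

```python
def fields_to_validate(request_fields, ask_mapping_keys, changing_template):
    """Decide which prompt fields a PATCH should re-validate (sketch)."""
    if changing_template:
        return None  # caller falls back to validating the full prompts_dict()
    requested = set(request_fields) & set(ask_mapping_keys)
    if "extra_data" in request_fields:
        # survey_passwords travels with extra_vars so encrypted survey
        # answers can be decrypted before validation
        requested |= {"extra_vars", "survey_passwords"}
    return requested

# PATCHing only extra_data no longer drags persisted labels into validation
print(sorted(fields_to_validate({"extra_data"}, {"labels", "limit"}, False)))
# ['extra_vars', 'survey_passwords']
```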
Peter Braun
c71a49e044 fix: do not include secret values in the credentials test endpoint an… (#16425)
fix: do not include secret values in the credentials test endpoint and add a guard to make sure credentials are testable
2026-04-24 12:35:12 +00:00
Adrià Sala
99ac0d39dc feat: improve unauthorized response on aap deployments (#16422) 2026-04-23 14:35:45 +00:00
Stevenson Michel
55ad29ac68 [Devel] Performance Optimization for Select Hosts Query (#16413)
* Fixed black reformatting

* Make test simulate 500k hosts in real world scenario
2026-04-22 12:05:36 -04:00
Alan Rominger
3fd3b741b6 Correctly restrict push actions to ownership repos (#16398)
* Correctly restrict push actions to ownership repos

* Use standard action to see if push actions should run

* Run spec job for 2.6 and higher

* Be even more restrictive, do not push if on a fork
2026-04-21 11:26:04 -04:00
Seth Foster
1636abd669 AAP-71844 Fix rrule fast-forward across DST boundaries (#16407)
Fix rrule fast-forward producing wrong occurrences across DST boundaries

The UTC round-trip in _fast_forward_rrule shifts the dtstart's local
hour when the original and fast-forwarded times are in different DST
periods. Since dateutil generates HOURLY occurrences by stepping in
local time, the shifted hour changes the set of reachable hours. With
BYHOUR constraints this causes a ValueError crash; without BYHOUR,
occurrences are silently shifted by 1 hour.

Fix by performing all arithmetic in the dtstart's original timezone.
Python aware-datetime subtraction already computes absolute elapsed
time regardless of timezone, so the UTC conversion was unnecessary
for correctness and actively harmful during fall-back ambiguity.

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-21 10:54:42 -04:00
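The timezone pitfall this commit fixes can be shown in isolation (a self-contained illustration, not the `_fast_forward_rrule` code itself; dates chosen around the US fall-back transition on Nov 1, 2026):

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

tz = ZoneInfo("America/New_York")
# 09:00 local, shortly before clocks fall back
dtstart = datetime(2026, 10, 31, 9, 0, tzinfo=tz)

# Buggy pattern: round-trip through UTC and step there, shifting the local hour
via_utc = (dtstart.astimezone(ZoneInfo("UTC")) + timedelta(days=2)).astimezone(tz)

# Fixed pattern: do the arithmetic on the aware datetime in its own zone
in_local = dtstart + timedelta(days=2)

print(via_utc.hour, in_local.hour)  # 8 9 -- the UTC round-trip lost an hour
```

With BYHOUR=9 in the rrule, the shifted hour 8 is unreachable, matching the ValueError described above.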
Sean Sullivan
d21e0141ce AAP-70257 controller collection should retry transient HTTP errors with exponential backoff. (#16415)
controller collection should retry transient HTTP errors with exponential backoff
2026-04-21 08:12:08 -06:00
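The retry pattern named in the commit can be sketched as follows (a hypothetical helper; the collection's actual implementation and its list of retryable status codes may differ):

```python
import time

def retry_with_backoff(call, max_tries=5, base_delay=1.0, sleep=time.sleep,
                       retryable=(429, 502, 503, 504)):
    """Retry `call` while it returns a transient HTTP status, doubling the delay."""
    for attempt in range(max_tries):
        status, body = call()
        if status not in retryable:
            return status, body
        if attempt < max_tries - 1:
            sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...
    return status, body

# Simulated endpoint: two 503s, then success; record delays instead of sleeping
responses = iter([(503, None), (503, None), (200, "ok")])
delays = []
status, body = retry_with_backoff(lambda: next(responses), sleep=delays.append)
print(status, delays)  # 200 [1.0, 2.0]
```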
Seth Foster
e5bae59f5a fix import for refactored method (#16394)
retrieve_workload_identity_jwt_with_claims is now
in a separate utility file, not in jobs.py

Signed-off-by: Seth Foster <fosterbseth@gmail.com>
2026-04-15 10:47:17 -04:00
Peter Braun
a8afbd1ca3 Aap 45980 (#16395)
* support bitbucket_dc webhooks

* add test

* update docs
2026-04-14 14:00:59 +00:00
Adrià Sala
da996c01a0 feat: integrate awx-tui to the awx_devel image (#16399) 2026-04-14 10:08:19 +02:00
Seth Foster
b8c9ae73cd Fix OIDC workload identity for inventory sync (#16390)
The cloud credential used by inventory updates was not going through
the OIDC workload identity token flow because it lives outside the
normal _credentials list. This overrides populate_workload_identity_tokens
in RunInventoryUpdate to include the cloud credential as an
additional_credentials argument to the base implementation, and
patches get_cloud_credential on the instance so the injector picks up
the credential with OIDC context intact.

Co-authored-by: Alan Rominger <arominge@redhat.com>
Co-authored-by: Dave Mulford <dmulford@redhat.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 16:26:18 -04:00
Stevenson Michel
d71f18fa44 [Devel] Config Endpoint Optimization (#16389)
* Improved performance of the config endpoint by reducing database queries in GET /api/controller/v2/config/
2026-04-09 16:24:03 -04:00
Tong He
e82a4246f3 Bind the install bundle to the ansible.receptor collection 2.0.8 version (#16396) 2026-04-09 17:09:26 +02:00
Daniel Finca
b83019bde6 feat: support for oidc credential /test endpoint (#16370)
Adds support for testing external credentials that use OIDC workload identity tokens.
When FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED is enabled, the /test endpoints return
JWT payload details alongside test results.

- Add OIDC credential test endpoints with job template selection
- Return JWT payload and secret value in test response
- Maintain backward compatibility (detail field for errors)
- Add comprehensive unit and functional tests
- Refactor shared error handling logic

Co-authored-by: Daniel Finca <dfinca@redhat.com>
Co-authored-by: melissalkelly <melissalkelly1@gmail.com>
2026-04-06 15:56:11 -04:00
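The "JWT payload details" in the test response come from decoding the token AWX just minted, without verifying the signature (the external system performs real verification). The mechanics can be sketched with the standard library (a simplified stand-in for the view's PyJWT call; the claim names here are illustrative):

```python
import base64
import json

def b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

# Hand-rolled unsigned token purely for illustration
header = b64url(json.dumps({"alg": "RS256", "typ": "JWT"}).encode())
payload = b64url(json.dumps({"organization_name": "Default", "job_template_id": 7}).encode())
token = f"{header}.{payload}.signature-not-checked"

def decode_payload_for_display(jwt_token: str) -> dict:
    """Display-only decode: the signature is deliberately NOT verified."""
    seg = jwt_token.split(".")[1]
    seg += "=" * (-len(seg) % 4)  # restore stripped base64url padding
    return json.loads(base64.urlsafe_b64decode(seg))

print(decode_payload_for_display(token))
# {'organization_name': 'Default', 'job_template_id': 7}
```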
Alan Rominger
6d94aa84e7 Reorder URLs so that Django debug toolbar can work (#16352)
* Reorder URLs so that Django debug toolbar can work

* Move comment with URL move
2026-04-03 10:22:21 -04:00
Alan Rominger
7155400efc AAP-12516 [option 2] Handle nested workflow artifacts via root node ancestor_artifacts (#16381)
* Add new test for artifact precedence upstream node vs outer workflow

* Fix bugs, upstream artifacts come first for precedence

* Track nested artifacts path through ancestor_artifacts on root nodes

* Fix case where first root node did not get the vars

* touchup comment

* Prevent conflict with sliced jobs hack
2026-04-02 15:18:11 -04:00
melissalkelly
e80ce43f87 Fix workload identity project updates (#16373)
* fix: enable workload identity credentials for project updates

* Add explanatory comment for credential context handling

* Revert build_passwords
2026-03-31 14:48:27 +02:00
Stevenson Michel
595e093bbf [CI-Fix] Pin setuptools_scm<10 to fix api-lint failure (#16376)
Fix CI: Pin setuptools_scm<10 to fix api-lint build failure

setuptools-scm 10.0.5 (with its new vcs-versioning dependency) requires
a [tool.setuptools_scm] or [tool.vcs-versioning] section in pyproject.toml.
AWX intentionally omits this section because it uses a custom version
resolution via setup.cfg (version = attr: awx.get_version). The new major
version of setuptools-scm treats the missing section as a fatal error when
building the sdist in tox's isolated build, causing the linters environment
to fail.

Pinning to <10 restores compatibility with the existing version resolution
strategy.

Failing run: https://github.com/ansible/awx/actions/runs/23744310714
Branch: devel

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 14:37:16 -04:00
TVo
cd7f6f602f Fix OpenAPI schema validation message mismatch (#16372) 2026-03-25 12:36:10 -06:00
Chris Meyers
310dd3e18f Update dispatcherd to version 2026.3.25 (#16369)
Co-authored-by: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 10:57:01 -04:00
Matthew Sandoval
7c75788b0a AAP-67740 Pass plugin_description through to CredentialType.description (#16364)
* Pass plugin_description through to CredentialType.description

Propagate the plugin_description field from credential plugins into the
CredentialType description when loading and creating managed credential
types, including updates to existing records.

Assisted-by: Claude

* Add unit tests for plugin_description passthrough to CredentialType

Tests cover load_plugin, get_creation_params, and
_setup_tower_managed_defaults handling of the description field.

Assisted-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: PabloHiro <palonso@redhat.com>
2026-03-25 11:03:11 +01:00
Peter Braun
ab294385ad fix: avoid delete in loop in inventory import (#16366) 2026-03-24 15:37:59 +00:00
42 changed files with 1967 additions and 230 deletions

.github/workflows/_repo-owns-branch.yml

@@ -0,0 +1,55 @@
---
name: Repo Owns Branch
# Reusable workflow that determines whether the current repository
# owns the current branch for push operations.
#
# Ownership rules:
# - ansible/awx owns: devel, feature_*
# - ansible/tower owns: stable-*, release_*
# - workflow_dispatch is always allowed
#
# All other repo/branch combinations are skipped.
on:
workflow_call:
outputs:
should_run:
description: Whether this repo owns the current branch
value: ${{ jobs.check.outputs.should_run }}
jobs:
check:
runs-on: ubuntu-latest
outputs:
should_run: ${{ steps.check.outputs.should_run }}
steps:
- name: Check branch ownership
id: check
run: |
REPO="${{ github.repository }}"
BRANCH="${{ github.ref_name }}"
EVENT="${{ github.event_name }}"
if [[ "$EVENT" == "workflow_dispatch" ]]; then
echo "should_run=true" >> $GITHUB_OUTPUT
echo "Manual trigger — allowed"
exit 0
fi
# ansible/awx owns devel and feature_* branches
if [[ "$REPO" == "ansible/awx" ]] && [[ "$BRANCH" == "devel" || "$BRANCH" == feature_* ]]; then
echo "should_run=true" >> $GITHUB_OUTPUT
echo "Repository '$REPO' owns branch '$BRANCH'"
exit 0
fi
# ansible/tower owns stable-* and release_* branches
if [[ "$REPO" == "ansible/tower" ]] && [[ "$BRANCH" == stable-* || "$BRANCH" == release_* ]]; then
echo "should_run=true" >> $GITHUB_OUTPUT
echo "Repository '$REPO' owns branch '$BRANCH'"
exit 0
fi
echo "should_run=false" >> $GITHUB_OUTPUT
echo "Repository '$REPO' does not own branch '$BRANCH' — skipping"
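The shell logic above, restated as a small Python sketch for clarity (illustrative only; the workflow itself is authoritative):

```python
from fnmatch import fnmatch

def repo_owns_branch(repo: str, branch: str, event: str) -> bool:
    """Mirror of the workflow's branch-ownership rules."""
    if event == "workflow_dispatch":
        return True  # manual triggers are always allowed
    if repo == "ansible/awx":
        return branch == "devel" or fnmatch(branch, "feature_*")
    if repo == "ansible/tower":
        return fnmatch(branch, "stable-*") or fnmatch(branch, "release_*")
    return False  # all other repo/branch combinations are skipped

print(repo_owns_branch("ansible/awx", "devel", "push"))        # True
print(repo_owns_branch("ansible/awx", "stable-2.6", "push"))   # False
print(repo_owns_branch("ansible/tower", "stable-2.6", "push")) # True
```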


@@ -12,7 +12,12 @@ on:
- feature_*
- stable-*
jobs:
check-ownership:
uses: ./.github/workflows/_repo-owns-branch.yml
push-development-images:
needs: check-ownership
if: needs.check-ownership.outputs.should_run == 'true'
runs-on: ubuntu-latest
timeout-minutes: 120
permissions:
@@ -30,12 +35,6 @@ jobs:
make-target: awx-kube-buildx
steps:
- name: Skipping build of awx image for non-awx repository
run: |
echo "Skipping build of awx image for non-awx repository"
exit 0
if: matrix.build-targets.image-name == 'awx' && !endsWith(github.repository, '/awx')
- uses: actions/checkout@v4
with:
show-progress: false


@@ -16,9 +16,16 @@ on:
push:
branches:
- devel
- 'stable-2.[6-9]'
- 'stable-2.[1-9][0-9]'
workflow_dispatch: # Allow manual triggering for testing
jobs:
check-ownership:
uses: ./.github/workflows/_repo-owns-branch.yml
sync-openapi-spec:
needs: check-ownership
if: needs.check-ownership.outputs.should_run == 'true'
name: Sync OpenAPI spec to central repo
runs-on: ubuntu-latest
permissions:


@@ -13,7 +13,12 @@ on:
- feature_**
- stable-**
jobs:
check-ownership:
uses: ./.github/workflows/_repo-owns-branch.yml
push:
needs: check-ownership
if: needs.check-ownership.outputs.should_run == 'true'
runs-on: ubuntu-latest
timeout-minutes: 60
permissions:


@@ -106,6 +106,12 @@ else
DOCKER_KUBE_CACHE_FLAG=$(DOCKER_CACHE)
endif
# AWX TUI variables
AWX_HOST ?= https://localhost:8043
AWX_USER ?= admin
AWX_PASSWORD ?= $$(awk -F"'" '/^admin_password:/{print $$2}' tools/docker-compose/_sources/secrets/admin_password.yml 2>/dev/null || echo "admin")
AWX_VERIFY_SSL ?= false
.PHONY: awx-link clean clean-tmp clean-venv requirements requirements_dev \
update_requirements upgrade_requirements update_requirements_dev \
docker_update_requirements docker_upgrade_requirements docker_update_requirements_dev \
@@ -571,6 +577,20 @@ docker-compose-runtest: awx/projects docker-compose-sources
docker-compose-build-schema: awx/projects docker-compose-sources
$(DOCKER_COMPOSE) -f tools/docker-compose/_sources/docker-compose.yml run --rm --service-ports --no-deps awx_1 make genschema
awx-tui:
@if ! command -v awx-tui > /dev/null 2>&1; then \
$(PYTHON) -m pip install awx-tui; \
fi
@if [ -f "$(HOME)/.config/awx-tui/config.yaml" ]; then \
$(PYTHON) -m awx_tui.main; \
else \
AWX_HOST=$(AWX_HOST) \
AWX_USER=$(AWX_USER) \
AWX_PASSWORD=$(AWX_PASSWORD) \
AWX_VERIFY_SSL=$(AWX_VERIFY_SSL) \
$(PYTHON) -m awx_tui.main --host $(AWX_HOST); \
fi
SCHEMA_DIFF_BASE_FOLDER ?= awx
SCHEMA_DIFF_BASE_BRANCH ?= devel
detect-schema-change: genschema
@@ -581,7 +601,7 @@ detect-schema-change: genschema
validate-openapi-schema: genschema
@echo "Validating OpenAPI schema from schema.json..."
@python3 -c "from openapi_spec_validator import validate; import json; spec = json.load(open('schema.json')); validate(spec); print('✓ OpenAPI Schema is valid!')"
@python3 -c "from openapi_spec_validator import validate; import json; spec = json.load(open('schema.json')); validate(spec); print('✓ Schema is valid')"
docker-compose-clean: awx/projects
$(DOCKER_COMPOSE) -f tools/docker-compose/_sources/docker-compose.yml rm -sf


@@ -272,7 +272,10 @@ class APIView(views.APIView):
response = self.handle_exception(self.__init_request_error__)
if response.status_code == 401:
if response.data and 'detail' in response.data:
response.data['detail'] += _(' To establish a login session, visit') + ' /api/login/.'
if getattr(settings, 'RESOURCE_SERVER__URL', None):
response.data['detail'] += _(' Direct access is not allowed, authenticate via the platform gateway.')
else:
response.data['detail'] += _(' To establish a login session, visit') + ' /api/login/.'
logger.info(status_msg)
else:
logger.warning(status_msg)


@@ -122,7 +122,6 @@ from awx.main.scheduler.task_manager_models import TaskManagerModels
from awx.main.redact import UriCleaner, REPLACE_STR
from awx.main.signals import update_inventory_computed_fields
from awx.main.validators import vars_validate_or_raise
from awx.api.versioning import reverse
@@ -2079,9 +2078,17 @@ class BulkHostCreateSerializer(serializers.Serializer):
if request and not request.user.is_superuser:
if request.user not in inv.admin_role:
raise serializers.ValidationError(_(f'Inventory with id {inv.id} not found or lack permissions to add hosts.'))
current_hostnames = set(inv.hosts.values_list('name', flat=True))
# Performance optimization (AAP-67978): Instead of loading ALL host names from
# the inventory, only check if the specific new names already exist in the database.
new_names = [host['name'] for host in attrs['hosts']]
duplicate_new_names = [n for n in new_names if n in current_hostnames or new_names.count(n) > 1]
new_name_counts = Counter(new_names)
duplicates_in_new = [name for name, count in new_name_counts.items() if count > 1]
unique_new_names = list(new_name_counts.keys())
existing_duplicates = list(Host.objects.filter(inventory=inv, name__in=unique_new_names).values_list('name', flat=True))
duplicate_new_names = list(set(duplicates_in_new + existing_duplicates))
if duplicate_new_names:
raise serializers.ValidationError(_(f'Hostnames must be unique in an inventory. Duplicates found: {duplicate_new_names}'))
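The duplicate check in the new code boils down to the following (a stand-alone sketch with the ORM query replaced by a plain membership test):

```python
from collections import Counter

def find_duplicate_names(new_names, existing_names):
    """Names repeated within the request or already present in the inventory."""
    counts = Counter(new_names)
    dupes_in_request = [n for n, c in counts.items() if c > 1]
    already_present = [n for n in counts if n in existing_names]
    return sorted(set(dupes_in_request) | set(already_present))

# Only the unique new names need to be checked against the database,
# instead of loading every host name in the inventory
print(find_duplicate_names(["a", "b", "b", "c"], {"c", "d"}))  # ['b', 'c']
```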
@@ -2932,6 +2939,19 @@ class CredentialTypeSerializer(BaseSerializer):
field['label'] = _(field['label'])
if 'help_text' in field:
field['help_text'] = _(field['help_text'])
# Deep copy inputs to avoid modifying the original model data
inputs = value.get('inputs')
if not isinstance(inputs, dict):
inputs = {}
value['inputs'] = copy.deepcopy(inputs)
fields = value['inputs'].get('fields', [])
if not isinstance(fields, list):
fields = []
# Normalize fields and filter out internal fields
value['inputs']['fields'] = [f for f in fields if not f.get('internal')]
return value
def filter_field_metadata(self, fields, method):
@@ -4122,9 +4142,28 @@ class LaunchConfigurationBaseSerializer(BaseSerializer):
attrs['extra_data'][key] = db_extra_data[key]
# Build unsaved version of this config, use it to detect prompts errors
# Capture keys before _build_mock_obj pops pseudo-fields from attrs
incoming_attr_keys = set(attrs.keys())
mock_obj = self._build_mock_obj(attrs)
if set(list(ujt.get_ask_mapping().keys()) + ['extra_data']) & set(attrs.keys()):
accepted, rejected, errors = ujt._accept_or_ignore_job_kwargs(_exclude_errors=self.exclude_errors, **mock_obj.prompts_dict())
ask_mapping_keys = set(ujt.get_ask_mapping().keys())
requested_prompt_fields = incoming_attr_keys & ask_mapping_keys
if 'extra_data' in incoming_attr_keys:
requested_prompt_fields.add('extra_vars')
requested_prompt_fields.add('survey_passwords')
# prompts_dict() pulls persisted M2M state (labels, credentials,
# instance_groups) via the instance pk. Only re-validate the full prompt
# state when the caller is switching the underlying template; otherwise
# restrict validation to the fields the request explicitly provided.
if 'unified_job_template' in attrs:
prompts_to_validate = mock_obj.prompts_dict()
elif requested_prompt_fields:
prompts_to_validate = {k: v for k, v in mock_obj.prompts_dict().items() if k in requested_prompt_fields}
else:
prompts_to_validate = None
if prompts_to_validate is not None:
accepted, rejected, errors = ujt._accept_or_ignore_job_kwargs(_exclude_errors=self.exclude_errors, **prompts_to_validate)
else:
# Only perform validation of prompts if prompts fields are provided
errors = {}


@@ -1,4 +1,4 @@
---
collections:
- name: ansible.receptor
version: 2.0.6
version: 2.0.8


@@ -14,6 +14,7 @@ import sys
import time
from base64 import b64encode
from collections import OrderedDict
from jwt import decode as _jwt_decode
from urllib3.exceptions import ConnectTimeoutError
@@ -58,8 +59,13 @@ from drf_spectacular.utils import extend_schema_view, extend_schema
from ansible_base.lib.utils.requests import get_remote_hosts
from ansible_base.rbac.models import RoleEvaluation
from ansible_base.lib.utils.schema import extend_schema_if_available
from ansible_base.lib.workload_identity.controller import AutomationControllerJobScope
# flags
from flags.state import flag_enabled
# AWX
from awx.main.utils.workload_identity import retrieve_workload_identity_jwt_with_claims
from awx.main.tasks.system import send_notifications, update_inventory_computed_fields
from awx.main.access import get_user_queryset
from awx.api.generics import (
@@ -1595,7 +1601,175 @@ class CredentialCopy(CopyAPIView):
resource_purpose = 'copy of a credential'
class CredentialExternalTest(SubDetailAPIView):
class OIDCCredentialTestMixin:
"""
Mixin to add OIDC workload identity token support to credential test endpoints.
This mixin provides methods to handle OIDC-enabled external credentials that use
workload identity tokens for authentication.
"""
@staticmethod
def _get_workload_identity_token(job_template: models.JobTemplate, jwt_aud: str) -> str:
"""Generate a workload identity token for a job template.
Args:
job_template: The JobTemplate instance to generate claims for
jwt_aud: The JWT audience claim value
Returns:
str: The generated JWT token
"""
claims = {
AutomationControllerJobScope.CLAIM_ORGANIZATION_NAME: job_template.organization.name,
AutomationControllerJobScope.CLAIM_ORGANIZATION_ID: job_template.organization.id,
AutomationControllerJobScope.CLAIM_PROJECT_NAME: job_template.project.name,
AutomationControllerJobScope.CLAIM_PROJECT_ID: job_template.project.id,
AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_NAME: job_template.name,
AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_ID: job_template.id,
AutomationControllerJobScope.CLAIM_PLAYBOOK_NAME: job_template.playbook,
}
return retrieve_workload_identity_jwt_with_claims(
claims=claims,
audience=jwt_aud,
scope=AutomationControllerJobScope.name,
)
@staticmethod
def _decode_jwt_payload_for_display(jwt_token):
"""Decode JWT payload for display purposes only (signature not verified).
This is safe because the JWT was just created by AWX and is only decoded
to show the user what claims are being sent to the external system.
The external system will perform proper signature verification.
Args:
jwt_token: The JWT token to decode
Returns:
dict: The decoded JWT payload
"""
return _jwt_decode(jwt_token, algorithms=["RS256"], options={"verify_signature": False}) # NOSONAR python:S5659
def _has_workload_identity_token(self, credential_type_inputs):
"""Check if credential type has an internal workload_identity_token field.
Args:
credential_type_inputs: The inputs dict from a credential type
Returns:
bool: True if the credential type has a workload_identity_token field marked as internal
"""
fields = credential_type_inputs.get('fields', []) if isinstance(credential_type_inputs, dict) else []
return any(field.get('internal') and field.get('id') == 'workload_identity_token' for field in fields)
def _validate_and_get_job_template(self, job_template_id):
"""Validate job template ID and return the JobTemplate instance.
Args:
job_template_id: The job template ID from metadata
Returns:
JobTemplate instance
Raises:
ParseError: If job_template_id is invalid or not found
"""
if job_template_id is None:
raise ParseError(_('Job template ID is required.'))
try:
return models.JobTemplate.objects.get(id=int(job_template_id))
except ValueError:
raise ParseError(_('Job template ID must be an integer.'))
except models.JobTemplate.DoesNotExist:
raise ParseError(_('Job template with ID %(id)s does not exist.') % {'id': job_template_id})
def _handle_oidc_credential_test(self, backend_kwargs):
"""
Handle OIDC workload identity token generation for external credential test endpoints.
This method should only be called when FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED is enabled
and the credential type has a workload_identity_token field.
Args:
backend_kwargs: The kwargs dict to pass to the backend (will be modified in place)
Returns:
dict: Response body containing details with the sent JWT payload
Raises:
PermissionDenied: If user lacks access to the job template (re-raised for 403 response)
All other exceptions are caught and converted to 400 responses with error details.
Modifies backend_kwargs in place to add workload_identity_token.
"""
# Validate job template
job_template_id = backend_kwargs.pop('job_template_id', None)
job_template = self._validate_and_get_job_template(job_template_id)
# Check user access
if not self.request.user.can_access(models.JobTemplate, 'start', job_template):
raise PermissionDenied(_('You do not have access to job template with id: %(id)s.') % {'id': job_template.id})
# Generate workload identity token
jwt_token = self._get_workload_identity_token(job_template, backend_kwargs.pop('jwt_aud', None))
backend_kwargs['workload_identity_token'] = jwt_token
return {'details': {'sent_jwt_payload': self._decode_jwt_payload_for_display(jwt_token)}}
def _call_backend_with_error_handling(self, plugin, backend_kwargs, response_body):
"""Call credential backend and handle errors."""
try:
with set_environ(**settings.AWX_TASK_ENV):
plugin.backend(**backend_kwargs)
return Response(response_body, status=status.HTTP_202_ACCEPTED)
except requests.exceptions.HTTPError as exc:
message = self._extract_http_error_message(exc)
self._add_error_to_response(response_body, message)
return Response(response_body, status=status.HTTP_400_BAD_REQUEST)
except Exception as exc:
message = self._extract_generic_error_message(exc)
self._add_error_to_response(response_body, message)
return Response(response_body, status=status.HTTP_400_BAD_REQUEST)
@staticmethod
def _extract_http_error_message(exc):
"""Extract error message from HTTPError, checking response JSON and text."""
message = str(exc)
if not hasattr(exc, 'response') or exc.response is None:
return message
try:
error_data = exc.response.json()
if 'errors' in error_data and error_data['errors']:
return ', '.join(error_data['errors'])
if 'error' in error_data:
return error_data['error']
except (ValueError, KeyError):
if exc.response.text:
return exc.response.text
return message
@staticmethod
def _extract_generic_error_message(exc):
"""Extract error message from exception, handling ConnectTimeoutError specially."""
message = str(exc) if str(exc) else exc.__class__.__name__
for arg in getattr(exc, 'args', []):
if isinstance(getattr(arg, 'reason', None), ConnectTimeoutError):
return str(arg.reason)
return message
@staticmethod
def _add_error_to_response(response_body, message):
"""Add error message to both 'detail' and 'details.error_message' fields."""
response_body['detail'] = message
if 'details' in response_body:
response_body['details']['error_message'] = message
class CredentialExternalTest(OIDCCredentialTestMixin, SubDetailAPIView):
"""
Test updates to the input values and metadata of an external credential
before saving them.
@@ -1615,6 +1789,8 @@ class CredentialExternalTest(SubDetailAPIView):
It does not support standard credential types such as Machine, SCM, and Cloud."""})
def post(self, request, *args, **kwargs):
obj = self.get_object()
if obj.credential_type.kind != 'external':
raise ParseError(_('Credential is not testable.'))
backend_kwargs = {}
for field_name, value in obj.inputs.items():
backend_kwargs[field_name] = obj.get_input(field_name)
@@ -1622,23 +1798,22 @@ class CredentialExternalTest(SubDetailAPIView):
if value != '$encrypted$':
backend_kwargs[field_name] = value
backend_kwargs.update(request.data.get('metadata', {}))
try:
with set_environ(**settings.AWX_TASK_ENV):
obj.credential_type.plugin.backend(**backend_kwargs)
return Response({}, status=status.HTTP_202_ACCEPTED)
except requests.exceptions.HTTPError:
message = """Test operation is not supported for credential type {}.
This endpoint only supports credentials that connect to
external secret management systems such as CyberArk, HashiCorp
Vault, or cloud-based secret managers.""".format(obj.credential_type.kind)
return Response({'detail': message}, status=status.HTTP_400_BAD_REQUEST)
except Exception as exc:
message = exc.__class__.__name__
exc_args = getattr(exc, 'args', [])
for a in exc_args:
if isinstance(getattr(a, 'reason', None), ConnectTimeoutError):
message = str(a.reason)
return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST)
# Handle OIDC workload identity token generation if enabled
response_body = {}
if flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED') and self._has_workload_identity_token(obj.credential_type.inputs):
try:
oidc_response_body = self._handle_oidc_credential_test(backend_kwargs)
response_body.update(oidc_response_body)
except PermissionDenied:
raise
except Exception as exc:
error_message = str(exc.detail) if hasattr(exc, 'detail') else str(exc)
response_body['detail'] = error_message
response_body['details'] = {'error_message': error_message}
return Response(response_body, status=status.HTTP_400_BAD_REQUEST)
return self._call_backend_with_error_handling(obj.credential_type.plugin, backend_kwargs, response_body)
class CredentialInputSourceDetail(RetrieveUpdateDestroyAPIView):
@@ -1668,7 +1843,7 @@ class CredentialInputSourceSubList(SubListCreateAPIView):
parent_key = 'target_credential'
class CredentialTypeExternalTest(SubDetailAPIView):
class CredentialTypeExternalTest(OIDCCredentialTestMixin, SubDetailAPIView):
"""
Test a complete set of input values for an external credential before
saving it.
@@ -1683,21 +1858,26 @@ class CredentialTypeExternalTest(SubDetailAPIView):
@extend_schema_if_available(extensions={"x-ai-description": "Test a complete set of input values for an external credential"})
def post(self, request, *args, **kwargs):
obj = self.get_object()
if obj.kind != 'external':
raise ParseError(_('Credential type is not testable.'))
backend_kwargs = request.data.get('inputs', {})
backend_kwargs.update(request.data.get('metadata', {}))
try:
obj.plugin.backend(**backend_kwargs)
return Response({}, status=status.HTTP_202_ACCEPTED)
except requests.exceptions.HTTPError as exc:
message = 'HTTP {}'.format(exc.response.status_code)
return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST)
except Exception as exc:
message = exc.__class__.__name__
args_exc = getattr(exc, 'args', [])
for a in args_exc:
if isinstance(getattr(a, 'reason', None), ConnectTimeoutError):
message = str(a.reason)
return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST)
# Handle OIDC workload identity token generation if enabled
response_body = {}
if flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED') and self._has_workload_identity_token(obj.inputs):
try:
oidc_response_body = self._handle_oidc_credential_test(backend_kwargs)
response_body.update(oidc_response_body)
except PermissionDenied:
raise
except Exception as exc:
error_message = str(exc.detail) if hasattr(exc, 'detail') else str(exc)
response_body['detail'] = error_message
response_body['details'] = {'error_message': error_message}
return Response(response_body, status=status.HTTP_400_BAD_REQUEST)
return self._call_backend_with_error_handling(obj.plugin, backend_kwargs, response_body)
class HostRelatedSearchMixin(object):


@@ -344,13 +344,22 @@ class ApiV2ConfigView(APIView):
become_methods=PRIVILEGE_ESCALATION_METHODS,
)
if (
request.user.is_superuser
or request.user.is_system_auditor
or Organization.accessible_objects(request.user, 'admin_role').exists()
or Organization.accessible_objects(request.user, 'auditor_role').exists()
or Organization.accessible_objects(request.user, 'project_admin_role').exists()
):
# Check superuser/auditor first
if request.user.is_superuser or request.user.is_system_auditor:
has_org_access = True
else:
# Single query checking all three organization role types at once
has_org_access = (
(
Organization.access_qs(request.user, 'change')
| Organization.access_qs(request.user, 'audit')
| Organization.access_qs(request.user, 'add_project')
)
.distinct()
.exists()
)
if has_org_access:
data.update(
dict(
project_base_dir=settings.PROJECTS_ROOT,
@@ -358,8 +367,10 @@ class ApiV2ConfigView(APIView):
custom_virtualenvs=get_custom_venv_choices(),
)
)
elif JobTemplate.accessible_objects(request.user, 'admin_role').exists():
data['custom_virtualenvs'] = get_custom_venv_choices()
else:
# Only check JobTemplate access if org check failed
if JobTemplate.accessible_objects(request.user, 'admin_role').exists():
data['custom_virtualenvs'] = get_custom_venv_choices()
return Response(data)


@@ -902,6 +902,10 @@ class HostAccess(BaseAccess):
)
prefetch_related = ('groups', 'inventory_sources')
def get_queryset(self):
qs = super().get_queryset()
return qs.exclude(inventory__kind='constructed')
def filtered_queryset(self):
return self.model.objects.filter(inventory__in=Inventory.accessible_pk_qs(self.user, 'read_role'))
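Why the exclusion matters: hosts in constructed inventories mirror hosts that already exist in source inventories, so including them double-counts. A toy sketch (field names illustrative, not the actual model schema):

```python
hosts = [
    {"name": "web1", "inventory_kind": ""},
    {"name": "web2", "inventory_kind": ""},
    {"name": "web1", "inventory_kind": "constructed"},  # mirror, not a new host
]

# Equivalent of qs.exclude(inventory__kind='constructed')
countable = [h for h in hosts if h["inventory_kind"] != "constructed"]
print(len(countable))  # 2
```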


@@ -409,10 +409,12 @@ class Command(BaseCommand):
del_child_group_pks = list(set(db_children_name_pk_map.values()))
for offset in range(0, len(del_child_group_pks), self._batch_size):
child_group_pks = del_child_group_pks[offset : (offset + self._batch_size)]
for db_child in db_children.filter(pk__in=child_group_pks):
group_group_count += 1
db_group.children.remove(db_child)
logger.debug('Group "%s" removed from group "%s"', db_child.name, db_group.name)
children_to_remove = list(db_children.filter(pk__in=child_group_pks))
if children_to_remove:
group_group_count += len(children_to_remove)
db_group.children.remove(*children_to_remove)
for db_child in children_to_remove:
logger.debug('Group "%s" removed from group "%s"', db_child.name, db_group.name)
# FIXME: Inventory source group relationships
# Delete group/host relationships not present in imported data.
db_hosts = db_group.hosts
@@ -441,12 +443,12 @@ class Command(BaseCommand):
del_host_pks = list(del_host_pks)
for offset in range(0, len(del_host_pks), self._batch_size):
del_pks = del_host_pks[offset : (offset + self._batch_size)]
- for db_host in db_hosts.filter(pk__in=del_pks):
-     group_host_count += 1
-     if db_host not in db_group.hosts.all():
-         continue
-     db_group.hosts.remove(db_host)
-     logger.debug('Host "%s" removed from group "%s"', db_host.name, db_group.name)
+ hosts_to_remove = list(db_hosts.filter(pk__in=del_pks))
+ if hosts_to_remove:
+     group_host_count += len(hosts_to_remove)
+     db_group.hosts.remove(*hosts_to_remove)
+     for db_host in hosts_to_remove:
+         logger.debug('Host "%s" removed from group "%s"', db_host.name, db_group.name)
if settings.SQL_DEBUG:
logger.warning(
'group-group and group-host deletions took %d queries for %d relationships',

View File

@@ -531,6 +531,7 @@ class CredentialType(CommonModelNameNotUnique):
existing = ct_class.objects.filter(name=default.name, kind=default.kind).first()
if existing is not None:
existing.namespace = default.namespace
existing.description = getattr(default, 'description', '')
existing.inputs = {}
existing.injectors = {}
existing.save()
@@ -570,7 +571,14 @@ class CredentialType(CommonModelNameNotUnique):
@classmethod
def load_plugin(cls, ns, plugin):
# TODO: User "side-loaded" credential custom_injectors isn't supported
- ManagedCredentialType.registry[ns] = SimpleNamespace(namespace=ns, name=plugin.name, kind='external', inputs=plugin.inputs, backend=plugin.backend)
+ ManagedCredentialType.registry[ns] = SimpleNamespace(
+     namespace=ns,
+     name=plugin.name,
+     kind='external',
+     inputs=plugin.inputs,
+     backend=plugin.backend,
+     description=getattr(plugin, 'plugin_description', ''),
+ )
def inject_credential(self, credential, env, safe_env, args, private_data_dir, container_root=None):
from awx_plugins.interfaces._temporary_private_inject_api import inject_credential
@@ -582,7 +590,13 @@ class CredentialTypeHelper:
@classmethod
def get_creation_params(cls, cred_type):
if cred_type.kind == 'external':
- return dict(namespace=cred_type.namespace, kind=cred_type.kind, name=cred_type.name, managed=True)
+ return {
+     'namespace': cred_type.namespace,
+     'kind': cred_type.kind,
+     'name': cred_type.name,
+     'managed': True,
+     'description': getattr(cred_type, 'description', ''),
+ }
return dict(
namespace=cred_type.namespace,
kind=cred_type.kind,

View File

@@ -72,10 +72,10 @@ def _fast_forward_rrule(rrule, ref_dt=None):
if ref_dt is None:
ref_dt = now()
ref_dt = ref_dt.astimezone(datetime.timezone.utc)
+ dtstart_tz = rrule._dtstart.tzinfo
+ ref_dt = ref_dt.astimezone(dtstart_tz)
- rrule_dtstart_utc = rrule._dtstart.astimezone(datetime.timezone.utc)
- if rrule_dtstart_utc > ref_dt:
+ if rrule._dtstart > ref_dt:
return rrule
interval = rrule._interval if rrule._interval else 1
@@ -84,20 +84,14 @@ def _fast_forward_rrule(rrule, ref_dt=None):
elif rrule._freq == dateutil.rrule.MINUTELY:
interval *= 60
# if after converting to seconds the interval is still a fraction,
# just return original rrule
if isinstance(interval, float) and not interval.is_integer():
return rrule
- seconds_since_dtstart = (ref_dt - rrule_dtstart_utc).total_seconds()
+ seconds_since_dtstart = (ref_dt - rrule._dtstart).total_seconds()
# it is important to fast forward by a number that is divisible by
# interval. For example, if interval is 7 hours, we fast forward by 7, 14, 21, etc. hours.
# Otherwise, the occurrences after the fast forward might not match the ones before.
# x // y is integer division, lopping off any remainder, so that we get the outcome we want.
interval_aligned_offset = datetime.timedelta(seconds=(seconds_since_dtstart // interval) * interval)
- new_start = rrule_dtstart_utc + interval_aligned_offset
- new_rrule = rrule.replace(dtstart=new_start.astimezone(rrule._dtstart.tzinfo))
+ new_start = rrule._dtstart + interval_aligned_offset
+ new_rrule = rrule.replace(dtstart=new_start)
return new_rrule
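The interval alignment that the comment above describes can be checked with plain datetime arithmetic; this is a sketch of the offset calculation only, not the rrule handling:

```python
import datetime

def interval_aligned_offset(dtstart, ref_dt, interval_seconds):
    """Largest multiple of interval_seconds that does not pass ref_dt."""
    elapsed = (ref_dt - dtstart).total_seconds()
    # x // y lops off the remainder, so the result stays divisible by the interval
    return datetime.timedelta(seconds=(elapsed // interval_seconds) * interval_seconds)

dtstart = datetime.datetime(2026, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
ref = datetime.datetime(2026, 1, 2, 3, 30, tzinfo=datetime.timezone.utc)
# 7-hour interval: 27.5h elapsed -> fast forward by 21h (3 whole intervals)
offset = interval_aligned_offset(dtstart, ref, 7 * 3600)
assert offset == datetime.timedelta(hours=21)
new_start = dtstart + offset
assert new_start <= ref
assert (new_start - dtstart).total_seconds() % (7 * 3600) == 0
```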

View File

@@ -345,7 +345,11 @@ class WorkflowJobNode(WorkflowNodeBase):
)
data.update(accepted_fields) # missing fields are handled in the scheduler
# build ancestor artifacts, save them to node model for later
- aa_dict = {}
+ # initialize from pre-seeded ancestor_artifacts (set on root nodes of
+ # child workflows via seed_root_ancestor_artifacts to carry artifacts
+ # from the parent workflow); exclude job_slice which is internal
+ # metadata handled separately below
+ aa_dict = {k: v for k, v in self.ancestor_artifacts.items() if k != 'job_slice'} if self.ancestor_artifacts else {}
is_root_node = True
for parent_node in self.get_parent_nodes():
is_root_node = False
@@ -366,11 +370,13 @@ class WorkflowJobNode(WorkflowNodeBase):
data['survey_passwords'] = password_dict
# process extra_vars
extra_vars = data.get('extra_vars', {})
- if ujt_obj and isinstance(ujt_obj, (JobTemplate, WorkflowJobTemplate)):
+ if ujt_obj and isinstance(ujt_obj, JobTemplate):
      if aa_dict:
          functional_aa_dict = copy(aa_dict)
          functional_aa_dict.pop('_ansible_no_log', None)
          extra_vars.update(functional_aa_dict)
+ elif ujt_obj and isinstance(ujt_obj, WorkflowJobTemplate):
+     pass  # artifacts are applied via seed_root_ancestor_artifacts in the task manager
# Workflow Job extra_vars higher precedence than ancestor artifacts
extra_vars.update(wj_special_vars)
@@ -734,6 +740,18 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
wj = wj.get_workflow_job()
return ancestors
def seed_root_ancestor_artifacts(self, artifacts):
"""Apply parent workflow artifacts to root nodes so they propagate
through the normal ancestor_artifacts channel instead of being
baked into this workflow's extra_vars."""
self.workflow_job_nodes.exclude(
workflowjobnodes_success__isnull=False,
).exclude(
workflowjobnodes_failure__isnull=False,
).exclude(
workflowjobnodes_always__isnull=False,
).update(ancestor_artifacts=artifacts)
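The triple exclude() above selects root nodes: nodes with no incoming success/failure/always edge from any other node. A hedged sketch of that selection in plain Python (data shapes are illustrative, not the Django models):

```python
# Sketch of root-node selection: a node is a root if it has no incoming
# success/failure/always edge from any other node.
def root_nodes(nodes, edges):
    """nodes: iterable of ids; edges: (parent, child) pairs of any edge type."""
    children = {child for _parent, child in edges}
    return [n for n in nodes if n not in children]

nodes = [1, 2, 3, 4]
edges = [(1, 2), (1, 3), (3, 4)]  # 1 -> 2, 1 -> 3 (success), 3 -> 4 (failure)
assert root_nodes(nodes, edges) == [1]
```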
def get_effective_artifacts(self, **kwargs):
"""
For downstream jobs of a workflow nested inside of a workflow,

View File

@@ -241,6 +241,8 @@ class WorkflowManager(TaskBase):
job = spawn_node.unified_job_template.create_unified_job(**kv)
spawn_node.job = job
spawn_node.save()
if spawn_node.ancestor_artifacts and isinstance(spawn_node.unified_job_template, WorkflowJobTemplate):
job.seed_root_ancestor_artifacts(spawn_node.ancestor_artifacts)
logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk)
can_start = True
if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate):

View File

@@ -277,6 +277,7 @@ class RunnerCallback:
def artifacts_handler(self, artifact_dir):
success, query_file_contents = try_load_query_file(artifact_dir)
if success:
self.delay_update(event_queries_processed=False)
collections_info = collect_queries(query_file_contents)
for collection, data in collections_info.items():
version = data['version']
@@ -300,24 +301,6 @@ class RunnerCallback:
else:
logger.warning(f'The file {COLLECTION_FILENAME} unexpectedly did not contain ansible_version')
# Write event_queries_processed and installed_collections directly
# to the DB instead of using delay_update. delay_update defers
# writes until the final job status save, but
# events_processed_hook (called from both the task runner after
# the final save and the callback receiver after the wrapup
# event) needs event_queries_processed=False visible in the DB
# to dispatch save_indirect_host_entries. The field defaults to
# True, so without a direct write the hook would see True and
# skip the dispatch. installed_collections is also written
# directly so it is available if the callback receiver
# dispatches before the final save.
from awx.main.models import Job
db_updates = {'event_queries_processed': False}
if 'installed_collections' in query_file_contents:
db_updates['installed_collections'] = query_file_contents['installed_collections']
Job.objects.filter(id=self.instance.id).update(**db_updates)
self.artifacts_processed = True

View File

@@ -94,7 +94,7 @@ from flags.state import flag_enabled
# Workload Identity
from ansible_base.lib.workload_identity.controller import AutomationControllerJobScope
from ansible_base.resource_registry.workload_identity_client import get_workload_identity_client
from awx.main.utils.workload_identity import retrieve_workload_identity_jwt_with_claims
logger = logging.getLogger('awx.main.tasks.jobs')
@@ -168,14 +168,12 @@ def retrieve_workload_identity_jwt(
Raises:
RuntimeError: if the workload identity client is not configured.
"""
- client = get_workload_identity_client()
- if client is None:
-     raise RuntimeError("Workload identity client is not configured")
- claims = populate_claims_for_workload(unified_job)
- kwargs = {"claims": claims, "scope": scope, "audience": audience}
- if workload_ttl_seconds:
-     kwargs["workload_ttl_seconds"] = workload_ttl_seconds
- return client.request_workload_jwt(**kwargs).jwt
+ return retrieve_workload_identity_jwt_with_claims(
+     populate_claims_for_workload(unified_job),
+     audience,
+     scope,
+     workload_ttl_seconds,
+ )
def with_path_cleanup(f):
@@ -230,16 +228,19 @@ class BaseTask(object):
# Convert to list to prevent re-evaluation of QuerySet
return list(credentials_list)
- def populate_workload_identity_tokens(self):
+ def populate_workload_identity_tokens(self, additional_credentials=None):
"""
Populate credentials with workload identity tokens.
Sets the context on Credential objects that have input sources
using compatible external credential types.
"""
+ credentials = list(self._credentials)
+ if additional_credentials:
+     credentials.extend(additional_credentials)
  credential_input_sources = (
      (credential.context, src)
-     for credential in self._credentials
+     for credential in credentials
for src in credential.input_sources.all()
if any(
field.get('id') == 'workload_identity_token' and field.get('internal')
@@ -1682,7 +1683,7 @@ class RunProjectUpdate(BaseTask):
return params
def build_credentials_list(self, project_update):
- if project_update.scm_type == 'insights' and project_update.credential:
+ if project_update.credential:
return [project_update.credential]
return []
@@ -1865,6 +1866,24 @@ class RunInventoryUpdate(SourceControlMixin, BaseTask):
# All credentials not used by inventory source injector
return inventory_update.get_extra_credentials()
def populate_workload_identity_tokens(self, additional_credentials=None):
"""Also generate OIDC tokens for the cloud credential.
The cloud credential is not in _credentials (it is handled by the
inventory source injector), but it may still need a workload identity
token generated for it.
"""
cloud_cred = self.instance.get_cloud_credential()
creds = list(additional_credentials or [])
if cloud_cred:
creds.append(cloud_cred)
super().populate_workload_identity_tokens(additional_credentials=creds or None)
# Override get_cloud_credential on this instance so the injector
# uses the credential with OIDC context instead of doing a fresh
# DB fetch that would lose it.
if cloud_cred and cloud_cred.context:
self.instance.get_cloud_credential = lambda: cloud_cred
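Re-binding get_cloud_credential on the instance, as the comment above describes, is a per-object override; other instances keep the class method. A minimal sketch of the technique with hypothetical names:

```python
class InventoryUpdate:
    def get_cloud_credential(self):
        # normally a fresh fetch that would lose in-memory context
        return {"name": "cloud-cred", "context": None}

update = InventoryUpdate()
prepared = {"name": "cloud-cred", "context": "oidc-token"}
# Shadow the method on this one instance only; the class method is untouched
update.get_cloud_credential = lambda: prepared

assert update.get_cloud_credential()["context"] == "oidc-token"
assert InventoryUpdate().get_cloud_credential()["context"] is None
```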
def build_project_dir(self, inventory_update, private_data_dir):
source_project = None
if inventory_update.inventory_source:

View File

@@ -0,0 +1,11 @@
---
- hosts: all
gather_facts: false
connection: local
tasks:
- name: Set artifacts via set_stats
ansible.builtin.set_stats:
data: "{{ stats_data }}"
per_host: false
aggregate: false
when: stats_data is defined

View File

@@ -0,0 +1,84 @@
import pytest
from awx.api.versioning import reverse
from rest_framework import status
from awx.main.models.jobs import JobTemplate
@pytest.mark.django_db
class TestConfigEndpointFields:
def test_base_fields_all_users(self, get, rando):
url = reverse('api:api_v2_config_view')
response = get(url, rando, expect=200)
assert 'time_zone' in response.data
assert 'license_info' in response.data
assert 'version' in response.data
assert 'eula' in response.data
assert 'analytics_status' in response.data
assert 'analytics_collectors' in response.data
assert 'become_methods' in response.data
@pytest.mark.parametrize(
"role_type",
[
"superuser",
"system_auditor",
"org_admin",
"org_auditor",
"org_project_admin",
],
)
def test_privileged_users_conditional_fields(self, get, user, organization, admin, role_type):
url = reverse('api:api_v2_config_view')
if role_type == "superuser":
test_user = admin
elif role_type == "system_auditor":
test_user = user('system-auditor', is_superuser=False)
test_user.is_system_auditor = True
test_user.save()
elif role_type == "org_admin":
test_user = user('org-admin', is_superuser=False)
organization.admin_role.members.add(test_user)
elif role_type == "org_auditor":
test_user = user('org-auditor', is_superuser=False)
organization.auditor_role.members.add(test_user)
elif role_type == "org_project_admin":
test_user = user('org-project-admin', is_superuser=False)
organization.project_admin_role.members.add(test_user)
response = get(url, test_user, expect=200)
assert 'project_base_dir' in response.data
assert 'project_local_paths' in response.data
assert 'custom_virtualenvs' in response.data
def test_job_template_admin_gets_venvs_only(self, get, user, organization, project, inventory):
"""Test that JobTemplate admin without org access gets only custom_virtualenvs"""
jt_admin = user('jt-admin', is_superuser=False)
jt = JobTemplate.objects.create(name='test-jt', organization=organization, project=project, inventory=inventory)
jt.admin_role.members.add(jt_admin)
url = reverse('api:api_v2_config_view')
response = get(url, jt_admin, expect=200)
assert 'custom_virtualenvs' in response.data
assert 'project_base_dir' not in response.data
assert 'project_local_paths' not in response.data
def test_normal_user_no_conditional_fields(self, get, rando):
url = reverse('api:api_v2_config_view')
response = get(url, rando, expect=200)
assert 'project_base_dir' not in response.data
assert 'project_local_paths' not in response.data
assert 'custom_virtualenvs' not in response.data
def test_unauthenticated_denied(self, get):
"""Test that unauthenticated requests are denied"""
url = reverse('api:api_v2_config_view')
response = get(url, None, expect=401)
assert response.status_code == status.HTTP_401_UNAUTHORIZED

View File

@@ -2,6 +2,7 @@ import json
import pytest
from ansible_base.lib.testing.util import feature_flag_enabled
from awx.main.models.credential import CredentialType, Credential
from awx.api.versioning import reverse
@@ -159,7 +160,8 @@ def test_create_as_admin(get, post, admin):
response = get(reverse('api:credential_type_list'), admin)
assert response.data['count'] == 1
assert response.data['results'][0]['name'] == 'Custom Credential Type'
- assert response.data['results'][0]['inputs'] == {}
+ # Serializer normalizes empty inputs to {'fields': []}
+ assert response.data['results'][0]['inputs'] == {'fields': []}
assert response.data['results'][0]['injectors'] == {}
assert response.data['results'][0]['managed'] is False
@@ -474,3 +476,98 @@ def test_credential_type_rbac_external_test(post, alice, admin, credentialtype_e
data = {'inputs': {}, 'metadata': {}}
assert post(url, data, admin).status_code == 202
assert post(url, data, alice).status_code == 403
# --- Tests for internal field filtering with None/invalid inputs ---
@pytest.mark.django_db
def test_credential_type_with_none_inputs(get, admin):
"""Test that credential type with empty inputs dict works correctly."""
# Create a credential type with empty dict
ct = CredentialType.objects.create(
kind='cloud',
name='Test Type',
managed=False,
inputs={}, # Empty dict, not None (DB has NOT NULL constraint)
)
url = reverse('api:credential_type_detail', kwargs={'pk': ct.pk})
response = get(url, admin)
assert response.status_code == 200
# Should have normalized inputs to empty dict
assert 'inputs' in response.data
assert isinstance(response.data['inputs'], dict)
assert response.data['inputs']['fields'] == []
@pytest.mark.django_db
def test_credential_type_with_invalid_inputs_type(get, admin):
"""Test that credential type with non-dict inputs doesn't cause errors."""
# Create a credential type with invalid inputs type
ct = CredentialType.objects.create(kind='cloud', name='Test Type', managed=False, inputs={'fields': 'not-a-list'})
url = reverse('api:credential_type_detail', kwargs={'pk': ct.pk})
response = get(url, admin)
assert response.status_code == 200
# Should gracefully handle invalid fields type
assert 'inputs' in response.data
assert response.data['inputs']['fields'] == []
@pytest.mark.django_db
def test_credential_type_filters_internal_fields(get, admin):
"""Test that internal fields are filtered from API responses."""
ct = CredentialType.objects.create(
kind='cloud',
name='Test OIDC Type',
managed=False,
inputs={
'fields': [
{'id': 'url', 'label': 'URL', 'type': 'string'},
{'id': 'token', 'label': 'Token', 'type': 'string', 'secret': True, 'internal': True},
{'id': 'public_field', 'label': 'Public', 'type': 'string'},
]
},
)
url = reverse('api:credential_type_detail', kwargs={'pk': ct.pk})
with feature_flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'):
response = get(url, admin)
assert response.status_code == 200
field_ids = [f['id'] for f in response.data['inputs']['fields']]
# Internal field should be filtered out
assert 'token' not in field_ids
assert 'url' in field_ids
assert 'public_field' in field_ids
@pytest.mark.django_db
def test_credential_type_list_filters_internal_fields(get, admin):
"""Test that internal fields are filtered in list view."""
CredentialType.objects.create(
kind='cloud',
name='Test OIDC Type',
managed=False,
inputs={
'fields': [
{'id': 'url', 'label': 'URL', 'type': 'string'},
{'id': 'workload_identity_token', 'label': 'Token', 'type': 'string', 'secret': True, 'internal': True},
]
},
)
url = reverse('api:credential_type_list')
with feature_flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'):
response = get(url, admin)
assert response.status_code == 200
# Find our credential type in the results
test_ct = next((ct for ct in response.data['results'] if ct['name'] == 'Test OIDC Type'), None)
assert test_ct is not None
field_ids = [f['id'] for f in test_ct['inputs']['fields']]
# Internal field should be filtered out
assert 'workload_identity_token' not in field_ids
assert 'url' in field_ids

View File

@@ -0,0 +1,312 @@
"""
Tests for OIDC workload identity credential test endpoints.
Tests the /api/v2/credentials/<id>/test/ and /api/v2/credential_types/<id>/test/
endpoints when used with OIDC-enabled credential types.
"""
import pytest
from unittest import mock
from django.test import override_settings
from awx.main.models import Credential, CredentialType, JobTemplate
from awx.api.versioning import reverse
@pytest.fixture
def job_template(organization, project):
"""Job template with organization and project for OIDC JWT generation."""
return JobTemplate.objects.create(name='test-jt', organization=organization, project=project, playbook='helloworld.yml')
@pytest.fixture
def oidc_credentialtype():
"""Create a credential type with workload_identity_token internal field."""
oidc_type_inputs = {
'fields': [
{'id': 'url', 'label': 'Vault URL', 'type': 'string', 'help_text': 'The Vault server URL.'},
{'id': 'auth_path', 'label': 'Auth Path', 'type': 'string', 'help_text': 'JWT auth mount path.'},
{'id': 'role_id', 'label': 'Role ID', 'type': 'string', 'help_text': 'Vault role.'},
{'id': 'jwt_aud', 'label': 'JWT Audience', 'type': 'string', 'help_text': 'Expected audience.'},
{'id': 'workload_identity_token', 'label': 'Workload Identity Token', 'type': 'string', 'secret': True, 'internal': True},
],
'metadata': [
{'id': 'secret_path', 'label': 'Secret Path', 'type': 'string'},
{'id': 'job_template_id', 'label': 'Job Template ID', 'type': 'string'},
],
'required': ['url', 'auth_path', 'role_id'],
}
class MockPlugin(object):
def backend(self, **kwargs):
# Simulate successful backend call
return 'secret'
with mock.patch('awx.main.models.credential.CredentialType.plugin', new_callable=mock.PropertyMock) as mock_plugin:
mock_plugin.return_value = MockPlugin()
oidc_type = CredentialType(kind='external', managed=True, namespace='hashivault-kv-oidc', name='HashiCorp Vault KV (OIDC)', inputs=oidc_type_inputs)
oidc_type.save()
yield oidc_type
@pytest.fixture
def oidc_credential(oidc_credentialtype):
"""Create a credential using the OIDC credential type."""
return Credential.objects.create(
credential_type=oidc_credentialtype,
name='oidc-vault-cred',
inputs={'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'},
)
@pytest.fixture
def mock_oidc_backend():
"""Fixture that mocks OIDC JWT generation and credential backend."""
with mock.patch('awx.api.views.retrieve_workload_identity_jwt_with_claims') as mock_jwt, mock.patch('awx.api.views._jwt_decode') as mock_decode, mock.patch(
'awx.main.models.credential.CredentialType.plugin', new_callable=mock.PropertyMock
) as mock_plugin:
# Set default return values
mock_jwt.return_value = 'fake.jwt.token'
mock_decode.return_value = {'iss': 'http://gateway/o', 'aud': 'vault'}
# Create mock backend
mock_backend = mock.MagicMock()
mock_backend.backend.return_value = 'secret'
mock_plugin.return_value = mock_backend
# Yield all mocks for test customization
yield {
'jwt': mock_jwt,
'decode': mock_decode,
'plugin': mock_plugin,
'backend': mock_backend,
}
# --- Tests for CredentialExternalTest endpoint ---
@pytest.mark.django_db
@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=False)
def test_credential_test_without_oidc_feature_flag(post, admin, oidc_credential):
"""Test that credential test works without OIDC feature flag enabled."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': '1'}}
with mock.patch('awx.main.models.credential.CredentialType.plugin', new_callable=mock.PropertyMock) as mock_plugin:
mock_backend = mock.MagicMock()
mock_backend.backend.return_value = 'secret'
mock_plugin.return_value = mock_backend
response = post(url, data, admin)
assert response.status_code == 202
# Should not contain JWT payload when feature flag is disabled
assert 'details' not in response.data or 'sent_jwt_payload' not in response.data.get('details', {})
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
@pytest.mark.parametrize(
'job_template_id, expected_error',
[
(None, 'Job template ID is required'),
('not-an-integer', 'must be an integer'),
('99999', 'does not exist'),
],
ids=['missing_job_template_id', 'invalid_job_template_id_type', 'nonexistent_job_template_id'],
)
def test_credential_test_job_template_validation(mock_flag, post, admin, oidc_credential, job_template_id, expected_error):
"""Test that invalid job_template_id values return 400 with appropriate error messages."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret'}}
if job_template_id is not None:
data['metadata']['job_template_id'] = job_template_id
response = post(url, data, admin)
assert response.status_code == 400
assert 'details' in response.data
assert 'error_message' in response.data['details']
assert expected_error in response.data['details']['error_message']
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_test_no_access_to_job_template(mock_flag, post, alice, oidc_credential, job_template):
"""Test that user without access to job template gets 403."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
# Give alice use permission on credential but not on job template
oidc_credential.use_role.members.add(alice)
response = post(url, data, alice)
assert response.status_code == 403
assert 'You do not have access to job template' in str(response.data)
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_test_success_returns_jwt_payload(mock_flag, post, admin, oidc_credential, job_template, mock_oidc_backend):
"""Test that successful test returns JWT payload in response."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
# Customize mock for this test
mock_oidc_backend['decode'].return_value = {
'iss': 'http://gateway/o',
'sub': 'system:serviceaccount:default:awx-operator',
'aud': 'vault',
'job_template_id': job_template.id,
}
response = post(url, data, admin)
assert response.status_code == 202
assert 'details' in response.data
assert 'sent_jwt_payload' in response.data['details']
assert response.data['details']['sent_jwt_payload']['job_template_id'] == job_template.id
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_test_response_does_not_contain_secret_value(mock_flag, post, admin, oidc_credential, job_template, mock_oidc_backend):
"""
The OIDC credential test endpoint must not echo the resolved Vault secret back to the caller.
"""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
credential_secret_value = 'CREDENTIAL_SECRET'
mock_oidc_backend['backend'].backend.return_value = credential_secret_value
response = post(url, data, admin)
assert response.status_code == 202
assert 'details' in response.data
assert 'sent_jwt_payload' in response.data['details']
assert 'secret_value' not in response.data['details']
assert credential_secret_value not in str(response.data)
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_test_backend_failure_returns_jwt_and_error(mock_flag, post, admin, oidc_credential, job_template, mock_oidc_backend):
"""Test that backend failure still returns JWT payload along with error message."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
# Make backend fail
mock_oidc_backend['backend'].backend.side_effect = RuntimeError('Connection failed')
response = post(url, data, admin)
assert response.status_code == 400
assert 'details' in response.data
# Both JWT payload and error message should be present
assert 'sent_jwt_payload' in response.data['details']
assert 'error_message' in response.data['details']
assert 'Connection failed' in response.data['details']['error_message']
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_test_jwt_generation_failure(mock_flag, post, admin, oidc_credential, job_template):
"""Test that JWT generation failure returns error without JWT payload."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
with mock.patch('awx.api.views.OIDCCredentialTestMixin._get_workload_identity_token') as mock_jwt:
mock_jwt.side_effect = RuntimeError('Failed to generate JWT')
response = post(url, data, admin)
assert response.status_code == 400
assert 'details' in response.data
assert 'error_message' in response.data['details']
assert 'Failed to generate JWT' in response.data['details']['error_message']
# No JWT payload when generation fails
assert 'sent_jwt_payload' not in response.data['details']
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_test_job_template_id_not_passed_to_backend(mock_flag, post, admin, oidc_credential, job_template, mock_oidc_backend):
"""Test that job_template_id and jwt_aud are removed from backend_kwargs."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
response = post(url, data, admin)
assert response.status_code == 202
# Check that backend was called without job_template_id or jwt_aud
call_kwargs = mock_oidc_backend['backend'].backend.call_args[1]
assert 'job_template_id' not in call_kwargs
assert 'jwt_aud' not in call_kwargs
assert 'workload_identity_token' in call_kwargs
# --- Tests for CredentialTypeExternalTest endpoint ---
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_type_test_response_does_not_contain_secret_value(mock_flag, post, admin, oidc_credentialtype, job_template, mock_oidc_backend):
"""
The credential-type variant of the test endpoint must not return the secret value.
"""
url = reverse('api:credential_type_external_test', kwargs={'pk': oidc_credentialtype.pk})
data = {
'inputs': {'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'},
'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)},
}
credential_type_secret_value = 'CREDENTIAL_TYPE_SECRET'
mock_oidc_backend['backend'].backend.return_value = credential_type_secret_value
response = post(url, data, admin)
assert response.status_code == 202
assert 'details' in response.data
assert 'sent_jwt_payload' in response.data['details']
assert 'secret_value' not in response.data['details']
assert credential_type_secret_value not in str(response.data)
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_type_test_missing_job_template_id(mock_flag, post, admin, oidc_credentialtype):
"""Test that missing job_template_id returns 400 for credential type test endpoint."""
url = reverse('api:credential_type_external_test', kwargs={'pk': oidc_credentialtype.pk})
data = {
'inputs': {'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'},
'metadata': {'secret_path': 'test/secret'},
}
response = post(url, data, admin)
assert response.status_code == 400
assert 'details' in response.data
assert 'error_message' in response.data['details']
assert 'Job template ID is required' in response.data['details']['error_message']
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_type_test_success_returns_jwt_payload(mock_flag, post, admin, oidc_credentialtype, job_template, mock_oidc_backend):
"""Test that successful credential type test returns JWT payload."""
url = reverse('api:credential_type_external_test', kwargs={'pk': oidc_credentialtype.pk})
data = {
'inputs': {'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'},
'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)},
}
response = post(url, data, admin)
assert response.status_code == 202
assert 'details' in response.data
assert 'sent_jwt_payload' in response.data['details']
@pytest.mark.django_db
def test_credential_external_test_returns_400_for_non_external_credential(post, admin, credential):
# credential fixture creates a non-external credential (e.g. SSH/vault kind)
url = reverse('api:credential_external_test', kwargs={'pk': credential.pk})
response = post(url, {'metadata': {}}, admin)
assert response.status_code == 400
assert 'not testable' in response.data.get('detail', '').lower()

View File

@@ -13,6 +13,7 @@ from awx.main.models.workflow import (
WorkflowJobTemplateNode,
)
from awx.main.models.credential import Credential
from awx.main.models.label import Label
from awx.main.scheduler import TaskManager, WorkflowManager, DependencyManager
# Django
@@ -51,6 +52,31 @@ def test_node_accepts_prompted_fields(inventory, project, workflow_job_template,
post(url, {'unified_job_template': job_template.pk, 'limit': 'webservers'}, user=admin_user, expect=201)
@pytest.mark.django_db
def test_node_extra_data_patch_with_unprompted_labels(inventory, project, organization, workflow_job_template, patch, admin_user):
"""AAP-41742: PATCH extra_data on a workflow node should succeed even when
the node has labels associated but the JT has ask_labels_on_launch=False."""
jt = JobTemplate.objects.create(
inventory=inventory,
project=project,
playbook='helloworld.yml',
ask_variables_on_launch=True,
ask_labels_on_launch=False,
)
label = Label.objects.create(name='repro-label', organization=organization)
node = WorkflowJobTemplateNode.objects.create(
workflow_job_template=workflow_job_template,
unified_job_template=jt,
extra_data={'foo': 'bar'},
)
node.labels.add(label)
url = reverse('api:workflow_job_template_node_detail', kwargs={'pk': node.pk})
r = patch(url, {'extra_data': {'foo': 'edited'}}, user=admin_user, expect=200)
assert r.data['extra_data'] == {'foo': 'edited'}
@pytest.mark.django_db
@pytest.mark.parametrize(
"field_name, field_value",

View File

@@ -305,6 +305,47 @@ class TestINIImports:
has_host_group = inventory.groups.get(name='has_a_host')
assert has_host_group.hosts.count() == 1
@mock.patch.object(inventory_import, 'AnsibleInventoryLoader', MockLoader)
def test_overwrite_removes_stale_memberships(self, inventory):
"""When overwrite is enabled, host-group and group-group memberships
that are no longer in the imported data should be removed."""
# First import: parent_group has two children, host_group has two hosts
inventory_import.AnsibleInventoryLoader._data = {
"_meta": {"hostvars": {"host1": {}, "host2": {}}},
"all": {"children": ["ungrouped", "parent_group", "child_a", "child_b", "host_group"]},
"parent_group": {"children": ["child_a", "child_b"]},
"host_group": {"hosts": ["host1", "host2"]},
"ungrouped": {"hosts": []},
}
cmd = inventory_import.Command()
cmd.handle(inventory_id=inventory.pk, source=__file__, overwrite=True)
parent = inventory.groups.get(name='parent_group')
assert set(parent.children.values_list('name', flat=True)) == {'child_a', 'child_b'}
host_grp = inventory.groups.get(name='host_group')
assert set(host_grp.hosts.values_list('name', flat=True)) == {'host1', 'host2'}
# Second import: child_b removed from parent_group, host2 moved out of host_group
inventory_import.AnsibleInventoryLoader._data = {
"_meta": {"hostvars": {"host1": {}, "host2": {}}},
"all": {"children": ["ungrouped", "parent_group", "child_a", "child_b", "host_group"]},
"parent_group": {"children": ["child_a"]},
"host_group": {"hosts": ["host1"]},
"ungrouped": {"hosts": ["host2"]},
}
cmd = inventory_import.Command()
cmd.handle(inventory_id=inventory.pk, source=__file__, overwrite=True)
parent.refresh_from_db()
host_grp.refresh_from_db()
# child_b should be removed from parent_group
assert set(parent.children.values_list('name', flat=True)) == {'child_a'}
# host2 should be removed from host_group
assert set(host_grp.hosts.values_list('name', flat=True)) == {'host1'}
# host2 and child_b should still exist in the inventory, just not in those groups
assert inventory.hosts.filter(name='host2').exists()
assert inventory.groups.filter(name='child_b').exists()
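The pruning behavior this test pins down reduces to a set difference between the memberships currently stored and those present in the new import. A minimal sketch of that idea (illustrative only; the importer's actual implementation works on model querysets, not name sets):

```python
# Illustrative only: with overwrite=True, stale-membership pruning is a set
# difference between existing memberships and the freshly imported ones.
def stale_memberships(existing: set, imported: set) -> set:
    """Names whose membership should be removed on an overwrite import."""
    return existing - imported

# Mirrors the test above: child_b drops out of parent_group on re-import.
removed = stale_memberships({'child_a', 'child_b'}, {'child_a'})
```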
@mock.patch.object(inventory_import, 'AnsibleInventoryLoader', MockLoader)
def test_recursive_group_error(self, inventory):
inventory_import.AnsibleInventoryLoader._data = {

View File

@@ -101,6 +101,34 @@ def test_host_access(organization, inventory, group, user, group_factory):
assert inventory_admin_access.can_read(host) is False
@pytest.mark.django_db
def test_host_access_excludes_constructed_inventory_hosts(organization, inventory, user):
"""
Hosts in constructed inventories must be excluded from host querysets for all users, including superusers.
"""
constructed_inv = organization.inventories.create(name='constructed-inv', kind='constructed')
real_host = Host.objects.create(inventory=inventory, name='hostA')
shadow_host = Host.objects.create(inventory=constructed_inv, name='hostA')
# Non-superuser with read on both inventories
reader = user('reader', False)
inventory.read_role.members.add(reader)
constructed_inv.read_role.members.add(reader)
reader_qs = HostAccess(reader).get_queryset()
assert real_host in reader_qs
assert shadow_host not in reader_qs
# Superuser path: should get the same result
superuser = user('super', True)
super_qs = HostAccess(superuser).get_queryset()
assert real_host in super_qs
assert shadow_host not in super_qs
# Sanity: shadow rows still exist in the DB and are reachable via inventory filtering
assert Host.objects.filter(inventory=constructed_inv, name='hostA').exists()
@pytest.mark.django_db
def test_inventory_source_credential_check(rando, inventory_source, credential):
inventory_source.inventory.admin_role.members.add(rando)

View File

@@ -445,3 +445,142 @@ def get_inventory_hosts(get, inv_id, use_user):
data = get(reverse('api:inventory_hosts_list', kwargs={'pk': inv_id}), use_user, expect=200).data
results = [host['id'] for host in data['results']]
return results
# Tests for BulkHostCreateSerializer duplicate detection optimization
@pytest.mark.django_db
def test_bulk_host_create_duplicate_within_batch(organization, inventory, post, user):
"""
Test that duplicate hostnames within the same batch are detected.
This tests the Counter-based duplicate detection logic.
"""
inventory.organization = organization
inv_admin = user('inventory_admin', False)
organization.member_role.members.add(inv_admin)
inventory.admin_role.members.add(inv_admin)
# Try to create hosts where 'duplicate-host' appears twice in the same batch
hosts = [
{'name': 'unique-host-1'},
{'name': 'duplicate-host'},
{'name': 'unique-host-2'},
{'name': 'duplicate-host'}, # Duplicate within batch
]
response = post(reverse('api:bulk_host_create'), {'inventory': inventory.id, 'hosts': hosts}, inv_admin, expect=400)
assert 'Hostnames must be unique in an inventory' in response.data['__all__'][0]
assert 'duplicate-host' in response.data['__all__'][0]
assert Host.objects.filter(inventory=inventory).count() == 0
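The "Counter-based duplicate detection" the docstring refers to can be sketched in isolation (function name is illustrative; the serializer's exact code may differ):

```python
from collections import Counter

# Sketch of in-batch duplicate detection: count each requested hostname and
# report any that appear more than once, without touching the database.
def find_batch_duplicates(hostnames):
    counts = Counter(hostnames)
    return sorted(name for name, n in counts.items() if n > 1)

dupes = find_batch_duplicates(
    ['unique-host-1', 'duplicate-host', 'unique-host-2', 'duplicate-host']
)
```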
@pytest.mark.django_db
def test_bulk_host_create_duplicate_against_existing(organization, inventory, post, user):
"""
Test that duplicate hostnames against existing inventory hosts are detected.
This tests the database query-based duplicate detection.
"""
inventory.organization = organization
inv_admin = user('inventory_admin', False)
organization.member_role.members.add(inv_admin)
inventory.admin_role.members.add(inv_admin)
Host.objects.create(name='existing-host-1', inventory=inventory)
Host.objects.create(name='existing-host-2', inventory=inventory)
# Try to create hosts where one already exists
hosts = [
{'name': 'new-host-1'},
{'name': 'existing-host-1'},
{'name': 'new-host-2'},
]
response = post(reverse('api:bulk_host_create'), {'inventory': inventory.id, 'hosts': hosts}, inv_admin, expect=400)
assert 'Hostnames must be unique in an inventory' in response.data['__all__'][0]
assert 'existing-host-1' in response.data['__all__'][0]
assert Host.objects.filter(inventory=inventory).count() == 2
@pytest.mark.django_db
def test_bulk_host_create_combined_duplicates(organization, inventory, post, user):
"""
Test detection of both batch-internal duplicates and duplicates against existing hosts.
"""
inventory.organization = organization
inventory_admin = user('inventory_admin', False)
organization.member_role.members.add(inventory_admin)
inventory.admin_role.members.add(inventory_admin)
Host.objects.create(name='existing-host', inventory=inventory)
# Try to create hosts with both types of duplicates
hosts = [
{'name': 'new-host'},
{'name': 'batch-duplicate'},
{'name': 'existing-host'},
{'name': 'batch-duplicate'},
]
response = post(reverse('api:bulk_host_create'), {'inventory': inventory.id, 'hosts': hosts}, inventory_admin, expect=400)
error_message = response.data['__all__'][0]
assert 'Hostnames must be unique in an inventory' in error_message
assert 'batch-duplicate' in error_message or 'existing-host' in error_message
@pytest.mark.django_db
def test_bulk_host_create_no_duplicates_success(organization, inventory, post, user):
"""
Test that hosts are created successfully when there are no duplicates.
"""
inventory.organization = organization
inventory_admin = user('inventory_admin', False)
organization.member_role.members.add(inventory_admin)
inventory.admin_role.members.add(inventory_admin)
Host.objects.create(name='existing-host-1', inventory=inventory)
Host.objects.create(name='existing-host-2', inventory=inventory)
# Create new hosts with unique names
hosts = [
{'name': 'new-host-1'},
{'name': 'new-host-2'},
{'name': 'new-host-3'},
]
response = post(reverse('api:bulk_host_create'), {'inventory': inventory.id, 'hosts': hosts}, inventory_admin, expect=201)
assert len(response.data['hosts']) == 3
assert Host.objects.filter(inventory=inventory).count() == 5
assert Host.objects.filter(inventory=inventory, name='new-host-1').exists()
assert Host.objects.filter(inventory=inventory, name='new-host-2').exists()
assert Host.objects.filter(inventory=inventory, name='new-host-3').exists()
@pytest.mark.django_db
def test_bulk_host_create_performance_large_inventory(organization, inventory, post, user, django_assert_max_num_queries):
"""
Test that duplicate detection is performant and doesn't load all hosts.
"""
inventory.organization = organization
inventory_admin = user('inventory_admin', False)
organization.member_role.members.add(inventory_admin)
inventory.admin_role.members.add(inventory_admin)
# Create 10k existing hosts to simulate a reasonably large inventory
from django.utils.timezone import now
_now = now()
existing_hosts = [Host(name=f'existing-host-{i}', inventory=inventory, created=_now, modified=_now) for i in range(10000)]
Host.objects.bulk_create(existing_hosts)
new_hosts = [{'name': f'new-host-{i}'} for i in range(10)]
# The number of queries should be bounded and not scale with inventory size
# This should be around 15-20 queries regardless of whether there are 10k or 500k+ existing hosts
with django_assert_max_num_queries(20):
response = post(reverse('api:bulk_host_create'), {'inventory': inventory.id, 'hosts': new_hosts}, inventory_admin, expect=201)
assert len(response.data['hosts']) == 10
assert Host.objects.filter(inventory=inventory).count() == 10010
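The bounded query count this test asserts follows from checking only the incoming names against the database (in Django ORM terms, something like `.filter(name__in=batch).values_list('name', flat=True)`) rather than loading every existing host. A self-contained sketch of the pattern, with a plain set standing in for the single membership query (names here are illustrative, not AWX's actual code):

```python
# Sketch: duplicate detection whose cost scales with the batch, not the
# inventory. query_existing(names) stands in for one name__in DB query.
def find_existing_duplicates(batch_names, query_existing):
    return sorted(query_existing(set(batch_names)))

# Simulated database of 10k existing hostnames:
db = {f'existing-host-{i}' for i in range(10000)}
dupes = find_existing_duplicates(
    ['new-host-1', 'existing-host-1'], lambda names: names & db
)
```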

View File

@@ -0,0 +1,206 @@
import json
import pytest
from awx.main.tests.live.tests.conftest import wait_for_job
from awx.main.models import JobTemplate, WorkflowJobTemplate, WorkflowJobTemplateNode
JT_NAMES = ('artifact-test-first', 'artifact-test-second', 'artifact-test-reader')
WFT_NAMES = ('artifact-test-outer-wf', 'artifact-test-inner-wf')
@pytest.mark.django_db(transaction=True)
def test_nested_workflow_set_stats_precedence(live_tmp_folder, demo_inv, project_factory, default_org):
"""Reproducer for set_stats artifacts from an outer workflow leaking into
an inner (child) workflow and overriding the inner workflow's own artifacts.
Outer WF: [job_first] --success--> [inner_wf]
Inner WF: [job_second] --success--> [job_reader]
job_first sets via set_stats:
var1: "outer-only" (only source, should propagate through)
var2: "should-be-overridden" (will be overridden by job_second)
job_second sets via set_stats:
var2: "from-inner" (should override outer's value)
var3: "inner-only" (only source, should be available)
job_reader runs debug.yml (no set_stats), we inspect its extra_vars:
var1 should be "outer-only" - outer artifacts propagate when uncontested
var2 should be "from-inner" - inner artifacts override outer (THE BUG)
var3 should be "inner-only" - inner-only artifacts propagate normally
"""
# Clean up resources from prior runs (delete individually for signals)
for name in WFT_NAMES:
for wft in WorkflowJobTemplate.objects.filter(name=name):
wft.delete()
for name in JT_NAMES:
for jt in JobTemplate.objects.filter(name=name):
jt.delete()
proj = project_factory(scm_url=f'file://{live_tmp_folder}/debug')
if proj.current_job:
wait_for_job(proj.current_job)
# job_first: sets var1 (outer-only) and var2 (to be overridden by inner)
jt_first = JobTemplate.objects.create(
name='artifact-test-first',
project=proj,
playbook='set_stats.yml',
inventory=demo_inv,
extra_vars=json.dumps({'stats_data': {'var1': 'outer-only', 'var2': 'should-be-overridden'}}),
)
# job_second: overrides var2, introduces var3
jt_second = JobTemplate.objects.create(
name='artifact-test-second',
project=proj,
playbook='set_stats.yml',
inventory=demo_inv,
extra_vars=json.dumps({'stats_data': {'var2': 'from-inner', 'var3': 'inner-only'}}),
)
# job_reader: just runs, we check what extra_vars it receives
jt_reader = JobTemplate.objects.create(
name='artifact-test-reader',
project=proj,
playbook='debug.yml',
inventory=demo_inv,
)
# Inner WFT: job_second -> job_reader
inner_wft = WorkflowJobTemplate.objects.create(name='artifact-test-inner-wf', organization=default_org)
inner_node_1 = WorkflowJobTemplateNode.objects.create(
workflow_job_template=inner_wft,
unified_job_template=jt_second,
identifier='second',
)
inner_node_2 = WorkflowJobTemplateNode.objects.create(
workflow_job_template=inner_wft,
unified_job_template=jt_reader,
identifier='reader',
)
inner_node_1.success_nodes.add(inner_node_2)
# Outer WFT: job_first -> inner_wf
outer_wft = WorkflowJobTemplate.objects.create(name='artifact-test-outer-wf', organization=default_org)
outer_node_1 = WorkflowJobTemplateNode.objects.create(
workflow_job_template=outer_wft,
unified_job_template=jt_first,
identifier='first',
)
outer_node_2 = WorkflowJobTemplateNode.objects.create(
workflow_job_template=outer_wft,
unified_job_template=inner_wft,
identifier='inner',
)
outer_node_1.success_nodes.add(outer_node_2)
# Launch and wait
outer_wfj = outer_wft.create_unified_job()
outer_wfj.signal_start()
wait_for_job(outer_wfj, running_timeout=120)
# Find the reader job inside the inner workflow
inner_wf_node = outer_wfj.workflow_job_nodes.get(identifier='inner')
inner_wfj = inner_wf_node.job
assert inner_wfj is not None, 'Inner workflow job was never created'
# Check that root node of inner WF (job_second) received outer artifacts
second_node = inner_wfj.workflow_job_nodes.get(identifier='second')
assert second_node.job is not None, 'Second job was never created'
second_extra_vars = json.loads(second_node.job.extra_vars)
assert second_extra_vars.get('var1') == 'outer-only', (
f'Root node var1: expected "outer-only" (outer artifact should be available to root node), '
f'got "{second_extra_vars.get("var1")}". '
f'Outer artifacts are not reaching root nodes of child workflows.'
)
reader_node = inner_wfj.workflow_job_nodes.get(identifier='reader')
assert reader_node.job is not None, 'Reader job was never created'
reader_extra_vars = json.loads(reader_node.job.extra_vars)
# var1: only set by outer job_first, no conflict — should propagate through
assert reader_extra_vars.get('var1') == 'outer-only', f'var1: expected "outer-only" (uncontested outer artifact), ' f'got "{reader_extra_vars.get("var1")}"'
# var2: set by outer as "should-be-overridden", then by inner as "from-inner"
# Inner workflow's own ancestor artifacts should take precedence
assert reader_extra_vars.get('var2') == 'from-inner', (
f'var2: expected "from-inner" (inner workflow artifact should override outer), '
f'got "{reader_extra_vars.get("var2")}". '
f'Outer workflow artifacts are leaking via wj_special_vars. '
f'reader node ancestor_artifacts={reader_node.ancestor_artifacts}'
)
# var3: only set by inner job_second — should propagate normally
assert reader_extra_vars.get('var3') == 'inner-only', f'var3: expected "inner-only" (inner-only artifact), ' f'got "{reader_extra_vars.get("var3")}"'
@pytest.mark.django_db(transaction=True)
def test_workflow_extra_vars_override_artifacts(live_tmp_folder, demo_inv, project_factory, default_org):
"""Workflow extra_vars should take precedence over set_stats artifacts
within a single (non-nested) workflow.
WF (extra_vars: my_var="from-wf-extra-vars"):
[job_setter] --success--> [job_reader]
job_setter sets my_var="from-set-stats" via set_stats
job_reader should see my_var="from-wf-extra-vars" because workflow
extra_vars are higher precedence than ancestor artifacts.
"""
wft_name = 'artifact-test-wf-extra-vars-precedence'
jt_names = ('artifact-test-setter', 'artifact-test-checker')
for wft in WorkflowJobTemplate.objects.filter(name=wft_name):
wft.delete()
for name in jt_names:
for jt in JobTemplate.objects.filter(name=name):
jt.delete()
proj = project_factory(scm_url=f'file://{live_tmp_folder}/debug')
if proj.current_job:
wait_for_job(proj.current_job)
jt_setter = JobTemplate.objects.create(
name='artifact-test-setter',
project=proj,
playbook='set_stats.yml',
inventory=demo_inv,
extra_vars=json.dumps({'stats_data': {'my_var': 'from-set-stats'}}),
)
jt_checker = JobTemplate.objects.create(
name='artifact-test-checker',
project=proj,
playbook='debug.yml',
inventory=demo_inv,
)
wft = WorkflowJobTemplate.objects.create(
name=wft_name,
organization=default_org,
extra_vars=json.dumps({'my_var': 'from-wf-extra-vars'}),
)
node_1 = WorkflowJobTemplateNode.objects.create(
workflow_job_template=wft,
unified_job_template=jt_setter,
identifier='setter',
)
node_2 = WorkflowJobTemplateNode.objects.create(
workflow_job_template=wft,
unified_job_template=jt_checker,
identifier='checker',
)
node_1.success_nodes.add(node_2)
wfj = wft.create_unified_job()
wfj.signal_start()
wait_for_job(wfj, running_timeout=120)
checker_node = wfj.workflow_job_nodes.get(identifier='checker')
assert checker_node.job is not None, 'Checker job was never created'
checker_extra_vars = json.loads(checker_node.job.extra_vars)
assert checker_extra_vars.get('my_var') == 'from-wf-extra-vars', (
f'Expected my_var="from-wf-extra-vars" (workflow extra_vars should override artifacts), '
f'got my_var="{checker_extra_vars.get("my_var")}". '
f'checker node ancestor_artifacts={checker_node.ancestor_artifacts}'
)

View File

@@ -2,7 +2,11 @@
import pytest
from types import SimpleNamespace
from unittest import mock
from awx.main.models import Credential, CredentialType
from awx.main.models.credential import CredentialTypeHelper, ManagedCredentialType
from django.apps import apps
@@ -78,3 +82,53 @@ def test_credential_context_property_independent_instances():
assert cred1.context == {'key1': 'value1'}
assert cred2.context == {'key2': 'value2'}
assert cred1.context is not cred2.context
def test_load_plugin_passes_description():
plugin = SimpleNamespace(name='test_plugin', inputs={'fields': []}, backend=None, plugin_description='A test plugin')
CredentialType.load_plugin('test_ns', plugin)
entry = ManagedCredentialType.registry['test_ns']
assert entry.description == 'A test plugin'
del ManagedCredentialType.registry['test_ns']
def test_load_plugin_missing_description():
plugin = SimpleNamespace(name='test_plugin', inputs={'fields': []}, backend=None)
CredentialType.load_plugin('test_ns', plugin)
entry = ManagedCredentialType.registry['test_ns']
assert entry.description == ''
del ManagedCredentialType.registry['test_ns']
def test_get_creation_params_external_includes_description():
cred_type = SimpleNamespace(namespace='test_ns', kind='external', name='Test', description='My description')
params = CredentialTypeHelper.get_creation_params(cred_type)
assert params['description'] == 'My description'
def test_get_creation_params_external_missing_description():
cred_type = SimpleNamespace(namespace='test_ns', kind='external', name='Test')
params = CredentialTypeHelper.get_creation_params(cred_type)
assert params['description'] == ''
@pytest.mark.django_db
def test_setup_tower_managed_defaults_updates_description():
registry_entry = SimpleNamespace(
namespace='test_ns',
kind='external',
name='Test Plugin',
inputs={'fields': []},
backend=None,
description='Updated description',
)
# Create an existing credential type with no description
ct = CredentialType.objects.create(name='Test Plugin', kind='external', namespace='old_ns')
assert ct.description == ''
with mock.patch.dict(ManagedCredentialType.registry, {'test_ns': registry_entry}, clear=True):
CredentialType._setup_tower_managed_defaults()
ct.refresh_from_db()
assert ct.description == 'Updated description'
assert ct.namespace == 'test_ns'
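The missing-description fallback these tests exercise is the standard getattr-with-default pattern; a minimal sketch (the helper name is illustrative, not the model's actual code):

```python
from types import SimpleNamespace

# Read an optional attribute with an empty-string default so plugins that
# omit a description still register cleanly instead of raising.
def plugin_description(plugin):
    return getattr(plugin, 'plugin_description', '')

with_desc = plugin_description(SimpleNamespace(plugin_description='A test plugin'))
without_desc = plugin_description(SimpleNamespace())
```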

View File

@@ -473,7 +473,7 @@ def test_populate_claims_for_adhoc_command(workload_attrs, expected_claims):
assert claims == expected_claims
-@mock.patch('awx.main.tasks.jobs.get_workload_identity_client')
+@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client')
def test_retrieve_workload_identity_jwt_returns_jwt_from_client(mock_get_client):
"""retrieve_workload_identity_jwt returns the JWT string from the client."""
mock_client = mock.MagicMock()
@@ -502,7 +502,7 @@ def test_retrieve_workload_identity_jwt_returns_jwt_from_client(mock_get_client)
assert call_kwargs['claims'][AutomationControllerJobScope.CLAIM_JOB_NAME] == 'Test Job'
-@mock.patch('awx.main.tasks.jobs.get_workload_identity_client')
+@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client')
def test_retrieve_workload_identity_jwt_passes_audience_and_scope(mock_get_client):
"""retrieve_workload_identity_jwt passes audience and scope to the client."""
mock_client = mock.MagicMock()
@@ -518,7 +518,7 @@ def test_retrieve_workload_identity_jwt_passes_audience_and_scope(mock_get_clien
mock_client.request_workload_jwt.assert_called_once_with(claims={'job_id': 1}, scope=scope, audience=audience)
-@mock.patch('awx.main.tasks.jobs.get_workload_identity_client')
+@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client')
def test_retrieve_workload_identity_jwt_passes_workload_ttl(mock_get_client):
"""retrieve_workload_identity_jwt passes workload_ttl_seconds when provided."""
mock_client = mock.Mock()
@@ -542,7 +542,7 @@ def test_retrieve_workload_identity_jwt_passes_workload_ttl(mock_get_client):
)
-@mock.patch('awx.main.tasks.jobs.get_workload_identity_client')
+@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client')
def test_retrieve_workload_identity_jwt_raises_when_client_not_configured(mock_get_client):
"""retrieve_workload_identity_jwt raises RuntimeError when client is None."""
mock_get_client.return_value = None
@@ -590,3 +590,67 @@ def test_populate_workload_identity_tokens_passes_get_instance_timeout_to_client
scope=AutomationControllerJobScope.name,
workload_ttl_seconds=expected_ttl,
)
class TestRunInventoryUpdatePopulateWorkloadIdentityTokens:
"""Tests for RunInventoryUpdate.populate_workload_identity_tokens."""
def test_cloud_credential_passed_as_additional_credential(self):
"""The cloud credential is forwarded to super().populate_workload_identity_tokens via additional_credentials."""
cloud_cred = mock.MagicMock(name='cloud_cred')
cloud_cred.context = {}
task = jobs.RunInventoryUpdate()
task.instance = mock.MagicMock()
task.instance.get_cloud_credential.return_value = cloud_cred
task._credentials = []
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
task.populate_workload_identity_tokens()
mock_super.assert_called_once_with(additional_credentials=[cloud_cred])
def test_no_cloud_credential_calls_super_with_none(self):
"""When there is no cloud credential, super() is called with additional_credentials=None."""
task = jobs.RunInventoryUpdate()
task.instance = mock.MagicMock()
task.instance.get_cloud_credential.return_value = None
task._credentials = []
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
task.populate_workload_identity_tokens()
mock_super.assert_called_once_with(additional_credentials=None)
def test_additional_credentials_combined_with_cloud_credential(self):
"""Caller-supplied additional_credentials are combined with the cloud credential."""
cloud_cred = mock.MagicMock(name='cloud_cred')
cloud_cred.context = {}
extra_cred = mock.MagicMock(name='extra_cred')
task = jobs.RunInventoryUpdate()
task.instance = mock.MagicMock()
task.instance.get_cloud_credential.return_value = cloud_cred
task._credentials = []
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
task.populate_workload_identity_tokens(additional_credentials=[extra_cred])
mock_super.assert_called_once_with(additional_credentials=[extra_cred, cloud_cred])
def test_cloud_credential_override_after_context_set(self):
"""After OIDC processing, get_cloud_credential is overridden on the instance when context is populated."""
cloud_cred = mock.MagicMock(name='cloud_cred')
# Simulate that super().populate_workload_identity_tokens populates context
cloud_cred.context = {'workload_identity_token': 'eyJ.test.jwt'}
task = jobs.RunInventoryUpdate()
task.instance = mock.MagicMock()
task.instance.get_cloud_credential.return_value = cloud_cred
task._credentials = []
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens'):
task.populate_workload_identity_tokens()
# The instance's get_cloud_credential should now return the same object with context
assert task.instance.get_cloud_credential() is cloud_cred

View File

@@ -7,7 +7,7 @@ from django.utils.timezone import now
from awx.main.models.schedules import _fast_forward_rrule, Schedule
from dateutil.rrule import HOURLY, MINUTELY, MONTHLY
-REF_DT = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
+REF_DT = datetime.datetime(2026, 4, 16, tzinfo=datetime.timezone.utc)
@pytest.mark.parametrize(
@@ -20,6 +20,10 @@ REF_DT = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
'DTSTART;TZID=America/New_York:20201118T200000 RRULE:FREQ=MINUTELY;INTERVAL=5;WKST=SU;BYMONTH=2,3;BYMONTHDAY=18;BYHOUR=5;BYMINUTE=35;BYSECOND=0',
id='every-5-minutes-at-5:35:00-am-on-the-18th-day-of-feb-or-march-with-week-starting-on-sundays',
),
pytest.param(
'DTSTART;TZID=America/New_York:20251211T130000 RRULE:FREQ=HOURLY;INTERVAL=4;WKST=MO;BYDAY=MO,TU,WE,TH,FR;BYHOUR=1,5,9,13,17,21;BYMINUTE=0',
id='every-4-hours-at-1-5-9-13-17-21-on-monday-through-friday-with-week-starting-on-monday',
),
pytest.param(
'DTSTART;TZID=America/New_York:20201118T200000 RRULE:FREQ=HOURLY;INTERVAL=5;WKST=SU;BYMONTH=2,3;BYHOUR=5',
id='every-5-hours-at-5-am-in-feb-or-march-with-week-starting-on-sundays',
@@ -48,6 +52,7 @@ def test_fast_forwarded_rrule_matches_original_occurrence(rrulestr):
[
pytest.param(datetime.datetime(2024, 12, 1, 0, 0, tzinfo=datetime.timezone.utc), id='ref-dt-out-of-dst'),
pytest.param(datetime.datetime(2024, 6, 1, 0, 0, tzinfo=datetime.timezone.utc), id='ref-dt-in-dst'),
pytest.param(datetime.datetime(2024, 11, 3, 6, 30, tzinfo=datetime.timezone.utc), id='ref-dt-fall-back-day'),
],
)
@pytest.mark.parametrize(
@@ -58,6 +63,8 @@ def test_fast_forwarded_rrule_matches_original_occurrence(rrulestr):
pytest.param(
'DTSTART;TZID=Europe/Lisbon:20230703T005800 RRULE:INTERVAL=10;FREQ=MINUTELY;BYHOUR=9,10,11,12,13,14,15,16,17,18,19,20,21', id='rrule-in-dst-by-hour'
),
pytest.param('DTSTART;TZID=America/New_York:20230313T005800 RRULE:FREQ=MINUTELY;INTERVAL=7', id='rrule-post-dst-7min'),
pytest.param('DTSTART;TZID=America/New_York:20230313T005800 RRULE:FREQ=MINUTELY;INTERVAL=13', id='rrule-post-dst-13min'),
],
)
def test_fast_forward_across_dst(rrulestr, ref_dt):
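The property these parametrized cases assert can be checked directly with dateutil, which the test module already imports: whatever fast-forwarding does to DTSTART, `.after()` on the original rule gives the ground-truth next occurrence for a reference datetime. A self-contained sketch (the rule string here is illustrative, not one of the cases above):

```python
from datetime import datetime, timezone
from dateutil.rrule import rrulestr

# Ground truth for fast-forward tests: the next occurrence strictly after a
# reference datetime, computed from the original (non-fast-forwarded) rule.
rule = rrulestr('DTSTART:20240101T000000Z\nRRULE:FREQ=HOURLY;INTERVAL=4')
ref = datetime(2026, 4, 16, tzinfo=timezone.utc)
next_occurrence = rule.after(ref)  # strictly after ref (inc=False)
```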

View File

@@ -0,0 +1,22 @@
from ansible_base.resource_registry.workload_identity_client import get_workload_identity_client
__all__ = ['retrieve_workload_identity_jwt_with_claims']
def retrieve_workload_identity_jwt_with_claims(
claims: dict,
audience: str,
scope: str,
workload_ttl_seconds: int | None = None,
) -> str:
"""Retrieve JWT token from workload claims.
Raises:
RuntimeError: if the workload identity client is not configured.
"""
client = get_workload_identity_client()
if client is None:
raise RuntimeError("Workload identity client is not configured")
kwargs = {"claims": claims, "scope": scope, "audience": audience}
if workload_ttl_seconds:
kwargs["workload_ttl_seconds"] = workload_ttl_seconds
return client.request_workload_jwt(**kwargs).jwt

View File

@@ -34,9 +34,6 @@ def get_urlpatterns(prefix=None):
re_path(r'^(?:api/)?500.html$', handle_500),
re_path(r'^csp-violation/', handle_csp_violation),
re_path(r'^login/', handle_login_redirect),
-# want api/v2/doesnotexist to return a 404, not match the ui urls,
-# so use a negative lookahead assertion here
-re_path(r'^(?!api/).*', include('awx.ui.urls', namespace='ui')),
]
if settings.DYNACONF.is_development_mode:
@@ -47,6 +44,12 @@ def get_urlpatterns(prefix=None):
except ImportError:
pass
+# want api/v2/doesnotexist to return a 404, not match the ui urls,
+# so use a negative lookahead assertion in the pattern below
+urlpatterns += [
+re_path(r'^(?!api/).*', include('awx.ui.urls', namespace='ui')),
+]
return urlpatterns
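The negative lookahead the comment describes can be verified in isolation; a minimal sketch:

```python
import re

# The UI catch-all uses a negative lookahead so api/ paths fall through to
# Django's normal 404 handling instead of matching the UI route.
ui_catchall = re.compile(r'^(?!api/).*')

matches_ui = bool(ui_catchall.match('some/ui/route'))
matches_api = bool(ui_catchall.match('api/v2/doesnotexist'))
```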

View File

@@ -55,6 +55,20 @@ options:
- Defaults to 10s, but this is handled by the shared module_utils code
type: float
aliases: [ aap_request_timeout ]
max_retries:
description:
- Maximum number of retries to attempt for recoverable connection errors.
- Defaults to 5.
- If not set, falls back to the C(AAP_MAX_RETRIES) environment variable and then config files.
type: int
aliases: [ aap_max_retries ]
retry_backoff_factor:
description:
- Backoff factor used when retrying connections.
- Defaults to 2.
- If not set, falls back to the C(AAP_RETRY_BACKOFF_FACTOR) environment variable and then config files.
type: int
aliases: [ aap_retry_backoff_factor ]
controller_config_file:
description:
- Path to the controller config file.

View File

@@ -76,6 +76,24 @@ options:
why: Support for AAP variables
alternatives: 'AAP_REQUEST_TIMEOUT'
aliases: [ aap_request_timeout ]
max_retries:
description:
- Maximum number of retries to attempt for recoverable connection errors.
- Defaults to 5.
- This will not work with the export or import modules.
type: int
env:
- name: AAP_MAX_RETRIES
aliases: [ aap_max_retries ]
retry_backoff_factor:
description:
- Backoff factor used when retrying connections.
- Defaults to 2.
- This will not work with the export or import modules.
type: int
env:
- name: AAP_RETRY_BACKOFF_FACTOR
aliases: [ aap_retry_backoff_factor ]
notes:
- If no I(config_file) is provided we will attempt to use the tower-cli library
defaults to find your host information.

View File

@@ -15,6 +15,7 @@ from ansible.module_utils.six.moves.configparser import ConfigParser, NoOptionEr
from base64 import b64encode
from socket import getaddrinfo, IPPROTO_TCP
import time
import random
from json import loads, dumps
from os.path import isfile, expanduser, split, join, exists, isdir
from os import access, R_OK, getcwd, environ, getenv
@@ -37,6 +38,19 @@ except ImportError:
CONTROLLER_BASE_PATH_ENV_VAR = "CONTROLLER_OPTIONAL_API_URLPATTERN_PREFIX"
# 502/503: request never reached the server — always safe to retry any method
ALWAYS_RETRYABLE = {
502: ['GET', 'POST', 'PATCH', 'DELETE'], # Bad Gateway
503: ['GET', 'POST', 'PATCH', 'DELETE'], # Service Unavailable
}
# 500/504: idempotent methods only — GETs are reads, PATCH/DELETE are
# idempotent by definition; POST is excluded unless we know it's safe.
IDEMPOTENT_RETRYABLE = {
500: ['GET', 'PATCH', 'DELETE'], # Internal Server Error
504: ['GET', 'PATCH', 'DELETE'], # Gateway Timeout
}
class ConfigFileException(Exception):
pass
@@ -72,6 +86,16 @@ class ControllerModule(AnsibleModule):
aliases=['aap_request_timeout'],
required=False,
fallback=(env_fallback, ['CONTROLLER_REQUEST_TIMEOUT', 'AAP_REQUEST_TIMEOUT'])),
max_retries=dict(
type='int',
aliases=['aap_max_retries'],
required=False,
fallback=(env_fallback, ['AAP_MAX_RETRIES'])),
retry_backoff_factor=dict(
type='int',
aliases=['aap_retry_backoff_factor'],
required=False,
fallback=(env_fallback, ['AAP_RETRY_BACKOFF_FACTOR'])),
aap_token=dict(
type='raw',
no_log=True,
@@ -92,12 +116,16 @@ class ControllerModule(AnsibleModule):
'password': 'controller_password',
'verify_ssl': 'validate_certs',
'request_timeout': 'request_timeout',
'max_retries': 'max_retries',
'retry_backoff_factor': 'retry_backoff_factor',
}
host = '127.0.0.1'
username = None
password = None
verify_ssl = True
request_timeout = 10
max_retries = 5
retry_backoff_factor = 2
authenticated = False
config_name = 'tower_cli.cfg'
version_checked = False
@@ -488,6 +516,49 @@ class ControllerAPIModule(ControllerModule):
def resolve_name_to_id(self, endpoint, name_or_id):
return self.get_exactly_one(endpoint, name_or_id)['id']
def is_retryable(self, status_code, method, endpoint):
"""
Determine whether a failed request is safe to retry.
Args:
status_code (int): HTTP status code returned by the server.
method (str): HTTP verb in uppercase ('GET', 'POST', etc.).
endpoint (str): The API endpoint path (e.g. '/api/v2/job_templates/1/launch/').
Returns:
bool: True if the request can safely be retried.
"""
# --- Always safe: 502/503 mean the request never reached AWX ---
if method in ALWAYS_RETRYABLE.get(status_code, []):
return True
# --- Safe for inherently idempotent methods (GET, PATCH, DELETE) ---
if method in IDEMPOTENT_RETRYABLE.get(status_code, []):
return True
# --- POST/PATCH on 500/504: safe UNLESS the endpoint triggers execution ---
if method in ('POST', 'PATCH') and status_code in (500, 504):
# /launch, /relaunch, /callback etc. — retrying would double-execute
# Catches: /job_templates/1/launch/, /workflow_job_templates/1/launch/,
# /jobs/1/relaunch/, /ad_hoc_commands/1/relaunch/ …
launch_keywords = ('/launch', '/relaunch', '/callback')
if any(kw in endpoint for kw in launch_keywords):
return False
# POST to the ad_hoc_commands collection root creates AND immediately
# executes the command — not safe to retry.
# PATCH to /ad_hoc_commands/<id>/ is fine: it is already caught by the
# idempotent branch above, and would also pass through here correctly.
if method == 'POST' and endpoint.rstrip('/').endswith('/ad_hoc_commands'):
return False
# All other POST/PATCH endpoints (create resource, update resource) are
# safe: a 500/504 before the DB transaction commits means no side-effect.
return True
return False
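Pulled out as a standalone sketch, the classification above can be exercised directly; the table contents are copied from this patch, while the endpoints in the assertions are merely illustrative:

```python
# Standalone copy of the retry-classification logic from is_retryable(),
# so the (status, method, endpoint) decisions can be eyeballed in isolation.
ALWAYS_RETRYABLE = {502: ['GET', 'POST', 'PATCH', 'DELETE'],
                    503: ['GET', 'POST', 'PATCH', 'DELETE']}
IDEMPOTENT_RETRYABLE = {500: ['GET', 'PATCH', 'DELETE'],
                        504: ['GET', 'PATCH', 'DELETE']}

def is_retryable(status_code, method, endpoint):
    if method in ALWAYS_RETRYABLE.get(status_code, []):
        return True
    if method in IDEMPOTENT_RETRYABLE.get(status_code, []):
        return True
    if method in ('POST', 'PATCH') and status_code in (500, 504):
        # Execution-triggering endpoints: retrying risks double-running a job.
        if any(kw in endpoint for kw in ('/launch', '/relaunch', '/callback')):
            return False
        # POST to the ad_hoc_commands collection root creates AND runs a command.
        if method == 'POST' and endpoint.rstrip('/').endswith('/ad_hoc_commands'):
            return False
        return True
    return False

assert is_retryable(503, 'POST', '/api/v2/job_templates/1/launch/')      # never reached AWX
assert not is_retryable(504, 'POST', '/api/v2/job_templates/1/launch/')  # may have executed
assert is_retryable(500, 'POST', '/api/v2/organizations/')               # plain create, no commit
assert not is_retryable(400, 'GET', '/api/v2/me/')                       # client error, never retried
```

The interesting asymmetry is the second pair: the same POST to a launch endpoint is retried on 503 (the proxy never forwarded it) but not on 504 (the backend may have started the job before the gateway gave up).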
def make_request(self, method, endpoint, *args, **kwargs):
# In case someone calls us directly, make sure we were given a method rather than assuming a GET
if not method:
@@ -512,121 +583,155 @@ class ControllerAPIModule(ControllerModule):
headers.setdefault('Content-Type', 'application/json')
kwargs['headers'] = headers
data = None # Important, if content type is not JSON, this should not be dict type
data = None
if headers.get('Content-Type', '') == 'application/json':
data = dumps(kwargs.get('data', {}))
try:
response = self.session.open(
method, url.geturl(),
headers=headers,
timeout=self.request_timeout,
validate_certs=self.verify_ssl,
follow_redirects=True,
data=data
)
except (SSLValidationError) as ssl_err:
self.fail_json(msg="Could not establish a secure connection to your host ({1}): {0}.".format(url.netloc, ssl_err))
except (ConnectionError) as con_err:
self.fail_json(msg="There was a network error of some kind trying to connect to your host ({1}): {0}.".format(url.netloc, con_err))
except (HTTPError) as he:
# Sanity check: Did the server send back some kind of internal error?
if he.code >= 500:
self.fail_json(msg='The host sent back a server error ({1}): {0}. Please check the logs and try again later'.format(url.path, he))
# Sanity check: Did we fail to authenticate properly? If so, fail out now; this is always a failure.
elif he.code == 401:
self.fail_json(msg='Invalid authentication credentials for {0} (HTTP 401).'.format(url.path))
# Sanity check: Did we get a forbidden response, which means that the user isn't allowed to do this? Report that.
elif he.code == 403:
# Hack: Tell the customer to use the platform supported collection when interacting with Org, Team, User Controller endpoints
err_msg = he.fp.read().decode('utf-8')
try:
# Defensive coding. Handle json responses and non-json responses
err_msg = loads(err_msg)
err_msg = err_msg['detail']
# JSONDecodeError only available on Python 3.5+
except ValueError:
pass
prepend_msg = " Use the collection ansible.platform to modify resources Organization, User, or Team." if (
"this resource via the platform ingress") in err_msg else ""
self.fail_json(msg="You don't have permission to {1} to {0} (HTTP 403).{2}".format(url.path, method, prepend_msg))
# Sanity check: Did we get a 404 response?
# Requests with primary keys will return a 404 if there is no response, and we want to consistently trap these.
elif he.code == 404:
if kwargs.get('return_none_on_404', False):
return None
self.fail_json(msg='The requested object could not be found at {0}.'.format(url.path))
# Sanity check: Did we get a 405 response?
# A 405 means we used a method that isn't allowed. Usually this is a bad request, but it requires special treatment because the
# API sends it as a logic error in a few situations (e.g. trying to cancel a job that isn't running).
elif he.code == 405:
self.fail_json(msg="Cannot make a request with the {0} method to this endpoint {1}".format(method, url.path))
# Sanity check: Did we get some other kind of error? If so, write an appropriate error message.
elif he.code >= 400:
# We are going to return a 400 so the module can decide what to do with it
page_data = he.read()
try:
return {'status_code': he.code, 'json': loads(page_data)}
# JSONDecodeError only available on Python 3.5+
except ValueError:
return {'status_code': he.code, 'text': page_data}
elif he.code == 204 and method == 'DELETE':
# A 204 is a normal response for a delete function
pass
else:
self.fail_json(msg="Unexpected return code when calling {0}: {1}".format(url.geturl(), he))
except (Exception) as e:
self.fail_json(msg="There was an unknown error when trying to connect to {2}: {0} {1}".format(type(e).__name__, e, url.geturl()))
# ----------------------------------------------------------------
# Retry loop — wraps only the session.open() + HTTPError handling
# Everything above (auth, URL building) happens once before the loop
# ----------------------------------------------------------------
max_retries = self.max_retries
backoff_factor = self.retry_backoff_factor
last_response = None
if not self.version_checked:
# In PY3 we get back an HTTPResponse object, but PY2 returns an addinfourl.
# First try to get the headers in PY3 format and then drop down to PY2.
try:
controller_type = response.getheader('X-API-Product-Name', None)
controller_version = response.getheader('X-API-Product-Version', None)
except Exception:
controller_type = response.info().getheader('X-API-Product-Name', None)
controller_version = response.info().getheader('X-API-Product-Version', None)
for attempt in range(max_retries + 1): # attempt 0 = first try
parsed_collection_version = Version(self._COLLECTION_VERSION).version
if controller_version:
parsed_controller_version = Version(controller_version).version
if controller_type == 'AWX':
collection_compare_ver = parsed_collection_version[0]
controller_compare_ver = parsed_controller_version[0]
else:
collection_compare_ver = "{0}.{1}".format(parsed_collection_version[0], parsed_collection_version[1])
controller_compare_ver = '{0}.{1}'.format(parsed_controller_version[0], parsed_controller_version[1])
if self._COLLECTION_TYPE not in self.collection_to_version or self.collection_to_version[self._COLLECTION_TYPE] != controller_type:
self.warn("You are using the {0} version of this collection but connecting to {1}".format(self._COLLECTION_TYPE, controller_type))
elif collection_compare_ver != controller_compare_ver:
self.warn(
"You are running collection version {0} but connecting to {2} version {1}".format(
self._COLLECTION_VERSION, controller_version, controller_type
)
if attempt > 0:
sleep_time = (backoff_factor ** (attempt - 1)) * (0.5 + random.random())
self.warn(
'Retrying {0} {1} (attempt {2}/{3}) after {4}s due to status {5}'.format(
method, url.path, attempt, max_retries, sleep_time,
last_response if last_response else 'connection error'
)
)
time.sleep(sleep_time)
self.version_checked = True
response_body = ''
try:
response_body = response.read()
except (Exception) as e:
self.fail_json(msg="Failed to read response body: {0}".format(e))
response_json = {}
if response_body and response_body != '':
try:
response_json = loads(response_body)
except (Exception) as e:
self.fail_json(msg="Failed to parse the response json: {0}".format(e))
response = self.session.open(
method, url.geturl(),
headers=headers,
timeout=self.request_timeout,
validate_certs=self.verify_ssl,
follow_redirects=True,
data=data
)
if PY2:
status_code = response.getcode()
else:
status_code = response.status
return {'status_code': status_code, 'json': response_json}
except (SSLValidationError) as ssl_err:
# SSL errors are never retryable — cert problems won't fix themselves
self.fail_json(msg="Could not establish a secure connection to your host ({0}): {1}.".format(url.netloc, ssl_err))
except (ConnectionError) as con_err:
# Connection errors may be transient — retry if we have attempts left
last_response = 'ConnectionError'
if attempt < max_retries:
continue
self.fail_json(msg="There was a network error of some kind trying to connect to your host ({0}): {1}.".format(url.netloc, con_err))
except (HTTPError) as he:
# ---- Retryable HTTP errors ----
if self.is_retryable(he.code, method, url.path):
# Retryable error: retry while attempts remain, otherwise fail below.
if attempt < max_retries:
last_response = he.code
continue
# Exhausted retries - provide informative message
self.fail_json(
msg="Request to {0} failed with status {1} after {2} retries. "
"This may indicate the server is overloaded.".format(url.path, he.code, max_retries)
)
# ---- Non-retryable HTTP errors (existing behaviour preserved) ----
if he.code >= 500:
self.fail_json(msg='The host sent back a server error ({0}): {1}. Please check the logs and try again later'.format(url.path, he))
elif he.code == 401:
self.fail_json(msg='Invalid authentication credentials for {0} (HTTP 401).'.format(url.path))
elif he.code == 403:
body = he.read()
raw = body.decode('utf-8') if isinstance(body, bytes) else str(body)
if 'unable to connect to database' in raw.lower():
if attempt < max_retries:
last_response = he.code
continue
self.fail_json(
msg="Request to {0} failed with status 403 (database unavailable) after {1} retries.".format(url.path, max_retries),
)
# Reuse raw instead of reading again
try:
err_msg = loads(raw)
err_msg = err_msg['detail']
except (ValueError, KeyError):
err_msg = raw
prepend_msg = " Use the collection ansible.platform to modify resources Organization, User, or Team." if (
"this resource via the platform ingress") in err_msg else ""
self.fail_json(msg="You don't have permission to {1} to {0} (HTTP 403).{2}".format(url.path, method, prepend_msg))
elif he.code == 404:
if kwargs.get('return_none_on_404', False):
return None
self.fail_json(msg='The requested object could not be found at {0}.'.format(url.path))
elif he.code == 405:
self.fail_json(msg="Cannot make a request with the {0} method to this endpoint {1}".format(method, url.path))
elif he.code >= 400:
page_data = he.read()
try:
return {'status_code': he.code, 'json': loads(page_data)}
except ValueError:
return {'status_code': he.code, 'text': page_data}
else:
self.fail_json(msg="Unexpected return code when calling {0}: {1}".format(url.geturl(), he))
except (Exception) as e:
self.fail_json(msg="There was an unknown error when trying to connect to {2}: {0} {1}".format(type(e).__name__, e, url.geturl()))
# ----------------------------------------------------------------
# Successful response — fall through from session.open()
# The version check and response parsing happen once on success
# ----------------------------------------------------------------
if not self.version_checked:
try:
controller_type = response.getheader('X-API-Product-Name', None)
controller_version = response.getheader('X-API-Product-Version', None)
except Exception:
controller_type = response.info().getheader('X-API-Product-Name', None)
controller_version = response.info().getheader('X-API-Product-Version', None)
parsed_collection_version = Version(self._COLLECTION_VERSION).version
if controller_version:
parsed_controller_version = Version(controller_version).version
if controller_type == 'AWX':
collection_compare_ver = parsed_collection_version[0]
controller_compare_ver = parsed_controller_version[0]
else:
collection_compare_ver = "{0}.{1}".format(parsed_collection_version[0], parsed_collection_version[1])
controller_compare_ver = '{0}.{1}'.format(parsed_controller_version[0], parsed_controller_version[1])
if self._COLLECTION_TYPE not in self.collection_to_version or self.collection_to_version[self._COLLECTION_TYPE] != controller_type:
self.warn("You are using the {0} version of this collection but connecting to {1}".format(self._COLLECTION_TYPE, controller_type))
elif collection_compare_ver != controller_compare_ver:
self.warn(
"You are running collection version {0} but connecting to {2} version {1}".format(
self._COLLECTION_VERSION, controller_version, controller_type
)
)
self.version_checked = True
response_body = ''
try:
response_body = response.read()
except (Exception) as e:
self.fail_json(msg="Failed to read response body: {0}".format(e))
response_json = {}
if response_body and response_body != '':
try:
response_json = loads(response_body)
except (Exception) as e:
self.fail_json(msg="Failed to parse the response json: {0}".format(e))
if PY2:
status_code = response.getcode()
else:
status_code = response.status
return {'status_code': status_code, 'json': response_json}
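The sleep computation in the retry loop above is exponential in the attempt number with multiplicative jitter in [0.5, 1.5), which spreads out simultaneous clients instead of letting them retry in lockstep. A minimal standalone sketch of that formula:

```python
import random

# Jittered exponential backoff, matching the expression used in the retry loop:
#   (backoff_factor ** (attempt - 1)) * (0.5 + random.random())
# attempt is 1-based (attempt 0 is the initial, un-delayed request).
def backoff(attempt, backoff_factor=2):
    return (backoff_factor ** (attempt - 1)) * (0.5 + random.random())

# Each sleep falls in the half-open interval [0.5 * base, 1.5 * base),
# where base doubles with every attempt: 1, 2, 4, 8, 16 for the defaults.
for attempt in range(1, 6):
    s = backoff(attempt)
    base = 2 ** (attempt - 1)
    assert 0.5 * base <= s < 1.5 * base
```

Because `random.random()` returns values in [0, 1), the jitter factor never reaches 1.5, so the worst-case bounds computed earlier are strict upper limits.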
def api_path(self, app_key=None):

View File

@@ -276,6 +276,7 @@ options:
- ''
- 'github'
- 'gitlab'
- 'bitbucket_dc'
webhook_credential:
description:
- Personal Access Token for posting back the status to the service API
@@ -436,7 +437,7 @@ def main():
scm_branch=dict(),
ask_scm_branch_on_launch=dict(type='bool'),
job_slice_count=dict(type='int'),
webhook_service=dict(choices=['github', 'gitlab', '']),
webhook_service=dict(choices=['github', 'gitlab', 'bitbucket_dc', '']),
webhook_credential=dict(),
labels=dict(type="list", elements='str'),
notification_templates_started=dict(type="list", elements='str'),

View File

@@ -117,6 +117,7 @@ options:
choices:
- github
- gitlab
- bitbucket_dc
webhook_credential:
description:
- Personal Access Token for posting back the status to the service API
@@ -828,7 +829,7 @@ def main():
ask_inventory_on_launch=dict(type='bool'),
ask_scm_branch_on_launch=dict(type='bool'),
ask_limit_on_launch=dict(type='bool'),
webhook_service=dict(choices=['github', 'gitlab']),
webhook_service=dict(choices=['github', 'gitlab', 'bitbucket_dc']),
webhook_credential=dict(),
labels=dict(type="list", elements='str'),
notification_templates_started=dict(type="list", elements='str'),

View File

@@ -0,0 +1,124 @@
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import pytest
from awx.main.models import JobTemplate, WorkflowJobTemplate
# The backend supports these webhook services on job/workflow templates
# (see awx/main/models/mixins.py). The collection modules must accept all of
# them in their argument_spec ``choices`` list. This test guards against the
# module's choices drifting from the backend -- see AAP-45980, where
# ``bitbucket_dc`` had been supported by the API since migration 0188 but was
# still being rejected by the job_template/workflow_job_template modules.
WEBHOOK_SERVICES = ['github', 'gitlab', 'bitbucket_dc']
@pytest.mark.django_db
@pytest.mark.parametrize('webhook_service', WEBHOOK_SERVICES)
def test_job_template_accepts_webhook_service(run_module, admin_user, project, inventory, webhook_service):
result = run_module(
'job_template',
{
'name': 'foo',
'playbook': 'helloworld.yml',
'project': project.name,
'inventory': inventory.name,
'webhook_service': webhook_service,
'state': 'present',
},
admin_user,
)
assert not result.get('failed', False), result.get('msg', result)
assert result.get('changed', False), result
jt = JobTemplate.objects.get(name='foo')
assert jt.webhook_service == webhook_service
# Re-running with the same args must be a no-op (idempotence).
result = run_module(
'job_template',
{
'name': 'foo',
'playbook': 'helloworld.yml',
'project': project.name,
'inventory': inventory.name,
'webhook_service': webhook_service,
'state': 'present',
},
admin_user,
)
assert not result.get('failed', False), result.get('msg', result)
assert not result.get('changed', True), result
@pytest.mark.django_db
@pytest.mark.parametrize('webhook_service', WEBHOOK_SERVICES)
def test_workflow_job_template_accepts_webhook_service(run_module, admin_user, organization, webhook_service):
result = run_module(
'workflow_job_template',
{
'name': 'foo-workflow',
'organization': organization.name,
'webhook_service': webhook_service,
'state': 'present',
},
admin_user,
)
assert not result.get('failed', False), result.get('msg', result)
assert result.get('changed', False), result
wfjt = WorkflowJobTemplate.objects.get(name='foo-workflow')
assert wfjt.webhook_service == webhook_service
# Re-running with the same args must be a no-op (idempotence).
result = run_module(
'workflow_job_template',
{
'name': 'foo-workflow',
'organization': organization.name,
'webhook_service': webhook_service,
'state': 'present',
},
admin_user,
)
assert not result.get('failed', False), result.get('msg', result)
assert not result.get('changed', True), result
@pytest.mark.django_db
def test_job_template_rejects_unknown_webhook_service(run_module, admin_user, project, inventory):
result = run_module(
'job_template',
{
'name': 'foo',
'playbook': 'helloworld.yml',
'project': project.name,
'inventory': inventory.name,
'webhook_service': 'not_a_real_service',
'state': 'present',
},
admin_user,
)
assert result.get('failed', False), result
assert 'webhook_service' in result.get('msg', '')
@pytest.mark.django_db
def test_workflow_job_template_rejects_unknown_webhook_service(run_module, admin_user, organization):
result = run_module(
'workflow_job_template',
{
'name': 'foo-workflow',
'organization': organization.name,
'webhook_service': 'not_a_real_service',
'state': 'present',
},
admin_user,
)
assert result.get('failed', False), result
assert 'webhook_service' in result.get('msg', '')

View File

@@ -1,5 +1,5 @@
[build-system]
requires = ["setuptools>=45", "setuptools_scm[toml]>=6.2"]
requires = ["setuptools>=45", "setuptools_scm[toml]>=6.2,<10"]
build-backend = "setuptools.build_meta"
# Do not uncomment the line below. We need to be able to override the version via a file, and this

View File

@@ -116,7 +116,7 @@ cython==3.1.3
# via -r /awx_devel/requirements/requirements.in
daphne==4.2.1
# via -r /awx_devel/requirements/requirements.in
dispatcherd[pg-notify]==2026.02.26
dispatcherd[pg-notify]==2026.3.25
# via -r /awx_devel/requirements/requirements.in
distro==1.9.0
# via -r /awx_devel/requirements/requirements.in

View File

@@ -24,6 +24,7 @@ atomicwrites
flake8
yamllint
pip>=25.3 # PEP 660 Editable installs for pyproject.toml based builds (wheel based)
awx-tui
# python debuggers
debugpy

View File

@@ -35,6 +35,30 @@ if output=$(ANSIBLE_REVERSE_RESOURCE_SYNC=false awx-manage createsuperuser --noi
fi
echo "Admin password: ${DJANGO_SUPERUSER_PASSWORD}"
# Configure awx-tui to connect to the local AWX instance
AWX_TUI_CONFIG_DIR="${HOME}/.config/awx-tui"
AWX_TUI_CONFIG_FILE="${AWX_TUI_CONFIG_DIR}/config.yaml"
mkdir -p "${AWX_TUI_CONFIG_DIR}"
python3 -c "
import yaml, os
config = {
'instances': {
'local': {
'url': 'https://localhost:8043',
'auth': {
'method': 'password',
'username': 'admin',
'password': os.environ['DJANGO_SUPERUSER_PASSWORD'],
},
'verify_ssl': False,
}
}
}
with open('${AWX_TUI_CONFIG_FILE}', 'w') as f:
yaml.dump(config, f, default_flow_style=False)
"
chmod 600 "${AWX_TUI_CONFIG_FILE}"
ANSIBLE_REVERSE_RESOURCE_SYNC=false awx-manage create_preload_data
awx-manage register_default_execution_environments