Compare commits

..

4 Commits

Author SHA1 Message Date
Dirk Julich
1a205af41f AAP-57614 fix: remove early dispatch, rely on events_processed_hook
Dispatching save_indirect_host_entries from artifacts_handler was
fundamentally flawed: it ran before job events were written to the DB
by the callback receiver, so the task found no events to process, set
event_queries_processed=True, and blocked all future processing.

Remove the dispatch and the now-unused import.  The existing
events_processed_hook (called from both the task runner after the
final save and the callback receiver after the wrapup event) handles
dispatching at the right time — after events are in the DB.

The direct DB write of event_queries_processed=False and
installed_collections (added in the previous commit) remains: it
ensures events_processed_hook sees the correct values regardless of
which call site runs first.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 21:22:26 +01:00
Dirk Julich
96bd35bfb4 AAP-57614 fix: also write installed_collections directly to DB
save_indirect_host_entries calls fetch_job_event_query which reads
job.installed_collections from the DB. When dispatched from
artifacts_handler, installed_collections was still only in
delay_update (not yet flushed to DB), so the task found no matching
EventQuery records and created no IndirectManagedNodeAudit entries.

Write both event_queries_processed and installed_collections directly
to the DB before dispatching, so save_indirect_host_entries has all
the data it needs immediately.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 21:22:26 +01:00
Dirk Julich
21e73cb065 AAP-57614 fix: write event_queries_processed directly to DB
The previous commit dispatched save_indirect_host_entries from
artifacts_handler, but used delay_update to set event_queries_processed
to False. delay_update only queues the write for the final job status
save, so save_indirect_host_entries would read the default (True) from
the DB and bail out before processing.

Replace delay_update(event_queries_processed=False) with a direct
Job.objects.filter().update() call so the value is visible in the DB
before save_indirect_host_entries runs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 21:22:26 +01:00
Dirk Julich
53be3d16bd AAP-57614 fix: dispatch save_indirect_host_entries from artifacts_handler
The artifacts_handler and handle_success_and_failure_notifications can
run in either order after job completion. Since event_queries_processed
defaults to True on the Job model, when the notification handler runs
first it sees True (the default) and skips dispatching
save_indirect_host_entries. When artifacts_handler runs later and sets
event_queries_processed to False, no task is dispatched to process the
EventQuery records, leaving event_queries_processed stuck at False and
no IndirectManagedNodeAudit records created.

Fix by also dispatching save_indirect_host_entries from
artifacts_handler after EventQuery records are created. The task's
select_for_update lock prevents duplicate processing if both code
paths dispatch.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 21:22:26 +01:00
26 changed files with 100 additions and 1306 deletions

View File

@@ -581,7 +581,7 @@ detect-schema-change: genschema
validate-openapi-schema: genschema validate-openapi-schema: genschema
@echo "Validating OpenAPI schema from schema.json..." @echo "Validating OpenAPI schema from schema.json..."
@python3 -c "from openapi_spec_validator import validate; import json; spec = json.load(open('schema.json')); validate(spec); print('✓ Schema is valid')" @python3 -c "from openapi_spec_validator import validate; import json; spec = json.load(open('schema.json')); validate(spec); print('✓ OpenAPI Schema is valid!')"
docker-compose-clean: awx/projects docker-compose-clean: awx/projects
$(DOCKER_COMPOSE) -f tools/docker-compose/_sources/docker-compose.yml rm -sf $(DOCKER_COMPOSE) -f tools/docker-compose/_sources/docker-compose.yml rm -sf

View File

@@ -122,6 +122,7 @@ from awx.main.scheduler.task_manager_models import TaskManagerModels
from awx.main.redact import UriCleaner, REPLACE_STR from awx.main.redact import UriCleaner, REPLACE_STR
from awx.main.signals import update_inventory_computed_fields from awx.main.signals import update_inventory_computed_fields
from awx.main.validators import vars_validate_or_raise from awx.main.validators import vars_validate_or_raise
from awx.api.versioning import reverse from awx.api.versioning import reverse
@@ -2931,19 +2932,6 @@ class CredentialTypeSerializer(BaseSerializer):
field['label'] = _(field['label']) field['label'] = _(field['label'])
if 'help_text' in field: if 'help_text' in field:
field['help_text'] = _(field['help_text']) field['help_text'] = _(field['help_text'])
# Deep copy inputs to avoid modifying the original model data
inputs = value.get('inputs')
if not isinstance(inputs, dict):
inputs = {}
value['inputs'] = copy.deepcopy(inputs)
fields = value['inputs'].get('fields', [])
if not isinstance(fields, list):
fields = []
# Normalize fields and filter out internal fields
value['inputs']['fields'] = [f for f in fields if not f.get('internal')]
return value return value
def filter_field_metadata(self, fields, method): def filter_field_metadata(self, fields, method):

View File

@@ -1,4 +1,4 @@
--- ---
collections: collections:
- name: ansible.receptor - name: ansible.receptor
version: 2.0.8 version: 2.0.6

View File

@@ -14,7 +14,6 @@ import sys
import time import time
from base64 import b64encode from base64 import b64encode
from collections import OrderedDict from collections import OrderedDict
from jwt import decode as _jwt_decode
from urllib3.exceptions import ConnectTimeoutError from urllib3.exceptions import ConnectTimeoutError
@@ -59,13 +58,8 @@ from drf_spectacular.utils import extend_schema_view, extend_schema
from ansible_base.lib.utils.requests import get_remote_hosts from ansible_base.lib.utils.requests import get_remote_hosts
from ansible_base.rbac.models import RoleEvaluation from ansible_base.rbac.models import RoleEvaluation
from ansible_base.lib.utils.schema import extend_schema_if_available from ansible_base.lib.utils.schema import extend_schema_if_available
from ansible_base.lib.workload_identity.controller import AutomationControllerJobScope
# flags
from flags.state import flag_enabled
# AWX # AWX
from awx.main.tasks.jobs import retrieve_workload_identity_jwt_with_claims
from awx.main.tasks.system import send_notifications, update_inventory_computed_fields from awx.main.tasks.system import send_notifications, update_inventory_computed_fields
from awx.main.access import get_user_queryset from awx.main.access import get_user_queryset
from awx.api.generics import ( from awx.api.generics import (
@@ -1601,177 +1595,7 @@ class CredentialCopy(CopyAPIView):
resource_purpose = 'copy of a credential' resource_purpose = 'copy of a credential'
class OIDCCredentialTestMixin: class CredentialExternalTest(SubDetailAPIView):
"""
Mixin to add OIDC workload identity token support to credential test endpoints.
This mixin provides methods to handle OIDC-enabled external credentials that use
workload identity tokens for authentication.
"""
@staticmethod
def _get_workload_identity_token(job_template: models.JobTemplate, jwt_aud: str) -> str:
"""Generate a workload identity token for a job template.
Args:
job_template: The JobTemplate instance to generate claims for
jwt_aud: The JWT audience claim value
Returns:
str: The generated JWT token
"""
claims = {
AutomationControllerJobScope.CLAIM_ORGANIZATION_NAME: job_template.organization.name,
AutomationControllerJobScope.CLAIM_ORGANIZATION_ID: job_template.organization.id,
AutomationControllerJobScope.CLAIM_PROJECT_NAME: job_template.project.name,
AutomationControllerJobScope.CLAIM_PROJECT_ID: job_template.project.id,
AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_NAME: job_template.name,
AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_ID: job_template.id,
AutomationControllerJobScope.CLAIM_PLAYBOOK_NAME: job_template.playbook,
}
return retrieve_workload_identity_jwt_with_claims(
claims=claims,
audience=jwt_aud,
scope=AutomationControllerJobScope.name,
)
@staticmethod
def _decode_jwt_payload_for_display(jwt_token):
"""Decode JWT payload for display purposes only (signature not verified).
This is safe because the JWT was just created by AWX and is only decoded
to show the user what claims are being sent to the external system.
The external system will perform proper signature verification.
Args:
jwt_token: The JWT token to decode
Returns:
dict: The decoded JWT payload
"""
return _jwt_decode(jwt_token, algorithms=["RS256"], options={"verify_signature": False}) # NOSONAR python:S5659
def _has_workload_identity_token(self, credential_type_inputs):
"""Check if credential type has an internal workload_identity_token field.
Args:
credential_type_inputs: The inputs dict from a credential type
Returns:
bool: True if the credential type has a workload_identity_token field marked as internal
"""
fields = credential_type_inputs.get('fields', []) if isinstance(credential_type_inputs, dict) else []
return any(field.get('internal') and field.get('id') == 'workload_identity_token' for field in fields)
def _validate_and_get_job_template(self, job_template_id):
"""Validate job template ID and return the JobTemplate instance.
Args:
job_template_id: The job template ID from metadata
Returns:
JobTemplate instance
Raises:
ParseError: If job_template_id is invalid or not found
"""
if job_template_id is None:
raise ParseError(_('Job template ID is required.'))
try:
return models.JobTemplate.objects.get(id=int(job_template_id))
except ValueError:
raise ParseError(_('Job template ID must be an integer.'))
except models.JobTemplate.DoesNotExist:
raise ParseError(_('Job template with ID %(id)s does not exist.') % {'id': job_template_id})
def _handle_oidc_credential_test(self, backend_kwargs):
"""
Handle OIDC workload identity token generation for external credential test endpoints.
This method should only be called when FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED is enabled
and the credential type has a workload_identity_token field.
Args:
backend_kwargs: The kwargs dict to pass to the backend (will be modified in place)
Returns:
dict: Response body containing details with the sent JWT payload
Raises:
PermissionDenied: If user lacks access to the job template (re-raised for 403 response)
All other exceptions are caught and converted to 400 responses with error details.
Modifies backend_kwargs in place to add workload_identity_token.
"""
# Validate job template
job_template_id = backend_kwargs.pop('job_template_id', None)
job_template = self._validate_and_get_job_template(job_template_id)
# Check user access
if not self.request.user.can_access(models.JobTemplate, 'start', job_template):
raise PermissionDenied(_('You do not have access to job template with id: %(id)s.') % {'id': job_template.id})
# Generate workload identity token
jwt_token = self._get_workload_identity_token(job_template, backend_kwargs.pop('jwt_aud', None))
backend_kwargs['workload_identity_token'] = jwt_token
return {'details': {'sent_jwt_payload': self._decode_jwt_payload_for_display(jwt_token)}}
def _call_backend_with_error_handling(self, plugin, backend_kwargs, response_body):
"""Call credential backend and handle errors, adding secret_value to response if OIDC details present."""
try:
with set_environ(**settings.AWX_TASK_ENV):
secret_value = plugin.backend(**backend_kwargs)
if 'details' in response_body:
response_body['details']['secret_value'] = secret_value
return Response(response_body, status=status.HTTP_202_ACCEPTED)
except requests.exceptions.HTTPError as exc:
message = self._extract_http_error_message(exc)
self._add_error_to_response(response_body, message)
return Response(response_body, status=status.HTTP_400_BAD_REQUEST)
except Exception as exc:
message = self._extract_generic_error_message(exc)
self._add_error_to_response(response_body, message)
return Response(response_body, status=status.HTTP_400_BAD_REQUEST)
@staticmethod
def _extract_http_error_message(exc):
"""Extract error message from HTTPError, checking response JSON and text."""
message = str(exc)
if not hasattr(exc, 'response') or exc.response is None:
return message
try:
error_data = exc.response.json()
if 'errors' in error_data and error_data['errors']:
return ', '.join(error_data['errors'])
if 'error' in error_data:
return error_data['error']
except (ValueError, KeyError):
if exc.response.text:
return exc.response.text
return message
@staticmethod
def _extract_generic_error_message(exc):
"""Extract error message from exception, handling ConnectTimeoutError specially."""
message = str(exc) if str(exc) else exc.__class__.__name__
for arg in getattr(exc, 'args', []):
if isinstance(getattr(arg, 'reason', None), ConnectTimeoutError):
return str(arg.reason)
return message
@staticmethod
def _add_error_to_response(response_body, message):
"""Add error message to both 'detail' and 'details.error_message' fields."""
response_body['detail'] = message
if 'details' in response_body:
response_body['details']['error_message'] = message
class CredentialExternalTest(OIDCCredentialTestMixin, SubDetailAPIView):
""" """
Test updates to the input values and metadata of an external credential Test updates to the input values and metadata of an external credential
before saving them. before saving them.
@@ -1798,22 +1622,23 @@ class CredentialExternalTest(OIDCCredentialTestMixin, SubDetailAPIView):
if value != '$encrypted$': if value != '$encrypted$':
backend_kwargs[field_name] = value backend_kwargs[field_name] = value
backend_kwargs.update(request.data.get('metadata', {})) backend_kwargs.update(request.data.get('metadata', {}))
try:
# Handle OIDC workload identity token generation if enabled with set_environ(**settings.AWX_TASK_ENV):
response_body = {} obj.credential_type.plugin.backend(**backend_kwargs)
if flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED') and self._has_workload_identity_token(obj.credential_type.inputs): return Response({}, status=status.HTTP_202_ACCEPTED)
try: except requests.exceptions.HTTPError:
oidc_response_body = self._handle_oidc_credential_test(backend_kwargs) message = """Test operation is not supported for credential type {}.
response_body.update(oidc_response_body) This endpoint only supports credentials that connect to
except PermissionDenied: external secret management systems such as CyberArk, HashiCorp
raise Vault, or cloud-based secret managers.""".format(obj.credential_type.kind)
except Exception as exc: return Response({'detail': message}, status=status.HTTP_400_BAD_REQUEST)
error_message = str(exc.detail) if hasattr(exc, 'detail') else str(exc) except Exception as exc:
response_body['detail'] = error_message message = exc.__class__.__name__
response_body['details'] = {'error_message': error_message} exc_args = getattr(exc, 'args', [])
return Response(response_body, status=status.HTTP_400_BAD_REQUEST) for a in exc_args:
if isinstance(getattr(a, 'reason', None), ConnectTimeoutError):
return self._call_backend_with_error_handling(obj.credential_type.plugin, backend_kwargs, response_body) message = str(a.reason)
return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST)
class CredentialInputSourceDetail(RetrieveUpdateDestroyAPIView): class CredentialInputSourceDetail(RetrieveUpdateDestroyAPIView):
@@ -1843,7 +1668,7 @@ class CredentialInputSourceSubList(SubListCreateAPIView):
parent_key = 'target_credential' parent_key = 'target_credential'
class CredentialTypeExternalTest(OIDCCredentialTestMixin, SubDetailAPIView): class CredentialTypeExternalTest(SubDetailAPIView):
""" """
Test a complete set of input values for an external credential before Test a complete set of input values for an external credential before
saving it. saving it.
@@ -1860,22 +1685,19 @@ class CredentialTypeExternalTest(OIDCCredentialTestMixin, SubDetailAPIView):
obj = self.get_object() obj = self.get_object()
backend_kwargs = request.data.get('inputs', {}) backend_kwargs = request.data.get('inputs', {})
backend_kwargs.update(request.data.get('metadata', {})) backend_kwargs.update(request.data.get('metadata', {}))
try:
# Handle OIDC workload identity token generation if enabled obj.plugin.backend(**backend_kwargs)
response_body = {} return Response({}, status=status.HTTP_202_ACCEPTED)
if flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED') and self._has_workload_identity_token(obj.inputs): except requests.exceptions.HTTPError as exc:
try: message = 'HTTP {}'.format(exc.response.status_code)
oidc_response_body = self._handle_oidc_credential_test(backend_kwargs) return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST)
response_body.update(oidc_response_body) except Exception as exc:
except PermissionDenied: message = exc.__class__.__name__
raise args_exc = getattr(exc, 'args', [])
except Exception as exc: for a in args_exc:
error_message = str(exc.detail) if hasattr(exc, 'detail') else str(exc) if isinstance(getattr(a, 'reason', None), ConnectTimeoutError):
response_body['detail'] = error_message message = str(a.reason)
response_body['details'] = {'error_message': error_message} return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST)
return Response(response_body, status=status.HTTP_400_BAD_REQUEST)
return self._call_backend_with_error_handling(obj.plugin, backend_kwargs, response_body)
class HostRelatedSearchMixin(object): class HostRelatedSearchMixin(object):

View File

@@ -344,22 +344,13 @@ class ApiV2ConfigView(APIView):
become_methods=PRIVILEGE_ESCALATION_METHODS, become_methods=PRIVILEGE_ESCALATION_METHODS,
) )
# Check superuser/auditor first if (
if request.user.is_superuser or request.user.is_system_auditor: request.user.is_superuser
has_org_access = True or request.user.is_system_auditor
else: or Organization.accessible_objects(request.user, 'admin_role').exists()
# Single query checking all three organization role types at once or Organization.accessible_objects(request.user, 'auditor_role').exists()
has_org_access = ( or Organization.accessible_objects(request.user, 'project_admin_role').exists()
( ):
Organization.access_qs(request.user, 'change')
| Organization.access_qs(request.user, 'audit')
| Organization.access_qs(request.user, 'add_project')
)
.distinct()
.exists()
)
if has_org_access:
data.update( data.update(
dict( dict(
project_base_dir=settings.PROJECTS_ROOT, project_base_dir=settings.PROJECTS_ROOT,
@@ -367,10 +358,8 @@ class ApiV2ConfigView(APIView):
custom_virtualenvs=get_custom_venv_choices(), custom_virtualenvs=get_custom_venv_choices(),
) )
) )
else: elif JobTemplate.accessible_objects(request.user, 'admin_role').exists():
# Only check JobTemplate access if org check failed data['custom_virtualenvs'] = get_custom_venv_choices()
if JobTemplate.accessible_objects(request.user, 'admin_role').exists():
data['custom_virtualenvs'] = get_custom_venv_choices()
return Response(data) return Response(data)

View File

@@ -409,12 +409,10 @@ class Command(BaseCommand):
del_child_group_pks = list(set(db_children_name_pk_map.values())) del_child_group_pks = list(set(db_children_name_pk_map.values()))
for offset in range(0, len(del_child_group_pks), self._batch_size): for offset in range(0, len(del_child_group_pks), self._batch_size):
child_group_pks = del_child_group_pks[offset : (offset + self._batch_size)] child_group_pks = del_child_group_pks[offset : (offset + self._batch_size)]
children_to_remove = list(db_children.filter(pk__in=child_group_pks)) for db_child in db_children.filter(pk__in=child_group_pks):
if children_to_remove: group_group_count += 1
group_group_count += len(children_to_remove) db_group.children.remove(db_child)
db_group.children.remove(*children_to_remove) logger.debug('Group "%s" removed from group "%s"', db_child.name, db_group.name)
for db_child in children_to_remove:
logger.debug('Group "%s" removed from group "%s"', db_child.name, db_group.name)
# FIXME: Inventory source group relationships # FIXME: Inventory source group relationships
# Delete group/host relationships not present in imported data. # Delete group/host relationships not present in imported data.
db_hosts = db_group.hosts db_hosts = db_group.hosts
@@ -443,12 +441,12 @@ class Command(BaseCommand):
del_host_pks = list(del_host_pks) del_host_pks = list(del_host_pks)
for offset in range(0, len(del_host_pks), self._batch_size): for offset in range(0, len(del_host_pks), self._batch_size):
del_pks = del_host_pks[offset : (offset + self._batch_size)] del_pks = del_host_pks[offset : (offset + self._batch_size)]
hosts_to_remove = list(db_hosts.filter(pk__in=del_pks)) for db_host in db_hosts.filter(pk__in=del_pks):
if hosts_to_remove: group_host_count += 1
group_host_count += len(hosts_to_remove) if db_host not in db_group.hosts.all():
db_group.hosts.remove(*hosts_to_remove) continue
for db_host in hosts_to_remove: db_group.hosts.remove(db_host)
logger.debug('Host "%s" removed from group "%s"', db_host.name, db_group.name) logger.debug('Host "%s" removed from group "%s"', db_host.name, db_group.name)
if settings.SQL_DEBUG: if settings.SQL_DEBUG:
logger.warning( logger.warning(
'group-group and group-host deletions took %d queries for %d relationships', 'group-group and group-host deletions took %d queries for %d relationships',

View File

@@ -531,7 +531,6 @@ class CredentialType(CommonModelNameNotUnique):
existing = ct_class.objects.filter(name=default.name, kind=default.kind).first() existing = ct_class.objects.filter(name=default.name, kind=default.kind).first()
if existing is not None: if existing is not None:
existing.namespace = default.namespace existing.namespace = default.namespace
existing.description = getattr(default, 'description', '')
existing.inputs = {} existing.inputs = {}
existing.injectors = {} existing.injectors = {}
existing.save() existing.save()
@@ -571,14 +570,7 @@ class CredentialType(CommonModelNameNotUnique):
@classmethod @classmethod
def load_plugin(cls, ns, plugin): def load_plugin(cls, ns, plugin):
# TODO: User "side-loaded" credential custom_injectors isn't supported # TODO: User "side-loaded" credential custom_injectors isn't supported
ManagedCredentialType.registry[ns] = SimpleNamespace( ManagedCredentialType.registry[ns] = SimpleNamespace(namespace=ns, name=plugin.name, kind='external', inputs=plugin.inputs, backend=plugin.backend)
namespace=ns,
name=plugin.name,
kind='external',
inputs=plugin.inputs,
backend=plugin.backend,
description=getattr(plugin, 'plugin_description', ''),
)
def inject_credential(self, credential, env, safe_env, args, private_data_dir, container_root=None): def inject_credential(self, credential, env, safe_env, args, private_data_dir, container_root=None):
from awx_plugins.interfaces._temporary_private_inject_api import inject_credential from awx_plugins.interfaces._temporary_private_inject_api import inject_credential
@@ -590,13 +582,7 @@ class CredentialTypeHelper:
@classmethod @classmethod
def get_creation_params(cls, cred_type): def get_creation_params(cls, cred_type):
if cred_type.kind == 'external': if cred_type.kind == 'external':
return { return dict(namespace=cred_type.namespace, kind=cred_type.kind, name=cred_type.name, managed=True)
'namespace': cred_type.namespace,
'kind': cred_type.kind,
'name': cred_type.name,
'managed': True,
'description': getattr(cred_type, 'description', ''),
}
return dict( return dict(
namespace=cred_type.namespace, namespace=cred_type.namespace,
kind=cred_type.kind, kind=cred_type.kind,

View File

@@ -345,11 +345,7 @@ class WorkflowJobNode(WorkflowNodeBase):
) )
data.update(accepted_fields) # missing fields are handled in the scheduler data.update(accepted_fields) # missing fields are handled in the scheduler
# build ancestor artifacts, save them to node model for later # build ancestor artifacts, save them to node model for later
# initialize from pre-seeded ancestor_artifacts (set on root nodes of aa_dict = {}
# child workflows via seed_root_ancestor_artifacts to carry artifacts
# from the parent workflow); exclude job_slice which is internal
# metadata handled separately below
aa_dict = {k: v for k, v in self.ancestor_artifacts.items() if k != 'job_slice'} if self.ancestor_artifacts else {}
is_root_node = True is_root_node = True
for parent_node in self.get_parent_nodes(): for parent_node in self.get_parent_nodes():
is_root_node = False is_root_node = False
@@ -370,13 +366,11 @@ class WorkflowJobNode(WorkflowNodeBase):
data['survey_passwords'] = password_dict data['survey_passwords'] = password_dict
# process extra_vars # process extra_vars
extra_vars = data.get('extra_vars', {}) extra_vars = data.get('extra_vars', {})
if ujt_obj and isinstance(ujt_obj, JobTemplate): if ujt_obj and isinstance(ujt_obj, (JobTemplate, WorkflowJobTemplate)):
if aa_dict: if aa_dict:
functional_aa_dict = copy(aa_dict) functional_aa_dict = copy(aa_dict)
functional_aa_dict.pop('_ansible_no_log', None) functional_aa_dict.pop('_ansible_no_log', None)
extra_vars.update(functional_aa_dict) extra_vars.update(functional_aa_dict)
elif ujt_obj and isinstance(ujt_obj, WorkflowJobTemplate):
pass # artifacts are applied via seed_root_ancestor_artifacts in the task manager
# Workflow Job extra_vars higher precedence than ancestor artifacts # Workflow Job extra_vars higher precedence than ancestor artifacts
extra_vars.update(wj_special_vars) extra_vars.update(wj_special_vars)
@@ -740,18 +734,6 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
wj = wj.get_workflow_job() wj = wj.get_workflow_job()
return ancestors return ancestors
def seed_root_ancestor_artifacts(self, artifacts):
"""Apply parent workflow artifacts to root nodes so they propagate
through the normal ancestor_artifacts channel instead of being
baked into this workflow's extra_vars."""
self.workflow_job_nodes.exclude(
workflowjobnodes_success__isnull=False,
).exclude(
workflowjobnodes_failure__isnull=False,
).exclude(
workflowjobnodes_always__isnull=False,
).update(ancestor_artifacts=artifacts)
def get_effective_artifacts(self, **kwargs): def get_effective_artifacts(self, **kwargs):
""" """
For downstream jobs of a workflow nested inside of a workflow, For downstream jobs of a workflow nested inside of a workflow,

View File

@@ -241,8 +241,6 @@ class WorkflowManager(TaskBase):
job = spawn_node.unified_job_template.create_unified_job(**kv) job = spawn_node.unified_job_template.create_unified_job(**kv)
spawn_node.job = job spawn_node.job = job
spawn_node.save() spawn_node.save()
if spawn_node.ancestor_artifacts and isinstance(spawn_node.unified_job_template, WorkflowJobTemplate):
job.seed_root_ancestor_artifacts(spawn_node.ancestor_artifacts)
logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk) logger.debug('Spawned %s in %s for node %s', job.log_format, workflow_job.log_format, spawn_node.pk)
can_start = True can_start = True
if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate): if isinstance(spawn_node.unified_job_template, WorkflowJobTemplate):

View File

@@ -277,7 +277,6 @@ class RunnerCallback:
def artifacts_handler(self, artifact_dir): def artifacts_handler(self, artifact_dir):
success, query_file_contents = try_load_query_file(artifact_dir) success, query_file_contents = try_load_query_file(artifact_dir)
if success: if success:
self.delay_update(event_queries_processed=False)
collections_info = collect_queries(query_file_contents) collections_info = collect_queries(query_file_contents)
for collection, data in collections_info.items(): for collection, data in collections_info.items():
version = data['version'] version = data['version']
@@ -301,6 +300,24 @@ class RunnerCallback:
else: else:
logger.warning(f'The file {COLLECTION_FILENAME} unexpectedly did not contain ansible_version') logger.warning(f'The file {COLLECTION_FILENAME} unexpectedly did not contain ansible_version')
# Write event_queries_processed and installed_collections directly
# to the DB instead of using delay_update. delay_update defers
# writes until the final job status save, but
# events_processed_hook (called from both the task runner after
# the final save and the callback receiver after the wrapup
# event) needs event_queries_processed=False visible in the DB
# to dispatch save_indirect_host_entries. The field defaults to
# True, so without a direct write the hook would see True and
# skip the dispatch. installed_collections is also written
# directly so it is available if the callback receiver
# dispatches before the final save.
from awx.main.models import Job
db_updates = {'event_queries_processed': False}
if 'installed_collections' in query_file_contents:
db_updates['installed_collections'] = query_file_contents['installed_collections']
Job.objects.filter(id=self.instance.id).update(**db_updates)
self.artifacts_processed = True self.artifacts_processed = True

View File

@@ -94,7 +94,7 @@ from flags.state import flag_enabled
# Workload Identity # Workload Identity
from ansible_base.lib.workload_identity.controller import AutomationControllerJobScope from ansible_base.lib.workload_identity.controller import AutomationControllerJobScope
from awx.main.utils.workload_identity import retrieve_workload_identity_jwt_with_claims from ansible_base.resource_registry.workload_identity_client import get_workload_identity_client
logger = logging.getLogger('awx.main.tasks.jobs') logger = logging.getLogger('awx.main.tasks.jobs')
@@ -168,12 +168,14 @@ def retrieve_workload_identity_jwt(
Raises: Raises:
RuntimeError: if the workload identity client is not configured. RuntimeError: if the workload identity client is not configured.
""" """
return retrieve_workload_identity_jwt_with_claims( client = get_workload_identity_client()
populate_claims_for_workload(unified_job), if client is None:
audience, raise RuntimeError("Workload identity client is not configured")
scope, claims = populate_claims_for_workload(unified_job)
workload_ttl_seconds, kwargs = {"claims": claims, "scope": scope, "audience": audience}
) if workload_ttl_seconds:
kwargs["workload_ttl_seconds"] = workload_ttl_seconds
return client.request_workload_jwt(**kwargs).jwt
def with_path_cleanup(f): def with_path_cleanup(f):
@@ -228,19 +230,16 @@ class BaseTask(object):
# Convert to list to prevent re-evaluation of QuerySet # Convert to list to prevent re-evaluation of QuerySet
return list(credentials_list) return list(credentials_list)
def populate_workload_identity_tokens(self, additional_credentials=None): def populate_workload_identity_tokens(self):
""" """
Populate credentials with workload identity tokens. Populate credentials with workload identity tokens.
Sets the context on Credential objects that have input sources Sets the context on Credential objects that have input sources
using compatible external credential types. using compatible external credential types.
""" """
credentials = list(self._credentials)
if additional_credentials:
credentials.extend(additional_credentials)
credential_input_sources = ( credential_input_sources = (
(credential.context, src) (credential.context, src)
for credential in credentials for credential in self._credentials
for src in credential.input_sources.all() for src in credential.input_sources.all()
if any( if any(
field.get('id') == 'workload_identity_token' and field.get('internal') field.get('id') == 'workload_identity_token' and field.get('internal')
@@ -1683,7 +1682,7 @@ class RunProjectUpdate(BaseTask):
return params return params
def build_credentials_list(self, project_update): def build_credentials_list(self, project_update):
if project_update.credential: if project_update.scm_type == 'insights' and project_update.credential:
return [project_update.credential] return [project_update.credential]
return [] return []
@@ -1866,24 +1865,6 @@ class RunInventoryUpdate(SourceControlMixin, BaseTask):
# All credentials not used by inventory source injector # All credentials not used by inventory source injector
return inventory_update.get_extra_credentials() return inventory_update.get_extra_credentials()
def populate_workload_identity_tokens(self, additional_credentials=None):
"""Also generate OIDC tokens for the cloud credential.
The cloud credential is not in _credentials (it is handled by the
inventory source injector), but it may still need a workload identity
token generated for it.
"""
cloud_cred = self.instance.get_cloud_credential()
creds = list(additional_credentials or [])
if cloud_cred:
creds.append(cloud_cred)
super().populate_workload_identity_tokens(additional_credentials=creds or None)
# Override get_cloud_credential on this instance so the injector
# uses the credential with OIDC context instead of doing a fresh
# DB fetch that would lose it.
if cloud_cred and cloud_cred.context:
self.instance.get_cloud_credential = lambda: cloud_cred
def build_project_dir(self, inventory_update, private_data_dir): def build_project_dir(self, inventory_update, private_data_dir):
source_project = None source_project = None
if inventory_update.inventory_source: if inventory_update.inventory_source:

View File

@@ -1,11 +0,0 @@
---
- hosts: all
gather_facts: false
connection: local
tasks:
- name: Set artifacts via set_stats
ansible.builtin.set_stats:
data: "{{ stats_data }}"
per_host: false
aggregate: false
when: stats_data is defined

View File

@@ -1,84 +0,0 @@
import pytest
from awx.api.versioning import reverse
from rest_framework import status
from awx.main.models.jobs import JobTemplate
@pytest.mark.django_db
class TestConfigEndpointFields:
def test_base_fields_all_users(self, get, rando):
url = reverse('api:api_v2_config_view')
response = get(url, rando, expect=200)
assert 'time_zone' in response.data
assert 'license_info' in response.data
assert 'version' in response.data
assert 'eula' in response.data
assert 'analytics_status' in response.data
assert 'analytics_collectors' in response.data
assert 'become_methods' in response.data
@pytest.mark.parametrize(
"role_type",
[
"superuser",
"system_auditor",
"org_admin",
"org_auditor",
"org_project_admin",
],
)
def test_privileged_users_conditional_fields(self, get, user, organization, admin, role_type):
url = reverse('api:api_v2_config_view')
if role_type == "superuser":
test_user = admin
elif role_type == "system_auditor":
test_user = user('system-auditor', is_superuser=False)
test_user.is_system_auditor = True
test_user.save()
elif role_type == "org_admin":
test_user = user('org-admin', is_superuser=False)
organization.admin_role.members.add(test_user)
elif role_type == "org_auditor":
test_user = user('org-auditor', is_superuser=False)
organization.auditor_role.members.add(test_user)
elif role_type == "org_project_admin":
test_user = user('org-project-admin', is_superuser=False)
organization.project_admin_role.members.add(test_user)
response = get(url, test_user, expect=200)
assert 'project_base_dir' in response.data
assert 'project_local_paths' in response.data
assert 'custom_virtualenvs' in response.data
def test_job_template_admin_gets_venvs_only(self, get, user, organization, project, inventory):
"""Test that JobTemplate admin without org access gets only custom_virtualenvs"""
jt_admin = user('jt-admin', is_superuser=False)
jt = JobTemplate.objects.create(name='test-jt', organization=organization, project=project, inventory=inventory)
jt.admin_role.members.add(jt_admin)
url = reverse('api:api_v2_config_view')
response = get(url, jt_admin, expect=200)
assert 'custom_virtualenvs' in response.data
assert 'project_base_dir' not in response.data
assert 'project_local_paths' not in response.data
def test_normal_user_no_conditional_fields(self, get, rando):
url = reverse('api:api_v2_config_view')
response = get(url, rando, expect=200)
assert 'project_base_dir' not in response.data
assert 'project_local_paths' not in response.data
assert 'custom_virtualenvs' not in response.data
def test_unauthenticated_denied(self, get):
"""Test that unauthenticated requests are denied"""
url = reverse('api:api_v2_config_view')
response = get(url, None, expect=401)
assert response.status_code == status.HTTP_401_UNAUTHORIZED

View File

@@ -2,7 +2,6 @@ import json
import pytest import pytest
from ansible_base.lib.testing.util import feature_flag_enabled
from awx.main.models.credential import CredentialType, Credential from awx.main.models.credential import CredentialType, Credential
from awx.api.versioning import reverse from awx.api.versioning import reverse
@@ -160,8 +159,7 @@ def test_create_as_admin(get, post, admin):
response = get(reverse('api:credential_type_list'), admin) response = get(reverse('api:credential_type_list'), admin)
assert response.data['count'] == 1 assert response.data['count'] == 1
assert response.data['results'][0]['name'] == 'Custom Credential Type' assert response.data['results'][0]['name'] == 'Custom Credential Type'
# Serializer normalizes empty inputs to {'fields': []} assert response.data['results'][0]['inputs'] == {}
assert response.data['results'][0]['inputs'] == {'fields': []}
assert response.data['results'][0]['injectors'] == {} assert response.data['results'][0]['injectors'] == {}
assert response.data['results'][0]['managed'] is False assert response.data['results'][0]['managed'] is False
@@ -476,98 +474,3 @@ def test_credential_type_rbac_external_test(post, alice, admin, credentialtype_e
data = {'inputs': {}, 'metadata': {}} data = {'inputs': {}, 'metadata': {}}
assert post(url, data, admin).status_code == 202 assert post(url, data, admin).status_code == 202
assert post(url, data, alice).status_code == 403 assert post(url, data, alice).status_code == 403
# --- Tests for internal field filtering with None/invalid inputs ---
@pytest.mark.django_db
def test_credential_type_with_none_inputs(get, admin):
"""Test that credential type with empty inputs dict works correctly."""
# Create a credential type with empty dict
ct = CredentialType.objects.create(
kind='cloud',
name='Test Type',
managed=False,
inputs={}, # Empty dict, not None (DB has NOT NULL constraint)
)
url = reverse('api:credential_type_detail', kwargs={'pk': ct.pk})
response = get(url, admin)
assert response.status_code == 200
# Should have normalized inputs to empty dict
assert 'inputs' in response.data
assert isinstance(response.data['inputs'], dict)
assert response.data['inputs']['fields'] == []
@pytest.mark.django_db
def test_credential_type_with_invalid_inputs_type(get, admin):
"""Test that credential type with non-dict inputs doesn't cause errors."""
# Create a credential type with invalid inputs type
ct = CredentialType.objects.create(kind='cloud', name='Test Type', managed=False, inputs={'fields': 'not-a-list'})
url = reverse('api:credential_type_detail', kwargs={'pk': ct.pk})
response = get(url, admin)
assert response.status_code == 200
# Should gracefully handle invalid fields type
assert 'inputs' in response.data
assert response.data['inputs']['fields'] == []
@pytest.mark.django_db
def test_credential_type_filters_internal_fields(get, admin):
"""Test that internal fields are filtered from API responses."""
ct = CredentialType.objects.create(
kind='cloud',
name='Test OIDC Type',
managed=False,
inputs={
'fields': [
{'id': 'url', 'label': 'URL', 'type': 'string'},
{'id': 'token', 'label': 'Token', 'type': 'string', 'secret': True, 'internal': True},
{'id': 'public_field', 'label': 'Public', 'type': 'string'},
]
},
)
url = reverse('api:credential_type_detail', kwargs={'pk': ct.pk})
with feature_flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'):
response = get(url, admin)
assert response.status_code == 200
field_ids = [f['id'] for f in response.data['inputs']['fields']]
# Internal field should be filtered out
assert 'token' not in field_ids
assert 'url' in field_ids
assert 'public_field' in field_ids
@pytest.mark.django_db
def test_credential_type_list_filters_internal_fields(get, admin):
"""Test that internal fields are filtered in list view."""
CredentialType.objects.create(
kind='cloud',
name='Test OIDC Type',
managed=False,
inputs={
'fields': [
{'id': 'url', 'label': 'URL', 'type': 'string'},
{'id': 'workload_identity_token', 'label': 'Token', 'type': 'string', 'secret': True, 'internal': True},
]
},
)
url = reverse('api:credential_type_list')
with feature_flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'):
response = get(url, admin)
assert response.status_code == 200
# Find our credential type in the results
test_ct = next((ct for ct in response.data['results'] if ct['name'] == 'Test OIDC Type'), None)
assert test_ct is not None
field_ids = [f['id'] for f in test_ct['inputs']['fields']]
# Internal field should be filtered out
assert 'workload_identity_token' not in field_ids
assert 'url' in field_ids

View File

@@ -1,259 +0,0 @@
"""
Tests for OIDC workload identity credential test endpoints.
Tests the /api/v2/credentials/<id>/test/ and /api/v2/credential_types/<id>/test/
endpoints when used with OIDC-enabled credential types.
"""
import pytest
from unittest import mock
from django.test import override_settings
from awx.main.models import Credential, CredentialType, JobTemplate
from awx.api.versioning import reverse
@pytest.fixture
def job_template(organization, project):
"""Job template with organization and project for OIDC JWT generation."""
return JobTemplate.objects.create(name='test-jt', organization=organization, project=project, playbook='helloworld.yml')
@pytest.fixture
def oidc_credentialtype():
"""Create a credential type with workload_identity_token internal field."""
oidc_type_inputs = {
'fields': [
{'id': 'url', 'label': 'Vault URL', 'type': 'string', 'help_text': 'The Vault server URL.'},
{'id': 'auth_path', 'label': 'Auth Path', 'type': 'string', 'help_text': 'JWT auth mount path.'},
{'id': 'role_id', 'label': 'Role ID', 'type': 'string', 'help_text': 'Vault role.'},
{'id': 'jwt_aud', 'label': 'JWT Audience', 'type': 'string', 'help_text': 'Expected audience.'},
{'id': 'workload_identity_token', 'label': 'Workload Identity Token', 'type': 'string', 'secret': True, 'internal': True},
],
'metadata': [
{'id': 'secret_path', 'label': 'Secret Path', 'type': 'string'},
{'id': 'job_template_id', 'label': 'Job Template ID', 'type': 'string'},
],
'required': ['url', 'auth_path', 'role_id'],
}
class MockPlugin(object):
def backend(self, **kwargs):
# Simulate successful backend call
return 'secret'
with mock.patch('awx.main.models.credential.CredentialType.plugin', new_callable=mock.PropertyMock) as mock_plugin:
mock_plugin.return_value = MockPlugin()
oidc_type = CredentialType(kind='external', managed=True, namespace='hashivault-kv-oidc', name='HashiCorp Vault KV (OIDC)', inputs=oidc_type_inputs)
oidc_type.save()
yield oidc_type
@pytest.fixture
def oidc_credential(oidc_credentialtype):
"""Create a credential using the OIDC credential type."""
return Credential.objects.create(
credential_type=oidc_credentialtype,
name='oidc-vault-cred',
inputs={'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'},
)
@pytest.fixture
def mock_oidc_backend():
"""Fixture that mocks OIDC JWT generation and credential backend."""
with mock.patch('awx.api.views.retrieve_workload_identity_jwt_with_claims') as mock_jwt, mock.patch('awx.api.views._jwt_decode') as mock_decode, mock.patch(
'awx.main.models.credential.CredentialType.plugin', new_callable=mock.PropertyMock
) as mock_plugin:
# Set default return values
mock_jwt.return_value = 'fake.jwt.token'
mock_decode.return_value = {'iss': 'http://gateway/o', 'aud': 'vault'}
# Create mock backend
mock_backend = mock.MagicMock()
mock_backend.backend.return_value = 'secret'
mock_plugin.return_value = mock_backend
# Yield all mocks for test customization
yield {
'jwt': mock_jwt,
'decode': mock_decode,
'plugin': mock_plugin,
'backend': mock_backend,
}
# --- Tests for CredentialExternalTest endpoint ---
@pytest.mark.django_db
@override_settings(FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED=False)
def test_credential_test_without_oidc_feature_flag(post, admin, oidc_credential):
"""Test that credential test works without OIDC feature flag enabled."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': '1'}}
with mock.patch('awx.main.models.credential.CredentialType.plugin', new_callable=mock.PropertyMock) as mock_plugin:
mock_backend = mock.MagicMock()
mock_backend.backend.return_value = 'secret'
mock_plugin.return_value = mock_backend
response = post(url, data, admin)
assert response.status_code == 202
# Should not contain JWT payload when feature flag is disabled
assert 'details' not in response.data or 'sent_jwt_payload' not in response.data.get('details', {})
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
@pytest.mark.parametrize(
'job_template_id, expected_error',
[
(None, 'Job template ID is required'),
('not-an-integer', 'must be an integer'),
('99999', 'does not exist'),
],
ids=['missing_job_template_id', 'invalid_job_template_id_type', 'nonexistent_job_template_id'],
)
def test_credential_test_job_template_validation(mock_flag, post, admin, oidc_credential, job_template_id, expected_error):
"""Test that invalid job_template_id values return 400 with appropriate error messages."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret'}}
if job_template_id is not None:
data['metadata']['job_template_id'] = job_template_id
response = post(url, data, admin)
assert response.status_code == 400
assert 'details' in response.data
assert 'error_message' in response.data['details']
assert expected_error in response.data['details']['error_message']
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_test_no_access_to_job_template(mock_flag, post, alice, oidc_credential, job_template):
"""Test that user without access to job template gets 403."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
# Give alice use permission on credential but not on job template
oidc_credential.use_role.members.add(alice)
response = post(url, data, alice)
assert response.status_code == 403
assert 'You do not have access to job template' in str(response.data)
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_test_success_returns_jwt_payload(mock_flag, post, admin, oidc_credential, job_template, mock_oidc_backend):
"""Test that successful test returns JWT payload in response."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
# Customize mock for this test
mock_oidc_backend['decode'].return_value = {
'iss': 'http://gateway/o',
'sub': 'system:serviceaccount:default:awx-operator',
'aud': 'vault',
'job_template_id': job_template.id,
}
response = post(url, data, admin)
assert response.status_code == 202
assert 'details' in response.data
assert 'sent_jwt_payload' in response.data['details']
assert response.data['details']['sent_jwt_payload']['job_template_id'] == job_template.id
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_test_backend_failure_returns_jwt_and_error(mock_flag, post, admin, oidc_credential, job_template, mock_oidc_backend):
"""Test that backend failure still returns JWT payload along with error message."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
# Make backend fail
mock_oidc_backend['backend'].backend.side_effect = RuntimeError('Connection failed')
response = post(url, data, admin)
assert response.status_code == 400
assert 'details' in response.data
# Both JWT payload and error message should be present
assert 'sent_jwt_payload' in response.data['details']
assert 'error_message' in response.data['details']
assert 'Connection failed' in response.data['details']['error_message']
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_test_jwt_generation_failure(mock_flag, post, admin, oidc_credential, job_template):
"""Test that JWT generation failure returns error without JWT payload."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
with mock.patch('awx.api.views.OIDCCredentialTestMixin._get_workload_identity_token') as mock_jwt:
mock_jwt.side_effect = RuntimeError('Failed to generate JWT')
response = post(url, data, admin)
assert response.status_code == 400
assert 'details' in response.data
assert 'error_message' in response.data['details']
assert 'Failed to generate JWT' in response.data['details']['error_message']
# No JWT payload when generation fails
assert 'sent_jwt_payload' not in response.data['details']
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_test_job_template_id_not_passed_to_backend(mock_flag, post, admin, oidc_credential, job_template, mock_oidc_backend):
"""Test that job_template_id and jwt_aud are removed from backend_kwargs."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
response = post(url, data, admin)
assert response.status_code == 202
# Check that backend was called without job_template_id or jwt_aud
call_kwargs = mock_oidc_backend['backend'].backend.call_args[1]
assert 'job_template_id' not in call_kwargs
assert 'jwt_aud' not in call_kwargs
assert 'workload_identity_token' in call_kwargs
# --- Tests for CredentialTypeExternalTest endpoint ---
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_type_test_missing_job_template_id(mock_flag, post, admin, oidc_credentialtype):
"""Test that missing job_template_id returns 400 for credential type test endpoint."""
url = reverse('api:credential_type_external_test', kwargs={'pk': oidc_credentialtype.pk})
data = {
'inputs': {'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'},
'metadata': {'secret_path': 'test/secret'},
}
response = post(url, data, admin)
assert response.status_code == 400
assert 'details' in response.data
assert 'error_message' in response.data['details']
assert 'Job template ID is required' in response.data['details']['error_message']
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_type_test_success_returns_jwt_payload(mock_flag, post, admin, oidc_credentialtype, job_template, mock_oidc_backend):
"""Test that successful credential type test returns JWT payload."""
url = reverse('api:credential_type_external_test', kwargs={'pk': oidc_credentialtype.pk})
data = {
'inputs': {'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'},
'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)},
}
response = post(url, data, admin)
assert response.status_code == 202
assert 'details' in response.data
assert 'sent_jwt_payload' in response.data['details']

View File

@@ -305,47 +305,6 @@ class TestINIImports:
has_host_group = inventory.groups.get(name='has_a_host') has_host_group = inventory.groups.get(name='has_a_host')
assert has_host_group.hosts.count() == 1 assert has_host_group.hosts.count() == 1
@mock.patch.object(inventory_import, 'AnsibleInventoryLoader', MockLoader)
def test_overwrite_removes_stale_memberships(self, inventory):
"""When overwrite is enabled, host-group and group-group memberships
that are no longer in the imported data should be removed."""
# First import: parent_group has two children, host_group has two hosts
inventory_import.AnsibleInventoryLoader._data = {
"_meta": {"hostvars": {"host1": {}, "host2": {}}},
"all": {"children": ["ungrouped", "parent_group", "child_a", "child_b", "host_group"]},
"parent_group": {"children": ["child_a", "child_b"]},
"host_group": {"hosts": ["host1", "host2"]},
"ungrouped": {"hosts": []},
}
cmd = inventory_import.Command()
cmd.handle(inventory_id=inventory.pk, source=__file__, overwrite=True)
parent = inventory.groups.get(name='parent_group')
assert set(parent.children.values_list('name', flat=True)) == {'child_a', 'child_b'}
host_grp = inventory.groups.get(name='host_group')
assert set(host_grp.hosts.values_list('name', flat=True)) == {'host1', 'host2'}
# Second import: child_b removed from parent_group, host2 moved out of host_group
inventory_import.AnsibleInventoryLoader._data = {
"_meta": {"hostvars": {"host1": {}, "host2": {}}},
"all": {"children": ["ungrouped", "parent_group", "child_a", "child_b", "host_group"]},
"parent_group": {"children": ["child_a"]},
"host_group": {"hosts": ["host1"]},
"ungrouped": {"hosts": ["host2"]},
}
cmd = inventory_import.Command()
cmd.handle(inventory_id=inventory.pk, source=__file__, overwrite=True)
parent.refresh_from_db()
host_grp.refresh_from_db()
# child_b should be removed from parent_group
assert set(parent.children.values_list('name', flat=True)) == {'child_a'}
# host2 should be removed from host_group
assert set(host_grp.hosts.values_list('name', flat=True)) == {'host1'}
# host2 and child_b should still exist in the inventory, just not in those groups
assert inventory.hosts.filter(name='host2').exists()
assert inventory.groups.filter(name='child_b').exists()
@mock.patch.object(inventory_import, 'AnsibleInventoryLoader', MockLoader) @mock.patch.object(inventory_import, 'AnsibleInventoryLoader', MockLoader)
def test_recursive_group_error(self, inventory): def test_recursive_group_error(self, inventory):
inventory_import.AnsibleInventoryLoader._data = { inventory_import.AnsibleInventoryLoader._data = {

View File

@@ -1,206 +0,0 @@
import json
import pytest
from awx.main.tests.live.tests.conftest import wait_for_job
from awx.main.models import JobTemplate, WorkflowJobTemplate, WorkflowJobTemplateNode
JT_NAMES = ('artifact-test-first', 'artifact-test-second', 'artifact-test-reader')
WFT_NAMES = ('artifact-test-outer-wf', 'artifact-test-inner-wf')
@pytest.mark.django_db(transaction=True)
def test_nested_workflow_set_stats_precedence(live_tmp_folder, demo_inv, project_factory, default_org):
"""Reproducer for set_stats artifacts from an outer workflow leaking into
an inner (child) workflow and overriding the inner workflow's own artifacts.
Outer WF: [job_first] --success--> [inner_wf]
Inner WF: [job_second] --success--> [job_reader]
job_first sets via set_stats:
var1: "outer-only" (only source, should propagate through)
var2: "should-be-overridden" (will be overridden by job_second)
job_second sets via set_stats:
var2: "from-inner" (should override outer's value)
var3: "inner-only" (only source, should be available)
job_reader runs debug.yml (no set_stats), we inspect its extra_vars:
var1 should be "outer-only" - outer artifacts propagate when uncontested
var2 should be "from-inner" - inner artifacts override outer (THE BUG)
var3 should be "inner-only" - inner-only artifacts propagate normally
"""
# Clean up resources from prior runs (delete individually for signals)
for name in WFT_NAMES:
for wft in WorkflowJobTemplate.objects.filter(name=name):
wft.delete()
for name in JT_NAMES:
for jt in JobTemplate.objects.filter(name=name):
jt.delete()
proj = project_factory(scm_url=f'file://{live_tmp_folder}/debug')
if proj.current_job:
wait_for_job(proj.current_job)
# job_first: sets var1 (outer-only) and var2 (to be overridden by inner)
jt_first = JobTemplate.objects.create(
name='artifact-test-first',
project=proj,
playbook='set_stats.yml',
inventory=demo_inv,
extra_vars=json.dumps({'stats_data': {'var1': 'outer-only', 'var2': 'should-be-overridden'}}),
)
# job_second: overrides var2, introduces var3
jt_second = JobTemplate.objects.create(
name='artifact-test-second',
project=proj,
playbook='set_stats.yml',
inventory=demo_inv,
extra_vars=json.dumps({'stats_data': {'var2': 'from-inner', 'var3': 'inner-only'}}),
)
# job_reader: just runs, we check what extra_vars it receives
jt_reader = JobTemplate.objects.create(
name='artifact-test-reader',
project=proj,
playbook='debug.yml',
inventory=demo_inv,
)
# Inner WFT: job_second -> job_reader
inner_wft = WorkflowJobTemplate.objects.create(name='artifact-test-inner-wf', organization=default_org)
inner_node_1 = WorkflowJobTemplateNode.objects.create(
workflow_job_template=inner_wft,
unified_job_template=jt_second,
identifier='second',
)
inner_node_2 = WorkflowJobTemplateNode.objects.create(
workflow_job_template=inner_wft,
unified_job_template=jt_reader,
identifier='reader',
)
inner_node_1.success_nodes.add(inner_node_2)
# Outer WFT: job_first -> inner_wf
outer_wft = WorkflowJobTemplate.objects.create(name='artifact-test-outer-wf', organization=default_org)
outer_node_1 = WorkflowJobTemplateNode.objects.create(
workflow_job_template=outer_wft,
unified_job_template=jt_first,
identifier='first',
)
outer_node_2 = WorkflowJobTemplateNode.objects.create(
workflow_job_template=outer_wft,
unified_job_template=inner_wft,
identifier='inner',
)
outer_node_1.success_nodes.add(outer_node_2)
# Launch and wait
outer_wfj = outer_wft.create_unified_job()
outer_wfj.signal_start()
wait_for_job(outer_wfj, running_timeout=120)
# Find the reader job inside the inner workflow
inner_wf_node = outer_wfj.workflow_job_nodes.get(identifier='inner')
inner_wfj = inner_wf_node.job
assert inner_wfj is not None, 'Inner workflow job was never created'
# Check that root node of inner WF (job_second) received outer artifacts
second_node = inner_wfj.workflow_job_nodes.get(identifier='second')
assert second_node.job is not None, 'Second job was never created'
second_extra_vars = json.loads(second_node.job.extra_vars)
assert second_extra_vars.get('var1') == 'outer-only', (
f'Root node var1: expected "outer-only" (outer artifact should be available to root node), '
f'got "{second_extra_vars.get("var1")}". '
f'Outer artifacts are not reaching root nodes of child workflows.'
)
reader_node = inner_wfj.workflow_job_nodes.get(identifier='reader')
assert reader_node.job is not None, 'Reader job was never created'
reader_extra_vars = json.loads(reader_node.job.extra_vars)
# var1: only set by outer job_first, no conflict — should propagate through
assert reader_extra_vars.get('var1') == 'outer-only', f'var1: expected "outer-only" (uncontested outer artifact), ' f'got "{reader_extra_vars.get("var1")}"'
# var2: set by outer as "should-be-overridden", then by inner as "from-inner"
# Inner workflow's own ancestor artifacts should take precedence
assert reader_extra_vars.get('var2') == 'from-inner', (
f'var2: expected "from-inner" (inner workflow artifact should override outer), '
f'got "{reader_extra_vars.get("var2")}". '
f'Outer workflow artifacts are leaking via wj_special_vars. '
f'reader node ancestor_artifacts={reader_node.ancestor_artifacts}'
)
# var3: only set by inner job_second — should propagate normally
assert reader_extra_vars.get('var3') == 'inner-only', f'var3: expected "inner-only" (inner-only artifact), ' f'got "{reader_extra_vars.get("var3")}"'
@pytest.mark.django_db(transaction=True)
def test_workflow_extra_vars_override_artifacts(live_tmp_folder, demo_inv, project_factory, default_org):
"""Workflow extra_vars should take precedence over set_stats artifacts
within a single (non-nested) workflow.
WF (extra_vars: my_var="from-wf-extra-vars"):
[job_setter] --success--> [job_reader]
job_setter sets my_var="from-set-stats" via set_stats
job_reader should see my_var="from-wf-extra-vars" because workflow
extra_vars are higher precedence than ancestor artifacts.
"""
wft_name = 'artifact-test-wf-extra-vars-precedence'
jt_names = ('artifact-test-setter', 'artifact-test-checker')
for wft in WorkflowJobTemplate.objects.filter(name=wft_name):
wft.delete()
for name in jt_names:
for jt in JobTemplate.objects.filter(name=name):
jt.delete()
proj = project_factory(scm_url=f'file://{live_tmp_folder}/debug')
if proj.current_job:
wait_for_job(proj.current_job)
jt_setter = JobTemplate.objects.create(
name='artifact-test-setter',
project=proj,
playbook='set_stats.yml',
inventory=demo_inv,
extra_vars=json.dumps({'stats_data': {'my_var': 'from-set-stats'}}),
)
jt_checker = JobTemplate.objects.create(
name='artifact-test-checker',
project=proj,
playbook='debug.yml',
inventory=demo_inv,
)
wft = WorkflowJobTemplate.objects.create(
name=wft_name,
organization=default_org,
extra_vars=json.dumps({'my_var': 'from-wf-extra-vars'}),
)
node_1 = WorkflowJobTemplateNode.objects.create(
workflow_job_template=wft,
unified_job_template=jt_setter,
identifier='setter',
)
node_2 = WorkflowJobTemplateNode.objects.create(
workflow_job_template=wft,
unified_job_template=jt_checker,
identifier='checker',
)
node_1.success_nodes.add(node_2)
wfj = wft.create_unified_job()
wfj.signal_start()
wait_for_job(wfj, running_timeout=120)
checker_node = wfj.workflow_job_nodes.get(identifier='checker')
assert checker_node.job is not None, 'Checker job was never created'
checker_extra_vars = json.loads(checker_node.job.extra_vars)
assert checker_extra_vars.get('my_var') == 'from-wf-extra-vars', (
f'Expected my_var="from-wf-extra-vars" (workflow extra_vars should override artifacts), '
f'got my_var="{checker_extra_vars.get("my_var")}". '
f'checker node ancestor_artifacts={checker_node.ancestor_artifacts}'
)

View File

@@ -2,11 +2,7 @@
import pytest import pytest
from types import SimpleNamespace
from unittest import mock
from awx.main.models import Credential, CredentialType from awx.main.models import Credential, CredentialType
from awx.main.models.credential import CredentialTypeHelper, ManagedCredentialType
from django.apps import apps from django.apps import apps
@@ -82,53 +78,3 @@ def test_credential_context_property_independent_instances():
assert cred1.context == {'key1': 'value1'} assert cred1.context == {'key1': 'value1'}
assert cred2.context == {'key2': 'value2'} assert cred2.context == {'key2': 'value2'}
assert cred1.context is not cred2.context assert cred1.context is not cred2.context
def test_load_plugin_passes_description():
plugin = SimpleNamespace(name='test_plugin', inputs={'fields': []}, backend=None, plugin_description='A test plugin')
CredentialType.load_plugin('test_ns', plugin)
entry = ManagedCredentialType.registry['test_ns']
assert entry.description == 'A test plugin'
del ManagedCredentialType.registry['test_ns']
def test_load_plugin_missing_description():
plugin = SimpleNamespace(name='test_plugin', inputs={'fields': []}, backend=None)
CredentialType.load_plugin('test_ns', plugin)
entry = ManagedCredentialType.registry['test_ns']
assert entry.description == ''
del ManagedCredentialType.registry['test_ns']
def test_get_creation_params_external_includes_description():
cred_type = SimpleNamespace(namespace='test_ns', kind='external', name='Test', description='My description')
params = CredentialTypeHelper.get_creation_params(cred_type)
assert params['description'] == 'My description'
def test_get_creation_params_external_missing_description():
cred_type = SimpleNamespace(namespace='test_ns', kind='external', name='Test')
params = CredentialTypeHelper.get_creation_params(cred_type)
assert params['description'] == ''
@pytest.mark.django_db
def test_setup_tower_managed_defaults_updates_description():
registry_entry = SimpleNamespace(
namespace='test_ns',
kind='external',
name='Test Plugin',
inputs={'fields': []},
backend=None,
description='Updated description',
)
# Create an existing credential type with no description
ct = CredentialType.objects.create(name='Test Plugin', kind='external', namespace='old_ns')
assert ct.description == ''
with mock.patch.dict(ManagedCredentialType.registry, {'test_ns': registry_entry}, clear=True):
CredentialType._setup_tower_managed_defaults()
ct.refresh_from_db()
assert ct.description == 'Updated description'
assert ct.namespace == 'test_ns'

View File

@@ -473,7 +473,7 @@ def test_populate_claims_for_adhoc_command(workload_attrs, expected_claims):
assert claims == expected_claims assert claims == expected_claims
@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client') @mock.patch('awx.main.tasks.jobs.get_workload_identity_client')
def test_retrieve_workload_identity_jwt_returns_jwt_from_client(mock_get_client): def test_retrieve_workload_identity_jwt_returns_jwt_from_client(mock_get_client):
"""retrieve_workload_identity_jwt returns the JWT string from the client.""" """retrieve_workload_identity_jwt returns the JWT string from the client."""
mock_client = mock.MagicMock() mock_client = mock.MagicMock()
@@ -502,7 +502,7 @@ def test_retrieve_workload_identity_jwt_returns_jwt_from_client(mock_get_client)
assert call_kwargs['claims'][AutomationControllerJobScope.CLAIM_JOB_NAME] == 'Test Job' assert call_kwargs['claims'][AutomationControllerJobScope.CLAIM_JOB_NAME] == 'Test Job'
@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client') @mock.patch('awx.main.tasks.jobs.get_workload_identity_client')
def test_retrieve_workload_identity_jwt_passes_audience_and_scope(mock_get_client): def test_retrieve_workload_identity_jwt_passes_audience_and_scope(mock_get_client):
"""retrieve_workload_identity_jwt passes audience and scope to the client.""" """retrieve_workload_identity_jwt passes audience and scope to the client."""
mock_client = mock.MagicMock() mock_client = mock.MagicMock()
@@ -518,7 +518,7 @@ def test_retrieve_workload_identity_jwt_passes_audience_and_scope(mock_get_clien
mock_client.request_workload_jwt.assert_called_once_with(claims={'job_id': 1}, scope=scope, audience=audience) mock_client.request_workload_jwt.assert_called_once_with(claims={'job_id': 1}, scope=scope, audience=audience)
@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client') @mock.patch('awx.main.tasks.jobs.get_workload_identity_client')
def test_retrieve_workload_identity_jwt_passes_workload_ttl(mock_get_client): def test_retrieve_workload_identity_jwt_passes_workload_ttl(mock_get_client):
"""retrieve_workload_identity_jwt passes workload_ttl_seconds when provided.""" """retrieve_workload_identity_jwt passes workload_ttl_seconds when provided."""
mock_client = mock.Mock() mock_client = mock.Mock()
@@ -542,7 +542,7 @@ def test_retrieve_workload_identity_jwt_passes_workload_ttl(mock_get_client):
) )
@mock.patch('awx.main.utils.workload_identity.get_workload_identity_client') @mock.patch('awx.main.tasks.jobs.get_workload_identity_client')
def test_retrieve_workload_identity_jwt_raises_when_client_not_configured(mock_get_client): def test_retrieve_workload_identity_jwt_raises_when_client_not_configured(mock_get_client):
"""retrieve_workload_identity_jwt raises RuntimeError when client is None.""" """retrieve_workload_identity_jwt raises RuntimeError when client is None."""
mock_get_client.return_value = None mock_get_client.return_value = None
@@ -590,67 +590,3 @@ def test_populate_workload_identity_tokens_passes_get_instance_timeout_to_client
scope=AutomationControllerJobScope.name, scope=AutomationControllerJobScope.name,
workload_ttl_seconds=expected_ttl, workload_ttl_seconds=expected_ttl,
) )
class TestRunInventoryUpdatePopulateWorkloadIdentityTokens:
    """Tests for RunInventoryUpdate.populate_workload_identity_tokens."""

    def _make_task(self, cloud_cred):
        # Build a RunInventoryUpdate whose mocked instance reports *cloud_cred*
        # as its cloud credential and which carries no other credentials.
        task = jobs.RunInventoryUpdate()
        task.instance = mock.MagicMock()
        task.instance.get_cloud_credential.return_value = cloud_cred
        task._credentials = []
        return task

    def test_cloud_credential_passed_as_additional_credential(self):
        """The cloud credential is forwarded to super().populate_workload_identity_tokens via additional_credentials."""
        cloud_cred = mock.MagicMock(name='cloud_cred')
        cloud_cred.context = {}
        task = self._make_task(cloud_cred)
        with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
            task.populate_workload_identity_tokens()
        mock_super.assert_called_once_with(additional_credentials=[cloud_cred])

    def test_no_cloud_credential_calls_super_with_none(self):
        """When there is no cloud credential, super() is called with additional_credentials=None."""
        task = self._make_task(None)
        with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
            task.populate_workload_identity_tokens()
        mock_super.assert_called_once_with(additional_credentials=None)

    def test_additional_credentials_combined_with_cloud_credential(self):
        """Caller-supplied additional_credentials are combined with the cloud credential."""
        cloud_cred = mock.MagicMock(name='cloud_cred')
        cloud_cred.context = {}
        extra_cred = mock.MagicMock(name='extra_cred')
        task = self._make_task(cloud_cred)
        with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
            task.populate_workload_identity_tokens(additional_credentials=[extra_cred])
        mock_super.assert_called_once_with(additional_credentials=[extra_cred, cloud_cred])

    def test_cloud_credential_override_after_context_set(self):
        """After OIDC processing, get_cloud_credential is overridden on the instance when context is populated."""
        cloud_cred = mock.MagicMock(name='cloud_cred')
        # Simulate that super().populate_workload_identity_tokens populates context
        cloud_cred.context = {'workload_identity_token': 'eyJ.test.jwt'}
        task = self._make_task(cloud_cred)
        with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens'):
            task.populate_workload_identity_tokens()
        # The instance's get_cloud_credential should now return the same object with context
        assert task.instance.get_cloud_credential() is cloud_cred

View File

@@ -1,22 +0,0 @@
from ansible_base.resource_registry.workload_identity_client import get_workload_identity_client
__all__ = ['retrieve_workload_identity_jwt_with_claims']
def retrieve_workload_identity_jwt_with_claims(
    claims: dict,
    audience: str,
    scope: str,
    workload_ttl_seconds: int | None = None,
) -> str:
    """Retrieve JWT token from workload claims.

    Args:
        claims: claim name/value pairs to embed in the issued JWT.
        audience: intended audience for the token.
        scope: scope string requested for the token.
        workload_ttl_seconds: optional token lifetime in seconds; ``None``
            means "let the client/server use its default".

    Returns:
        The serialized JWT string from the workload identity client.

    Raises:
        RuntimeError: if the workload identity client is not configured.
    """
    client = get_workload_identity_client()
    if client is None:
        raise RuntimeError("Workload identity client is not configured")
    kwargs = {"claims": claims, "scope": scope, "audience": audience}
    # Use an explicit None check so a caller-supplied TTL of 0 is still
    # forwarded rather than silently dropped by a truthiness test.
    if workload_ttl_seconds is not None:
        kwargs["workload_ttl_seconds"] = workload_ttl_seconds
    return client.request_workload_jwt(**kwargs).jwt

View File

@@ -34,6 +34,9 @@ def get_urlpatterns(prefix=None):
re_path(r'^(?:api/)?500.html$', handle_500), re_path(r'^(?:api/)?500.html$', handle_500),
re_path(r'^csp-violation/', handle_csp_violation), re_path(r'^csp-violation/', handle_csp_violation),
re_path(r'^login/', handle_login_redirect), re_path(r'^login/', handle_login_redirect),
# want api/v2/doesnotexist to return a 404, not match the ui urls,
# so use a negative lookahead assertion here
re_path(r'^(?!api/).*', include('awx.ui.urls', namespace='ui')),
] ]
if settings.DYNACONF.is_development_mode: if settings.DYNACONF.is_development_mode:
@@ -44,12 +47,6 @@ def get_urlpatterns(prefix=None):
except ImportError: except ImportError:
pass pass
# want api/v2/doesnotexist to return a 404, not match the ui urls,
# so use a negative lookahead assertion in the pattern below
urlpatterns += [
re_path(r'^(?!api/).*', include('awx.ui.urls', namespace='ui')),
]
return urlpatterns return urlpatterns

View File

@@ -276,7 +276,6 @@ options:
- '' - ''
- 'github' - 'github'
- 'gitlab' - 'gitlab'
- 'bitbucket_dc'
webhook_credential: webhook_credential:
description: description:
- Personal Access Token for posting back the status to the service API - Personal Access Token for posting back the status to the service API
@@ -437,7 +436,7 @@ def main():
scm_branch=dict(), scm_branch=dict(),
ask_scm_branch_on_launch=dict(type='bool'), ask_scm_branch_on_launch=dict(type='bool'),
job_slice_count=dict(type='int'), job_slice_count=dict(type='int'),
webhook_service=dict(choices=['github', 'gitlab', 'bitbucket_dc', '']), webhook_service=dict(choices=['github', 'gitlab', '']),
webhook_credential=dict(), webhook_credential=dict(),
labels=dict(type="list", elements='str'), labels=dict(type="list", elements='str'),
notification_templates_started=dict(type="list", elements='str'), notification_templates_started=dict(type="list", elements='str'),

View File

@@ -117,7 +117,6 @@ options:
choices: choices:
- github - github
- gitlab - gitlab
- bitbucket_dc
webhook_credential: webhook_credential:
description: description:
- Personal Access Token for posting back the status to the service API - Personal Access Token for posting back the status to the service API
@@ -829,7 +828,7 @@ def main():
ask_inventory_on_launch=dict(type='bool'), ask_inventory_on_launch=dict(type='bool'),
ask_scm_branch_on_launch=dict(type='bool'), ask_scm_branch_on_launch=dict(type='bool'),
ask_limit_on_launch=dict(type='bool'), ask_limit_on_launch=dict(type='bool'),
webhook_service=dict(choices=['github', 'gitlab', 'bitbucket_dc']), webhook_service=dict(choices=['github', 'gitlab']),
webhook_credential=dict(), webhook_credential=dict(),
labels=dict(type="list", elements='str'), labels=dict(type="list", elements='str'),
notification_templates_started=dict(type="list", elements='str'), notification_templates_started=dict(type="list", elements='str'),

View File

@@ -1,124 +0,0 @@
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import pytest
from awx.main.models import JobTemplate, WorkflowJobTemplate
# The backend supports these webhook services on job/workflow templates
# (see awx/main/models/mixins.py). The collection modules must accept all of
# them in their argument_spec ``choices`` list. This test guards against the
# module's choices drifting from the backend -- see AAP-45980, where
# ``bitbucket_dc`` had been supported by the API since migration 0188 but was
# still being rejected by the job_template/workflow_job_template modules.
WEBHOOK_SERVICES = ['github', 'gitlab', 'bitbucket_dc']
@pytest.mark.django_db
@pytest.mark.parametrize('webhook_service', WEBHOOK_SERVICES)
def test_job_template_accepts_webhook_service(run_module, admin_user, project, inventory, webhook_service):
    """Every backend-supported webhook service is accepted, and a second identical run is a no-op."""
    module_params = {
        'name': 'foo',
        'playbook': 'helloworld.yml',
        'project': project.name,
        'inventory': inventory.name,
        'webhook_service': webhook_service,
        'state': 'present',
    }
    first = run_module('job_template', dict(module_params), admin_user)
    assert not first.get('failed', False), first.get('msg', first)
    assert first.get('changed', False), first
    jt = JobTemplate.objects.get(name='foo')
    assert jt.webhook_service == webhook_service
    # Re-running with the same args must be a no-op (idempotence).
    second = run_module('job_template', dict(module_params), admin_user)
    assert not second.get('failed', False), second.get('msg', second)
    assert not second.get('changed', True), second
@pytest.mark.django_db
@pytest.mark.parametrize('webhook_service', WEBHOOK_SERVICES)
def test_workflow_job_template_accepts_webhook_service(run_module, admin_user, organization, webhook_service):
    """Every backend-supported webhook service is accepted on workflow templates, idempotently."""
    module_params = {
        'name': 'foo-workflow',
        'organization': organization.name,
        'webhook_service': webhook_service,
        'state': 'present',
    }
    first = run_module('workflow_job_template', dict(module_params), admin_user)
    assert not first.get('failed', False), first.get('msg', first)
    assert first.get('changed', False), first
    wfjt = WorkflowJobTemplate.objects.get(name='foo-workflow')
    assert wfjt.webhook_service == webhook_service
    # Re-running with the same args must be a no-op (idempotence).
    second = run_module('workflow_job_template', dict(module_params), admin_user)
    assert not second.get('failed', False), second.get('msg', second)
    assert not second.get('changed', True), second
@pytest.mark.django_db
def test_job_template_rejects_unknown_webhook_service(run_module, admin_user, project, inventory):
    """A webhook_service value outside the argument-spec choices must fail the module."""
    module_params = {
        'name': 'foo',
        'playbook': 'helloworld.yml',
        'project': project.name,
        'inventory': inventory.name,
        'webhook_service': 'not_a_real_service',
        'state': 'present',
    }
    outcome = run_module('job_template', module_params, admin_user)
    assert outcome.get('failed', False), outcome
    assert 'webhook_service' in outcome.get('msg', '')
@pytest.mark.django_db
def test_workflow_job_template_rejects_unknown_webhook_service(run_module, admin_user, organization):
    """A webhook_service value outside the argument-spec choices must fail the workflow module."""
    module_params = {
        'name': 'foo-workflow',
        'organization': organization.name,
        'webhook_service': 'not_a_real_service',
        'state': 'present',
    }
    outcome = run_module('workflow_job_template', module_params, admin_user)
    assert outcome.get('failed', False), outcome
    assert 'webhook_service' in outcome.get('msg', '')

View File

@@ -1,5 +1,5 @@
[build-system] [build-system]
requires = ["setuptools>=45", "setuptools_scm[toml]>=6.2,<10"] requires = ["setuptools>=45", "setuptools_scm[toml]>=6.2"]
build-backend = "setuptools.build_meta" build-backend = "setuptools.build_meta"
# Do not uncomment the line below. We need to be able to override the version via a file, and this # Do not uncomment the line below. We need to be able to override the version via a file, and this

View File

@@ -116,7 +116,7 @@ cython==3.1.3
# via -r /awx_devel/requirements/requirements.in # via -r /awx_devel/requirements/requirements.in
daphne==4.2.1 daphne==4.2.1
# via -r /awx_devel/requirements/requirements.in # via -r /awx_devel/requirements/requirements.in
dispatcherd[pg-notify]==2026.3.25 dispatcherd[pg-notify]==2026.02.26
# via -r /awx_devel/requirements/requirements.in # via -r /awx_devel/requirements/requirements.in
distro==1.9.0 distro==1.9.0
# via -r /awx_devel/requirements/requirements.in # via -r /awx_devel/requirements/requirements.in