[AAP-72722] Use url instead of jwt_aud for workload identity audience (#16432)

* [AAP-72722] Use url instead of jwt_aud for workload identity audience

The OIDC credential plugin's jwt_aud field is being removed. Use the
plugin's url field as the audience when requesting workload identity
tokens, since the target service URL is the appropriate audience value.

Assisted-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dan Leehr
2026-04-28 10:53:09 -04:00
committed by GitHub
parent d1b3ae53ae
commit b66c0105ae
4 changed files with 20 additions and 23 deletions

View File

@@ -1610,12 +1610,12 @@ class OIDCCredentialTestMixin:
"""
@staticmethod
def _get_workload_identity_token(job_template: models.JobTemplate, jwt_aud: str) -> str:
def _get_workload_identity_token(job_template: models.JobTemplate, audience: str) -> str:
"""Generate a workload identity token for a job template.
Args:
job_template: The JobTemplate instance to generate claims for
jwt_aud: The JWT audience claim value
audience: The JWT audience claim value
Returns:
str: The generated JWT token
@@ -1631,7 +1631,7 @@ class OIDCCredentialTestMixin:
}
return retrieve_workload_identity_jwt_with_claims(
claims=claims,
audience=jwt_aud,
audience=audience,
scope=AutomationControllerJobScope.name,
)
@@ -1714,7 +1714,7 @@ class OIDCCredentialTestMixin:
raise PermissionDenied(_('You do not have access to job template with id: %(id)s.') % {'id': job_template.id})
# Generate workload identity token
jwt_token = self._get_workload_identity_token(job_template, backend_kwargs.pop('jwt_aud', None))
jwt_token = self._get_workload_identity_token(job_template, backend_kwargs.get('url'))
backend_kwargs['workload_identity_token'] = jwt_token
return {'details': {'sent_jwt_payload': self._decode_jwt_payload_for_display(jwt_token)}}

View File

@@ -254,7 +254,7 @@ class BaseTask(object):
try:
jwt = retrieve_workload_identity_jwt(
self.instance,
audience=input_src.source_credential.get_input('jwt_aud'),
audience=input_src.source_credential.get_input('url'),
scope=AutomationControllerJobScope.name,
workload_ttl_seconds=workload_ttl,
)

View File

@@ -28,7 +28,6 @@ def oidc_credentialtype():
{'id': 'url', 'label': 'Vault URL', 'type': 'string', 'help_text': 'The Vault server URL.'},
{'id': 'auth_path', 'label': 'Auth Path', 'type': 'string', 'help_text': 'JWT auth mount path.'},
{'id': 'role_id', 'label': 'Role ID', 'type': 'string', 'help_text': 'Vault role.'},
{'id': 'jwt_aud', 'label': 'JWT Audience', 'type': 'string', 'help_text': 'Expected audience.'},
{'id': 'workload_identity_token', 'label': 'Workload Identity Token', 'type': 'string', 'secret': True, 'internal': True},
],
'metadata': [
@@ -56,7 +55,7 @@ def oidc_credential(oidc_credentialtype):
return Credential.objects.create(
credential_type=oidc_credentialtype,
name='oidc-vault-cred',
inputs={'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'},
inputs={'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role'},
)
@@ -230,17 +229,17 @@ def test_credential_test_jwt_generation_failure(mock_flag, post, admin, oidc_cre
@pytest.mark.django_db
@mock.patch('awx.api.views.flag_enabled', return_value=True)
def test_credential_test_job_template_id_not_passed_to_backend(mock_flag, post, admin, oidc_credential, job_template, mock_oidc_backend):
"""Test that job_template_id and jwt_aud are removed from backend_kwargs."""
"""Test that job_template_id is removed from backend_kwargs."""
url = reverse('api:credential_external_test', kwargs={'pk': oidc_credential.pk})
data = {'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)}}
response = post(url, data, admin)
assert response.status_code == 202
# Check that backend was called without job_template_id or jwt_aud
# Check that backend was called without job_template_id but with url and workload_identity_token
call_kwargs = mock_oidc_backend['backend'].backend.call_args[1]
assert 'job_template_id' not in call_kwargs
assert 'jwt_aud' not in call_kwargs
assert 'url' in call_kwargs
assert 'workload_identity_token' in call_kwargs
@@ -276,7 +275,7 @@ def test_credential_type_test_missing_job_template_id(mock_flag, post, admin, oi
"""Test that missing job_template_id returns 400 for credential type test endpoint."""
url = reverse('api:credential_type_external_test', kwargs={'pk': oidc_credentialtype.pk})
data = {
'inputs': {'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'},
'inputs': {'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role'},
'metadata': {'secret_path': 'test/secret'},
}
@@ -293,7 +292,7 @@ def test_credential_type_test_success_returns_jwt_payload(mock_flag, post, admin
"""Test that successful credential type test returns JWT payload."""
url = reverse('api:credential_type_external_test', kwargs={'pk': oidc_credentialtype.pk})
data = {
'inputs': {'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role', 'jwt_aud': 'vault'},
'inputs': {'url': 'http://vault.example.com:8200', 'auth_path': 'jwt', 'role_id': 'test-role'},
'metadata': {'secret_path': 'test/secret', 'job_template_id': str(job_template.id)},
}

View File

@@ -486,7 +486,7 @@ def test_populate_workload_identity_tokens_with_flag_enabled(job_template_with_c
managed=False,
inputs={
'fields': [
{'id': 'jwt_aud', 'type': 'string', 'label': 'JWT Audience'},
{'id': 'url', 'type': 'string', 'label': 'Server URL'},
{'id': 'workload_identity_token', 'type': 'string', 'label': 'Workload Identity Token', 'secret': True, 'internal': True},
]
},
@@ -495,7 +495,7 @@ def test_populate_workload_identity_tokens_with_flag_enabled(job_template_with_c
# Create credentials
ssh_cred = Credential.objects.create(credential_type=ssh_type, name='ssh-cred')
source_cred = Credential.objects.create(credential_type=hashivault_type, name='vault-source', inputs={'jwt_aud': 'https://vault.example.com'})
source_cred = Credential.objects.create(credential_type=hashivault_type, name='vault-source', inputs={'url': 'https://vault.example.com'})
target_cred = Credential.objects.create(credential_type=ssh_type, name='target-cred', inputs={'username': 'testuser'})
# Create input source linking source credential to target credential
@@ -545,7 +545,7 @@ def test_populate_workload_identity_tokens_passes_workload_ttl_from_job_timeout(
managed=False,
inputs={
'fields': [
{'id': 'jwt_aud', 'type': 'string', 'label': 'JWT Audience'},
{'id': 'url', 'type': 'string', 'label': 'Server URL'},
{'id': 'workload_identity_token', 'type': 'string', 'label': 'Workload Identity Token', 'secret': True, 'internal': True},
]
},
@@ -553,7 +553,7 @@ def test_populate_workload_identity_tokens_passes_workload_ttl_from_job_timeout(
hashivault_type.save()
ssh_cred = Credential.objects.create(credential_type=ssh_type, name='ssh-cred')
source_cred = Credential.objects.create(credential_type=hashivault_type, name='vault-source', inputs={'jwt_aud': 'https://vault.example.com'})
source_cred = Credential.objects.create(credential_type=hashivault_type, name='vault-source', inputs={'url': 'https://vault.example.com'})
target_cred = Credential.objects.create(credential_type=ssh_type, name='target-cred', inputs={'username': 'testuser'})
CredentialInputSource.objects.create(
@@ -595,7 +595,7 @@ def test_populate_workload_identity_tokens_with_flag_disabled(job_template_with_
managed=False,
inputs={
'fields': [
{'id': 'jwt_aud', 'type': 'string', 'label': 'JWT Audience'},
{'id': 'url', 'type': 'string', 'label': 'Server URL'},
{'id': 'workload_identity_token', 'type': 'string', 'label': 'Workload Identity Token', 'secret': True, 'internal': True},
]
},
@@ -647,7 +647,7 @@ def test_populate_workload_identity_tokens_multiple_input_sources_per_credential
managed=False,
inputs={
'fields': [
{'id': 'jwt_aud', 'type': 'string', 'label': 'JWT Audience'},
{'id': 'url', 'type': 'string', 'label': 'Server URL'},
{'id': 'workload_identity_token', 'type': 'string', 'label': 'Workload Identity Token', 'secret': True, 'internal': True},
]
},
@@ -660,7 +660,7 @@ def test_populate_workload_identity_tokens_multiple_input_sources_per_credential
managed=False,
inputs={
'fields': [
{'id': 'jwt_aud', 'type': 'string', 'label': 'JWT Audience'},
{'id': 'url', 'type': 'string', 'label': 'Server URL'},
{'id': 'workload_identity_token', 'type': 'string', 'label': 'Workload Identity Token', 'secret': True, 'internal': True},
]
},
@@ -668,11 +668,9 @@ def test_populate_workload_identity_tokens_multiple_input_sources_per_credential
hashivault_ssh_type.save()
# Create source credentials with different audiences
source_cred_kv = Credential.objects.create(
credential_type=hashivault_kv_type, name='vault-kv-source', inputs={'jwt_aud': 'https://vault-kv.example.com'}
)
source_cred_kv = Credential.objects.create(credential_type=hashivault_kv_type, name='vault-kv-source', inputs={'url': 'https://vault-kv.example.com'})
source_cred_ssh = Credential.objects.create(
credential_type=hashivault_ssh_type, name='vault-ssh-source', inputs={'jwt_aud': 'https://vault-ssh.example.com'}
credential_type=hashivault_ssh_type, name='vault-ssh-source', inputs={'url': 'https://vault-ssh.example.com'}
)
# Create target credential that uses both sources for different fields