feat: workload identity credentials integration (#16286)

* feat: workload identity credentials integration

* feat: cache credentials and add context property to Credential

Assisted-by: Claude

* feat: include safeguard in case feature flag is disabled

* feat: tests to validate workload identity credentials integration

* fix: affected tests by the credential cache mechanism

* feat: remove word cache from variables and comments, use standard library decorators

* fix: reorder tests in correct files

* Use better error catching mechanisms

* Adjust logic to support multiple credential input sources and use internal field

* Remove hardcoded credential type names

* Add tests for the internal field

Assited-by: Claude
This commit is contained in:
Pablo H.
2026-03-04 16:22:27 +01:00
committed by GitHub
parent 8d191046b5
commit 57f9eb093a
11 changed files with 772 additions and 12 deletions

View File

@@ -1,5 +1,7 @@
import pytest
from ansible_base.lib.testing.util import feature_flag_enabled, feature_flag_disabled
from awx.main.models import CredentialInputSource
from awx.api.versioning import reverse
@@ -316,3 +318,60 @@ def test_create_credential_input_source_with_already_used_input_returns_400(post
]
all_responses = [post(list_url, params, admin) for params in all_params]
assert all_responses.pop().status_code == 400
@pytest.mark.django_db
def test_credential_input_source_passes_workload_identity_token_when_flag_enabled(vault_credential, external_credential, mocker):
"""Test that workload_identity_token is passed to backend when flag is enabled."""
with feature_flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'):
# Add workload_identity_token as an internal field on the external credential type
# so get_input_value resolves it from the per-input-source context
external_credential.credential_type.inputs['fields'].append(
{'id': 'workload_identity_token', 'label': 'Workload Identity Token', 'type': 'string', 'internal': True}
)
# Create an input source
input_source = CredentialInputSource.objects.create(
target_credential=vault_credential,
source_credential=external_credential,
input_field_name='vault_password',
metadata={'key': 'test_key'},
)
# Mock the credential plugin backend
mock_backend = mocker.patch.object(external_credential.credential_type.plugin, 'backend', autospec=True, return_value='test_value')
# Call with context keyed by input source PK
test_context = {input_source.pk: {'workload_identity_token': 'jwt_token_here'}}
result = input_source.get_input_value(context=test_context)
# Verify backend was called with workload_identity_token
assert result == 'test_value'
call_kwargs = mock_backend.call_args[1]
assert call_kwargs['workload_identity_token'] == 'jwt_token_here'
assert call_kwargs['key'] == 'test_key'
@pytest.mark.django_db
def test_credential_input_source_skips_workload_identity_token_when_flag_disabled(vault_credential, external_credential, mocker):
"""Test that workload_identity_token is NOT passed when flag is disabled."""
with feature_flag_disabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'):
# Create an input source
input_source = CredentialInputSource.objects.create(
target_credential=vault_credential,
source_credential=external_credential,
input_field_name='vault_password',
metadata={'key': 'test_key'},
)
# Mock the credential plugin backend
mock_backend = mocker.patch.object(external_credential.credential_type.plugin, 'backend', autospec=True, return_value='test_value')
# Call with context containing workload_identity_token but NO internal field defined,
# simulating a flag-disabled scenario where tokens are not generated upstream
test_context = {input_source.pk: {'workload_identity_token': 'jwt_token_here'}}
result = input_source.get_input_value(context=test_context)
# Verify backend was called WITHOUT workload_identity_token since the credential type
# does not define it as an internal field (flag-disabled path doesn't register it)
assert result == 'test_value'
call_kwargs = mock_backend.call_args[1]
assert 'workload_identity_token' not in call_kwargs
assert call_kwargs['key'] == 'test_key'

View File

@@ -93,6 +93,8 @@ def test_default_cred_types():
'gpg_public_key',
'hashivault_kv',
'hashivault_ssh',
'hashivault-kv-oidc',
'hashivault-ssh-oidc',
'hcp_terraform',
'insights',
'kubernetes_bearer_token',

View File

@@ -8,6 +8,7 @@ from awx.main.models import (
Instance,
Host,
JobHostSummary,
Inventory,
InventoryUpdate,
InventorySource,
Project,
@@ -17,14 +18,60 @@ from awx.main.models import (
InstanceGroup,
Label,
ExecutionEnvironment,
Credential,
CredentialType,
CredentialInputSource,
Organization,
JobTemplate,
)
from awx.main.tasks import jobs
from awx.main.tasks.system import cluster_node_heartbeat
from awx.main.utils.db import bulk_update_sorted_by_id
from ansible_base.lib.testing.util import feature_flag_enabled, feature_flag_disabled
from django.db import OperationalError
from django.test.utils import override_settings
@pytest.fixture
def job_template_with_credentials():
"""
Factory fixture that creates a job template with specified credentials.
Usage:
job = job_template_with_credentials(ssh_cred, vault_cred)
"""
def _create_job_template(
*credentials, org_name='test-org', project_name='test-project', inventory_name='test-inventory', jt_name='test-jt', playbook='test.yml'
):
"""
Create a job template with the given credentials.
Args:
*credentials: Variable number of Credential objects to attach to the job template
org_name: Name for the organization
project_name: Name for the project
inventory_name: Name for the inventory
jt_name: Name for the job template
playbook: Playbook filename
Returns:
Job instance created from the job template
"""
org = Organization.objects.create(name=org_name)
proj = Project.objects.create(name=project_name, organization=org)
inv = Inventory.objects.create(name=inventory_name, organization=org)
jt = JobTemplate.objects.create(name=jt_name, project=proj, inventory=inv, playbook=playbook)
if credentials:
jt.credentials.add(*credentials)
return jt.create_unified_job()
return _create_job_template
@pytest.mark.django_db
def test_orphan_unified_job_creation(instance, inventory):
job = Job.objects.create(job_template=None, inventory=inventory, name='hi world')
@@ -262,3 +309,393 @@ class TestLaunchConfig:
assert config.execution_environment
# We just write the PK instead of trying to assign an item, that happens on the save
assert config.execution_environment_id == ee.id
@pytest.mark.django_db
def test_base_task_credentials_property(job_template_with_credentials):
"""Test that _credentials property caches credentials and doesn't re-query."""
task = jobs.RunJob()
# Create real credentials
ssh_type = CredentialType.defaults['ssh']()
ssh_type.save()
vault_type = CredentialType.defaults['vault']()
vault_type.save()
ssh_cred = Credential.objects.create(credential_type=ssh_type, name='ssh-cred')
vault_cred = Credential.objects.create(credential_type=vault_type, name='vault-cred')
# Create a job with credentials using fixture
job = job_template_with_credentials(ssh_cred, vault_cred)
task.instance = job
# First access should build credentials
result1 = task._credentials
assert len(result1) == 2
assert isinstance(result1, list)
# Second access should return cached value (we can verify by checking it's the same list object)
result2 = task._credentials
assert result2 is result1 # Same object reference
@pytest.mark.django_db
def test_run_job_machine_credential(job_template_with_credentials):
"""Test _machine_credential returns ssh credential from cache."""
task = jobs.RunJob()
# Create credentials
ssh_type = CredentialType.defaults['ssh']()
ssh_type.save()
vault_type = CredentialType.defaults['vault']()
vault_type.save()
ssh_cred = Credential.objects.create(credential_type=ssh_type, name='ssh-cred')
vault_cred = Credential.objects.create(credential_type=vault_type, name='vault-cred')
# Create a job using fixture
job = job_template_with_credentials(ssh_cred, vault_cred)
task.instance = job
# Set cached credentials
task._credentials = [ssh_cred, vault_cred]
# Get machine credential
result = task._machine_credential
assert result == ssh_cred
assert result.credential_type.kind == 'ssh'
@pytest.mark.django_db
def test_run_job_machine_credential_none(job_template_with_credentials):
"""Test _machine_credential returns None when no ssh credential exists."""
task = jobs.RunJob()
# Create only vault credential
vault_type = CredentialType.defaults['vault']()
vault_type.save()
vault_cred = Credential.objects.create(credential_type=vault_type, name='vault-cred')
job = job_template_with_credentials(vault_cred)
task.instance = job
# Set cached credentials
task._credentials = [vault_cred]
# Get machine credential
result = task._machine_credential
assert result is None
@pytest.mark.django_db
def test_run_job_vault_credentials(job_template_with_credentials):
"""Test _vault_credentials returns all vault credentials from cache."""
task = jobs.RunJob()
# Create credentials
vault_type = CredentialType.defaults['vault']()
vault_type.save()
ssh_type = CredentialType.defaults['ssh']()
ssh_type.save()
vault_cred1 = Credential.objects.create(credential_type=vault_type, name='vault-1')
vault_cred2 = Credential.objects.create(credential_type=vault_type, name='vault-2')
ssh_cred = Credential.objects.create(credential_type=ssh_type, name='ssh-cred')
job = job_template_with_credentials(vault_cred1, ssh_cred, vault_cred2)
task.instance = job
# Set cached credentials
task._credentials = [vault_cred1, ssh_cred, vault_cred2]
# Get vault credentials
result = task._vault_credentials
assert len(result) == 2
assert vault_cred1 in result
assert vault_cred2 in result
assert ssh_cred not in result
@pytest.mark.django_db
def test_run_job_network_credentials(job_template_with_credentials):
"""Test _network_credentials returns all network credentials from cache."""
task = jobs.RunJob()
# Create credentials
net_type = CredentialType.defaults['net']()
net_type.save()
ssh_type = CredentialType.defaults['ssh']()
ssh_type.save()
net_cred = Credential.objects.create(credential_type=net_type, name='net-cred')
ssh_cred = Credential.objects.create(credential_type=ssh_type, name='ssh-cred')
job = job_template_with_credentials(net_cred, ssh_cred)
task.instance = job
# Set cached credentials
task._credentials = [net_cred, ssh_cred]
# Get network credentials
result = task._network_credentials
assert len(result) == 1
assert result[0] == net_cred
@pytest.mark.django_db
def test_run_job_cloud_credentials(job_template_with_credentials):
"""Test _cloud_credentials returns all cloud credentials from cache."""
task = jobs.RunJob()
# Create credentials
aws_type = CredentialType.defaults['aws']()
aws_type.save()
ssh_type = CredentialType.defaults['ssh']()
ssh_type.save()
aws_cred = Credential.objects.create(credential_type=aws_type, name='aws-cred')
ssh_cred = Credential.objects.create(credential_type=ssh_type, name='ssh-cred')
job = job_template_with_credentials(aws_cred, ssh_cred)
task.instance = job
# Set cached credentials
task._credentials = [aws_cred, ssh_cred]
# Get cloud credentials
result = task._cloud_credentials
assert len(result) == 1
assert result[0] == aws_cred
@pytest.mark.django_db
@override_settings(RESOURCE_SERVER={'URL': 'https://gateway.example.com', 'SECRET_KEY': 'test-secret-key', 'VALIDATE_HTTPS': False})
def test_populate_workload_identity_tokens_with_flag_enabled(job_template_with_credentials, mocker):
"""Test populate_workload_identity_tokens sets context when flag is enabled."""
with feature_flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'):
task = jobs.RunJob()
# Create credential types
ssh_type = CredentialType.defaults['ssh']()
ssh_type.save()
# Create a workload identity credential type
hashivault_type = CredentialType(
name='HashiCorp Vault Secret Lookup (OIDC)',
kind='cloud',
managed=False,
inputs={
'fields': [
{'id': 'jwt_aud', 'type': 'string', 'label': 'JWT Audience'},
{'id': 'workload_identity_token', 'type': 'string', 'label': 'Workload Identity Token', 'secret': True, 'internal': True},
]
},
)
hashivault_type.save()
# Create credentials
ssh_cred = Credential.objects.create(credential_type=ssh_type, name='ssh-cred')
source_cred = Credential.objects.create(credential_type=hashivault_type, name='vault-source', inputs={'jwt_aud': 'https://vault.example.com'})
target_cred = Credential.objects.create(credential_type=ssh_type, name='target-cred', inputs={'username': 'testuser'})
# Create input source linking source credential to target credential
input_source = CredentialInputSource.objects.create(
target_credential=target_cred, source_credential=source_cred, input_field_name='password', metadata={'path': 'secret/data/password'}
)
# Create a job using fixture
job = job_template_with_credentials(target_cred, ssh_cred)
task.instance = job
# Override cached_property so the loop uses these exact Python objects
task._credentials = [target_cred, ssh_cred]
# Mock only the HTTP response from the Gateway workload identity endpoint
mock_response = mocker.Mock(status_code=200)
mock_response.json.return_value = {'jwt': 'eyJ.test.jwt'}
mock_request = mocker.patch('requests.request', return_value=mock_response, autospec=True)
task.populate_workload_identity_tokens()
# Verify the HTTP call was made to the correct endpoint
mock_request.assert_called_once()
call_kwargs = mock_request.call_args.kwargs
assert call_kwargs['method'] == 'POST'
assert '/api/gateway/v1/workload_identity_tokens' in call_kwargs['url']
# Verify context was set on the credential, keyed by input source PK
assert input_source.pk in target_cred.context
assert target_cred.context[input_source.pk]['workload_identity_token'] == 'eyJ.test.jwt'
@pytest.mark.django_db
def test_populate_workload_identity_tokens_with_flag_disabled(job_template_with_credentials):
"""Test populate_workload_identity_tokens sets error status when flag is disabled."""
with feature_flag_disabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'):
task = jobs.RunJob()
# Create credential types
ssh_type = CredentialType.defaults['ssh']()
ssh_type.save()
# Create a workload identity credential type
hashivault_type = CredentialType(
name='HashiCorp Vault Secret Lookup (OIDC)',
kind='cloud',
managed=False,
inputs={
'fields': [
{'id': 'jwt_aud', 'type': 'string', 'label': 'JWT Audience'},
{'id': 'workload_identity_token', 'type': 'string', 'label': 'Workload Identity Token', 'secret': True, 'internal': True},
]
},
)
hashivault_type.save()
# Create credentials
source_cred = Credential.objects.create(credential_type=hashivault_type, name='vault-source')
target_cred = Credential.objects.create(credential_type=ssh_type, name='target-cred', inputs={'username': 'testuser'})
# Create input source linking source credential to target credential
# Note: Creates the relationship that will trigger the feature flag check
CredentialInputSource.objects.create(
target_credential=target_cred, source_credential=source_cred, input_field_name='password', metadata={'path': 'secret/data/password'}
)
# Create a job using fixture
job = job_template_with_credentials(target_cred)
task.instance = job
# Set cached credentials
task._credentials = [target_cred]
task.populate_workload_identity_tokens()
# Verify job status was set to error
job.refresh_from_db()
assert job.status == 'error'
assert 'FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED' in job.job_explanation
assert 'vault-source' in job.job_explanation
@pytest.mark.django_db
@override_settings(RESOURCE_SERVER={'URL': 'https://gateway.example.com', 'SECRET_KEY': 'test-secret-key', 'VALIDATE_HTTPS': False})
def test_populate_workload_identity_tokens_multiple_input_sources_per_credential(job_template_with_credentials, mocker):
"""Test that a single credential with two input sources from different workload identity
credential types gets a separate JWT token for each input source, keyed by input source PK."""
with feature_flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'):
task = jobs.RunJob()
# Create credential types
ssh_type = CredentialType.defaults['ssh']()
ssh_type.save()
# Create two different workload identity credential types
hashivault_kv_type = CredentialType(
name='HashiCorp Vault Secret Lookup (OIDC)',
kind='cloud',
managed=False,
inputs={
'fields': [
{'id': 'jwt_aud', 'type': 'string', 'label': 'JWT Audience'},
{'id': 'workload_identity_token', 'type': 'string', 'label': 'Workload Identity Token', 'secret': True, 'internal': True},
]
},
)
hashivault_kv_type.save()
hashivault_ssh_type = CredentialType(
name='HashiCorp Vault Signed SSH (OIDC)',
kind='cloud',
managed=False,
inputs={
'fields': [
{'id': 'jwt_aud', 'type': 'string', 'label': 'JWT Audience'},
{'id': 'workload_identity_token', 'type': 'string', 'label': 'Workload Identity Token', 'secret': True, 'internal': True},
]
},
)
hashivault_ssh_type.save()
# Create source credentials with different audiences
source_cred_kv = Credential.objects.create(
credential_type=hashivault_kv_type, name='vault-kv-source', inputs={'jwt_aud': 'https://vault-kv.example.com'}
)
source_cred_ssh = Credential.objects.create(
credential_type=hashivault_ssh_type, name='vault-ssh-source', inputs={'jwt_aud': 'https://vault-ssh.example.com'}
)
# Create target credential that uses both sources for different fields
target_cred = Credential.objects.create(credential_type=ssh_type, name='target-cred', inputs={'username': 'testuser'})
# Create two input sources on the same target credential, each for a different field
input_source_password = CredentialInputSource.objects.create(
target_credential=target_cred, source_credential=source_cred_kv, input_field_name='password', metadata={'path': 'secret/data/password'}
)
input_source_ssh_key = CredentialInputSource.objects.create(
target_credential=target_cred, source_credential=source_cred_ssh, input_field_name='ssh_key_data', metadata={'path': 'secret/data/ssh_key'}
)
# Create a job using fixture
job = job_template_with_credentials(target_cred)
task.instance = job
# Override cached_property so the loop uses this exact Python object
task._credentials = [target_cred]
# Mock HTTP responses - return different JWTs for each call
response_kv = mocker.Mock(status_code=200)
response_kv.json.return_value = {'jwt': 'eyJ.kv.jwt'}
response_ssh = mocker.Mock(status_code=200)
response_ssh.json.return_value = {'jwt': 'eyJ.ssh.jwt'}
mock_request = mocker.patch('requests.request', side_effect=[response_kv, response_ssh], autospec=True)
task.populate_workload_identity_tokens()
# Verify two separate HTTP calls were made (one per input source)
assert mock_request.call_count == 2
# Verify each call used the correct audience from its source credential
audiences_requested = {call.kwargs.get('json', {}).get('audience', '') for call in mock_request.call_args_list}
assert 'https://vault-kv.example.com' in audiences_requested
assert 'https://vault-ssh.example.com' in audiences_requested
# Verify context on the target credential has both tokens, keyed by input source PK
assert input_source_password.pk in target_cred.context
assert input_source_ssh_key.pk in target_cred.context
assert target_cred.context[input_source_password.pk]['workload_identity_token'] == 'eyJ.kv.jwt'
assert target_cred.context[input_source_ssh_key.pk]['workload_identity_token'] == 'eyJ.ssh.jwt'
@pytest.mark.django_db
def test_populate_workload_identity_tokens_without_workload_identity_credentials(job_template_with_credentials, mocker):
"""Test populate_workload_identity_tokens does nothing when no workload identity credentials."""
with feature_flag_enabled('FEATURE_OIDC_WORKLOAD_IDENTITY_ENABLED'):
task = jobs.RunJob()
# Create only standard credentials (no workload identity)
ssh_type = CredentialType.defaults['ssh']()
ssh_type.save()
vault_type = CredentialType.defaults['vault']()
vault_type.save()
ssh_cred = Credential.objects.create(credential_type=ssh_type, name='ssh-cred')
vault_cred = Credential.objects.create(credential_type=vault_type, name='vault-cred')
# Create a job using fixture
job = job_template_with_credentials(ssh_cred, vault_cred)
task.instance = job
# Set cached credentials
task._credentials = [ssh_cred, vault_cred]
mocker.patch('awx.main.tasks.jobs.populate_claims_for_workload', return_value={'job_id': 123}, autospec=True)
task.populate_workload_identity_tokens()
# Verify no context was set
assert not hasattr(ssh_cred, '_context') or ssh_cred.context == {}
assert not hasattr(vault_cred, '_context') or vault_cred.context == {}

View File

@@ -47,3 +47,34 @@ def test__get_credential_type_class_invalid_params():
assert type(e.value) is ValueError
assert str(e.value) == 'Expected only apps or app_config to be defined, not both'
def test_credential_context_property():
"""Test that credential context property initializes empty dict and persists across accesses."""
ct = CredentialType(name='Test Cred', kind='vault')
cred = Credential(id=1, name='Test Credential', credential_type=ct, inputs={})
# First access should return empty dict
context = cred.context
assert context == {}
# Modify the context
context['test_key'] = 'test_value'
# Second access should return the same dict with modifications
assert cred.context == {'test_key': 'test_value'}
assert cred.context is context # Same object reference
def test_credential_context_property_independent_instances():
"""Test that context property is independent between credential instances."""
ct = CredentialType(name='Test Cred', kind='vault')
cred1 = Credential(id=1, name='Cred 1', credential_type=ct, inputs={})
cred2 = Credential(id=2, name='Cred 2', credential_type=ct, inputs={})
cred1.context['key1'] = 'value1'
cred2.context['key2'] = 'value2'
assert cred1.context == {'key1': 'value1'}
assert cred2.context == {'key2': 'value2'}
assert cred1.context is not cred2.context

View File

@@ -41,6 +41,45 @@ def private_data_dir():
shutil.rmtree(private_data, True)
@pytest.fixture
def job_template_with_credentials():
"""
Factory fixture that creates a job template with specified credentials.
Usage:
job = job_template_with_credentials(ssh_cred, vault_cred)
"""
def _create_job_template(
*credentials, org_name='test-org', project_name='test-project', inventory_name='test-inventory', jt_name='test-jt', playbook='test.yml'
):
"""
Create a job template with the given credentials.
Args:
*credentials: Variable number of Credential objects to attach to the job template
org_name: Name for the organization
project_name: Name for the project
inventory_name: Name for the inventory
jt_name: Name for the job template
playbook: Playbook filename
Returns:
Job instance created from the job template
"""
org = Organization.objects.create(name=org_name)
proj = Project.objects.create(name=project_name, organization=org)
inv = Inventory.objects.create(name=inventory_name, organization=org)
jt = JobTemplate.objects.create(name=jt_name, project=proj, inventory=inv, playbook=playbook)
if credentials:
jt.credentials.add(*credentials)
return jt.create_unified_job()
return _create_job_template
@mock.patch('awx.main.tasks.facts.settings')
@mock.patch('awx.main.tasks.jobs.create_partition', return_value=True)
def test_pre_post_run_hook_facts(mock_create_partition, mock_facts_settings, private_data_dir, execution_environment):

View File

@@ -76,6 +76,9 @@ def test_custom_error_messages(schema, given, message):
({'fields': [{'id': 'token', 'label': 'Token', 'secret': 'bad'}]}, False),
({'fields': [{'id': 'token', 'label': 'Token', 'ask_at_runtime': True}]}, True),
({'fields': [{'id': 'token', 'label': 'Token', 'ask_at_runtime': 'bad'}]}, False), # noqa
({'fields': [{'id': 'token', 'label': 'Token', 'internal': True}]}, True),
({'fields': [{'id': 'token', 'label': 'Token', 'internal': False}]}, True),
({'fields': [{'id': 'token', 'label': 'Token', 'internal': 'bad'}]}, False),
({'fields': [{'id': 'become_method', 'label': 'Become', 'choices': 'not-a-list'}]}, False), # noqa
({'fields': [{'id': 'become_method', 'label': 'Become', 'choices': []}]}, False),
({'fields': [{'id': 'become_method', 'label': 'Become', 'choices': ['su', 'sudo']}]}, True), # noqa
@@ -204,6 +207,68 @@ def test_credential_creation_validation_failure(inputs):
assert e.type in (ValidationError, DRFValidationError)
def test_credential_input_field_excludes_internal_fields():
"""Internal fields should be excluded from the schema generated by CredentialInputField,
preventing users from providing values for internally resolved fields."""
type_ = CredentialType(
kind='cloud',
name='SomeCloud',
managed=True,
inputs={
'fields': [
{'id': 'username', 'label': 'Username', 'type': 'string'},
{'id': 'resolved_token', 'label': 'Token', 'type': 'string', 'internal': True},
]
},
)
cred = Credential(credential_type=type_, name="Test Credential", inputs={'username': 'joe'})
field = cred._meta.get_field('inputs')
schema = field.schema(cred)
assert 'username' in schema['properties']
assert 'resolved_token' not in schema['properties']
def test_credential_input_field_rejects_values_for_internal_fields():
"""Users should not be able to provide values for fields marked as internal."""
type_ = CredentialType(
kind='cloud',
name='SomeCloud',
managed=True,
inputs={
'fields': [
{'id': 'username', 'label': 'Username', 'type': 'string'},
{'id': 'resolved_token', 'label': 'Token', 'type': 'string', 'internal': True},
]
},
)
cred = Credential(credential_type=type_, name="Test Credential", inputs={'username': 'joe', 'resolved_token': 'secret'})
field = cred._meta.get_field('inputs')
with pytest.raises(Exception) as e:
field.validate(cred.inputs, cred)
assert e.type in (ValidationError, DRFValidationError)
def test_credential_input_field_accepts_non_internal_fields_only():
"""Credentials with only non-internal field values should validate successfully."""
type_ = CredentialType(
kind='cloud',
name='SomeCloud',
managed=True,
inputs={
'fields': [
{'id': 'username', 'label': 'Username', 'type': 'string'},
{'id': 'resolved_token', 'label': 'Token', 'type': 'string', 'internal': True},
]
},
)
cred = Credential(credential_type=type_, name="Test Credential", inputs={'username': 'joe'})
field = cred._meta.get_field('inputs')
# Should not raise
field.validate(cred.inputs, cred)
def test_implicit_role_field_parents():
"""This assures that every ImplicitRoleField only references parents
which are relationships that actually exist

View File

@@ -556,7 +556,8 @@ class TestGenericRun:
task._write_extra_vars_file = mock.Mock()
with mock.patch('awx.main.tasks.jobs.settings.AWX_TASK_ENV', {'FOO': 'BAR'}):
env = task.build_env(job, private_data_dir)
with mock.patch.object(task, 'build_credentials_list', return_value=[], autospec=True):
env = task.build_env(job, private_data_dir)
assert env['FOO'] == 'BAR'
@@ -649,7 +650,9 @@ class TestJobCredentials(TestJobExecution):
)
with mock.patch.object(UnifiedJob, 'credentials', credentials_mock):
yield job
# Mock build_credentials_list to work with the cached credentials mechanism
with mock.patch.object(jobs.RunJob, 'build_credentials_list', return_value=job._credentials, autospec=True):
yield job
@pytest.fixture
def update_model_wrapper(self, job):