Compare commits

..

3 Commits

Author SHA1 Message Date
Seth Foster
b8c9ae73cd Fix OIDC workload identity for inventory sync (#16390)
The cloud credential used by inventory updates was not going through
the OIDC workload identity token flow because it lives outside the
normal _credentials list. This overrides populate_workload_identity_tokens
in RunInventoryUpdate to include the cloud credential as an
additional_credentials argument to the base implementation, and
patches get_cloud_credential on the instance so the injector picks up
the credential with OIDC context intact.

Co-authored-by: Alan Rominger <arominge@redhat.com>
Co-authored-by: Dave Mulford <dmulford@redhat.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 16:26:18 -04:00
Stevenson Michel
d71f18fa44 [Devel] Config Endpoint Optimization (#16389)
* Improved performance of the config endpoint by reducing database queries in GET /api/controller/v2/config/
2026-04-09 16:24:03 -04:00
Tong He
e82a4246f3 Bind the install bundle to the ansible.receptor collection 2.0.8 version (#16396) 2026-04-09 17:09:26 +02:00
8 changed files with 194 additions and 140 deletions

View File

@@ -1,4 +1,4 @@
--- ---
collections: collections:
- name: ansible.receptor - name: ansible.receptor
version: 2.0.6 version: 2.0.8

View File

@@ -344,13 +344,22 @@ class ApiV2ConfigView(APIView):
become_methods=PRIVILEGE_ESCALATION_METHODS, become_methods=PRIVILEGE_ESCALATION_METHODS,
) )
if ( # Check superuser/auditor first
request.user.is_superuser if request.user.is_superuser or request.user.is_system_auditor:
or request.user.is_system_auditor has_org_access = True
or Organization.accessible_objects(request.user, 'admin_role').exists() else:
or Organization.accessible_objects(request.user, 'auditor_role').exists() # Single query checking all three organization role types at once
or Organization.accessible_objects(request.user, 'project_admin_role').exists() has_org_access = (
): (
Organization.access_qs(request.user, 'change')
| Organization.access_qs(request.user, 'audit')
| Organization.access_qs(request.user, 'add_project')
)
.distinct()
.exists()
)
if has_org_access:
data.update( data.update(
dict( dict(
project_base_dir=settings.PROJECTS_ROOT, project_base_dir=settings.PROJECTS_ROOT,
@@ -358,7 +367,9 @@ class ApiV2ConfigView(APIView):
custom_virtualenvs=get_custom_venv_choices(), custom_virtualenvs=get_custom_venv_choices(),
) )
) )
elif JobTemplate.accessible_objects(request.user, 'admin_role').exists(): else:
# Only check JobTemplate access if org check failed
if JobTemplate.accessible_objects(request.user, 'admin_role').exists():
data['custom_virtualenvs'] = get_custom_venv_choices() data['custom_virtualenvs'] = get_custom_venv_choices()
return Response(data) return Response(data)

View File

@@ -228,16 +228,19 @@ class BaseTask(object):
# Convert to list to prevent re-evaluation of QuerySet # Convert to list to prevent re-evaluation of QuerySet
return list(credentials_list) return list(credentials_list)
def populate_workload_identity_tokens(self): def populate_workload_identity_tokens(self, additional_credentials=None):
""" """
Populate credentials with workload identity tokens. Populate credentials with workload identity tokens.
Sets the context on Credential objects that have input sources Sets the context on Credential objects that have input sources
using compatible external credential types. using compatible external credential types.
""" """
credentials = list(self._credentials)
if additional_credentials:
credentials.extend(additional_credentials)
credential_input_sources = ( credential_input_sources = (
(credential.context, src) (credential.context, src)
for credential in self._credentials for credential in credentials
for src in credential.input_sources.all() for src in credential.input_sources.all()
if any( if any(
field.get('id') == 'workload_identity_token' and field.get('internal') field.get('id') == 'workload_identity_token' and field.get('internal')
@@ -1863,6 +1866,24 @@ class RunInventoryUpdate(SourceControlMixin, BaseTask):
# All credentials not used by inventory source injector # All credentials not used by inventory source injector
return inventory_update.get_extra_credentials() return inventory_update.get_extra_credentials()
def populate_workload_identity_tokens(self, additional_credentials=None):
"""Also generate OIDC tokens for the cloud credential.
The cloud credential is not in _credentials (it is handled by the
inventory source injector), but it may still need a workload identity
token generated for it.
"""
cloud_cred = self.instance.get_cloud_credential()
creds = list(additional_credentials or [])
if cloud_cred:
creds.append(cloud_cred)
super().populate_workload_identity_tokens(additional_credentials=creds or None)
# Override get_cloud_credential on this instance so the injector
# uses the credential with OIDC context instead of doing a fresh
# DB fetch that would lose it.
if cloud_cred and cloud_cred.context:
self.instance.get_cloud_credential = lambda: cloud_cred
def build_project_dir(self, inventory_update, private_data_dir): def build_project_dir(self, inventory_update, private_data_dir):
source_project = None source_project = None
if inventory_update.inventory_source: if inventory_update.inventory_source:

View File

@@ -0,0 +1,84 @@
import pytest
from awx.api.versioning import reverse
from rest_framework import status
from awx.main.models.jobs import JobTemplate
@pytest.mark.django_db
class TestConfigEndpointFields:
def test_base_fields_all_users(self, get, rando):
url = reverse('api:api_v2_config_view')
response = get(url, rando, expect=200)
assert 'time_zone' in response.data
assert 'license_info' in response.data
assert 'version' in response.data
assert 'eula' in response.data
assert 'analytics_status' in response.data
assert 'analytics_collectors' in response.data
assert 'become_methods' in response.data
@pytest.mark.parametrize(
"role_type",
[
"superuser",
"system_auditor",
"org_admin",
"org_auditor",
"org_project_admin",
],
)
def test_privileged_users_conditional_fields(self, get, user, organization, admin, role_type):
url = reverse('api:api_v2_config_view')
if role_type == "superuser":
test_user = admin
elif role_type == "system_auditor":
test_user = user('system-auditor', is_superuser=False)
test_user.is_system_auditor = True
test_user.save()
elif role_type == "org_admin":
test_user = user('org-admin', is_superuser=False)
organization.admin_role.members.add(test_user)
elif role_type == "org_auditor":
test_user = user('org-auditor', is_superuser=False)
organization.auditor_role.members.add(test_user)
elif role_type == "org_project_admin":
test_user = user('org-project-admin', is_superuser=False)
organization.project_admin_role.members.add(test_user)
response = get(url, test_user, expect=200)
assert 'project_base_dir' in response.data
assert 'project_local_paths' in response.data
assert 'custom_virtualenvs' in response.data
def test_job_template_admin_gets_venvs_only(self, get, user, organization, project, inventory):
"""Test that JobTemplate admin without org access gets only custom_virtualenvs"""
jt_admin = user('jt-admin', is_superuser=False)
jt = JobTemplate.objects.create(name='test-jt', organization=organization, project=project, inventory=inventory)
jt.admin_role.members.add(jt_admin)
url = reverse('api:api_v2_config_view')
response = get(url, jt_admin, expect=200)
assert 'custom_virtualenvs' in response.data
assert 'project_base_dir' not in response.data
assert 'project_local_paths' not in response.data
def test_normal_user_no_conditional_fields(self, get, rando):
url = reverse('api:api_v2_config_view')
response = get(url, rando, expect=200)
assert 'project_base_dir' not in response.data
assert 'project_local_paths' not in response.data
assert 'custom_virtualenvs' not in response.data
def test_unauthenticated_denied(self, get):
"""Test that unauthenticated requests are denied"""
url = reverse('api:api_v2_config_view')
response = get(url, None, expect=401)
assert response.status_code == status.HTTP_401_UNAUTHORIZED

View File

@@ -590,3 +590,67 @@ def test_populate_workload_identity_tokens_passes_get_instance_timeout_to_client
scope=AutomationControllerJobScope.name, scope=AutomationControllerJobScope.name,
workload_ttl_seconds=expected_ttl, workload_ttl_seconds=expected_ttl,
) )
class TestRunInventoryUpdatePopulateWorkloadIdentityTokens:
"""Tests for RunInventoryUpdate.populate_workload_identity_tokens."""
def test_cloud_credential_passed_as_additional_credential(self):
"""The cloud credential is forwarded to super().populate_workload_identity_tokens via additional_credentials."""
cloud_cred = mock.MagicMock(name='cloud_cred')
cloud_cred.context = {}
task = jobs.RunInventoryUpdate()
task.instance = mock.MagicMock()
task.instance.get_cloud_credential.return_value = cloud_cred
task._credentials = []
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
task.populate_workload_identity_tokens()
mock_super.assert_called_once_with(additional_credentials=[cloud_cred])
def test_no_cloud_credential_calls_super_with_none(self):
"""When there is no cloud credential, super() is called with additional_credentials=None."""
task = jobs.RunInventoryUpdate()
task.instance = mock.MagicMock()
task.instance.get_cloud_credential.return_value = None
task._credentials = []
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
task.populate_workload_identity_tokens()
mock_super.assert_called_once_with(additional_credentials=None)
def test_additional_credentials_combined_with_cloud_credential(self):
"""Caller-supplied additional_credentials are combined with the cloud credential."""
cloud_cred = mock.MagicMock(name='cloud_cred')
cloud_cred.context = {}
extra_cred = mock.MagicMock(name='extra_cred')
task = jobs.RunInventoryUpdate()
task.instance = mock.MagicMock()
task.instance.get_cloud_credential.return_value = cloud_cred
task._credentials = []
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
task.populate_workload_identity_tokens(additional_credentials=[extra_cred])
mock_super.assert_called_once_with(additional_credentials=[extra_cred, cloud_cred])
def test_cloud_credential_override_after_context_set(self):
"""After OIDC processing, get_cloud_credential is overridden on the instance when context is populated."""
cloud_cred = mock.MagicMock(name='cloud_cred')
# Simulate that super().populate_workload_identity_tokens populates context
cloud_cred.context = {'workload_identity_token': 'eyJ.test.jwt'}
task = jobs.RunInventoryUpdate()
task.instance = mock.MagicMock()
task.instance.get_cloud_credential.return_value = cloud_cred
task._credentials = []
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens'):
task.populate_workload_identity_tokens()
# The instance's get_cloud_credential should now return the same object with context
assert task.instance.get_cloud_credential() is cloud_cred

View File

@@ -276,7 +276,6 @@ options:
- '' - ''
- 'github' - 'github'
- 'gitlab' - 'gitlab'
- 'bitbucket_dc'
webhook_credential: webhook_credential:
description: description:
- Personal Access Token for posting back the status to the service API - Personal Access Token for posting back the status to the service API
@@ -437,7 +436,7 @@ def main():
scm_branch=dict(), scm_branch=dict(),
ask_scm_branch_on_launch=dict(type='bool'), ask_scm_branch_on_launch=dict(type='bool'),
job_slice_count=dict(type='int'), job_slice_count=dict(type='int'),
webhook_service=dict(choices=['github', 'gitlab', 'bitbucket_dc', '']), webhook_service=dict(choices=['github', 'gitlab', '']),
webhook_credential=dict(), webhook_credential=dict(),
labels=dict(type="list", elements='str'), labels=dict(type="list", elements='str'),
notification_templates_started=dict(type="list", elements='str'), notification_templates_started=dict(type="list", elements='str'),

View File

@@ -117,7 +117,6 @@ options:
choices: choices:
- github - github
- gitlab - gitlab
- bitbucket_dc
webhook_credential: webhook_credential:
description: description:
- Personal Access Token for posting back the status to the service API - Personal Access Token for posting back the status to the service API
@@ -829,7 +828,7 @@ def main():
ask_inventory_on_launch=dict(type='bool'), ask_inventory_on_launch=dict(type='bool'),
ask_scm_branch_on_launch=dict(type='bool'), ask_scm_branch_on_launch=dict(type='bool'),
ask_limit_on_launch=dict(type='bool'), ask_limit_on_launch=dict(type='bool'),
webhook_service=dict(choices=['github', 'gitlab', 'bitbucket_dc']), webhook_service=dict(choices=['github', 'gitlab']),
webhook_credential=dict(), webhook_credential=dict(),
labels=dict(type="list", elements='str'), labels=dict(type="list", elements='str'),
notification_templates_started=dict(type="list", elements='str'), notification_templates_started=dict(type="list", elements='str'),

View File

@@ -1,124 +0,0 @@
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import pytest
from awx.main.models import JobTemplate, WorkflowJobTemplate
# The backend supports these webhook services on job/workflow templates
# (see awx/main/models/mixins.py). The collection modules must accept all of
# them in their argument_spec ``choices`` list. This test guards against the
# module's choices drifting from the backend -- see AAP-45980, where
# ``bitbucket_dc`` had been supported by the API since migration 0188 but was
# still being rejected by the job_template/workflow_job_template modules.
WEBHOOK_SERVICES = ['github', 'gitlab', 'bitbucket_dc']
@pytest.mark.django_db
@pytest.mark.parametrize('webhook_service', WEBHOOK_SERVICES)
def test_job_template_accepts_webhook_service(run_module, admin_user, project, inventory, webhook_service):
result = run_module(
'job_template',
{
'name': 'foo',
'playbook': 'helloworld.yml',
'project': project.name,
'inventory': inventory.name,
'webhook_service': webhook_service,
'state': 'present',
},
admin_user,
)
assert not result.get('failed', False), result.get('msg', result)
assert result.get('changed', False), result
jt = JobTemplate.objects.get(name='foo')
assert jt.webhook_service == webhook_service
# Re-running with the same args must be a no-op (idempotence).
result = run_module(
'job_template',
{
'name': 'foo',
'playbook': 'helloworld.yml',
'project': project.name,
'inventory': inventory.name,
'webhook_service': webhook_service,
'state': 'present',
},
admin_user,
)
assert not result.get('failed', False), result.get('msg', result)
assert not result.get('changed', True), result
@pytest.mark.django_db
@pytest.mark.parametrize('webhook_service', WEBHOOK_SERVICES)
def test_workflow_job_template_accepts_webhook_service(run_module, admin_user, organization, webhook_service):
result = run_module(
'workflow_job_template',
{
'name': 'foo-workflow',
'organization': organization.name,
'webhook_service': webhook_service,
'state': 'present',
},
admin_user,
)
assert not result.get('failed', False), result.get('msg', result)
assert result.get('changed', False), result
wfjt = WorkflowJobTemplate.objects.get(name='foo-workflow')
assert wfjt.webhook_service == webhook_service
# Re-running with the same args must be a no-op (idempotence).
result = run_module(
'workflow_job_template',
{
'name': 'foo-workflow',
'organization': organization.name,
'webhook_service': webhook_service,
'state': 'present',
},
admin_user,
)
assert not result.get('failed', False), result.get('msg', result)
assert not result.get('changed', True), result
@pytest.mark.django_db
def test_job_template_rejects_unknown_webhook_service(run_module, admin_user, project, inventory):
result = run_module(
'job_template',
{
'name': 'foo',
'playbook': 'helloworld.yml',
'project': project.name,
'inventory': inventory.name,
'webhook_service': 'not_a_real_service',
'state': 'present',
},
admin_user,
)
assert result.get('failed', False), result
assert 'webhook_service' in result.get('msg', '')
@pytest.mark.django_db
def test_workflow_job_template_rejects_unknown_webhook_service(run_module, admin_user, organization):
result = run_module(
'workflow_job_template',
{
'name': 'foo-workflow',
'organization': organization.name,
'webhook_service': 'not_a_real_service',
'state': 'present',
},
admin_user,
)
assert result.get('failed', False), result
assert 'webhook_service' in result.get('msg', '')