mirror of
https://github.com/ansible/awx.git
synced 2026-04-11 13:09:21 -02:30
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b8c9ae73cd | ||
|
|
d71f18fa44 | ||
|
|
e82a4246f3 |
@@ -1,4 +1,4 @@
|
||||
---
|
||||
collections:
|
||||
- name: ansible.receptor
|
||||
version: 2.0.6
|
||||
version: 2.0.8
|
||||
|
||||
@@ -344,13 +344,22 @@ class ApiV2ConfigView(APIView):
|
||||
become_methods=PRIVILEGE_ESCALATION_METHODS,
|
||||
)
|
||||
|
||||
if (
|
||||
request.user.is_superuser
|
||||
or request.user.is_system_auditor
|
||||
or Organization.accessible_objects(request.user, 'admin_role').exists()
|
||||
or Organization.accessible_objects(request.user, 'auditor_role').exists()
|
||||
or Organization.accessible_objects(request.user, 'project_admin_role').exists()
|
||||
):
|
||||
# Check superuser/auditor first
|
||||
if request.user.is_superuser or request.user.is_system_auditor:
|
||||
has_org_access = True
|
||||
else:
|
||||
# Single query checking all three organization role types at once
|
||||
has_org_access = (
|
||||
(
|
||||
Organization.access_qs(request.user, 'change')
|
||||
| Organization.access_qs(request.user, 'audit')
|
||||
| Organization.access_qs(request.user, 'add_project')
|
||||
)
|
||||
.distinct()
|
||||
.exists()
|
||||
)
|
||||
|
||||
if has_org_access:
|
||||
data.update(
|
||||
dict(
|
||||
project_base_dir=settings.PROJECTS_ROOT,
|
||||
@@ -358,8 +367,10 @@ class ApiV2ConfigView(APIView):
|
||||
custom_virtualenvs=get_custom_venv_choices(),
|
||||
)
|
||||
)
|
||||
elif JobTemplate.accessible_objects(request.user, 'admin_role').exists():
|
||||
data['custom_virtualenvs'] = get_custom_venv_choices()
|
||||
else:
|
||||
# Only check JobTemplate access if org check failed
|
||||
if JobTemplate.accessible_objects(request.user, 'admin_role').exists():
|
||||
data['custom_virtualenvs'] = get_custom_venv_choices()
|
||||
|
||||
return Response(data)
|
||||
|
||||
|
||||
@@ -228,16 +228,19 @@ class BaseTask(object):
|
||||
# Convert to list to prevent re-evaluation of QuerySet
|
||||
return list(credentials_list)
|
||||
|
||||
def populate_workload_identity_tokens(self):
|
||||
def populate_workload_identity_tokens(self, additional_credentials=None):
|
||||
"""
|
||||
Populate credentials with workload identity tokens.
|
||||
|
||||
Sets the context on Credential objects that have input sources
|
||||
using compatible external credential types.
|
||||
"""
|
||||
credentials = list(self._credentials)
|
||||
if additional_credentials:
|
||||
credentials.extend(additional_credentials)
|
||||
credential_input_sources = (
|
||||
(credential.context, src)
|
||||
for credential in self._credentials
|
||||
for credential in credentials
|
||||
for src in credential.input_sources.all()
|
||||
if any(
|
||||
field.get('id') == 'workload_identity_token' and field.get('internal')
|
||||
@@ -1863,6 +1866,24 @@ class RunInventoryUpdate(SourceControlMixin, BaseTask):
|
||||
# All credentials not used by inventory source injector
|
||||
return inventory_update.get_extra_credentials()
|
||||
|
||||
def populate_workload_identity_tokens(self, additional_credentials=None):
|
||||
"""Also generate OIDC tokens for the cloud credential.
|
||||
|
||||
The cloud credential is not in _credentials (it is handled by the
|
||||
inventory source injector), but it may still need a workload identity
|
||||
token generated for it.
|
||||
"""
|
||||
cloud_cred = self.instance.get_cloud_credential()
|
||||
creds = list(additional_credentials or [])
|
||||
if cloud_cred:
|
||||
creds.append(cloud_cred)
|
||||
super().populate_workload_identity_tokens(additional_credentials=creds or None)
|
||||
# Override get_cloud_credential on this instance so the injector
|
||||
# uses the credential with OIDC context instead of doing a fresh
|
||||
# DB fetch that would lose it.
|
||||
if cloud_cred and cloud_cred.context:
|
||||
self.instance.get_cloud_credential = lambda: cloud_cred
|
||||
|
||||
def build_project_dir(self, inventory_update, private_data_dir):
|
||||
source_project = None
|
||||
if inventory_update.inventory_source:
|
||||
|
||||
84
awx/main/tests/functional/api/test_config_endpoint.py
Normal file
84
awx/main/tests/functional/api/test_config_endpoint.py
Normal file
@@ -0,0 +1,84 @@
|
||||
import pytest
|
||||
from awx.api.versioning import reverse
|
||||
from rest_framework import status
|
||||
|
||||
from awx.main.models.jobs import JobTemplate
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestConfigEndpointFields:
|
||||
def test_base_fields_all_users(self, get, rando):
|
||||
url = reverse('api:api_v2_config_view')
|
||||
response = get(url, rando, expect=200)
|
||||
|
||||
assert 'time_zone' in response.data
|
||||
assert 'license_info' in response.data
|
||||
assert 'version' in response.data
|
||||
assert 'eula' in response.data
|
||||
assert 'analytics_status' in response.data
|
||||
assert 'analytics_collectors' in response.data
|
||||
assert 'become_methods' in response.data
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"role_type",
|
||||
[
|
||||
"superuser",
|
||||
"system_auditor",
|
||||
"org_admin",
|
||||
"org_auditor",
|
||||
"org_project_admin",
|
||||
],
|
||||
)
|
||||
def test_privileged_users_conditional_fields(self, get, user, organization, admin, role_type):
|
||||
url = reverse('api:api_v2_config_view')
|
||||
|
||||
if role_type == "superuser":
|
||||
test_user = admin
|
||||
elif role_type == "system_auditor":
|
||||
test_user = user('system-auditor', is_superuser=False)
|
||||
test_user.is_system_auditor = True
|
||||
test_user.save()
|
||||
elif role_type == "org_admin":
|
||||
test_user = user('org-admin', is_superuser=False)
|
||||
organization.admin_role.members.add(test_user)
|
||||
elif role_type == "org_auditor":
|
||||
test_user = user('org-auditor', is_superuser=False)
|
||||
organization.auditor_role.members.add(test_user)
|
||||
elif role_type == "org_project_admin":
|
||||
test_user = user('org-project-admin', is_superuser=False)
|
||||
organization.project_admin_role.members.add(test_user)
|
||||
|
||||
response = get(url, test_user, expect=200)
|
||||
|
||||
assert 'project_base_dir' in response.data
|
||||
assert 'project_local_paths' in response.data
|
||||
assert 'custom_virtualenvs' in response.data
|
||||
|
||||
def test_job_template_admin_gets_venvs_only(self, get, user, organization, project, inventory):
|
||||
"""Test that JobTemplate admin without org access gets only custom_virtualenvs"""
|
||||
jt_admin = user('jt-admin', is_superuser=False)
|
||||
|
||||
jt = JobTemplate.objects.create(name='test-jt', organization=organization, project=project, inventory=inventory)
|
||||
jt.admin_role.members.add(jt_admin)
|
||||
|
||||
url = reverse('api:api_v2_config_view')
|
||||
response = get(url, jt_admin, expect=200)
|
||||
|
||||
assert 'custom_virtualenvs' in response.data
|
||||
assert 'project_base_dir' not in response.data
|
||||
assert 'project_local_paths' not in response.data
|
||||
|
||||
def test_normal_user_no_conditional_fields(self, get, rando):
|
||||
url = reverse('api:api_v2_config_view')
|
||||
response = get(url, rando, expect=200)
|
||||
|
||||
assert 'project_base_dir' not in response.data
|
||||
assert 'project_local_paths' not in response.data
|
||||
assert 'custom_virtualenvs' not in response.data
|
||||
|
||||
def test_unauthenticated_denied(self, get):
|
||||
"""Test that unauthenticated requests are denied"""
|
||||
url = reverse('api:api_v2_config_view')
|
||||
response = get(url, None, expect=401)
|
||||
|
||||
assert response.status_code == status.HTTP_401_UNAUTHORIZED
|
||||
@@ -590,3 +590,67 @@ def test_populate_workload_identity_tokens_passes_get_instance_timeout_to_client
|
||||
scope=AutomationControllerJobScope.name,
|
||||
workload_ttl_seconds=expected_ttl,
|
||||
)
|
||||
|
||||
|
||||
class TestRunInventoryUpdatePopulateWorkloadIdentityTokens:
|
||||
"""Tests for RunInventoryUpdate.populate_workload_identity_tokens."""
|
||||
|
||||
def test_cloud_credential_passed_as_additional_credential(self):
|
||||
"""The cloud credential is forwarded to super().populate_workload_identity_tokens via additional_credentials."""
|
||||
cloud_cred = mock.MagicMock(name='cloud_cred')
|
||||
cloud_cred.context = {}
|
||||
|
||||
task = jobs.RunInventoryUpdate()
|
||||
task.instance = mock.MagicMock()
|
||||
task.instance.get_cloud_credential.return_value = cloud_cred
|
||||
task._credentials = []
|
||||
|
||||
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
|
||||
task.populate_workload_identity_tokens()
|
||||
|
||||
mock_super.assert_called_once_with(additional_credentials=[cloud_cred])
|
||||
|
||||
def test_no_cloud_credential_calls_super_with_none(self):
|
||||
"""When there is no cloud credential, super() is called with additional_credentials=None."""
|
||||
task = jobs.RunInventoryUpdate()
|
||||
task.instance = mock.MagicMock()
|
||||
task.instance.get_cloud_credential.return_value = None
|
||||
task._credentials = []
|
||||
|
||||
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
|
||||
task.populate_workload_identity_tokens()
|
||||
|
||||
mock_super.assert_called_once_with(additional_credentials=None)
|
||||
|
||||
def test_additional_credentials_combined_with_cloud_credential(self):
|
||||
"""Caller-supplied additional_credentials are combined with the cloud credential."""
|
||||
cloud_cred = mock.MagicMock(name='cloud_cred')
|
||||
cloud_cred.context = {}
|
||||
extra_cred = mock.MagicMock(name='extra_cred')
|
||||
|
||||
task = jobs.RunInventoryUpdate()
|
||||
task.instance = mock.MagicMock()
|
||||
task.instance.get_cloud_credential.return_value = cloud_cred
|
||||
task._credentials = []
|
||||
|
||||
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
|
||||
task.populate_workload_identity_tokens(additional_credentials=[extra_cred])
|
||||
|
||||
mock_super.assert_called_once_with(additional_credentials=[extra_cred, cloud_cred])
|
||||
|
||||
def test_cloud_credential_override_after_context_set(self):
|
||||
"""After OIDC processing, get_cloud_credential is overridden on the instance when context is populated."""
|
||||
cloud_cred = mock.MagicMock(name='cloud_cred')
|
||||
# Simulate that super().populate_workload_identity_tokens populates context
|
||||
cloud_cred.context = {'workload_identity_token': 'eyJ.test.jwt'}
|
||||
|
||||
task = jobs.RunInventoryUpdate()
|
||||
task.instance = mock.MagicMock()
|
||||
task.instance.get_cloud_credential.return_value = cloud_cred
|
||||
task._credentials = []
|
||||
|
||||
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens'):
|
||||
task.populate_workload_identity_tokens()
|
||||
|
||||
# The instance's get_cloud_credential should now return the same object with context
|
||||
assert task.instance.get_cloud_credential() is cloud_cred
|
||||
|
||||
@@ -276,7 +276,6 @@ options:
|
||||
- ''
|
||||
- 'github'
|
||||
- 'gitlab'
|
||||
- 'bitbucket_dc'
|
||||
webhook_credential:
|
||||
description:
|
||||
- Personal Access Token for posting back the status to the service API
|
||||
@@ -437,7 +436,7 @@ def main():
|
||||
scm_branch=dict(),
|
||||
ask_scm_branch_on_launch=dict(type='bool'),
|
||||
job_slice_count=dict(type='int'),
|
||||
webhook_service=dict(choices=['github', 'gitlab', 'bitbucket_dc', '']),
|
||||
webhook_service=dict(choices=['github', 'gitlab', '']),
|
||||
webhook_credential=dict(),
|
||||
labels=dict(type="list", elements='str'),
|
||||
notification_templates_started=dict(type="list", elements='str'),
|
||||
|
||||
@@ -117,7 +117,6 @@ options:
|
||||
choices:
|
||||
- github
|
||||
- gitlab
|
||||
- bitbucket_dc
|
||||
webhook_credential:
|
||||
description:
|
||||
- Personal Access Token for posting back the status to the service API
|
||||
@@ -829,7 +828,7 @@ def main():
|
||||
ask_inventory_on_launch=dict(type='bool'),
|
||||
ask_scm_branch_on_launch=dict(type='bool'),
|
||||
ask_limit_on_launch=dict(type='bool'),
|
||||
webhook_service=dict(choices=['github', 'gitlab', 'bitbucket_dc']),
|
||||
webhook_service=dict(choices=['github', 'gitlab']),
|
||||
webhook_credential=dict(),
|
||||
labels=dict(type="list", elements='str'),
|
||||
notification_templates_started=dict(type="list", elements='str'),
|
||||
|
||||
@@ -1,124 +0,0 @@
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
__metaclass__ = type
|
||||
|
||||
import pytest
|
||||
|
||||
from awx.main.models import JobTemplate, WorkflowJobTemplate
|
||||
|
||||
|
||||
# The backend supports these webhook services on job/workflow templates
|
||||
# (see awx/main/models/mixins.py). The collection modules must accept all of
|
||||
# them in their argument_spec ``choices`` list. This test guards against the
|
||||
# module's choices drifting from the backend -- see AAP-45980, where
|
||||
# ``bitbucket_dc`` had been supported by the API since migration 0188 but was
|
||||
# still being rejected by the job_template/workflow_job_template modules.
|
||||
WEBHOOK_SERVICES = ['github', 'gitlab', 'bitbucket_dc']
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('webhook_service', WEBHOOK_SERVICES)
|
||||
def test_job_template_accepts_webhook_service(run_module, admin_user, project, inventory, webhook_service):
|
||||
result = run_module(
|
||||
'job_template',
|
||||
{
|
||||
'name': 'foo',
|
||||
'playbook': 'helloworld.yml',
|
||||
'project': project.name,
|
||||
'inventory': inventory.name,
|
||||
'webhook_service': webhook_service,
|
||||
'state': 'present',
|
||||
},
|
||||
admin_user,
|
||||
)
|
||||
|
||||
assert not result.get('failed', False), result.get('msg', result)
|
||||
assert result.get('changed', False), result
|
||||
|
||||
jt = JobTemplate.objects.get(name='foo')
|
||||
assert jt.webhook_service == webhook_service
|
||||
|
||||
# Re-running with the same args must be a no-op (idempotence).
|
||||
result = run_module(
|
||||
'job_template',
|
||||
{
|
||||
'name': 'foo',
|
||||
'playbook': 'helloworld.yml',
|
||||
'project': project.name,
|
||||
'inventory': inventory.name,
|
||||
'webhook_service': webhook_service,
|
||||
'state': 'present',
|
||||
},
|
||||
admin_user,
|
||||
)
|
||||
assert not result.get('failed', False), result.get('msg', result)
|
||||
assert not result.get('changed', True), result
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize('webhook_service', WEBHOOK_SERVICES)
|
||||
def test_workflow_job_template_accepts_webhook_service(run_module, admin_user, organization, webhook_service):
|
||||
result = run_module(
|
||||
'workflow_job_template',
|
||||
{
|
||||
'name': 'foo-workflow',
|
||||
'organization': organization.name,
|
||||
'webhook_service': webhook_service,
|
||||
'state': 'present',
|
||||
},
|
||||
admin_user,
|
||||
)
|
||||
|
||||
assert not result.get('failed', False), result.get('msg', result)
|
||||
assert result.get('changed', False), result
|
||||
|
||||
wfjt = WorkflowJobTemplate.objects.get(name='foo-workflow')
|
||||
assert wfjt.webhook_service == webhook_service
|
||||
|
||||
# Re-running with the same args must be a no-op (idempotence).
|
||||
result = run_module(
|
||||
'workflow_job_template',
|
||||
{
|
||||
'name': 'foo-workflow',
|
||||
'organization': organization.name,
|
||||
'webhook_service': webhook_service,
|
||||
'state': 'present',
|
||||
},
|
||||
admin_user,
|
||||
)
|
||||
assert not result.get('failed', False), result.get('msg', result)
|
||||
assert not result.get('changed', True), result
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_job_template_rejects_unknown_webhook_service(run_module, admin_user, project, inventory):
|
||||
result = run_module(
|
||||
'job_template',
|
||||
{
|
||||
'name': 'foo',
|
||||
'playbook': 'helloworld.yml',
|
||||
'project': project.name,
|
||||
'inventory': inventory.name,
|
||||
'webhook_service': 'not_a_real_service',
|
||||
'state': 'present',
|
||||
},
|
||||
admin_user,
|
||||
)
|
||||
assert result.get('failed', False), result
|
||||
assert 'webhook_service' in result.get('msg', '')
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_workflow_job_template_rejects_unknown_webhook_service(run_module, admin_user, organization):
|
||||
result = run_module(
|
||||
'workflow_job_template',
|
||||
{
|
||||
'name': 'foo-workflow',
|
||||
'organization': organization.name,
|
||||
'webhook_service': 'not_a_real_service',
|
||||
'state': 'present',
|
||||
},
|
||||
admin_user,
|
||||
)
|
||||
assert result.get('failed', False), result
|
||||
assert 'webhook_service' in result.get('msg', '')
|
||||
Reference in New Issue
Block a user