mirror of
https://github.com/ansible/awx.git
synced 2026-04-11 13:09:21 -02:30
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b8c9ae73cd | ||
|
|
d71f18fa44 | ||
|
|
e82a4246f3 |
@@ -1,4 +1,4 @@
|
|||||||
---
|
---
|
||||||
collections:
|
collections:
|
||||||
- name: ansible.receptor
|
- name: ansible.receptor
|
||||||
version: 2.0.6
|
version: 2.0.8
|
||||||
|
|||||||
@@ -344,13 +344,22 @@ class ApiV2ConfigView(APIView):
|
|||||||
become_methods=PRIVILEGE_ESCALATION_METHODS,
|
become_methods=PRIVILEGE_ESCALATION_METHODS,
|
||||||
)
|
)
|
||||||
|
|
||||||
if (
|
# Check superuser/auditor first
|
||||||
request.user.is_superuser
|
if request.user.is_superuser or request.user.is_system_auditor:
|
||||||
or request.user.is_system_auditor
|
has_org_access = True
|
||||||
or Organization.accessible_objects(request.user, 'admin_role').exists()
|
else:
|
||||||
or Organization.accessible_objects(request.user, 'auditor_role').exists()
|
# Single query checking all three organization role types at once
|
||||||
or Organization.accessible_objects(request.user, 'project_admin_role').exists()
|
has_org_access = (
|
||||||
):
|
(
|
||||||
|
Organization.access_qs(request.user, 'change')
|
||||||
|
| Organization.access_qs(request.user, 'audit')
|
||||||
|
| Organization.access_qs(request.user, 'add_project')
|
||||||
|
)
|
||||||
|
.distinct()
|
||||||
|
.exists()
|
||||||
|
)
|
||||||
|
|
||||||
|
if has_org_access:
|
||||||
data.update(
|
data.update(
|
||||||
dict(
|
dict(
|
||||||
project_base_dir=settings.PROJECTS_ROOT,
|
project_base_dir=settings.PROJECTS_ROOT,
|
||||||
@@ -358,7 +367,9 @@ class ApiV2ConfigView(APIView):
|
|||||||
custom_virtualenvs=get_custom_venv_choices(),
|
custom_virtualenvs=get_custom_venv_choices(),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
elif JobTemplate.accessible_objects(request.user, 'admin_role').exists():
|
else:
|
||||||
|
# Only check JobTemplate access if org check failed
|
||||||
|
if JobTemplate.accessible_objects(request.user, 'admin_role').exists():
|
||||||
data['custom_virtualenvs'] = get_custom_venv_choices()
|
data['custom_virtualenvs'] = get_custom_venv_choices()
|
||||||
|
|
||||||
return Response(data)
|
return Response(data)
|
||||||
|
|||||||
@@ -228,16 +228,19 @@ class BaseTask(object):
|
|||||||
# Convert to list to prevent re-evaluation of QuerySet
|
# Convert to list to prevent re-evaluation of QuerySet
|
||||||
return list(credentials_list)
|
return list(credentials_list)
|
||||||
|
|
||||||
def populate_workload_identity_tokens(self):
|
def populate_workload_identity_tokens(self, additional_credentials=None):
|
||||||
"""
|
"""
|
||||||
Populate credentials with workload identity tokens.
|
Populate credentials with workload identity tokens.
|
||||||
|
|
||||||
Sets the context on Credential objects that have input sources
|
Sets the context on Credential objects that have input sources
|
||||||
using compatible external credential types.
|
using compatible external credential types.
|
||||||
"""
|
"""
|
||||||
|
credentials = list(self._credentials)
|
||||||
|
if additional_credentials:
|
||||||
|
credentials.extend(additional_credentials)
|
||||||
credential_input_sources = (
|
credential_input_sources = (
|
||||||
(credential.context, src)
|
(credential.context, src)
|
||||||
for credential in self._credentials
|
for credential in credentials
|
||||||
for src in credential.input_sources.all()
|
for src in credential.input_sources.all()
|
||||||
if any(
|
if any(
|
||||||
field.get('id') == 'workload_identity_token' and field.get('internal')
|
field.get('id') == 'workload_identity_token' and field.get('internal')
|
||||||
@@ -1863,6 +1866,24 @@ class RunInventoryUpdate(SourceControlMixin, BaseTask):
|
|||||||
# All credentials not used by inventory source injector
|
# All credentials not used by inventory source injector
|
||||||
return inventory_update.get_extra_credentials()
|
return inventory_update.get_extra_credentials()
|
||||||
|
|
||||||
|
def populate_workload_identity_tokens(self, additional_credentials=None):
|
||||||
|
"""Also generate OIDC tokens for the cloud credential.
|
||||||
|
|
||||||
|
The cloud credential is not in _credentials (it is handled by the
|
||||||
|
inventory source injector), but it may still need a workload identity
|
||||||
|
token generated for it.
|
||||||
|
"""
|
||||||
|
cloud_cred = self.instance.get_cloud_credential()
|
||||||
|
creds = list(additional_credentials or [])
|
||||||
|
if cloud_cred:
|
||||||
|
creds.append(cloud_cred)
|
||||||
|
super().populate_workload_identity_tokens(additional_credentials=creds or None)
|
||||||
|
# Override get_cloud_credential on this instance so the injector
|
||||||
|
# uses the credential with OIDC context instead of doing a fresh
|
||||||
|
# DB fetch that would lose it.
|
||||||
|
if cloud_cred and cloud_cred.context:
|
||||||
|
self.instance.get_cloud_credential = lambda: cloud_cred
|
||||||
|
|
||||||
def build_project_dir(self, inventory_update, private_data_dir):
|
def build_project_dir(self, inventory_update, private_data_dir):
|
||||||
source_project = None
|
source_project = None
|
||||||
if inventory_update.inventory_source:
|
if inventory_update.inventory_source:
|
||||||
|
|||||||
84
awx/main/tests/functional/api/test_config_endpoint.py
Normal file
84
awx/main/tests/functional/api/test_config_endpoint.py
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
import pytest
|
||||||
|
from awx.api.versioning import reverse
|
||||||
|
from rest_framework import status
|
||||||
|
|
||||||
|
from awx.main.models.jobs import JobTemplate
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
class TestConfigEndpointFields:
|
||||||
|
def test_base_fields_all_users(self, get, rando):
|
||||||
|
url = reverse('api:api_v2_config_view')
|
||||||
|
response = get(url, rando, expect=200)
|
||||||
|
|
||||||
|
assert 'time_zone' in response.data
|
||||||
|
assert 'license_info' in response.data
|
||||||
|
assert 'version' in response.data
|
||||||
|
assert 'eula' in response.data
|
||||||
|
assert 'analytics_status' in response.data
|
||||||
|
assert 'analytics_collectors' in response.data
|
||||||
|
assert 'become_methods' in response.data
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"role_type",
|
||||||
|
[
|
||||||
|
"superuser",
|
||||||
|
"system_auditor",
|
||||||
|
"org_admin",
|
||||||
|
"org_auditor",
|
||||||
|
"org_project_admin",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_privileged_users_conditional_fields(self, get, user, organization, admin, role_type):
|
||||||
|
url = reverse('api:api_v2_config_view')
|
||||||
|
|
||||||
|
if role_type == "superuser":
|
||||||
|
test_user = admin
|
||||||
|
elif role_type == "system_auditor":
|
||||||
|
test_user = user('system-auditor', is_superuser=False)
|
||||||
|
test_user.is_system_auditor = True
|
||||||
|
test_user.save()
|
||||||
|
elif role_type == "org_admin":
|
||||||
|
test_user = user('org-admin', is_superuser=False)
|
||||||
|
organization.admin_role.members.add(test_user)
|
||||||
|
elif role_type == "org_auditor":
|
||||||
|
test_user = user('org-auditor', is_superuser=False)
|
||||||
|
organization.auditor_role.members.add(test_user)
|
||||||
|
elif role_type == "org_project_admin":
|
||||||
|
test_user = user('org-project-admin', is_superuser=False)
|
||||||
|
organization.project_admin_role.members.add(test_user)
|
||||||
|
|
||||||
|
response = get(url, test_user, expect=200)
|
||||||
|
|
||||||
|
assert 'project_base_dir' in response.data
|
||||||
|
assert 'project_local_paths' in response.data
|
||||||
|
assert 'custom_virtualenvs' in response.data
|
||||||
|
|
||||||
|
def test_job_template_admin_gets_venvs_only(self, get, user, organization, project, inventory):
|
||||||
|
"""Test that JobTemplate admin without org access gets only custom_virtualenvs"""
|
||||||
|
jt_admin = user('jt-admin', is_superuser=False)
|
||||||
|
|
||||||
|
jt = JobTemplate.objects.create(name='test-jt', organization=organization, project=project, inventory=inventory)
|
||||||
|
jt.admin_role.members.add(jt_admin)
|
||||||
|
|
||||||
|
url = reverse('api:api_v2_config_view')
|
||||||
|
response = get(url, jt_admin, expect=200)
|
||||||
|
|
||||||
|
assert 'custom_virtualenvs' in response.data
|
||||||
|
assert 'project_base_dir' not in response.data
|
||||||
|
assert 'project_local_paths' not in response.data
|
||||||
|
|
||||||
|
def test_normal_user_no_conditional_fields(self, get, rando):
|
||||||
|
url = reverse('api:api_v2_config_view')
|
||||||
|
response = get(url, rando, expect=200)
|
||||||
|
|
||||||
|
assert 'project_base_dir' not in response.data
|
||||||
|
assert 'project_local_paths' not in response.data
|
||||||
|
assert 'custom_virtualenvs' not in response.data
|
||||||
|
|
||||||
|
def test_unauthenticated_denied(self, get):
|
||||||
|
"""Test that unauthenticated requests are denied"""
|
||||||
|
url = reverse('api:api_v2_config_view')
|
||||||
|
response = get(url, None, expect=401)
|
||||||
|
|
||||||
|
assert response.status_code == status.HTTP_401_UNAUTHORIZED
|
||||||
@@ -590,3 +590,67 @@ def test_populate_workload_identity_tokens_passes_get_instance_timeout_to_client
|
|||||||
scope=AutomationControllerJobScope.name,
|
scope=AutomationControllerJobScope.name,
|
||||||
workload_ttl_seconds=expected_ttl,
|
workload_ttl_seconds=expected_ttl,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestRunInventoryUpdatePopulateWorkloadIdentityTokens:
|
||||||
|
"""Tests for RunInventoryUpdate.populate_workload_identity_tokens."""
|
||||||
|
|
||||||
|
def test_cloud_credential_passed_as_additional_credential(self):
|
||||||
|
"""The cloud credential is forwarded to super().populate_workload_identity_tokens via additional_credentials."""
|
||||||
|
cloud_cred = mock.MagicMock(name='cloud_cred')
|
||||||
|
cloud_cred.context = {}
|
||||||
|
|
||||||
|
task = jobs.RunInventoryUpdate()
|
||||||
|
task.instance = mock.MagicMock()
|
||||||
|
task.instance.get_cloud_credential.return_value = cloud_cred
|
||||||
|
task._credentials = []
|
||||||
|
|
||||||
|
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
|
||||||
|
task.populate_workload_identity_tokens()
|
||||||
|
|
||||||
|
mock_super.assert_called_once_with(additional_credentials=[cloud_cred])
|
||||||
|
|
||||||
|
def test_no_cloud_credential_calls_super_with_none(self):
|
||||||
|
"""When there is no cloud credential, super() is called with additional_credentials=None."""
|
||||||
|
task = jobs.RunInventoryUpdate()
|
||||||
|
task.instance = mock.MagicMock()
|
||||||
|
task.instance.get_cloud_credential.return_value = None
|
||||||
|
task._credentials = []
|
||||||
|
|
||||||
|
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
|
||||||
|
task.populate_workload_identity_tokens()
|
||||||
|
|
||||||
|
mock_super.assert_called_once_with(additional_credentials=None)
|
||||||
|
|
||||||
|
def test_additional_credentials_combined_with_cloud_credential(self):
|
||||||
|
"""Caller-supplied additional_credentials are combined with the cloud credential."""
|
||||||
|
cloud_cred = mock.MagicMock(name='cloud_cred')
|
||||||
|
cloud_cred.context = {}
|
||||||
|
extra_cred = mock.MagicMock(name='extra_cred')
|
||||||
|
|
||||||
|
task = jobs.RunInventoryUpdate()
|
||||||
|
task.instance = mock.MagicMock()
|
||||||
|
task.instance.get_cloud_credential.return_value = cloud_cred
|
||||||
|
task._credentials = []
|
||||||
|
|
||||||
|
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
|
||||||
|
task.populate_workload_identity_tokens(additional_credentials=[extra_cred])
|
||||||
|
|
||||||
|
mock_super.assert_called_once_with(additional_credentials=[extra_cred, cloud_cred])
|
||||||
|
|
||||||
|
def test_cloud_credential_override_after_context_set(self):
|
||||||
|
"""After OIDC processing, get_cloud_credential is overridden on the instance when context is populated."""
|
||||||
|
cloud_cred = mock.MagicMock(name='cloud_cred')
|
||||||
|
# Simulate that super().populate_workload_identity_tokens populates context
|
||||||
|
cloud_cred.context = {'workload_identity_token': 'eyJ.test.jwt'}
|
||||||
|
|
||||||
|
task = jobs.RunInventoryUpdate()
|
||||||
|
task.instance = mock.MagicMock()
|
||||||
|
task.instance.get_cloud_credential.return_value = cloud_cred
|
||||||
|
task._credentials = []
|
||||||
|
|
||||||
|
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens'):
|
||||||
|
task.populate_workload_identity_tokens()
|
||||||
|
|
||||||
|
# The instance's get_cloud_credential should now return the same object with context
|
||||||
|
assert task.instance.get_cloud_credential() is cloud_cred
|
||||||
|
|||||||
@@ -276,7 +276,6 @@ options:
|
|||||||
- ''
|
- ''
|
||||||
- 'github'
|
- 'github'
|
||||||
- 'gitlab'
|
- 'gitlab'
|
||||||
- 'bitbucket_dc'
|
|
||||||
webhook_credential:
|
webhook_credential:
|
||||||
description:
|
description:
|
||||||
- Personal Access Token for posting back the status to the service API
|
- Personal Access Token for posting back the status to the service API
|
||||||
@@ -437,7 +436,7 @@ def main():
|
|||||||
scm_branch=dict(),
|
scm_branch=dict(),
|
||||||
ask_scm_branch_on_launch=dict(type='bool'),
|
ask_scm_branch_on_launch=dict(type='bool'),
|
||||||
job_slice_count=dict(type='int'),
|
job_slice_count=dict(type='int'),
|
||||||
webhook_service=dict(choices=['github', 'gitlab', 'bitbucket_dc', '']),
|
webhook_service=dict(choices=['github', 'gitlab', '']),
|
||||||
webhook_credential=dict(),
|
webhook_credential=dict(),
|
||||||
labels=dict(type="list", elements='str'),
|
labels=dict(type="list", elements='str'),
|
||||||
notification_templates_started=dict(type="list", elements='str'),
|
notification_templates_started=dict(type="list", elements='str'),
|
||||||
|
|||||||
@@ -117,7 +117,6 @@ options:
|
|||||||
choices:
|
choices:
|
||||||
- github
|
- github
|
||||||
- gitlab
|
- gitlab
|
||||||
- bitbucket_dc
|
|
||||||
webhook_credential:
|
webhook_credential:
|
||||||
description:
|
description:
|
||||||
- Personal Access Token for posting back the status to the service API
|
- Personal Access Token for posting back the status to the service API
|
||||||
@@ -829,7 +828,7 @@ def main():
|
|||||||
ask_inventory_on_launch=dict(type='bool'),
|
ask_inventory_on_launch=dict(type='bool'),
|
||||||
ask_scm_branch_on_launch=dict(type='bool'),
|
ask_scm_branch_on_launch=dict(type='bool'),
|
||||||
ask_limit_on_launch=dict(type='bool'),
|
ask_limit_on_launch=dict(type='bool'),
|
||||||
webhook_service=dict(choices=['github', 'gitlab', 'bitbucket_dc']),
|
webhook_service=dict(choices=['github', 'gitlab']),
|
||||||
webhook_credential=dict(),
|
webhook_credential=dict(),
|
||||||
labels=dict(type="list", elements='str'),
|
labels=dict(type="list", elements='str'),
|
||||||
notification_templates_started=dict(type="list", elements='str'),
|
notification_templates_started=dict(type="list", elements='str'),
|
||||||
|
|||||||
@@ -1,124 +0,0 @@
|
|||||||
from __future__ import absolute_import, division, print_function
|
|
||||||
|
|
||||||
__metaclass__ = type
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from awx.main.models import JobTemplate, WorkflowJobTemplate
|
|
||||||
|
|
||||||
|
|
||||||
# The backend supports these webhook services on job/workflow templates
|
|
||||||
# (see awx/main/models/mixins.py). The collection modules must accept all of
|
|
||||||
# them in their argument_spec ``choices`` list. This test guards against the
|
|
||||||
# module's choices drifting from the backend -- see AAP-45980, where
|
|
||||||
# ``bitbucket_dc`` had been supported by the API since migration 0188 but was
|
|
||||||
# still being rejected by the job_template/workflow_job_template modules.
|
|
||||||
WEBHOOK_SERVICES = ['github', 'gitlab', 'bitbucket_dc']
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
@pytest.mark.parametrize('webhook_service', WEBHOOK_SERVICES)
|
|
||||||
def test_job_template_accepts_webhook_service(run_module, admin_user, project, inventory, webhook_service):
|
|
||||||
result = run_module(
|
|
||||||
'job_template',
|
|
||||||
{
|
|
||||||
'name': 'foo',
|
|
||||||
'playbook': 'helloworld.yml',
|
|
||||||
'project': project.name,
|
|
||||||
'inventory': inventory.name,
|
|
||||||
'webhook_service': webhook_service,
|
|
||||||
'state': 'present',
|
|
||||||
},
|
|
||||||
admin_user,
|
|
||||||
)
|
|
||||||
|
|
||||||
assert not result.get('failed', False), result.get('msg', result)
|
|
||||||
assert result.get('changed', False), result
|
|
||||||
|
|
||||||
jt = JobTemplate.objects.get(name='foo')
|
|
||||||
assert jt.webhook_service == webhook_service
|
|
||||||
|
|
||||||
# Re-running with the same args must be a no-op (idempotence).
|
|
||||||
result = run_module(
|
|
||||||
'job_template',
|
|
||||||
{
|
|
||||||
'name': 'foo',
|
|
||||||
'playbook': 'helloworld.yml',
|
|
||||||
'project': project.name,
|
|
||||||
'inventory': inventory.name,
|
|
||||||
'webhook_service': webhook_service,
|
|
||||||
'state': 'present',
|
|
||||||
},
|
|
||||||
admin_user,
|
|
||||||
)
|
|
||||||
assert not result.get('failed', False), result.get('msg', result)
|
|
||||||
assert not result.get('changed', True), result
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
@pytest.mark.parametrize('webhook_service', WEBHOOK_SERVICES)
|
|
||||||
def test_workflow_job_template_accepts_webhook_service(run_module, admin_user, organization, webhook_service):
|
|
||||||
result = run_module(
|
|
||||||
'workflow_job_template',
|
|
||||||
{
|
|
||||||
'name': 'foo-workflow',
|
|
||||||
'organization': organization.name,
|
|
||||||
'webhook_service': webhook_service,
|
|
||||||
'state': 'present',
|
|
||||||
},
|
|
||||||
admin_user,
|
|
||||||
)
|
|
||||||
|
|
||||||
assert not result.get('failed', False), result.get('msg', result)
|
|
||||||
assert result.get('changed', False), result
|
|
||||||
|
|
||||||
wfjt = WorkflowJobTemplate.objects.get(name='foo-workflow')
|
|
||||||
assert wfjt.webhook_service == webhook_service
|
|
||||||
|
|
||||||
# Re-running with the same args must be a no-op (idempotence).
|
|
||||||
result = run_module(
|
|
||||||
'workflow_job_template',
|
|
||||||
{
|
|
||||||
'name': 'foo-workflow',
|
|
||||||
'organization': organization.name,
|
|
||||||
'webhook_service': webhook_service,
|
|
||||||
'state': 'present',
|
|
||||||
},
|
|
||||||
admin_user,
|
|
||||||
)
|
|
||||||
assert not result.get('failed', False), result.get('msg', result)
|
|
||||||
assert not result.get('changed', True), result
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
def test_job_template_rejects_unknown_webhook_service(run_module, admin_user, project, inventory):
|
|
||||||
result = run_module(
|
|
||||||
'job_template',
|
|
||||||
{
|
|
||||||
'name': 'foo',
|
|
||||||
'playbook': 'helloworld.yml',
|
|
||||||
'project': project.name,
|
|
||||||
'inventory': inventory.name,
|
|
||||||
'webhook_service': 'not_a_real_service',
|
|
||||||
'state': 'present',
|
|
||||||
},
|
|
||||||
admin_user,
|
|
||||||
)
|
|
||||||
assert result.get('failed', False), result
|
|
||||||
assert 'webhook_service' in result.get('msg', '')
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
def test_workflow_job_template_rejects_unknown_webhook_service(run_module, admin_user, organization):
|
|
||||||
result = run_module(
|
|
||||||
'workflow_job_template',
|
|
||||||
{
|
|
||||||
'name': 'foo-workflow',
|
|
||||||
'organization': organization.name,
|
|
||||||
'webhook_service': 'not_a_real_service',
|
|
||||||
'state': 'present',
|
|
||||||
},
|
|
||||||
admin_user,
|
|
||||||
)
|
|
||||||
assert result.get('failed', False), result
|
|
||||||
assert 'webhook_service' in result.get('msg', '')
|
|
||||||
Reference in New Issue
Block a user