mirror of
https://github.com/ansible/awx.git
synced 2026-04-11 13:09:21 -02:30
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2f80a4e30d | ||
|
|
9c2de57235 | ||
|
|
bb2f8cdd01 |
@@ -1,4 +1,4 @@
|
|||||||
---
|
---
|
||||||
collections:
|
collections:
|
||||||
- name: ansible.receptor
|
- name: ansible.receptor
|
||||||
version: 2.0.8
|
version: 2.0.6
|
||||||
|
|||||||
@@ -344,22 +344,13 @@ class ApiV2ConfigView(APIView):
|
|||||||
become_methods=PRIVILEGE_ESCALATION_METHODS,
|
become_methods=PRIVILEGE_ESCALATION_METHODS,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Check superuser/auditor first
|
if (
|
||||||
if request.user.is_superuser or request.user.is_system_auditor:
|
request.user.is_superuser
|
||||||
has_org_access = True
|
or request.user.is_system_auditor
|
||||||
else:
|
or Organization.accessible_objects(request.user, 'admin_role').exists()
|
||||||
# Single query checking all three organization role types at once
|
or Organization.accessible_objects(request.user, 'auditor_role').exists()
|
||||||
has_org_access = (
|
or Organization.accessible_objects(request.user, 'project_admin_role').exists()
|
||||||
(
|
):
|
||||||
Organization.access_qs(request.user, 'change')
|
|
||||||
| Organization.access_qs(request.user, 'audit')
|
|
||||||
| Organization.access_qs(request.user, 'add_project')
|
|
||||||
)
|
|
||||||
.distinct()
|
|
||||||
.exists()
|
|
||||||
)
|
|
||||||
|
|
||||||
if has_org_access:
|
|
||||||
data.update(
|
data.update(
|
||||||
dict(
|
dict(
|
||||||
project_base_dir=settings.PROJECTS_ROOT,
|
project_base_dir=settings.PROJECTS_ROOT,
|
||||||
@@ -367,9 +358,7 @@ class ApiV2ConfigView(APIView):
|
|||||||
custom_virtualenvs=get_custom_venv_choices(),
|
custom_virtualenvs=get_custom_venv_choices(),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
else:
|
elif JobTemplate.accessible_objects(request.user, 'admin_role').exists():
|
||||||
# Only check JobTemplate access if org check failed
|
|
||||||
if JobTemplate.accessible_objects(request.user, 'admin_role').exists():
|
|
||||||
data['custom_virtualenvs'] = get_custom_venv_choices()
|
data['custom_virtualenvs'] = get_custom_venv_choices()
|
||||||
|
|
||||||
return Response(data)
|
return Response(data)
|
||||||
|
|||||||
@@ -228,19 +228,16 @@ class BaseTask(object):
|
|||||||
# Convert to list to prevent re-evaluation of QuerySet
|
# Convert to list to prevent re-evaluation of QuerySet
|
||||||
return list(credentials_list)
|
return list(credentials_list)
|
||||||
|
|
||||||
def populate_workload_identity_tokens(self, additional_credentials=None):
|
def populate_workload_identity_tokens(self):
|
||||||
"""
|
"""
|
||||||
Populate credentials with workload identity tokens.
|
Populate credentials with workload identity tokens.
|
||||||
|
|
||||||
Sets the context on Credential objects that have input sources
|
Sets the context on Credential objects that have input sources
|
||||||
using compatible external credential types.
|
using compatible external credential types.
|
||||||
"""
|
"""
|
||||||
credentials = list(self._credentials)
|
|
||||||
if additional_credentials:
|
|
||||||
credentials.extend(additional_credentials)
|
|
||||||
credential_input_sources = (
|
credential_input_sources = (
|
||||||
(credential.context, src)
|
(credential.context, src)
|
||||||
for credential in credentials
|
for credential in self._credentials
|
||||||
for src in credential.input_sources.all()
|
for src in credential.input_sources.all()
|
||||||
if any(
|
if any(
|
||||||
field.get('id') == 'workload_identity_token' and field.get('internal')
|
field.get('id') == 'workload_identity_token' and field.get('internal')
|
||||||
@@ -1866,24 +1863,6 @@ class RunInventoryUpdate(SourceControlMixin, BaseTask):
|
|||||||
# All credentials not used by inventory source injector
|
# All credentials not used by inventory source injector
|
||||||
return inventory_update.get_extra_credentials()
|
return inventory_update.get_extra_credentials()
|
||||||
|
|
||||||
def populate_workload_identity_tokens(self, additional_credentials=None):
|
|
||||||
"""Also generate OIDC tokens for the cloud credential.
|
|
||||||
|
|
||||||
The cloud credential is not in _credentials (it is handled by the
|
|
||||||
inventory source injector), but it may still need a workload identity
|
|
||||||
token generated for it.
|
|
||||||
"""
|
|
||||||
cloud_cred = self.instance.get_cloud_credential()
|
|
||||||
creds = list(additional_credentials or [])
|
|
||||||
if cloud_cred:
|
|
||||||
creds.append(cloud_cred)
|
|
||||||
super().populate_workload_identity_tokens(additional_credentials=creds or None)
|
|
||||||
# Override get_cloud_credential on this instance so the injector
|
|
||||||
# uses the credential with OIDC context instead of doing a fresh
|
|
||||||
# DB fetch that would lose it.
|
|
||||||
if cloud_cred and cloud_cred.context:
|
|
||||||
self.instance.get_cloud_credential = lambda: cloud_cred
|
|
||||||
|
|
||||||
def build_project_dir(self, inventory_update, private_data_dir):
|
def build_project_dir(self, inventory_update, private_data_dir):
|
||||||
source_project = None
|
source_project = None
|
||||||
if inventory_update.inventory_source:
|
if inventory_update.inventory_source:
|
||||||
|
|||||||
@@ -1,84 +0,0 @@
|
|||||||
import pytest
|
|
||||||
from awx.api.versioning import reverse
|
|
||||||
from rest_framework import status
|
|
||||||
|
|
||||||
from awx.main.models.jobs import JobTemplate
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
class TestConfigEndpointFields:
|
|
||||||
def test_base_fields_all_users(self, get, rando):
|
|
||||||
url = reverse('api:api_v2_config_view')
|
|
||||||
response = get(url, rando, expect=200)
|
|
||||||
|
|
||||||
assert 'time_zone' in response.data
|
|
||||||
assert 'license_info' in response.data
|
|
||||||
assert 'version' in response.data
|
|
||||||
assert 'eula' in response.data
|
|
||||||
assert 'analytics_status' in response.data
|
|
||||||
assert 'analytics_collectors' in response.data
|
|
||||||
assert 'become_methods' in response.data
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
"role_type",
|
|
||||||
[
|
|
||||||
"superuser",
|
|
||||||
"system_auditor",
|
|
||||||
"org_admin",
|
|
||||||
"org_auditor",
|
|
||||||
"org_project_admin",
|
|
||||||
],
|
|
||||||
)
|
|
||||||
def test_privileged_users_conditional_fields(self, get, user, organization, admin, role_type):
|
|
||||||
url = reverse('api:api_v2_config_view')
|
|
||||||
|
|
||||||
if role_type == "superuser":
|
|
||||||
test_user = admin
|
|
||||||
elif role_type == "system_auditor":
|
|
||||||
test_user = user('system-auditor', is_superuser=False)
|
|
||||||
test_user.is_system_auditor = True
|
|
||||||
test_user.save()
|
|
||||||
elif role_type == "org_admin":
|
|
||||||
test_user = user('org-admin', is_superuser=False)
|
|
||||||
organization.admin_role.members.add(test_user)
|
|
||||||
elif role_type == "org_auditor":
|
|
||||||
test_user = user('org-auditor', is_superuser=False)
|
|
||||||
organization.auditor_role.members.add(test_user)
|
|
||||||
elif role_type == "org_project_admin":
|
|
||||||
test_user = user('org-project-admin', is_superuser=False)
|
|
||||||
organization.project_admin_role.members.add(test_user)
|
|
||||||
|
|
||||||
response = get(url, test_user, expect=200)
|
|
||||||
|
|
||||||
assert 'project_base_dir' in response.data
|
|
||||||
assert 'project_local_paths' in response.data
|
|
||||||
assert 'custom_virtualenvs' in response.data
|
|
||||||
|
|
||||||
def test_job_template_admin_gets_venvs_only(self, get, user, organization, project, inventory):
|
|
||||||
"""Test that JobTemplate admin without org access gets only custom_virtualenvs"""
|
|
||||||
jt_admin = user('jt-admin', is_superuser=False)
|
|
||||||
|
|
||||||
jt = JobTemplate.objects.create(name='test-jt', organization=organization, project=project, inventory=inventory)
|
|
||||||
jt.admin_role.members.add(jt_admin)
|
|
||||||
|
|
||||||
url = reverse('api:api_v2_config_view')
|
|
||||||
response = get(url, jt_admin, expect=200)
|
|
||||||
|
|
||||||
assert 'custom_virtualenvs' in response.data
|
|
||||||
assert 'project_base_dir' not in response.data
|
|
||||||
assert 'project_local_paths' not in response.data
|
|
||||||
|
|
||||||
def test_normal_user_no_conditional_fields(self, get, rando):
|
|
||||||
url = reverse('api:api_v2_config_view')
|
|
||||||
response = get(url, rando, expect=200)
|
|
||||||
|
|
||||||
assert 'project_base_dir' not in response.data
|
|
||||||
assert 'project_local_paths' not in response.data
|
|
||||||
assert 'custom_virtualenvs' not in response.data
|
|
||||||
|
|
||||||
def test_unauthenticated_denied(self, get):
|
|
||||||
"""Test that unauthenticated requests are denied"""
|
|
||||||
url = reverse('api:api_v2_config_view')
|
|
||||||
response = get(url, None, expect=401)
|
|
||||||
|
|
||||||
assert response.status_code == status.HTTP_401_UNAUTHORIZED
|
|
||||||
@@ -590,67 +590,3 @@ def test_populate_workload_identity_tokens_passes_get_instance_timeout_to_client
|
|||||||
scope=AutomationControllerJobScope.name,
|
scope=AutomationControllerJobScope.name,
|
||||||
workload_ttl_seconds=expected_ttl,
|
workload_ttl_seconds=expected_ttl,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class TestRunInventoryUpdatePopulateWorkloadIdentityTokens:
|
|
||||||
"""Tests for RunInventoryUpdate.populate_workload_identity_tokens."""
|
|
||||||
|
|
||||||
def test_cloud_credential_passed_as_additional_credential(self):
|
|
||||||
"""The cloud credential is forwarded to super().populate_workload_identity_tokens via additional_credentials."""
|
|
||||||
cloud_cred = mock.MagicMock(name='cloud_cred')
|
|
||||||
cloud_cred.context = {}
|
|
||||||
|
|
||||||
task = jobs.RunInventoryUpdate()
|
|
||||||
task.instance = mock.MagicMock()
|
|
||||||
task.instance.get_cloud_credential.return_value = cloud_cred
|
|
||||||
task._credentials = []
|
|
||||||
|
|
||||||
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
|
|
||||||
task.populate_workload_identity_tokens()
|
|
||||||
|
|
||||||
mock_super.assert_called_once_with(additional_credentials=[cloud_cred])
|
|
||||||
|
|
||||||
def test_no_cloud_credential_calls_super_with_none(self):
|
|
||||||
"""When there is no cloud credential, super() is called with additional_credentials=None."""
|
|
||||||
task = jobs.RunInventoryUpdate()
|
|
||||||
task.instance = mock.MagicMock()
|
|
||||||
task.instance.get_cloud_credential.return_value = None
|
|
||||||
task._credentials = []
|
|
||||||
|
|
||||||
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
|
|
||||||
task.populate_workload_identity_tokens()
|
|
||||||
|
|
||||||
mock_super.assert_called_once_with(additional_credentials=None)
|
|
||||||
|
|
||||||
def test_additional_credentials_combined_with_cloud_credential(self):
|
|
||||||
"""Caller-supplied additional_credentials are combined with the cloud credential."""
|
|
||||||
cloud_cred = mock.MagicMock(name='cloud_cred')
|
|
||||||
cloud_cred.context = {}
|
|
||||||
extra_cred = mock.MagicMock(name='extra_cred')
|
|
||||||
|
|
||||||
task = jobs.RunInventoryUpdate()
|
|
||||||
task.instance = mock.MagicMock()
|
|
||||||
task.instance.get_cloud_credential.return_value = cloud_cred
|
|
||||||
task._credentials = []
|
|
||||||
|
|
||||||
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens') as mock_super:
|
|
||||||
task.populate_workload_identity_tokens(additional_credentials=[extra_cred])
|
|
||||||
|
|
||||||
mock_super.assert_called_once_with(additional_credentials=[extra_cred, cloud_cred])
|
|
||||||
|
|
||||||
def test_cloud_credential_override_after_context_set(self):
|
|
||||||
"""After OIDC processing, get_cloud_credential is overridden on the instance when context is populated."""
|
|
||||||
cloud_cred = mock.MagicMock(name='cloud_cred')
|
|
||||||
# Simulate that super().populate_workload_identity_tokens populates context
|
|
||||||
cloud_cred.context = {'workload_identity_token': 'eyJ.test.jwt'}
|
|
||||||
|
|
||||||
task = jobs.RunInventoryUpdate()
|
|
||||||
task.instance = mock.MagicMock()
|
|
||||||
task.instance.get_cloud_credential.return_value = cloud_cred
|
|
||||||
task._credentials = []
|
|
||||||
|
|
||||||
with mock.patch.object(jobs.BaseTask, 'populate_workload_identity_tokens'):
|
|
||||||
task.populate_workload_identity_tokens()
|
|
||||||
|
|
||||||
# The instance's get_cloud_credential should now return the same object with context
|
|
||||||
assert task.instance.get_cloud_credential() is cloud_cred
|
|
||||||
|
|||||||
@@ -276,6 +276,7 @@ options:
|
|||||||
- ''
|
- ''
|
||||||
- 'github'
|
- 'github'
|
||||||
- 'gitlab'
|
- 'gitlab'
|
||||||
|
- 'bitbucket_dc'
|
||||||
webhook_credential:
|
webhook_credential:
|
||||||
description:
|
description:
|
||||||
- Personal Access Token for posting back the status to the service API
|
- Personal Access Token for posting back the status to the service API
|
||||||
@@ -436,7 +437,7 @@ def main():
|
|||||||
scm_branch=dict(),
|
scm_branch=dict(),
|
||||||
ask_scm_branch_on_launch=dict(type='bool'),
|
ask_scm_branch_on_launch=dict(type='bool'),
|
||||||
job_slice_count=dict(type='int'),
|
job_slice_count=dict(type='int'),
|
||||||
webhook_service=dict(choices=['github', 'gitlab', '']),
|
webhook_service=dict(choices=['github', 'gitlab', 'bitbucket_dc', '']),
|
||||||
webhook_credential=dict(),
|
webhook_credential=dict(),
|
||||||
labels=dict(type="list", elements='str'),
|
labels=dict(type="list", elements='str'),
|
||||||
notification_templates_started=dict(type="list", elements='str'),
|
notification_templates_started=dict(type="list", elements='str'),
|
||||||
|
|||||||
@@ -117,6 +117,7 @@ options:
|
|||||||
choices:
|
choices:
|
||||||
- github
|
- github
|
||||||
- gitlab
|
- gitlab
|
||||||
|
- bitbucket_dc
|
||||||
webhook_credential:
|
webhook_credential:
|
||||||
description:
|
description:
|
||||||
- Personal Access Token for posting back the status to the service API
|
- Personal Access Token for posting back the status to the service API
|
||||||
@@ -828,7 +829,7 @@ def main():
|
|||||||
ask_inventory_on_launch=dict(type='bool'),
|
ask_inventory_on_launch=dict(type='bool'),
|
||||||
ask_scm_branch_on_launch=dict(type='bool'),
|
ask_scm_branch_on_launch=dict(type='bool'),
|
||||||
ask_limit_on_launch=dict(type='bool'),
|
ask_limit_on_launch=dict(type='bool'),
|
||||||
webhook_service=dict(choices=['github', 'gitlab']),
|
webhook_service=dict(choices=['github', 'gitlab', 'bitbucket_dc']),
|
||||||
webhook_credential=dict(),
|
webhook_credential=dict(),
|
||||||
labels=dict(type="list", elements='str'),
|
labels=dict(type="list", elements='str'),
|
||||||
notification_templates_started=dict(type="list", elements='str'),
|
notification_templates_started=dict(type="list", elements='str'),
|
||||||
|
|||||||
124
awx_collection/test/awx/test_webhooks.py
Normal file
124
awx_collection/test/awx/test_webhooks.py
Normal file
@@ -0,0 +1,124 @@
|
|||||||
|
from __future__ import absolute_import, division, print_function
|
||||||
|
|
||||||
|
__metaclass__ = type
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from awx.main.models import JobTemplate, WorkflowJobTemplate
|
||||||
|
|
||||||
|
|
||||||
|
# The backend supports these webhook services on job/workflow templates
|
||||||
|
# (see awx/main/models/mixins.py). The collection modules must accept all of
|
||||||
|
# them in their argument_spec ``choices`` list. This test guards against the
|
||||||
|
# module's choices drifting from the backend -- see AAP-45980, where
|
||||||
|
# ``bitbucket_dc`` had been supported by the API since migration 0188 but was
|
||||||
|
# still being rejected by the job_template/workflow_job_template modules.
|
||||||
|
WEBHOOK_SERVICES = ['github', 'gitlab', 'bitbucket_dc']
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
@pytest.mark.parametrize('webhook_service', WEBHOOK_SERVICES)
|
||||||
|
def test_job_template_accepts_webhook_service(run_module, admin_user, project, inventory, webhook_service):
|
||||||
|
result = run_module(
|
||||||
|
'job_template',
|
||||||
|
{
|
||||||
|
'name': 'foo',
|
||||||
|
'playbook': 'helloworld.yml',
|
||||||
|
'project': project.name,
|
||||||
|
'inventory': inventory.name,
|
||||||
|
'webhook_service': webhook_service,
|
||||||
|
'state': 'present',
|
||||||
|
},
|
||||||
|
admin_user,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert not result.get('failed', False), result.get('msg', result)
|
||||||
|
assert result.get('changed', False), result
|
||||||
|
|
||||||
|
jt = JobTemplate.objects.get(name='foo')
|
||||||
|
assert jt.webhook_service == webhook_service
|
||||||
|
|
||||||
|
# Re-running with the same args must be a no-op (idempotence).
|
||||||
|
result = run_module(
|
||||||
|
'job_template',
|
||||||
|
{
|
||||||
|
'name': 'foo',
|
||||||
|
'playbook': 'helloworld.yml',
|
||||||
|
'project': project.name,
|
||||||
|
'inventory': inventory.name,
|
||||||
|
'webhook_service': webhook_service,
|
||||||
|
'state': 'present',
|
||||||
|
},
|
||||||
|
admin_user,
|
||||||
|
)
|
||||||
|
assert not result.get('failed', False), result.get('msg', result)
|
||||||
|
assert not result.get('changed', True), result
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
@pytest.mark.parametrize('webhook_service', WEBHOOK_SERVICES)
|
||||||
|
def test_workflow_job_template_accepts_webhook_service(run_module, admin_user, organization, webhook_service):
|
||||||
|
result = run_module(
|
||||||
|
'workflow_job_template',
|
||||||
|
{
|
||||||
|
'name': 'foo-workflow',
|
||||||
|
'organization': organization.name,
|
||||||
|
'webhook_service': webhook_service,
|
||||||
|
'state': 'present',
|
||||||
|
},
|
||||||
|
admin_user,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert not result.get('failed', False), result.get('msg', result)
|
||||||
|
assert result.get('changed', False), result
|
||||||
|
|
||||||
|
wfjt = WorkflowJobTemplate.objects.get(name='foo-workflow')
|
||||||
|
assert wfjt.webhook_service == webhook_service
|
||||||
|
|
||||||
|
# Re-running with the same args must be a no-op (idempotence).
|
||||||
|
result = run_module(
|
||||||
|
'workflow_job_template',
|
||||||
|
{
|
||||||
|
'name': 'foo-workflow',
|
||||||
|
'organization': organization.name,
|
||||||
|
'webhook_service': webhook_service,
|
||||||
|
'state': 'present',
|
||||||
|
},
|
||||||
|
admin_user,
|
||||||
|
)
|
||||||
|
assert not result.get('failed', False), result.get('msg', result)
|
||||||
|
assert not result.get('changed', True), result
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_job_template_rejects_unknown_webhook_service(run_module, admin_user, project, inventory):
|
||||||
|
result = run_module(
|
||||||
|
'job_template',
|
||||||
|
{
|
||||||
|
'name': 'foo',
|
||||||
|
'playbook': 'helloworld.yml',
|
||||||
|
'project': project.name,
|
||||||
|
'inventory': inventory.name,
|
||||||
|
'webhook_service': 'not_a_real_service',
|
||||||
|
'state': 'present',
|
||||||
|
},
|
||||||
|
admin_user,
|
||||||
|
)
|
||||||
|
assert result.get('failed', False), result
|
||||||
|
assert 'webhook_service' in result.get('msg', '')
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_workflow_job_template_rejects_unknown_webhook_service(run_module, admin_user, organization):
|
||||||
|
result = run_module(
|
||||||
|
'workflow_job_template',
|
||||||
|
{
|
||||||
|
'name': 'foo-workflow',
|
||||||
|
'organization': organization.name,
|
||||||
|
'webhook_service': 'not_a_real_service',
|
||||||
|
'state': 'present',
|
||||||
|
},
|
||||||
|
admin_user,
|
||||||
|
)
|
||||||
|
assert result.get('failed', False), result
|
||||||
|
assert 'webhook_service' in result.get('msg', '')
|
||||||
Reference in New Issue
Block a user