mirror of
https://github.com/ansible/awx.git
synced 2026-05-13 12:27:37 -02:30
Fix permissions that come from an external auditor role (#15291)
* Add tests for external auditor * Add assertion for unified JTs which fails * Fix UJT listing bug * Add test for ad hoc commands just to be sure
This commit is contained in:
@@ -31,6 +31,7 @@ from rest_framework.exceptions import ParseError
|
|||||||
from polymorphic.models import PolymorphicModel
|
from polymorphic.models import PolymorphicModel
|
||||||
|
|
||||||
from ansible_base.lib.utils.models import prevent_search, get_type_for_model
|
from ansible_base.lib.utils.models import prevent_search, get_type_for_model
|
||||||
|
from ansible_base.rbac import permission_registry
|
||||||
|
|
||||||
# AWX
|
# AWX
|
||||||
from awx.main.models.base import CommonModelNameNotUnique, PasswordFieldsModel, NotificationFieldsModel
|
from awx.main.models.base import CommonModelNameNotUnique, PasswordFieldsModel, NotificationFieldsModel
|
||||||
@@ -197,9 +198,7 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, ExecutionEn
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _submodels_with_roles(cls):
|
def _submodels_with_roles(cls):
|
||||||
ujt_classes = [c for c in cls.__subclasses__() if c._meta.model_name not in ['inventorysource', 'systemjobtemplate']]
|
return [c for c in cls.__subclasses__() if permission_registry.is_registered(c)]
|
||||||
ct_dict = ContentType.objects.get_for_models(*ujt_classes)
|
|
||||||
return [ct.id for ct in ct_dict.values()]
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def accessible_pk_qs(cls, accessor, role_field):
|
def accessible_pk_qs(cls, accessor, role_field):
|
||||||
@@ -215,8 +214,16 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, ExecutionEn
|
|||||||
|
|
||||||
action = to_permissions[role_field]
|
action = to_permissions[role_field]
|
||||||
|
|
||||||
|
# Special condition for super auditor
|
||||||
|
role_subclasses = cls._submodels_with_roles()
|
||||||
|
role_cts = ContentType.objects.get_for_models(*role_subclasses).values()
|
||||||
|
all_codenames = {f'{action}_{cls._meta.model_name}' for cls in role_subclasses}
|
||||||
|
if not (all_codenames - accessor.singleton_permissions()):
|
||||||
|
qs = cls.objects.filter(polymorphic_ctype__in=role_cts)
|
||||||
|
return qs.values_list('id', flat=True)
|
||||||
|
|
||||||
return (
|
return (
|
||||||
RoleEvaluation.objects.filter(role__in=accessor.has_roles.all(), codename__startswith=action, content_type_id__in=cls._submodels_with_roles())
|
RoleEvaluation.objects.filter(role__in=accessor.has_roles.all(), codename__in=all_codenames, content_type_id__in=[ct.id for ct in role_cts])
|
||||||
.values_list('object_id')
|
.values_list('object_id')
|
||||||
.distinct()
|
.distinct()
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -92,6 +92,11 @@ def deploy_jobtemplate(project, inventory, credential):
|
|||||||
return jt
|
return jt
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def execution_environment():
|
||||||
|
return ExecutionEnvironment.objects.create(name="test-ee", description="test-ee", managed=True)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def setup_managed_roles():
|
def setup_managed_roles():
|
||||||
"Run the migration script to pre-create managed role definitions"
|
"Run the migration script to pre-create managed role definitions"
|
||||||
|
|||||||
120
awx/main/tests/functional/dab_rbac/test_external_auditor.py
Normal file
120
awx/main/tests/functional/dab_rbac/test_external_auditor.py
Normal file
@@ -0,0 +1,120 @@
|
|||||||
|
import pytest
|
||||||
|
|
||||||
|
from django.apps import apps
|
||||||
|
|
||||||
|
from ansible_base.rbac.managed import SystemAuditor
|
||||||
|
from ansible_base.rbac import permission_registry
|
||||||
|
|
||||||
|
from awx.main.access import check_user_access, get_user_queryset
|
||||||
|
from awx.main.models import User, AdHocCommandEvent
|
||||||
|
from awx.api.versioning import reverse
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def ext_auditor_rd():
|
||||||
|
info = SystemAuditor(overrides={'name': 'Alien Auditor', 'shortname': 'ext_auditor'})
|
||||||
|
rd, _ = info.get_or_create(apps)
|
||||||
|
return rd
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def ext_auditor(ext_auditor_rd):
|
||||||
|
u = User.objects.create(username='external-auditor-user')
|
||||||
|
ext_auditor_rd.give_global_permission(u)
|
||||||
|
return u
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def obj_factory(request):
|
||||||
|
def _rf(fixture_name):
|
||||||
|
obj = request.getfixturevalue(fixture_name)
|
||||||
|
|
||||||
|
# special case to make obj organization-scoped
|
||||||
|
if obj._meta.model_name == 'executionenvironment':
|
||||||
|
obj.organization = request.getfixturevalue('organization')
|
||||||
|
obj.save(update_fields=['organization'])
|
||||||
|
|
||||||
|
return obj
|
||||||
|
|
||||||
|
return _rf
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_access_qs_external_auditor(ext_auditor_rd, rando, job_template):
|
||||||
|
ext_auditor_rd.give_global_permission(rando)
|
||||||
|
jt_cls = apps.get_model('main', 'JobTemplate')
|
||||||
|
ujt_cls = apps.get_model('main', 'UnifiedJobTemplate')
|
||||||
|
assert job_template in jt_cls.access_qs(rando)
|
||||||
|
assert job_template.id in jt_cls.access_ids_qs(rando)
|
||||||
|
assert job_template.id in ujt_cls.accessible_pk_qs(rando, 'read_role')
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
@pytest.mark.parametrize('model', sorted(permission_registry.all_registered_models, key=lambda cls: cls._meta.model_name))
|
||||||
|
class TestExternalAuditorRoleAllModels:
|
||||||
|
def test_access_can_read_method(self, obj_factory, model, ext_auditor, rando):
|
||||||
|
fixture_name = model._meta.verbose_name.replace(' ', '_')
|
||||||
|
obj = obj_factory(fixture_name)
|
||||||
|
|
||||||
|
assert check_user_access(rando, model, 'read', obj) is False
|
||||||
|
assert check_user_access(ext_auditor, model, 'read', obj) is True
|
||||||
|
|
||||||
|
def test_access_get_queryset(self, obj_factory, model, ext_auditor, rando):
|
||||||
|
fixture_name = model._meta.verbose_name.replace(' ', '_')
|
||||||
|
obj = obj_factory(fixture_name)
|
||||||
|
|
||||||
|
assert obj not in get_user_queryset(rando, model)
|
||||||
|
assert obj in get_user_queryset(ext_auditor, model)
|
||||||
|
|
||||||
|
def test_global_list(self, obj_factory, model, ext_auditor, rando, get):
|
||||||
|
fixture_name = model._meta.verbose_name.replace(' ', '_')
|
||||||
|
obj_factory(fixture_name)
|
||||||
|
|
||||||
|
url = reverse(f'api:{fixture_name}_list')
|
||||||
|
r = get(url, user=rando, expect=200)
|
||||||
|
initial_ct = r.data['count']
|
||||||
|
|
||||||
|
r = get(url, user=ext_auditor, expect=200)
|
||||||
|
assert r.data['count'] == initial_ct + 1
|
||||||
|
|
||||||
|
if fixture_name in ('job_template', 'workflow_job_template'):
|
||||||
|
url = reverse('api:unified_job_template_list')
|
||||||
|
r = get(url, user=rando, expect=200)
|
||||||
|
initial_ct = r.data['count']
|
||||||
|
|
||||||
|
r = get(url, user=ext_auditor, expect=200)
|
||||||
|
assert r.data['count'] == initial_ct + 1
|
||||||
|
|
||||||
|
def test_detail_view(self, obj_factory, model, ext_auditor, rando, get):
|
||||||
|
fixture_name = model._meta.verbose_name.replace(' ', '_')
|
||||||
|
obj = obj_factory(fixture_name)
|
||||||
|
|
||||||
|
url = reverse(f'api:{fixture_name}_detail', kwargs={'pk': obj.pk})
|
||||||
|
get(url, user=rando, expect=403) # NOTE: should be 401
|
||||||
|
get(url, user=ext_auditor, expect=200)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
class TestExternalAuditorNonRoleModels:
|
||||||
|
def test_ad_hoc_command_view(self, ad_hoc_command_factory, rando, ext_auditor, get):
|
||||||
|
"""The AdHocCommandAccess class references is_system_auditor
|
||||||
|
|
||||||
|
this is to prove it works with other system-level view roles"""
|
||||||
|
ad_hoc_command = ad_hoc_command_factory()
|
||||||
|
url = reverse('api:ad_hoc_command_list')
|
||||||
|
r = get(url, user=rando, expect=200)
|
||||||
|
assert r.data['count'] == 0
|
||||||
|
r = get(url, user=ext_auditor, expect=200)
|
||||||
|
assert r.data['count'] == 1
|
||||||
|
assert r.data['results'][0]['id'] == ad_hoc_command.id
|
||||||
|
|
||||||
|
event = AdHocCommandEvent.objects.create(ad_hoc_command=ad_hoc_command)
|
||||||
|
url = reverse('api:ad_hoc_command_ad_hoc_command_events_list', kwargs={'pk': ad_hoc_command.id})
|
||||||
|
r = get(url, user=rando, expect=403)
|
||||||
|
r = get(url, user=ext_auditor, expect=200)
|
||||||
|
assert r.data['count'] == 1
|
||||||
|
|
||||||
|
url = reverse('api:ad_hoc_command_event_detail', kwargs={'pk': event.id})
|
||||||
|
r = get(url, user=rando, expect=403)
|
||||||
|
r = get(url, user=ext_auditor, expect=200)
|
||||||
|
assert r.data['id'] == event.id
|
||||||
@@ -4,25 +4,19 @@ import pytest
|
|||||||
# CRUM
|
# CRUM
|
||||||
from crum import impersonate
|
from crum import impersonate
|
||||||
|
|
||||||
# Django
|
|
||||||
from django.contrib.contenttypes.models import ContentType
|
|
||||||
|
|
||||||
# AWX
|
# AWX
|
||||||
from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, WorkflowApprovalTemplate, Project, WorkflowJob, Schedule, Credential
|
from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, Project, WorkflowJob, Schedule, Credential
|
||||||
from awx.api.versioning import reverse
|
from awx.api.versioning import reverse
|
||||||
from awx.main.constants import JOB_VARIABLE_PREFIXES
|
from awx.main.constants import JOB_VARIABLE_PREFIXES
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_subclass_types():
|
def test_subclass_types():
|
||||||
assert set(UnifiedJobTemplate._submodels_with_roles()) == set(
|
assert set(UnifiedJobTemplate._submodels_with_roles()) == {
|
||||||
[
|
JobTemplate,
|
||||||
ContentType.objects.get_for_model(JobTemplate).id,
|
Project,
|
||||||
ContentType.objects.get_for_model(Project).id,
|
WorkflowJobTemplate,
|
||||||
ContentType.objects.get_for_model(WorkflowJobTemplate).id,
|
}
|
||||||
ContentType.objects.get_for_model(WorkflowApprovalTemplate).id,
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
|
|||||||
Reference in New Issue
Block a user