mirror of
https://github.com/ansible/awx.git
synced 2026-03-23 20:05:03 -02:30
JT RBAC edits for extra_credentials and vault_credential
This commit is contained in:
@@ -28,7 +28,7 @@ from django.utils.timezone import now
|
|||||||
from django.utils.functional import cached_property
|
from django.utils.functional import cached_property
|
||||||
|
|
||||||
# Django REST Framework
|
# Django REST Framework
|
||||||
from rest_framework.exceptions import ValidationError
|
from rest_framework.exceptions import ValidationError, PermissionDenied
|
||||||
from rest_framework import fields
|
from rest_framework import fields
|
||||||
from rest_framework import serializers
|
from rest_framework import serializers
|
||||||
from rest_framework import validators
|
from rest_framework import validators
|
||||||
@@ -2248,6 +2248,7 @@ class JobOptionsSerializer(LabelsListMixin, BaseSerializer):
|
|||||||
|
|
||||||
def validate(self, attrs):
|
def validate(self, attrs):
|
||||||
v1_credentials = {}
|
v1_credentials = {}
|
||||||
|
view = self.context.get('view', None)
|
||||||
if self.version == 1: # TODO: remove in 3.3
|
if self.version == 1: # TODO: remove in 3.3
|
||||||
for attr, kind, error in (
|
for attr, kind, error in (
|
||||||
('cloud_credential', 'cloud', _('You must provide a cloud credential.')),
|
('cloud_credential', 'cloud', _('You must provide a cloud credential.')),
|
||||||
@@ -2260,6 +2261,8 @@ class JobOptionsSerializer(LabelsListMixin, BaseSerializer):
|
|||||||
cred = v1_credentials[attr] = Credential.objects.get(pk=pk)
|
cred = v1_credentials[attr] = Credential.objects.get(pk=pk)
|
||||||
if cred.credential_type.kind != kind:
|
if cred.credential_type.kind != kind:
|
||||||
raise serializers.ValidationError({attr: error})
|
raise serializers.ValidationError({attr: error})
|
||||||
|
if (not view) or (not view.request) or (view.request.user not in cred.use_role):
|
||||||
|
raise PermissionDenied()
|
||||||
|
|
||||||
if 'project' in self.fields and 'playbook' in self.fields:
|
if 'project' in self.fields and 'playbook' in self.fields:
|
||||||
project = attrs.get('project', self.instance and self.instance.project or None)
|
project = attrs.get('project', self.instance and self.instance.project or None)
|
||||||
|
|||||||
@@ -2586,7 +2586,7 @@ class JobTemplateList(ListCreateAPIView):
|
|||||||
always_allow_superuser = False
|
always_allow_superuser = False
|
||||||
capabilities_prefetch = [
|
capabilities_prefetch = [
|
||||||
'admin', 'execute',
|
'admin', 'execute',
|
||||||
{'copy': ['project.use', 'inventory.use', 'credential.use']}
|
{'copy': ['project.use', 'inventory.use', 'credential.use', 'vault_credential.use']}
|
||||||
]
|
]
|
||||||
|
|
||||||
def post(self, request, *args, **kwargs):
|
def post(self, request, *args, **kwargs):
|
||||||
@@ -2839,6 +2839,17 @@ class JobTemplateExtraCredentialsList(SubListCreateAttachDetachAPIView):
|
|||||||
new_in_320 = True
|
new_in_320 = True
|
||||||
new_in_api_v2 = True
|
new_in_api_v2 = True
|
||||||
|
|
||||||
|
def get_queryset(self):
|
||||||
|
# Return the full list of extra_credentials
|
||||||
|
parent = self.get_parent_object()
|
||||||
|
self.check_parent_access(parent)
|
||||||
|
sublist_qs = getattrd(parent, self.relationship)
|
||||||
|
sublist_qs = sublist_qs.prefetch_related(
|
||||||
|
'created_by', 'modified_by',
|
||||||
|
'admin_role', 'use_role', 'read_role',
|
||||||
|
'admin_role__parents', 'admin_role__members')
|
||||||
|
return sublist_qs
|
||||||
|
|
||||||
def is_valid_relation(self, parent, sub, created=False):
|
def is_valid_relation(self, parent, sub, created=False):
|
||||||
current_extra_types = [
|
current_extra_types = [
|
||||||
cred.credential_type.pk for cred in parent.extra_credentials.all()
|
cred.credential_type.pk for cred in parent.extra_credentials.all()
|
||||||
@@ -4116,7 +4127,8 @@ class UnifiedJobTemplateList(ListAPIView):
|
|||||||
new_in_148 = True
|
new_in_148 = True
|
||||||
capabilities_prefetch = [
|
capabilities_prefetch = [
|
||||||
'admin', 'execute',
|
'admin', 'execute',
|
||||||
{'copy': ['jobtemplate.project.use', 'jobtemplate.inventory.use', 'jobtemplate.credential.use',
|
{'copy': ['jobtemplate.project.use', 'jobtemplate.inventory.use',
|
||||||
|
'jobtemplate.credential.use', 'jobtemplate.vault_credential.use',
|
||||||
'workflowjobtemplate.organization.admin']}
|
'workflowjobtemplate.organization.admin']}
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|||||||
@@ -1111,14 +1111,7 @@ class ProjectUpdateAccess(BaseAccess):
|
|||||||
class JobTemplateAccess(BaseAccess):
|
class JobTemplateAccess(BaseAccess):
|
||||||
'''
|
'''
|
||||||
I can see job templates when:
|
I can see job templates when:
|
||||||
- I am a superuser.
|
- I have read role for the job template.
|
||||||
- I can read the inventory, project and credential (which means I am an
|
|
||||||
org admin or member of a team with access to all of the above).
|
|
||||||
- I have permission explicitly granted to check/deploy with the inventory
|
|
||||||
and project.
|
|
||||||
|
|
||||||
This does not mean I would be able to launch a job from the template or
|
|
||||||
edit the template.
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
model = JobTemplate
|
model = JobTemplate
|
||||||
@@ -1133,8 +1126,10 @@ class JobTemplateAccess(BaseAccess):
|
|||||||
|
|
||||||
def can_add(self, data):
|
def can_add(self, data):
|
||||||
'''
|
'''
|
||||||
a user can create a job template if they are a superuser, an org admin
|
a user can create a job template if
|
||||||
of any org that the project is a member, or if they have user or team
|
- they are a superuser
|
||||||
|
- an org admin of any org that the project is a member
|
||||||
|
- if they have user or team
|
||||||
based permissions tying the project to the inventory source for the
|
based permissions tying the project to the inventory source for the
|
||||||
given action as well as the 'create' deploy permission.
|
given action as well as the 'create' deploy permission.
|
||||||
Users who are able to create deploy jobs can also run normal and check (dry run) jobs.
|
Users who are able to create deploy jobs can also run normal and check (dry run) jobs.
|
||||||
@@ -1170,8 +1165,9 @@ class JobTemplateAccess(BaseAccess):
|
|||||||
if not self.check_related('credential', Credential, data, role_field='use_role'):
|
if not self.check_related('credential', Credential, data, role_field='use_role'):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# TODO: If a vault credential is provided, the user should have use access to it.
|
# If a vault credential is provided, the user should have use access to it.
|
||||||
# TODO: If any credential in extra_credentials, the user must have access
|
if not self.check_related('vault_credential', Credential, data, role_field='use_role'):
|
||||||
|
return False
|
||||||
|
|
||||||
# If an inventory is provided, the user should have use access.
|
# If an inventory is provided, the user should have use access.
|
||||||
inventory = get_value(Inventory, 'inventory')
|
inventory = get_value(Inventory, 'inventory')
|
||||||
@@ -1181,14 +1177,10 @@ class JobTemplateAccess(BaseAccess):
|
|||||||
|
|
||||||
project = get_value(Project, 'project')
|
project = get_value(Project, 'project')
|
||||||
if 'job_type' in data and data['job_type'] == PERM_INVENTORY_SCAN:
|
if 'job_type' in data and data['job_type'] == PERM_INVENTORY_SCAN:
|
||||||
if inventory:
|
if not inventory:
|
||||||
accessible = self.user in inventory.use_role
|
|
||||||
else:
|
|
||||||
accessible = False
|
|
||||||
if not project and accessible:
|
|
||||||
return True
|
|
||||||
elif not accessible:
|
|
||||||
return False
|
return False
|
||||||
|
elif not project:
|
||||||
|
return True
|
||||||
# If the user has admin access to the project (as an org admin), should
|
# If the user has admin access to the project (as an org admin), should
|
||||||
# be able to proceed without additional checks.
|
# be able to proceed without additional checks.
|
||||||
if project:
|
if project:
|
||||||
@@ -1237,8 +1229,7 @@ class JobTemplateAccess(BaseAccess):
|
|||||||
self.check_license(feature='surveys')
|
self.check_license(feature='surveys')
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# TODO: handle vault_credential and extra_credentials
|
for required_field in ('credential', 'inventory', 'project', 'vault_credential'):
|
||||||
for required_field in ('credential', 'inventory', 'project'):
|
|
||||||
required_obj = getattr(obj, required_field, None)
|
required_obj = getattr(obj, required_field, None)
|
||||||
if required_field not in data_for_change and required_obj is not None:
|
if required_field not in data_for_change and required_obj is not None:
|
||||||
data_for_change[required_field] = required_obj.pk
|
data_for_change[required_field] = required_obj.pk
|
||||||
@@ -1273,6 +1264,7 @@ class JobTemplateAccess(BaseAccess):
|
|||||||
project_id = data.get('project', obj.project.id if obj.project else None)
|
project_id = data.get('project', obj.project.id if obj.project else None)
|
||||||
inventory_id = data.get('inventory', obj.inventory.id if obj.inventory else None)
|
inventory_id = data.get('inventory', obj.inventory.id if obj.inventory else None)
|
||||||
credential_id = data.get('credential', obj.credential.id if obj.credential else None)
|
credential_id = data.get('credential', obj.credential.id if obj.credential else None)
|
||||||
|
vault_credential_id = data.get('credential', obj.vault_credential.id if obj.vault_credential else None)
|
||||||
|
|
||||||
if project_id and self.user not in Project.objects.get(pk=project_id).use_role:
|
if project_id and self.user not in Project.objects.get(pk=project_id).use_role:
|
||||||
return False
|
return False
|
||||||
@@ -1280,7 +1272,8 @@ class JobTemplateAccess(BaseAccess):
|
|||||||
return False
|
return False
|
||||||
if credential_id and self.user not in Credential.objects.get(pk=credential_id).use_role:
|
if credential_id and self.user not in Credential.objects.get(pk=credential_id).use_role:
|
||||||
return False
|
return False
|
||||||
# TODO: handle vault_credential and extra_credentials
|
if vault_credential_id and self.user not in Credential.objects.get(pk=vault_credential_id).use_role:
|
||||||
|
return False
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@@ -1300,17 +1293,23 @@ class JobTemplateAccess(BaseAccess):
|
|||||||
if isinstance(sub_obj, NotificationTemplate):
|
if isinstance(sub_obj, NotificationTemplate):
|
||||||
return self.check_related('organization', Organization, {}, obj=sub_obj, mandatory=True)
|
return self.check_related('organization', Organization, {}, obj=sub_obj, mandatory=True)
|
||||||
if relationship == "instance_groups":
|
if relationship == "instance_groups":
|
||||||
|
if not obj.project.organization:
|
||||||
|
return False
|
||||||
return self.user.can_access(type(sub_obj), "read", sub_obj) and self.user in obj.project.organization.admin_role
|
return self.user.can_access(type(sub_obj), "read", sub_obj) and self.user in obj.project.organization.admin_role
|
||||||
|
if relationship == 'extra_credentials' and isinstance(sub_obj, Credential):
|
||||||
|
return self.user in obj.admin_role and self.user in sub_obj.use_role
|
||||||
return super(JobTemplateAccess, self).can_attach(
|
return super(JobTemplateAccess, self).can_attach(
|
||||||
obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check)
|
obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check)
|
||||||
|
|
||||||
|
@check_superuser
|
||||||
def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs):
|
def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs):
|
||||||
if relationship == "instance_groups":
|
if relationship == "instance_groups":
|
||||||
return self.can_attach(obj, sub_obj, relationship, *args, **kwargs)
|
return self.can_attach(obj, sub_obj, relationship, *args, **kwargs)
|
||||||
|
if relationship == 'extra_credentials' and isinstance(sub_obj, Credential):
|
||||||
|
return self.user in obj.admin_role
|
||||||
return super(JobTemplateAccess, self).can_attach(obj, sub_obj, relationship, *args, **kwargs)
|
return super(JobTemplateAccess, self).can_attach(obj, sub_obj, relationship, *args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class JobAccess(BaseAccess):
|
class JobAccess(BaseAccess):
|
||||||
'''
|
'''
|
||||||
I can see jobs when:
|
I can see jobs when:
|
||||||
|
|||||||
@@ -41,6 +41,8 @@ def test_create(post, project, machine_credential, inventory, alice, grant_proje
|
|||||||
def test_create_with_v1_deprecated_credentials(get, post, project, machine_credential, credential, net_credential, inventory, alice):
|
def test_create_with_v1_deprecated_credentials(get, post, project, machine_credential, credential, net_credential, inventory, alice):
|
||||||
project.use_role.members.add(alice)
|
project.use_role.members.add(alice)
|
||||||
machine_credential.use_role.members.add(alice)
|
machine_credential.use_role.members.add(alice)
|
||||||
|
credential.use_role.members.add(alice)
|
||||||
|
net_credential.use_role.members.add(alice)
|
||||||
inventory.use_role.members.add(alice)
|
inventory.use_role.members.add(alice)
|
||||||
|
|
||||||
pk = post(reverse('api:job_template_list', kwargs={'version': 'v1'}), {
|
pk = post(reverse('api:job_template_list', kwargs={'version': 'v1'}), {
|
||||||
@@ -82,7 +84,27 @@ def test_create_with_empty_v1_deprecated_credentials(get, post, project, machine
|
|||||||
assert response.data.get('network_credential') is None
|
assert response.data.get('network_credential') is None
|
||||||
|
|
||||||
|
|
||||||
# TODO: test this with RBAC and lower-priveleged users
|
# TODO: remove in 3.3
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_create_v1_rbac_check(get, post, project, credential, net_credential, rando):
|
||||||
|
project.use_role.members.add(rando)
|
||||||
|
|
||||||
|
base_kwargs = dict(
|
||||||
|
name = 'Made with cloud/net creds I have no access to',
|
||||||
|
project = project.id,
|
||||||
|
ask_inventory_on_launch = True,
|
||||||
|
ask_credential_on_launch = True,
|
||||||
|
playbook = 'helloworld.yml',
|
||||||
|
)
|
||||||
|
|
||||||
|
base_kwargs['cloud_credential'] = credential.pk
|
||||||
|
post(reverse('api:job_template_list', kwargs={'version': 'v1'}), base_kwargs, rando, expect=403)
|
||||||
|
|
||||||
|
base_kwargs.pop('cloud_credential')
|
||||||
|
base_kwargs['network_credential'] = net_credential.pk
|
||||||
|
post(reverse('api:job_template_list', kwargs={'version': 'v1'}), base_kwargs, rando, expect=403)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_extra_credential_creation(get, post, organization_factory, job_template_factory, credentialtype_aws):
|
def test_extra_credential_creation(get, post, organization_factory, job_template_factory, credentialtype_aws):
|
||||||
objs = organization_factory("org", superusers=['admin'])
|
objs = organization_factory("org", superusers=['admin'])
|
||||||
@@ -140,7 +162,6 @@ def test_extra_credential_unique_type_xfail(get, post, organization_factory, job
|
|||||||
assert response.data.get('count') == 1
|
assert response.data.get('count') == 1
|
||||||
|
|
||||||
|
|
||||||
# TODO: test this with RBAC and lower-priveleged users
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_attach_extra_credential(get, post, organization_factory, job_template_factory, credential):
|
def test_attach_extra_credential(get, post, organization_factory, job_template_factory, credential):
|
||||||
objs = organization_factory("org", superusers=['admin'])
|
objs = organization_factory("org", superusers=['admin'])
|
||||||
@@ -158,7 +179,6 @@ def test_attach_extra_credential(get, post, organization_factory, job_template_f
|
|||||||
assert response.data.get('count') == 1
|
assert response.data.get('count') == 1
|
||||||
|
|
||||||
|
|
||||||
# TODO: test this with RBAC and lower-priveleged users
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_detach_extra_credential(get, post, organization_factory, job_template_factory, credential):
|
def test_detach_extra_credential(get, post, organization_factory, job_template_factory, credential):
|
||||||
objs = organization_factory("org", superusers=['admin'])
|
objs = organization_factory("org", superusers=['admin'])
|
||||||
|
|||||||
@@ -307,24 +307,27 @@ def test_prefetch_group_capabilities(group, rando):
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_prefetch_jt_copy_capability(job_template, project, inventory, machine_credential, rando):
|
def test_prefetch_jt_copy_capability(job_template, project, inventory,
|
||||||
|
machine_credential, vault_credential, rando):
|
||||||
job_template.project = project
|
job_template.project = project
|
||||||
job_template.inventory = inventory
|
job_template.inventory = inventory
|
||||||
job_template.credential = machine_credential
|
job_template.credential = machine_credential
|
||||||
|
job_template.vault_credential = vault_credential
|
||||||
job_template.save()
|
job_template.save()
|
||||||
|
|
||||||
qs = JobTemplate.objects.all()
|
qs = JobTemplate.objects.all()
|
||||||
cache_list_capabilities(qs, [{'copy': [
|
cache_list_capabilities(qs, [{'copy': [
|
||||||
'project.use', 'inventory.use', 'credential.use',
|
'project.use', 'inventory.use', 'credential.use', 'vault_credential.use'
|
||||||
]}], JobTemplate, rando)
|
]}], JobTemplate, rando)
|
||||||
assert qs[0].capabilities_cache == {'copy': False}
|
assert qs[0].capabilities_cache == {'copy': False}
|
||||||
|
|
||||||
project.use_role.members.add(rando)
|
project.use_role.members.add(rando)
|
||||||
inventory.use_role.members.add(rando)
|
inventory.use_role.members.add(rando)
|
||||||
machine_credential.use_role.members.add(rando)
|
machine_credential.use_role.members.add(rando)
|
||||||
|
vault_credential.use_role.members.add(rando)
|
||||||
|
|
||||||
cache_list_capabilities(qs, [{'copy': [
|
cache_list_capabilities(qs, [{'copy': [
|
||||||
'project.use', 'inventory.use', 'credential.use',
|
'project.use', 'inventory.use', 'credential.use', 'vault_credential.use'
|
||||||
]}], JobTemplate, rando)
|
]}], JobTemplate, rando)
|
||||||
assert qs[0].capabilities_cache == {'copy': True}
|
assert qs[0].capabilities_cache == {'copy': True}
|
||||||
|
|
||||||
|
|||||||
@@ -209,6 +209,13 @@ def credentialtype_net():
|
|||||||
return net
|
return net
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def credentialtype_vault():
|
||||||
|
vault_type = CredentialType.defaults['vault']()
|
||||||
|
vault_type.save()
|
||||||
|
return vault_type
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def credential(credentialtype_aws):
|
def credential(credentialtype_aws):
|
||||||
return Credential.objects.create(credential_type=credentialtype_aws, name='test-cred',
|
return Credential.objects.create(credential_type=credentialtype_aws, name='test-cred',
|
||||||
@@ -221,6 +228,12 @@ def net_credential(credentialtype_net):
|
|||||||
inputs={'username': 'something', 'password': 'secret'})
|
inputs={'username': 'something', 'password': 'secret'})
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def vault_credential(credentialtype_vault):
|
||||||
|
return Credential.objects.create(credential_type=credentialtype_vault, name='test-cred',
|
||||||
|
inputs={'vault_password': 'secret'})
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def machine_credential(credentialtype_ssh):
|
def machine_credential(credentialtype_ssh):
|
||||||
return Credential.objects.create(credential_type=credentialtype_ssh, name='machine-cred',
|
return Credential.objects.create(credential_type=credentialtype_ssh, name='machine-cred',
|
||||||
|
|||||||
@@ -12,11 +12,21 @@ from awx.main.models.schedules import Schedule
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def jt_objects(job_template_factory):
|
def jt_linked(job_template_factory, credential, net_credential, vault_credential):
|
||||||
|
'''
|
||||||
|
A job template with a reasonably complete set of related objects to
|
||||||
|
test RBAC and other functionality affected by related objects
|
||||||
|
'''
|
||||||
objects = job_template_factory(
|
objects = job_template_factory(
|
||||||
'testJT', organization='org1', project='proj1', inventory='inventory1',
|
'testJT', organization='org1', project='proj1', inventory='inventory1',
|
||||||
credential='cred1', cloud_credential='aws1', network_credential='juniper1')
|
credential='cred1')
|
||||||
return objects
|
jt = objects.job_template
|
||||||
|
jt.vault_credential = vault_credential
|
||||||
|
jt.save()
|
||||||
|
# Add AWS cloud credential and network credential
|
||||||
|
jt.extra_credentials.add(credential)
|
||||||
|
jt.extra_credentials.add(net_credential)
|
||||||
|
return jt
|
||||||
|
|
||||||
|
|
||||||
@mock.patch.object(BaseAccess, 'check_license', return_value=None)
|
@mock.patch.object(BaseAccess, 'check_license', return_value=None)
|
||||||
@@ -32,58 +42,86 @@ def test_job_template_access_superuser(check_license, user, deploy_jobtemplate):
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_job_template_access_read_level(jt_objects, rando):
|
def test_job_template_access_read_level(jt_linked, rando):
|
||||||
|
|
||||||
access = JobTemplateAccess(rando)
|
access = JobTemplateAccess(rando)
|
||||||
jt_objects.project.read_role.members.add(rando)
|
jt_linked.project.read_role.members.add(rando)
|
||||||
jt_objects.inventory.read_role.members.add(rando)
|
jt_linked.inventory.read_role.members.add(rando)
|
||||||
jt_objects.credential.read_role.members.add(rando)
|
jt_linked.credential.read_role.members.add(rando)
|
||||||
jt_objects.cloud_credential.read_role.members.add(rando)
|
|
||||||
jt_objects.network_credential.read_role.members.add(rando)
|
|
||||||
|
|
||||||
proj_pk = jt_objects.project.pk
|
proj_pk = jt_linked.project.pk
|
||||||
assert not access.can_add(dict(inventory=jt_objects.inventory.pk, project=proj_pk))
|
assert not access.can_add(dict(inventory=jt_linked.inventory.pk, project=proj_pk))
|
||||||
assert not access.can_add(dict(credential=jt_objects.credential.pk, project=proj_pk))
|
assert not access.can_add(dict(credential=jt_linked.credential.pk, project=proj_pk))
|
||||||
assert not access.can_add(dict(cloud_credential=jt_objects.cloud_credential.pk, project=proj_pk))
|
assert not access.can_add(dict(vault_credential=jt_linked.vault_credential.pk, project=proj_pk))
|
||||||
assert not access.can_add(dict(network_credential=jt_objects.network_credential.pk, project=proj_pk))
|
|
||||||
|
for cred in jt_linked.extra_credentials.all():
|
||||||
|
assert not access.can_unattach(jt_linked, cred, 'extra_credentials', {})
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_job_template_access_use_level(jt_objects, rando):
|
def test_job_template_access_use_level(jt_linked, rando):
|
||||||
|
|
||||||
access = JobTemplateAccess(rando)
|
access = JobTemplateAccess(rando)
|
||||||
jt_objects.project.use_role.members.add(rando)
|
jt_linked.project.use_role.members.add(rando)
|
||||||
jt_objects.inventory.use_role.members.add(rando)
|
jt_linked.inventory.use_role.members.add(rando)
|
||||||
jt_objects.credential.use_role.members.add(rando)
|
jt_linked.credential.use_role.members.add(rando)
|
||||||
jt_objects.cloud_credential.use_role.members.add(rando)
|
jt_linked.vault_credential.use_role.members.add(rando)
|
||||||
jt_objects.network_credential.use_role.members.add(rando)
|
|
||||||
|
|
||||||
proj_pk = jt_objects.project.pk
|
proj_pk = jt_linked.project.pk
|
||||||
assert access.can_add(dict(inventory=jt_objects.inventory.pk, project=proj_pk))
|
assert access.can_add(dict(inventory=jt_linked.inventory.pk, project=proj_pk))
|
||||||
assert access.can_add(dict(credential=jt_objects.credential.pk, project=proj_pk))
|
assert access.can_add(dict(credential=jt_linked.credential.pk, project=proj_pk))
|
||||||
assert access.can_add(dict(cloud_credential=jt_objects.cloud_credential.pk, project=proj_pk))
|
assert access.can_add(dict(vault_credential=jt_linked.vault_credential.pk, project=proj_pk))
|
||||||
assert access.can_add(dict(network_credential=jt_objects.network_credential.pk, project=proj_pk))
|
|
||||||
|
for cred in jt_linked.extra_credentials.all():
|
||||||
|
assert not access.can_unattach(jt_linked, cred, 'extra_credentials', {})
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_job_template_access_org_admin(jt_objects, rando):
|
def test_job_template_access_org_admin(jt_linked, rando):
|
||||||
access = JobTemplateAccess(rando)
|
access = JobTemplateAccess(rando)
|
||||||
# Appoint this user as admin of the organization
|
# Appoint this user as admin of the organization
|
||||||
jt_objects.inventory.organization.admin_role.members.add(rando)
|
jt_linked.inventory.organization.admin_role.members.add(rando)
|
||||||
# Assign organization permission in the same way the create view does
|
# Assign organization permission in the same way the create view does
|
||||||
organization = jt_objects.inventory.organization
|
organization = jt_linked.inventory.organization
|
||||||
jt_objects.credential.admin_role.parents.add(organization.admin_role)
|
jt_linked.credential.admin_role.parents.add(organization.admin_role)
|
||||||
jt_objects.cloud_credential.admin_role.parents.add(organization.admin_role)
|
|
||||||
jt_objects.network_credential.admin_role.parents.add(organization.admin_role)
|
|
||||||
|
|
||||||
proj_pk = jt_objects.project.pk
|
proj_pk = jt_linked.project.pk
|
||||||
assert access.can_add(dict(inventory=jt_objects.inventory.pk, project=proj_pk))
|
assert access.can_add(dict(inventory=jt_linked.inventory.pk, project=proj_pk))
|
||||||
assert access.can_add(dict(credential=jt_objects.credential.pk, project=proj_pk))
|
assert access.can_add(dict(credential=jt_linked.credential.pk, project=proj_pk))
|
||||||
assert access.can_add(dict(cloud_credential=jt_objects.cloud_credential.pk, project=proj_pk))
|
|
||||||
assert access.can_add(dict(network_credential=jt_objects.network_credential.pk, project=proj_pk))
|
|
||||||
|
|
||||||
assert access.can_read(jt_objects.job_template)
|
for cred in jt_linked.extra_credentials.all():
|
||||||
assert access.can_delete(jt_objects.job_template)
|
assert access.can_unattach(jt_linked, cred, 'extra_credentials', {})
|
||||||
|
|
||||||
|
assert access.can_read(jt_linked)
|
||||||
|
assert access.can_delete(jt_linked)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
class TestJobTemplateCredentials:
|
||||||
|
|
||||||
|
def test_job_template_cannot_add_extra_credentials(self, job_template, credential, rando):
|
||||||
|
job_template.admin_role.members.add(rando)
|
||||||
|
credential.read_role.members.add(rando)
|
||||||
|
# without permission to credential, user can not attach it
|
||||||
|
assert not JobTemplateAccess(rando).can_attach(
|
||||||
|
job_template, credential, 'extra_credentials', {})
|
||||||
|
|
||||||
|
def test_job_template_can_add_extra_credentials(self, job_template, credential, rando):
|
||||||
|
job_template.admin_role.members.add(rando)
|
||||||
|
credential.use_role.members.add(rando)
|
||||||
|
# user has permission to apply credential
|
||||||
|
assert JobTemplateAccess(rando).can_attach(
|
||||||
|
job_template, credential, 'extra_credentials', {})
|
||||||
|
|
||||||
|
def test_job_template_vault_cred_check(self, job_template, vault_credential, rando):
|
||||||
|
job_template.admin_role.members.add(rando)
|
||||||
|
# not allowed to use the vault cred
|
||||||
|
assert not JobTemplateAccess(rando).can_change(
|
||||||
|
job_template, {'vault_credential': vault_credential})
|
||||||
|
|
||||||
|
def test_new_jt_with_vault(self, vault_credential, project, rando):
|
||||||
|
project.admin_role.members.add(rando)
|
||||||
|
assert not JobTemplateAccess(rando).can_add({'vault_credential': vault_credential, 'project': project.pk})
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
|
|||||||
@@ -123,6 +123,7 @@ def job_template_with_ids(job_template_factory):
|
|||||||
credential = Credential(id=1, pk=1, name='testcred', kind='ssh')
|
credential = Credential(id=1, pk=1, name='testcred', kind='ssh')
|
||||||
net_cred = Credential(id=2, pk=2, name='testnetcred', kind='net')
|
net_cred = Credential(id=2, pk=2, name='testnetcred', kind='net')
|
||||||
cloud_cred = Credential(id=3, pk=3, name='testcloudcred', kind='aws')
|
cloud_cred = Credential(id=3, pk=3, name='testcloudcred', kind='aws')
|
||||||
|
vault_cred = Credential(id=4, pk=4, name='testnetcred', kind='vault')
|
||||||
inv = Inventory(id=11, pk=11, name='testinv')
|
inv = Inventory(id=11, pk=11, name='testinv')
|
||||||
proj = Project(id=14, pk=14, name='testproj')
|
proj = Project(id=14, pk=14, name='testproj')
|
||||||
|
|
||||||
@@ -130,6 +131,7 @@ def job_template_with_ids(job_template_factory):
|
|||||||
'testJT', project=proj, inventory=inv, credential=credential,
|
'testJT', project=proj, inventory=inv, credential=credential,
|
||||||
cloud_credential=cloud_cred, network_credential=net_cred,
|
cloud_credential=cloud_cred, network_credential=net_cred,
|
||||||
persisted=False)
|
persisted=False)
|
||||||
|
jt_objects.job_template.vault_credential = vault_cred
|
||||||
return jt_objects.job_template
|
return jt_objects.job_template
|
||||||
|
|
||||||
|
|
||||||
@@ -159,7 +161,6 @@ def test_jt_existing_values_are_nonsensitive(job_template_with_ids, user_unit):
|
|||||||
assert access.changes_are_non_sensitive(job_template_with_ids, data)
|
assert access.changes_are_non_sensitive(job_template_with_ids, data)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.xfail # TODO: update this to respect JT.extra_credentials
|
|
||||||
def test_change_jt_sensitive_data(job_template_with_ids, mocker, user_unit):
|
def test_change_jt_sensitive_data(job_template_with_ids, mocker, user_unit):
|
||||||
"""Assure that can_add is called with all ForeignKeys."""
|
"""Assure that can_add is called with all ForeignKeys."""
|
||||||
|
|
||||||
@@ -178,8 +179,7 @@ def test_change_jt_sensitive_data(job_template_with_ids, mocker, user_unit):
|
|||||||
'inventory': data['inventory'],
|
'inventory': data['inventory'],
|
||||||
'project': job_template_with_ids.project.id,
|
'project': job_template_with_ids.project.id,
|
||||||
'credential': job_template_with_ids.credential.id,
|
'credential': job_template_with_ids.credential.id,
|
||||||
'cloud_credential': job_template_with_ids.cloud_credential.id,
|
'vault_credential': job_template_with_ids.vault_credential.id
|
||||||
'network_credential': job_template_with_ids.network_credential.id
|
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user