diff --git a/awx/main/access.py b/awx/main/access.py index 4a843520ae..9d67dd5295 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -566,8 +566,6 @@ class CredentialAccess(BaseAccess): def can_change(self, obj, data): if self.user.is_superuser: return True - if not self.can_add(data): - return False return obj.accessible_by(self.user, {'read':True, 'update': True, 'delete':True}) def can_delete(self, obj): diff --git a/awx/main/migrations/_rbac.py b/awx/main/migrations/_rbac.py index d58de7300a..2dc4c287b1 100644 --- a/awx/main/migrations/_rbac.py +++ b/awx/main/migrations/_rbac.py @@ -1,5 +1,3 @@ -import logging - from django.contrib.contenttypes.models import ContentType from django.db.models import Q @@ -55,31 +53,44 @@ def migrate_team(apps, schema_editor): migrations[t.name].append(user) return migrations +def attrfunc(attr_path): + '''attrfunc returns a function that will + attempt to use the attr_path to access the attribute + of an instance that is passed in to the returned function. + + Example: + get_org = attrfunc('inventory.organization') + org = get_org(JobTemplateInstance) + ''' + def attr(inst): + return reduce(getattr, [inst] + attr_path.split('.')) + return attr + def _update_credential_parents(org, cred): org.admin_role.children.add(cred.owner_role) org.member_role.children.add(cred.usage_role) cred.user, cred.team = None, None cred.save() -def _discover_credentials(insts, cred, org_path='organization'): +def _discover_credentials(instances, cred, orgfunc): '''_discover_credentials will find shared credentials across organizations. If a shared credential is found, it will duplicate the credential, ensure the proper role permissions are added to the new credential, and update any references from the old to the newly created credential. - ''' - def get_org(inst): - fields = org_path.split('.') - for field in fields: - inst = getattr(inst, field) - return inst + instances is a list of all objects that were matched when filtered + with cred. + + orgfunc is a function that when called with an instance from instances + will produce an Organization object. + ''' orgs = defaultdict(list) - for inst in insts: - orgs[get_org(inst)].append(inst) + for inst in instances: + orgs[orgfunc(inst)].append(inst) if len(orgs) == 1: - _update_credential_parents(insts[0].inventory.organization, cred) + _update_credential_parents(instances[0].inventory.organization, cred) else: for pos, org in enumerate(orgs): if pos == 0: @@ -100,30 +111,25 @@ def migrate_credential(apps, schema_editor): Project = apps.get_model('main', 'Project') InventorySource = apps.get_model('main', 'InventorySource') - + migrated = [] for cred in Credential.objects.all(): - jts = JobTemplate.objects.filter(Q(credential=cred) | Q(cloud_credential=cred)).all() - if jts is not None: - if len(jts) == 1: - _update_credential_parents(jts[0].inventory.organization, cred) - else: - _discover_credentials(jts, cred, org_path='inventory.organization') - continue + migrated.append(cred) - invs = InventorySource.objects.filter(credential=cred).all() - if invs is not None: - if len(invs) == 1: - _update_credential_parents(invs[0].inventory.organization, cred) + results = (JobTemplate.objects.filter(Q(credential=cred) | Q(cloud_credential=cred)).all() or + InventorySource.objects.filter(credential=cred).all()) + if results: + if len(results) == 1: + _update_credential_parents(results[0].inventory.organization, cred) else: - _discover_credentials(invs, cred, org_path='inventory.organization') + _discover_credentials(results, cred, attrfunc('inventory.organization')) continue projs = Project.objects.filter(credential=cred).all() - if projs is not None: + if projs: if len(projs) == 1: _update_credential_parents(projs[0].organization, cred) else: - _discover_credentials(projs, cred, org_path='organization') + _discover_credentials(projs, cred, attrfunc('organization')) continue if cred.team is not None: @@ -140,6 +146,7 @@ def migrate_credential(apps, schema_editor): cred.save() # no match found, log + return migrated def migrate_inventory(apps, schema_editor): diff --git a/awx/main/models/credential.py b/awx/main/models/credential.py index 9ae6b47298..1d59049326 100644 --- a/awx/main/models/credential.py +++ b/awx/main/models/credential.py @@ -163,7 +163,6 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin): role_name='Credential Owner', role_description='Owner of the credential', parent_role=[ - 'team.admin_role', 'singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR, ], permissions = {'all': True} @@ -179,7 +178,6 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin): usage_role = ImplicitRoleField( role_name='Credential User', role_description='May use this credential, but not read sensitive portions or modify it', - parent_role= 'team.member_role', permissions = {'use': True} ) diff --git a/awx/main/tests/functional/test_rbac_credential.py b/awx/main/tests/functional/test_rbac_credential.py index efec822e57..f4ea00e68c 100644 --- a/awx/main/tests/functional/test_rbac_credential.py +++ b/awx/main/tests/functional/test_rbac_credential.py @@ -3,6 +3,7 @@ import pytest from awx.main.access import CredentialAccess from awx.main.models.credential import Credential from awx.main.models.jobs import JobTemplate +from awx.main.models.inventory import InventorySource from awx.main.migrations import _rbac as rbac from django.apps import apps from django.contrib.auth.models import User @@ -50,9 +51,6 @@ def test_credential_migration_team_admin(credential, team, user, permissions): credential.team = team credential.save() - # No permissions pre-migration - team.admin_role.children.remove(credential.owner_role) - team.member_role.children.remove(credential.usage_role) assert not credential.accessible_by(u, permissions['usage']) # Usage permissions post migration @@ -77,17 +75,15 @@ def test_credential_access_admin(user, team, credential): access = CredentialAccess(u) assert access.can_add({'user': u.pk}) - assert access.can_add({'team': team.pk}) - assert not access.can_change(credential, {'user': u.pk}) - # unowned credential can be deleted - assert access.can_delete(credential) + # unowned credential is superuser only + assert not access.can_delete(credential) # credential is now part of a team # that is part of an organization # that I am an admin for - credential.team = team + credential.owner_role.parents.add(team.admin_role) credential.save() credential.owner_role.rebuild_role_ancestor_list() @@ -164,17 +160,47 @@ def test_single_cred_multi_job_template_multi_org(user, organizations, credentia assert not access.can_change(jts[0].credential, {'organization': org.pk}) @pytest.mark.django_db -def test_cred_single_org(): - pass +def test_cred_inventory_source(user, inventory, credential): + u = user('member', False) + inventory.organization.member_role.members.add(u) + + InventorySource.objects.create( + name="test-inv-src", + credential=credential, + inventory=inventory, + ) + + assert not credential.accessible_by(u, {'use':True}) + + rbac.migrate_credential(apps, None) + assert credential.accessible_by(u, {'use':True}) @pytest.mark.django_db -def test_cred_created_by_multi_org(): - pass +def test_cred_project(user, credential, project): + u = user('member', False) + project.organization.member_role.members.add(u) + project.credential = credential + project.save() + + assert not credential.accessible_by(u, {'use':True}) + + rbac.migrate_credential(apps, None) + assert credential.accessible_by(u, {'use':True}) @pytest.mark.django_db -def test_cred_no_org(): - pass +def test_cred_no_org(user, credential): + su = user('su', True) + access = CredentialAccess(su) + assert access.can_change(credential, {'user': su.pk}) @pytest.mark.django_db -def test_cred_team(): - pass +def test_cred_team(user, team, credential): + u = user('a', False) + team.member_role.members.add(u) + credential.team = team + credential.save() + + assert not credential.accessible_by(u, {'use':True}) + + rbac.migrate_credential(apps, None) + assert credential.accessible_by(u, {'use':True})