Fix credential assertions and rename migration helpers

This commit is contained in:
Wayne Witzel III 2016-03-17 15:25:09 -04:00
parent 74e2c440a5
commit 6d249f38a7
4 changed files with 76 additions and 47 deletions

View File

@ -566,8 +566,6 @@ class CredentialAccess(BaseAccess):
def can_change(self, obj, data):
if self.user.is_superuser:
return True
if not self.can_add(data):
return False
return obj.accessible_by(self.user, {'read':True, 'update': True, 'delete':True})
def can_delete(self, obj):

View File

@ -1,5 +1,3 @@
import logging
from django.contrib.contenttypes.models import ContentType
from django.db.models import Q
@ -55,31 +53,44 @@ def migrate_team(apps, schema_editor):
migrations[t.name].append(user)
return migrations
def attrfunc(attr_path):
'''attrfunc returns a function that will
attempt to use the attr_path to access the attribute
of an instance that is passed in to the returned function.
Example:
get_org = attrfunc('inventory.organization')
org = get_org(JobTemplateInstance)
'''
def attr(inst):
return reduce(getattr, [inst] + attr_path.split('.'))
return attr
def _update_credential_parents(org, cred):
org.admin_role.children.add(cred.owner_role)
org.member_role.children.add(cred.usage_role)
cred.user, cred.team = None, None
cred.save()
def _discover_credentials(insts, cred, org_path='organization'):
def _discover_credentials(instances, cred, orgfunc):
'''_discover_credentials will find shared credentials across
organizations. If a shared credential is found, it will duplicate
the credential, ensure the proper role permissions are added to the new
credential, and update any references from the old to the newly created
credential.
'''
def get_org(inst):
fields = org_path.split('.')
for field in fields:
inst = getattr(inst, field)
return inst
instances is a list of all objects that were matched when filtered
with cred.
orgfunc is a function that when called with an instance from instances
will produce an Organization object.
'''
orgs = defaultdict(list)
for inst in insts:
orgs[get_org(inst)].append(inst)
for inst in instances:
orgs[orgfunc(inst)].append(inst)
if len(orgs) == 1:
_update_credential_parents(insts[0].inventory.organization, cred)
_update_credential_parents(instances[0].inventory.organization, cred)
else:
for pos, org in enumerate(orgs):
if pos == 0:
@ -100,30 +111,25 @@ def migrate_credential(apps, schema_editor):
Project = apps.get_model('main', 'Project')
InventorySource = apps.get_model('main', 'InventorySource')
migrated = []
for cred in Credential.objects.all():
jts = JobTemplate.objects.filter(Q(credential=cred) | Q(cloud_credential=cred)).all()
if jts is not None:
if len(jts) == 1:
_update_credential_parents(jts[0].inventory.organization, cred)
else:
_discover_credentials(jts, cred, org_path='inventory.organization')
continue
migrated.append(cred)
invs = InventorySource.objects.filter(credential=cred).all()
if invs is not None:
if len(invs) == 1:
_update_credential_parents(invs[0].inventory.organization, cred)
results = (JobTemplate.objects.filter(Q(credential=cred) | Q(cloud_credential=cred)).all() or
InventorySource.objects.filter(credential=cred).all())
if results:
if len(results) == 1:
_update_credential_parents(results[0].inventory.organization, cred)
else:
_discover_credentials(invs, cred, org_path='inventory.organization')
_discover_credentials(results, cred, attrfunc('inventory.organization'))
continue
projs = Project.objects.filter(credential=cred).all()
if projs is not None:
if projs:
if len(projs) == 1:
_update_credential_parents(projs[0].organization, cred)
else:
_discover_credentials(projs, cred, org_path='organization')
_discover_credentials(projs, cred, attrfunc('organization'))
continue
if cred.team is not None:
@ -140,6 +146,7 @@ def migrate_credential(apps, schema_editor):
cred.save()
# no match found, log
return migrated
def migrate_inventory(apps, schema_editor):

View File

@ -163,7 +163,6 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
role_name='Credential Owner',
role_description='Owner of the credential',
parent_role=[
'team.admin_role',
'singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
],
permissions = {'all': True}
@ -179,7 +178,6 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin):
usage_role = ImplicitRoleField(
role_name='Credential User',
role_description='May use this credential, but not read sensitive portions or modify it',
parent_role= 'team.member_role',
permissions = {'use': True}
)

View File

@ -3,6 +3,7 @@ import pytest
from awx.main.access import CredentialAccess
from awx.main.models.credential import Credential
from awx.main.models.jobs import JobTemplate
from awx.main.models.inventory import InventorySource
from awx.main.migrations import _rbac as rbac
from django.apps import apps
from django.contrib.auth.models import User
@ -50,9 +51,6 @@ def test_credential_migration_team_admin(credential, team, user, permissions):
credential.team = team
credential.save()
# No permissions pre-migration
team.admin_role.children.remove(credential.owner_role)
team.member_role.children.remove(credential.usage_role)
assert not credential.accessible_by(u, permissions['usage'])
# Usage permissions post migration
@ -77,17 +75,15 @@ def test_credential_access_admin(user, team, credential):
access = CredentialAccess(u)
assert access.can_add({'user': u.pk})
assert access.can_add({'team': team.pk})
assert not access.can_change(credential, {'user': u.pk})
# unowned credential can be deleted
assert access.can_delete(credential)
# unowned credential is superuser only
assert not access.can_delete(credential)
# credential is now part of a team
# that is part of an organization
# that I am an admin for
credential.team = team
credential.owner_role.parents.add(team.admin_role)
credential.save()
credential.owner_role.rebuild_role_ancestor_list()
@ -164,17 +160,47 @@ def test_single_cred_multi_job_template_multi_org(user, organizations, credentia
assert not access.can_change(jts[0].credential, {'organization': org.pk})
@pytest.mark.django_db
def test_cred_single_org():
pass
def test_cred_inventory_source(user, inventory, credential):
u = user('member', False)
inventory.organization.member_role.members.add(u)
InventorySource.objects.create(
name="test-inv-src",
credential=credential,
inventory=inventory,
)
assert not credential.accessible_by(u, {'use':True})
rbac.migrate_credential(apps, None)
assert credential.accessible_by(u, {'use':True})
@pytest.mark.django_db
def test_cred_created_by_multi_org():
pass
def test_cred_project(user, credential, project):
u = user('member', False)
project.organization.member_role.members.add(u)
project.credential = credential
project.save()
assert not credential.accessible_by(u, {'use':True})
rbac.migrate_credential(apps, None)
assert credential.accessible_by(u, {'use':True})
@pytest.mark.django_db
def test_cred_no_org():
pass
def test_cred_no_org(user, credential):
su = user('su', True)
access = CredentialAccess(su)
assert access.can_change(credential, {'user': su.pk})
@pytest.mark.django_db
def test_cred_team():
pass
def test_cred_team(user, team, credential):
u = user('a', False)
team.member_role.members.add(u)
credential.team = team
credential.save()
assert not credential.accessible_by(u, {'use':True})
rbac.migrate_credential(apps, None)
assert credential.accessible_by(u, {'use':True})