mirror of
https://github.com/ansible/awx.git
synced 2026-01-19 13:41:28 -03:30
Credential migration and initial tests
This commit is contained in:
parent
63d8efc268
commit
b59e960b46
@ -58,20 +58,6 @@ access_registry = {
|
||||
}
|
||||
|
||||
|
||||
def user_or_team(data):
|
||||
try:
|
||||
if 'user' in data:
|
||||
pk = get_pk_from_dict(data, 'user')
|
||||
return get_object_or_400(User, pk=pk), None
|
||||
elif 'team' in data:
|
||||
pk = get_pk_from_dict(data, 'team')
|
||||
return None, get_object_or_400(Team, pk=pk)
|
||||
else:
|
||||
return None, None
|
||||
except ParseError:
|
||||
return None, None
|
||||
|
||||
|
||||
def register_access(model_class, access_class):
|
||||
access_classes = access_registry.setdefault(model_class, [])
|
||||
access_classes.append(access_class)
|
||||
@ -566,14 +552,16 @@ class CredentialAccess(BaseAccess):
|
||||
if self.user.is_superuser:
|
||||
return True
|
||||
|
||||
user, team = user_or_team(data)
|
||||
if user is None and team is None:
|
||||
return False
|
||||
|
||||
if user is not None:
|
||||
if 'user' in data:
|
||||
pk = get_pk_from_dict(data, 'user')
|
||||
user = get_object_or_400(User, pk=pk)
|
||||
return user.accessible_by(self.user, {'write': True})
|
||||
if team is not None:
|
||||
return team.accessible_by(self.user, {'write':True})
|
||||
elif 'organization' in data:
|
||||
pk = get_pk_from_dict(data, 'organization')
|
||||
org = get_object_or_400(Organization, pk=pk)
|
||||
return org.accessible_by(self.user, {'write': True})
|
||||
|
||||
return False
|
||||
|
||||
def can_change(self, obj, data):
|
||||
if self.user.is_superuser:
|
||||
@ -585,8 +573,8 @@ class CredentialAccess(BaseAccess):
|
||||
def can_delete(self, obj):
|
||||
# Unassociated credentials may be marked deleted by anyone, though we
|
||||
# shouldn't ever end up with those.
|
||||
if obj.user is None and obj.team is None:
|
||||
return True
|
||||
#if obj.user is None and obj.team is None:
|
||||
# return True
|
||||
return self.can_change(obj, None)
|
||||
|
||||
class TeamAccess(BaseAccess):
|
||||
|
||||
@ -14,8 +14,8 @@ class Migration(migrations.Migration):
|
||||
operations = [
|
||||
migrations.RunPython(rbac.migrate_users),
|
||||
migrations.RunPython(rbac.migrate_organization),
|
||||
migrations.RunPython(rbac.migrate_credential),
|
||||
migrations.RunPython(rbac.migrate_team),
|
||||
migrations.RunPython(rbac.migrate_inventory),
|
||||
migrations.RunPython(rbac.migrate_projects),
|
||||
migrations.RunPython(rbac.migrate_credential),
|
||||
]
|
||||
|
||||
@ -1,4 +1,7 @@
|
||||
import logging
|
||||
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.db.models import Q
|
||||
|
||||
from collections import defaultdict
|
||||
import _old_access as old_access
|
||||
@ -52,18 +55,117 @@ def migrate_team(apps, schema_editor):
|
||||
migrations[t.name].append(user)
|
||||
return migrations
|
||||
|
||||
def _update_jt_creds(jts, cred):
|
||||
for i, jt in enumerate(jts):
|
||||
if i == 0:
|
||||
jt.inventory.organization.admin_role.children.add(cred.owner_role)
|
||||
continue
|
||||
|
||||
cred.pk = None
|
||||
cred.user = None
|
||||
cred.save()
|
||||
|
||||
jt.credential = cred
|
||||
jt.inventory.organization.admin_role.children.add(cred.owner_role)
|
||||
jt.save()
|
||||
|
||||
def _update_proj_creds(projects, cred):
|
||||
for i, proj in enumerate(projects):
|
||||
if i == 0:
|
||||
proj.organization.admin_role.children.add(cred.owner_role)
|
||||
continue
|
||||
|
||||
cred.pk = None
|
||||
cred.user = None
|
||||
cred.save()
|
||||
|
||||
proj.credential = cred
|
||||
proj.organization.admin_role.children.add(cred.owner_role)
|
||||
proj.save()
|
||||
|
||||
def _handle_single_cred(org, cred):
|
||||
org.admin_role.children.add(cred.owner_role)
|
||||
org.member_role.children.add(cred.usage_role)
|
||||
cred.user, cred.team = None, None
|
||||
cred.save()
|
||||
|
||||
def _handle_multi_cred(insts, cred, org_path='organization'):
|
||||
def get_org(inst):
|
||||
fields = org_path.split('.')
|
||||
for field in fields:
|
||||
inst = getattr(inst, field)
|
||||
return inst
|
||||
|
||||
orgs = defaultdict(list)
|
||||
for inst in insts:
|
||||
orgs[get_org(inst)].append(inst)
|
||||
|
||||
if len(orgs) == 1:
|
||||
_handle_single_cred(insts[0].inventory.organization, cred)
|
||||
else:
|
||||
for pos, org in enumerate(orgs):
|
||||
if pos == 0:
|
||||
_handle_single_cred(org, cred)
|
||||
else:
|
||||
cred.pk, cred.user, cred.team = None, None, None
|
||||
cred.save()
|
||||
cred.owner_role, cred.usage_role = None, None
|
||||
cred.save()
|
||||
|
||||
for i in orgs[org]:
|
||||
i.credential = cred
|
||||
i.save()
|
||||
|
||||
_handle_single_cred(org, cred)
|
||||
|
||||
|
||||
def migrate_credential(apps, schema_editor):
|
||||
migrations = defaultdict(list)
|
||||
credential = apps.get_model('main', "Credential")
|
||||
for cred in credential.objects.all():
|
||||
if cred.user:
|
||||
cred.owner_role.members.add(cred.user)
|
||||
migrations[cred.name].append(cred.user)
|
||||
elif cred.team:
|
||||
cred.owner_role.parents.add(cred.team.admin_role)
|
||||
cred.usage_role.parents.add(cred.team.member_role)
|
||||
migrations[cred.name].append(cred.team)
|
||||
return migrations
|
||||
Credential = apps.get_model('main', "Credential")
|
||||
JobTemplate = apps.get_model('main', 'JobTemplate')
|
||||
Project = apps.get_model('main', 'Project')
|
||||
InventorySource = apps.get_model('main', 'InventorySource')
|
||||
|
||||
|
||||
for cred in Credential.objects.all():
|
||||
jts = JobTemplate.objects.filter(Q(credential=cred) | Q(cloud_credential=cred)).all()
|
||||
if jts is not None:
|
||||
if len(jts) == 1:
|
||||
_handle_single_cred(jts[0].inventory.organization, cred)
|
||||
else:
|
||||
_handle_multi_cred(jts, cred, org_path='inventory.organization')
|
||||
continue
|
||||
|
||||
invs = InventorySource.objects.filter(credential=cred).all()
|
||||
if invs is not None:
|
||||
if len(invs) == 1:
|
||||
_single_cred(invs[0].inventory.organization, cred)
|
||||
else:
|
||||
_multi_cred(invs, cred, org_path='inventory.organization')
|
||||
continue
|
||||
|
||||
projs = Project.objects.filter(credential=cred).all()
|
||||
if projs is not None:
|
||||
if len(projs) == 1:
|
||||
_single_cred(projs[0].organization, cred)
|
||||
else:
|
||||
_multi_cred(projs, cred, org_path='organization')
|
||||
continue
|
||||
|
||||
if cred.team is not None:
|
||||
cred.team.admin_role.children.add(cred.owner_role)
|
||||
cred.team.member_role.children.add(cred.usage_role)
|
||||
cred.user = None
|
||||
cred.team = None
|
||||
cred.save()
|
||||
|
||||
elif cred.user is not None:
|
||||
cred.user.admin_role.children.add(cred.owner_role)
|
||||
cred.user = None
|
||||
cred.team = None
|
||||
cred.save()
|
||||
|
||||
# no match found, log
|
||||
|
||||
|
||||
def migrate_inventory(apps, schema_editor):
|
||||
migrations = defaultdict(dict)
|
||||
|
||||
@ -2,6 +2,7 @@ import pytest
|
||||
|
||||
from awx.main.access import CredentialAccess
|
||||
from awx.main.models.credential import Credential
|
||||
from awx.main.models.jobs import JobTemplate
|
||||
from awx.main.migrations import _rbac as rbac
|
||||
from django.apps import apps
|
||||
from django.contrib.auth.models import User
|
||||
@ -96,3 +97,84 @@ def test_credential_access_admin(user, team, credential):
|
||||
|
||||
# should have can_change access as org-admin
|
||||
assert access.can_change(credential, {'user': u.pk})
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_cred_job_template(user, deploy_jobtemplate):
|
||||
a = user('admin', False)
|
||||
org = deploy_jobtemplate.project.organization
|
||||
org.admin_role.members.add(a)
|
||||
|
||||
cred = deploy_jobtemplate.credential
|
||||
cred.user = user('john', False)
|
||||
cred.save()
|
||||
|
||||
access = CredentialAccess(a)
|
||||
rbac.migrate_credential(apps, None)
|
||||
assert access.can_change(cred, {'organization': org.pk})
|
||||
|
||||
org.admin_role.members.remove(a)
|
||||
assert not access.can_change(cred, {'organization': org.pk})
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_cred_multi_job_template_single_org(user, deploy_jobtemplate):
|
||||
a = user('admin', False)
|
||||
org = deploy_jobtemplate.project.organization
|
||||
org.admin_role.members.add(a)
|
||||
|
||||
cred = deploy_jobtemplate.credential
|
||||
cred.user = user('john', False)
|
||||
cred.save()
|
||||
|
||||
access = CredentialAccess(a)
|
||||
rbac.migrate_credential(apps, None)
|
||||
assert access.can_change(cred, {'organization': org.pk})
|
||||
|
||||
org.admin_role.members.remove(a)
|
||||
assert not access.can_change(cred, {'organization': org.pk})
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_single_cred_multi_job_template_multi_org(user, organizations, credential):
|
||||
orgs = organizations(2)
|
||||
jts = []
|
||||
for org in orgs:
|
||||
inv = org.inventories.create(name="inv-%d" % org.pk)
|
||||
jt = JobTemplate.objects.create(
|
||||
inventory=inv,
|
||||
credential=credential,
|
||||
name="test-jt-org-%d" % org.pk,
|
||||
job_type='check',
|
||||
)
|
||||
jts.append(jt)
|
||||
|
||||
a = user('admin', False)
|
||||
orgs[0].admin_role.members.add(a)
|
||||
orgs[1].admin_role.members.add(a)
|
||||
|
||||
access = CredentialAccess(a)
|
||||
rbac.migrate_credential(apps, None)
|
||||
|
||||
for jt in jts:
|
||||
jt.refresh_from_db()
|
||||
|
||||
assert jts[0].credential != jts[1].credential
|
||||
assert access.can_change(jts[0].credential, {'organization': org.pk})
|
||||
assert access.can_change(jts[1].credential, {'organization': org.pk})
|
||||
|
||||
orgs[0].admin_role.members.remove(a)
|
||||
assert not access.can_change(jts[0].credential, {'organization': org.pk})
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_cred_single_org():
|
||||
pass
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_cred_created_by_multi_org():
|
||||
pass
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_cred_no_org():
|
||||
pass
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_cred_team():
|
||||
pass
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user