diff --git a/awx/main/models/credential.py b/awx/main/models/credential.py index 6e16851145..a7645fdfe8 100644 --- a/awx/main/models/credential.py +++ b/awx/main/models/credential.py @@ -156,19 +156,16 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin): help_text=_('Vault password (or "ASK" to prompt the user).'), ) owner_role = ImplicitRoleField( - role_name='Credential Owner', - parent_role=[ - 'user.user_role', - 'team.admin_role' - ], + role_name='Credential Owner', + parent_role='team.admin_role', resource_field='resource', permissions = { 'all': True } ) usage_role = ImplicitRoleField( - role_name='Credential User', + role_name='Credential User', resource_field='resource', parent_role= 'team.member_role', - permissions = { 'usage': True } + permissions = { 'use': True } ) @property @@ -366,6 +363,15 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin): update_fields.append('cloud') super(Credential, self).save(*args, **kwargs) + def migrate_to_rbac(self): + if self.user: + self.owner_role.members.add(self.user) + return [self.user] + elif self.team: + self.owner_role.parents.add(self.team.admin_role) + self.usage_role.parents.add(self.team.member_role) + return [self.team] + def validate_ssh_private_key(data): """Validate that the given SSH private key or certificate is, in fact, valid. diff --git a/awx/main/tests/functional/conftest.py b/awx/main/tests/functional/conftest.py index 22dd64c894..d3454a8a4e 100644 --- a/awx/main/tests/functional/conftest.py +++ b/awx/main/tests/functional/conftest.py @@ -1,6 +1,10 @@ import pytest -from awx.main.models.organization import Organization +from awx.main.models.credential import Credential +from awx.main.models.organization import ( + Organization, + Team, +) from django.contrib.auth.models import User @pytest.fixture @@ -14,10 +18,18 @@ def user(): return user return u +@pytest.fixture +def team(organization): + return Team.objects.create(organization=organization, name='test-team') + @pytest.fixture def organization(): return Organization.objects.create(name="test-org", description="test-org-desc") +@pytest.fixture +def credential(): + return Credential.objects.create(kind='aws', name='test-cred') + @pytest.fixture def permissions(): return { @@ -26,5 +38,8 @@ def permissions(): 'auditor':{'read':True, 'create':False, 'write':False, 'update':False, 'delete':False, 'scm_update':False, 'execute':False, 'use':False,}, + + 'usage':{'read':False, 'create':False, 'write':False, + 'update':False, 'delete':False, 'scm_update':False, 'execute':False, 'use':True,}, } diff --git a/awx/main/tests/functional/test_rbac_credential.py b/awx/main/tests/functional/test_rbac_credential.py new file mode 100644 index 0000000000..8868501e6a --- /dev/null +++ b/awx/main/tests/functional/test_rbac_credential.py @@ -0,0 +1,44 @@ +import pytest + +@pytest.mark.django_db +def test_credential_migration_user(credential, user, permissions): + u = user('user', False) + credential.user = u + migrated = credential.migrate_to_rbac() + assert len(migrated) == 1 + assert credential.accessible_by(u, permissions['admin']) + +@pytest.mark.django_db +def test_credential_usage_role(credential, user, permissions): + u = user('user', False) + credential.usage_role.members.add(u) + assert credential.accessible_by(u, permissions['usage']) + +@pytest.mark.django_db +def test_credential_migration_team_member(credential, team, user, permissions): + u = user('user', False) + team.admin_role.members.add(u) + credential.team = team + + # No permissions pre-migration + assert credential.accessible_by(u, permissions['admin']) == False + + migrated = credential.migrate_to_rbac() + # Admin permissions post migration + assert len(migrated) == 1 + assert credential.accessible_by(u, permissions['admin']) + +@pytest.mark.django_db +def test_credential_migration_team_admin(credential, team, user, permissions): + u = user('user', False) + team.member_role.members.add(u) + credential.team = team + + # No permissions pre-migration + assert credential.accessible_by(u, permissions['usage']) == False + + # Usage permissions post migration + migrated = credential.migrate_to_rbac() + assert len(migrated) == 1 + assert credential.accessible_by(u, permissions['usage']) +