diff --git a/awx/main/fields.py b/awx/main/fields.py index e5210b423b..cb19a4c2c6 100644 --- a/awx/main/fields.py +++ b/awx/main/fields.py @@ -2,7 +2,7 @@ # All Rights Reserved. # Django -from django.db.models.signals import post_save, post_init +from django.db.models.signals import post_save from django.db import models from django.db.models.fields.related import SingleRelatedObjectDescriptor from django.db.models.fields.related import ReverseSingleRelatedObjectDescriptor @@ -66,7 +66,7 @@ class ResourceFieldDescriptor(ReverseSingleRelatedObjectDescriptor): # Take first non null parent resource parent = None if type(self.parent_resource) is list: - for path in self.parent_resource: + for path in self.parent_resource: parent = resolve_field(instance, path) if parent: break @@ -123,7 +123,7 @@ class ImplicitRoleDescriptor(ReverseSingleRelatedObjectDescriptor): if self.parent_role: # Add all non-null parent roles as parents if type(self.parent_role) is list: - for path in self.parent_role: + for path in self.parent_role: if path.startswith("singleton:"): parent = Role.singleton(path[10:]) else: @@ -142,7 +142,7 @@ class ImplicitRoleDescriptor(ReverseSingleRelatedObjectDescriptor): if self.resource_field and self.permissions: permissions = RolePermission( - role=role, + role=role, resource=getattr(instance, self.resource_field) ) @@ -180,13 +180,13 @@ class ImplicitRoleField(models.ForeignKey): def contribute_to_class(self, cls, name): super(ImplicitRoleField, self).contribute_to_class(cls, name) - setattr(cls, - self.name, + setattr(cls, + self.name, ImplicitRoleDescriptor( - self.role_name, - self.resource_field, - self.permissions, - self.parent_role, + self.role_name, + self.resource_field, + self.permissions, + self.parent_role, self ) ) diff --git a/awx/main/models/credential.py b/awx/main/models/credential.py index 6e16851145..462cf35249 100644 --- a/awx/main/models/credential.py +++ b/awx/main/models/credential.py @@ -156,19 +156,16 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin): help_text=_('Vault password (or "ASK" to prompt the user).'), ) owner_role = ImplicitRoleField( - role_name='Credential Owner', - parent_role=[ - 'user.user_role', - 'team.admin_role' - ], + role_name='Credential Owner', + parent_role='team.admin_role', resource_field='resource', - permissions = { 'all': True } + permissions = {'all': True} ) usage_role = ImplicitRoleField( - role_name='Credential User', + role_name='Credential User', resource_field='resource', parent_role= 'team.member_role', - permissions = { 'usage': True } + permissions = {'use': True} ) @property @@ -366,6 +363,15 @@ class Credential(PasswordFieldsModel, CommonModelNameNotUnique, ResourceMixin): update_fields.append('cloud') super(Credential, self).save(*args, **kwargs) + def migrate_to_rbac(self): + if self.user: + self.owner_role.members.add(self.user) + return [self.user] + elif self.team: + self.owner_role.parents.add(self.team.admin_role) + self.usage_role.parents.add(self.team.member_role) + return [self.team] + def validate_ssh_private_key(data): """Validate that the given SSH private key or certificate is, in fact, valid. diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index 313204454a..e33cec1a23 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -94,22 +94,22 @@ class Inventory(CommonModel, ResourceMixin): help_text=_('Number of external inventory sources in this inventory with failures.'), ) admin_role = ImplicitRoleField( - role_name='Inventory Administrator', + role_name='Inventory Administrator', parent_role='organization.admin_role', resource_field='resource', - permissions = { 'all': True } + permissions = {'all': True} ) auditor_role = ImplicitRoleField( - role_name='Inventory Auditor', + role_name='Inventory Auditor', parent_role='organization.auditor_role', resource_field='resource', - permissions = { 'read': True } + permissions = {'read': True} ) updater_role = ImplicitRoleField( - role_name='Inventory Updater', + role_name='Inventory Updater', ) executor_role = ImplicitRoleField( - role_name='Inventory Executor', + role_name='Inventory Executor', ) def get_absolute_url(self): @@ -543,23 +543,23 @@ class Group(CommonModelNameNotUnique, ResourceMixin): help_text=_('Inventory source(s) that created or modified this group.'), ) admin_role = ImplicitRoleField( - role_name='Inventory Group Administrator', + role_name='Inventory Group Administrator', parent_role='inventory.admin_role', resource_field='resource', - permissions = { 'all': True } + permissions = {'all': True} ) auditor_role = ImplicitRoleField( - role_name='Inventory Group Auditor', + role_name='Inventory Group Auditor', parent_role='inventory.auditor_role', resource_field='resource', - permissions = { 'read': True } + permissions = {'read': True} ) updater_role = ImplicitRoleField( - role_name='Inventory Group Updater', + role_name='Inventory Group Updater', parent_role='inventory.updater_role' ) executor_role = ImplicitRoleField( - role_name='Inventory Group Executor', + role_name='Inventory Group Executor', parent_role='inventory.executor_role' ) @@ -1186,7 +1186,7 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions, ResourceMixin) return 'never updated' # inherit the child job status else: - return self.last_job.status + return self.last_job.status else: return 'none' diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 9c3866b593..d8f114bc40 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -182,22 +182,22 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, ResourceMixin): default={}, ) admin_role = ImplicitRoleField( - role_name='Job Template Administrator', + role_name='Job Template Administrator', parent_role='project.admin_role', resource_field='resource', - permissions = { 'all': True } + permissions = {'all': True} ) auditor_role = ImplicitRoleField( - role_name='Job Template Auditor', + role_name='Job Template Auditor', parent_role='project.auditor_role', resource_field='resource', - permissions = { 'read': True } + permissions = {'read': True} ) executor_role = ImplicitRoleField( - role_name='Job Template Executor', + role_name='Job Template Executor', parent_role='project.auditor_role', resource_field='resource', - permissions = { 'execute': True } + permissions = {'execute': True} ) @classmethod diff --git a/awx/main/models/organization.py b/awx/main/models/organization.py index 37cd56543d..306a4ff42b 100644 --- a/awx/main/models/organization.py +++ b/awx/main/models/organization.py @@ -55,12 +55,12 @@ class Organization(CommonModel, ResourceMixin): admin_role = ImplicitRoleField( role_name='Organization Administrator', resource_field='resource', - permissions = { 'all': True } + permissions = {'all': True} ) auditor_role = ImplicitRoleField( role_name='Organization Auditor', resource_field='resource', - permissions = { 'read': True } + permissions = {'read': True} ) @@ -118,17 +118,19 @@ class Team(CommonModelNameNotUnique, ResourceMixin): role_name='Team Administrator', parent_role='organization.admin_role', resource_field='resource', - permissions = { 'all': True } + permissions = {'all': True} ) auditor_role = ImplicitRoleField( role_name='Team Auditor', parent_role='organization.auditor_role', resource_field='resource', - permissions = { 'read': True } + permissions = {'read': True} ) member_role = ImplicitRoleField( role_name='Team Member', parent_role='admin_role', + resource_field='resource', + permissions = {'read':True}, ) def get_absolute_url(self): @@ -142,6 +144,13 @@ class Team(CommonModelNameNotUnique, ResourceMixin): cred.mark_inactive() super(Team, self).mark_inactive(save=save) + def migrate_to_rbac(self): + migrated = [] + for user in self.users.all(): + self.member_role.members.add(user) + migrated.append(user) + return migrated + class Permission(CommonModelNameNotUnique): ''' A permission allows a user, project, or team to be able to use an inventory source. diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index cb3f143dbd..1da3d51961 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -216,28 +216,28 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin): blank=True, ) admin_role = ImplicitRoleField( - role_name='Project Administrator', + role_name='Project Administrator', parent_role='organization.admin_role', resource_field='resource', - permissions = { 'all': True } + permissions = {'all': True} ) auditor_role = ImplicitRoleField( - role_name='Project Auditor', + role_name='Project Auditor', parent_role='organization.auditor_role', resource_field='resource', - permissions = { 'read': True } + permissions = {'read': True} ) member_role = ImplicitRoleField( - role_name='Project Member', + role_name='Project Member', parent_role='admin', resource_field='resource', - permissions = { 'usage': True } + permissions = {'usage': True} ) scm_update_role = ImplicitRoleField( - role_name='Project Updater', + role_name='Project Updater', parent_role='admin', resource_field='resource', - permissions = { 'scm_update': True } + permissions = {'scm_update': True} ) @classmethod @@ -333,7 +333,7 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin): if (self.last_job_run + datetime.timedelta(seconds=self.scm_update_cache_timeout)) > now(): return True return False - + @property def needs_update_on_launch(self): if self.active and self.scm_type and self.scm_update_on_launch: diff --git a/awx/main/models/rbac.py b/awx/main/models/rbac.py index 37948fe14e..d8cdaecfe2 100644 --- a/awx/main/models/rbac.py +++ b/awx/main/models/rbac.py @@ -18,7 +18,7 @@ logger = logging.getLogger('awx.main.models.rbac') class Role(CommonModelNameNotUnique): ''' - Role model + Role model ''' class Meta: @@ -82,7 +82,7 @@ class Role(CommonModelNameNotUnique): except Role.DoesNotExist: ret = Role(singleton_name=name) ret.save() - return ret; + return ret @@ -102,7 +102,7 @@ class RoleHierarchy(CreatedModifiedModel): class Resource(CommonModelNameNotUnique): ''' - Role model + Role model ''' class Meta: diff --git a/awx/main/tests/functional/conftest.py b/awx/main/tests/functional/conftest.py index 98d31528ce..c565cebac5 100644 --- a/awx/main/tests/functional/conftest.py +++ b/awx/main/tests/functional/conftest.py @@ -1,11 +1,35 @@ import pytest -from awx.main.models.organization import Organization +from awx.main.models.credential import Credential +from awx.main.models.organization import ( + Organization, + Team, +) +from django.contrib.auth.models import User + +@pytest.fixture +def user(): + def u(name, is_superuser=False): + try: + user = User.objects.get(username=name) + except User.DoesNotExist: + user = User(username=name, is_superuser=is_superuser, password=name) + user.save() + return user + return u + +@pytest.fixture +def team(organization): + return Team.objects.create(organization=organization, name='test-team') @pytest.fixture def organization(): return Organization.objects.create(name="test-org", description="test-org-desc") +@pytest.fixture +def credential(): + return Credential.objects.create(kind='aws', name='test-cred') + @pytest.fixture def permissions(): return { @@ -14,5 +38,8 @@ def permissions(): 'auditor':{'read':True, 'create':False, 'write':False, 'update':False, 'delete':False, 'scm_update':False, 'execute':False, 'use':False,}, + + 'usage':{'read':False, 'create':False, 'write':False, + 'update':False, 'delete':False, 'scm_update':False, 'execute':False, 'use':True,}, } diff --git a/awx/main/tests/functional/test_rbac_credential.py b/awx/main/tests/functional/test_rbac_credential.py new file mode 100644 index 0000000000..173467f258 --- /dev/null +++ b/awx/main/tests/functional/test_rbac_credential.py @@ -0,0 +1,44 @@ +import pytest + +@pytest.mark.django_db +def test_credential_migration_user(credential, user, permissions): + u = user('user', False) + credential.user = u + migrated = credential.migrate_to_rbac() + assert len(migrated) == 1 + assert credential.accessible_by(u, permissions['admin']) + +@pytest.mark.django_db +def test_credential_usage_role(credential, user, permissions): + u = user('user', False) + credential.usage_role.members.add(u) + assert credential.accessible_by(u, permissions['usage']) + +@pytest.mark.django_db +def test_credential_migration_team_member(credential, team, user, permissions): + u = user('user', False) + team.admin_role.members.add(u) + credential.team = team + + # No permissions pre-migration + assert not credential.accessible_by(u, permissions['admin']) + + migrated = credential.migrate_to_rbac() + # Admin permissions post migration + assert len(migrated) == 1 + assert credential.accessible_by(u, permissions['admin']) + +@pytest.mark.django_db +def test_credential_migration_team_admin(credential, team, user, permissions): + u = user('user', False) + team.member_role.members.add(u) + credential.team = team + + # No permissions pre-migration + assert not credential.accessible_by(u, permissions['usage']) + + # Usage permissions post migration + migrated = credential.migrate_to_rbac() + assert len(migrated) == 1 + assert credential.accessible_by(u, permissions['usage']) + diff --git a/awx/main/tests/functional/test_rbac_organization.py b/awx/main/tests/functional/test_rbac_organization.py index 1e4c90e936..1eadd5a866 100644 --- a/awx/main/tests/functional/test_rbac_organization.py +++ b/awx/main/tests/functional/test_rbac_organization.py @@ -1,51 +1,43 @@ import pytest from awx.main.access import OrganizationAccess -from django.contrib.auth.models import User - -def make_user(name, admin=False): - try: - user = User.objects.get(username=name) - except User.DoesNotExist: - user = User(username=name, is_superuser=admin, password=name) - user.save() - return user @pytest.mark.django_db -@pytest.mark.parametrize("username,admin", [ - ("admin", True), - ("user", False), -]) -def test_organization_migration(organization, permissions, username, admin): - user = make_user(username, admin) - if admin: - organization.admins.add(user) - else: - organization.users.add(user) +def test_organization_migration_admin(organization, permissions, user): + u = user('admin', True) + organization.admins.add(u) + + assert not organization.accessible_by(u, permissions['admin']) migrated_users = organization.migrate_to_rbac() assert len(migrated_users) == 1 - assert migrated_users[0] == user - - if admin: - assert organization.accessible_by(user, permissions['admin']) == True - else: - assert organization.accessible_by(user, permissions['auditor']) == True + assert organization.accessible_by(u, permissions['admin']) @pytest.mark.django_db -@pytest.mark.parametrize("username,admin", [ - ("admin", True), - ("user-admin", False), - ("user", False) -]) -def test_organization_access(organization, username, admin): - user = make_user(username, admin) - access = OrganizationAccess(user) - if admin: - assert access.can_change(organization, None) == True - elif username == "user-admin": - organization.admins.add(user) - assert access.can_change(organization, None) == True - else: - assert access.can_change(organization, None) == False +def test_organization_migration_user(organization, permissions, user): + u = user('user', False) + organization.users.add(u) + assert not organization.accessible_by(u, permissions['auditor']) + + migrated_users = organization.migrate_to_rbac() + assert len(migrated_users) == 1 + assert organization.accessible_by(u, permissions['auditor']) + +@pytest.mark.django_db +def test_organization_access_superuser(organization, user): + access = OrganizationAccess(user('admin', True)) + assert access.can_change(organization, None) + +@pytest.mark.django_db +def test_organization_access_admin(organization, user): + u = user('admin', False) + organization.admins.add(u) + + access = OrganizationAccess(u) + assert access.can_change(organization, None) + +@pytest.mark.django_db +def test_organization_access_user(organization, user): + access = OrganizationAccess(user('user', False)) + assert not access.can_change(organization, None) diff --git a/awx/main/tests/functional/test_rbac_team.py b/awx/main/tests/functional/test_rbac_team.py new file mode 100644 index 0000000000..42356783f3 --- /dev/null +++ b/awx/main/tests/functional/test_rbac_team.py @@ -0,0 +1,13 @@ +import pytest + +@pytest.mark.django_db +def test_team_migration_user(team, user, permissions): + u = user('user', False) + team.users.add(u) + + assert not team.accessible_by(u, permissions['auditor']) + + migrated = team.migrate_to_rbac() + assert len(migrated) == 1 + assert team.accessible_by(u, permissions['auditor']) +