From 09d46f9336bb6663f5fd9084ce5e4efc12d30fb3 Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Wed, 9 Mar 2016 15:33:12 -0500 Subject: [PATCH] test fixups and re-add can_access --- awx/main/access.py | 22 +++++++++++++++++++-- awx/main/models/__init__.py | 2 ++ awx/main/models/organization.py | 15 -------------- awx/main/tests/functional/test_rbac_api.py | 18 +++++++---------- awx/main/tests/functional/test_rbac_core.py | 9 ++------- 5 files changed, 31 insertions(+), 35 deletions(-) diff --git a/awx/main/access.py b/awx/main/access.py index b500d526f3..201e8d59f3 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -20,7 +20,7 @@ from awx.main.models.rbac import ALL_PERMISSIONS from awx.api.license import LicenseForbids from awx.main.task_engine import TaskSerializer -__all__ = ['get_user_queryset'] +__all__ = ['get_user_queryset', 'check_user_access'] PERMISSION_TYPES = [ PERM_INVENTORY_ADMIN, @@ -90,6 +90,24 @@ def get_user_queryset(user, model_class): queryset = queryset.filter(pk__in=qs.values_list('pk', flat=True)) return queryset +def check_user_access(user, model_class, action, *args, **kwargs): + ''' + Return True if user can perform action against model_class with the + provided parameters. + ''' + for access_class in access_registry.get(model_class, []): + access_instance = access_class(user) + access_method = getattr(access_instance, 'can_%s' % action, None) + if not access_method: + logger.debug('%s.%s not found', access_instance.__class__.__name__, + 'can_%s' % action) + continue + result = access_method(*args, **kwargs) + logger.debug('%s.%s %r returned %r', access_instance.__class__.__name__, + access_method.__name__, args, result) + if result: + return result + return False class BaseAccess(object): ''' @@ -137,7 +155,7 @@ class BaseAccess(object): return self.can_change(obj, None) else: return bool(self.can_change(obj, None) and - sub_obj.accessible_by(self.user, {'read':True})) + self.user.can_access(type(sub_obj), 'read', sub_obj)) def can_unattach(self, obj, sub_obj, relationship): return self.can_change(obj, None) diff --git a/awx/main/models/__init__.py b/awx/main/models/__init__.py index 41131f481e..749807ac74 100644 --- a/awx/main/models/__init__.py +++ b/awx/main/models/__init__.py @@ -38,7 +38,9 @@ _PythonSerializer.handle_m2m_field = _new_handle_m2m_field # Add custom methods to User model for permissions checks. from django.contrib.auth.models import User # noqa from awx.main.access import * # noqa + User.add_to_class('get_queryset', get_user_queryset) +User.add_to_class('can_access', check_user_access) # Import signal handlers only after models have been defined. import awx.main.signals # noqa diff --git a/awx/main/models/organization.py b/awx/main/models/organization.py index d1e7ae3c86..3f86d5032a 100644 --- a/awx/main/models/organization.py +++ b/awx/main/models/organization.py @@ -38,16 +38,6 @@ class Organization(CommonModel, NotificationFieldsModel, ResourceMixin): app_label = 'main' ordering = ('name',) - users = models.ManyToManyField( - 'auth.User', - blank=True, - related_name='organizations', - ) - admins = models.ManyToManyField( - 'auth.User', - blank=True, - related_name='admin_of_organizations', - ) projects = models.ManyToManyField( 'Project', blank=True, @@ -96,11 +86,6 @@ class Team(CommonModelNameNotUnique, ResourceMixin): unique_together = [('organization', 'name')] ordering = ('organization__name', 'name') - users = models.ManyToManyField( - 'auth.User', - blank=True, - related_name='teams', - ) organization = models.ForeignKey( 'Organization', blank=False, diff --git a/awx/main/tests/functional/test_rbac_api.py b/awx/main/tests/functional/test_rbac_api.py index ce04260fd7..6e015b48ef 100644 --- a/awx/main/tests/functional/test_rbac_api.py +++ b/awx/main/tests/functional/test_rbac_api.py @@ -91,10 +91,10 @@ def test_get_user_roles_list(get, admin): def test_user_view_other_user_roles(organization, inventory, team, get, alice, bob): 'Users can see roles for other users, but only the roles that that user has access to see as well' organization.member_role.members.add(alice) - organization.admins.add(bob) + organization.admin_role.members.add(bob) custom_role = Role.objects.create(name='custom_role-test_user_view_admin_roles_list') organization.member_role.children.add(custom_role) - team.users.add(bob) + team.member_role.members.add(bob) # alice and bob are in the same org and can see some child role of that org. # Bob is an org admin, alice can see this. @@ -118,7 +118,7 @@ def test_user_view_other_user_roles(organization, inventory, team, get, alice, b assert team.member_role.id not in role_hash # alice can't see this # again but this time alice is part of the team, and should be able to see the team role - team.users.add(alice) + team.member_role.members.add(alice) response = get(url, alice) assert response.status_code == 200 roles = response.data @@ -271,7 +271,7 @@ def test_org_admin_add_user_to_job_template(post, organization, check_jobtemplat 'Tests that a user with permissions to assign/revoke membership to a particular role can do so' org_admin = user('org-admin') joe = user('joe') - organization.admins.add(org_admin) + organization.admin_role.members.add(org_admin) assert check_jobtemplate.accessible_by(org_admin, {'write': True}) is True assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False @@ -286,7 +286,7 @@ def test_org_admin_remove_user_to_job_template(post, organization, check_jobtemp 'Tests that a user with permissions to assign/revoke membership to a particular role can do so' org_admin = user('org-admin') joe = user('joe') - organization.admins.add(org_admin) + organization.admin_role.members.add(org_admin) check_jobtemplate.executor_role.members.add(joe) assert check_jobtemplate.accessible_by(org_admin, {'write': True}) is True @@ -336,7 +336,6 @@ def test_get_role_teams(get, team, admin, role): role.parents.add(team.member_role) url = reverse('api:role_teams_list', args=(role.id,)) response = get(url, admin) - print(response.data) assert response.status_code == 200 assert response.data['count'] == 1 assert response.data['results'][0]['id'] == team.id @@ -347,7 +346,6 @@ def test_add_team_to_role(post, team, admin, role): url = reverse('api:role_teams_list', args=(role.id,)) assert role.members.filter(id=admin.id).count() == 0 res = post(url, {'id': team.id}, admin) - print res.data assert res.status_code == 204 assert role.parents.filter(id=team.member_role.id).count() == 1 @@ -357,7 +355,6 @@ def test_remove_team_from_role(post, team, admin, role): url = reverse('api:role_teams_list', args=(role.id,)) assert role.members.filter(id=admin.id).count() == 1 res = post(url, {'disassociate': True, 'id': team.id}, admin) - print res.data assert res.status_code == 204 assert role.parents.filter(id=team.member_role.id).count() == 0 @@ -398,11 +395,10 @@ def test_role_children(get, team, admin, role): @pytest.mark.django_db def test_resource_access_list(get, team, admin, role): - team.users.add(admin) + team.member_role.members.add(admin) content_type_id = ContentType.objects.get_for_model(team).pk - url = reverse('api:resource_access_list', args=(content_type_id, team.id,)) + url = reverse('api:team_access_list', args=(team.id,)) res = get(url, admin) - print(res.data) assert res.status_code == 200 diff --git a/awx/main/tests/functional/test_rbac_core.py b/awx/main/tests/functional/test_rbac_core.py index 941a7c6042..9b3e29f6e8 100644 --- a/awx/main/tests/functional/test_rbac_core.py +++ b/awx/main/tests/functional/test_rbac_core.py @@ -91,15 +91,10 @@ def test_team_symantics(organization, team, alice): assert organization.accessible_by(alice, {'read': True}) is False team.member_role.children.add(organization.auditor_role) assert organization.accessible_by(alice, {'read': True}) is False - team.users.add(alice) + team.member_role.members.add(alice) assert organization.accessible_by(alice, {'read': True}) is True - team.users.remove(alice) + team.member_role.members.remove(alice) assert organization.accessible_by(alice, {'read': True}) is False - alice.teams.add(team) - assert organization.accessible_by(alice, {'read': True}) is True - alice.teams.remove(team) - assert organization.accessible_by(alice, {'read': True}) is False - @pytest.mark.django_db def test_auto_m2m_adjuments(organization, project, alice):