test fixups and re-add can_access

This commit is contained in:
Wayne Witzel III
2016-03-09 15:33:12 -05:00
parent 0d0d87073e
commit 09d46f9336
5 changed files with 31 additions and 35 deletions

View File

@@ -20,7 +20,7 @@ from awx.main.models.rbac import ALL_PERMISSIONS
from awx.api.license import LicenseForbids from awx.api.license import LicenseForbids
from awx.main.task_engine import TaskSerializer from awx.main.task_engine import TaskSerializer
__all__ = ['get_user_queryset'] __all__ = ['get_user_queryset', 'check_user_access']
PERMISSION_TYPES = [ PERMISSION_TYPES = [
PERM_INVENTORY_ADMIN, PERM_INVENTORY_ADMIN,
@@ -90,6 +90,24 @@ def get_user_queryset(user, model_class):
queryset = queryset.filter(pk__in=qs.values_list('pk', flat=True)) queryset = queryset.filter(pk__in=qs.values_list('pk', flat=True))
return queryset return queryset
def check_user_access(user, model_class, action, *args, **kwargs):
'''
Return True if user can perform action against model_class with the
provided parameters.
'''
for access_class in access_registry.get(model_class, []):
access_instance = access_class(user)
access_method = getattr(access_instance, 'can_%s' % action, None)
if not access_method:
logger.debug('%s.%s not found', access_instance.__class__.__name__,
'can_%s' % action)
continue
result = access_method(*args, **kwargs)
logger.debug('%s.%s %r returned %r', access_instance.__class__.__name__,
access_method.__name__, args, result)
if result:
return result
return False
class BaseAccess(object): class BaseAccess(object):
''' '''
@@ -137,7 +155,7 @@ class BaseAccess(object):
return self.can_change(obj, None) return self.can_change(obj, None)
else: else:
return bool(self.can_change(obj, None) and return bool(self.can_change(obj, None) and
sub_obj.accessible_by(self.user, {'read':True})) self.user.can_access(type(sub_obj), 'read', sub_obj))
def can_unattach(self, obj, sub_obj, relationship): def can_unattach(self, obj, sub_obj, relationship):
return self.can_change(obj, None) return self.can_change(obj, None)

View File

@@ -38,7 +38,9 @@ _PythonSerializer.handle_m2m_field = _new_handle_m2m_field
# Add custom methods to User model for permissions checks. # Add custom methods to User model for permissions checks.
from django.contrib.auth.models import User # noqa from django.contrib.auth.models import User # noqa
from awx.main.access import * # noqa from awx.main.access import * # noqa
User.add_to_class('get_queryset', get_user_queryset) User.add_to_class('get_queryset', get_user_queryset)
User.add_to_class('can_access', check_user_access)
# Import signal handlers only after models have been defined. # Import signal handlers only after models have been defined.
import awx.main.signals # noqa import awx.main.signals # noqa

View File

@@ -38,16 +38,6 @@ class Organization(CommonModel, NotificationFieldsModel, ResourceMixin):
app_label = 'main' app_label = 'main'
ordering = ('name',) ordering = ('name',)
users = models.ManyToManyField(
'auth.User',
blank=True,
related_name='organizations',
)
admins = models.ManyToManyField(
'auth.User',
blank=True,
related_name='admin_of_organizations',
)
projects = models.ManyToManyField( projects = models.ManyToManyField(
'Project', 'Project',
blank=True, blank=True,
@@ -96,11 +86,6 @@ class Team(CommonModelNameNotUnique, ResourceMixin):
unique_together = [('organization', 'name')] unique_together = [('organization', 'name')]
ordering = ('organization__name', 'name') ordering = ('organization__name', 'name')
users = models.ManyToManyField(
'auth.User',
blank=True,
related_name='teams',
)
organization = models.ForeignKey( organization = models.ForeignKey(
'Organization', 'Organization',
blank=False, blank=False,

View File

@@ -91,10 +91,10 @@ def test_get_user_roles_list(get, admin):
def test_user_view_other_user_roles(organization, inventory, team, get, alice, bob): def test_user_view_other_user_roles(organization, inventory, team, get, alice, bob):
'Users can see roles for other users, but only the roles that that user has access to see as well' 'Users can see roles for other users, but only the roles that that user has access to see as well'
organization.member_role.members.add(alice) organization.member_role.members.add(alice)
organization.admins.add(bob) organization.admin_role.members.add(bob)
custom_role = Role.objects.create(name='custom_role-test_user_view_admin_roles_list') custom_role = Role.objects.create(name='custom_role-test_user_view_admin_roles_list')
organization.member_role.children.add(custom_role) organization.member_role.children.add(custom_role)
team.users.add(bob) team.member_role.members.add(bob)
# alice and bob are in the same org and can see some child role of that org. # alice and bob are in the same org and can see some child role of that org.
# Bob is an org admin, alice can see this. # Bob is an org admin, alice can see this.
@@ -118,7 +118,7 @@ def test_user_view_other_user_roles(organization, inventory, team, get, alice, b
assert team.member_role.id not in role_hash # alice can't see this assert team.member_role.id not in role_hash # alice can't see this
# again but this time alice is part of the team, and should be able to see the team role # again but this time alice is part of the team, and should be able to see the team role
team.users.add(alice) team.member_role.members.add(alice)
response = get(url, alice) response = get(url, alice)
assert response.status_code == 200 assert response.status_code == 200
roles = response.data roles = response.data
@@ -271,7 +271,7 @@ def test_org_admin_add_user_to_job_template(post, organization, check_jobtemplat
'Tests that a user with permissions to assign/revoke membership to a particular role can do so' 'Tests that a user with permissions to assign/revoke membership to a particular role can do so'
org_admin = user('org-admin') org_admin = user('org-admin')
joe = user('joe') joe = user('joe')
organization.admins.add(org_admin) organization.admin_role.members.add(org_admin)
assert check_jobtemplate.accessible_by(org_admin, {'write': True}) is True assert check_jobtemplate.accessible_by(org_admin, {'write': True}) is True
assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False assert check_jobtemplate.accessible_by(joe, {'execute': True}) is False
@@ -286,7 +286,7 @@ def test_org_admin_remove_user_to_job_template(post, organization, check_jobtemp
'Tests that a user with permissions to assign/revoke membership to a particular role can do so' 'Tests that a user with permissions to assign/revoke membership to a particular role can do so'
org_admin = user('org-admin') org_admin = user('org-admin')
joe = user('joe') joe = user('joe')
organization.admins.add(org_admin) organization.admin_role.members.add(org_admin)
check_jobtemplate.executor_role.members.add(joe) check_jobtemplate.executor_role.members.add(joe)
assert check_jobtemplate.accessible_by(org_admin, {'write': True}) is True assert check_jobtemplate.accessible_by(org_admin, {'write': True}) is True
@@ -336,7 +336,6 @@ def test_get_role_teams(get, team, admin, role):
role.parents.add(team.member_role) role.parents.add(team.member_role)
url = reverse('api:role_teams_list', args=(role.id,)) url = reverse('api:role_teams_list', args=(role.id,))
response = get(url, admin) response = get(url, admin)
print(response.data)
assert response.status_code == 200 assert response.status_code == 200
assert response.data['count'] == 1 assert response.data['count'] == 1
assert response.data['results'][0]['id'] == team.id assert response.data['results'][0]['id'] == team.id
@@ -347,7 +346,6 @@ def test_add_team_to_role(post, team, admin, role):
url = reverse('api:role_teams_list', args=(role.id,)) url = reverse('api:role_teams_list', args=(role.id,))
assert role.members.filter(id=admin.id).count() == 0 assert role.members.filter(id=admin.id).count() == 0
res = post(url, {'id': team.id}, admin) res = post(url, {'id': team.id}, admin)
print res.data
assert res.status_code == 204 assert res.status_code == 204
assert role.parents.filter(id=team.member_role.id).count() == 1 assert role.parents.filter(id=team.member_role.id).count() == 1
@@ -357,7 +355,6 @@ def test_remove_team_from_role(post, team, admin, role):
url = reverse('api:role_teams_list', args=(role.id,)) url = reverse('api:role_teams_list', args=(role.id,))
assert role.members.filter(id=admin.id).count() == 1 assert role.members.filter(id=admin.id).count() == 1
res = post(url, {'disassociate': True, 'id': team.id}, admin) res = post(url, {'disassociate': True, 'id': team.id}, admin)
print res.data
assert res.status_code == 204 assert res.status_code == 204
assert role.parents.filter(id=team.member_role.id).count() == 0 assert role.parents.filter(id=team.member_role.id).count() == 0
@@ -398,11 +395,10 @@ def test_role_children(get, team, admin, role):
@pytest.mark.django_db @pytest.mark.django_db
def test_resource_access_list(get, team, admin, role): def test_resource_access_list(get, team, admin, role):
team.users.add(admin) team.member_role.members.add(admin)
content_type_id = ContentType.objects.get_for_model(team).pk content_type_id = ContentType.objects.get_for_model(team).pk
url = reverse('api:resource_access_list', args=(content_type_id, team.id,)) url = reverse('api:team_access_list', args=(team.id,))
res = get(url, admin) res = get(url, admin)
print(res.data)
assert res.status_code == 200 assert res.status_code == 200

View File

@@ -91,15 +91,10 @@ def test_team_symantics(organization, team, alice):
assert organization.accessible_by(alice, {'read': True}) is False assert organization.accessible_by(alice, {'read': True}) is False
team.member_role.children.add(organization.auditor_role) team.member_role.children.add(organization.auditor_role)
assert organization.accessible_by(alice, {'read': True}) is False assert organization.accessible_by(alice, {'read': True}) is False
team.users.add(alice) team.member_role.members.add(alice)
assert organization.accessible_by(alice, {'read': True}) is True assert organization.accessible_by(alice, {'read': True}) is True
team.users.remove(alice) team.member_role.members.remove(alice)
assert organization.accessible_by(alice, {'read': True}) is False assert organization.accessible_by(alice, {'read': True}) is False
alice.teams.add(team)
assert organization.accessible_by(alice, {'read': True}) is True
alice.teams.remove(team)
assert organization.accessible_by(alice, {'read': True}) is False
@pytest.mark.django_db @pytest.mark.django_db
def test_auto_m2m_adjuments(organization, project, alice): def test_auto_m2m_adjuments(organization, project, alice):