Merge pull request #1121 from rooftopcellist/organization_based_permission

Organization based permission
This commit is contained in:
Christian Adams
2018-04-04 10:39:40 -04:00
committed by GitHub
11 changed files with 295 additions and 140 deletions

View File

@@ -580,69 +580,77 @@ class UserAccess(BaseAccess):
class OAuth2ApplicationAccess(BaseAccess):
'''
I can read, change or delete OAuth applications when:
I can read, change or delete OAuth 2 applications when:
- I am a superuser.
- I am the admin of the organization of the user of the application.
- I am the user of the application.
I can create OAuth applications when:
- I am a user in the organization of the application.
I can create OAuth 2 applications when:
- I am a superuser.
- I am the admin of the organization of the user of the application.
- I am the admin of the organization of the application.
'''
model = OAuth2Application
select_related = ('user',)
def filtered_queryset(self):
accessible_users = User.objects.filter(
pk__in=self.user.admin_of_organizations.values('member_role__members')
) | User.objects.filter(pk=self.user.pk)
return self.model.objects.filter(user__in=accessible_users)
return self.model.objects.filter(organization__in=self.user.organizations)
def can_change(self, obj, data):
return self.can_read(obj)
return self.user.is_superuser or self.check_related('organization', Organization, data, obj=obj,
role_field='admin_role', mandatory=True)
def can_delete(self, obj):
return self.can_read(obj)
return self.user.is_superuser or obj.organization in self.user.admin_of_organizations
def can_add(self, data):
if self.user.is_superuser:
return True
user = get_object_from_data('user', User, data)
if not user:
return False
return set(self.user.admin_of_organizations.all()) & set(user.organizations.all())
return True
if not data:
return Organization.accessible_objects(self.user, 'admin_role').exists()
return self.check_related('organization', Organization, data, role_field='admin_role', mandatory=True)
class OAuth2TokenAccess(BaseAccess):
'''
I can read, change or delete an OAuth2 token when:
I can read, change or delete an app token when:
- I am a superuser.
- I am the admin of the organization of the user of the token.
- I am the admin of the organization of the application of the token.
- I am the user of the token.
I can create an OAuth token when:
I can create an OAuth2 app token when:
- I have the read permission of the related application.
I can read, change or delete a personal token when:
- I am the user of the token
- I am the superuser
I can create an OAuth2 Personal Access Token when:
- I am a user. But I can only create a PAT for myself.
'''
model = OAuth2AccessToken
select_related = ('user', 'application')
def filtered_queryset(self):
accessible_users = User.objects.filter(
pk__in=self.user.admin_of_organizations.values('member_role__members')
) | User.objects.filter(pk=self.user.pk)
return self.model.objects.filter(user__in=accessible_users)
def can_change(self, obj, data):
return self.can_read(obj)
def filtered_queryset(self):
org_access_qs = Organization.objects.filter(
Q(admin_role__members=self.user) | Q(auditor_role__members=self.user))
return self.model.objects.filter(application__organization__in=org_access_qs) | self.model.objects.filter(user__id=self.user.pk)
def can_delete(self, obj):
return self.can_read(obj)
if (self.user.is_superuser) | (obj.user == self.user):
return True
elif not obj.application:
return False
return self.user in obj.application.organization.admin_role
def can_change(self, obj, data):
return self.can_delete(obj)
def can_add(self, data):
app = get_object_from_data('application', OAuth2Application, data)
if not app:
return True
return OAuth2ApplicationAccess(self.user).can_read(app)
if 'application' in data:
app = get_object_from_data('application', OAuth2Application, data)
if app is None:
return True
return OAuth2ApplicationAccess(self.user).can_read(app)
return True
class OrganizationAccess(BaseAccess):

View File

@@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.11 on 2018-03-16 20:25
from __future__ import unicode_literals
import awx.main.fields
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('main', '0027_v330_add_tower_verify'),
]
operations = [
migrations.AddField(
model_name='oauth2application',
name='organization',
field=models.ForeignKey(help_text='Organization containing this application.', null=True, on_delete=django.db.models.deletion.CASCADE, related_name='applications', to='main.Organization'),
),
]

View File

@@ -31,6 +31,13 @@ class OAuth2Application(AbstractApplication):
editable=False,
validators=[RegexValidator(DATA_URI_RE)],
)
organization = models.ForeignKey(
'Organization',
related_name='applications',
help_text=_('Organization containing this application.'),
on_delete=models.CASCADE,
null=True,
)
class OAuth2AccessToken(AbstractAccessToken):

View File

@@ -634,12 +634,3 @@ def create_access_token_user_if_missing(sender, **kwargs):
post_save.connect(create_access_token_user_if_missing, sender=OAuth2AccessToken)
# @receiver(post_save, sender=User)
# def create_default_oauth_app(sender, **kwargs):
# if kwargs.get('created', False):
# user = kwargs['instance']
# OAuth2Application.objects.create(
# name='Default application for {}'.format(user.username),
# user=user, client_type='confidential', redirect_uris='',
# authorization_grant_type='password'
# )

View File

@@ -19,44 +19,42 @@ def test_personal_access_token_creation(oauth_application, post, alice):
oauth_application.client_id, oauth_application.client_secret
]))
)
resp_json = resp._container[0]
assert 'access_token' in resp_json
assert 'scope' in resp_json
assert 'refresh_token' in resp_json
@pytest.mark.django_db
def test_oauth_application_create(admin, post):
def test_oauth_application_create(admin, organization, post):
response = post(
reverse('api:o_auth2_application_list'), {
'name': 'test app',
'user': admin.pk,
'organization': organization.pk,
'client_type': 'confidential',
'authorization_grant_type': 'password',
}, admin, expect=201
)
assert 'modified' in response.data
assert 'updated' not in response.data
assert 'user' in response.data['related']
created_app = Application.objects.get(client_id=response.data['client_id'])
assert created_app.name == 'test app'
assert created_app.user == admin
assert created_app.skip_authorization is False
assert created_app.redirect_uris == ''
assert created_app.client_type == 'confidential'
assert created_app.authorization_grant_type == 'password'
assert created_app.organization == organization
@pytest.mark.django_db
def test_oauth_application_update(oauth_application, patch, admin, alice):
def test_oauth_application_update(oauth_application, organization, patch, admin, alice):
patch(
reverse('api:o_auth2_application_detail', kwargs={'pk': oauth_application.pk}), {
'name': 'Test app with immutable grant type and user',
'organization': organization.pk,
'redirect_uris': 'http://localhost/api/',
'authorization_grant_type': 'implicit',
'skip_authorization': True,
'user': alice.pk,
}, admin, expect=200
)
updated_app = Application.objects.get(client_id=oauth_application.client_id)
@@ -64,7 +62,7 @@ def test_oauth_application_update(oauth_application, patch, admin, alice):
assert updated_app.redirect_uris == 'http://localhost/api/'
assert updated_app.skip_authorization is True
assert updated_app.authorization_grant_type == 'password'
assert updated_app.user == admin
assert updated_app.organization == organization
@pytest.mark.django_db

View File

@@ -131,7 +131,7 @@ def test_organization_inventory_list(organization, inventory_factory, get, alice
assert get(reverse('api:organization_inventories_list', kwargs={'pk': organization.id}), user=alice).data['count'] == 2
assert get(reverse('api:organization_inventories_list', kwargs={'pk': organization.id}), user=bob).data['count'] == 1
get(reverse('api:organization_inventories_list', kwargs={'pk': organization.id}), user=rando, expect=403)
@pytest.mark.django_db
@mock.patch('awx.api.views.feature_enabled', lambda feature: True)

View File

@@ -12,105 +12,203 @@ from awx.api.versioning import reverse
@pytest.mark.django_db
class TestOAuthApplication:
class TestOAuth2Application:
@pytest.mark.parametrize("user_for_access, can_access_list", [
(0, [True, True]),
(1, [True, True]),
(2, [True, True]),
(3, [False, False]),
])
def test_can_read(
self, admin, org_admin, org_member, alice, user_for_access, can_access_list, organization
):
user_list = [admin, org_admin, org_member, alice]
access = OAuth2ApplicationAccess(user_list[user_for_access])
app_creation_user_list = [admin, org_admin]
for user, can_access in zip(app_creation_user_list, can_access_list):
app = Application.objects.create(
name='test app for {}'.format(user.username), user=user,
client_type='confidential', authorization_grant_type='password', organization=organization
)
assert access.can_read(app) is can_access
def test_can_edit_delete_app_org_admin(
self, admin, org_admin, org_member, alice, organization
):
user_list = [admin, org_admin, org_member, alice]
can_access_list = [True, True, False, False]
for user, can_access in zip(user_list, can_access_list):
app = Application.objects.create(
name='test app for {}'.format(org_admin.username), user=org_admin,
client_type='confidential', authorization_grant_type='password', organization=organization
)
access = OAuth2ApplicationAccess(user)
assert access.can_change(app, {}) is can_access
assert access.can_delete(app) is can_access
def test_can_edit_delete_app_admin(
self, admin, org_admin, org_member, alice, organization
):
user_list = [admin, org_admin, org_member, alice]
can_access_list = [True, True, False, False]
for user, can_access in zip(user_list, can_access_list):
app = Application.objects.create(
name='test app for {}'.format(admin.username), user=admin,
client_type='confidential', authorization_grant_type='password', organization=organization
)
access = OAuth2ApplicationAccess(user)
assert access.can_change(app, {}) is can_access
assert access.can_delete(app) is can_access
@pytest.mark.parametrize("user_for_access, can_access_list", [
(0, [True, True, True, True]),
(1, [False, True, True, False]),
(2, [False, False, True, False]),
(3, [False, False, False, True]),
])
def test_can_read_change_delete(
self, admin, org_admin, org_member, alice, user_for_access, can_access_list
):
user_list = [admin, org_admin, org_member, alice]
access = OAuth2ApplicationAccess(user_list[user_for_access])
for user, can_access in zip(user_list, can_access_list):
app = Application.objects.create(
name='test app for {}'.format(user.username), user=user,
client_type='confidential', authorization_grant_type='password'
)
assert access.can_read(app) is can_access
assert access.can_change(app, {}) is can_access
assert access.can_delete(app) is can_access
def test_superuser_can_always_create(self, admin, org_admin, org_member, alice):
access = OAuth2ApplicationAccess(admin)
for user in [admin, org_admin, org_member, alice]:
assert access.can_add({
'name': 'test app', 'user': user.pk, 'client_type': 'confidential',
'authorization_grant_type': 'password'
})
def test_normal_user_cannot_create(self, admin, org_admin, org_member, alice):
for access_user in [org_member, alice]:
access = OAuth2ApplicationAccess(access_user)
def test_superuser_can_always_create(self, admin, org_admin, org_member, alice):
access = OAuth2ApplicationAccess(admin)
for user in [admin, org_admin, org_member, alice]:
assert not access.can_add({
assert access.can_add({
'name': 'test app', 'user': user.pk, 'client_type': 'confidential',
'authorization_grant_type': 'password'
'authorization_grant_type': 'password', 'organization': 1
})
def test_org_admin_can_create_in_org(self, admin, org_admin, org_member, alice):
access = OAuth2ApplicationAccess(org_admin)
for user in [admin, alice]:
assert not access.can_add({
'name': 'test app', 'user': user.pk, 'client_type': 'confidential',
'authorization_grant_type': 'password'
})
for user in [org_admin, org_member]:
assert access.can_add({
'name': 'test app', 'user': user.pk, 'client_type': 'confidential',
'authorization_grant_type': 'password'
})
def test_normal_user_cannot_create(self, admin, org_admin, org_member, alice):
for access_user in [org_member, alice]:
access = OAuth2ApplicationAccess(access_user)
for user in [admin, org_admin, org_member, alice]:
assert not access.can_add({
'name': 'test app', 'user': user.pk, 'client_type': 'confidential',
'authorization_grant_type': 'password', 'organization': 1
})
@pytest.mark.skip(reason="Needs Update - CA")
@pytest.mark.django_db
class TestOAuthToken:
@pytest.mark.parametrize("user_for_access, can_access_list", [
(0, [True, True, True, True]),
(1, [False, True, True, False]),
(2, [False, False, True, False]),
(3, [False, False, False, True]),
])
def test_can_read_change_delete(
self, post, admin, org_admin, org_member, alice, user_for_access, can_access_list
class TestOAuth2Token:
def test_can_read_change_delete_app_token(
self, post, admin, org_admin, org_member, alice, organization
):
user_list = [admin, org_admin, org_member, alice]
access = OAuth2TokenAccess(user_list[user_for_access])
can_access_list = [True, True, False, False]
app = Application.objects.create(
name='test app for {}'.format(admin.username), user=admin,
client_type='confidential', authorization_grant_type='password',
organization=organization
)
response = post(
reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}),
{'scope': 'read'}, admin, expect=201
)
for user, can_access in zip(user_list, can_access_list):
app = Application.objects.create(
name='test app for {}'.format(user.username), user=user,
client_type='confidential', authorization_grant_type='password'
)
response = post(
reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}),
{'scope': 'read'}, admin, expect=201
)
token = AccessToken.objects.get(token=response.data['token'])
assert access.can_read(token) is can_access # TODO: fix this test
access = OAuth2TokenAccess(user)
assert access.can_read(token) is can_access
assert access.can_change(token, {}) is can_access
assert access.can_delete(token) is can_access
def test_auditor_can_read(
self, post, admin, org_admin, org_member, alice, system_auditor, organization
):
user_list = [admin, org_admin, org_member]
can_access_list = [True, True, True]
cannot_access_list = [False, False, False]
app = Application.objects.create(
name='test app for {}'.format(admin.username), user=admin,
client_type='confidential', authorization_grant_type='password',
organization=organization
)
for user, can_access, cannot_access in zip(user_list, can_access_list, cannot_access_list):
response = post(
reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}),
{'scope': 'read'}, user, expect=201
)
token = AccessToken.objects.get(token=response.data['token'])
access = OAuth2TokenAccess(system_auditor)
assert access.can_read(token) is can_access
assert access.can_change(token, {}) is cannot_access
assert access.can_delete(token) is cannot_access
def test_user_auditor_can_change(
self, post, org_member, org_admin, system_auditor, organization
):
app = Application.objects.create(
name='test app for {}'.format(org_admin.username), user=org_admin,
client_type='confidential', authorization_grant_type='password',
organization=organization
)
response = post(
reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}),
{'scope': 'read'}, org_member, expect=201
)
token = AccessToken.objects.get(token=response.data['token'])
access = OAuth2TokenAccess(system_auditor)
assert access.can_read(token) is True
assert access.can_change(token, {}) is False
assert access.can_delete(token) is False
dual_user = system_auditor
organization.admin_role.members.add(dual_user)
access = OAuth2TokenAccess(dual_user)
assert access.can_read(token) is True
assert access.can_change(token, {}) is True
assert access.can_delete(token) is True
def test_can_read_change_delete_personal_token_org_member(
self, post, admin, org_admin, org_member, alice
):
# Tests who can read a token created by an org-member
user_list = [admin, org_admin, org_member, alice]
can_access_list = [True, False, True, False]
response = post(
reverse('api:o_auth2_personal_token_list', kwargs={'pk': org_member.pk}),
{'scope': 'read'}, org_member, expect=201
)
token = AccessToken.objects.get(token=response.data['token'])
for user, can_access in zip(user_list, can_access_list):
access = OAuth2TokenAccess(user)
assert access.can_read(token) is can_access
assert access.can_change(token, {}) is can_access
assert access.can_delete(token) is can_access
def test_can_read_personal_token_creator(
self, post, admin, org_admin, org_member, alice
):
# Tests the token's creator can read their tokens
user_list = [admin, org_admin, org_member, alice]
can_access_list = [True, True, True, True]
for user, can_access in zip(user_list, can_access_list):
response = post(
reverse('api:o_auth2_personal_token_list', kwargs={'pk': user.pk}),
{'scope': 'read', 'application':None}, user, expect=201
)
token = AccessToken.objects.get(token=response.data['token'])
access = OAuth2TokenAccess(user)
assert access.can_read(token) is can_access
assert access.can_change(token, {}) is can_access
assert access.can_delete(token) is can_access
@pytest.mark.parametrize("user_for_access, can_access_list", [
(0, [True, True, True, True]),
(1, [False, True, True, False]),
(2, [False, False, True, False]),
(3, [False, False, False, True]),
(0, [True, True]),
(1, [True, True]),
(2, [True, True]),
(3, [False, False]),
])
def test_can_create(
self, post, admin, org_admin, org_member, alice, user_for_access, can_access_list
self, post, admin, org_admin, org_member, alice, user_for_access, can_access_list, organization
):
user_list = [admin, org_admin, org_member, alice]
for user, can_access in zip(user_list, can_access_list):
app = Application.objects.create(
name='test app for {}'.format(user.username), user=user,
client_type='confidential', authorization_grant_type='password'
client_type='confidential', authorization_grant_type='password', organization=organization
)
post(
reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}),
{'scope': 'read'}, user_list[user_for_access], expect=201 if can_access else 403
)