remove admin_role for users

This commit is contained in:
AlanCoding
2018-04-09 11:49:36 -04:00
parent a2cc357f21
commit 4995ee7a60
10 changed files with 44 additions and 203 deletions

View File

@@ -1683,14 +1683,8 @@ class UserRolesList(SubListAttachDetachAPIView):
if not sub_id: if not sub_id:
return super(UserRolesList, self).post(request) return super(UserRolesList, self).post(request)
if sub_id == self.request.user.admin_role.pk:
raise PermissionDenied(_('You may not perform any action with your own admin_role.'))
user = get_object_or_400(User, pk=self.kwargs['pk']) user = get_object_or_400(User, pk=self.kwargs['pk'])
role = get_object_or_400(Role, pk=sub_id) role = get_object_or_400(Role, pk=sub_id)
user_content_type = ContentType.objects.get_for_model(User)
if role.content_type == user_content_type:
raise PermissionDenied(_('You may not change the membership of a users admin_role'))
credential_content_type = ContentType.objects.get_for_model(Credential) credential_content_type = ContentType.objects.get_for_model(Credential)
if role.content_type == credential_content_type: if role.content_type == credential_content_type:
@@ -4909,12 +4903,6 @@ class RoleUsersList(SubListAttachDetachAPIView):
user = get_object_or_400(User, pk=sub_id) user = get_object_or_400(User, pk=sub_id)
role = self.get_parent_object() role = self.get_parent_object()
if role == self.request.user.admin_role:
raise PermissionDenied(_('You may not perform any action with your own admin_role.'))
user_content_type = ContentType.objects.get_for_model(User)
if role.content_type == user_content_type:
raise PermissionDenied(_('You may not change the membership of a users admin_role'))
credential_content_type = ContentType.objects.get_for_model(Credential) credential_content_type = ContentType.objects.get_for_model(Credential)
if role.content_type == credential_content_type: if role.content_type == credential_content_type:

View File

@@ -33,8 +33,7 @@ from awx.main.models.mixins import ResourceMixin
from awx.conf.license import LicenseForbids, feature_enabled from awx.conf.license import LicenseForbids, feature_enabled
__all__ = ['get_user_queryset', 'check_user_access', 'check_user_access_with_errors', __all__ = ['get_user_queryset', 'check_user_access', 'check_user_access_with_errors',
'user_accessible_objects', 'consumer_access', 'user_accessible_objects', 'consumer_access',]
'user_admin_role',]
logger = logging.getLogger('awx.main.access') logger = logging.getLogger('awx.main.access')
@@ -78,18 +77,6 @@ def register_access(model_class, access_class):
access_registry[model_class] = access_class access_registry[model_class] = access_class
@property
def user_admin_role(self):
role = Role.objects.get(
content_type=ContentType.objects.get_for_model(User),
object_id=self.id,
role_field='admin_role'
)
# Trick the user.admin_role so that the signal filtering for RBAC activity stream works as intended.
role.parents = [org.admin_role.pk for org in self.organizations]
return role
def user_accessible_objects(user, role_name): def user_accessible_objects(user, role_name):
return ResourceMixin._accessible_objects(User, user, role_name) return ResourceMixin._accessible_objects(User, user, role_name)

View File

@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.11 on 2018-04-02 19:18
from __future__ import unicode_literals
from django.db import migrations
from awx.main.migrations import ActivityStreamDisabledMigration
from awx.main.migrations._rbac import delete_all_user_roles, rebuild_role_hierarchy
from awx.main.migrations import _migration_utils as migration_utils
class Migration(ActivityStreamDisabledMigration):
dependencies = [
('main', '0031_v330_oauth_help_text'),
]
operations = [
migrations.RunPython(migration_utils.set_current_apps_for_migrations),
migrations.RunPython(delete_all_user_roles),
migrations.RunPython(rebuild_role_hierarchy),
]

View File

@@ -500,3 +500,12 @@ def infer_credential_org_from_team(apps, schema_editor):
_update_credential_parents(cred.deprecated_team.organization, cred) _update_credential_parents(cred.deprecated_team.organization, cred)
except IntegrityError: except IntegrityError:
logger.info("Organization<{}> credential for old Team<{}> credential already created".format(cred.deprecated_team.organization.pk, cred.pk)) logger.info("Organization<{}> credential for old Team<{}> credential already created".format(cred.deprecated_team.organization.pk, cred.pk))
def delete_all_user_roles(apps, schema_editor):
ContentType = apps.get_model('contenttypes', "ContentType")
Role = apps.get_model('main', "Role")
User = apps.get_model('auth', "User")
user_content_type = ContentType.objects.get_for_model(User)
for role in Role.objects.filter(content_type=user_content_type).iterator():
role.delete()

View File

@@ -56,7 +56,6 @@ User.add_to_class('get_queryset', get_user_queryset)
User.add_to_class('can_access', check_user_access) User.add_to_class('can_access', check_user_access)
User.add_to_class('can_access_with_errors', check_user_access_with_errors) User.add_to_class('can_access_with_errors', check_user_access_with_errors)
User.add_to_class('accessible_objects', user_accessible_objects) User.add_to_class('accessible_objects', user_accessible_objects)
User.add_to_class('admin_role', user_admin_role)
@property @property

View File

@@ -171,46 +171,6 @@ def sync_superuser_status_to_rbac(instance, **kwargs):
Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).members.remove(instance) Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).members.remove(instance)
def create_user_role(instance, **kwargs):
if not kwargs.get('created', True):
return
try:
Role.objects.get(
content_type=ContentType.objects.get_for_model(instance),
object_id=instance.id,
role_field='admin_role'
)
except Role.DoesNotExist:
role = Role.objects.create(
role_field='admin_role',
content_object = instance,
)
role.members.add(instance)
def delete_user_role(instance, **kwargs):
if instance and instance.admin_role:
instance.admin_role.delete()
else:
logger.info(six.text_type("Could not delete the admin role for user {}").format(instance))
def org_admin_edit_members(instance, action, model, reverse, pk_set, **kwargs):
content_type = ContentType.objects.get_for_model(Organization)
if reverse:
return
else:
if instance.content_type == content_type and \
instance.content_object.member_role.id == instance.id:
items = model.objects.filter(pk__in=pk_set).all()
for user in items:
if action == 'post_add':
instance.content_object.admin_role.children.add(user.admin_role)
if action == 'pre_remove':
instance.content_object.admin_role.children.remove(user.admin_role)
def rbac_activity_stream(instance, sender, **kwargs): def rbac_activity_stream(instance, sender, **kwargs):
user_type = ContentType.objects.get_for_model(User) user_type = ContentType.objects.get_for_model(User)
# Only if we are associating/disassociating # Only if we are associating/disassociating
@@ -289,12 +249,9 @@ post_save.connect(emit_project_update_event_detail, sender=ProjectUpdateEvent)
post_save.connect(emit_inventory_update_event_detail, sender=InventoryUpdateEvent) post_save.connect(emit_inventory_update_event_detail, sender=InventoryUpdateEvent)
post_save.connect(emit_system_job_event_detail, sender=SystemJobEvent) post_save.connect(emit_system_job_event_detail, sender=SystemJobEvent)
m2m_changed.connect(rebuild_role_ancestor_list, Role.parents.through) m2m_changed.connect(rebuild_role_ancestor_list, Role.parents.through)
m2m_changed.connect(org_admin_edit_members, Role.members.through)
m2m_changed.connect(rbac_activity_stream, Role.members.through) m2m_changed.connect(rbac_activity_stream, Role.members.through)
m2m_changed.connect(rbac_activity_stream, Role.parents.through) m2m_changed.connect(rbac_activity_stream, Role.parents.through)
post_save.connect(sync_superuser_status_to_rbac, sender=User) post_save.connect(sync_superuser_status_to_rbac, sender=User)
post_save.connect(create_user_role, sender=User)
pre_delete.connect(delete_user_role, sender=User)
pre_delete.connect(cleanup_detached_labels_on_deleted_parent, sender=UnifiedJob) pre_delete.connect(cleanup_detached_labels_on_deleted_parent, sender=UnifiedJob)
pre_delete.connect(cleanup_detached_labels_on_deleted_parent, sender=UnifiedJobTemplate) pre_delete.connect(cleanup_detached_labels_on_deleted_parent, sender=UnifiedJobTemplate)

View File

@@ -196,12 +196,6 @@ class TestAccessListCapabilities:
direct_access_list = response.data['results'][0]['summary_fields']['direct_access'] direct_access_list = response.data['results'][0]['summary_fields']['direct_access']
assert direct_access_list[0]['role']['user_capabilities']['unattach'] == 'foobar' assert direct_access_list[0]['role']['user_capabilities']['unattach'] == 'foobar'
def test_user_access_list_direct_access_capability(self, rando, get):
"When a user views their own access list, they cannot unattach their admin role"
response = get(reverse('api:user_access_list', kwargs={'pk': rando.id}), rando)
direct_access_list = response.data['results'][0]['summary_fields']['direct_access']
assert not direct_access_list[0]['role']['user_capabilities']['unattach']
@pytest.mark.django_db @pytest.mark.django_db
def test_team_roles_unattach(mocker, team, team_member, inventory, mock_access_method, get): def test_team_roles_unattach(mocker, team, team_member, inventory, mock_access_method, get):

View File

@@ -56,7 +56,6 @@ def test_get_roles_list_user(organization, inventory, team, get, user):
assert Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).id in role_hash assert Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).id in role_hash
assert organization.admin_role.id in role_hash assert organization.admin_role.id in role_hash
assert organization.member_role.id in role_hash assert organization.member_role.id in role_hash
assert this_user.admin_role.id in role_hash
assert custom_role.id in role_hash assert custom_role.id in role_hash
assert inventory.admin_role.id not in role_hash assert inventory.admin_role.id not in role_hash
@@ -99,12 +98,12 @@ def test_cant_create_role(post, admin):
@pytest.mark.django_db @pytest.mark.django_db
def test_cant_delete_role(delete, admin): def test_cant_delete_role(delete, admin, inventory):
"Ensure we can't delete roles through the api" "Ensure we can't delete roles through the api"
# Some day we might want to do this, but until that is speced out, lets # Some day we might want to do this, but until that is speced out, lets
# ensure we don't slip up and allow this implicitly through some helper or # ensure we don't slip up and allow this implicitly through some helper or
# another # another
response = delete(reverse('api:role_detail', kwargs={'pk': admin.admin_role.id}), admin) response = delete(reverse('api:role_detail', kwargs={'pk': inventory.admin_role.id}), admin)
assert response.status_code == 405 assert response.status_code == 405

View File

@@ -1,10 +1,9 @@
import pytest import pytest
from django.test import TransactionTestCase from django.test import TransactionTestCase
from django.contrib.contenttypes.models import ContentType
from awx.main.access import UserAccess from awx.main.access import UserAccess
from awx.main.models import User, Organization, Inventory, Role from awx.main.models import User, Organization, Inventory
@pytest.mark.django_db @pytest.mark.django_db
@@ -62,66 +61,21 @@ def test_user_queryset(user):
@pytest.mark.django_db @pytest.mark.django_db
def test_user_accessible_objects(user, organization): def test_user_accessible_objects(user, organization):
'''
We cannot directly use accessible_objects for User model because
both editing and read permissions are obligated to complex business logic
'''
admin = user('admin', False) admin = user('admin', False)
u = user('john', False) u = user('john', False)
assert User.accessible_objects(admin, 'admin_role').count() == 1 access = UserAccess(admin)
assert access.get_queryset().count() == 1 # can only see himself
organization.member_role.members.add(u) organization.member_role.members.add(u)
organization.admin_role.members.add(admin) organization.member_role.members.add(admin)
assert User.accessible_objects(admin, 'admin_role').count() == 2 assert access.get_queryset().count() == 2
organization.member_role.members.remove(u) organization.member_role.members.remove(u)
assert User.accessible_objects(admin, 'admin_role').count() == 1 assert access.get_queryset().count() == 1
@pytest.mark.django_db
def test_org_user_admin(user, organization):
admin = user('orgadmin')
member = user('orgmember')
organization.member_role.members.add(member)
assert admin not in member.admin_role
organization.admin_role.members.add(admin)
assert admin in member.admin_role
organization.admin_role.members.remove(admin)
assert admin not in member.admin_role
@pytest.mark.django_db
def test_org_user_removed(user, organization):
admin = user('orgadmin')
member = user('orgmember')
organization.admin_role.members.add(admin)
organization.member_role.members.add(member)
assert admin in member.admin_role
organization.member_role.members.remove(member)
assert admin not in member.admin_role
@pytest.mark.django_db
def test_create_user_role(rando):
assert Role.objects.filter(
role_field='admin_role',
content_type=ContentType.objects.get_for_model(User),
object_id=rando.id
).count() == 1
assert rando in rando.admin_role
@pytest.mark.django_db
def test_user_role_deleted(rando):
rando_id = rando.id
rando.delete()
assert not Role.objects.filter(
role_field='admin_role',
content_type=ContentType.objects.get_for_model(User),
object_id=rando_id
)
@pytest.mark.django_db @pytest.mark.django_db

View File

@@ -1,7 +1,4 @@
import mock import mock
from mock import PropertyMock
import pytest
from rest_framework.test import APIRequestFactory from rest_framework.test import APIRequestFactory
from rest_framework.test import force_authenticate from rest_framework.test import force_authenticate
@@ -9,8 +6,6 @@ from rest_framework.test import force_authenticate
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from awx.api.views import ( from awx.api.views import (
RoleUsersList,
UserRolesList,
TeamRolesList, TeamRolesList,
) )
@@ -20,69 +15,6 @@ from awx.main.models import (
) )
@pytest.mark.parametrize("pk, err", [
(111, "not change the membership"),
(1, "may not perform"),
])
def test_user_roles_list_user_admin_role(pk, err):
with mock.patch('awx.api.views.get_object_or_400') as role_get, \
mock.patch('awx.api.views.ContentType.objects.get_for_model') as ct_get:
role_mock = mock.MagicMock(spec=Role, id=1, pk=1)
content_type_mock = mock.MagicMock(spec=ContentType)
role_mock.content_type = content_type_mock
role_get.return_value = role_mock
ct_get.return_value = content_type_mock
with mock.patch('awx.api.views.User.admin_role', new_callable=PropertyMock, return_value=role_mock):
factory = APIRequestFactory()
view = UserRolesList.as_view()
user = User(username="root", is_superuser=True, pk=1, id=1)
request = factory.post("/user/1/roles", {'id':pk}, format="json")
force_authenticate(request, user)
response = view(request, pk=user.pk)
response.render()
assert response.status_code == 403
assert err in response.content
@pytest.mark.parametrize("admin_role, err", [
(True, "may not perform"),
(False, "not change the membership"),
])
def test_role_users_list_other_user_admin_role(admin_role, err):
with mock.patch('awx.api.views.RoleUsersList.get_parent_object') as role_get, \
mock.patch('awx.api.views.ContentType.objects.get_for_model') as ct_get:
role_mock = mock.MagicMock(spec=Role, id=1)
content_type_mock = mock.MagicMock(spec=ContentType)
role_mock.content_type = content_type_mock
role_get.return_value = role_mock
ct_get.return_value = content_type_mock
user_admin_role = role_mock if admin_role else None
with mock.patch('awx.api.views.User.admin_role', new_callable=PropertyMock, return_value=user_admin_role):
factory = APIRequestFactory()
view = RoleUsersList.as_view()
user = User(username="root", is_superuser=True, pk=1, id=1)
queried_user = User(username="maynard")
request = factory.post("/role/1/users", {'id':1}, format="json")
force_authenticate(request, user)
with mock.patch('awx.api.views.get_object_or_400', return_value=queried_user):
response = view(request)
response.render()
assert response.status_code == 403
assert err in response.content
def test_team_roles_list_post_org_roles(): def test_team_roles_list_post_org_roles():
with mock.patch('awx.api.views.get_object_or_400') as role_get, \ with mock.patch('awx.api.views.get_object_or_400') as role_get, \
mock.patch('awx.api.views.ContentType.objects.get_for_model') as ct_get: mock.patch('awx.api.views.ContentType.objects.get_for_model') as ct_get: