Merge pull request #8833 from AlanCoding/is_superuser

Sync the is_superuser flag when the members relationship changes

Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
This commit is contained in:
softwarefactory-project-zuul[bot]
2020-12-11 20:42:40 +00:00
committed by GitHub
2 changed files with 54 additions and 2 deletions

View File

@@ -121,6 +121,27 @@ def sync_superuser_status_to_rbac(instance, **kwargs):
Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).members.remove(instance) Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).members.remove(instance)
def sync_rbac_to_superuser_status(instance, sender, **kwargs):
'When the is_superuser flag is false but a user has the System Admin role, update the database to reflect that'
if kwargs['action'] in ['post_add', 'post_remove', 'post_clear']:
new_status_value = bool(kwargs['action'] == 'post_add')
if hasattr(instance, 'singleton_name'): # duck typing, role.members.add() vs user.roles.add()
role = instance
if role.singleton_name == ROLE_SINGLETON_SYSTEM_ADMINISTRATOR:
if kwargs['pk_set']:
kwargs['model'].objects.filter(pk__in=kwargs['pk_set']).update(is_superuser=new_status_value)
elif kwargs['action'] == 'post_clear':
kwargs['model'].objects.all().update(is_superuser=False)
else:
user = instance
if kwargs['action'] == 'post_clear':
user.is_superuser = False
user.save(update_fields=['is_superuser'])
elif kwargs['model'].objects.filter(pk__in=kwargs['pk_set'], singleton_name=ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).exists():
user.is_superuser = new_status_value
user.save(update_fields=['is_superuser'])
def rbac_activity_stream(instance, sender, **kwargs): def rbac_activity_stream(instance, sender, **kwargs):
# Only if we are associating/disassociating # Only if we are associating/disassociating
if kwargs['action'] in ['pre_add', 'pre_remove']: if kwargs['action'] in ['pre_add', 'pre_remove']:
@@ -197,6 +218,7 @@ m2m_changed.connect(rebuild_role_ancestor_list, Role.parents.through)
m2m_changed.connect(rbac_activity_stream, Role.members.through) m2m_changed.connect(rbac_activity_stream, Role.members.through)
m2m_changed.connect(rbac_activity_stream, Role.parents.through) m2m_changed.connect(rbac_activity_stream, Role.parents.through)
post_save.connect(sync_superuser_status_to_rbac, sender=User) post_save.connect(sync_superuser_status_to_rbac, sender=User)
m2m_changed.connect(sync_rbac_to_superuser_status, Role.members.through)
pre_delete.connect(cleanup_detached_labels_on_deleted_parent, sender=UnifiedJob) pre_delete.connect(cleanup_detached_labels_on_deleted_parent, sender=UnifiedJob)
pre_delete.connect(cleanup_detached_labels_on_deleted_parent, sender=UnifiedJobTemplate) pre_delete.connect(cleanup_detached_labels_on_deleted_parent, sender=UnifiedJobTemplate)

View File

@@ -4,7 +4,7 @@ from unittest import mock
from django.test import TransactionTestCase from django.test import TransactionTestCase
from awx.main.access import UserAccess, RoleAccess, TeamAccess from awx.main.access import UserAccess, RoleAccess, TeamAccess
from awx.main.models import User, Organization, Inventory from awx.main.models import User, Organization, Inventory, Role
class TestSysAuditorTransactional(TransactionTestCase): class TestSysAuditorTransactional(TransactionTestCase):
@@ -171,3 +171,33 @@ def test_org_admin_cannot_delete_member_attached_to_other_group(org_admin, org_m
other_org.member_role.members.add(org_member) other_org.member_role.members.add(org_member)
assert not access.can_delete(org_member) assert not access.can_delete(org_member)
@pytest.mark.parametrize('reverse', (True, False))
@pytest.mark.django_db
def test_consistency_of_is_superuser_flag(reverse):
users = [User.objects.create(username='rando_{}'.format(i)) for i in range(2)]
for u in users:
assert u.is_superuser is False
system_admin = Role.singleton('system_administrator')
if reverse:
for u in users:
u.roles.add(system_admin)
else:
system_admin.members.add(*[u.id for u in users]) # like .add(42, 54)
for u in users:
u.refresh_from_db()
assert u.is_superuser is True
users[0].roles.clear()
for u in users:
u.refresh_from_db()
assert users[0].is_superuser is False
assert users[1].is_superuser is True
system_admin.members.clear()
for u in users:
u.refresh_from_db()
assert u.is_superuser is False