diff --git a/awx/main/signals.py b/awx/main/signals.py index 34d0003ab9..0a29fa9d6c 100644 --- a/awx/main/signals.py +++ b/awx/main/signals.py @@ -123,10 +123,23 @@ def sync_superuser_status_to_rbac(instance, **kwargs): def sync_rbac_to_superuser_status(instance, sender, **kwargs): 'When the is_superuser flag is false but a user has the System Admin role, update the database to reflect that' - if kwargs['action'] in ['post_add', 'post_remove']: - if instance.singleton_name == ROLE_SINGLETON_SYSTEM_ADMINISTRATOR: - new_status_value = bool(kwargs['action'] == 'post_add') - kwargs['model'].objects.filter(pk__in=kwargs['pk_set']).update(is_superuser=new_status_value) + if kwargs['action'] in ['post_add', 'post_remove', 'post_clear']: + new_status_value = bool(kwargs['action'] == 'post_add') + if hasattr(instance, 'singleton_name'): # duck typing, role.members.add() vs user.roles.add() + role = instance + if role.singleton_name == ROLE_SINGLETON_SYSTEM_ADMINISTRATOR: + if kwargs['pk_set']: + kwargs['model'].objects.filter(pk__in=kwargs['pk_set']).update(is_superuser=new_status_value) + elif kwargs['action'] == 'post_clear': + kwargs['model'].objects.all().update(is_superuser=False) + else: + user = instance + if kwargs['action'] == 'post_clear': + user.is_superuser = False + user.save(update_fields=['is_superuser']) + elif kwargs['model'].objects.filter(pk__in=kwargs['pk_set'], singleton_name=ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).exists(): + user.is_superuser = new_status_value + user.save(update_fields=['is_superuser']) def rbac_activity_stream(instance, sender, **kwargs): diff --git a/awx/main/tests/functional/test_rbac_user.py b/awx/main/tests/functional/test_rbac_user.py index ca3d268b18..b62a0db25f 100644 --- a/awx/main/tests/functional/test_rbac_user.py +++ b/awx/main/tests/functional/test_rbac_user.py @@ -4,7 +4,7 @@ from unittest import mock from django.test import TransactionTestCase from awx.main.access import UserAccess, RoleAccess, TeamAccess -from awx.main.models import User, Organization, Inventory +from awx.main.models import User, Organization, Inventory, Role class TestSysAuditorTransactional(TransactionTestCase): @@ -170,4 +170,34 @@ def test_org_admin_cannot_delete_member_attached_to_other_group(org_admin, org_m access = UserAccess(org_admin) other_org.member_role.members.add(org_member) assert not access.can_delete(org_member) - \ No newline at end of file + + +@pytest.mark.parametrize('reverse', (True, False)) +@pytest.mark.django_db +def test_consistency_of_is_superuser_flag(reverse): + users = [User.objects.create(username='rando_{}'.format(i)) for i in range(2)] + for u in users: + assert u.is_superuser is False + + system_admin = Role.singleton('system_administrator') + if reverse: + for u in users: + u.roles.add(system_admin) + else: + system_admin.members.add(*[u.id for u in users]) # like .add(42, 54) + + for u in users: + u.refresh_from_db() + assert u.is_superuser is True + + users[0].roles.clear() + for u in users: + u.refresh_from_db() + assert users[0].is_superuser is False + assert users[1].is_superuser is True + + system_admin.members.clear() + + for u in users: + u.refresh_from_db() + assert u.is_superuser is False