diff --git a/awx/main/signals.py b/awx/main/signals.py index adfbc65d01..0a29fa9d6c 100644 --- a/awx/main/signals.py +++ b/awx/main/signals.py @@ -121,6 +121,27 @@ def sync_superuser_status_to_rbac(instance, **kwargs): Role.singleton(ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).members.remove(instance) +def sync_rbac_to_superuser_status(instance, sender, **kwargs): + 'When the is_superuser flag is false but a user has the System Admin role, update the database to reflect that' + if kwargs['action'] in ['post_add', 'post_remove', 'post_clear']: + new_status_value = bool(kwargs['action'] == 'post_add') + if hasattr(instance, 'singleton_name'): # duck typing, role.members.add() vs user.roles.add() + role = instance + if role.singleton_name == ROLE_SINGLETON_SYSTEM_ADMINISTRATOR: + if kwargs['pk_set']: + kwargs['model'].objects.filter(pk__in=kwargs['pk_set']).update(is_superuser=new_status_value) + elif kwargs['action'] == 'post_clear': + kwargs['model'].objects.all().update(is_superuser=False) + else: + user = instance + if kwargs['action'] == 'post_clear': + user.is_superuser = False + user.save(update_fields=['is_superuser']) + elif kwargs['model'].objects.filter(pk__in=kwargs['pk_set'], singleton_name=ROLE_SINGLETON_SYSTEM_ADMINISTRATOR).exists(): + user.is_superuser = new_status_value + user.save(update_fields=['is_superuser']) + + def rbac_activity_stream(instance, sender, **kwargs): # Only if we are associating/disassociating if kwargs['action'] in ['pre_add', 'pre_remove']: @@ -197,6 +218,7 @@ m2m_changed.connect(rebuild_role_ancestor_list, Role.parents.through) m2m_changed.connect(rbac_activity_stream, Role.members.through) m2m_changed.connect(rbac_activity_stream, Role.parents.through) post_save.connect(sync_superuser_status_to_rbac, sender=User) +m2m_changed.connect(sync_rbac_to_superuser_status, Role.members.through) pre_delete.connect(cleanup_detached_labels_on_deleted_parent, sender=UnifiedJob) pre_delete.connect(cleanup_detached_labels_on_deleted_parent, sender=UnifiedJobTemplate) diff --git a/awx/main/tests/functional/test_rbac_user.py b/awx/main/tests/functional/test_rbac_user.py index ca3d268b18..b62a0db25f 100644 --- a/awx/main/tests/functional/test_rbac_user.py +++ b/awx/main/tests/functional/test_rbac_user.py @@ -4,7 +4,7 @@ from unittest import mock from django.test import TransactionTestCase from awx.main.access import UserAccess, RoleAccess, TeamAccess -from awx.main.models import User, Organization, Inventory +from awx.main.models import User, Organization, Inventory, Role class TestSysAuditorTransactional(TransactionTestCase): @@ -170,4 +170,34 @@ def test_org_admin_cannot_delete_member_attached_to_other_group(org_admin, org_m access = UserAccess(org_admin) other_org.member_role.members.add(org_member) assert not access.can_delete(org_member) - \ No newline at end of file + + +@pytest.mark.parametrize('reverse', (True, False)) +@pytest.mark.django_db +def test_consistency_of_is_superuser_flag(reverse): + users = [User.objects.create(username='rando_{}'.format(i)) for i in range(2)] + for u in users: + assert u.is_superuser is False + + system_admin = Role.singleton('system_administrator') + if reverse: + for u in users: + u.roles.add(system_admin) + else: + system_admin.members.add(*[u.id for u in users]) # like .add(42, 54) + + for u in users: + u.refresh_from_db() + assert u.is_superuser is True + + users[0].roles.clear() + for u in users: + u.refresh_from_db() + assert users[0].is_superuser is False + assert users[1].is_superuser is True + + system_admin.members.clear() + + for u in users: + u.refresh_from_db() + assert u.is_superuser is False