diff --git a/awx/main/signals.py b/awx/main/signals.py index a8eefe2839..1d74fcd5e1 100644 --- a/awx/main/signals.py +++ b/awx/main/signals.py @@ -153,6 +153,80 @@ def org_admin_edit_members(instance, action, model, reverse, pk_set, **kwargs): if action == 'pre_remove': instance.content_object.admin_role.children.remove(user.admin_role) +def grant_host_access_to_group_roles(instance, action, model, reverse, pk_set, **kwargs): + 'Add/remove RolePermission entries for Group roles that contain this host' + + if action == 'post_add': + def grant(host, group): + RolePermission.objects.create(resource=host, role=group.admin_role, auto_generated=True, + create=1, read=1, write=1, delete=1, update=1, + execute=1, scm_update=1, use=1, + ) + RolePermission.objects.create(resource=host, role=group.auditor_role, auto_generated=True, + read=1, + ) + RolePermission.objects.create(resource=host, role=group.updater_role, auto_generated=True, + read=1, write=1, create=1, use=1 + ) + RolePermission.objects.create(resource=host, role=group.executor_role, auto_generated=True, + read=1, execute=1 + ) + + if reverse: + host = instance + for group_id in pk_set: + grant(host, Group.objects.get(id=group_id)) + else: + group = instance + for host_id in pk_set: + grant(Host.objects.get(id=host_id), group) + + if action == 'pre_remove': + host_content_type = ContentType.objects.get_for_model(Host) + def remove_grant(host, group): + RolePermission.objects.filter( + content_type = host_content_type, + object_id = host.id, + auto_generated = True, + role__in = [group.admin_role, group.updater_role, group.auditor_role, group.executor_role] + ).delete() + + if reverse: + host = instance + for group_id in pk_set: + remove_grant(host, Group.objects.get(id=group_id)) + else: + group = instance + for host_id in pk_set: + remove_grant(Host.objects.get(id=host_id), group) + + +def grant_host_access_to_inventory(instance, **kwargs): + 'Add/remove RolePermission entries for the Inventory that contains this host' + host_content_type = ContentType.objects.get_for_model(Host) + inventory_content_type = ContentType.objects.get_for_model(Inventory) + + # Clear out any existing perms.. in case we switched inventory or something + qs = RolePermission.objects.filter( + content_type=host_content_type, + object_id=instance.id, + auto_generated=True, + role__content_type=inventory_content_type + ) + if qs.count() == 1 and qs[0].role.object_id == instance.inventory.id: + # No change + return + qs.delete() + + RolePermission.objects.create( + resource=instance, + role=instance.inventory.admin_role, + auto_generated=True, + create=1, read=1, write=1, delete=1, update=1, + execute=1, scm_update=1, use=1, + ) + + post_save.connect(emit_update_inventory_on_created_or_deleted, sender=Host) post_delete.connect(emit_update_inventory_on_created_or_deleted, sender=Host) post_save.connect(emit_update_inventory_on_created_or_deleted, sender=Group) @@ -169,6 +243,8 @@ post_save.connect(emit_job_event_detail, sender=JobEvent) post_save.connect(emit_ad_hoc_command_event_detail, sender=AdHocCommandEvent) m2m_changed.connect(rebuild_role_ancestor_list, Role.parents.through) m2m_changed.connect(org_admin_edit_members, Role.members.through) +m2m_changed.connect(grant_host_access_to_group_roles, Group.hosts.through) +post_save.connect(grant_host_access_to_inventory, Host) post_save.connect(sync_superuser_status_to_rbac, sender=User) post_save.connect(create_user_role, sender=User) diff --git a/awx/main/tests/functional/test_rbac_inventory.py b/awx/main/tests/functional/test_rbac_inventory.py index 6a22273755..a38faf2643 100644 --- a/awx/main/tests/functional/test_rbac_inventory.py +++ b/awx/main/tests/functional/test_rbac_inventory.py @@ -1,7 +1,7 @@ import pytest from awx.main.migrations import _rbac as rbac -from awx.main.models import Permission +from awx.main.models import Permission, Host from awx.main.access import InventoryAccess from django.apps import apps @@ -232,3 +232,42 @@ def test_access_auditor(organization, inventory, user): assert not access.can_run_ad_hoc_commands(inventory) + +@pytest.mark.django_db +def test_host_access(organization, inventory, user, group): + other_inventory = organization.inventories.create(name='other-inventory') + inventory_admin = user('inventory_admin', False) + my_group = group('my-group') + not_my_group = group('not-my-group') + group_admin = user('group_admin', False) + + + h1 = Host.objects.create(inventory=inventory, name='host1') + h2 = Host.objects.create(inventory=inventory, name='host2') + h1.groups.add(my_group) + h2.groups.add(not_my_group) + + assert h1.accessible_by(inventory_admin, {'read': True}) is False + assert h1.accessible_by(group_admin, {'read': True}) is False + + inventory.admin_role.members.add(inventory_admin) + my_group.admin_role.members.add(group_admin) + + assert h1.accessible_by(inventory_admin, {'read': True}) + assert h2.accessible_by(inventory_admin, {'read': True}) + assert h1.accessible_by(group_admin, {'read': True}) + assert h2.accessible_by(group_admin, {'read': True}) is False + + my_group.hosts.remove(h1) + + assert h1.accessible_by(inventory_admin, {'read': True}) + assert h1.accessible_by(group_admin, {'read': True}) is False + + h1.inventory = other_inventory + h1.save() + + assert h1.accessible_by(inventory_admin, {'read': True}) is False + assert h1.accessible_by(group_admin, {'read': True}) is False + + +