diff --git a/awx/main/access.py b/awx/main/access.py index a96d40de40..a875166149 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -393,11 +393,38 @@ class InstanceAccess(BaseAccess): model = Instance + def get_queryset(self): + return Instance.objects.filter(rampart_groups__in=self.user.get_queryset(InstanceGroup)) + + def can_add(self, data): + return False + + def can_change(self, obj, data): + return False + + def can_delete(self, obj): + return False + class InstanceGroupAccess(BaseAccess): model = InstanceGroup + def get_queryset(self): + if self.user.is_superuser or self.user.is_system_auditor: + return InstanceGroup.objects.all() + else: + return InstanceGroup.objects.filter(organization__in=Organization.accessible_objects(self.user, 'admin_role')) + + def can_add(self, data): + return False + + def can_change(self, obj, data): + return False + + def can_delete(self, obj): + return False + class UserAccess(BaseAccess): ''' @@ -523,6 +550,18 @@ class OrganizationAccess(BaseAccess): "active_jobs": active_jobs}) return True + def can_attach(self, obj, sub_obj, relationship, *args, **kwargs): + if relationship == "instance_groups": + if self.user.can_access(type(sub_obj), "read", sub_obj) and self.user in obj.admin_role: + return True + return False + return super(OrganizationAccess, self).can_attach(obj, sub_obj, relationship, *args, **kwargs) + + def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs): + if relationship == "instance_groups": + return self.can_attach(obj, sub_obj, relationship, *args, **kwargs) + return super(OrganizationAccess, self).can_attach(obj, sub_obj, relationship, *args, **kwargs) + class InventoryAccess(BaseAccess): ''' @@ -593,6 +632,18 @@ class InventoryAccess(BaseAccess): def can_run_ad_hoc_commands(self, obj): return self.user in obj.adhoc_role + def can_attach(self, obj, sub_obj, relationship, *args, **kwargs): + if relationship == "instance_groups": + if self.user.can_access(type(sub_obj), "read", sub_obj) and self.user in obj.organization.admin_role: + return True + return False + return super(InventoryAccess, self).can_attach(obj, sub_obj, relationship, *args, **kwargs) + + def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs): + if relationship == "instance_groups": + return self.can_attach(obj, sub_obj, relationship, *args, **kwargs) + return super(InventoryAccess, self).can_attach(obj, sub_obj, relationship, *args, **kwargs) + class HostAccess(BaseAccess): ''' @@ -1260,9 +1311,19 @@ class JobTemplateAccess(BaseAccess): def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): if isinstance(sub_obj, NotificationTemplate): return self.check_related('organization', Organization, {}, obj=sub_obj, mandatory=True) + if relationship == "instance_groups": + if self.user.can_access(type(sub_obj), "read", sub_obj) and self.user in obj.project.organization.admin_role: + return True + return False return super(JobTemplateAccess, self).can_attach( obj, sub_obj, relationship, data, skip_sub_obj_read_check=skip_sub_obj_read_check) + def can_unattach(self, obj, sub_obj, relationship, *args, **kwargs): + if relationship == "instance_groups": + return self.can_attach(obj, sub_obj, relationship, *args, **kwargs) + return super(InventoryAccess, self).can_attach(obj, sub_obj, relationship, *args, **kwargs) + + class JobAccess(BaseAccess): ''' diff --git a/awx/main/tests/functional/test_rbac_instance_groups.py b/awx/main/tests/functional/test_rbac_instance_groups.py new file mode 100644 index 0000000000..07a6c32c9f --- /dev/null +++ b/awx/main/tests/functional/test_rbac_instance_groups.py @@ -0,0 +1,79 @@ +import pytest + +from awx.main.access import ( + InstanceGroupAccess, + OrganizationAccess, + InventoryAccess, + JobTemplateAccess, +) + + +@pytest.mark.django_db +def test_ig_normal_user_visibility(organization, default_instance_group, user): + u = user('user', False) + assert len(InstanceGroupAccess(u).get_queryset()) == 0 + organization.instance_groups.add(default_instance_group) + organization.member_role.members.add(u) + assert len(InstanceGroupAccess(u).get_queryset()) == 0 + + +@pytest.mark.django_db +def test_ig_admin_user_visibility(organization, default_instance_group, admin, system_auditor, org_admin): + assert len(InstanceGroupAccess(admin).get_queryset()) == 1 + assert len(InstanceGroupAccess(system_auditor).get_queryset()) == 1 + assert len(InstanceGroupAccess(org_admin).get_queryset()) == 0 + organization.instance_groups.add(default_instance_group) + assert len(InstanceGroupAccess(org_admin).get_queryset()) == 1 + + +@pytest.mark.django_db +def test_ig_normal_user_associability(organization, default_instance_group, user): + u = user('user', False) + access = OrganizationAccess(u) + assert not access.can_attach(organization, default_instance_group, 'instance_groups', None) + organization.instance_groups.add(default_instance_group) + organization.member_role.members.add(u) + assert not access.can_attach(organization, default_instance_group, 'instance_groups', None) + + +@pytest.mark.django_db +def test_ig_associability(organization, default_instance_group, admin, system_auditor, org_admin, org_member, job_template_factory): + admin_access = OrganizationAccess(admin) + auditor_access = OrganizationAccess(system_auditor) + oadmin_access = OrganizationAccess(org_admin) + omember_access = OrganizationAccess(org_member) + assert admin_access.can_attach(organization, default_instance_group, 'instance_groups', None) + assert not oadmin_access.can_attach(organization, default_instance_group, 'instance_groups', None) + assert not auditor_access.can_attach(organization, default_instance_group, 'instance_groups', None) + assert not omember_access.can_attach(organization, default_instance_group, 'instance_groups', None) + + organization.instance_groups.add(default_instance_group) + + assert admin_access.can_unattach(organization, default_instance_group, 'instance_groups', None) + assert oadmin_access.can_unattach(organization, default_instance_group, 'instance_groups', None) + assert not auditor_access.can_unattach(organization, default_instance_group, 'instance_groups', None) + assert not omember_access.can_unattach(organization, default_instance_group, 'instance_groups', None) + + objects = job_template_factory('jt', organization=organization, project='p', + inventory='i', credential='c') + admin_access = InventoryAccess(admin) + auditor_access = InventoryAccess(system_auditor) + oadmin_access = InventoryAccess(org_admin) + omember_access = InventoryAccess(org_member) + + assert admin_access.can_attach(objects.inventory, default_instance_group, 'instance_groups', None) + assert oadmin_access.can_attach(objects.inventory, default_instance_group, 'instance_groups', None) + assert not auditor_access.can_attach(objects.inventory, default_instance_group, 'instance_groups', None) + assert not omember_access.can_attach(objects.inventory, default_instance_group, 'instance_groups', None) + + admin_access = JobTemplateAccess(admin) + auditor_access = JobTemplateAccess(system_auditor) + oadmin_access = JobTemplateAccess(org_admin) + omember_access = JobTemplateAccess(org_member) + + assert admin_access.can_attach(objects.job_template, default_instance_group, 'instance_groups', None) + assert oadmin_access.can_attach(objects.job_template, default_instance_group, 'instance_groups', None) + assert not auditor_access.can_attach(objects.job_template, default_instance_group, 'instance_groups', None) + assert not omember_access.can_attach(objects.job_template, default_instance_group, 'instance_groups', None) + +