diff --git a/awx/main/access.py b/awx/main/access.py index e5a3f76eed..565a9be47f 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -1194,7 +1194,44 @@ class NotifierAccess(BaseAccess): model = Notifier def get_queryset(self): - return self.model.objects.distinct().all() + qs = self.model.objects.all() + if self.user.is_superuser: + return qs + return self.model.objects.filter(organization__in=Organization.accessible_objects(self.user, ALL_PERMISSIONS).all()) + + def can_read(self, obj): + if self.user.is_superuser: + return True + if obj.organization is not None: + return obj.organization.accessible_by(self.user, ALL_PERMISSIONS) + return False + + def can_add(self, data): + if self.user.is_superuser: + return True + if not data: + return Organization.accessible_objects(self.user, ALL_PERMISSIONS).exists() + org_pk = get_pk_from_dict(data, 'organization') + org = get_object_or_400(Organization, pk=org_pk) + return org.accessible_by(self.user, ALL_PERMISSIONS) + + def can_change(self, obj, data): + if self.user.is_superuser: + return True + org_pk = get_pk_from_dict(data, 'organization') + if obj and org_pk and obj.organization.pk != org_pk: + org = get_object_or_400(Organization, pk=org_pk) + if not org.accessible_by(self.user, ALL_PERMISSIONS): + return False + if obj.organization is not None: + return obj.organization.accessible_by(self.user, ALL_PERMISSIONS) + return False + + def can_admin(self, obj, data): + return self.can_change(obj, data) + + def can_delete(self, obj): + return self.can_change(obj, None) class NotificationAccess(BaseAccess): ''' @@ -1203,7 +1240,16 @@ class NotificationAccess(BaseAccess): model = Notification def get_queryset(self): - return self.model.objects.distinct().all() + qs = self.model.objects.all() + if self.user.is_superuser: + return qs + return self.model.objects.filter(notifier__organization__in=Organization.accessible_objects(self.user, ALL_PERMISSIONS)) + + def can_read(self, obj): + return self.user.can_access(Notifier, 'read', obj.notifier) + + def can_delete(self, obj): + return self.user.can_access(Notifier, 'delete', obj.notifier) class LabelAccess(BaseAccess): ''' diff --git a/awx/main/tests/functional/conftest.py b/awx/main/tests/functional/conftest.py index 975e7841c9..593f3e420b 100644 --- a/awx/main/tests/functional/conftest.py +++ b/awx/main/tests/functional/conftest.py @@ -36,6 +36,7 @@ from awx.main.models.organization import ( ) from awx.main.models.rbac import Role +from awx.main.models.notifications import Notifier ''' Disable all django model signals. @@ -174,6 +175,14 @@ def inventory(organization): def label(organization): return organization.labels.create(name="test-label", description="test-label-desc") +@pytest.fixture +def notifier(organization): + return Notifier.objects.create(name='test-notifier', + organization=organization, + notification_type="webhook", + notification_configuration=dict(url="http://localhost", + headers={"Test": "Header"})) + @pytest.fixture def role(): return Role.objects.create(name='role') @@ -255,6 +264,20 @@ def permissions(): 'update':False, 'delete':False, 'scm_update':False, 'execute':False, 'use':True,}, } +@pytest.fixture +def notifier_factory(organization): + def n(name="test-notifier"): + try: + notifier = Notifier.objects.get(name=name) + except Notifier.DoesNotExist: + notifier = Notifier(name=name, + organization=organization, + notification_type="webhook", + notification_configuration=dict(url="http://localhost", + headers={"Test": "Header"})) + notifier.save() + return notifier + return n @pytest.fixture def post(): diff --git a/awx/main/tests/functional/test_notifications.py b/awx/main/tests/functional/test_notifications.py index dd85ffcf4b..9b966d6c07 100644 --- a/awx/main/tests/functional/test_notifications.py +++ b/awx/main/tests/functional/test_notifications.py @@ -7,13 +7,6 @@ from awx.main.models.jobs import JobTemplate from django.core.urlresolvers import reverse -@pytest.fixture -def notifier(): - return Notifier.objects.create(name="test-notification", - notification_type="webhook", - notification_configuration=dict(url="http://localhost", - headers={"Test": "Header"})) - @pytest.mark.django_db def test_get_notifier_list(get, user, notifier): url = reverse('api:notifier_list') diff --git a/awx/main/tests/functional/test_rbac_notifications.py b/awx/main/tests/functional/test_rbac_notifications.py new file mode 100644 index 0000000000..6d767ada4f --- /dev/null +++ b/awx/main/tests/functional/test_rbac_notifications.py @@ -0,0 +1,68 @@ +import pytest + +from awx.main.access import NotifierAccess + +@pytest.mark.django_db +def test_notifier_get_queryset_orgmember(notifier, user): + access = NotifierAccess(user('user', False)) + notifier.organization.member_role.members.add(user('user', False)) + assert access.get_queryset().count() == 0 + +@pytest.mark.django_db +def test_notifier_get_queryset_nonorgmember(notifier, user): + access = NotifierAccess(user('user', False)) + assert access.get_queryset().count() == 0 + +@pytest.mark.django_db +def test_notifier_get_queryset_su(notifier, user): + access = NotifierAccess(user('user', True)) + assert access.get_queryset().count() == 1 + +@pytest.mark.django_db +def test_notifier_get_queryset_orgadmin(notifier, user): + access = NotifierAccess(user('admin', False)) + notifier.organization.admin_role.members.add(user('admin', False)) + assert access.get_queryset().count() == 1 + +@pytest.mark.django_db +def test_notifier_access_superuser(notifier, user, notifier_factory): + access = NotifierAccess(user('admin', True)) + assert access.can_read(notifier) + assert access.can_change(notifier, None) + assert access.can_delete(notifier) + nf = notifier_factory("test-orphaned") + nf.organization = None + nf.save() + assert access.can_read(nf) + assert access.can_change(nf, None) + assert access.can_delete(nf) + +@pytest.mark.django_db +def test_notifier_access_admin(notifier, user, organization_factory, notifier_factory): + adm = user('admin', False) + other_org = organization_factory('other') + present_org = organization_factory('present') + notifier.organization.admin_role.members.add(adm) + present_org.admin_role.members.add(adm) + + access = NotifierAccess(user('admin', False)) + assert not access.can_change(notifier, {'organization': other_org.id}) + assert access.can_read(notifier) + assert access.can_change(notifier, None) + assert access.can_change(notifier, {'organization': present_org.id}) + assert access.can_delete(notifier) + nf = notifier_factory("test-orphaned") + nf.organization = None + nf.save() + assert not access.can_read(nf) + assert not access.can_change(nf, None) + assert not access.can_delete(nf) + +@pytest.mark.django_db +def test_notifier_access_org_user(notifier, user): + u = user('normal', False) + notifier.organization.member_role.members.add(u) + access = NotifierAccess(user('normal', False)) + assert not access.can_read(notifier) + assert not access.can_change(notifier, None) + assert not access.can_delete(notifier)