mirror of
https://github.com/ansible/awx.git
synced 2026-05-15 13:27:40 -02:30
Implement rbac and related unit tests for notifications
This commit is contained in:
@@ -1194,7 +1194,44 @@ class NotifierAccess(BaseAccess):
|
|||||||
model = Notifier
|
model = Notifier
|
||||||
|
|
||||||
def get_queryset(self):
|
def get_queryset(self):
|
||||||
return self.model.objects.distinct().all()
|
qs = self.model.objects.all()
|
||||||
|
if self.user.is_superuser:
|
||||||
|
return qs
|
||||||
|
return self.model.objects.filter(organization__in=Organization.accessible_objects(self.user, ALL_PERMISSIONS).all())
|
||||||
|
|
||||||
|
def can_read(self, obj):
|
||||||
|
if self.user.is_superuser:
|
||||||
|
return True
|
||||||
|
if obj.organization is not None:
|
||||||
|
return obj.organization.accessible_by(self.user, ALL_PERMISSIONS)
|
||||||
|
return False
|
||||||
|
|
||||||
|
def can_add(self, data):
|
||||||
|
if self.user.is_superuser:
|
||||||
|
return True
|
||||||
|
if not data:
|
||||||
|
return Organization.accessible_objects(self.user, ALL_PERMISSIONS).exists()
|
||||||
|
org_pk = get_pk_from_dict(data, 'organization')
|
||||||
|
org = get_object_or_400(Organization, pk=org_pk)
|
||||||
|
return org.accessible_by(self.user, ALL_PERMISSIONS)
|
||||||
|
|
||||||
|
def can_change(self, obj, data):
|
||||||
|
if self.user.is_superuser:
|
||||||
|
return True
|
||||||
|
org_pk = get_pk_from_dict(data, 'organization')
|
||||||
|
if obj and org_pk and obj.organization.pk != org_pk:
|
||||||
|
org = get_object_or_400(Organization, pk=org_pk)
|
||||||
|
if not org.accessible_by(self.user, ALL_PERMISSIONS):
|
||||||
|
return False
|
||||||
|
if obj.organization is not None:
|
||||||
|
return obj.organization.accessible_by(self.user, ALL_PERMISSIONS)
|
||||||
|
return False
|
||||||
|
|
||||||
|
def can_admin(self, obj, data):
|
||||||
|
return self.can_change(obj, data)
|
||||||
|
|
||||||
|
def can_delete(self, obj):
|
||||||
|
return self.can_change(obj, None)
|
||||||
|
|
||||||
class NotificationAccess(BaseAccess):
|
class NotificationAccess(BaseAccess):
|
||||||
'''
|
'''
|
||||||
@@ -1203,7 +1240,16 @@ class NotificationAccess(BaseAccess):
|
|||||||
model = Notification
|
model = Notification
|
||||||
|
|
||||||
def get_queryset(self):
|
def get_queryset(self):
|
||||||
return self.model.objects.distinct().all()
|
qs = self.model.objects.all()
|
||||||
|
if self.user.is_superuser:
|
||||||
|
return qs
|
||||||
|
return self.model.objects.filter(notifier__organization__in=Organization.accessible_objects(self.user, ALL_PERMISSIONS))
|
||||||
|
|
||||||
|
def can_read(self, obj):
|
||||||
|
return self.user.can_access(Notifier, 'read', obj.notifier)
|
||||||
|
|
||||||
|
def can_delete(self, obj):
|
||||||
|
return self.user.can_access(Notifier, 'delete', obj.notifier)
|
||||||
|
|
||||||
class LabelAccess(BaseAccess):
|
class LabelAccess(BaseAccess):
|
||||||
'''
|
'''
|
||||||
|
|||||||
@@ -36,6 +36,7 @@ from awx.main.models.organization import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
from awx.main.models.rbac import Role
|
from awx.main.models.rbac import Role
|
||||||
|
from awx.main.models.notifications import Notifier
|
||||||
|
|
||||||
'''
|
'''
|
||||||
Disable all django model signals.
|
Disable all django model signals.
|
||||||
@@ -174,6 +175,14 @@ def inventory(organization):
|
|||||||
def label(organization):
|
def label(organization):
|
||||||
return organization.labels.create(name="test-label", description="test-label-desc")
|
return organization.labels.create(name="test-label", description="test-label-desc")
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def notifier(organization):
|
||||||
|
return Notifier.objects.create(name='test-notifier',
|
||||||
|
organization=organization,
|
||||||
|
notification_type="webhook",
|
||||||
|
notification_configuration=dict(url="http://localhost",
|
||||||
|
headers={"Test": "Header"}))
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def role():
|
def role():
|
||||||
return Role.objects.create(name='role')
|
return Role.objects.create(name='role')
|
||||||
@@ -255,6 +264,20 @@ def permissions():
|
|||||||
'update':False, 'delete':False, 'scm_update':False, 'execute':False, 'use':True,},
|
'update':False, 'delete':False, 'scm_update':False, 'execute':False, 'use':True,},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def notifier_factory(organization):
|
||||||
|
def n(name="test-notifier"):
|
||||||
|
try:
|
||||||
|
notifier = Notifier.objects.get(name=name)
|
||||||
|
except Notifier.DoesNotExist:
|
||||||
|
notifier = Notifier(name=name,
|
||||||
|
organization=organization,
|
||||||
|
notification_type="webhook",
|
||||||
|
notification_configuration=dict(url="http://localhost",
|
||||||
|
headers={"Test": "Header"}))
|
||||||
|
notifier.save()
|
||||||
|
return notifier
|
||||||
|
return n
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def post():
|
def post():
|
||||||
|
|||||||
@@ -7,13 +7,6 @@ from awx.main.models.jobs import JobTemplate
|
|||||||
|
|
||||||
from django.core.urlresolvers import reverse
|
from django.core.urlresolvers import reverse
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def notifier():
|
|
||||||
return Notifier.objects.create(name="test-notification",
|
|
||||||
notification_type="webhook",
|
|
||||||
notification_configuration=dict(url="http://localhost",
|
|
||||||
headers={"Test": "Header"}))
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_get_notifier_list(get, user, notifier):
|
def test_get_notifier_list(get, user, notifier):
|
||||||
url = reverse('api:notifier_list')
|
url = reverse('api:notifier_list')
|
||||||
|
|||||||
68
awx/main/tests/functional/test_rbac_notifications.py
Normal file
68
awx/main/tests/functional/test_rbac_notifications.py
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
import pytest
|
||||||
|
|
||||||
|
from awx.main.access import NotifierAccess
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_notifier_get_queryset_orgmember(notifier, user):
|
||||||
|
access = NotifierAccess(user('user', False))
|
||||||
|
notifier.organization.member_role.members.add(user('user', False))
|
||||||
|
assert access.get_queryset().count() == 0
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_notifier_get_queryset_nonorgmember(notifier, user):
|
||||||
|
access = NotifierAccess(user('user', False))
|
||||||
|
assert access.get_queryset().count() == 0
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_notifier_get_queryset_su(notifier, user):
|
||||||
|
access = NotifierAccess(user('user', True))
|
||||||
|
assert access.get_queryset().count() == 1
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_notifier_get_queryset_orgadmin(notifier, user):
|
||||||
|
access = NotifierAccess(user('admin', False))
|
||||||
|
notifier.organization.admin_role.members.add(user('admin', False))
|
||||||
|
assert access.get_queryset().count() == 1
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_notifier_access_superuser(notifier, user, notifier_factory):
|
||||||
|
access = NotifierAccess(user('admin', True))
|
||||||
|
assert access.can_read(notifier)
|
||||||
|
assert access.can_change(notifier, None)
|
||||||
|
assert access.can_delete(notifier)
|
||||||
|
nf = notifier_factory("test-orphaned")
|
||||||
|
nf.organization = None
|
||||||
|
nf.save()
|
||||||
|
assert access.can_read(nf)
|
||||||
|
assert access.can_change(nf, None)
|
||||||
|
assert access.can_delete(nf)
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_notifier_access_admin(notifier, user, organization_factory, notifier_factory):
|
||||||
|
adm = user('admin', False)
|
||||||
|
other_org = organization_factory('other')
|
||||||
|
present_org = organization_factory('present')
|
||||||
|
notifier.organization.admin_role.members.add(adm)
|
||||||
|
present_org.admin_role.members.add(adm)
|
||||||
|
|
||||||
|
access = NotifierAccess(user('admin', False))
|
||||||
|
assert not access.can_change(notifier, {'organization': other_org.id})
|
||||||
|
assert access.can_read(notifier)
|
||||||
|
assert access.can_change(notifier, None)
|
||||||
|
assert access.can_change(notifier, {'organization': present_org.id})
|
||||||
|
assert access.can_delete(notifier)
|
||||||
|
nf = notifier_factory("test-orphaned")
|
||||||
|
nf.organization = None
|
||||||
|
nf.save()
|
||||||
|
assert not access.can_read(nf)
|
||||||
|
assert not access.can_change(nf, None)
|
||||||
|
assert not access.can_delete(nf)
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_notifier_access_org_user(notifier, user):
|
||||||
|
u = user('normal', False)
|
||||||
|
notifier.organization.member_role.members.add(u)
|
||||||
|
access = NotifierAccess(user('normal', False))
|
||||||
|
assert not access.can_read(notifier)
|
||||||
|
assert not access.can_change(notifier, None)
|
||||||
|
assert not access.can_delete(notifier)
|
||||||
Reference in New Issue
Block a user