Merge pull request #1892 from AlanCoding/1364_AS_permission

Access filters in ActivityStream
This commit is contained in:
Alan Rominger 2016-05-18 16:06:38 -04:00
commit 8c1e2e4efc
6 changed files with 147 additions and 84 deletions

View File

@ -1304,6 +1304,26 @@ class ActivityStreamAccess(BaseAccess):
model = ActivityStream
def get_queryset(self):
'''
The full set is returned if the user is:
- System Administrator
- System Auditor
These users will be able to see orphaned activity stream items
(the related resource has been deleted), as well as the other
obscure cases listed here
Complex permissions omitted from the activity stream of a normal user:
- host access via group
- permissions (from prior versions)
- notifications via team admin access
Activity stream events that have been omitted from list for
normal users since 2.4:
- unified job templates
- unified jobs
- schedules
- custom inventory scripts
'''
qs = self.model.objects.all()
qs = qs.select_related('actor')
qs = qs.prefetch_related('organization', 'user', 'inventory', 'host', 'group', 'inventory_source',
@ -1311,60 +1331,44 @@ class ActivityStreamAccess(BaseAccess):
'permission', 'job_template', 'job')
if self.user.is_superuser:
return qs.all()
if self.user in Role.singleton('system_auditor'):
return qs.all()
inventory_set = Inventory.accessible_objects(self.user, 'read_role')
credential_set = Credential.accessible_objects(self.user, 'read_role')
organization_set = Organization.accessible_objects(self.user, 'read_role')
group_set = Group.accessible_objects(self.user, 'read_role')
project_set = Project.accessible_objects(self.user, 'read_role')
jt_set = JobTemplate.accessible_objects(self.user, 'read_role')
team_set = Team.accessible_objects(self.user, 'read_role')
# All of these filters are noops and tests fail when we do qs =
# qs.filter for them, so we need to figure out what the intent was,
# fix this up, and add some tests to enforce the expected behavior
# - anoek - 2016-03-31
'''
#Inventory filter
inventory_qs = self.user.get_queryset(Inventory)
qs.filter(inventory__in=inventory_qs)
ad_hoc_results = qs.filter(
ad_hoc_command__inventory__in=inventory_set,
ad_hoc_command__credential__in=credential_set
)
#Host filter
qs.filter(host__inventory__in=inventory_qs)
global_results = qs.filter(
Q(user__in=organization_set.values('member_role__members')) |
Q(user=self.user) |
Q(organization__in=organization_set) |
Q(inventory__in=inventory_set) |
Q(host__inventory__in=inventory_set) |
Q(group__in=group_set) |
Q(inventory_source__inventory__in=inventory_set) |
Q(inventory_update__inventory_source__inventory__in=inventory_set) |
Q(credential__in=credential_set) |
Q(team__in=team_set) |
Q(project__in=project_set) |
Q(project_update__project__in=project_set) |
Q(job_template__in=jt_set) |
Q(job__job_template__in=jt_set) |
Q(notification_template__organization__admin_role__members__in=[self.user]) |
Q(notification__notification_template__organization__admin_role__members__in=[self.user]) |
Q(label__organization__in=organization_set) |
Q(role__in=Role.visible_roles(self.user))
)
#Group filter
qs.filter(group__inventory__in=inventory_qs)
#Inventory Source Filter
qs.filter(Q(inventory_source__inventory__in=inventory_qs) |
Q(inventory_source__group__inventory__in=inventory_qs))
#Inventory Update Filter
qs.filter(Q(inventory_update__inventory_source__inventory__in=inventory_qs) |
Q(inventory_update__inventory_source__group__inventory__in=inventory_qs))
#Credential Update Filter
credential_qs = self.user.get_queryset(Credential)
qs.filter(credential__in=credential_qs)
#Team Filter
team_qs = self.user.get_queryset(Team)
qs.filter(team__in=team_qs)
#Project Filter
project_qs = self.user.get_queryset(Project)
qs.filter(project__in=project_qs)
#Project Update Filter
qs.filter(project_update__project__in=project_qs)
#Job Template Filter
jobtemplate_qs = self.user.get_queryset(JobTemplate)
qs.filter(job_template__in=jobtemplate_qs)
#Job Filter
job_qs = self.user.get_queryset(Job)
qs.filter(job__in=job_qs)
# Ad Hoc Command Filter
ad_hoc_command_qs = self.user.get_queryset(AdHocCommand)
qs.filter(ad_hoc_command__in=ad_hoc_command_qs)
'''
return qs.all()
return (ad_hoc_results | global_results).distinct()
def can_add(self, data):
return False

View File

@ -3,12 +3,18 @@ import pytest
from awx.main.middleware import ActivityStreamMiddleware
from awx.main.models.activity_stream import ActivityStream
from awx.main.access import ActivityStreamAccess
from django.core.urlresolvers import reverse
from django.conf import settings
def mock_feature_enabled(feature, bypass_database=None):
return True
@pytest.fixture
def activity_stream_entry(organization, org_admin):
return ActivityStream.objects.filter(organization__pk=organization.pk, operation='associate').first()
@pytest.mark.skipif(not getattr(settings, 'ACTIVITY_STREAM_ENABLED', True), reason="Activity stream not enabled")
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
@ -63,25 +69,65 @@ def test_middleware_actor_added(monkeypatch, post, get, user):
@pytest.mark.skipif(not getattr(settings, 'ACTIVITY_STREAM_ENABLED', True), reason="Activity stream not enabled")
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_rbac_stream_resource_roles(mocker, organization, user):
member = user('test', False)
organization.admin_role.members.add(member)
def test_rbac_stream_resource_roles(activity_stream_entry, organization, org_admin):
activity_stream = ActivityStream.objects.filter(organization__pk=organization.pk, operation='associate').first()
assert activity_stream.user.first() == member
assert activity_stream.organization.first() == organization
assert activity_stream.role.first() == organization.admin_role
assert activity_stream.object_relationship_type == 'awx.main.models.organization.Organization.admin_role'
assert activity_stream_entry.user.first() == org_admin
assert activity_stream_entry.organization.first() == organization
assert activity_stream_entry.role.first() == organization.admin_role
assert activity_stream_entry.object_relationship_type == 'awx.main.models.organization.Organization.admin_role'
@pytest.mark.skipif(not getattr(settings, 'ACTIVITY_STREAM_ENABLED', True), reason="Activity stream not enabled")
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_rbac_stream_user_roles(mocker, organization, user):
member = user('test', False)
member.roles.add(organization.admin_role)
def test_rbac_stream_user_roles(activity_stream_entry, organization, org_admin):
activity_stream = ActivityStream.objects.filter(organization__pk=organization.pk, operation='associate').first()
assert activity_stream.user.first() == member
assert activity_stream.organization.first() == organization
assert activity_stream.role.first() == organization.admin_role
assert activity_stream.object_relationship_type == 'awx.main.models.organization.Organization.admin_role'
assert activity_stream_entry.user.first() == org_admin
assert activity_stream_entry.organization.first() == organization
assert activity_stream_entry.role.first() == organization.admin_role
assert activity_stream_entry.object_relationship_type == 'awx.main.models.organization.Organization.admin_role'
@pytest.mark.django_db
@pytest.mark.activity_stream_access
@pytest.mark.skipif(not getattr(settings, 'ACTIVITY_STREAM_ENABLED', True), reason="Activity stream not enabled")
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
def test_stream_access_cant_change(activity_stream_entry, organization, org_admin):
access = ActivityStreamAccess(org_admin)
# These should always return false because the activity stream can not be edited
assert not access.can_add(activity_stream_entry)
assert not access.can_change(activity_stream_entry, {'organization': None})
assert not access.can_delete(activity_stream_entry)
@pytest.mark.django_db
@pytest.mark.activity_stream_access
@pytest.mark.skipif(not getattr(settings, 'ACTIVITY_STREAM_ENABLED', True), reason="Activity stream not enabled")
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
def test_stream_queryset_hides_shows_items(
activity_stream_entry, organization, user, org_admin,
project, org_credential, inventory, label, deploy_jobtemplate,
notification_template, group, host, team):
# this user is not in any organizations and should not see any resource activity
no_access_user = user('no-access-user', False)
queryset = ActivityStreamAccess(no_access_user).get_queryset()
assert not queryset.filter(project__pk=project.pk)
assert not queryset.filter(credential__pk=org_credential.pk)
assert not queryset.filter(inventory__pk=inventory.pk)
assert not queryset.filter(label__pk=label.pk)
assert not queryset.filter(job_template__pk=deploy_jobtemplate.pk)
assert not queryset.filter(group__pk=group.pk)
assert not queryset.filter(host__pk=host.pk)
assert not queryset.filter(team__pk=team.pk)
assert not queryset.filter(notification_template__pk=notification_template.pk)
# Organization admin should be able to see most things in the ActivityStream
queryset = ActivityStreamAccess(org_admin).get_queryset()
assert queryset.filter(project__pk=project.pk, operation='create').count() == 1
assert queryset.filter(credential__pk=org_credential.pk, operation='create').count() == 1
assert queryset.filter(inventory__pk=inventory.pk, operation='create').count() == 1
assert queryset.filter(label__pk=label.pk, operation='create').count() == 1
assert queryset.filter(job_template__pk=deploy_jobtemplate.pk, operation='create').count() == 1
assert queryset.filter(group__pk=group.pk, operation='create').count() == 1
assert queryset.filter(host__pk=host.pk, operation='create').count() == 1
assert queryset.filter(team__pk=team.pk, operation='create').count() == 1
assert queryset.filter(notification_template__pk=notification_template.pk, operation='create').count() == 1

View File

@ -167,6 +167,11 @@ def credential():
def machine_credential():
return Credential.objects.create(name='machine-cred', kind='ssh', username='test_user', password='pas4word')
@pytest.fixture
def org_credential(organization, credential):
credential.owner_role.parents.add(organization.admin_role)
return credential
@pytest.fixture
def inventory(organization):
return organization.inventories.create(name="test-inv")
@ -233,7 +238,7 @@ def organizations(instance):
return rf
@pytest.fixture
def group(inventory):
def group_factory(inventory):
def g(name):
try:
return Group.objects.get(name=name, inventory=inventory)
@ -242,8 +247,8 @@ def group(inventory):
return g
@pytest.fixture
def hosts(group):
group1 = group('group-1')
def hosts(group_factory):
group1 = group_factory('group-1')
def rf(host_count=1):
hosts = []
@ -256,6 +261,14 @@ def hosts(group):
return hosts
return rf
@pytest.fixture
def group(inventory):
return inventory.groups.create(name='single-group')
@pytest.fixture
def host(group, inventory):
return group.hosts.create(name='single-host', inventory=inventory)
@pytest.fixture
def permissions():
return {
@ -406,8 +419,8 @@ def options():
@pytest.fixture
def fact_scans(group, fact_ansible_json, fact_packages_json, fact_services_json):
group1 = group('group-1')
def fact_scans(group_factory, fact_ansible_json, fact_packages_json, fact_services_json):
group1 = group_factory('group-1')
def rf(fact_scans=1, timestamp_epoch=timezone.now()):
facts_json = {}

View File

@ -3,10 +3,10 @@ import pytest
from django.core.urlresolvers import reverse
@pytest.mark.django_db
def test_inventory_source_notification_on_cloud_only(get, post, group, user, notification_template):
def test_inventory_source_notification_on_cloud_only(get, post, group_factory, user, notification_template):
u = user('admin', True)
g_cloud = group('cloud')
g_not = group('not_cloud')
g_cloud = group_factory('cloud')
g_not = group_factory('not_cloud')
cloud_is = g_cloud.inventory_source
not_is = g_not.inventory_source
cloud_is.source = 'ec2'

View File

@ -81,12 +81,12 @@ def test_team_symantics(organization, team, alice):
assert alice not in organization.auditor_role
@pytest.mark.django_db
def test_auto_m2m_adjustments(organization, inventory, group, alice):
def test_auto_m2m_adjustments(organization, inventory, group_factory, alice):
'Ensures the auto role reparenting is working correctly through m2m maps'
g1 = group(name='g1')
g1 = group_factory(name='g1')
g1.admin_role.members.add(alice)
assert alice in g1.admin_role
g2 = group(name='g2')
g2 = group_factory(name='g2')
assert alice not in g2.admin_role
g2.parents.add(g1)

View File

@ -170,11 +170,11 @@ def test_inventory_executor(inventory, permissions, user, team):
assert team.member_role.is_ancestor_of(inventory.execute_role)
@pytest.mark.django_db
def test_group_parent_admin(group, permissions, user):
def test_group_parent_admin(group_factory, permissions, user):
u = user('admin', False)
parent1 = group('parent-1')
parent2 = group('parent-2')
childA = group('child-1')
parent1 = group_factory('parent-1')
parent2 = group_factory('parent-2')
childA = group_factory('child-1')
parent1.admin_role.members.add(u)
assert u in parent1.admin_role
@ -230,11 +230,11 @@ def test_access_auditor(organization, inventory, user):
@pytest.mark.django_db
def test_host_access(organization, inventory, user, group):
def test_host_access(organization, inventory, user, group_factory):
other_inventory = organization.inventories.create(name='other-inventory')
inventory_admin = user('inventory_admin', False)
my_group = group('my-group')
not_my_group = group('not-my-group')
my_group = group_factory('my-group')
not_my_group = group_factory('not-my-group')
group_admin = user('group_admin', False)
inventory_admin_access = HostAccess(inventory_admin)