Fix app activity stream permissions

This commit is contained in:
adamscmRH 2018-04-18 16:00:00 -04:00
parent 283132cd08
commit b6fcfd43b1
2 changed files with 43 additions and 1 deletions

View File

@ -2399,7 +2399,7 @@ class ActivityStreamAccess(BaseAccess):
model = ActivityStream
prefetch_related = ('organization', 'user', 'inventory', 'host', 'group',
'inventory_update', 'credential', 'credential_type', 'team',
'ad_hoc_command',
'ad_hoc_command', 'o_auth2_application', 'o_auth2_access_token',
'notification_template', 'notification', 'label', 'role', 'actor',
'schedule', 'custom_inventory_script', 'unified_job_template',
'workflow_job_template_node',)
@ -2442,9 +2442,13 @@ class ActivityStreamAccess(BaseAccess):
jt_set = JobTemplate.accessible_objects(self.user, 'read_role')
team_set = Team.accessible_objects(self.user, 'read_role')
wfjt_set = WorkflowJobTemplate.accessible_objects(self.user, 'read_role')
app_set = OAuth2ApplicationAccess(self.user).filtered_queryset()
token_set = OAuth2TokenAccess(self.user).filtered_queryset()
return qs.filter(
Q(ad_hoc_command__inventory__in=inventory_set) |
Q(o_auth2_application__in=app_set) |
Q(o_auth2_access_token__in=token_set) |
Q(user__in=auditing_orgs.values('member_role__members')) |
Q(user=self.user) |
Q(organization__in=auditing_orgs) |

View File

@ -3,11 +3,13 @@ import pytest
from awx.main.access import (
OAuth2ApplicationAccess,
OAuth2TokenAccess,
ActivityStreamAccess,
)
from awx.main.models.oauth import (
OAuth2Application as Application,
OAuth2AccessToken as AccessToken,
)
from awx.main.models import ActivityStream
from awx.api.versioning import reverse
@ -32,6 +34,42 @@ class TestOAuth2Application:
client_type='confidential', authorization_grant_type='password', organization=organization
)
assert access.can_read(app) is can_access
def test_app_activity_stream(self, org_admin, alice, organization):
app = Application.objects.create(
name='test app for {}'.format(org_admin.username), user=org_admin,
client_type='confidential', authorization_grant_type='password', organization=organization
)
access = OAuth2ApplicationAccess(org_admin)
assert access.can_read(app) is True
access = ActivityStreamAccess(org_admin)
activity_stream = ActivityStream.objects.filter(o_auth2_application=app).latest('pk')
assert access.can_read(activity_stream) is True
access = ActivityStreamAccess(alice)
assert access.can_read(app) is False
assert access.can_read(activity_stream) is False
def test_token_activity_stream(self, org_admin, alice, organization, post):
app = Application.objects.create(
name='test app for {}'.format(org_admin.username), user=org_admin,
client_type='confidential', authorization_grant_type='password', organization=organization
)
response = post(
reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}),
{'scope': 'read'}, org_admin, expect=201
)
token = AccessToken.objects.get(token=response.data['token'])
access = OAuth2ApplicationAccess(org_admin)
assert access.can_read(app) is True
access = ActivityStreamAccess(org_admin)
activity_stream = ActivityStream.objects.filter(o_auth2_access_token=token).latest('pk')
assert access.can_read(activity_stream) is True
access = ActivityStreamAccess(alice)
assert access.can_read(token) is False
assert access.can_read(activity_stream) is False
def test_can_edit_delete_app_org_admin(