Merge branch 'api_release_3.0.1' into release_3.0.2

* api_release_3.0.1:
  add code to HostAccess can_add so the browsable API will work
  update unit tetsts
  use get_object_or_400 to fetch Role
  refactor to unit tests
  restrict User.admin_role membership changes through RoleUsersList
  fix test
  do not allow membership changes to User.admin_role
  RoleTeam and TeamRole sublist NotFound exception handling and test update
  add system job templates to the unified JT list
  Reorganize activity stream around org admin/auditors
  Allow instant cancel for new jobs
  orphan project protection in job delete access
  pass context into Job and JobList serializer classes
  pass context to unified job template subclasses
  Allow auditors to see same /api/v1/config information as admins
This commit is contained in:
Matthew Jones 2016-08-01 13:23:37 -04:00
commit 3e1c0111f5
6 changed files with 152 additions and 58 deletions

View File

@ -526,8 +526,10 @@ class UnifiedJobTemplateSerializer(BaseSerializer):
serializer_class = InventorySourceSerializer
elif isinstance(obj, JobTemplate):
serializer_class = JobTemplateSerializer
elif isinstance(obj, SystemJobTemplate):
serializer_class = SystemJobTemplateSerializer
if serializer_class:
serializer = serializer_class(instance=obj)
serializer = serializer_class(instance=obj, context=self.context)
return serializer.to_representation(obj)
else:
return super(UnifiedJobTemplateSerializer, self).to_representation(obj)
@ -590,7 +592,7 @@ class UnifiedJobSerializer(BaseSerializer):
elif isinstance(obj, SystemJob):
serializer_class = SystemJobSerializer
if serializer_class:
serializer = serializer_class(instance=obj)
serializer = serializer_class(instance=obj, context=self.context)
ret = serializer.to_representation(obj)
else:
ret = super(UnifiedJobSerializer, self).to_representation(obj)
@ -637,7 +639,7 @@ class UnifiedJobListSerializer(UnifiedJobSerializer):
elif isinstance(obj, SystemJob):
serializer_class = SystemJobListSerializer
if serializer_class:
serializer = serializer_class(instance=obj)
serializer = serializer_class(instance=obj, context=self.context)
ret = serializer.to_representation(obj)
else:
ret = super(UnifiedJobListSerializer, self).to_representation(obj)

View File

@ -201,7 +201,7 @@ class ApiV1ConfigView(APIView):
'''Return various sitewide configuration settings.'''
license_reader = TaskSerializer()
license_data = license_reader.from_database(show_key=request.user.is_superuser)
license_data = license_reader.from_database(show_key=request.user.is_superuser or request.user.is_system_auditor)
if license_data and 'features' in license_data and 'activity_streams' in license_data['features']:
license_data['features']['activity_streams'] &= tower_settings.ACTIVITY_STREAM_ENABLED
@ -225,7 +225,10 @@ class ApiV1ConfigView(APIView):
user_ldap_fields.extend(getattr(settings, 'AUTH_LDAP_USER_FLAGS_BY_GROUP', {}).keys())
data['user_ldap_fields'] = user_ldap_fields
if request.user.is_superuser or Organization.accessible_objects(request.user, 'admin_role').exists():
if request.user.is_superuser \
or request.user.is_system_auditor \
or Organization.accessible_objects(request.user, 'admin_role').exists() \
or Organization.accessible_objects(request.user, 'auditor_role').exists():
data.update(dict(
project_base_dir = settings.PROJECTS_ROOT,
project_local_paths = Project.get_local_path_choices(),
@ -876,7 +879,7 @@ class TeamRolesList(SubListCreateAttachDetachAPIView):
data = dict(msg="Role 'id' field is missing.")
return Response(data, status=status.HTTP_400_BAD_REQUEST)
role = Role.objects.get(pk=sub_id)
role = get_object_or_400(Role, pk=sub_id)
content_type = ContentType.objects.get_for_model(Organization)
if role.content_type == content_type:
data = dict(msg="You cannot assign an Organization role as a child role for a Team.")
@ -1205,7 +1208,12 @@ class UserRolesList(SubListCreateAttachDetachAPIView):
return Response(data, status=status.HTTP_400_BAD_REQUEST)
if sub_id == self.request.user.admin_role.pk:
raise PermissionDenied('You may not remove your own admin_role.')
raise PermissionDenied('You may not perform any action with your own admin_role.')
role = get_object_or_400(Role, pk=sub_id)
user_content_type = ContentType.objects.get_for_model(User)
if role.content_type == user_content_type:
raise PermissionDenied('You may not change the membership of a users admin_role')
return super(UserRolesList, self).post(request, *args, **kwargs)
@ -3646,6 +3654,15 @@ class RoleUsersList(SubListCreateAttachDetachAPIView):
if not sub_id:
data = dict(msg="User 'id' field is missing.")
return Response(data, status=status.HTTP_400_BAD_REQUEST)
role = self.get_parent_object()
if role == self.request.user.admin_role:
raise PermissionDenied('You may not perform any action with your own admin_role.')
user_content_type = ContentType.objects.get_for_model(User)
if role.content_type == user_content_type:
raise PermissionDenied('You may not change the membership of a users admin_role')
return super(RoleUsersList, self).post(request, *args, **kwargs)
@ -3676,7 +3693,7 @@ class RoleTeamsList(SubListAPIView):
data = dict(msg="You cannot assign an Organization role as a child role for a Team.")
return Response(data, status=status.HTTP_400_BAD_REQUEST)
team = Team.objects.get(pk=sub_id)
team = get_object_or_400(Team, pk=sub_id)
action = 'attach'
if request.data.get('disassociate', None):
action = 'unattach'

View File

@ -428,8 +428,8 @@ class HostAccess(BaseAccess):
return obj and self.user in obj.inventory.read_role
def can_add(self, data):
if not data or 'inventory' not in data:
return False
if not data: # So the browseable API will work
return Inventory.accessible_objects(self.user, 'admin_role').exists()
# Checks for admin or change permission on inventory.
inventory_pk = get_pk_from_dict(data, 'inventory')
@ -1089,7 +1089,8 @@ class JobAccess(BaseAccess):
def can_delete(self, obj):
if obj.inventory is not None and self.user in obj.inventory.organization.admin_role:
return True
if obj.project is not None and self.user in obj.project.organization.admin_role:
if (obj.project is not None and obj.project.organization is not None and
self.user in obj.project.organization.admin_role):
return True
return False
@ -1299,9 +1300,11 @@ class UnifiedJobTemplateAccess(BaseAccess):
project_qs = self.user.get_queryset(Project).filter(scm_type__in=[s[0] for s in Project.SCM_TYPE_CHOICES])
inventory_source_qs = self.user.get_queryset(InventorySource).filter(source__in=CLOUD_INVENTORY_SOURCES)
job_template_qs = self.user.get_queryset(JobTemplate)
system_job_template_qs = self.user.get_queryset(SystemJobTemplate)
qs = qs.filter(Q(Project___in=project_qs) |
Q(InventorySource___in=inventory_source_qs) |
Q(JobTemplate___in=job_template_qs))
Q(JobTemplate___in=job_template_qs) |
Q(systemjobtemplate__in=system_job_template_qs))
qs = qs.select_related(
'created_by',
'modified_by',
@ -1569,21 +1572,22 @@ class ActivityStreamAccess(BaseAccess):
inventory_set = Inventory.accessible_objects(self.user, 'read_role')
credential_set = Credential.accessible_objects(self.user, 'read_role')
organization_set = Organization.accessible_objects(self.user, 'read_role')
admin_of_orgs = Organization.accessible_objects(self.user, 'admin_role')
group_set = Group.objects.filter(inventory__in=inventory_set)
auditing_orgs = (
Organization.accessible_objects(self.user, 'admin_role') |
Organization.accessible_objects(self.user, 'auditor_role')
).distinct().values_list('id', flat=True)
project_set = Project.accessible_objects(self.user, 'read_role')
jt_set = JobTemplate.accessible_objects(self.user, 'read_role')
team_set = Team.accessible_objects(self.user, 'read_role')
return qs.filter(
Q(ad_hoc_command__inventory__in=inventory_set) |
Q(user__in=organization_set.values('member_role__members')) |
Q(user__in=auditing_orgs.values('member_role__members')) |
Q(user=self.user) |
Q(organization__in=organization_set) |
Q(organization__in=auditing_orgs) |
Q(inventory__in=inventory_set) |
Q(host__inventory__in=inventory_set) |
Q(group__in=group_set) |
Q(group__inventory__in=inventory_set) |
Q(inventory_source__inventory__in=inventory_set) |
Q(inventory_update__inventory_source__inventory__in=inventory_set) |
Q(credential__in=credential_set) |
@ -1592,10 +1596,10 @@ class ActivityStreamAccess(BaseAccess):
Q(project_update__project__in=project_set) |
Q(job_template__in=jt_set) |
Q(job__job_template__in=jt_set) |
Q(notification_template__organization__in=admin_of_orgs) |
Q(notification__notification_template__organization__in=admin_of_orgs) |
Q(label__organization__in=organization_set) |
Q(role__in=Role.visible_roles(self.user))
Q(notification_template__organization__in=auditing_orgs) |
Q(notification__notification_template__organization__in=auditing_orgs) |
Q(label__organization__in=auditing_orgs) |
Q(role__in=Role.visible_roles(self.user) if auditing_orgs else [])
).distinct()
def can_add(self, data):

View File

@ -92,6 +92,12 @@ def test_null_related_delete_denied(normal_job, rando):
access = JobAccess(rando)
assert not access.can_delete(normal_job)
@pytest.mark.django_db
def test_delete_job_with_orphan_proj(normal_job, rando):
normal_job.project.organization = None
access = JobAccess(rando)
assert not access.can_delete(normal_job)
@pytest.mark.django_db
def test_inventory_org_admin_delete_allowed(normal_job, org_admin):
normal_job.project = None # do this so we test job->inventory->org->admin connection

View File

@ -0,0 +1,100 @@
import mock
from mock import PropertyMock
import pytest
from rest_framework.test import APIRequestFactory
from rest_framework.test import force_authenticate
from django.contrib.contenttypes.models import ContentType
from awx.api.views import (
RoleUsersList,
UserRolesList,
TeamRolesList,
)
from awx.main.models import (
User,
Role,
)
@pytest.mark.parametrize("pk, err", [
(111, "not change the membership"),
(1, "may not perform"),
])
def test_user_roles_list_user_admin_role(pk, err):
with mock.patch('awx.api.views.get_object_or_400') as role_get, \
mock.patch('awx.api.views.ContentType.objects.get_for_model') as ct_get:
role_mock = mock.MagicMock(spec=Role, id=1, pk=1)
content_type_mock = mock.MagicMock(spec=ContentType)
role_mock.content_type = content_type_mock
role_get.return_value = role_mock
ct_get.return_value = content_type_mock
with mock.patch('awx.api.views.User.admin_role', new_callable=PropertyMock, return_value=role_mock):
factory = APIRequestFactory()
view = UserRolesList.as_view()
user = User(username="root", is_superuser=True)
request = factory.post("/user/1/roles", {'id':pk}, format="json")
force_authenticate(request, user)
response = view(request)
response.render()
assert response.status_code == 403
assert err in response.content
@pytest.mark.parametrize("admin_role, err", [
(True, "may not perform"),
(False, "not change the membership"),
])
def test_role_users_list_other_user_admin_role(admin_role, err):
with mock.patch('awx.api.views.RoleUsersList.get_parent_object') as role_get, \
mock.patch('awx.api.views.ContentType.objects.get_for_model') as ct_get:
role_mock = mock.MagicMock(spec=Role, id=1)
content_type_mock = mock.MagicMock(spec=ContentType)
role_mock.content_type = content_type_mock
role_get.return_value = role_mock
ct_get.return_value = content_type_mock
user_admin_role = role_mock if admin_role else None
with mock.patch('awx.api.views.User.admin_role', new_callable=PropertyMock, return_value=user_admin_role):
factory = APIRequestFactory()
view = RoleUsersList.as_view()
user = User(username="root", is_superuser=True, pk=1, id=1)
request = factory.post("/role/1/users", {'id':1}, format="json")
force_authenticate(request, user)
response = view(request)
response.render()
assert response.status_code == 403
assert err in response.content
def test_team_roles_list_post_org_roles():
with mock.patch('awx.api.views.get_object_or_400') as role_get, \
mock.patch('awx.api.views.ContentType.objects.get_for_model') as ct_get:
role_mock = mock.MagicMock(spec=Role)
content_type_mock = mock.MagicMock(spec=ContentType)
role_mock.content_type = content_type_mock
role_get.return_value = role_mock
ct_get.return_value = content_type_mock
factory = APIRequestFactory()
view = TeamRolesList.as_view()
request = factory.post("/team/1/roles", {'id':1}, format="json")
force_authenticate(request, User(username="root", is_superuser=True))
response = view(request)
response.render()
assert response.status_code == 400
assert 'cannot assign' in response.content

View File

@ -1,22 +1,11 @@
import mock
import pytest
from rest_framework.test import APIRequestFactory
from rest_framework.test import force_authenticate
from django.contrib.contenttypes.models import ContentType
from awx.api.views import (
ApiV1RootView,
TeamRolesList,
JobTemplateLabelList,
)
from awx.main.models import (
User,
Role,
)
@pytest.fixture
def mock_response_new(mocker):
m = mocker.patch('awx.api.views.Response.__new__')
@ -68,30 +57,6 @@ class TestJobTemplateLabelList:
with mock.patch('awx.api.generics.DeleteLastUnattachLabelMixin.unattach') as mixin_unattach:
view = JobTemplateLabelList()
mock_request = mock.MagicMock()
super(JobTemplateLabelList, view).unattach(mock_request, None, None)
assert mixin_unattach.called_with(mock_request, None, None)
@pytest.mark.parametrize("url", ["/team/1/roles", "/role/1/teams"])
def test_team_roles_list_post_org_roles(url):
with mock.patch('awx.api.views.Role.objects.get') as role_get, \
mock.patch('awx.api.views.ContentType.objects.get_for_model') as ct_get:
role_mock = mock.MagicMock(spec=Role)
content_type_mock = mock.MagicMock(spec=ContentType)
role_mock.content_type = content_type_mock
role_get.return_value = role_mock
ct_get.return_value = content_type_mock
factory = APIRequestFactory()
view = TeamRolesList.as_view()
request = factory.post(url, {'id':1}, format="json")
force_authenticate(request, User(username="root", is_superuser=True))
response = view(request)
response.render()
assert response.status_code == 400
assert 'cannot assign' in response.content