Merge pull request #1288 from anoek/rbac

Misc RBAC patches
This commit is contained in:
Wayne Witzel III 2016-03-22 13:51:24 -04:00
commit 17daade6e6
11 changed files with 166 additions and 152 deletions

View File

@ -214,7 +214,7 @@ class ApiV1ConfigView(APIView):
user_ldap_fields.extend(getattr(settings, 'AUTH_LDAP_USER_FLAGS_BY_GROUP', {}).keys())
data['user_ldap_fields'] = user_ldap_fields
if request.user.is_superuser or request.user.admin_of_organizations.count():
if request.user.is_superuser or Organization.accessible_objects(request.user, {'write': True}).count():
data.update(dict(
project_base_dir = settings.PROJECTS_ROOT,
project_local_paths = Project.get_local_path_choices(),
@ -999,13 +999,16 @@ class UserMeList(ListAPIView):
def get_queryset(self):
return self.model.objects.filter(pk=self.request.user.pk)
class UserTeamsList(SubListAPIView):
class UserTeamsList(ListAPIView):
model = Team
model = User
serializer_class = TeamSerializer
parent_model = User
relationship = 'teams'
def get_queryset(self):
u = User.objects.get(pk=self.kwargs['pk'])
if not u.accessible_by(self.request.user, {'read': True}):
raise PermissionDenied()
return Team.accessible_objects(self.request.user, {'read': True}).filter(member_role__members=u)
class UserRolesList(SubListCreateAttachDetachAPIView):
@ -1043,8 +1046,9 @@ class UserProjectsList(SubListAPIView):
def get_queryset(self):
parent = self.get_parent_object()
self.check_parent_access(parent)
qs = self.request.user.get_queryset(self.model)
return qs.filter(teams__in=parent.teams.distinct())
my_qs = Project.accessible_objects(self.request.user, {'read': True})
user_qs = Project.accessible_objects(parent, {'read': True})
return my_qs & user_qs
class UserCredentialsList(SubListCreateAttachDetachAPIView):

View File

@ -198,12 +198,10 @@ class BaseAccess(object):
class UserAccess(BaseAccess):
'''
I can see user records when:
- I'm a superuser.
- I'm that user.
- I'm an org admin (org admins should be able to see all users, in order
to add those users to the org).
- I'm in an org with that user.
- I'm on a team with that user.
- I'm a useruser
- I'm in a role with them (such as in an organization or team)
- They are in a role which includes a role of mine
- I am in a role that includes a role of theirs
I can change some fields for a user (mainly password) when I am that user.
I can change all fields for a user (admin access) or delete when:
- I'm a superuser.
@ -213,8 +211,17 @@ class UserAccess(BaseAccess):
model = User
def get_queryset(self):
qs = User.accessible_objects(self.user, {'read':True})
return qs
if self.user.is_superuser:
return User.objects
viewable_users_set = set()
viewable_users_set.update(self.user.roles.values_list('ancestors__members__id', flat=True))
viewable_users_set.update(self.user.roles.values_list('descendents__members__id', flat=True))
return User.objects.filter(id__in=viewable_users_set)
#qs = User.objects.filter(self.user, {'read':True})
#qs = User.objects.
#return qs
def can_add(self, data):
if data is not None and 'is_superuser' in data:
@ -237,7 +244,7 @@ class UserAccess(BaseAccess):
# Admin implies changing all user fields.
if self.user.is_superuser:
return True
return obj.accessible_by(self.user, {'create': True, 'write':True, 'update':True, 'read':True})
return Organization.objects.filter(member_role__members=obj, admin_role__members=self.user).exists()
def can_delete(self, obj):
if obj == self.user:
@ -623,6 +630,8 @@ class ProjectAccess(BaseAccess):
model = Project
def get_queryset(self):
if self.user.is_superuser:
return self.model.objects
qs = self.model.accessible_objects(self.user, {'read':True})
qs = qs.select_related('modified_by', 'credential', 'current_job', 'last_job')
return qs
@ -654,6 +663,8 @@ class ProjectUpdateAccess(BaseAccess):
model = ProjectUpdate
def get_queryset(self):
if self.user.is_superuser:
return self.model.objects
qs = ProjectUpdate.objects.distinct()
qs = qs.select_related('created_by', 'modified_by', 'project')
project_ids = set(self.user.get_queryset(Project).values_list('id', flat=True))

View File

@ -2,7 +2,6 @@
# All Rights Reserved.
# Django
from django.db import connection
from django.db.models.signals import (
post_init,
pre_save,
@ -19,10 +18,6 @@ from django.db.models.fields.related import (
ReverseManyRelatedObjectsDescriptor,
)
from django.core.exceptions import FieldError
from django.db.transaction import TransactionManagementError
# AWX
from awx.main.models.rbac import RolePermission, Role, batch_role_ancestor_rebuilding
@ -155,9 +150,9 @@ class ImplicitRoleField(models.ForeignKey):
for pk in pk_set:
obj = model.objects.get(pk=pk)
if action == 'post_add':
getattr(instance, self.name).children.add(getattr(obj, field_attr))
getattr(instance, field_attr).children.add(getattr(obj, self.name))
if action == 'pre_remove':
getattr(instance, self.name).children.remove(getattr(obj, field_attr))
getattr(instance, field_attr).children.remove(getattr(obj, self.name))
else:
for pk in pk_set:
@ -182,8 +177,9 @@ class ImplicitRoleField(models.ForeignKey):
if role:
return role
role = Role.objects.create(
name=self.role_name,
description=self.role_description)
name=self.role_name,
description=self.role_description
)
setattr(instance, self.name, role)
def _patch_role_content_object_and_grant_permissions(self, instance):

View File

@ -202,12 +202,12 @@ class UserAccess(BaseAccess):
qs = self.model.objects.distinct()
if self.user.is_superuser:
return qs
if tower_settings.ORG_ADMINS_CAN_SEE_ALL_USERS and self.user.admin_of_organizations.all().exists():
if tower_settings.ORG_ADMINS_CAN_SEE_ALL_USERS and self.user.deprecated_admin_of_organizations.all().exists():
return qs
return qs.filter(
Q(pk=self.user.pk) |
Q(organizations__in=self.user.admin_of_organizations) |
Q(organizations__in=self.user.organizations) |
Q(organizations__in=self.user.deprecated_admin_of_organizations) |
Q(organizations__in=self.user.deprecated_organizations) |
Q(teams__in=self.user.teams)
).distinct()
@ -216,7 +216,7 @@ class UserAccess(BaseAccess):
if to_python_boolean(data['is_superuser'], allow_none=True) and not self.user.is_superuser:
return False
return bool(self.user.is_superuser or
self.user.admin_of_organizations.exists())
self.user.deprecated_admin_of_organizations.exists())
def can_change(self, obj, data):
if data is not None and 'is_superuser' in data:
@ -231,7 +231,7 @@ class UserAccess(BaseAccess):
# Admin implies changing all user fields.
if self.user.is_superuser:
return True
return bool(obj.organizations.filter(deprecated_admins__in=[self.user]).exists())
return bool(obj.deprecated_organizations.filter(deprecated_admins__in=[self.user]).exists())
def can_delete(self, obj):
if obj == self.user:
@ -242,7 +242,7 @@ class UserAccess(BaseAccess):
# cannot delete the last active superuser
return False
return bool(self.user.is_superuser or
obj.organizations.filter(deprecated_admins__in=[self.user]).exists())
obj.deprecated_organizations.filter(deprecated_admins__in=[self.user]).exists())
class OrganizationAccess(BaseAccess):
'''
@ -326,7 +326,7 @@ class InventoryAccess(BaseAccess):
# If no data is specified, just checking for generic add permission?
if not data:
return bool(self.user.is_superuser or
self.user.admin_of_organizations.exists())
self.user.deprecated_admin_of_organizations.exists())
# Otherwise, verify that the user has access to change the parent
# organization of this inventory.
if self.user.is_superuser:
@ -568,11 +568,11 @@ class CredentialAccess(BaseAccess):
return qs
# Get the list of organizations for which the user is an admin
orgs_as_admin_ids = set(self.user.admin_of_organizations.values_list('id', flat=True))
orgs_as_admin_ids = set(self.user.deprecated_admin_of_organizations.values_list('id', flat=True))
return qs.filter(
Q(user=self.user) |
Q(user__organizations__id__in=orgs_as_admin_ids) |
Q(user__admin_of_organizations__id__in=orgs_as_admin_ids) |
Q(user__deprecated_organizations__id__in=orgs_as_admin_ids) |
Q(user__deprecated_admin_of_organizations__id__in=orgs_as_admin_ids) |
Q(team__organization__id__in=orgs_as_admin_ids) |
Q(team__deprecated_users__in=[self.user])
)
@ -600,9 +600,9 @@ class CredentialAccess(BaseAccess):
if obj.user:
if self.user == obj.user:
return True
if obj.user.organizations.filter(deprecated_admins__in=[self.user]).exists():
if obj.user.deprecated_organizations.filter(deprecated_admins__in=[self.user]).exists():
return True
if obj.user.admin_of_organizations.filter(deprecated_admins__in=[self.user]).exists():
if obj.user.deprecated_admin_of_organizations.filter(deprecated_admins__in=[self.user]).exists():
return True
if obj.team:
if self.user in obj.team.organization.deprecated_admins.all():
@ -710,7 +710,7 @@ class ProjectAccess(BaseAccess):
def can_add(self, data):
if self.user.is_superuser:
return True
if self.user.admin_of_organizations.exists():
if self.user.deprecated_admin_of_organizations.exists():
return True
return False
@ -772,10 +772,10 @@ class PermissionAccess(BaseAccess):
'project')
if self.user.is_superuser:
return qs
orgs_as_admin_ids = set(self.user.admin_of_organizations.values_list('id', flat=True))
orgs_as_admin_ids = set(self.user.deprecated_admin_of_organizations.values_list('id', flat=True))
return qs.filter(
Q(user__organizations__in=orgs_as_admin_ids) |
Q(user__admin_of_organizations__in=orgs_as_admin_ids) |
Q(user__deprecated_organizations__in=orgs_as_admin_ids) |
Q(user__deprecated_admin_of_organizations__in=orgs_as_admin_ids) |
Q(team__organization__in=orgs_as_admin_ids) |
Q(user=self.user) |
Q(team__deprecated_users__in=[self.user])
@ -1492,16 +1492,16 @@ class ActivityStreamAccess(BaseAccess):
if self.user.is_superuser:
return qs
user_admin_orgs = self.user.admin_of_organizations.all()
user_orgs = self.user.organizations.all()
user_admin_orgs = self.user.deprecated_admin_of_organizations.all()
user_orgs = self.user.deprecated_organizations.all()
#Organization filter
qs = qs.filter(Q(organization__deprecated_admins__in=[self.user]) | Q(organization__deprecated_users__in=[self.user]))
#User filter
qs = qs.filter(Q(user__pk=self.user.pk) |
Q(user__organizations__in=user_admin_orgs) |
Q(user__organizations__in=user_orgs))
Q(user__deprecated_organizations__in=user_admin_orgs) |
Q(user__deprecated_organizations__in=user_orgs))
#Inventory filter
inventory_qs = self.user.get_queryset(Inventory)
@ -1523,8 +1523,8 @@ class ActivityStreamAccess(BaseAccess):
#Credential Update Filter
qs.filter(Q(credential__user=self.user) |
Q(credential__user__organizations__in=user_admin_orgs) |
Q(credential__user__admin_of_organizations__in=user_admin_orgs) |
Q(credential__user__deprecated_organizations__in=user_admin_orgs) |
Q(credential__user__deprecated_admin_of_organizations__in=user_admin_orgs) |
Q(credential__team__organization__in=user_admin_orgs) |
Q(credential__team__deprecated_users__in=[self.user]))
@ -1606,7 +1606,7 @@ class CustomInventoryScriptAccess(BaseAccess):
def can_read(self, obj):
if self.user.is_superuser:
return True
return bool(obj.organization in self.user.organizations.all() or obj.organization in self.user.admin_of_organizations.all())
return bool(obj.organization in self.user.deprecated_organizations.all() or obj.organization in self.user.deprecated_admin_of_organizations.all())
def can_add(self, data):
if self.user.is_superuser:

View File

@ -41,12 +41,12 @@ class Organization(CommonModel, NotificationFieldsModel, ResourceMixin):
deprecated_users = models.ManyToManyField(
'auth.User',
blank=True,
related_name='organizations',
related_name='deprecated_organizations',
)
deprecated_admins = models.ManyToManyField(
'auth.User',
blank=True,
related_name='admin_of_organizations',
related_name='deprecated_admin_of_organizations',
)
deprecated_projects = models.ManyToManyField(
'Project',
@ -94,7 +94,7 @@ class Team(CommonModelNameNotUnique, ResourceMixin):
deprecated_users = models.ManyToManyField(
'auth.User',
blank=True,
related_name='teams',
related_name='deprecated_teams',
)
organization = models.ForeignKey(
'Organization',

View File

@ -26,6 +26,10 @@ from awx.main.models.mixins import ResourceMixin
from awx.main.utils import update_scm_url
from awx.main.fields import ImplicitRoleField
from awx.main.conf import tower_settings
from awx.main.models.rbac import (
ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
ROLE_SINGLETON_SYSTEM_AUDITOR,
)
__all__ = ['Project', 'ProjectUpdate']
@ -219,13 +223,20 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin):
admin_role = ImplicitRoleField(
role_name='Project Administrator',
role_description='May manage this project',
parent_role='organization.admin_role',
parent_role=[
'organization.admin_role',
'teams.member_role',
'singleton:' + ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
],
permissions = {'all': True}
)
auditor_role = ImplicitRoleField(
role_name='Project Auditor',
role_description='May read all settings associated with this project',
parent_role='organization.auditor_role',
parent_role=[
'organization.auditor_role',
'singleton:' + ROLE_SINGLETON_SYSTEM_AUDITOR,
],
permissions = {'read': True}
)
member_role = ImplicitRoleField(

View File

@ -241,3 +241,20 @@ def test_auto_parenting():
assert org2.admin_role.is_ancestor_of(prj1.admin_role)
assert org2.admin_role.is_ancestor_of(prj2.admin_role)
@pytest.mark.django_db
def test_auto_m2m_parenting(team, project, user):
u = user('some-user')
team.member_role.members.add(u)
assert project.accessible_by(u, {'read': True}) is False
project.teams.add(team)
assert project.accessible_by(u, {'read': True})
project.teams.remove(team)
assert project.accessible_by(u, {'read': True}) is False
team.projects.add(project)
assert project.accessible_by(u, {'read': True})
team.projects.remove(project)
assert project.accessible_by(u, {'read': True}) is False

View File

@ -136,7 +136,7 @@ class OrganizationsTest(BaseTest):
# no admin rights? get empty list
with self.current_user(self.other_django_user):
response = self.get(url, expect=200)
self.check_pagination_and_size(response, self.other_django_user.organizations.count(), previous=None, next=None)
self.check_pagination_and_size(response, len(self.organizations), previous=None, next=None)
# not a member of any orgs? get empty list
with self.current_user(self.nobody_django_user):
@ -283,14 +283,14 @@ class OrganizationsTest(BaseTest):
# find projects attached to the first org
projects0_url = orgs['results'][0]['related']['projects']
projects1_url = orgs['results'][1]['related']['projects']
projects2_url = orgs['results'][2]['related']['projects']
# get all the projects on the first org
projects0 = self.get(projects0_url, expect=200, auth=self.get_super_credentials())
a_project = projects0['results'][-1]
# attempt to add the project to the 7th org and see what happens
self.post(projects1_url, a_project, expect=204, auth=self.get_super_credentials())
#self.post(projects1_url, a_project, expect=204, auth=self.get_super_credentials())
self.post(projects1_url, a_project, expect=400, auth=self.get_super_credentials())
projects1 = self.get(projects0_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(projects1['count'], 3)
@ -300,32 +300,19 @@ class OrganizationsTest(BaseTest):
# test that by posting a pk + disassociate: True we can remove a relationship
projects1 = self.get(projects1_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(projects1['count'], 6)
self.assertEquals(projects1['count'], 5)
a_project['disassociate'] = True
self.post(projects1_url, a_project, expect=204, auth=self.get_super_credentials())
self.post(projects1_url, a_project, expect=400, auth=self.get_super_credentials())
projects1 = self.get(projects1_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(projects1['count'], 5)
a_project = projects1['results'][-1]
a_project['disassociate'] = 1
projects1 = self.get(projects1_url, expect=200, auth=self.get_super_credentials())
self.post(projects1_url, a_project, expect=204, auth=self.get_normal_credentials())
self.post(projects1_url, a_project, expect=400, auth=self.get_normal_credentials())
projects1 = self.get(projects1_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(projects1['count'], 4)
self.assertEquals(projects1['count'], 5)
new_project_a = self.make_projects(self.normal_django_user, 1)[0]
new_project_b = self.make_projects(self.other_django_user, 1)[0]
# admin of org can add projects that he can read
self.post(projects1_url, dict(id=new_project_a.pk), expect=204, auth=self.get_normal_credentials())
# but not those he cannot
self.post(projects1_url, dict(id=new_project_b.pk), expect=403, auth=self.get_normal_credentials())
# and can't post a project he can read to an org he cannot
# self.post(projects2_url, dict(id=new_project_a.pk), expect=403, auth=self.get_normal_credentials())
# and can't do post a project he can read to an organization he cannot
self.post(projects2_url, dict(id=new_project_a.pk), expect=403, auth=self.get_normal_credentials())
def test_post_item_subobjects_users(self):
@ -342,7 +329,7 @@ class OrganizationsTest(BaseTest):
# post a completely new user to verify we can add users to the subcollection directly
new_user = dict(username='NewUser9000', password='NewPassword9000')
which_org = self.normal_django_user.admin_of_organizations.all()[0]
which_org = Organization.accessible_objects(self.normal_django_user, {'read': True, 'write': True})[0]
url = reverse('api:organization_users_list', args=(which_org.pk,))
self.post(url, new_user, expect=201, auth=self.get_normal_credentials())

View File

@ -59,8 +59,8 @@ class ProjectsTest(BaseTransactionTest):
self.organizations[1].projects.add(project)
for project in self.projects[9:10]:
self.organizations[2].projects.add(project)
self.organizations[0].projects.add(self.projects[-1])
self.organizations[9].projects.add(self.projects[-2])
#self.organizations[0].projects.add(self.projects[-1])
#self.organizations[9].projects.add(self.projects[-2])
# get the URL for various organization records
self.a_detail_url = "%s%s" % (self.collection(), self.organizations[0].pk)
@ -89,7 +89,9 @@ class ProjectsTest(BaseTransactionTest):
)
# create some teams in the first org
self.team1.projects.add(self.projects[0])
#self.team1.projects.add(self.projects[0])
self.projects[0].teams.add(self.team1)
#self.team1.projects.add(self.projects[0])
self.team2.projects.add(self.projects[1])
self.team2.projects.add(self.projects[2])
self.team2.projects.add(self.projects[3])
@ -215,7 +217,7 @@ class ProjectsTest(BaseTransactionTest):
self.assertEquals(results['count'], 10)
# org admin
results = self.get(projects, expect=200, auth=self.get_normal_credentials())
self.assertEquals(results['count'], 9)
self.assertEquals(results['count'], 6)
# user on a team
results = self.get(projects, expect=200, auth=self.get_other_credentials())
self.assertEquals(results['count'], 5)
@ -296,31 +298,6 @@ class ProjectsTest(BaseTransactionTest):
got = self.get(proj_playbooks, expect=200, auth=self.get_super_credentials())
self.assertEqual(got, self.projects[2].playbooks)
# can list member organizations for projects
proj_orgs = reverse('api:project_organizations_list', args=(self.projects[0].pk,))
# only usable as superuser
got = self.get(proj_orgs, expect=200, auth=self.get_normal_credentials())
got = self.get(proj_orgs, expect=200, auth=self.get_super_credentials())
self.get(proj_orgs, expect=403, auth=self.get_other_credentials())
self.assertEquals(got['count'], 1)
self.assertEquals(got['results'][0]['url'], reverse('api:organization_detail', args=(self.organizations[0].pk,)))
# post to create new org associated with this project.
self.post(proj_orgs, data={'name': 'New Org'}, expect=201, auth=self.get_super_credentials())
got = self.get(proj_orgs, expect=200, auth=self.get_super_credentials())
self.assertEquals(got['count'], 2)
# Verify that creatorship doesn't imply access if access is removed
a_new_proj = self.make_project(created_by=self.other_django_user, playbook_content=TEST_PLAYBOOK)
self.organizations[0].admin_role.members.add(self.other_django_user)
self.organizations[0].projects.add(a_new_proj)
proj_detail = reverse('api:project_detail', args=(a_new_proj.pk,))
self.patch(proj_detail, data=dict(description="test"), expect=200, auth=self.get_other_credentials())
self.organizations[0].deprecated_admins.remove(self.other_django_user)
self.patch(proj_detail, data=dict(description="test_now"), expect=403, auth=self.get_other_credentials())
self.delete(proj_detail, expect=403, auth=self.get_other_credentials())
a_new_proj.delete()
# =====================================================================
# TEAMS
@ -421,7 +398,7 @@ class ProjectsTest(BaseTransactionTest):
team = Team.objects.filter( organization__pk=self.organizations[1].pk)[0]
team_users = reverse('api:team_users_list', args=(team.pk,))
for x in team.deprecated_users.all():
for x in team.member_role.members.all():
team.member_role.members.remove(x)
team.save()
@ -446,7 +423,7 @@ class ProjectsTest(BaseTransactionTest):
self.post(team_users, data=dict(username='attempted_superuser_create', password='thepassword',
is_superuser=True), expect=201, auth=self.get_super_credentials())
self.assertEqual(Team.objects.get(pk=team.pk).member_role.members.count(), 5)
self.assertEqual(Team.objects.get(pk=team.pk).member_role.members.count(), all_users['count'] + 1)
# can remove users from teams
for x in all_users['results']:
@ -454,7 +431,7 @@ class ProjectsTest(BaseTransactionTest):
self.post(team_users, data=y, expect=403, auth=self.get_nobody_credentials())
self.post(team_users, data=y, expect=204, auth=self.get_normal_credentials())
self.assertEquals(Team.objects.get(pk=team.pk).deprecated_users.count(), 1) # Leaving just the super user we created
self.assertEquals(Team.objects.get(pk=team.pk).member_role.members.count(), 1) # Leaving just the super user we created
# =====================================================================
# USER TEAMS
@ -465,9 +442,12 @@ class ProjectsTest(BaseTransactionTest):
self.get(url, expect=401)
self.get(url, expect=401, auth=self.get_invalid_credentials())
self.get(url, expect=403, auth=self.get_nobody_credentials())
other.organizations.add(Organization.objects.get(pk=self.organizations[1].pk))
self.organizations[1].member_role.members.add(other)
# Normal user can only see some teams that other user is a part of,
# since normal user is not an admin of that organization.
my_teams1 = self.get(url, expect=200, auth=self.get_normal_credentials())
my_teams2 = self.get(url, expect=200, auth=self.get_other_credentials())
my_teams1 = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(my_teams1['count'], 1)
# Other user should be able to see all his own teams.
@ -622,7 +602,7 @@ class ProjectsTest(BaseTransactionTest):
# Test post as organization admin where team is part of org, but user
# creating credential is not a member of the team. UI may pass user
# as an empty string instead of None.
normal_org = self.normal_django_user.admin_of_organizations.all()[0]
normal_org = self.organizations[1] # normal user is an admin of this
org_team = normal_org.teams.create(name='new empty team')
with self.current_user(self.normal_django_user):
data = {
@ -1262,7 +1242,7 @@ class ProjectUpdatesTest(BaseTransactionTest):
else:
self.check_project_update(project, should_fail=should_still_fail)
# Test that we can delete project updates.
for pu in project.project_updates:
for pu in project.project_updates.all():
pu_url = reverse('api:project_update_detail', args=(pu.pk,))
with self.current_user(self.super_django_user):
self.delete(pu_url, expect=204)

View File

@ -87,11 +87,12 @@ class InventoryScriptTest(BaseScriptTest):
host = inventory.hosts.create(name='host-%02d-%02d.example.com' % (n, x),
inventory=inventory,
variables=variables)
if x in (3, 7):
host.delete()
continue
#if x in (3, 7):
# host.delete()
# continue
hosts.append(host)
# add localhost just to make sure it's thrown into all (Ansible github bug)
local = inventory.hosts.create(name='localhost', inventory=inventory, variables={})
hosts.append(local)
@ -106,9 +107,9 @@ class InventoryScriptTest(BaseScriptTest):
group = inventory.groups.create(name='group-%d' % x,
inventory=inventory,
variables=variables)
if x == 2:
group.delete()
continue
#if x == 2:
# #group.delete()
# #continue
groups.append(group)
group.hosts.add(hosts[x])
group.hosts.add(hosts[x + 5])
@ -118,6 +119,13 @@ class InventoryScriptTest(BaseScriptTest):
group.hosts.add(local)
self.groups.extend(groups)
hosts[3].delete()
hosts[7].delete()
groups[2].delete()
def tearDown(self):
super(InventoryScriptTest, self).tearDown()
self.stop_redis()
@ -162,16 +170,16 @@ class InventoryScriptTest(BaseScriptTest):
# variable data or parent/child relationships.
for k,v in data.items():
if k != 'all':
self.assertTrue(isinstance(v, dict))
self.assertTrue(isinstance(v['children'], (list,tuple)))
self.assertTrue(isinstance(v['hosts'], (list,tuple)))
self.assertTrue(isinstance(v['vars'], (dict)))
assert isinstance(v, dict)
assert isinstance(v['children'], (list,tuple))
assert isinstance(v['hosts'], (list,tuple))
assert isinstance(v['vars'], (dict))
group = inventory.groups.get(name=k)
hosts = group.hosts
hostnames = hosts.values_list('name', flat=True)
self.assertEqual(set(v['hosts']), set(hostnames))
else:
self.assertTrue(v['hosts'] == ['localhost'])
assert v['hosts'] == ['host-00-02.example.com', 'localhost']
# Command line argument for inventory ID should take precedence over
# environment variable.
@ -195,7 +203,7 @@ class InventoryScriptTest(BaseScriptTest):
# Groups for this inventory should have hosts, variable data, and one
# parent/child relationship.
for k,v in data.items():
self.assertTrue(isinstance(v, dict))
assert isinstance(v, dict)
if k == 'all':
self.assertEqual(v.get('vars', {}), inventory.variables_dict)
continue
@ -210,7 +218,7 @@ class InventoryScriptTest(BaseScriptTest):
childnames = children.values_list('name', flat=True)
self.assertEqual(set(v.get('children', [])), set(childnames))
else:
self.assertTrue(len(v['children']) == 0)
assert len(v['children']) == 0
def test_list_with_hostvars_inline(self):
inventory = self.inventories[1]
@ -227,7 +235,7 @@ class InventoryScriptTest(BaseScriptTest):
# Groups for this inventory should have hosts, variable data, and one
# parent/child relationship.
for k,v in data.items():
self.assertTrue(isinstance(v, dict))
assert isinstance(v, dict)
if k == 'all':
self.assertEqual(v.get('vars', {}), inventory.variables_dict)
continue
@ -237,18 +245,18 @@ class InventoryScriptTest(BaseScriptTest):
hosts = group.hosts
hostnames = hosts.values_list('name', flat=True)
all_hostnames.update(hostnames)
self.assertEqual(set(v.get('hosts', [])), set(hostnames))
assert set(v.get('hosts', [])) == set(hostnames)
if group.variables:
self.assertEqual(v.get('vars', {}), group.variables_dict)
assert v.get('vars', {}) == group.variables_dict
if k == 'group-3':
children = group.children
childnames = children.values_list('name', flat=True)
self.assertEqual(set(v.get('children', [])), set(childnames))
assert set(v.get('children', [])) == set(childnames)
else:
self.assertTrue(len(v['children']) == 0)
assert len(v['children']) == 0
# Check hostvars in ['_meta']['hostvars'] dict.
for hostname in all_hostnames:
self.assertTrue(hostname in data['_meta']['hostvars'])
assert hostname in data['_meta']['hostvars']
host = inventory.hosts.get(name=hostname)
self.assertEqual(data['_meta']['hostvars'][hostname],
host.variables_dict)
@ -258,12 +266,12 @@ class InventoryScriptTest(BaseScriptTest):
inventory=inventory.pk)
self.assertEqual(rc, 0, stderr)
data = json.loads(stdout)
self.assertTrue('_meta' in data)
assert '_meta' in data
def test_valid_host(self):
# Host without variable data.
inventory = self.inventories[0]
host = inventory.hosts[2]
host = inventory.hosts.all()[2]
os.environ['INVENTORY_ID'] = str(inventory.pk)
rc, stdout, stderr = self.run_inventory_script(host=host.name)
self.assertEqual(rc, 0, stderr)
@ -271,7 +279,7 @@ class InventoryScriptTest(BaseScriptTest):
self.assertEqual(data, {})
# Host with variable data.
inventory = self.inventories[1]
host = inventory.hosts[4]
host = inventory.hosts.all()[4]
os.environ['INVENTORY_ID'] = str(inventory.pk)
rc, stdout, stderr = self.run_inventory_script(host=host.name)
self.assertEqual(rc, 0, stderr)
@ -348,7 +356,7 @@ class InventoryScriptTest(BaseScriptTest):
groupnames = list(groups.values_list('name', flat=True)) + ['all']
self.assertEqual(set(data.keys()), set(groupnames))
for k,v in data.items():
self.assertTrue(isinstance(v, dict))
assert isinstance(v, dict)
if k == 'all':
self.assertEqual(v.get('vars', {}), inventory.variables_dict)
continue
@ -364,7 +372,7 @@ class InventoryScriptTest(BaseScriptTest):
childnames = children.values_list('name', flat=True)
self.assertEqual(set(v.get('children', [])), set(childnames))
else:
self.assertTrue(len(v['children']) == 0)
assert len(v['children']) == 0
# Load inventory list with all hosts.
rc, stdout, stderr = self.run_inventory_script(list=True, all=True)
self.assertEqual(rc, 0, stderr)
@ -373,7 +381,7 @@ class InventoryScriptTest(BaseScriptTest):
groupnames = list(groups.values_list('name', flat=True)) + ['all']
self.assertEqual(set(data.keys()), set(groupnames))
for k,v in data.items():
self.assertTrue(isinstance(v, dict))
assert isinstance(v, dict)
if k == 'all':
self.assertEqual(v.get('vars', {}), inventory.variables_dict)
continue
@ -381,7 +389,7 @@ class InventoryScriptTest(BaseScriptTest):
hosts = group.hosts
hostnames = hosts.values_list('name', flat=True)
self.assertEqual(set(v.get('hosts', [])), set(hostnames))
self.assertTrue(hostnames)
assert hostnames
if group.variables:
self.assertEqual(v.get('vars', {}), group.variables_dict)
if k == 'group-3':
@ -389,4 +397,4 @@ class InventoryScriptTest(BaseScriptTest):
childnames = children.values_list('name', flat=True)
self.assertEqual(set(v.get('children', [])), set(childnames))
else:
self.assertTrue(len(v['children']) == 0)
assert len(v['children']) == 0

View File

@ -790,8 +790,8 @@ class UsersTest(BaseTest):
self.check_get_list(url, self.super_django_user, qs)
# Filter by related organizations admins username.
url = '%s?organizations__deprecated_admins__username__startswith=norm' % base_url
qs = base_qs.filter(organizations__deprecated_admins__username__startswith='norm')
url = '%s?organizationsadmin_role__members__username__startswith=norm' % base_url
qs = base_qs.filter(organizationsadmin_role__members__username__startswith='norm')
self.assertTrue(qs.count())
self.check_get_list(url, self.super_django_user, qs)
@ -839,11 +839,11 @@ class UsersTest(BaseTest):
self.check_get_list(url, self.super_django_user, base_qs, expect=400)
# Filter by invalid field across lookups.
url = '%s?organizations__deprecated_users__teams__laser=green' % base_url
url = '%s?organizations__member_role.members__teams__laser=green' % base_url
self.check_get_list(url, self.super_django_user, base_qs, expect=400)
# Filter by invalid relation within lookups.
url = '%s?organizations__deprecated_users__llamas__name=freddie' % base_url
url = '%s?organizations__member_role.members__llamas__name=freddie' % base_url
self.check_get_list(url, self.super_django_user, base_qs, expect=400)
# Filter by invalid query string field names.
@ -1020,13 +1020,13 @@ class LdapTest(BaseTest):
for org_name, org_result in settings.AUTH_LDAP_ORGANIZATION_MAP_RESULT.items():
org = Organization.objects.get(name=org_name)
if org_result.get('admins', False):
self.assertTrue(user in org.deprecated_admins.all())
self.assertTrue(user in org.admin_role.members.all())
else:
self.assertFalse(user in org.deprecated_admins.all())
self.assertFalse(user in org.admin_role.members.all())
if org_result.get('users', False):
self.assertTrue(user in org.deprecated_users.all())
self.assertTrue(user in org.member_role.members.all())
else:
self.assertFalse(user in org.deprecated_users.all())
self.assertFalse(user in org.member_role.members.all())
# Try again with different test mapping.
self.use_test_setting('ORGANIZATION_MAP', {},
from_name='ORGANIZATION_MAP_2')
@ -1038,13 +1038,13 @@ class LdapTest(BaseTest):
for org_name, org_result in settings.AUTH_LDAP_ORGANIZATION_MAP_RESULT.items():
org = Organization.objects.get(name=org_name)
if org_result.get('admins', False):
self.assertTrue(user in org.deprecated_admins.all())
self.assertTrue(user in org.admin_role.members.all())
else:
self.assertFalse(user in org.deprecated_admins.all())
self.assertFalse(user in org.admin_role.members.all())
if org_result.get('users', False):
self.assertTrue(user in org.deprecated_users.all())
self.assertTrue(user in org.member_role.members.all())
else:
self.assertFalse(user in org.deprecated_users.all())
self.assertFalse(user in org.member_role.members.all())
def test_ldap_team_mapping(self):
for name in ('USER_SEARCH', 'ALWAYS_UPDATE_USER', 'USER_ATTR_MAP',
@ -1062,9 +1062,9 @@ class LdapTest(BaseTest):
for team_name, team_result in settings.AUTH_LDAP_TEAM_MAP_RESULT.items():
team = Team.objects.get(name=team_name)
if team_result.get('users', False):
self.assertTrue(user in team.deprecated_users.all())
self.assertTrue(user in team.member_role.members.all())
else:
self.assertFalse(user in team.deprecated_users.all())
self.assertFalse(user in team.member_role.members.all())
# Try again with different test mapping.
self.use_test_setting('TEAM_MAP', {}, from_name='TEAM_MAP_2')
self.use_test_setting('TEAM_MAP_RESULT', {},
@ -1075,9 +1075,9 @@ class LdapTest(BaseTest):
for team_name, team_result in settings.AUTH_LDAP_TEAM_MAP_RESULT.items():
team = Team.objects.get(name=team_name)
if team_result.get('users', False):
self.assertTrue(user in team.deprecated_users.all())
self.assertTrue(user in team.member_role.members.all())
else:
self.assertFalse(user in team.deprecated_users.all())
self.assertFalse(user in team.member_role.members.all())
def test_prevent_changing_ldap_user_fields(self):
for name in ('USER_SEARCH', 'ALWAYS_UPDATE_USER', 'USER_ATTR_MAP',