mirror of
https://github.com/ansible/awx.git
synced 2026-02-01 01:28:09 -03:30
Split out RBAC and can_user_* methods from models into access.py. Moved list/item permissions checks from the base views into RBAC. Added serializers/views/tests for jobs REST API.
This commit is contained in:
@@ -81,7 +81,9 @@ class EditHelper(object):
|
||||
@classmethod
|
||||
def illegal_changes(cls, request, obj, model_class):
|
||||
''' have any illegal changes been made (for a PUT request)? '''
|
||||
can_admin = model_class.can_user_administrate(request.user, obj, request.DATA)
|
||||
from lib.main.access import *
|
||||
#can_admin = model_class.can_user_administrate(request.user, obj, request.DATA)
|
||||
can_admin = check_user_access(request.user, User, 'change', obj, request.DATA)
|
||||
if (not can_admin) or (can_admin == 'partial'):
|
||||
check_fields = model_class.admin_only_edit_fields
|
||||
changed = cls.fields_changed(check_fields, obj, request.DATA)
|
||||
@@ -107,51 +109,6 @@ class UserHelper(object):
|
||||
# fields that the user themselves cannot edit, but are not actually read only
|
||||
admin_only_edit_fields = ('last_name', 'first_name', 'username', 'is_active', 'is_superuser')
|
||||
|
||||
@classmethod
|
||||
def can_user_administrate(cls, user, obj, data):
|
||||
''' a user can be administrated if they are themselves, or by org admins or superusers '''
|
||||
if user == obj:
|
||||
return 'partial'
|
||||
if user.is_superuser:
|
||||
return True
|
||||
matching_orgs = obj.organizations.filter(admins__in = [user]).count()
|
||||
return matching_orgs
|
||||
|
||||
@classmethod
|
||||
def can_user_read(cls, user, obj):
|
||||
''' a user can be read if they are on the same team or can be administrated '''
|
||||
matching_teams = user.teams.filter(users__in = [ user ]).count()
|
||||
return matching_teams or cls.can_user_administrate(user, obj, None)
|
||||
|
||||
@classmethod
|
||||
def can_user_delete(cls, user, obj):
|
||||
if user.is_superuser:
|
||||
return True
|
||||
matching_orgs = obj.organizations.filter(admins__in = [user]).count()
|
||||
return matching_orgs
|
||||
|
||||
@classmethod
|
||||
def can_user_add(cls, user, data):
|
||||
# TODO: reuse. make helper functions like "is user an org admin"
|
||||
# apply throughout permissions code
|
||||
if user.is_superuser:
|
||||
return True
|
||||
return user.admin_of_organizations.count() > 0
|
||||
|
||||
@classmethod
|
||||
def can_user_attach(cls, user, obj, sub_obj, relationship_type, data):
|
||||
if type(sub_obj) != User:
|
||||
if not sub_obj.can_user_read(user, sub_obj):
|
||||
return False
|
||||
rc = cls.can_user_administrate(user, obj, None)
|
||||
if not rc:
|
||||
return False
|
||||
return sub_obj.__class__.can_user_read(user, sub_obj)
|
||||
|
||||
@classmethod
|
||||
def can_user_unattach(cls, user, obj, sub_obj, relationship_type):
|
||||
return cls.can_user_administrate(user, obj, None)
|
||||
|
||||
class PrimordialModel(models.Model):
|
||||
'''
|
||||
common model for all object types that have these standard fields
|
||||
@@ -165,6 +122,8 @@ class PrimordialModel(models.Model):
|
||||
description = models.TextField(blank=True, default='')
|
||||
created_by = models.ForeignKey('auth.User', on_delete=SET_NULL, null=True, related_name='%s(class)s_created', editable=False) # not blank=False on purpose for admin!
|
||||
creation_date = models.DateField(auto_now_add=True)
|
||||
#created = models.DateTimeField(auto_now_add=True)
|
||||
#modified = models.DateTimeField(auto_now=True)
|
||||
tags = models.ManyToManyField('Tag', related_name='%(class)s_by_tag', blank=True)
|
||||
audit_trail = models.ManyToManyField('AuditTrail', related_name='%(class)s_by_audit_trail', blank=True)
|
||||
active = models.BooleanField(default=True)
|
||||
@@ -172,45 +131,6 @@ class PrimordialModel(models.Model):
|
||||
def __unicode__(self):
|
||||
return unicode("%s-%s"% (self.name, self.id))
|
||||
|
||||
@classmethod
|
||||
def can_user_administrate(cls, user, obj, data):
|
||||
# FIXME: do we want a seperate method to override put? This is kind of general purpose
|
||||
raise Exception("can_user_administrate needs to be implemented in model subclass")
|
||||
|
||||
@classmethod
|
||||
def can_user_delete(cls, user, obj):
|
||||
raise Exception("can_user_delete needs to be implemented in model subclass")
|
||||
|
||||
@classmethod
|
||||
def can_user_read(cls, user, obj):
|
||||
raise Exception("can_user_read needs to be implemented in model subclass")
|
||||
|
||||
@classmethod
|
||||
def can_user_add(cls, user, data):
|
||||
return user.is_superuser
|
||||
|
||||
@classmethod
|
||||
def can_user_attach(cls, user, obj, sub_obj, relationship_type, data):
|
||||
''' whether you can add sub_obj to obj using the relationship type in a subobject view '''
|
||||
if type(sub_obj) != User:
|
||||
if not sub_obj.can_user_read(user, sub_obj):
|
||||
return False
|
||||
rc = cls.can_user_administrate(user, obj, None)
|
||||
if not rc:
|
||||
return False
|
||||
|
||||
# in order to attach something you also be able to read what you are attaching
|
||||
if type(sub_obj) == User:
|
||||
# we already check that the user is an admin or org admin up in base_views.py
|
||||
# because the user doesn't have the attributes on it directly to tie it to the org
|
||||
return True
|
||||
else:
|
||||
return sub_obj.__class__.can_user_read(user, sub_obj)
|
||||
|
||||
@classmethod
|
||||
def can_user_unattach(cls, user, obj, sub_obj, relationship):
|
||||
return cls.can_user_administrate(user, obj, None)
|
||||
|
||||
class CommonModel(PrimordialModel):
|
||||
''' a base model where the name is unique '''
|
||||
|
||||
@@ -243,17 +163,6 @@ class Tag(models.Model):
|
||||
def get_absolute_url(self):
|
||||
return reverse('main:tags_detail', args=(self.pk,))
|
||||
|
||||
@classmethod
|
||||
def can_user_add(cls, user, data):
|
||||
# anybody can make up tags
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def can_user_read(cls, user, obj):
|
||||
# anybody can read tags, we won't show much detail other than the names
|
||||
return True
|
||||
|
||||
|
||||
class AuditTrail(models.Model):
|
||||
'''
|
||||
changing any object records the change
|
||||
@@ -286,28 +195,6 @@ class Organization(CommonModel):
|
||||
def get_absolute_url(self):
|
||||
return reverse('main:organizations_detail', args=(self.pk,))
|
||||
|
||||
@classmethod
|
||||
def can_user_delete(cls, user, obj):
|
||||
return user in obj.admins.all()
|
||||
|
||||
@classmethod
|
||||
def can_user_administrate(cls, user, obj, data):
|
||||
# FIXME: super user checks should be higher up so we don't have to repeat them
|
||||
if user.is_superuser:
|
||||
return True
|
||||
if obj.created_by == user:
|
||||
return True
|
||||
rc = user in obj.admins.all()
|
||||
return rc
|
||||
|
||||
@classmethod
|
||||
def can_user_read(cls, user, obj):
|
||||
return cls.can_user_administrate(user,obj,None) or user in obj.users.all()
|
||||
|
||||
@classmethod
|
||||
def can_user_delete(cls, user, obj):
|
||||
return cls.can_user_administrate(user, obj, None)
|
||||
|
||||
def __unicode__(self):
|
||||
return self.name
|
||||
|
||||
@@ -326,82 +213,6 @@ class Inventory(CommonModel):
|
||||
def get_absolute_url(self):
|
||||
return reverse('main:inventory_detail', args=(self.pk,))
|
||||
|
||||
@classmethod
|
||||
def _has_permission_types(cls, user, obj, allowed):
|
||||
if user.is_superuser:
|
||||
return True
|
||||
by_org_admin = obj.organization.admins.filter(pk = user.pk).count()
|
||||
by_team_permission = obj.permissions.filter(
|
||||
team__in = user.teams.all(),
|
||||
permission_type__in = allowed
|
||||
).count()
|
||||
by_user_permission = obj.permissions.filter(
|
||||
user = user,
|
||||
permission_type__in = allowed
|
||||
).count()
|
||||
|
||||
result = (by_org_admin + by_team_permission + by_user_permission)
|
||||
return result > 0
|
||||
|
||||
@classmethod
|
||||
def _has_any_inventory_permission_types(cls, user, allowed):
|
||||
'''
|
||||
rather than checking for a permission on a specific inventory, return whether we have
|
||||
permissions on any inventory. This is primarily used to decide if the user can create
|
||||
host or group objects
|
||||
'''
|
||||
|
||||
if user.is_superuser:
|
||||
return True
|
||||
by_org_admin = user.organizations.filter(
|
||||
admins__in = [ user ]
|
||||
).count()
|
||||
by_team_permission = Permission.objects.filter(
|
||||
team__in = user.teams.all(),
|
||||
permission_type__in = allowed
|
||||
).count()
|
||||
by_user_permission = user.permissions.filter(
|
||||
permission_type__in = allowed
|
||||
).count()
|
||||
|
||||
result = (by_org_admin + by_team_permission + by_user_permission)
|
||||
return result > 0
|
||||
|
||||
@classmethod
|
||||
def can_user_add(cls, user, data):
|
||||
if not 'organization' in data:
|
||||
return True
|
||||
if user.is_superuser:
|
||||
return True
|
||||
if not user.is_superuser:
|
||||
org = Organization.objects.get(pk=data['organization'])
|
||||
if user in org.admins.all():
|
||||
return True
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def can_user_administrate(cls, user, obj, data):
|
||||
return cls._has_permission_types(user, obj, PERMISSION_TYPES_ALLOWING_INVENTORY_ADMIN)
|
||||
|
||||
@classmethod
|
||||
def can_user_attach(cls, user, obj, sub_obj, relationship_type, data):
|
||||
''' whether you can add sub_obj to obj using the relationship type in a subobject view '''
|
||||
if not sub_obj.can_user_read(user, sub_obj):
|
||||
return False
|
||||
return cls._has_permission_types(user, obj, PERMISSION_TYPES_ALLOWING_INVENTORY_WRITE)
|
||||
|
||||
@classmethod
|
||||
def can_user_unattach(cls, user, obj, sub_obj, relationship):
|
||||
return cls._has_permission_types(user, obj, PERMISSION_TYPES_ALLOWING_INVENTORY_WRITE)
|
||||
|
||||
@classmethod
|
||||
def can_user_read(cls, user, obj):
|
||||
return cls._has_permission_types(user, obj, PERMISSION_TYPES_ALLOWING_INVENTORY_READ)
|
||||
|
||||
@classmethod
|
||||
def can_user_delete(cls, user, obj):
|
||||
return cls._has_permission_types(user, obj, PERMISSION_TYPES_ALLOWING_INVENTORY_ADMIN)
|
||||
|
||||
class Host(CommonModelNameNotUnique):
|
||||
'''
|
||||
A managed node
|
||||
@@ -417,18 +228,6 @@ class Host(CommonModelNameNotUnique):
|
||||
def __unicode__(self):
|
||||
return self.name
|
||||
|
||||
@classmethod
|
||||
def can_user_read(cls, user, obj):
|
||||
return Inventory.can_user_read(user, obj.inventory)
|
||||
|
||||
@classmethod
|
||||
def can_user_add(cls, user, data):
|
||||
if not 'inventory' in data:
|
||||
return False
|
||||
inventory = Inventory.objects.get(pk=data['inventory'])
|
||||
rc = Inventory._has_permission_types(user, inventory, PERMISSION_TYPES_ALLOWING_INVENTORY_WRITE)
|
||||
return rc
|
||||
|
||||
def get_absolute_url(self):
|
||||
return reverse('main:hosts_detail', args=(self.pk,))
|
||||
|
||||
@@ -453,23 +252,6 @@ class Group(CommonModelNameNotUnique):
|
||||
def __unicode__(self):
|
||||
return self.name
|
||||
|
||||
@classmethod
|
||||
def can_user_add(cls, user, data):
|
||||
if not 'inventory' in data:
|
||||
return False
|
||||
inventory = Inventory.objects.get(pk=data['inventory'])
|
||||
return Inventory._has_permission_types(user, inventory, PERMISSION_TYPES_ALLOWING_INVENTORY_WRITE)
|
||||
|
||||
|
||||
@classmethod
|
||||
def can_user_administrate(cls, user, obj, data):
|
||||
# here this controls whether the user can attach subgroups
|
||||
return Inventory._has_permission_types(user, obj.inventory, PERMISSION_TYPES_ALLOWING_INVENTORY_WRITE)
|
||||
|
||||
@classmethod
|
||||
def can_user_read(cls, user, obj):
|
||||
return Inventory.can_user_read(user, obj.inventory)
|
||||
|
||||
def get_absolute_url(self):
|
||||
return reverse('main:groups_detail', args=(self.pk,))
|
||||
|
||||
@@ -495,15 +277,6 @@ class VariableData(CommonModelNameNotUnique):
|
||||
def get_absolute_url(self):
|
||||
return reverse('main:variable_detail', args=(self.pk,))
|
||||
|
||||
@classmethod
|
||||
def can_user_read(cls, user, obj):
|
||||
''' a user can be read if they are on the same team or can be administrated '''
|
||||
if obj.host is not None:
|
||||
return Inventory.can_user_read(user, obj.host.inventory)
|
||||
if obj.group is not None:
|
||||
return Inventory.can_user_read(user, obj.group.inventory)
|
||||
return False
|
||||
|
||||
class Credential(CommonModelNameNotUnique):
|
||||
'''
|
||||
A credential contains information about how to talk to a remote set of hosts
|
||||
@@ -571,44 +344,6 @@ class Credential(CommonModelNameNotUnique):
|
||||
def needs_sudo_password(self):
|
||||
return self.sudo_password == 'ASK'
|
||||
|
||||
@classmethod
|
||||
def can_user_administrate(cls, user, obj, data):
|
||||
if user.is_superuser:
|
||||
return True
|
||||
if user == obj.user:
|
||||
return True
|
||||
|
||||
if obj.user:
|
||||
if (obj.user.organizations.filter(admins__in = [user]).count()):
|
||||
return True
|
||||
if obj.team:
|
||||
if user in obj.team.organization.admins.all():
|
||||
return True
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def can_user_delete(cls, user, obj):
|
||||
if obj.user is None and obj.team is None:
|
||||
# unassociated credentials may be marked deleted by anyone
|
||||
return True
|
||||
return cls.can_user_administrate(user,obj,None)
|
||||
|
||||
@classmethod
|
||||
def can_user_read(cls, user, obj):
|
||||
''' a user can be read if they are on the same team or can be administrated '''
|
||||
return cls.can_user_administrate(user, obj, None)
|
||||
|
||||
@classmethod
|
||||
def can_user_add(cls, user, data):
|
||||
if user.is_superuser:
|
||||
return True
|
||||
if 'user' in data:
|
||||
user_obj = User.objects.get(pk=data['user'])
|
||||
return UserHelper.can_user_administrate(user, user_obj, data)
|
||||
if 'team' in data:
|
||||
team_obj = Team.objects.get(pk=data['team'])
|
||||
return Team.can_user_administrate(user, team_obj, data)
|
||||
|
||||
def get_absolute_url(self):
|
||||
return reverse('main:credentials_detail', args=(self.pk,))
|
||||
|
||||
@@ -627,37 +362,6 @@ class Team(CommonModel):
|
||||
def get_absolute_url(self):
|
||||
return reverse('main:teams_detail', args=(self.pk,))
|
||||
|
||||
@classmethod
|
||||
def can_user_administrate(cls, user, obj, data):
|
||||
# FIXME -- audit when this is called explicitly, if any
|
||||
if user.is_superuser:
|
||||
return True
|
||||
if user in obj.organization.admins.all():
|
||||
return True
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def can_user_read(cls, user, obj):
|
||||
if cls.can_user_administrate(user, obj, None):
|
||||
return True
|
||||
if obj.users.filter(pk__in = [ user.pk ]).count():
|
||||
return True
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def can_user_add(cls, user, data):
|
||||
if user.is_superuser:
|
||||
return True
|
||||
if Organization.objects.filter(admins__in = [user]).count():
|
||||
# team assignment to organizations is handled elsewhere, this just creates
|
||||
# a blank team
|
||||
return True
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def can_user_delete(cls, user, obj):
|
||||
return cls.can_user_administrate(user, obj, None)
|
||||
|
||||
class Project(CommonModel):
|
||||
'''
|
||||
A project represents a playbook git repo that can access a set of inventories
|
||||
@@ -682,30 +386,6 @@ class Project(CommonModel):
|
||||
def get_absolute_url(self):
|
||||
return reverse('main:projects_detail', args=(self.pk,))
|
||||
|
||||
@classmethod
|
||||
def can_user_administrate(cls, user, obj, data):
|
||||
if user.is_superuser:
|
||||
return True
|
||||
if obj.created_by == user:
|
||||
return True
|
||||
organizations = Organization.objects.filter(admins__in = [ user ], projects__in = [ obj ])
|
||||
for org in organizations:
|
||||
if org in project.organizations():
|
||||
return True
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def can_user_read(cls, user, obj):
|
||||
if cls.can_user_administrate(user, obj, None):
|
||||
return True
|
||||
# and also if I happen to be on a team inside the project
|
||||
# FIXME: add this too
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def can_user_delete(cls, user, obj):
|
||||
return cls.can_user_administrate(user, obj, None)
|
||||
|
||||
@property
|
||||
def available_playbooks(self):
|
||||
playbooks = []
|
||||
@@ -780,33 +460,6 @@ class Permission(CommonModelNameNotUnique):
|
||||
def get_absolute_url(self):
|
||||
return reverse('main:permissions_detail', args=(self.pk,))
|
||||
|
||||
@classmethod
|
||||
def can_user_administrate(cls, user, obj, data):
|
||||
if user.is_superuser:
|
||||
return True
|
||||
# a permission can be administrated by a super
|
||||
# or if a user permission, that an admin of a user's organization
|
||||
# or if a team permission, an admin of that team's organization
|
||||
if obj.user and obj.user.organizations.filter(admins__in = [user]).count() > 0:
|
||||
return True
|
||||
if obj.team and obj.team.organization.admins.filter(user=user).count() > 0:
|
||||
return True
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def can_user_read(cls, user, obj):
|
||||
# a permission can be seen by the assigned user or team
|
||||
# or anyone who can administrate that permission
|
||||
if obj.user and obj.user == user:
|
||||
return True
|
||||
if obj.team and obj.team.users.filter(pk = user.pk).count() > 0:
|
||||
return True
|
||||
return cls.can_user_administrate(user, obj, None)
|
||||
|
||||
@classmethod
|
||||
def can_user_delete(cls, user, obj):
|
||||
return cls.can_user_administrate(user, obj, None)
|
||||
|
||||
# TODO: other job types (later)
|
||||
|
||||
class JobTemplate(CommonModel):
|
||||
@@ -892,85 +545,7 @@ class JobTemplate(CommonModel):
|
||||
return job
|
||||
|
||||
def get_absolute_url(self):
|
||||
return reverse('main:job_templates_detail', args=(self.pk,))
|
||||
|
||||
@classmethod
|
||||
def can_user_read(cls, user, obj):
|
||||
# you can only see the job templates that you have permission to launch.
|
||||
data = dict(
|
||||
inventory = obj.inventory.pk,
|
||||
project = obj.project.pk,
|
||||
job_type = obj.job_type
|
||||
)
|
||||
return cls.can_user_add(user, data)
|
||||
|
||||
@classmethod
|
||||
def can_user_administrate(cls, user, obj, data):
|
||||
'''
|
||||
'''
|
||||
|
||||
@classmethod
|
||||
def can_user_add(cls, user, data):
|
||||
'''
|
||||
a user can create a job template if they are a superuser, an org admin of any org
|
||||
that the project is a member, or if they have user or team based permissions tying
|
||||
the project to the inventory source for the given action.
|
||||
|
||||
users who are able to create deploy jobs can also make check (dry run) jobs
|
||||
'''
|
||||
|
||||
if user.is_superuser:
|
||||
return True
|
||||
if not data or '_method' in data: # FIXME: So the browseable API will work?
|
||||
return True
|
||||
project = Project.objects.get(pk=data['project'])
|
||||
inventory = Inventory.objects.get(pk=data['inventory'])
|
||||
|
||||
admin_of_orgs = project.organizations.filter(admins__in = [ user ])
|
||||
if admin_of_orgs.count() > 0:
|
||||
return True
|
||||
job_type = data['job_type']
|
||||
|
||||
has_launch_permission = False
|
||||
user_permissions = Permission.objects.filter(inventory=inventory, project=project, user=user)
|
||||
for perm in user_permissions:
|
||||
if job_type == PERM_INVENTORY_CHECK:
|
||||
# if you have run permissions, you can also create check jobs
|
||||
has_launch_permission = True
|
||||
elif job_type == PERM_INVENTORY_DEPLOY and perm.permission_type == PERM_INVENTORY_DEPLOY:
|
||||
# you need explicit run permissions to make run jobs
|
||||
has_launch_permission = True
|
||||
team_permissions = Permission.objects.filter(inventory=inventory, project=project, team__users__in = [user])
|
||||
for perm in team_permissions:
|
||||
if job_type == PERM_INVENTORY_CHECK:
|
||||
# if you have run permissions, you can also create check jobs
|
||||
has_launch_permission = True
|
||||
elif job_type == PERM_INVENTORY_DEPLOY and perm.permission_type == PERM_INVENTORY_DEPLOY:
|
||||
# you need explicit run permissions to make run jobs
|
||||
has_launch_permission = True
|
||||
|
||||
if not has_launch_permission:
|
||||
return False
|
||||
|
||||
# make sure user owns the credentials they are using
|
||||
if data.has_key('credential'):
|
||||
has_credential = False
|
||||
credential = Credential.objects.get(pk=data['credential'])
|
||||
if credential.team and credential.team.users.filter(id = user.pk).count():
|
||||
has_credential = True
|
||||
if credential.user and credential.user == user:
|
||||
has_credential = True
|
||||
if not has_credential:
|
||||
return False
|
||||
|
||||
# shouldn't really matter with permissions given, but make sure the user
|
||||
# is also currently on the team in case they were added a per-user permission and then removed
|
||||
# from the project.
|
||||
if project.teams.filter(users__in = [ user ]).count():
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
return reverse('main:job_template_detail', args=(self.pk,))
|
||||
|
||||
class Job(CommonModel):
|
||||
'''
|
||||
@@ -1086,7 +661,7 @@ class Job(CommonModel):
|
||||
)
|
||||
|
||||
def get_absolute_url(self):
|
||||
return reverse('main:jobs_detail', args=(self.pk,))
|
||||
return reverse('main:job_detail', args=(self.pk,))
|
||||
|
||||
@property
|
||||
def celery_task(self):
|
||||
@@ -1248,6 +823,9 @@ class JobEvent(models.Model):
|
||||
on_delete=models.SET_NULL,
|
||||
)
|
||||
|
||||
def get_absolute_url(self):
|
||||
return reverse('main:job_event_detail', args=(self.pk,))
|
||||
|
||||
def __unicode__(self):
|
||||
return u'%s @ %s' % (self.get_event_display(), self.created.isoformat())
|
||||
|
||||
|
||||
Reference in New Issue
Block a user