mirror of
https://github.com/ansible/awx.git
synced 2026-02-20 04:30:05 -03:30
I had need to perform this query right on a Resource, so I moved it from the mixin to the Resource
187 lines
6.8 KiB
Python
187 lines
6.8 KiB
Python
# Copyright (c) 2016 Ansible, Inc.
|
|
# All Rights Reserved.
|
|
|
|
# Python
|
|
import logging
|
|
|
|
# Django
|
|
from django.db import models
|
|
from django.db.models.aggregates import Max
|
|
from django.core.urlresolvers import reverse
|
|
from django.utils.translation import ugettext_lazy as _
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.contrib.contenttypes.fields import GenericForeignKey
|
|
|
|
# AWX
|
|
from awx.main.models.base import * # noqa
|
|
|
|
__all__ = ['Role', 'RolePermission', 'Resource', 'ROLE_SINGLETON_SYSTEM_ADMINISTRATOR', 'ROLE_SINGLETON_SYSTEM_AUDITOR']
|
|
|
|
logger = logging.getLogger('awx.main.models.rbac')
|
|
|
|
ROLE_SINGLETON_SYSTEM_ADMINISTRATOR='System Administrator'
|
|
ROLE_SINGLETON_SYSTEM_AUDITOR='System Auditor'
|
|
|
|
|
|
class Role(CommonModelNameNotUnique):
|
|
'''
|
|
Role model
|
|
'''
|
|
|
|
class Meta:
|
|
app_label = 'main'
|
|
verbose_name_plural = _('roles')
|
|
db_table = 'main_rbac_roles'
|
|
|
|
singleton_name = models.TextField(null=True, default=None, db_index=True, unique=True)
|
|
parents = models.ManyToManyField('Role', related_name='children')
|
|
ancestors = models.ManyToManyField('Role', related_name='descendents') # auto-generated by `rebuild_role_ancestor_list`
|
|
members = models.ManyToManyField('auth.User', related_name='roles')
|
|
content_type = models.ForeignKey(ContentType, null=True, default=None)
|
|
object_id = models.PositiveIntegerField(null=True, default=None)
|
|
content_object = GenericForeignKey('content_type', 'object_id')
|
|
|
|
def save(self, *args, **kwargs):
|
|
super(Role, self).save(*args, **kwargs)
|
|
self.rebuild_role_ancestor_list()
|
|
|
|
def rebuild_role_ancestor_list(self):
|
|
'''
|
|
Updates our `ancestors` map to accurately reflect all of the ancestors for a role
|
|
|
|
You should never need to call this. Signal handlers should be calling
|
|
this method when the role hierachy changes automatically.
|
|
|
|
Note that this method relies on any parents' ancestor list being correct.
|
|
'''
|
|
|
|
actual_ancestors = set(Role.objects.filter(id=self.id).values_list('parents__ancestors__id', flat=True))
|
|
actual_ancestors.add(self.id)
|
|
if None in actual_ancestors:
|
|
actual_ancestors.remove(None)
|
|
stored_ancestors = set(self.ancestors.all().values_list('id', flat=True))
|
|
|
|
# If it differs, update, and then update all of our children
|
|
if actual_ancestors != stored_ancestors:
|
|
for id in actual_ancestors - stored_ancestors:
|
|
self.ancestors.add(Role.objects.get(id=id))
|
|
for id in stored_ancestors - actual_ancestors:
|
|
self.ancestors.remove(Role.objects.get(id=id))
|
|
|
|
for child in self.children.all():
|
|
child.rebuild_role_ancestor_list()
|
|
|
|
def grant(self, resource, permissions):
|
|
# take either the raw Resource or something that includes the ResourceMixin
|
|
resource = resource if type(resource) is Resource else resource.resource
|
|
|
|
if 'all' in permissions and permissions['all']:
|
|
del permissions['all']
|
|
permissions['create'] = True
|
|
permissions['read'] = True
|
|
permissions['write'] = True
|
|
permissions['update'] = True
|
|
permissions['delete'] = True
|
|
permissions['scm_update'] = True
|
|
permissions['use'] = True
|
|
permissions['execute'] = True
|
|
|
|
permission = RolePermission(role=self, resource=resource)
|
|
for k in permissions:
|
|
setattr(permission, k, int(permissions[k]))
|
|
permission.save()
|
|
|
|
@staticmethod
|
|
def singleton(name):
|
|
try:
|
|
return Role.objects.get(singleton_name=name)
|
|
except Role.DoesNotExist:
|
|
ret = Role(singleton_name=name, name=name)
|
|
ret.save()
|
|
return ret
|
|
|
|
def is_ancestor_of(self, role):
|
|
return role.ancestors.filter(id=self.id).exists()
|
|
|
|
|
|
class Resource(CommonModelNameNotUnique):
|
|
'''
|
|
Role model
|
|
'''
|
|
|
|
class Meta:
|
|
app_label = 'main'
|
|
verbose_name_plural = _('resources')
|
|
db_table = 'main_rbac_resources'
|
|
|
|
content_type = models.ForeignKey(ContentType, null=True, default=None)
|
|
object_id = models.PositiveIntegerField(null=True, default=None)
|
|
content_object = GenericForeignKey('content_type', 'object_id')
|
|
|
|
def get_permissions(self, user):
|
|
'''
|
|
Returns a dict (or None) of the permissions a user has for a given
|
|
resource.
|
|
|
|
Note: Each field in the dict is the `or` of all respective permissions
|
|
that have been granted to the roles that are applicable for the given
|
|
user.
|
|
|
|
In example, if a user has been granted read access through a permission
|
|
on one role and write access through a permission on a separate role,
|
|
the returned dict will denote that the user has both read and write
|
|
access.
|
|
'''
|
|
|
|
qs = user.__class__.objects.filter(id=user.id, roles__descendents__permissions__resource=self)
|
|
|
|
qs = qs.annotate(max_create = Max('roles__descendents__permissions__create'))
|
|
qs = qs.annotate(max_read = Max('roles__descendents__permissions__read'))
|
|
qs = qs.annotate(max_write = Max('roles__descendents__permissions__write'))
|
|
qs = qs.annotate(max_update = Max('roles__descendents__permissions__update'))
|
|
qs = qs.annotate(max_delete = Max('roles__descendents__permissions__delete'))
|
|
qs = qs.annotate(max_scm_update = Max('roles__descendents__permissions__scm_update'))
|
|
qs = qs.annotate(max_execute = Max('roles__descendents__permissions__execute'))
|
|
qs = qs.annotate(max_use = Max('roles__descendents__permissions__use'))
|
|
|
|
qs = qs.values('max_create', 'max_read', 'max_write', 'max_update',
|
|
'max_delete', 'max_scm_update', 'max_execute', 'max_use')
|
|
|
|
res = qs.all()
|
|
if len(res):
|
|
# strip away the 'max_' prefix
|
|
return {k[4:]:v for k,v in res[0].items()}
|
|
return None
|
|
|
|
|
|
class RolePermission(CreatedModifiedModel):
|
|
'''
|
|
Defines the permissions a role has
|
|
'''
|
|
|
|
class Meta:
|
|
app_label = 'main'
|
|
verbose_name_plural = _('permissions')
|
|
db_table = 'main_rbac_permissions'
|
|
|
|
role = models.ForeignKey(
|
|
Role,
|
|
null=False,
|
|
on_delete=models.CASCADE,
|
|
related_name='permissions',
|
|
)
|
|
resource = models.ForeignKey(
|
|
Resource,
|
|
null=False,
|
|
on_delete=models.CASCADE,
|
|
related_name='permissions',
|
|
)
|
|
create = models.IntegerField(default = 0)
|
|
read = models.IntegerField(default = 0)
|
|
write = models.IntegerField(default = 0)
|
|
update = models.IntegerField(default = 0)
|
|
delete = models.IntegerField(default = 0)
|
|
execute = models.IntegerField(default = 0)
|
|
scm_update = models.IntegerField(default = 0)
|
|
use = models.IntegerField(default = 0)
|