import logging from django.db.models import Q from django.contrib.auth.models import User from lib.main.models import * __all__ = ['get_user_queryset', 'check_user_access'] logger = logging.getLogger('lib.main.access') access_registry = { # : [, ...], # ... } def register_access(model_class, access_class): access_classes = access_registry.setdefault(model_class, []) access_classes.append(access_class) def get_user_queryset(user, model_class): ''' Return a queryset for the given model_class containing only the instances that should be visible to the given user. ''' querysets = [] for access_class in access_registry.get(model_class, []): access_instance = access_class(user) querysets.append(access_instance.get_queryset()) if not querysets: return model_class.objects.none() elif len(querysets) == 1: return querysets[0] else: queryset = model_class.objects.all() for qs in querysets: queryset = queryset.filter(pk__in=qs.values_list('pk', flat=True)) return queryset def check_user_access(user, model_class, action, *args, **kwargs): ''' Return True if user can perform action against model_class with the provided parameters. ''' for access_class in access_registry.get(model_class, []): access_instance = access_class(user) access_method = getattr(access_instance, 'can_%s' % action, None) if not access_method: continue result = access_method(*args, **kwargs) logger.debug('%s.%s %r returned %r', access_instance.__class__.__name__, access_method.__name__, args, result) if result: return result return False class BaseAccess(object): model = None def __init__(self, user): self.user = user def get_queryset(self): if self.user.is_superuser: return self.model.objects.all() else: return self.model.objects.none() def can_read(self, obj): return self.user.is_superuser def can_add(self, data): return self.user.is_superuser def can_change(self, obj, data): return self.user.is_superuser def can_write(self, obj, data): # Alias for change. return self.can_change(obj, data) def can_admin(self, obj, data): # Alias for can_change. Can be overridden if admin vs. user change # permissions need to be different. return self.can_change(obj, data) def can_delete(self, obj): return self.user.is_superuser def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): if skip_sub_obj_read_check: return self.can_change(obj, None) else: return bool(self.can_change(obj, None) and check_user_access(self.user, type(sub_obj), 'read', sub_obj)) def can_unattach(self, obj, sub_obj, relationship): return self.can_change(obj, None) class UserAccess(BaseAccess): model = User def get_queryset(self): # I can see user records when I'm a superuser, I'm that user, I'm # their org admin, or I'm on a team with that user. if self.user.is_superuser: return self.model.objects.all() return self.model.objects.filter(is_active=True).filter( Q(pk=self.user.pk) | Q(organizations__in=self.user.admin_of_organizations.all()) | Q(teams__in=self.request.user.teams.all()) ).distinct() def can_read(self, obj): # A user can be read if they are on the same team or can be changed. matching_teams = self.user.teams.filter(users__in=[self.user]).count() return bool(matching_teams or self.can_change(obj, None)) def can_add(self, data): # TODO: reuse. make helper functions like "is user an org admin" # apply throughout permissions code return bool(self.user.is_superuser or self.user.admin_of_organizations.count()) def can_change(self, obj, data): # A user can be changed if they are themselves, or by org admins or # superusers. if self.user == obj: return 'partial' return bool(self.user.is_superuser or obj.organizations.filter(admins__in=[self.user]).count()) def can_delete(self, obj): return bool(self.user.is_superuser or obj.organizations.filter(admins__in=[self.user]).count()) class TagAccess(BaseAccess): model = Tag def can_read(self, obj): # anybody can read tags, we won't show much detail other than the names return True def can_add(self, data): # anybody can make up tags return True class OrganizationAccess(BaseAccess): model = Organization def can_read(self, obj): return bool(self.can_change(obj, None) or self.user in obj.users.all()) def can_change(self, obj, data): return bool(self.user.is_superuser or obj.created_by == self.user or self.user in obj.admins.all()) def can_delete(self, obj): return self.can_change(obj, None) class InventoryAccess(BaseAccess): model = Inventory def _has_permission_types(self, obj, allowed): if self.user.is_superuser: return True by_org_admin = obj.organization.admins.filter(pk = self.user.pk).count() by_team_permission = obj.permissions.filter( team__in = self.user.teams.all(), permission_type__in = allowed ).count() by_user_permission = obj.permissions.filter( user = self.user, permission_type__in = allowed ).count() result = (by_org_admin + by_team_permission + by_user_permission) return result > 0 def _has_any_inventory_permission_types(self, allowed): ''' rather than checking for a permission on a specific inventory, return whether we have permissions on any inventory. This is primarily used to decide if the user can create host or group objects ''' if self.user.is_superuser: return True by_org_admin = self.user.organizations.filter( admins__in = [ self.user ] ).count() by_team_permission = Permission.objects.filter( team__in = self.user.teams.all(), permission_type__in = allowed ).count() by_user_permission = self.user.permissions.filter( permission_type__in = allowed ).count() result = (by_org_admin + by_team_permission + by_user_permission) return result > 0 def can_read(self, obj): return self._has_permission_types(obj, PERMISSION_TYPES_ALLOWING_INVENTORY_READ) def can_add(self, data): if not 'organization' in data: return True if self.user.is_superuser: return True if not self.user.is_superuser: org = Organization.objects.get(pk=data['organization']) if self.user in org.admins.all(): return True return False def can_change(self, obj, data): return self._has_permission_types(obj, PERMISSION_TYPES_ALLOWING_INVENTORY_WRITE) def can_admin(self, obj, data): return self._has_permission_types(obj, PERMISSION_TYPES_ALLOWING_INVENTORY_ADMIN) def can_delete(self, obj): return self._has_permission_types(obj, PERMISSION_TYPES_ALLOWING_INVENTORY_ADMIN) def can_attach(self, obj, sub_obj, relationship, data, skip_sub_obj_read_check=False): ''' whether you can add sub_obj to obj using the relationship type in a subobject view ''' #if not sub_obj.can_user_read(user, sub_obj): if sub_obj and not skip_sub_obj_read_check: if not check_user_access(self.user, type(sub_obj), 'read', sub_obj): return False return self._has_permission_types(obj, PERMISSION_TYPES_ALLOWING_INVENTORY_WRITE) def can_unattach(self, obj, sub_obj, relationship): return self._has_permission_types(obj, PERMISSION_TYPES_ALLOWING_INVENTORY_WRITE) class HostAccess(BaseAccess): model = Host def can_read(self, obj): return check_user_access(self.user, Inventory, 'read', obj.inventory) def can_add(self, data): if not 'inventory' in data: return False inventory = Inventory.objects.get(pk=data['inventory']) # Checks for admin or change permission on inventory. return check_user_access(self.user, Inventory, 'change', inventory, None) class GroupAccess(BaseAccess): model = Group def can_read(self, obj): return check_user_access(self.user, Inventory, 'read', obj.inventory) def can_add(self, data): if not 'inventory' in data: return False inventory = Inventory.objects.get(pk=data['inventory']) # Checks for admin or change permission on inventory. return check_user_access(self.user, Inventory, 'change', inventory, None) def can_change(self, obj, data): # Checks for admin or change permission on inventory, controls whether # the user can attach subgroups return check_user_access(self.user, Inventory, 'change', obj.inventory, None) class VariableDataAccess(BaseAccess): model = VariableData def can_read(self, obj): if obj.host: inventory = obj.host.inventory elif obj.group: inventory = obj.group.inventory else: return False return check_user_access(self.user, Inventory, 'read', inventory) def can_change(self, obj, data): if obj.host: inventory = obj.host.inventory elif obj.group: inventory = obj.group.inventory else: return False return check_user_access(self.user, Inventory, 'change', inventory) def can_delete(self, obj): return False class CredentialAccess(BaseAccess): model = Credential def can_read(self, obj): return self.can_change(obj, None) def can_add(self, data): if self.user.is_superuser: return True if 'user' in data: user_obj = User.objects.get(pk=data['user']) return check_user_access(self.user, User, 'change', user_obj, None) if 'team' in data: team_obj = Team.objects.get(pk=data['team']) return check_user_access(self.user, Team, 'change', team_obj, None) def can_change(self, obj, data): if self.user.is_superuser: return True if self.user == obj.user: return True if obj.user: if (obj.user.organizations.filter(admins__in = [self.user]).count()): return True if obj.team: if self.user in obj.team.organization.admins.all(): return True return False def can_delete(self, obj): if obj.user is None and obj.team is None: # unassociated credentials may be marked deleted by anyone return True return self.can_change(obj, None) class TeamAccess(BaseAccess): model = Team def can_add(self, data): if self.user.is_superuser: return True if Organization.objects.filter(admins__in = [self.user]).count(): # team assignment to organizations is handled elsewhere, this just creates # a blank team return True return False def can_read(self, obj): if self.can_change(obj, None): return True if obj.users.filter(pk__in = [ self.user.pk ]).count(): return True return False def can_change(self, obj, data): # FIXME -- audit when this is called explicitly, if any if self.user.is_superuser: return True if self.user in obj.organization.admins.all(): return True return False def can_delete(self, obj): return self.can_change(obj, None) class ProjectAccess(BaseAccess): model = Project def can_read(self, obj): if self.can_change(obj, None): return True # and also if I happen to be on a team inside the project # FIXME: add this too return False def can_change(self, obj, data): if self.user.is_superuser: return True if obj.created_by == self.user: return True organizations = Organization.objects.filter(admins__in = [ self.user ], projects__in = [ obj ]) for org in organizations: if org in project.organizations(): return True return False def can_delete(self, obj): return self.can_change(obj, None) class PermissionAccess(BaseAccess): model = Permission def can_read(self, obj): # a permission can be seen by the assigned user or team # or anyone who can administrate that permission if obj.user and obj.user == self.user: return True if obj.team and obj.team.users.filter(pk = self.user.pk).count() > 0: return True return self.can_change(obj, None) def can_change(self, obj, data): if self.user.is_superuser: return True # a permission can be administrated by a super # or if a user permission, that an admin of a user's organization # or if a team permission, an admin of that team's organization if obj.user and obj.user.organizations.filter(admins__in = [self.user]).count() > 0: return True if obj.team and obj.team.organization.admins.filter(user=self.user).count() > 0: return True return False def can_delete(self, obj): return self.can_change(obj, None) class JobTemplateAccess(BaseAccess): model = JobTemplate def get_queryset(self): ''' I can see job templates when I am a superuser, or I am an admin of the project's orgs, or if I'm in a team on the project. This does not mean I would be able to launch a job from the template or edit the template. ''' qs = self.model.objects.all() if self.user.is_superuser: return qs.all() return qs.filter(active=True).filter( Q(project__organizations__admins__in=[self.user]) | Q(project__teams__users__in=[self.user]) ).distinct() def can_read(self, obj): # you can only see the job templates that you have permission to launch. data = dict( inventory = obj.inventory.pk, project = obj.project.pk, job_type = obj.job_type ) return self.can_add(data) def can_add(self, data): ''' a user can create a job template if they are a superuser, an org admin of any org that the project is a member, or if they have user or team based permissions tying the project to the inventory source for the given action. users who are able to create deploy jobs can also make check (dry run) jobs ''' if self.user.is_superuser: return True if not data or '_method' in data: # FIXME: So the browseable API will work? return True project = Project.objects.get(pk=data['project']) inventory = Inventory.objects.get(pk=data['inventory']) admin_of_orgs = project.organizations.filter(admins__in = [ self.user ]) if admin_of_orgs.count() > 0: return True job_type = data['job_type'] has_launch_permission = False user_permissions = Permission.objects.filter(inventory=inventory, project=project, user=self.user) for perm in user_permissions: if job_type == PERM_INVENTORY_CHECK: # if you have run permissions, you can also create check jobs has_launch_permission = True elif job_type == PERM_INVENTORY_DEPLOY and perm.permission_type == PERM_INVENTORY_DEPLOY: # you need explicit run permissions to make run jobs has_launch_permission = True team_permissions = Permission.objects.filter(inventory=inventory, project=project, team__users__in = [self.user]) for perm in team_permissions: if job_type == PERM_INVENTORY_CHECK: # if you have run permissions, you can also create check jobs has_launch_permission = True elif job_type == PERM_INVENTORY_DEPLOY and perm.permission_type == PERM_INVENTORY_DEPLOY: # you need explicit run permissions to make run jobs has_launch_permission = True if not has_launch_permission: return False # make sure user owns the credentials they are using if data.has_key('credential'): has_credential = False credential = Credential.objects.get(pk=data['credential']) if credential.team and credential.team.users.filter(id = self.user.pk).count(): has_credential = True if credential.user and credential.user == self.user: has_credential = True if not has_credential: return False # shouldn't really matter with permissions given, but make sure the user # is also currently on the team in case they were added a per-user permission and then removed # from the project. if project.teams.filter(users__in = [ self.user ]).count(): return False return True def can_change(self, obj, data): ''' ''' return False # FIXME class JobAccess(BaseAccess): model = Job def can_start(self, obj): return False # FIXME def can_cancel(self, obj): return False # FIXME class JobHostSummaryAccess(BaseAccess): model = JobHostSummary class JobEventAccess(BaseAccess): model = JobEvent register_access(User, UserAccess) register_access(Tag, TagAccess) register_access(Organization, OrganizationAccess) register_access(Inventory, InventoryAccess) register_access(Host, HostAccess) register_access(Group, GroupAccess) register_access(VariableData, VariableDataAccess) register_access(Credential, CredentialAccess) register_access(Team, TeamAccess) register_access(Project, ProjectAccess) register_access(Permission, PermissionAccess) register_access(JobTemplate, JobTemplateAccess) register_access(Job, JobAccess) register_access(JobHostSummary, JobHostSummaryAccess) register_access(JobEvent, JobEventAccess)