mirror of
https://github.com/ansible/awx.git
synced 2026-02-14 17:50:02 -03:30
138 lines
4.5 KiB
Python
138 lines
4.5 KiB
Python
# Copyright (c) 2015 Ansible, Inc.
|
|
# All Rights Reserved.
|
|
|
|
# Python
|
|
import re
|
|
|
|
# Python Social Auth
|
|
from social.exceptions import AuthException
|
|
|
|
# Tower
|
|
from awx.api.license import feature_enabled
|
|
|
|
|
|
class AuthNotFound(AuthException):
|
|
|
|
def __init__(self, backend, email_or_uid, *args, **kwargs):
|
|
self.email_or_uid = email_or_uid
|
|
super(AuthNotFound, self).__init__(backend, *args, **kwargs)
|
|
|
|
def __str__(self):
|
|
return 'An account cannot be found for {0}'.format(self.email_or_uid)
|
|
|
|
|
|
class AuthInactive(AuthException):
|
|
|
|
def __str__(self):
|
|
return 'Your account is inactive'
|
|
|
|
|
|
def check_user_found_or_created(backend, details, user=None, *args, **kwargs):
|
|
if not user:
|
|
email_or_uid = details.get('email') or kwargs.get('email') or kwargs.get('uid') or '???'
|
|
raise AuthNotFound(backend, email_or_uid)
|
|
|
|
|
|
def set_is_active_for_new_user(strategy, details, user=None, *args, **kwargs):
|
|
if kwargs.get('is_new', False):
|
|
details['is_active'] = True
|
|
return {'details': details}
|
|
|
|
|
|
def prevent_inactive_login(backend, details, user=None, *args, **kwargs):
|
|
if user and not user.is_active:
|
|
raise AuthInactive(backend)
|
|
|
|
|
|
def _update_m2m_from_expression(user, rel, expr, remove=False):
|
|
'''
|
|
Helper function to update m2m relationship based on user matching one or
|
|
more expressions.
|
|
'''
|
|
should_add = False
|
|
if expr is None:
|
|
return
|
|
elif not expr:
|
|
pass
|
|
elif expr is True:
|
|
should_add = True
|
|
else:
|
|
if isinstance(expr, (basestring, type(re.compile('')))):
|
|
expr = [expr]
|
|
for ex in expr:
|
|
if isinstance(ex, basestring):
|
|
if user.username == ex or user.email == ex:
|
|
should_add = True
|
|
elif isinstance(ex, type(re.compile(''))):
|
|
if ex.match(user.username) or ex.match(user.email):
|
|
should_add = True
|
|
if should_add:
|
|
rel.add(user)
|
|
elif remove:
|
|
rel.remove(user)
|
|
|
|
|
|
def update_user_orgs(backend, details, user=None, *args, **kwargs):
|
|
'''
|
|
Update organization memberships for the given user based on mapping rules
|
|
defined in settings.
|
|
'''
|
|
if not user:
|
|
return
|
|
from awx.main.models import Organization
|
|
multiple_orgs = feature_enabled('multiple_organizations')
|
|
org_map = backend.setting('ORGANIZATION_MAP') or {}
|
|
for org_name, org_opts in org_map.items():
|
|
|
|
# Get or create the org to update. If the license only allows for one
|
|
# org, always use the first active org, unless no org exists.
|
|
if multiple_orgs:
|
|
org = Organization.objects.get_or_create(name=org_name)[0]
|
|
else:
|
|
try:
|
|
org = Organization.objects.order_by('pk')[0]
|
|
except IndexError:
|
|
continue
|
|
|
|
# Update org admins from expression(s).
|
|
remove = bool(org_opts.get('remove', False))
|
|
admins_expr = org_opts.get('admins', None)
|
|
remove_admins = bool(org_opts.get('remove_admins', remove))
|
|
_update_m2m_from_expression(user, org.admin_role.members, admins_expr, remove_admins)
|
|
|
|
# Update org users from expression(s).
|
|
users_expr = org_opts.get('users', None)
|
|
remove_users = bool(org_opts.get('remove_users', remove))
|
|
_update_m2m_from_expression(user, org.member_role.members, users_expr, remove_users)
|
|
|
|
|
|
def update_user_teams(backend, details, user=None, *args, **kwargs):
|
|
'''
|
|
Update team memberships for the given user based on mapping rules defined
|
|
in settings.
|
|
'''
|
|
if not user:
|
|
return
|
|
from awx.main.models import Organization, Team
|
|
multiple_orgs = feature_enabled('multiple_organizations')
|
|
team_map = backend.setting('TEAM_MAP') or {}
|
|
for team_name, team_opts in team_map.items():
|
|
|
|
# Get or create the org to update. If the license only allows for one
|
|
# org, always use the first active org, unless no org exists.
|
|
if multiple_orgs:
|
|
if 'organization' not in team_opts:
|
|
continue
|
|
org = Organization.objects.get_or_create(name=team_opts['organization'])[0]
|
|
else:
|
|
try:
|
|
org = Organization.objects.order_by('pk')[0]
|
|
except IndexError:
|
|
continue
|
|
|
|
# Update team members from expression(s).
|
|
team = Team.objects.get_or_create(name=team_name, organization=org)[0]
|
|
users_expr = team_opts.get('users', None)
|
|
remove = bool(team_opts.get('remove', False))
|
|
_update_m2m_from_expression(user, team.member_role.members, users_expr, remove)
|