Files
awx/awx/sso/social_pipeline.py
John Westcott IV 8fb831d3de SAML enhancements (#13316)
* Moving reconcile_users_org_team_mappings into common library

* Renaming pipeline to social_pipeline

* Breaking out SAML and generic Social Auth

* Optimizing SAML login process

* Moving extraction of org in teams from backends into sso/common.create_orgs_and_teams

* Altering saml_pipeline from testing

Prefixing all internal functions with _
Modified subfunctions to not return values but instead manipulate mutable objects
Modified all functions to not add duplicate orgs to the orgs_to_create list

* Updating the common function to respect a team's organization name

* Added can_create flag to create_org_and_teams

This made testing easier and lets any adapter that has such a flag simply pass it into the function

* Multiple changes to SAML pipeline

Removed orgs_to_create from being passed into user_team functions, common create orgs code will add any team orgs to list of orgs automatically

Passed SAML_AUTO_CREATE_OBJECTS flag into create_org_and_teams

Fix bug where we were looking at values instead of keys

Added loading of all teams if remove flag is set in update_user_teams_by_saml_attr

* Moving common items between SAML and Social into a 'base'

* Updating and adding testing

* Renamed get_or_create_with_default_galaxy_cred to get_or_create_org_...
2023-01-27 11:49:16 -03:00

91 lines
3.0 KiB
Python

# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python
import re
import logging
from awx.sso.common import get_or_create_org_with_default_galaxy_cred
# Module-level logger, registered under the 'awx.sso.social_pipeline' name.
logger = logging.getLogger('awx.sso.social_pipeline')
def _update_m2m_from_expression(user, related, expr, remove=True):
    """
    Add or remove ``user`` on the ``related`` relationship depending on
    whether the user matches ``expr``.

    ``expr`` may be:

    * ``None``  -- no rule configured; ``related`` is left untouched.
    * ``True``  -- the user always matches.
    * any other falsy value -- the user never matches.
    * a string, a compiled regular expression, or an iterable of those.
      Strings are compared for equality against ``user.username`` and
      ``user.email``; compiled patterns are applied with ``match()`` to the
      same two attributes.  Entries of any other type are ignored.

    When the user matches, ``related.add(user)`` is called.  When the user
    does not match and ``remove`` is truthy, ``related.remove(user)`` is
    called.  Otherwise nothing happens.  Returns ``None``.
    """
    # An absent rule means "do not manage this relationship at all".
    if expr is None:
        return

    compiled_type = type(re.compile(''))

    def _is_match(candidate):
        # Plain strings must equal the username or the email exactly.
        if isinstance(candidate, str):
            return user.username == candidate or user.email == candidate
        # Compiled patterns are anchored at the start via match().
        if isinstance(candidate, compiled_type):
            return bool(candidate.match(user.username) or candidate.match(user.email))
        return False

    if not expr:
        matched = False
    elif expr is True:
        matched = True
    else:
        # A lone string/pattern is treated as a one-element collection.
        candidates = [expr] if isinstance(expr, (str, compiled_type)) else expr
        # Build the full list first so every entry is evaluated, rather than
        # stopping at the first hit.
        matched = any([_is_match(candidate) for candidate in candidates])

    if matched:
        related.add(user)
    elif remove:
        related.remove(user)
def update_user_orgs(backend, details, user=None, *args, **kwargs):
    """
    Apply the backend's ``ORGANIZATION_MAP`` setting to ``user``.

    For every entry in the map an organization is fetched (or created) via
    ``get_or_create_org_with_default_galaxy_cred``.  The organization name is
    the entry's ``organization_alias`` option when that is truthy, otherwise
    the map key itself.  The user is then added to / removed from the
    organization's admin and member roles according to the entry's ``admins``
    and ``users`` expressions.

    Removal is controlled by ``remove_admins`` / ``remove_users``; each falls
    back to the entry's ``remove`` option, which itself defaults to ``True``.

    Does nothing when ``user`` is falsy.  ``details`` and any extra
    positional/keyword arguments are accepted but not used.  Returns ``None``.
    """
    if not user:
        return

    # (role attribute on the org, expression option, remove-flag option),
    # processed in this order: admins first, then regular members.
    role_rules = (
        ('admin_role', 'admins', 'remove_admins'),
        ('member_role', 'users', 'remove_users'),
    )

    mapping = backend.setting('ORGANIZATION_MAP') or {}
    for map_key, opts in mapping.items():
        # A truthy alias overrides the map key as the organization name.
        target_name = opts.get('organization_alias') or map_key
        org = get_or_create_org_with_default_galaxy_cred(name=target_name)

        # Shared default for both role-specific remove flags.
        default_remove = bool(opts.get('remove', True))

        for role_attr, expr_key, remove_key in role_rules:
            expression = opts.get(expr_key, None)
            should_remove = bool(opts.get(remove_key, default_remove))
            _update_m2m_from_expression(user, getattr(org, role_attr).members, expression, should_remove)
def update_user_teams(backend, details, user=None, *args, **kwargs):
    """
    Apply the backend's ``TEAM_MAP`` setting to ``user``.

    Entries without an ``organization`` option are skipped.  For every other
    entry the named organization is fetched (or created) via
    ``get_or_create_org_with_default_galaxy_cred``, the team is fetched (or
    created) inside that organization, and the user is added to / removed
    from the team's member role according to the entry's ``users``
    expression.  Removal is governed by the entry's ``remove`` option, which
    defaults to ``True``.

    Does nothing when ``user`` is falsy.  ``details`` and any extra
    positional/keyword arguments are accepted but not used.  Returns ``None``.
    """
    if not user:
        return

    # Imported inside the function rather than at module level.
    from awx.main.models import Team

    mapping = backend.setting('TEAM_MAP') or {}
    for name, opts in mapping.items():
        # A team rule is only usable when it names its organization.
        if 'organization' not in opts:
            continue

        parent_org = get_or_create_org_with_default_galaxy_cred(name=opts['organization'])
        team, _created = Team.objects.get_or_create(name=name, organization=parent_org)

        # Sync the team's member role from the configured expression(s).
        members_expr = opts.get('users', None)
        should_remove = bool(opts.get('remove', True))
        _update_m2m_from_expression(user, team.member_role.members, members_expr, should_remove)