From 8fb831d3dea753b3561034fbf4e8d05c9a6da0ca Mon Sep 17 00:00:00 2001 From: John Westcott IV <32551173+john-westcott-iv@users.noreply.github.com> Date: Fri, 27 Jan 2023 09:49:16 -0500 Subject: [PATCH] SAML enhancements (#13316) * Moving reconcile_users_org_team_mappings into common library * Renaming pipeline to social_pipeline * Breaking out SAML and generic Social Auth * Optimizing SMAL login process * Moving extraction of org in teams from backends into sso/common.create_orgs_and_teams * Altering saml_pipeline from testing Prefixing all internal functions with _ Modified subfunctions to not return values but instead manipulate multable objects Modified all functions to not add duplicate orgs to the orgs_to_create list * Updating the common function to respect a teams organization name * Added can_create flag to create_org_and_teams This made testing easier and allows for any adapter with a flag the ability to simply pass it into a function * Multiple changes to SAML pipeline Removed orgs_to_create from being passed into user_team functions, common create orgs code will add any team orgs to list of orgs automatically Passed SAML_AUTO_CREATE_OBJECTS flag into create_org_and_teams Fix bug where we were looking at values instead of keys Added loading of all teams if remove flag is set in update_user_teams_by_saml_attr * Moving common items between SAML and Social into a 'base' * Updating and adding testing * Renamed get_or_create_with_default_galaxy_cred to get_or_create_org_... --- awx/settings/defaults.py | 16 +- awx/sso/backends.py | 126 +--- awx/sso/common.py | 171 +++++ awx/sso/{pipeline.py => saml_pipeline.py} | 275 +++----- awx/sso/social_base_pipeline.py | 39 ++ awx/sso/social_pipeline.py | 90 +++ awx/sso/tests/functional/test_common.py | 280 ++++++++ awx/sso/tests/functional/test_pipeline.py | 566 ---------------- .../tests/functional/test_saml_pipeline.py | 639 ++++++++++++++++++ .../functional/test_social_base_pipeline.py | 76 +++ .../tests/functional/test_social_pipeline.py | 113 ++++ awx/sso/tests/unit/test_pipeline.py | 2 - awx/sso/tests/unit/test_pipelines.py | 12 + 13 files changed, 1541 insertions(+), 864 deletions(-) create mode 100644 awx/sso/common.py rename awx/sso/{pipeline.py => saml_pipeline.py} (51%) create mode 100644 awx/sso/social_base_pipeline.py create mode 100644 awx/sso/social_pipeline.py create mode 100644 awx/sso/tests/functional/test_common.py delete mode 100644 awx/sso/tests/functional/test_pipeline.py create mode 100644 awx/sso/tests/functional/test_saml_pipeline.py create mode 100644 awx/sso/tests/functional/test_social_base_pipeline.py create mode 100644 awx/sso/tests/functional/test_social_pipeline.py delete mode 100644 awx/sso/tests/unit/test_pipeline.py create mode 100644 awx/sso/tests/unit/test_pipelines.py diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index e0d95adb5d..4d18540bcd 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -473,21 +473,15 @@ _SOCIAL_AUTH_PIPELINE_BASE = ( 'social_core.pipeline.user.get_username', 'social_core.pipeline.social_auth.associate_by_email', 'social_core.pipeline.user.create_user', - 'awx.sso.pipeline.check_user_found_or_created', + 'awx.sso.social_base_pipeline.check_user_found_or_created', 'social_core.pipeline.social_auth.associate_user', 'social_core.pipeline.social_auth.load_extra_data', - 'awx.sso.pipeline.set_is_active_for_new_user', + 'awx.sso.social_base_pipeline.set_is_active_for_new_user', 'social_core.pipeline.user.user_details', - 'awx.sso.pipeline.prevent_inactive_login', -) -SOCIAL_AUTH_PIPELINE = _SOCIAL_AUTH_PIPELINE_BASE + ('awx.sso.pipeline.update_user_orgs', 'awx.sso.pipeline.update_user_teams') -SOCIAL_AUTH_SAML_PIPELINE = _SOCIAL_AUTH_PIPELINE_BASE + ( - 'awx.sso.pipeline.update_user_orgs_by_saml_attr', - 'awx.sso.pipeline.update_user_teams_by_saml_attr', - 'awx.sso.pipeline.update_user_orgs', - 'awx.sso.pipeline.update_user_teams', - 'awx.sso.pipeline.update_user_flags', + 'awx.sso.social_base_pipeline.prevent_inactive_login', ) +SOCIAL_AUTH_PIPELINE = _SOCIAL_AUTH_PIPELINE_BASE + ('awx.sso.social_pipeline.update_user_orgs', 'awx.sso.social_pipeline.update_user_teams') +SOCIAL_AUTH_SAML_PIPELINE = _SOCIAL_AUTH_PIPELINE_BASE + ('awx.sso.saml_pipeline.populate_user', 'awx.sso.saml_pipeline.update_user_flags') SAML_AUTO_CREATE_OBJECTS = True SOCIAL_AUTH_LOGIN_URL = '/' diff --git a/awx/sso/backends.py b/awx/sso/backends.py index 715cdd3b05..06f2a6c671 100644 --- a/awx/sso/backends.py +++ b/awx/sso/backends.py @@ -11,11 +11,9 @@ import ldap # Django from django.dispatch import receiver from django.contrib.auth.models import User -from django.contrib.contenttypes.models import ContentType from django.conf import settings as django_settings from django.core.signals import setting_changed from django.utils.encoding import force_str -from django.db.utils import IntegrityError # django-auth-ldap from django_auth_ldap.backend import LDAPSettings as BaseLDAPSettings @@ -36,6 +34,7 @@ from social_core.backends.saml import SAMLIdentityProvider as BaseSAMLIdentityPr # Ansible Tower from awx.sso.models import UserEnterpriseAuth +from awx.sso.common import create_org_and_teams, reconcile_users_org_team_mappings logger = logging.getLogger('awx.sso.backends') @@ -365,8 +364,6 @@ def on_populate_user(sender, **kwargs): Handle signal from LDAP backend to populate the user object. Update user organization/team memberships according to their LDAP groups. """ - from awx.main.models import Organization, Team - user = kwargs['user'] ldap_user = kwargs['ldap_user'] backend = ldap_user.backend @@ -390,43 +387,16 @@ def on_populate_user(sender, **kwargs): org_map = getattr(backend.settings, 'ORGANIZATION_MAP', {}) team_map = getattr(backend.settings, 'TEAM_MAP', {}) - - # Move this junk into save of the settings for performance later, there is no need to do that here - # with maybe the exception of someone defining this in settings before the server is started? - # ============================================================================================================== - - # Get all of the IDs and names of orgs in the DB and create any new org defined in LDAP that does not exist in the DB - existing_orgs = {} - for (org_id, org_name) in Organization.objects.all().values_list('id', 'name'): - existing_orgs[org_name] = org_id - - # Create any orgs (if needed) for all entries in the org and team maps - for org_name in set(list(org_map.keys()) + [item.get('organization', None) for item in team_map.values()]): - if org_name and org_name not in existing_orgs: - logger.info("LDAP adapter is creating org {}".format(org_name)) - try: - new_org = Organization.objects.create(name=org_name) - except IntegrityError: - # Another thread must have created this org before we did so now we need to get it - new_org = Organization.objects.get(name=org_name) - # Add the org name to the existing orgs since we created it and we may need it to build the teams below - existing_orgs[org_name] = new_org.id - - # Do the same for teams - existing_team_names = list(Team.objects.all().values_list('name', flat=True)) + orgs_list = list(org_map.keys()) + team_map = {} for team_name, team_opts in team_map.items(): if not team_opts.get('organization', None): # You can't save the LDAP config in the UI w/o an org (or '' or null as the org) so if we somehow got this condition its an error logger.error("Team named {} in LDAP team map settings is invalid due to missing organization".format(team_name)) continue - if team_name not in existing_team_names: - try: - Team.objects.create(name=team_name, organization_id=existing_orgs[team_opts['organization']]) - except IntegrityError: - # If another process got here before us that is ok because we don't need the ID from this team or anything - pass - # End move some day - # ============================================================================================================== + team_map[team_name] = team_opts['organization'] + + create_org_and_teams(orgs_list, team_map, 'LDAP') # Compute in memory what the state is of the different LDAP orgs org_roles_and_ldap_attributes = {'admin_role': 'admins', 'auditor_role': 'auditors', 'member_role': 'users'} @@ -475,87 +445,3 @@ def on_populate_user(sender, **kwargs): profile.save() reconcile_users_org_team_mappings(user, desired_org_states, desired_team_states, 'LDAP') - - -def reconcile_users_org_team_mappings(user, desired_org_states, desired_team_states, source): - # - # Arguments: - # user - a user object - # desired_org_states: { '': { '': or None } } - # desired_team_states: { '': { '': { '': or None } } } - # source - a text label indicating the "authentication adapter" for debug messages - # - # This function will load the users existing roles and then based on the deisred states modify the users roles - # True indicates the user needs to be a member of the role - # False indicates the user should not be a member of the role - # None means this function should not change the users membership of a role - # - from awx.main.models import Organization, Team - - content_types = [] - reconcile_items = [] - if desired_org_states: - content_types.append(ContentType.objects.get_for_model(Organization)) - reconcile_items.append(('organization', desired_org_states)) - if desired_team_states: - content_types.append(ContentType.objects.get_for_model(Team)) - reconcile_items.append(('team', desired_team_states)) - - if not content_types: - # If both desired states were empty we can simply return because there is nothing to reconcile - return - - # users_roles is a flat set of IDs - users_roles = set(user.roles.filter(content_type__in=content_types).values_list('pk', flat=True)) - - for object_type, desired_states in reconcile_items: - roles = [] - # Get a set of named tuples for the org/team name plus all of the roles we got above - if object_type == 'organization': - for sub_dict in desired_states.values(): - for role_name in sub_dict: - if sub_dict[role_name] is None: - continue - if role_name not in roles: - roles.append(role_name) - model_roles = Organization.objects.filter(name__in=desired_states.keys()).values_list('name', *roles, named=True) - else: - team_names = [] - for teams_dict in desired_states.values(): - team_names.extend(teams_dict.keys()) - for sub_dict in teams_dict.values(): - for role_name in sub_dict: - if sub_dict[role_name] is None: - continue - if role_name not in roles: - roles.append(role_name) - model_roles = Team.objects.filter(name__in=team_names).values_list('name', 'organization__name', *roles, named=True) - - for row in model_roles: - for role_name in roles: - if object_type == 'organization': - desired_state = desired_states.get(row.name, {}) - else: - desired_state = desired_states.get(row.organization__name, {}).get(row.name, {}) - - if desired_state.get(role_name, None) is None: - # The mapping was not defined for this [org/team]/role so we can just pass - continue - - # If somehow the auth adapter knows about an items role but that role is not defined in the DB we are going to print a pretty error - # This is your classic safety net that we should never hit; but here you are reading this comment... good luck and Godspeed. - role_id = getattr(row, role_name, None) - if role_id is None: - logger.error("{} adapter wanted to manage role {} of {} {} but that role is not defined".format(source, role_name, object_type, row.name)) - continue - - if desired_state[role_name]: - # The desired state was the user mapped into the object_type, if the user was not mapped in map them in - if role_id not in users_roles: - logger.debug("{} adapter adding user {} to {} {} as {}".format(source, user.username, object_type, row.name, role_name)) - user.roles.add(role_id) - else: - # The desired state was the user was not mapped into the org, if the user has the permission remove it - if role_id in users_roles: - logger.debug("{} adapter removing user {} permission of {} from {} {}".format(source, user.username, role_name, object_type, row.name)) - user.roles.remove(role_id) diff --git a/awx/sso/common.py b/awx/sso/common.py new file mode 100644 index 0000000000..a80b519f13 --- /dev/null +++ b/awx/sso/common.py @@ -0,0 +1,171 @@ +# Copyright (c) 2022 Ansible, Inc. +# All Rights Reserved. + +import logging + +from django.contrib.contenttypes.models import ContentType +from django.db.utils import IntegrityError +from awx.main.models import Organization, Team + +logger = logging.getLogger('awx.sso.common') + + +def get_orgs_by_ids(): + existing_orgs = {} + for (org_id, org_name) in Organization.objects.all().values_list('id', 'name'): + existing_orgs[org_name] = org_id + return existing_orgs + + +def reconcile_users_org_team_mappings(user, desired_org_states, desired_team_states, source): + # + # Arguments: + # user - a user object + # desired_org_states: { '': { '': or None } } + # desired_team_states: { '': { '': { '': or None } } } + # source - a text label indicating the "authentication adapter" for debug messages + # + # This function will load the users existing roles and then based on the desired states modify the users roles + # True indicates the user needs to be a member of the role + # False indicates the user should not be a member of the role + # None means this function should not change the users membership of a role + # + + content_types = [] + reconcile_items = [] + if desired_org_states: + content_types.append(ContentType.objects.get_for_model(Organization)) + reconcile_items.append(('organization', desired_org_states)) + if desired_team_states: + content_types.append(ContentType.objects.get_for_model(Team)) + reconcile_items.append(('team', desired_team_states)) + + if not content_types: + # If both desired states were empty we can simply return because there is nothing to reconcile + return + + # users_roles is a flat set of IDs + users_roles = set(user.roles.filter(content_type__in=content_types).values_list('pk', flat=True)) + + for object_type, desired_states in reconcile_items: + roles = [] + # Get a set of named tuples for the org/team name plus all of the roles we got above + if object_type == 'organization': + for sub_dict in desired_states.values(): + for role_name in sub_dict: + if sub_dict[role_name] is None: + continue + if role_name not in roles: + roles.append(role_name) + model_roles = Organization.objects.filter(name__in=desired_states.keys()).values_list('name', *roles, named=True) + else: + team_names = [] + for teams_dict in desired_states.values(): + team_names.extend(teams_dict.keys()) + for sub_dict in teams_dict.values(): + for role_name in sub_dict: + if sub_dict[role_name] is None: + continue + if role_name not in roles: + roles.append(role_name) + model_roles = Team.objects.filter(name__in=team_names).values_list('name', 'organization__name', *roles, named=True) + + for row in model_roles: + for role_name in roles: + if object_type == 'organization': + desired_state = desired_states.get(row.name, {}) + else: + desired_state = desired_states.get(row.organization__name, {}).get(row.name, {}) + + if desired_state.get(role_name, None) is None: + # The mapping was not defined for this [org/team]/role so we can just pass + continue + + # If somehow the auth adapter knows about an items role but that role is not defined in the DB we are going to print a pretty error + # This is your classic safety net that we should never hit; but here you are reading this comment... good luck and Godspeed. + role_id = getattr(row, role_name, None) + if role_id is None: + logger.error("{} adapter wanted to manage role {} of {} {} but that role is not defined".format(source, role_name, object_type, row.name)) + continue + + if desired_state[role_name]: + # The desired state was the user mapped into the object_type, if the user was not mapped in map them in + if role_id not in users_roles: + logger.debug("{} adapter adding user {} to {} {} as {}".format(source, user.username, object_type, row.name, role_name)) + user.roles.add(role_id) + else: + # The desired state was the user was not mapped into the org, if the user has the permission remove it + if role_id in users_roles: + logger.debug("{} adapter removing user {} permission of {} from {} {}".format(source, user.username, role_name, object_type, row.name)) + user.roles.remove(role_id) + + +def create_org_and_teams(org_list, team_map, adapter, can_create=True): + # + # org_list is a set of organization names + # team_map is a dict of {: } + # + # Move this junk into save of the settings for performance later, there is no need to do that here + # with maybe the exception of someone defining this in settings before the server is started? + # ============================================================================================================== + + if not can_create: + logger.debug(f"Adapter {adapter} is not allowed to create orgs/teams") + return + + # Get all of the IDs and names of orgs in the DB and create any new org defined in LDAP that does not exist in the DB + existing_orgs = get_orgs_by_ids() + + # Parse through orgs and teams provided and create a list of unique items we care about creating + all_orgs = list(set(org_list)) + all_teams = [] + for team_name in team_map: + org_name = team_map[team_name] + if org_name: + if org_name not in all_orgs: + all_orgs.append(org_name) + # We don't have to test if this is in all_teams because team_map is already a hash + all_teams.append(team_name) + else: + # The UI should prevent this condition so this is just a double check to prevent a stack trace.... + # although the rest of the login process might stack later on + logger.error("{} adapter is attempting to create a team {} but it does not have an org".format(adapter, team_name)) + + for org_name in all_orgs: + if org_name and org_name not in existing_orgs: + logger.info("{} adapter is creating org {}".format(adapter, org_name)) + try: + new_org = get_or_create_org_with_default_galaxy_cred(name=org_name) + except IntegrityError: + # Another thread must have created this org before we did so now we need to get it + new_org = get_or_create_org_with_default_galaxy_cred(name=org_name) + # Add the org name to the existing orgs since we created it and we may need it to build the teams below + existing_orgs[org_name] = new_org.id + + # Do the same for teams + existing_team_names = list(Team.objects.all().values_list('name', flat=True)) + for team_name in all_teams: + if team_name not in existing_team_names: + logger.info("{} adapter is creating team {} in org {}".format(adapter, team_name, team_map[team_name])) + try: + Team.objects.create(name=team_name, organization_id=existing_orgs[team_map[team_name]]) + except IntegrityError: + # If another process got here before us that is ok because we don't need the ID from this team or anything + pass + # End move some day + # ============================================================================================================== + + +def get_or_create_org_with_default_galaxy_cred(**kwargs): + from awx.main.models import Organization, Credential + + (org, org_created) = Organization.objects.get_or_create(**kwargs) + if org_created: + logger.debug("Created org {} (id {}) from {}".format(org.name, org.id, kwargs)) + public_galaxy_credential = Credential.objects.filter(managed=True, name='Ansible Galaxy').first() + if public_galaxy_credential is not None: + org.galaxy_credentials.add(public_galaxy_credential) + logger.debug("Added default Ansible Galaxy credential to org") + else: + logger.debug("Could not find default Ansible Galaxy credential to add to org") + return org diff --git a/awx/sso/pipeline.py b/awx/sso/saml_pipeline.py similarity index 51% rename from awx/sso/pipeline.py rename to awx/sso/saml_pipeline.py index 348396b1c9..a0060e13e8 100644 --- a/awx/sso/pipeline.py +++ b/awx/sso/saml_pipeline.py @@ -5,59 +5,43 @@ import re import logging - -# Python Social Auth -from social_core.exceptions import AuthException - # Django -from django.core.exceptions import ObjectDoesNotExist -from django.utils.translation import gettext_lazy as _ -from django.db.models import Q +from django.conf import settings + +from awx.main.models import Team +from awx.sso.common import create_org_and_teams, reconcile_users_org_team_mappings, get_orgs_by_ids + +logger = logging.getLogger('awx.sso.saml_pipeline') -logger = logging.getLogger('awx.sso.pipeline') - - -class AuthNotFound(AuthException): - def __init__(self, backend, email_or_uid, *args, **kwargs): - self.email_or_uid = email_or_uid - super(AuthNotFound, self).__init__(backend, *args, **kwargs) - - def __str__(self): - return _('An account cannot be found for {0}').format(self.email_or_uid) - - -class AuthInactive(AuthException): - def __str__(self): - return _('Your account is inactive') - - -def check_user_found_or_created(backend, details, user=None, *args, **kwargs): +def populate_user(backend, details, user=None, *args, **kwargs): if not user: - email_or_uid = details.get('email') or kwargs.get('email') or kwargs.get('uid') or '???' - raise AuthNotFound(backend, email_or_uid) + return + + # Build the in-memory settings for how this user should be modeled + desired_org_state = {} + desired_team_state = {} + orgs_to_create = [] + teams_to_create = {} + _update_user_orgs_by_saml_attr(backend, desired_org_state, orgs_to_create, **kwargs) + _update_user_teams_by_saml_attr(desired_team_state, teams_to_create, **kwargs) + _update_user_orgs(backend, desired_org_state, orgs_to_create, user) + _update_user_teams(backend, desired_team_state, teams_to_create, user) + + # If the SAML adapter is allowed to create objects, lets do that first + create_org_and_teams(orgs_to_create, teams_to_create, 'SAML', settings.SAML_AUTO_CREATE_OBJECTS) + + # Finally reconcile the user + reconcile_users_org_team_mappings(user, desired_org_state, desired_team_state, 'SAML') -def set_is_active_for_new_user(strategy, details, user=None, *args, **kwargs): - if kwargs.get('is_new', False): - details['is_active'] = True - return {'details': details} - - -def prevent_inactive_login(backend, details, user=None, *args, **kwargs): - if user and not user.is_active: - raise AuthInactive(backend) - - -def _update_m2m_from_expression(user, related, expr, remove=True): +def _update_m2m_from_expression(user, expr, remove=True): """ Helper function to update m2m relationship based on user matching one or more expressions. """ should_add = False - if expr is None: - return - elif not expr: + if expr is None or not expr: pass elif expr is True: should_add = True @@ -72,70 +56,18 @@ def _update_m2m_from_expression(user, related, expr, remove=True): if ex.match(user.username) or ex.match(user.email): should_add = True if should_add: - related.add(user) + return True elif remove: - related.remove(user) + return False + else: + return None -def get_or_create_with_default_galaxy_cred(**kwargs): - from awx.main.models import Organization, Credential - - (org, org_created) = Organization.objects.get_or_create(**kwargs) - if org_created: - logger.debug("Created org {} (id {}) from {}".format(org.name, org.id, kwargs)) - public_galaxy_credential = Credential.objects.filter(managed=True, name='Ansible Galaxy').first() - if public_galaxy_credential is not None: - org.galaxy_credentials.add(public_galaxy_credential) - logger.debug("Added default Ansible Galaxy credential to org") - else: - logger.debug("Could not find default Ansible Galaxy credential to add to org") - return org - - -def _update_org_from_attr(user, related, attr, remove, remove_admins, remove_auditors, backend): - from awx.main.models import Organization - from django.conf import settings - - org_ids = [] - - for org_name in attr: - try: - if settings.SAML_AUTO_CREATE_OBJECTS: - try: - organization_alias = backend.setting('ORGANIZATION_MAP').get(org_name).get('organization_alias') - if organization_alias is not None: - organization_name = organization_alias - else: - organization_name = org_name - except Exception: - organization_name = org_name - org = get_or_create_with_default_galaxy_cred(name=organization_name) - else: - org = Organization.objects.get(name=org_name) - except ObjectDoesNotExist: - continue - - org_ids.append(org.id) - getattr(org, related).members.add(user) - - if remove: - [o.member_role.members.remove(user) for o in Organization.objects.filter(Q(member_role__members=user) & ~Q(id__in=org_ids))] - - if remove_admins: - [o.admin_role.members.remove(user) for o in Organization.objects.filter(Q(admin_role__members=user) & ~Q(id__in=org_ids))] - - if remove_auditors: - [o.auditor_role.members.remove(user) for o in Organization.objects.filter(Q(auditor_role__members=user) & ~Q(id__in=org_ids))] - - -def update_user_orgs(backend, details, user=None, *args, **kwargs): +def _update_user_orgs(backend, desired_org_state, orgs_to_create, user=None): """ Update organization memberships for the given user based on mapping rules defined in settings. """ - if not user: - return - org_map = backend.setting('ORGANIZATION_MAP') or {} for org_name, org_opts in org_map.items(): organization_alias = org_opts.get('organization_alias') @@ -143,78 +75,108 @@ def update_user_orgs(backend, details, user=None, *args, **kwargs): organization_name = organization_alias else: organization_name = org_name - org = get_or_create_with_default_galaxy_cred(name=organization_name) + if organization_name not in orgs_to_create: + orgs_to_create.append(organization_name) - # Update org admins from expression(s). remove = bool(org_opts.get('remove', True)) - admins_expr = org_opts.get('admins', None) - remove_admins = bool(org_opts.get('remove_admins', remove)) - _update_m2m_from_expression(user, org.admin_role.members, admins_expr, remove_admins) - # Update org users from expression(s). - users_expr = org_opts.get('users', None) - remove_users = bool(org_opts.get('remove_users', remove)) - _update_m2m_from_expression(user, org.member_role.members, users_expr, remove_users) + if organization_name not in desired_org_state: + desired_org_state[organization_name] = {} + + for role_name, user_type in (('admin_role', 'admins'), ('member_role', 'users'), ('auditor_role', 'auditors')): + is_member_expression = org_opts.get(user_type, None) + remove_members = bool(org_opts.get('remove_{}'.format(user_type), remove)) + has_role = _update_m2m_from_expression(user, is_member_expression, remove_members) + desired_org_state[organization_name][role_name] = has_role -def update_user_teams(backend, details, user=None, *args, **kwargs): +def _update_user_teams(backend, desired_team_state, teams_to_create, user=None): """ Update team memberships for the given user based on mapping rules defined in settings. """ - if not user: - return - from awx.main.models import Team team_map = backend.setting('TEAM_MAP') or {} for team_name, team_opts in team_map.items(): # Get or create the org to update. if 'organization' not in team_opts: continue - org = get_or_create_with_default_galaxy_cred(name=team_opts['organization']) - - # Update team members from expression(s). - team = Team.objects.get_or_create(name=team_name, organization=org)[0] + teams_to_create[team_name] = team_opts['organization'] users_expr = team_opts.get('users', None) remove = bool(team_opts.get('remove', True)) - _update_m2m_from_expression(user, team.member_role.members, users_expr, remove) + add_or_remove = _update_m2m_from_expression(user, users_expr, remove) + if add_or_remove is not None: + org_name = team_opts['organization'] + if org_name not in desired_team_state: + desired_team_state[org_name] = {} + desired_team_state[org_name][team_name] = {'member_role': add_or_remove} -def update_user_orgs_by_saml_attr(backend, details, user=None, *args, **kwargs): - if not user: - return - from django.conf import settings - +def _update_user_orgs_by_saml_attr(backend, desired_org_state, orgs_to_create, **kwargs): org_map = settings.SOCIAL_AUTH_SAML_ORGANIZATION_ATTR - if org_map.get('saml_attr') is None and org_map.get('saml_admin_attr') is None and org_map.get('saml_auditor_attr') is None: - return + roles_and_flags = ( + ('member_role', 'remove', 'saml_attr'), + ('admin_role', 'remove_admins', 'saml_admin_attr'), + ('auditor_role', 'remove_auditors', 'saml_auditor_attr'), + ) - remove = bool(org_map.get('remove', True)) - remove_admins = bool(org_map.get('remove_admins', True)) - remove_auditors = bool(org_map.get('remove_auditors', True)) + # If the remove_flag was present we need to load all of the orgs and remove the user from the role + all_orgs = None + for role, remove_flag, _ in roles_and_flags: + remove = bool(org_map.get(remove_flag, True)) + if remove: + # Only get the all orgs once, and only if needed + if all_orgs is None: + all_orgs = get_orgs_by_ids() + for org_name in all_orgs.keys(): + if org_name not in desired_org_state: + desired_org_state[org_name] = {} + desired_org_state[org_name][role] = False - attr_values = kwargs.get('response', {}).get('attributes', {}).get(org_map.get('saml_attr'), []) - attr_admin_values = kwargs.get('response', {}).get('attributes', {}).get(org_map.get('saml_admin_attr'), []) - attr_auditor_values = kwargs.get('response', {}).get('attributes', {}).get(org_map.get('saml_auditor_attr'), []) - - _update_org_from_attr(user, "member_role", attr_values, remove, False, False, backend) - _update_org_from_attr(user, "admin_role", attr_admin_values, False, remove_admins, False, backend) - _update_org_from_attr(user, "auditor_role", attr_auditor_values, False, False, remove_auditors, backend) + # Now we can add the user as a member/admin/auditor for any orgs they have specified + for role, _, attr_flag in roles_and_flags: + if org_map.get(attr_flag) is None: + continue + saml_attr_values = kwargs.get('response', {}).get('attributes', {}).get(org_map.get(attr_flag), []) + for org_name in saml_attr_values: + try: + organization_alias = backend.setting('ORGANIZATION_MAP').get(org_name).get('organization_alias') + if organization_alias is not None: + organization_name = organization_alias + else: + organization_name = org_name + except Exception: + organization_name = org_name + if organization_name not in orgs_to_create: + orgs_to_create.append(organization_name) + if organization_name not in desired_org_state: + desired_org_state[organization_name] = {} + desired_org_state[organization_name][role] = True -def update_user_teams_by_saml_attr(backend, details, user=None, *args, **kwargs): - if not user: - return - from awx.main.models import Organization, Team - from django.conf import settings - +def _update_user_teams_by_saml_attr(desired_team_state, teams_to_create, **kwargs): + # + # Map users into organizations based on SOCIAL_AUTH_SAML_TEAM_ATTR setting + # team_map = settings.SOCIAL_AUTH_SAML_TEAM_ATTR if team_map.get('saml_attr') is None: return + all_teams = None + # The role and flag is hard coded here but intended to be flexible in case we ever wanted to add another team type + for role, remove_flag in [('member_role', 'remove')]: + remove = bool(team_map.get(remove_flag, True)) + if remove: + # Only get the all orgs once, and only if needed + if all_teams is None: + all_teams = Team.objects.all().values_list('name', 'organization__name') + for (team_name, organization_name) in all_teams: + if organization_name not in desired_team_state: + desired_team_state[organization_name] = {} + desired_team_state[organization_name][team_name] = {role: False} + saml_team_names = set(kwargs.get('response', {}).get('attributes', {}).get(team_map['saml_attr'], [])) - team_ids = [] for team_name_map in team_map.get('team_org_map', []): team_name = team_name_map.get('team', None) team_alias = team_name_map.get('team_alias', None) @@ -225,29 +187,17 @@ def update_user_teams_by_saml_attr(backend, details, user=None, *args, **kwargs) logger.error("organization name invalid for team {}".format(team_name)) continue - try: - if settings.SAML_AUTO_CREATE_OBJECTS: - org = get_or_create_with_default_galaxy_cred(name=organization_name) - else: - org = Organization.objects.get(name=organization_name) - except ObjectDoesNotExist: - continue - if team_alias: team_name = team_alias - try: - if settings.SAML_AUTO_CREATE_OBJECTS: - team = Team.objects.get_or_create(name=team_name, organization=org)[0] - else: - team = Team.objects.get(name=team_name, organization=org) - except ObjectDoesNotExist: - continue - team_ids.append(team.id) - team.member_role.members.add(user) + teams_to_create[team_name] = organization_name + user_is_member_of_team = True + else: + user_is_member_of_team = False - if team_map.get('remove', True): - [t.member_role.members.remove(user) for t in Team.objects.filter(Q(member_role__members=user) & ~Q(id__in=team_ids))] + if organization_name not in desired_team_state: + desired_team_state[organization_name] = {} + desired_team_state[organization_name][team_name] = {'member_role': user_is_member_of_team} def _get_matches(list1, list2): @@ -329,11 +279,6 @@ def _check_flag(user, flag, attributes, user_flags_settings): def update_user_flags(backend, details, user=None, *args, **kwargs): - if not user: - return - - from django.conf import settings - user_flags_settings = settings.SOCIAL_AUTH_SAML_USER_FLAGS_BY_ATTR attributes = kwargs.get('response', {}).get('attributes', {}) diff --git a/awx/sso/social_base_pipeline.py b/awx/sso/social_base_pipeline.py new file mode 100644 index 0000000000..ccdaf1d200 --- /dev/null +++ b/awx/sso/social_base_pipeline.py @@ -0,0 +1,39 @@ +# Copyright (c) 2015 Ansible, Inc. +# All Rights Reserved. + +# Python Social Auth +from social_core.exceptions import AuthException + +# Django +from django.utils.translation import gettext_lazy as _ + + +class AuthNotFound(AuthException): + def __init__(self, backend, email_or_uid, *args, **kwargs): + self.email_or_uid = email_or_uid + super(AuthNotFound, self).__init__(backend, *args, **kwargs) + + def __str__(self): + return _('An account cannot be found for {0}').format(self.email_or_uid) + + +class AuthInactive(AuthException): + def __str__(self): + return _('Your account is inactive') + + +def check_user_found_or_created(backend, details, user=None, *args, **kwargs): + if not user: + email_or_uid = details.get('email') or kwargs.get('email') or kwargs.get('uid') or '???' + raise AuthNotFound(backend, email_or_uid) + + +def set_is_active_for_new_user(strategy, details, user=None, *args, **kwargs): + if kwargs.get('is_new', False): + details['is_active'] = True + return {'details': details} + + +def prevent_inactive_login(backend, details, user=None, *args, **kwargs): + if user and not user.is_active: + raise AuthInactive(backend) diff --git a/awx/sso/social_pipeline.py b/awx/sso/social_pipeline.py new file mode 100644 index 0000000000..b4fb4c1fe3 --- /dev/null +++ b/awx/sso/social_pipeline.py @@ -0,0 +1,90 @@ +# Copyright (c) 2015 Ansible, Inc. +# All Rights Reserved. + +# Python +import re +import logging + +from awx.sso.common import get_or_create_org_with_default_galaxy_cred + +logger = logging.getLogger('awx.sso.social_pipeline') + + +def _update_m2m_from_expression(user, related, expr, remove=True): + """ + Helper function to update m2m relationship based on user matching one or + more expressions. + """ + should_add = False + if expr is None: + return + elif not expr: + pass + elif expr is True: + should_add = True + else: + if isinstance(expr, (str, type(re.compile('')))): + expr = [expr] + for ex in expr: + if isinstance(ex, str): + if user.username == ex or user.email == ex: + should_add = True + elif isinstance(ex, type(re.compile(''))): + if ex.match(user.username) or ex.match(user.email): + should_add = True + if should_add: + related.add(user) + elif remove: + related.remove(user) + + +def update_user_orgs(backend, details, user=None, *args, **kwargs): + """ + Update organization memberships for the given user based on mapping rules + defined in settings. + """ + if not user: + return + + org_map = backend.setting('ORGANIZATION_MAP') or {} + for org_name, org_opts in org_map.items(): + organization_alias = org_opts.get('organization_alias') + if organization_alias: + organization_name = organization_alias + else: + organization_name = org_name + org = get_or_create_org_with_default_galaxy_cred(name=organization_name) + + # Update org admins from expression(s). + remove = bool(org_opts.get('remove', True)) + admins_expr = org_opts.get('admins', None) + remove_admins = bool(org_opts.get('remove_admins', remove)) + _update_m2m_from_expression(user, org.admin_role.members, admins_expr, remove_admins) + + # Update org users from expression(s). + users_expr = org_opts.get('users', None) + remove_users = bool(org_opts.get('remove_users', remove)) + _update_m2m_from_expression(user, org.member_role.members, users_expr, remove_users) + + +def update_user_teams(backend, details, user=None, *args, **kwargs): + """ + Update team memberships for the given user based on mapping rules defined + in settings. + """ + if not user: + return + from awx.main.models import Team + + team_map = backend.setting('TEAM_MAP') or {} + for team_name, team_opts in team_map.items(): + # Get or create the org to update. + if 'organization' not in team_opts: + continue + org = get_or_create_org_with_default_galaxy_cred(name=team_opts['organization']) + + # Update team members from expression(s). + team = Team.objects.get_or_create(name=team_name, organization=org)[0] + users_expr = team_opts.get('users', None) + remove = bool(team_opts.get('remove', True)) + _update_m2m_from_expression(user, team.member_role.members, users_expr, remove) diff --git a/awx/sso/tests/functional/test_common.py b/awx/sso/tests/functional/test_common.py new file mode 100644 index 0000000000..4fc3edd841 --- /dev/null +++ b/awx/sso/tests/functional/test_common.py @@ -0,0 +1,280 @@ +import pytest +from collections import Counter +from django.core.exceptions import FieldError +from django.utils.timezone import now + +from awx.main.models import Credential, CredentialType, Organization, Team, User +from awx.sso.common import get_orgs_by_ids, reconcile_users_org_team_mappings, create_org_and_teams, get_or_create_org_with_default_galaxy_cred + + +@pytest.mark.django_db +class TestCommonFunctions: + @pytest.fixture + def orgs(self): + o1 = Organization.objects.create(name='Default1') + o2 = Organization.objects.create(name='Default2') + o3 = Organization.objects.create(name='Default3') + return (o1, o2, o3) + + @pytest.fixture + def galaxy_credential(self): + galaxy_type = CredentialType.objects.create(kind='galaxy') + cred = Credential( + created=now(), modified=now(), name='Ansible Galaxy', managed=True, credential_type=galaxy_type, inputs={'url': 'https://galaxy.ansible.com/'} + ) + cred.save() + + def test_get_orgs_by_ids(self, orgs): + orgs_and_ids = get_orgs_by_ids() + o1, o2, o3 = orgs + assert Counter(orgs_and_ids.keys()) == Counter([o1.name, o2.name, o3.name]) + assert Counter(orgs_and_ids.values()) == Counter([o1.id, o2.id, o3.id]) + + def test_reconcile_users_org_team_mappings(self): + # Create objects for us to play with + user = User.objects.create(username='user1@foo.com', last_name='foo', first_name='bar', email='user1@foo.com', is_active=True) + org1 = Organization.objects.create(name='Default1') + org2 = Organization.objects.create(name='Default2') + team1 = Team.objects.create(name='Team1', organization=org1) + team2 = Team.objects.create(name='Team1', organization=org2) + + # Try adding nothing + reconcile_users_org_team_mappings(user, {}, {}, 'Nada') + assert list(user.roles.all()) == [] + + # Add a user to an org that does not exist (should have no affect) + reconcile_users_org_team_mappings( + user, + { + 'junk': {'member_role': True}, + }, + {}, + 'Nada', + ) + assert list(user.roles.all()) == [] + + # Remove a user to an org that does not exist (should have no affect) + reconcile_users_org_team_mappings( + user, + { + 'junk': {'member_role': False}, + }, + {}, + 'Nada', + ) + assert list(user.roles.all()) == [] + + # Add the user to the orgs + reconcile_users_org_team_mappings(user, {org1.name: {'member_role': True}, org2.name: {'member_role': True}}, {}, 'Nada') + assert len(user.roles.all()) == 2 + assert user in org1.member_role + assert user in org2.member_role + + # Remove the user from the orgs + reconcile_users_org_team_mappings(user, {org1.name: {'member_role': False}, org2.name: {'member_role': False}}, {}, 'Nada') + assert list(user.roles.all()) == [] + assert user not in org1.member_role + assert user not in org2.member_role + + # Remove the user from the orgs (again, should have no affect) + reconcile_users_org_team_mappings(user, {org1.name: {'member_role': False}, org2.name: {'member_role': False}}, {}, 'Nada') + assert list(user.roles.all()) == [] + assert user not in org1.member_role + assert user not in org2.member_role + + # Add a user back to the member role + reconcile_users_org_team_mappings( + user, + { + org1.name: { + 'member_role': True, + }, + }, + {}, + 'Nada', + ) + users_roles = set(user.roles.values_list('pk', flat=True)) + assert len(users_roles) == 1 + assert user in org1.member_role + + # Add the user to additional roles + reconcile_users_org_team_mappings( + user, + { + org1.name: {'admin_role': True, 'auditor_role': True}, + }, + {}, + 'Nada', + ) + assert len(user.roles.all()) == 3 + assert user in org1.member_role + assert user in org1.admin_role + assert user in org1.auditor_role + + # Add a user to a non-existent role (results in FieldError exception) + with pytest.raises(FieldError): + reconcile_users_org_team_mappings( + user, + { + org1.name: { + 'dne_role': True, + }, + }, + {}, + 'Nada', + ) + + # Try adding a user to a role that should not exist on an org (technically this works at this time) + reconcile_users_org_team_mappings( + user, + { + org1.name: { + 'read_role_id': True, + }, + }, + {}, + 'Nada', + ) + assert len(user.roles.all()) == 4 + assert user in org1.member_role + assert user in org1.admin_role + assert user in org1.auditor_role + + # Remove all of the org perms to test team perms + reconcile_users_org_team_mappings( + user, + { + org1.name: { + 'read_role_id': False, + 'member_role': False, + 'admin_role': False, + 'auditor_role': False, + }, + }, + {}, + 'Nada', + ) + assert list(user.roles.all()) == [] + + # Add the user as a member to one of the teams + reconcile_users_org_team_mappings(user, {}, {org1.name: {team1.name: {'member_role': True}}}, 'Nada') + assert len(user.roles.all()) == 1 + assert user in team1.member_role + # Validate that the user did not become a member of a team with the same name in a different org + assert user not in team2.member_role + + # Remove the user from the team + reconcile_users_org_team_mappings(user, {}, {org1.name: {team1.name: {'member_role': False}}}, 'Nada') + assert list(user.roles.all()) == [] + assert user not in team1.member_role + + # Remove the user from the team again + reconcile_users_org_team_mappings(user, {}, {org1.name: {team1.name: {'member_role': False}}}, 'Nada') + assert list(user.roles.all()) == [] + + # Add the user to a team that does not exist (should have no affect) + reconcile_users_org_team_mappings(user, {}, {org1.name: {'junk': {'member_role': True}}}, 'Nada') + assert list(user.roles.all()) == [] + + # Remove the user from a team that does not exist (should have no affect) + reconcile_users_org_team_mappings(user, {}, {org1.name: {'junk': {'member_role': False}}}, 'Nada') + assert list(user.roles.all()) == [] + + # Test a None setting + reconcile_users_org_team_mappings(user, {}, {org1.name: {'junk': {'member_role': None}}}, 'Nada') + assert list(user.roles.all()) == [] + + # Add the user multiple teams in different orgs + reconcile_users_org_team_mappings(user, {}, {org1.name: {team1.name: {'member_role': True}}, org2.name: {team2.name: {'member_role': True}}}, 'Nada') + assert len(user.roles.all()) == 2 + assert user in team1.member_role + assert user in team2.member_role + + # Remove the user from just one of the teams + reconcile_users_org_team_mappings(user, {}, {org2.name: {team2.name: {'member_role': False}}}, 'Nada') + assert len(user.roles.all()) == 1 + assert user in team1.member_role + assert user not in team2.member_role + + @pytest.mark.parametrize( + "org_list, team_map, can_create, org_count, team_count", + [ + # In this case we will only pass in organizations + ( + ["org1", "org2"], + {}, + True, + 2, + 0, + ), + # In this case we will only pass in teams but the orgs will be created from the teams + ( + [], + {"team1": "org1", "team2": "org2"}, + True, + 2, + 2, + ), + # In this case we will reuse an org + ( + ["org1"], + {"team1": "org1", "team2": "org1"}, + True, + 1, + 2, + ), + # In this case we have a combination of orgs, orgs reused and an org created by a team + ( + ["org1", "org2", "org3"], + {"team1": "org1", "team2": "org4"}, + True, + 4, + 2, + ), + # In this case we will test a case that the UI should prevent and have a team with no Org + # This should create org1/2 but only team1 + ( + ["org1"], + {"team1": "org2", "team2": None}, + True, + 2, + 1, + ), + # Block any creation with the can_create flag + ( + ["org1"], + {"team1": "org2", "team2": None}, + False, + 0, + 0, + ), + ], + ) + def test_create_org_and_teams(self, galaxy_credential, org_list, team_map, can_create, org_count, team_count): + create_org_and_teams(org_list, team_map, 'py.test', can_create=can_create) + assert Organization.objects.count() == org_count + assert Team.objects.count() == team_count + + def test_get_or_create_org_with_default_galaxy_cred_add_galaxy_cred(self, galaxy_credential): + # If this method creates the org it should get the default galaxy credential + num_orgs = 4 + for number in range(1, (num_orgs + 1)): + get_or_create_org_with_default_galaxy_cred(name=f"Default {number}") + + assert Organization.objects.count() == 4 + + for o in Organization.objects.all(): + assert o.galaxy_credentials.count() == 1 + assert o.galaxy_credentials.first().name == 'Ansible Galaxy' + + def test_get_or_create_org_with_default_galaxy_cred_no_galaxy_cred(self, galaxy_credential): + # If the org is pre-created, we should not add the galaxy_credential + num_orgs = 4 + for number in range(1, (num_orgs + 1)): + Organization.objects.create(name=f"Default {number}") + get_or_create_org_with_default_galaxy_cred(name=f"Default {number}") + + assert Organization.objects.count() == 4 + + for o in Organization.objects.all(): + assert o.galaxy_credentials.count() == 0 diff --git a/awx/sso/tests/functional/test_pipeline.py b/awx/sso/tests/functional/test_pipeline.py deleted file mode 100644 index 6bf034b68a..0000000000 --- a/awx/sso/tests/functional/test_pipeline.py +++ /dev/null @@ -1,566 +0,0 @@ -import pytest -import re -from unittest import mock - -from django.utils.timezone import now - -from awx.conf.registry import settings_registry -from awx.sso.pipeline import update_user_orgs, update_user_teams, update_user_orgs_by_saml_attr, update_user_teams_by_saml_attr, _check_flag -from awx.main.models import User, Team, Organization, Credential, CredentialType - - -@pytest.fixture -def galaxy_credential(): - galaxy_type = CredentialType.objects.create(kind='galaxy') - cred = Credential( - created=now(), modified=now(), name='Ansible Galaxy', managed=True, credential_type=galaxy_type, inputs={'url': 'https://galaxy.ansible.com/'} - ) - cred.save() - - -@pytest.fixture -def users(): - u1 = User.objects.create(username='user1@foo.com', last_name='foo', first_name='bar', email='user1@foo.com') - u2 = User.objects.create(username='user2@foo.com', last_name='foo', first_name='bar', email='user2@foo.com') - u3 = User.objects.create(username='user3@foo.com', last_name='foo', first_name='bar', email='user3@foo.com') - return (u1, u2, u3) - - -@pytest.mark.django_db -class TestSAMLMap: - @pytest.fixture - def backend(self): - class Backend: - s = { - 'ORGANIZATION_MAP': { - 'Default': { - 'remove': True, - 'admins': 'foobar', - 'remove_admins': True, - 'users': 'foo', - 'remove_users': True, - 'organization_alias': '', - } - }, - 'TEAM_MAP': {'Blue': {'organization': 'Default', 'remove': True, 'users': ''}, 'Red': {'organization': 'Default', 'remove': True, 'users': ''}}, - } - - def setting(self, key): - return self.s[key] - - return Backend() - - @pytest.fixture - def org(self): - return Organization.objects.create(name="Default") - - def test_update_user_orgs(self, org, backend, users, galaxy_credential): - u1, u2, u3 = users - - # Test user membership logic with regular expressions - backend.setting('ORGANIZATION_MAP')['Default']['admins'] = re.compile('.*') - backend.setting('ORGANIZATION_MAP')['Default']['users'] = re.compile('.*') - - update_user_orgs(backend, None, u1) - update_user_orgs(backend, None, u2) - update_user_orgs(backend, None, u3) - - assert org.admin_role.members.count() == 3 - assert org.member_role.members.count() == 3 - - # Test remove feature enabled - backend.setting('ORGANIZATION_MAP')['Default']['admins'] = '' - backend.setting('ORGANIZATION_MAP')['Default']['users'] = '' - backend.setting('ORGANIZATION_MAP')['Default']['remove_admins'] = True - backend.setting('ORGANIZATION_MAP')['Default']['remove_users'] = True - update_user_orgs(backend, None, u1) - - assert org.admin_role.members.count() == 2 - assert org.member_role.members.count() == 2 - - # Test remove feature disabled - backend.setting('ORGANIZATION_MAP')['Default']['remove_admins'] = False - backend.setting('ORGANIZATION_MAP')['Default']['remove_users'] = False - update_user_orgs(backend, None, u2) - - assert org.admin_role.members.count() == 2 - assert org.member_role.members.count() == 2 - - # Test organization alias feature - backend.setting('ORGANIZATION_MAP')['Default']['organization_alias'] = 'Default_Alias' - update_user_orgs(backend, None, u1) - assert Organization.objects.get(name="Default_Alias") is not None - - for o in Organization.objects.all(): - if o.name == 'Default': - # The default org was already created and should not have a galaxy credential - assert o.galaxy_credentials.count() == 0 - else: - # The Default_Alias was created by SAML and should get the galaxy credential - assert o.galaxy_credentials.count() == 1 - assert o.galaxy_credentials.first().name == 'Ansible Galaxy' - - def test_update_user_teams(self, backend, users, galaxy_credential): - u1, u2, u3 = users - - # Test user membership logic with regular expressions - backend.setting('TEAM_MAP')['Blue']['users'] = re.compile('.*') - backend.setting('TEAM_MAP')['Red']['users'] = re.compile('.*') - - update_user_teams(backend, None, u1) - update_user_teams(backend, None, u2) - update_user_teams(backend, None, u3) - - assert Team.objects.get(name="Red").member_role.members.count() == 3 - assert Team.objects.get(name="Blue").member_role.members.count() == 3 - - # Test remove feature enabled - backend.setting('TEAM_MAP')['Blue']['remove'] = True - backend.setting('TEAM_MAP')['Red']['remove'] = True - backend.setting('TEAM_MAP')['Blue']['users'] = '' - backend.setting('TEAM_MAP')['Red']['users'] = '' - - update_user_teams(backend, None, u1) - - assert Team.objects.get(name="Red").member_role.members.count() == 2 - assert Team.objects.get(name="Blue").member_role.members.count() == 2 - - # Test remove feature disabled - backend.setting('TEAM_MAP')['Blue']['remove'] = False - backend.setting('TEAM_MAP')['Red']['remove'] = False - - update_user_teams(backend, None, u2) - - assert Team.objects.get(name="Red").member_role.members.count() == 2 - assert Team.objects.get(name="Blue").member_role.members.count() == 2 - - for o in Organization.objects.all(): - assert o.galaxy_credentials.count() == 1 - assert o.galaxy_credentials.first().name == 'Ansible Galaxy' - - -@pytest.mark.django_db -class TestSAMLAttr: - @pytest.fixture - def kwargs(self): - return { - 'username': u'cmeyers@redhat.com', - 'uid': 'idp:cmeyers@redhat.com', - 'request': {u'SAMLResponse': [], u'RelayState': [u'idp']}, - 'is_new': False, - 'response': { - 'session_index': '_0728f0e0-b766-0135-75fa-02842b07c044', - 'idp_name': u'idp', - 'attributes': { - 'memberOf': ['Default1', 'Default2'], - 'admins': ['Default3'], - 'auditors': ['Default4'], - 'groups': ['Blue', 'Red'], - 'User.email': ['cmeyers@redhat.com'], - 'User.LastName': ['Meyers'], - 'name_id': 'cmeyers@redhat.com', - 'User.FirstName': ['Chris'], - 'PersonImmutableID': [], - }, - }, - # 'social': , - 'social': None, - # 'strategy': , - 'strategy': None, - 'new_association': False, - } - - @pytest.fixture - def orgs(self): - o1 = Organization.objects.create(name='Default1') - o2 = Organization.objects.create(name='Default2') - o3 = Organization.objects.create(name='Default3') - return (o1, o2, o3) - - @pytest.fixture - def mock_settings(self, request): - fixture_args = request.node.get_closest_marker('fixture_args') - if fixture_args and 'autocreate' in fixture_args.kwargs: - autocreate = fixture_args.kwargs['autocreate'] - else: - autocreate = True - - class MockSettings: - SAML_AUTO_CREATE_OBJECTS = autocreate - SOCIAL_AUTH_SAML_ORGANIZATION_ATTR = { - 'saml_attr': 'memberOf', - 'saml_admin_attr': 'admins', - 'saml_auditor_attr': 'auditors', - 'remove': True, - 'remove_admins': True, - } - SOCIAL_AUTH_SAML_TEAM_ATTR = { - 'saml_attr': 'groups', - 'remove': True, - 'team_org_map': [ - {'team': 'Blue', 'organization': 'Default1'}, - {'team': 'Blue', 'organization': 'Default2'}, - {'team': 'Blue', 'organization': 'Default3'}, - {'team': 'Red', 'organization': 'Default1'}, - {'team': 'Green', 'organization': 'Default1'}, - {'team': 'Green', 'organization': 'Default3'}, - {'team': 'Yellow', 'team_alias': 'Yellow_Alias', 'organization': 'Default4', 'organization_alias': 'Default4_Alias'}, - ], - } - - mock_settings_obj = MockSettings() - for key in settings_registry.get_registered_settings(category_slug='logging'): - value = settings_registry.get_setting_field(key).get_default() - setattr(mock_settings_obj, key, value) - setattr(mock_settings_obj, 'DEBUG', True) - - return mock_settings_obj - - @pytest.fixture - def backend(self): - class Backend: - s = { - 'ORGANIZATION_MAP': { - 'Default1': { - 'remove': True, - 'admins': 'foobar', - 'remove_admins': True, - 'users': 'foo', - 'remove_users': True, - 'organization_alias': 'o1_alias', - } - } - } - - def setting(self, key): - return self.s[key] - - return Backend() - - def test_update_user_orgs_by_saml_attr(self, orgs, users, galaxy_credential, kwargs, mock_settings, backend): - with mock.patch('django.conf.settings', mock_settings): - o1, o2, o3 = orgs - u1, u2, u3 = users - - # Test getting orgs from attribute - update_user_orgs_by_saml_attr(None, None, u1, **kwargs) - update_user_orgs_by_saml_attr(None, None, u2, **kwargs) - update_user_orgs_by_saml_attr(None, None, u3, **kwargs) - - assert o1.member_role.members.count() == 3 - assert o2.member_role.members.count() == 3 - assert o3.member_role.members.count() == 0 - - # Test remove logic enabled - kwargs['response']['attributes']['memberOf'] = ['Default3'] - - update_user_orgs_by_saml_attr(None, None, u1, **kwargs) - - assert o1.member_role.members.count() == 2 - assert o2.member_role.members.count() == 2 - assert o3.member_role.members.count() == 1 - - # Test remove logic disabled - mock_settings.SOCIAL_AUTH_SAML_ORGANIZATION_ATTR['remove'] = False - kwargs['response']['attributes']['memberOf'] = ['Default1', 'Default2'] - - update_user_orgs_by_saml_attr(None, None, u1, **kwargs) - - assert o1.member_role.members.count() == 3 - assert o2.member_role.members.count() == 3 - assert o3.member_role.members.count() == 1 - - update_user_orgs_by_saml_attr(backend, None, u1, **kwargs) - assert Organization.objects.get(name="o1_alias").member_role.members.count() == 1 - - for o in Organization.objects.all(): - if o.id in [o1.id, o2.id, o3.id]: - # o[123] were created without a default galaxy cred - assert o.galaxy_credentials.count() == 0 - else: - # anything else created should have a default galaxy cred - assert o.galaxy_credentials.count() == 1 - assert o.galaxy_credentials.first().name == 'Ansible Galaxy' - - def test_update_user_teams_by_saml_attr(self, orgs, users, galaxy_credential, kwargs, mock_settings): - with mock.patch('django.conf.settings', mock_settings): - o1, o2, o3 = orgs - u1, u2, u3 = users - - # Test getting teams from attribute with team->org mapping - - kwargs['response']['attributes']['groups'] = ['Blue', 'Red', 'Green'] - - # Ensure basic functionality - update_user_teams_by_saml_attr(None, None, u1, **kwargs) - update_user_teams_by_saml_attr(None, None, u2, **kwargs) - update_user_teams_by_saml_attr(None, None, u3, **kwargs) - - assert Team.objects.get(name='Blue', organization__name='Default1').member_role.members.count() == 3 - assert Team.objects.get(name='Blue', organization__name='Default2').member_role.members.count() == 3 - assert Team.objects.get(name='Blue', organization__name='Default3').member_role.members.count() == 3 - - assert Team.objects.get(name='Red', organization__name='Default1').member_role.members.count() == 3 - - assert Team.objects.get(name='Green', organization__name='Default1').member_role.members.count() == 3 - assert Team.objects.get(name='Green', organization__name='Default3').member_role.members.count() == 3 - - # Test remove logic - kwargs['response']['attributes']['groups'] = ['Green'] - update_user_teams_by_saml_attr(None, None, u1, **kwargs) - update_user_teams_by_saml_attr(None, None, u2, **kwargs) - update_user_teams_by_saml_attr(None, None, u3, **kwargs) - - assert Team.objects.get(name='Blue', organization__name='Default1').member_role.members.count() == 0 - assert Team.objects.get(name='Blue', organization__name='Default2').member_role.members.count() == 0 - assert Team.objects.get(name='Blue', organization__name='Default3').member_role.members.count() == 0 - - assert Team.objects.get(name='Red', organization__name='Default1').member_role.members.count() == 0 - - assert Team.objects.get(name='Green', organization__name='Default1').member_role.members.count() == 3 - assert Team.objects.get(name='Green', organization__name='Default3').member_role.members.count() == 3 - - # Test remove logic disabled - mock_settings.SOCIAL_AUTH_SAML_TEAM_ATTR['remove'] = False - kwargs['response']['attributes']['groups'] = ['Blue'] - - update_user_teams_by_saml_attr(None, None, u1, **kwargs) - update_user_teams_by_saml_attr(None, None, u2, **kwargs) - update_user_teams_by_saml_attr(None, None, u3, **kwargs) - - assert Team.objects.get(name='Blue', organization__name='Default1').member_role.members.count() == 3 - assert Team.objects.get(name='Blue', organization__name='Default2').member_role.members.count() == 3 - assert Team.objects.get(name='Blue', organization__name='Default3').member_role.members.count() == 3 - - assert Team.objects.get(name='Red', organization__name='Default1').member_role.members.count() == 0 - - assert Team.objects.get(name='Green', organization__name='Default1').member_role.members.count() == 3 - assert Team.objects.get(name='Green', organization__name='Default3').member_role.members.count() == 3 - - for o in Organization.objects.all(): - if o.id in [o1.id, o2.id, o3.id]: - # o[123] were created without a default galaxy cred - assert o.galaxy_credentials.count() == 0 - else: - # anything else created should have a default galaxy cred - assert o.galaxy_credentials.count() == 1 - assert o.galaxy_credentials.first().name == 'Ansible Galaxy' - - def test_update_user_teams_alias_by_saml_attr(self, orgs, users, galaxy_credential, kwargs, mock_settings): - with mock.patch('django.conf.settings', mock_settings): - u1 = users[0] - - # Test getting teams from attribute with team->org mapping - kwargs['response']['attributes']['groups'] = ['Yellow'] - - # Ensure team and org will be created - update_user_teams_by_saml_attr(None, None, u1, **kwargs) - - assert Team.objects.filter(name='Yellow', organization__name='Default4').count() == 0 - assert Team.objects.filter(name='Yellow_Alias', organization__name='Default4').count() == 1 - assert Team.objects.get(name='Yellow_Alias', organization__name='Default4').member_role.members.count() == 1 - - # only Org 4 got created/updated - org = Organization.objects.get(name='Default4') - assert org.galaxy_credentials.count() == 1 - assert org.galaxy_credentials.first().name == 'Ansible Galaxy' - - @pytest.mark.fixture_args(autocreate=False) - def test_autocreate_disabled(self, users, kwargs, mock_settings): - kwargs['response']['attributes']['memberOf'] = ['Default1', 'Default2', 'Default3'] - kwargs['response']['attributes']['groups'] = ['Blue', 'Red', 'Green'] - with mock.patch('django.conf.settings', mock_settings): - for u in users: - update_user_orgs_by_saml_attr(None, None, u, **kwargs) - update_user_teams_by_saml_attr(None, None, u, **kwargs) - assert Organization.objects.count() == 0 - assert Team.objects.count() == 0 - - # precreate everything - o1 = Organization.objects.create(name='Default1') - o2 = Organization.objects.create(name='Default2') - o3 = Organization.objects.create(name='Default3') - Team.objects.create(name='Blue', organization_id=o1.id) - Team.objects.create(name='Blue', organization_id=o2.id) - Team.objects.create(name='Blue', organization_id=o3.id) - Team.objects.create(name='Red', organization_id=o1.id) - Team.objects.create(name='Green', organization_id=o1.id) - Team.objects.create(name='Green', organization_id=o3.id) - - for u in users: - update_user_orgs_by_saml_attr(None, None, u, **kwargs) - update_user_teams_by_saml_attr(None, None, u, **kwargs) - - assert o1.member_role.members.count() == 3 - assert o2.member_role.members.count() == 3 - assert o3.member_role.members.count() == 3 - - assert Team.objects.get(name='Blue', organization__name='Default1').member_role.members.count() == 3 - assert Team.objects.get(name='Blue', organization__name='Default2').member_role.members.count() == 3 - assert Team.objects.get(name='Blue', organization__name='Default3').member_role.members.count() == 3 - - assert Team.objects.get(name='Red', organization__name='Default1').member_role.members.count() == 3 - - assert Team.objects.get(name='Green', organization__name='Default1').member_role.members.count() == 3 - assert Team.objects.get(name='Green', organization__name='Default3').member_role.members.count() == 3 - - def test_galaxy_credential_auto_assign(self, users, kwargs, galaxy_credential, mock_settings): - kwargs['response']['attributes']['memberOf'] = ['Default1', 'Default2', 'Default3'] - kwargs['response']['attributes']['groups'] = ['Blue', 'Red', 'Green'] - with mock.patch('django.conf.settings', mock_settings): - for u in users: - update_user_orgs_by_saml_attr(None, None, u, **kwargs) - update_user_teams_by_saml_attr(None, None, u, **kwargs) - - assert Organization.objects.count() == 4 - for o in Organization.objects.all(): - assert o.galaxy_credentials.count() == 1 - assert o.galaxy_credentials.first().name == 'Ansible Galaxy' - - def test_galaxy_credential_no_auto_assign(self, users, kwargs, galaxy_credential, mock_settings): - # A Galaxy credential should not be added to an existing org - o = Organization.objects.create(name='Default1') - o = Organization.objects.create(name='Default2') - o = Organization.objects.create(name='Default3') - o = Organization.objects.create(name='Default4') - kwargs['response']['attributes']['memberOf'] = ['Default1'] - kwargs['response']['attributes']['groups'] = ['Blue'] - with mock.patch('django.conf.settings', mock_settings): - for u in users: - update_user_orgs_by_saml_attr(None, None, u, **kwargs) - update_user_teams_by_saml_attr(None, None, u, **kwargs) - - assert Organization.objects.count() == 4 - for o in Organization.objects.all(): - assert o.galaxy_credentials.count() == 0 - - -@pytest.mark.django_db -class TestSAMLUserFlags: - @pytest.mark.parametrize( - "user_flags_settings, expected, is_superuser", - [ - # In this case we will pass no user flags so new_flag should be false and changed will def be false - ( - {}, - (False, False), - False, - ), - # NOTE: The first handful of tests test role/value as string instead of lists. - # This was from the initial implementation of these fields but the code should be able to handle this - # There are a couple tests at the end of this which will validate arrays in these values. - # - # In this case we will give the user a group to make them an admin - ( - {'is_superuser_role': 'test-role-1'}, - (True, True), - False, - ), - # In this case we will give the user a flag that will make then an admin - ( - {'is_superuser_attr': 'is_superuser'}, - (True, True), - False, - ), - # In this case we will give the user a flag but the wrong value - ( - {'is_superuser_attr': 'is_superuser', 'is_superuser_value': 'junk'}, - (False, False), - False, - ), - # In this case we will give the user a flag and the right value - ( - {'is_superuser_attr': 'is_superuser', 'is_superuser_value': 'true'}, - (True, True), - False, - ), - # In this case we will give the user a proper role and an is_superuser_attr role that they dont have, this should make them an admin - ( - {'is_superuser_role': 'test-role-1', 'is_superuser_attr': 'gibberish', 'is_superuser_value': 'true'}, - (True, True), - False, - ), - # In this case we will give the user a proper role and an is_superuser_attr role that they have, this should make them an admin - ( - {'is_superuser_role': 'test-role-1', 'is_superuser_attr': 'test-role-1'}, - (True, True), - False, - ), - # In this case we will give the user a proper role and an is_superuser_attr role that they have but a bad value, this should make them an admin - ( - {'is_superuser_role': 'test-role-1', 'is_superuser_attr': 'is_superuser', 'is_superuser_value': 'junk'}, - (False, False), - False, - ), - # In this case we will give the user everything - ( - {'is_superuser_role': 'test-role-1', 'is_superuser_attr': 'is_superuser', 'is_superuser_value': 'true'}, - (True, True), - False, - ), - # In this test case we will validate that a single attribute (instead of a list) still works - ( - {'is_superuser_attr': 'name_id', 'is_superuser_value': 'test_id'}, - (True, True), - False, - ), - # This will be a negative test for a single atrribute - ( - {'is_superuser_attr': 'name_id', 'is_superuser_value': 'junk'}, - (False, False), - False, - ), - # The user is already a superuser so we should remove them - ( - {'is_superuser_attr': 'name_id', 'is_superuser_value': 'junk', 'remove_superusers': True}, - (False, True), - True, - ), - # The user is already a superuser but we don't have a remove field - ( - {'is_superuser_attr': 'name_id', 'is_superuser_value': 'junk', 'remove_superusers': False}, - (True, False), - True, - ), - # Positive test for multiple values for is_superuser_value - ( - {'is_superuser_attr': 'is_superuser', 'is_superuser_value': ['junk', 'junk2', 'else', 'junk']}, - (True, True), - False, - ), - # Negative test for multiple values for is_superuser_value - ( - {'is_superuser_attr': 'is_superuser', 'is_superuser_value': ['junk', 'junk2', 'junk']}, - (False, True), - True, - ), - # Positive test for multiple values of is_superuser_role - ( - {'is_superuser_role': ['junk', 'junk2', 'something', 'junk']}, - (True, True), - False, - ), - # Negative test for multiple values of is_superuser_role - ( - {'is_superuser_role': ['junk', 'junk2', 'junk']}, - (False, True), - True, - ), - ], - ) - def test__check_flag(self, user_flags_settings, expected, is_superuser): - user = User() - user.username = 'John' - user.is_superuser = is_superuser - - attributes = { - 'email': ['noone@nowhere.com'], - 'last_name': ['Westcott'], - 'is_superuser': ['something', 'else', 'true'], - 'username': ['test_id'], - 'first_name': ['John'], - 'Role': ['test-role-1', 'something', 'different'], - 'name_id': 'test_id', - } - - assert expected == _check_flag(user, 'superuser', attributes, user_flags_settings) diff --git a/awx/sso/tests/functional/test_saml_pipeline.py b/awx/sso/tests/functional/test_saml_pipeline.py new file mode 100644 index 0000000000..628d793d4e --- /dev/null +++ b/awx/sso/tests/functional/test_saml_pipeline.py @@ -0,0 +1,639 @@ +import pytest +import re + +from django.test.utils import override_settings +from awx.main.models import User, Organization, Team +from awx.sso.saml_pipeline import ( + _update_m2m_from_expression, + _update_user_orgs, + _update_user_teams, + _update_user_orgs_by_saml_attr, + _update_user_teams_by_saml_attr, + _check_flag, +) + +# from unittest import mock +# from django.utils.timezone import now +# , Credential, CredentialType + + +@pytest.fixture +def users(): + u1 = User.objects.create(username='user1@foo.com', last_name='foo', first_name='bar', email='user1@foo.com') + u2 = User.objects.create(username='user2@foo.com', last_name='foo', first_name='bar', email='user2@foo.com') + u3 = User.objects.create(username='user3@foo.com', last_name='foo', first_name='bar', email='user3@foo.com') + return (u1, u2, u3) + + +@pytest.mark.django_db +class TestSAMLPopulateUser: + # The main populate_user does not need to be tested since its just a conglomeration of other functions that we test + # This test is here in case someone alters the code in the future in a way that does require testing + def test_populate_user(self): + assert True + + +@pytest.mark.django_db +class TestSAMLSimpleMaps: + # This tests __update_user_orgs and __update_user_teams + @pytest.fixture + def backend(self): + class Backend: + s = { + 'ORGANIZATION_MAP': { + 'Default': { + 'remove': True, + 'admins': 'foobar', + 'remove_admins': True, + 'users': 'foo', + 'remove_users': True, + 'organization_alias': '', + } + }, + 'TEAM_MAP': {'Blue': {'organization': 'Default', 'remove': True, 'users': ''}, 'Red': {'organization': 'Default', 'remove': True, 'users': ''}}, + } + + def setting(self, key): + return self.s[key] + + return Backend() + + def test__update_user_orgs(self, backend, users): + u1, u2, u3 = users + + # Test user membership logic with regular expressions + backend.setting('ORGANIZATION_MAP')['Default']['admins'] = re.compile('.*') + backend.setting('ORGANIZATION_MAP')['Default']['users'] = re.compile('.*') + + desired_org_state = {} + orgs_to_create = [] + _update_user_orgs(backend, desired_org_state, orgs_to_create, u1) + _update_user_orgs(backend, desired_org_state, orgs_to_create, u2) + _update_user_orgs(backend, desired_org_state, orgs_to_create, u3) + + assert desired_org_state == {'Default': {'member_role': True, 'admin_role': True, 'auditor_role': False}} + assert orgs_to_create == ['Default'] + + # Test remove feature enabled + backend.setting('ORGANIZATION_MAP')['Default']['admins'] = '' + backend.setting('ORGANIZATION_MAP')['Default']['users'] = '' + backend.setting('ORGANIZATION_MAP')['Default']['remove_admins'] = True + backend.setting('ORGANIZATION_MAP')['Default']['remove_users'] = True + desired_org_state = {} + orgs_to_create = [] + _update_user_orgs(backend, desired_org_state, orgs_to_create, u1) + assert desired_org_state == {'Default': {'member_role': False, 'admin_role': False, 'auditor_role': False}} + assert orgs_to_create == ['Default'] + + # Test remove feature disabled + backend.setting('ORGANIZATION_MAP')['Default']['remove_admins'] = False + backend.setting('ORGANIZATION_MAP')['Default']['remove_users'] = False + desired_org_state = {} + orgs_to_create = [] + _update_user_orgs(backend, desired_org_state, orgs_to_create, u2) + + assert desired_org_state == {'Default': {'member_role': None, 'admin_role': None, 'auditor_role': False}} + assert orgs_to_create == ['Default'] + + # Test organization alias feature + backend.setting('ORGANIZATION_MAP')['Default']['organization_alias'] = 'Default_Alias' + orgs_to_create = [] + _update_user_orgs(backend, {}, orgs_to_create, u1) + assert orgs_to_create == ['Default_Alias'] + + def test__update_user_teams(self, backend, users): + u1, u2, u3 = users + + # Test user membership logic with regular expressions + backend.setting('TEAM_MAP')['Blue']['users'] = re.compile('.*') + backend.setting('TEAM_MAP')['Red']['users'] = re.compile('.*') + + desired_team_state = {} + teams_to_create = {} + _update_user_teams(backend, desired_team_state, teams_to_create, u1) + assert teams_to_create == {'Red': 'Default', 'Blue': 'Default'} + assert desired_team_state == {'Default': {'Blue': {'member_role': True}, 'Red': {'member_role': True}}} + + # Test remove feature enabled + backend.setting('TEAM_MAP')['Blue']['remove'] = True + backend.setting('TEAM_MAP')['Red']['remove'] = True + backend.setting('TEAM_MAP')['Blue']['users'] = '' + backend.setting('TEAM_MAP')['Red']['users'] = '' + + desired_team_state = {} + teams_to_create = {} + _update_user_teams(backend, desired_team_state, teams_to_create, u1) + assert teams_to_create == {'Red': 'Default', 'Blue': 'Default'} + assert desired_team_state == {'Default': {'Blue': {'member_role': False}, 'Red': {'member_role': False}}} + + # Test remove feature disabled + backend.setting('TEAM_MAP')['Blue']['remove'] = False + backend.setting('TEAM_MAP')['Red']['remove'] = False + + desired_team_state = {} + teams_to_create = {} + _update_user_teams(backend, desired_team_state, teams_to_create, u2) + assert teams_to_create == {'Red': 'Default', 'Blue': 'Default'} + # If we don't care about team memberships we just don't add them to the hash so this would be an empty hash + assert desired_team_state == {} + + +@pytest.mark.django_db +class TestSAMLM2M: + @pytest.mark.parametrize( + "expression, remove, expected_return", + [ + # No expression with no remove + (None, False, None), + ("", False, None), + # No expression with remove + (None, True, False), + # True expression with and without remove + (True, False, True), + (True, True, True), + # Single string matching the user name + ("user1", False, True), + # Single string matching the user email + ("user1@foo.com", False, True), + # Single string not matching username or email, no remove + ("user27", False, None), + # Single string not matching username or email, with remove + ("user27", True, False), + # Same tests with arrays instead of strings + (["user1"], False, True), + (["user1@foo.com"], False, True), + (["user27"], False, None), + (["user27"], True, False), + # Arrays with nothing matching + (["user27", "user28"], False, None), + (["user27", "user28"], True, False), + # Arrays with all matches + (["user1", "user1@foo.com"], False, True), + # Arrays with some match, some not + (["user1", "user28", "user27"], False, True), + # + # Note: For RE's, usually settings takes care of the compilation for us, so we have to do it manually for testing. + # we also need to remove any / or flags for the compile to happen + # + # Matching username regex non-array + (re.compile("^user.*"), False, True), + (re.compile("^user.*"), True, True), + # Matching email regex non-array + (re.compile(".*@foo.com$"), False, True), + (re.compile(".*@foo.com$"), True, True), + # Non-array not matching username or email + (re.compile("^$"), False, None), + (re.compile("^$"), True, False), + # All re tests just in array form + ([re.compile("^user.*")], False, True), + ([re.compile("^user.*")], True, True), + ([re.compile(".*@foo.com$")], False, True), + ([re.compile(".*@foo.com$")], True, True), + ([re.compile("^$")], False, None), + ([re.compile("^$")], True, False), + # An re with username matching but not email + ([re.compile("^user.*"), re.compile(".*@bar.com$")], False, True), + # An re with email matching but not username + ([re.compile("^user27$"), re.compile(".*@foo.com$")], False, True), + # An re array with no matching + ([re.compile("^user27$"), re.compile(".*@bar.com$")], False, None), + ([re.compile("^user27$"), re.compile(".*@bar.com$")], True, False), + # + # A mix of re and strings + # + # String matches, re does not + (["user1", re.compile(".*@bar.com$")], False, True), + # String does not match, re does + (["user27", re.compile(".*@foo.com$")], False, True), + # Nothing matches + (["user27", re.compile(".*@bar.com$")], False, None), + (["user27", re.compile(".*@bar.com$")], True, False), + ], + ) + def test__update_m2m_from_expression(self, expression, remove, expected_return): + user = User.objects.create(username='user1', last_name='foo', first_name='bar', email='user1@foo.com') + return_val = _update_m2m_from_expression(user, expression, remove) + assert return_val == expected_return + + +@pytest.mark.django_db +class TestSAMLAttrMaps: + @pytest.fixture + def backend(self): + class Backend: + s = { + 'ORGANIZATION_MAP': { + 'Default1': { + 'remove': True, + 'admins': 'foobar', + 'remove_admins': True, + 'users': 'foo', + 'remove_users': True, + 'organization_alias': 'o1_alias', + } + } + } + + def setting(self, key): + return self.s[key] + + return Backend() + + @pytest.mark.parametrize( + "setting, expected_state, expected_orgs_to_create, kwargs_member_of_mods", + [ + ( + # Default test, make sure that our roles get applied and removed as specified (with an alias) + { + 'saml_attr': 'memberOf', + 'saml_admin_attr': 'admins', + 'saml_auditor_attr': 'auditors', + 'remove': True, + 'remove_admins': True, + }, + { + 'Default2': {'member_role': True}, + 'Default3': {'admin_role': True}, + 'Default4': {'auditor_role': True}, + 'o1_alias': {'member_role': True}, + 'Rando1': {'admin_role': False, 'auditor_role': False, 'member_role': False}, + }, + [ + 'o1_alias', + 'Default2', + 'Default3', + 'Default4', + ], + None, + ), + ( + # Similar test, we are just going to override the values "coming from the IdP" to limit the teams + { + 'saml_attr': 'memberOf', + 'saml_admin_attr': 'admins', + 'saml_auditor_attr': 'auditors', + 'remove': True, + 'remove_admins': True, + }, + { + 'Default3': {'admin_role': True, 'member_role': True}, + 'Default4': {'auditor_role': True}, + 'Rando1': {'admin_role': False, 'auditor_role': False, 'member_role': False}, + }, + [ + 'Default3', + 'Default4', + ], + ['Default3'], + ), + ( + # Test to make sure the remove logic is working + { + 'saml_attr': 'memberOf', + 'saml_admin_attr': 'admins', + 'saml_auditor_attr': 'auditors', + 'remove': False, + 'remove_admins': False, + 'remove_auditors': False, + }, + { + 'Default2': {'member_role': True}, + 'Default3': {'admin_role': True}, + 'Default4': {'auditor_role': True}, + 'o1_alias': {'member_role': True}, + }, + [ + 'o1_alias', + 'Default2', + 'Default3', + 'Default4', + ], + ['Default1', 'Default2'], + ), + ], + ) + def test__update_user_orgs_by_saml_attr(self, backend, setting, expected_state, expected_orgs_to_create, kwargs_member_of_mods): + kwargs = { + 'username': u'cmeyers@redhat.com', + 'uid': 'idp:cmeyers@redhat.com', + 'request': {u'SAMLResponse': [], u'RelayState': [u'idp']}, + 'is_new': False, + 'response': { + 'session_index': '_0728f0e0-b766-0135-75fa-02842b07c044', + 'idp_name': u'idp', + 'attributes': { + 'memberOf': ['Default1', 'Default2'], + 'admins': ['Default3'], + 'auditors': ['Default4'], + 'groups': ['Blue', 'Red'], + 'User.email': ['cmeyers@redhat.com'], + 'User.LastName': ['Meyers'], + 'name_id': 'cmeyers@redhat.com', + 'User.FirstName': ['Chris'], + 'PersonImmutableID': [], + }, + }, + 'social': None, + 'strategy': None, + 'new_association': False, + } + if kwargs_member_of_mods: + kwargs['response']['attributes']['memberOf'] = kwargs_member_of_mods + + # Create a random organization in the database for testing + Organization.objects.create(name='Rando1') + + with override_settings(SOCIAL_AUTH_SAML_ORGANIZATION_ATTR=setting): + desired_org_state = {} + orgs_to_create = [] + _update_user_orgs_by_saml_attr(backend, desired_org_state, orgs_to_create, **kwargs) + assert desired_org_state == expected_state + assert orgs_to_create == expected_orgs_to_create + + @pytest.mark.parametrize( + "setting, expected_team_state, expected_teams_to_create, kwargs_group_override", + [ + ( + { + 'saml_attr': 'groups', + 'remove': False, + 'team_org_map': [ + {'team': 'Blue', 'organization': 'Default1'}, + {'team': 'Blue', 'organization': 'Default2'}, + {'team': 'Blue', 'organization': 'Default3'}, + {'team': 'Red', 'organization': 'Default1'}, + {'team': 'Green', 'organization': 'Default1'}, + {'team': 'Green', 'organization': 'Default3'}, + {'team': 'Yellow', 'team_alias': 'Yellow_Alias', 'organization': 'Default4', 'organization_alias': 'Default4_Alias'}, + ], + }, + { + 'Default1': { + 'Blue': {'member_role': True}, + 'Green': {'member_role': False}, + 'Red': {'member_role': True}, + }, + 'Default2': { + 'Blue': {'member_role': True}, + }, + 'Default3': { + 'Blue': {'member_role': True}, + 'Green': {'member_role': False}, + }, + 'Default4': { + 'Yellow': {'member_role': False}, + }, + }, + { + 'Blue': 'Default3', + 'Red': 'Default1', + }, + None, + ), + ( + { + 'saml_attr': 'groups', + 'remove': False, + 'team_org_map': [ + {'team': 'Blue', 'organization': 'Default1'}, + {'team': 'Blue', 'organization': 'Default2'}, + {'team': 'Blue', 'organization': 'Default3'}, + {'team': 'Red', 'organization': 'Default1'}, + {'team': 'Green', 'organization': 'Default1'}, + {'team': 'Green', 'organization': 'Default3'}, + {'team': 'Yellow', 'team_alias': 'Yellow_Alias', 'organization': 'Default4', 'organization_alias': 'Default4_Alias'}, + ], + }, + { + 'Default1': { + 'Blue': {'member_role': True}, + 'Green': {'member_role': True}, + 'Red': {'member_role': True}, + }, + 'Default2': { + 'Blue': {'member_role': True}, + }, + 'Default3': { + 'Blue': {'member_role': True}, + 'Green': {'member_role': True}, + }, + 'Default4': { + 'Yellow': {'member_role': False}, + }, + }, + { + 'Blue': 'Default3', + 'Red': 'Default1', + 'Green': 'Default3', + }, + ['Blue', 'Red', 'Green'], + ), + ( + { + 'saml_attr': 'groups', + 'remove': True, + 'team_org_map': [ + {'team': 'Blue', 'organization': 'Default1'}, + {'team': 'Blue', 'organization': 'Default2'}, + {'team': 'Blue', 'organization': 'Default3'}, + {'team': 'Red', 'organization': 'Default1'}, + {'team': 'Green', 'organization': 'Default1'}, + {'team': 'Green', 'organization': 'Default3'}, + {'team': 'Yellow', 'team_alias': 'Yellow_Alias', 'organization': 'Default4', 'organization_alias': 'Default4_Alias'}, + ], + }, + { + 'Default1': { + 'Blue': {'member_role': False}, + 'Green': {'member_role': True}, + 'Red': {'member_role': False}, + }, + 'Default2': { + 'Blue': {'member_role': False}, + }, + 'Default3': { + 'Blue': {'member_role': False}, + 'Green': {'member_role': True}, + }, + 'Default4': { + 'Yellow': {'member_role': False}, + }, + 'Rando1': { + 'Rando1': {'member_role': False}, + }, + }, + { + 'Green': 'Default3', + }, + ['Green'], + ), + ], + ) + def test__update_user_teams_by_saml_attr(self, setting, expected_team_state, expected_teams_to_create, kwargs_group_override): + kwargs = { + 'username': u'cmeyers@redhat.com', + 'uid': 'idp:cmeyers@redhat.com', + 'request': {u'SAMLResponse': [], u'RelayState': [u'idp']}, + 'is_new': False, + 'response': { + 'session_index': '_0728f0e0-b766-0135-75fa-02842b07c044', + 'idp_name': u'idp', + 'attributes': { + 'memberOf': ['Default1', 'Default2'], + 'admins': ['Default3'], + 'auditors': ['Default4'], + 'groups': ['Blue', 'Red'], + 'User.email': ['cmeyers@redhat.com'], + 'User.LastName': ['Meyers'], + 'name_id': 'cmeyers@redhat.com', + 'User.FirstName': ['Chris'], + 'PersonImmutableID': [], + }, + }, + 'social': None, + 'strategy': None, + 'new_association': False, + } + if kwargs_group_override: + kwargs['response']['attributes']['groups'] = kwargs_group_override + + o = Organization.objects.create(name='Rando1') + Team.objects.create(name='Rando1', organization_id=o.id) + + with override_settings(SOCIAL_AUTH_SAML_TEAM_ATTR=setting): + desired_team_state = {} + teams_to_create = {} + _update_user_teams_by_saml_attr(desired_team_state, teams_to_create, **kwargs) + assert desired_team_state == expected_team_state + assert teams_to_create == expected_teams_to_create + + +@pytest.mark.django_db +class TestSAMLUserFlags: + @pytest.mark.parametrize( + "user_flags_settings, expected, is_superuser", + [ + # In this case we will pass no user flags so new_flag should be false and changed will def be false + ( + {}, + (False, False), + False, + ), + # NOTE: The first handful of tests test role/value as string instead of lists. + # This was from the initial implementation of these fields but the code should be able to handle this + # There are a couple tests at the end of this which will validate arrays in these values. + # + # In this case we will give the user a group to make them an admin + ( + {'is_superuser_role': 'test-role-1'}, + (True, True), + False, + ), + # In this case we will give the user a flag that will make then an admin + ( + {'is_superuser_attr': 'is_superuser'}, + (True, True), + False, + ), + # In this case we will give the user a flag but the wrong value + ( + {'is_superuser_attr': 'is_superuser', 'is_superuser_value': 'junk'}, + (False, False), + False, + ), + # In this case we will give the user a flag and the right value + ( + {'is_superuser_attr': 'is_superuser', 'is_superuser_value': 'true'}, + (True, True), + False, + ), + # In this case we will give the user a proper role and an is_superuser_attr role that they don't have, this should make them an admin + ( + {'is_superuser_role': 'test-role-1', 'is_superuser_attr': 'gibberish', 'is_superuser_value': 'true'}, + (True, True), + False, + ), + # In this case we will give the user a proper role and an is_superuser_attr role that they have, this should make them an admin + ( + {'is_superuser_role': 'test-role-1', 'is_superuser_attr': 'test-role-1'}, + (True, True), + False, + ), + # In this case we will give the user a proper role and an is_superuser_attr role that they have but a bad value, this should make them an admin + ( + {'is_superuser_role': 'test-role-1', 'is_superuser_attr': 'is_superuser', 'is_superuser_value': 'junk'}, + (False, False), + False, + ), + # In this case we will give the user everything + ( + {'is_superuser_role': 'test-role-1', 'is_superuser_attr': 'is_superuser', 'is_superuser_value': 'true'}, + (True, True), + False, + ), + # In this test case we will validate that a single attribute (instead of a list) still works + ( + {'is_superuser_attr': 'name_id', 'is_superuser_value': 'test_id'}, + (True, True), + False, + ), + # This will be a negative test for a single attribute + ( + {'is_superuser_attr': 'name_id', 'is_superuser_value': 'junk'}, + (False, False), + False, + ), + # The user is already a superuser so we should remove them + ( + {'is_superuser_attr': 'name_id', 'is_superuser_value': 'junk', 'remove_superusers': True}, + (False, True), + True, + ), + # The user is already a superuser but we don't have a remove field + ( + {'is_superuser_attr': 'name_id', 'is_superuser_value': 'junk', 'remove_superusers': False}, + (True, False), + True, + ), + # Positive test for multiple values for is_superuser_value + ( + {'is_superuser_attr': 'is_superuser', 'is_superuser_value': ['junk', 'junk2', 'else', 'junk']}, + (True, True), + False, + ), + # Negative test for multiple values for is_superuser_value + ( + {'is_superuser_attr': 'is_superuser', 'is_superuser_value': ['junk', 'junk2', 'junk']}, + (False, True), + True, + ), + # Positive test for multiple values of is_superuser_role + ( + {'is_superuser_role': ['junk', 'junk2', 'something', 'junk']}, + (True, True), + False, + ), + # Negative test for multiple values of is_superuser_role + ( + {'is_superuser_role': ['junk', 'junk2', 'junk']}, + (False, True), + True, + ), + ], + ) + def test__check_flag(self, user_flags_settings, expected, is_superuser): + user = User() + user.username = 'John' + user.is_superuser = is_superuser + + attributes = { + 'email': ['noone@nowhere.com'], + 'last_name': ['Westcott'], + 'is_superuser': ['something', 'else', 'true'], + 'username': ['test_id'], + 'first_name': ['John'], + 'Role': ['test-role-1', 'something', 'different'], + 'name_id': 'test_id', + } + + assert expected == _check_flag(user, 'superuser', attributes, user_flags_settings) diff --git a/awx/sso/tests/functional/test_social_base_pipeline.py b/awx/sso/tests/functional/test_social_base_pipeline.py new file mode 100644 index 0000000000..38a49e15f3 --- /dev/null +++ b/awx/sso/tests/functional/test_social_base_pipeline.py @@ -0,0 +1,76 @@ +import pytest + +from awx.main.models import User +from awx.sso.social_base_pipeline import AuthNotFound, check_user_found_or_created, set_is_active_for_new_user, prevent_inactive_login, AuthInactive + + +@pytest.mark.django_db +class TestSocialBasePipeline: + def test_check_user_found_or_created_no_exception(self): + # If we have a user (the True param, we should not get an exception) + try: + check_user_found_or_created(None, {}, True) + except AuthNotFound: + assert False, 'check_user_found_or_created should not have raised an exception with a user' + + @pytest.mark.parametrize( + "details, kwargs, expected_id", + [ + ( + {}, + {}, + '???', + ), + ( + {}, + {'uid': 'kwargs_uid'}, + 'kwargs_uid', + ), + ( + {}, + {'uid': 'kwargs_uid', 'email': 'kwargs_email'}, + 'kwargs_email', + ), + ( + {'email': 'details_email'}, + {'uid': 'kwargs_uid', 'email': 'kwargs_email'}, + 'details_email', + ), + ], + ) + def test_check_user_found_or_created_exceptions(self, details, expected_id, kwargs): + with pytest.raises(AuthNotFound) as e: + check_user_found_or_created(None, details, False, None, **kwargs) + assert f'An account cannot be found for {expected_id}' == str(e.value) + + @pytest.mark.parametrize( + "kwargs, expected_details, expected_response", + [ + ({}, {}, None), + ({'is_new': False}, {}, None), + ({'is_new': True}, {'is_active': True}, {'details': {'is_active': True}}), + ], + ) + def test_set_is_active_for_new_user(self, kwargs, expected_details, expected_response): + details = {} + response = set_is_active_for_new_user(None, details, None, None, **kwargs) + assert details == expected_details + assert response == expected_response + + def test_prevent_inactive_login_no_exception_no_user(self): + try: + prevent_inactive_login(None, None, None, None, None) + except AuthInactive: + assert False, 'prevent_inactive_login should not have raised an exception with no user' + + def test_prevent_inactive_login_no_exception_active_user(self): + user = User.objects.create(username='user1@foo.com', last_name='foo', first_name='bar', email='user1@foo.com', is_active=True) + try: + prevent_inactive_login(None, None, user, None, None) + except AuthInactive: + assert False, 'prevent_inactive_login should not have raised an exception with an active user' + + def test_prevent_inactive_login_no_exception_inactive_user(self): + user = User.objects.create(username='user1@foo.com', last_name='foo', first_name='bar', email='user1@foo.com', is_active=False) + with pytest.raises(AuthInactive): + prevent_inactive_login(None, None, user, None, None) diff --git a/awx/sso/tests/functional/test_social_pipeline.py b/awx/sso/tests/functional/test_social_pipeline.py new file mode 100644 index 0000000000..f26886e719 --- /dev/null +++ b/awx/sso/tests/functional/test_social_pipeline.py @@ -0,0 +1,113 @@ +import pytest +import re + +from awx.sso.social_pipeline import update_user_orgs, update_user_teams +from awx.main.models import User, Team, Organization + + +@pytest.fixture +def users(): + u1 = User.objects.create(username='user1@foo.com', last_name='foo', first_name='bar', email='user1@foo.com') + u2 = User.objects.create(username='user2@foo.com', last_name='foo', first_name='bar', email='user2@foo.com') + u3 = User.objects.create(username='user3@foo.com', last_name='foo', first_name='bar', email='user3@foo.com') + return (u1, u2, u3) + + +@pytest.mark.django_db +class TestSocialPipeline: + @pytest.fixture + def backend(self): + class Backend: + s = { + 'ORGANIZATION_MAP': { + 'Default': { + 'remove': True, + 'admins': 'foobar', + 'remove_admins': True, + 'users': 'foo', + 'remove_users': True, + 'organization_alias': '', + } + }, + 'TEAM_MAP': {'Blue': {'organization': 'Default', 'remove': True, 'users': ''}, 'Red': {'organization': 'Default', 'remove': True, 'users': ''}}, + } + + def setting(self, key): + return self.s[key] + + return Backend() + + @pytest.fixture + def org(self): + return Organization.objects.create(name="Default") + + def test_update_user_orgs(self, org, backend, users): + u1, u2, u3 = users + + # Test user membership logic with regular expressions + backend.setting('ORGANIZATION_MAP')['Default']['admins'] = re.compile('.*') + backend.setting('ORGANIZATION_MAP')['Default']['users'] = re.compile('.*') + + update_user_orgs(backend, None, u1) + update_user_orgs(backend, None, u2) + update_user_orgs(backend, None, u3) + + assert org.admin_role.members.count() == 3 + assert org.member_role.members.count() == 3 + + # Test remove feature enabled + backend.setting('ORGANIZATION_MAP')['Default']['admins'] = '' + backend.setting('ORGANIZATION_MAP')['Default']['users'] = '' + backend.setting('ORGANIZATION_MAP')['Default']['remove_admins'] = True + backend.setting('ORGANIZATION_MAP')['Default']['remove_users'] = True + update_user_orgs(backend, None, u1) + + assert org.admin_role.members.count() == 2 + assert org.member_role.members.count() == 2 + + # Test remove feature disabled + backend.setting('ORGANIZATION_MAP')['Default']['remove_admins'] = False + backend.setting('ORGANIZATION_MAP')['Default']['remove_users'] = False + update_user_orgs(backend, None, u2) + + assert org.admin_role.members.count() == 2 + assert org.member_role.members.count() == 2 + + # Test organization alias feature + backend.setting('ORGANIZATION_MAP')['Default']['organization_alias'] = 'Default_Alias' + update_user_orgs(backend, None, u1) + assert Organization.objects.get(name="Default_Alias") is not None + + def test_update_user_teams(self, backend, users): + u1, u2, u3 = users + + # Test user membership logic with regular expressions + backend.setting('TEAM_MAP')['Blue']['users'] = re.compile('.*') + backend.setting('TEAM_MAP')['Red']['users'] = re.compile('.*') + + update_user_teams(backend, None, u1) + update_user_teams(backend, None, u2) + update_user_teams(backend, None, u3) + + assert Team.objects.get(name="Red").member_role.members.count() == 3 + assert Team.objects.get(name="Blue").member_role.members.count() == 3 + + # Test remove feature enabled + backend.setting('TEAM_MAP')['Blue']['remove'] = True + backend.setting('TEAM_MAP')['Red']['remove'] = True + backend.setting('TEAM_MAP')['Blue']['users'] = '' + backend.setting('TEAM_MAP')['Red']['users'] = '' + + update_user_teams(backend, None, u1) + + assert Team.objects.get(name="Red").member_role.members.count() == 2 + assert Team.objects.get(name="Blue").member_role.members.count() == 2 + + # Test remove feature disabled + backend.setting('TEAM_MAP')['Blue']['remove'] = False + backend.setting('TEAM_MAP')['Red']['remove'] = False + + update_user_teams(backend, None, u2) + + assert Team.objects.get(name="Red").member_role.members.count() == 2 + assert Team.objects.get(name="Blue").member_role.members.count() == 2 diff --git a/awx/sso/tests/unit/test_pipeline.py b/awx/sso/tests/unit/test_pipeline.py deleted file mode 100644 index 8e1dd4e92f..0000000000 --- a/awx/sso/tests/unit/test_pipeline.py +++ /dev/null @@ -1,2 +0,0 @@ -def test_module_loads(): - from awx.sso import pipeline # noqa diff --git a/awx/sso/tests/unit/test_pipelines.py b/awx/sso/tests/unit/test_pipelines.py new file mode 100644 index 0000000000..94a1111187 --- /dev/null +++ b/awx/sso/tests/unit/test_pipelines.py @@ -0,0 +1,12 @@ +import pytest + + +@pytest.mark.parametrize( + "lib", + [ + ("saml_pipeline"), + ("social_pipeline"), + ], +) +def test_module_loads(lib): + module = __import__("awx.sso." + lib) # noqa