Remove sso app (#15550)

Remove sso app.
This commit is contained in:
Djebran Lezzoum
2024-10-02 20:06:50 +02:00
committed by jessicamack
parent 1ca034b0a7
commit 4c7697465b
44 changed files with 51 additions and 1817 deletions

View File

@@ -339,7 +339,7 @@ api-lint:
awx-link:
[ -d "/awx_devel/awx.egg-info" ] || $(PYTHON) /awx_devel/tools/scripts/egg_info_dev
TEST_DIRS ?= awx/main/tests/unit awx/main/tests/functional awx/conf/tests awx/sso/tests
TEST_DIRS ?= awx/main/tests/unit awx/main/tests/functional awx/conf/tests
PYTEST_ARGS ?= -n auto
## Run all API unit tests.
test:
@@ -440,7 +440,7 @@ test_unit:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
fi; \
py.test awx/main/tests/unit awx/conf/tests/unit awx/sso/tests/unit
py.test awx/main/tests/unit awx/conf/tests/unit
## Output test coverage as HTML (into htmlcov directory).
coverage_html:

View File

@@ -8,7 +8,6 @@ from rest_framework import serializers
from awx.conf import fields, register, register_validate
from awx.api.fields import OAuth2ProviderField
from oauth2_provider.settings import oauth2_settings
from awx.sso.common import is_remote_auth_enabled
register(
@@ -109,7 +108,7 @@ register(
def authentication_validate(serializer, attrs):
if attrs.get('DISABLE_LOCAL_AUTH', False) and not is_remote_auth_enabled():
if attrs.get('DISABLE_LOCAL_AUTH', False):
raise serializers.ValidationError(_("There are no remote authentication systems configured."))
return attrs

View File

@@ -134,8 +134,6 @@ from awx.api.fields import BooleanNullField, CharNullField, ChoiceNullField, Ver
# AWX Utils
from awx.api.validators import HostnameRegexValidator
from awx.sso.common import get_external_account
logger = logging.getLogger('awx.api.serializers')
# Fields that should be summarized regardless of object type.
@@ -961,7 +959,6 @@ class UnifiedJobStdoutSerializer(UnifiedJobSerializer):
class UserSerializer(BaseSerializer):
password = serializers.CharField(required=False, default='', help_text=_('Field used to change the password.'))
external_account = serializers.SerializerMethodField(help_text=_('Set if the account is managed by an external service'))
is_system_auditor = serializers.BooleanField(default=False)
show_capabilities = ['edit', 'delete']
@@ -979,20 +976,12 @@ class UserSerializer(BaseSerializer):
'is_system_auditor',
'password',
'last_login',
'external_account',
)
extra_kwargs = {'last_login': {'read_only': True}}
def to_representation(self, obj):
ret = super(UserSerializer, self).to_representation(obj)
if self.get_external_account(obj):
# If this is an external account it shouldn't have a password field
ret.pop('password', None)
else:
# If its an internal account lets assume there is a password and return $encrypted$ to the user
ret['password'] = '$encrypted$'
if obj and type(self) is UserSerializer:
ret['auth'] = obj.social_auth.values('provider', 'uid')
ret['password'] = '$encrypted$'
return ret
def get_validation_exclusions(self, obj=None):
@@ -1025,12 +1014,7 @@ class UserSerializer(BaseSerializer):
return value
def _update_password(self, obj, new_password):
# For now we're not raising an error, just not saving password for
# users managed by external authentication services (who already have an unusable password set).
# get_external_account function will return something like social or enterprise when the user is external,
# and return None when the user isn't external.
# We want to allow a password update only for non-external accounts.
if new_password and new_password != '$encrypted$' and not self.get_external_account(obj):
if new_password and new_password != '$encrypted$':
obj.set_password(new_password)
obj.save(update_fields=['password'])
@@ -1045,9 +1029,6 @@ class UserSerializer(BaseSerializer):
obj.set_unusable_password()
obj.save(update_fields=['password'])
def get_external_account(self, obj):
return get_external_account(obj)
def create(self, validated_data):
new_password = validated_data.pop('password', None)
is_system_auditor = validated_data.pop('is_system_auditor', None)

View File

@@ -50,9 +50,6 @@ from rest_framework_yaml.renderers import YAMLRenderer
# ansi2html
from ansi2html import Ansi2HTMLConverter
# Python Social Auth
from social_core.backends.utils import load_backends
# Django OAuth Toolkit
from oauth2_provider.models import get_access_token_model
@@ -129,6 +126,9 @@ from awx.api.views.mixin import (
from awx.api.pagination import UnifiedJobEventPagination
from awx.main.utils import set_environ
if 'ansible_base.authentication' in getattr(settings, "INSTALLED_APPS", []):
from ansible_base.authentication.models.authenticator import Authenticator as AnsibleBaseAuthenticator
logger = logging.getLogger('awx.api.views')
@@ -684,20 +684,18 @@ class AuthView(APIView):
swagger_topic = 'System Configuration'
def get(self, request):
from rest_framework.reverse import reverse
data = OrderedDict()
err_backend, err_message = request.session.get('social_auth_error', (None, None))
auth_backends = list(load_backends(settings.AUTHENTICATION_BACKENDS, force_load=True).items())
# Return auth backends in consistent order: oidc.
auth_backends.sort(key=lambda x: x[0])
for name, backend in auth_backends:
login_url = reverse('social:begin', args=(name,))
complete_url = request.build_absolute_uri(reverse('social:complete', args=(name,)))
backend_data = {'login_url': login_url, 'complete_url': complete_url}
if err_backend == name and err_message:
backend_data['error'] = err_message
data[name] = backend_data
if 'ansible_base.authentication' in getattr(settings, "INSTALLED_APPS", []):
# app is using ansible_base authentication
# add ansible_base authenticators
authenticators = AnsibleBaseAuthenticator.objects.filter(enabled=True, category="sso")
for authenticator in authenticators:
login_url = authenticator.get_login_url()
data[authenticator.name] = {
'login_url': login_url,
'name': authenticator.name,
}
return Response(data)

View File

@@ -61,18 +61,3 @@ def on_post_delete_setting(sender, **kwargs):
key = getattr(instance, '_saved_key_', None)
if key:
handle_setting_change(key, True)
@receiver(setting_changed)
def disable_local_auth(**kwargs):
if (kwargs['setting'], kwargs['value']) == ('DISABLE_LOCAL_AUTH', True):
from django.contrib.auth.models import User
from oauth2_provider.models import RefreshToken
from awx.main.models.oauth import OAuth2AccessToken
from awx.main.management.commands.revoke_oauth2_tokens import revoke_tokens
logger.warning("Triggering token invalidation for local users.")
qs = User.objects.filter(enterprise_auth__isnull=True, social_auth__isnull=True)
revoke_tokens(RefreshToken.objects.filter(revoked=None, user__in=qs))
revoke_tokens(OAuth2AccessToken.objects.filter(user__in=qs))

View File

@@ -93,8 +93,8 @@ class DisableLocalAuthMiddleware(MiddlewareMixin):
user = request.user
if not user.pk:
return
if not (user.social_auth.exists() or user.enterprise_auth.exists()):
logout(request)
logout(request)
class URLModificationMiddleware(MiddlewareMixin):

View File

@@ -1,4 +1,4 @@
# Generated by Django 4.2.10 on 2024-08-09 16:47
# Generated by Django 4.2.10 on 2024-09-16 10:22
from django.db import migrations

View File

@@ -0,0 +1,27 @@
# Generated by Django 4.2.10 on 2024-09-16 15:21
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('main', '0196_delete_profile'),
]
operations = [
# delete all sso application migrations
migrations.RunSQL("DELETE FROM django_migrations WHERE app = 'sso';"),
# delete all sso application content group permissions
migrations.RunSQL(
"DELETE FROM auth_group_permissions "
"WHERE permission_id IN "
"(SELECT id FROM auth_permission WHERE content_type_id in (SELECT id FROM django_content_type WHERE app_label = 'sso'));"
),
# delete all sso application content permissions
migrations.RunSQL("DELETE FROM auth_permission " "WHERE content_type_id IN (SELECT id FROM django_content_type WHERE app_label = 'sso');"),
# delete sso application content type
migrations.RunSQL("DELETE FROM django_content_type WHERE app_label = 'sso';"),
# drop sso application created table
migrations.RunSQL("DROP TABLE IF EXISTS sso_userenterpriseauth;"),
]

View File

@@ -244,16 +244,6 @@ def user_is_system_auditor(user, tf):
User.add_to_class('is_system_auditor', user_is_system_auditor)
def user_is_in_enterprise_category(user, category):
ret = (category,) in user.enterprise_auth.values_list('provider') and not user.has_usable_password()
# NOTE: this if block ensures existing enterprise users are still able to
# log in. Remove it in a future release
return ret
User.add_to_class('is_in_enterprise_category', user_is_in_enterprise_category)
def o_auth2_application_get_absolute_url(self, request=None):
return reverse('api:o_auth2_application_detail', kwargs={'pk': self.pk}, request=request)

View File

@@ -12,9 +12,7 @@ from django.conf import settings
# Django OAuth Toolkit
from oauth2_provider.models import AbstractApplication, AbstractAccessToken
from oauth2_provider.generators import generate_client_secret
from oauthlib import oauth2
from awx.sso.common import get_external_account
from awx.main.fields import OAuth2ClientSecretField
@@ -123,15 +121,5 @@ class OAuth2AccessToken(AbstractAccessToken):
connection.on_commit(_update_last_used)
return valid
def validate_external_users(self):
if self.user and settings.ALLOW_OAUTH2_FOR_EXTERNAL_USERS is False:
external_account = get_external_account(self.user)
if external_account is not None:
raise oauth2.AccessDeniedError(
_('OAuth2 Tokens cannot be created by users associated with an external authentication provider ({})').format(external_account)
)
def save(self, *args, **kwargs):
if not self.pk:
self.validate_external_users()
super(OAuth2AccessToken, self).save(*args, **kwargs)

View File

@@ -314,8 +314,6 @@ TEMPLATES = [
'django.contrib.messages.context_processors.messages',
'awx.ui.context_processors.csp',
'awx.ui.context_processors.version',
'social_django.context_processors.backends',
'social_django.context_processors.login_redirect',
],
'builtins': ['awx.main.templatetags.swagger'],
'libraries': {
@@ -349,14 +347,12 @@ INSTALLED_APPS = [
'rest_framework',
'django_extensions',
'polymorphic',
'social_django',
'django_guid',
'corsheaders',
'awx.conf',
'awx.main',
'awx.api',
'awx.ui',
'awx.sso',
'solo',
'ansible_base.rest_filters',
'ansible_base.jwt_consumer',
@@ -391,9 +387,7 @@ REST_FRAMEWORK = {
# 'URL_FORMAT_OVERRIDE': None,
}
AUTHENTICATION_BACKENDS = (
'awx.main.backends.AWXModelBackend',
)
AUTHENTICATION_BACKENDS = ('awx.main.backends.AWXModelBackend',)
# Django OAuth Toolkit settings
@@ -460,10 +454,6 @@ CELERYBEAT_SCHEDULE = {
DJANGO_REDIS_IGNORE_EXCEPTIONS = True
CACHES = {'default': {'BACKEND': 'awx.main.cache.AWXRedisCache', 'LOCATION': 'unix:///var/run/redis/redis.sock?db=1'}}
# Social Auth configuration.
SOCIAL_AUTH_STRATEGY = 'social_django.strategy.DjangoStrategy'
SOCIAL_AUTH_STORAGE = 'social_django.models.DjangoStorage'
SOCIAL_AUTH_USER_MODEL = 'auth.User'
ROLE_SINGLETON_USER_RELATIONSHIP = ''
ROLE_SINGLETON_TEAM_RELATIONSHIP = ''
@@ -471,41 +461,6 @@ ROLE_SINGLETON_TEAM_RELATIONSHIP = ''
ROLE_BYPASS_SUPERUSER_FLAGS = ['is_superuser']
ROLE_BYPASS_ACTION_FLAGS = {'view': 'is_system_auditor'}
_SOCIAL_AUTH_PIPELINE_BASE = (
'social_core.pipeline.social_auth.social_details',
'social_core.pipeline.social_auth.social_uid',
'social_core.pipeline.social_auth.auth_allowed',
'social_core.pipeline.social_auth.social_user',
'social_core.pipeline.user.get_username',
'social_core.pipeline.social_auth.associate_by_email',
'social_core.pipeline.user.create_user',
'awx.sso.social_base_pipeline.check_user_found_or_created',
'social_core.pipeline.social_auth.associate_user',
'social_core.pipeline.social_auth.load_extra_data',
'awx.sso.social_base_pipeline.set_is_active_for_new_user',
'social_core.pipeline.user.user_details',
'awx.sso.social_base_pipeline.prevent_inactive_login',
)
SOCIAL_AUTH_PIPELINE = _SOCIAL_AUTH_PIPELINE_BASE + (
'awx.sso.social_pipeline.update_user_orgs',
'awx.sso.social_pipeline.update_user_teams',
'ansible_base.resource_registry.utils.service_backed_sso_pipeline.redirect_to_resource_server',
)
SOCIAL_AUTH_LOGIN_URL = '/'
SOCIAL_AUTH_LOGIN_REDIRECT_URL = '/sso/complete/'
SOCIAL_AUTH_LOGIN_ERROR_URL = '/sso/error/'
SOCIAL_AUTH_INACTIVE_USER_URL = '/sso/inactive/'
SOCIAL_AUTH_RAISE_EXCEPTIONS = False
SOCIAL_AUTH_USERNAME_IS_FULL_EMAIL = False
# SOCIAL_AUTH_SLUGIFY_USERNAMES = True
SOCIAL_AUTH_CLEAN_USERNAMES = True
SOCIAL_AUTH_SANITIZE_REDIRECTS = True
SOCIAL_AUTH_REDIRECT_IS_HTTPS = False
# Any ANSIBLE_* settings will be passed to the task runner subprocess
# environment
@@ -946,7 +901,6 @@ MIDDLEWARE = [
'awx.main.middleware.DisableLocalAuthMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'awx.main.middleware.OptionalURLPrefixPath',
'awx.sso.middleware.SocialAuthMiddleware',
'crum.CurrentRequestUserMiddleware',
'awx.main.middleware.URLModificationMiddleware',
'awx.main.middleware.SessionTimeoutMiddleware',

View File

@@ -1,2 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.

View File

@@ -1,8 +0,0 @@
# Django
from django.apps import AppConfig
from django.utils.translation import gettext_lazy as _
class SSOConfig(AppConfig):
name = 'awx.sso'
verbose_name = _('Single Sign-On')

View File

@@ -1,35 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python
import logging
# Django
from django.contrib.auth.models import User
from django.conf import settings as django_settings
# Ansible Tower
from awx.sso.models import UserEnterpriseAuth
logger = logging.getLogger('awx.sso.backends')
def _decorate_enterprise_user(user, provider):
user.set_unusable_password()
user.save()
enterprise_auth, _ = UserEnterpriseAuth.objects.get_or_create(user=user, provider=provider)
return enterprise_auth
def _get_or_set_enterprise_user(username, password, provider):
created = False
try:
user = User.objects.prefetch_related('enterprise_auth').get(username=username)
except User.DoesNotExist:
user = User(username=username)
enterprise_auth = _decorate_enterprise_user(user, provider)
logger.debug("Created enterprise user %s via %s backend." % (username, enterprise_auth.get_provider_display()))
created = True
if created or user.is_in_enterprise_category(provider):
return user
logger.warning("Enterprise user %s already defined in Tower." % username)

View File

@@ -1,195 +0,0 @@
# Copyright (c) 2022 Ansible, Inc.
# All Rights Reserved.
import logging
from django.contrib.contenttypes.models import ContentType
from django.db.utils import IntegrityError
from awx.main.models import Organization, Team
logger = logging.getLogger('awx.sso.common')
def get_orgs_by_ids():
existing_orgs = {}
for org_id, org_name in Organization.objects.all().values_list('id', 'name'):
existing_orgs[org_name] = org_id
return existing_orgs
def reconcile_users_org_team_mappings(user, desired_org_states, desired_team_states, source):
#
# Arguments:
# user - a user object
# desired_org_states: { '<org_name>': { '<role>': <boolean> or None } }
# desired_team_states: { '<org_name>': { '<team name>': { '<role>': <boolean> or None } } }
# source - a text label indicating the "authentication adapter" for debug messages
#
# This function will load the users existing roles and then based on the desired states modify the users roles
# True indicates the user needs to be a member of the role
# False indicates the user should not be a member of the role
# None means this function should not change the users membership of a role
#
content_types = []
reconcile_items = []
if desired_org_states:
content_types.append(ContentType.objects.get_for_model(Organization))
reconcile_items.append(('organization', desired_org_states))
if desired_team_states:
content_types.append(ContentType.objects.get_for_model(Team))
reconcile_items.append(('team', desired_team_states))
if not content_types:
# If both desired states were empty we can simply return because there is nothing to reconcile
return
# users_roles is a flat set of IDs
users_roles = set(user.roles.filter(content_type__in=content_types).values_list('pk', flat=True))
for object_type, desired_states in reconcile_items:
roles = []
# Get a set of named tuples for the org/team name plus all of the roles we got above
if object_type == 'organization':
for sub_dict in desired_states.values():
for role_name in sub_dict:
if sub_dict[role_name] is None:
continue
if role_name not in roles:
roles.append(role_name)
model_roles = Organization.objects.filter(name__in=desired_states.keys()).values_list('name', *roles, named=True)
else:
team_names = []
for teams_dict in desired_states.values():
team_names.extend(teams_dict.keys())
for sub_dict in teams_dict.values():
for role_name in sub_dict:
if sub_dict[role_name] is None:
continue
if role_name not in roles:
roles.append(role_name)
model_roles = Team.objects.filter(name__in=team_names).values_list('name', 'organization__name', *roles, named=True)
for row in model_roles:
for role_name in roles:
if object_type == 'organization':
desired_state = desired_states.get(row.name, {})
else:
desired_state = desired_states.get(row.organization__name, {}).get(row.name, {})
if desired_state.get(role_name, None) is None:
# The mapping was not defined for this [org/team]/role so we can just pass
continue
# If somehow the auth adapter knows about an items role but that role is not defined in the DB we are going to print a pretty error
# This is your classic safety net that we should never hit; but here you are reading this comment... good luck and Godspeed.
role_id = getattr(row, role_name, None)
if role_id is None:
logger.error("{} adapter wanted to manage role {} of {} {} but that role is not defined".format(source, role_name, object_type, row.name))
continue
if desired_state[role_name]:
# The desired state was the user mapped into the object_type, if the user was not mapped in map them in
if role_id not in users_roles:
logger.debug("{} adapter adding user {} to {} {} as {}".format(source, user.username, object_type, row.name, role_name))
user.roles.add(role_id)
else:
# The desired state was the user was not mapped into the org, if the user has the permission remove it
if role_id in users_roles:
logger.debug("{} adapter removing user {} permission of {} from {} {}".format(source, user.username, role_name, object_type, row.name))
user.roles.remove(role_id)
def create_org_and_teams(org_list, team_map, adapter, can_create=True):
#
# org_list is a set of organization names
# team_map is a dict of {<team_name>: <org name>}
#
# Move this junk into save of the settings for performance later, there is no need to do that here
# with maybe the exception of someone defining this in settings before the server is started?
# ==============================================================================================================
if not can_create:
logger.debug(f"Adapter {adapter} is not allowed to create orgs/teams")
return
# Get all of the IDs and names of orgs in the DB and create any new org defined in org_list that does not exist in the DB
existing_orgs = get_orgs_by_ids()
# Parse through orgs and teams provided and create a list of unique items we care about creating
all_orgs = list(set(org_list))
all_teams = []
for team_name in team_map:
org_name = team_map[team_name]
if org_name:
if org_name not in all_orgs:
all_orgs.append(org_name)
# We don't have to test if this is in all_teams because team_map is already a hash
all_teams.append(team_name)
else:
# The UI should prevent this condition so this is just a double check to prevent a stack trace....
# although the rest of the login process might stack later on
logger.error("{} adapter is attempting to create a team {} but it does not have an org".format(adapter, team_name))
for org_name in all_orgs:
if org_name and org_name not in existing_orgs:
logger.info("{} adapter is creating org {}".format(adapter, org_name))
try:
new_org = get_or_create_org_with_default_galaxy_cred(name=org_name)
except IntegrityError:
# Another thread must have created this org before we did so now we need to get it
new_org = get_or_create_org_with_default_galaxy_cred(name=org_name)
# Add the org name to the existing orgs since we created it and we may need it to build the teams below
existing_orgs[org_name] = new_org.id
# Do the same for teams
existing_team_names = list(Team.objects.all().values_list('name', flat=True))
for team_name in all_teams:
if team_name not in existing_team_names:
logger.info("{} adapter is creating team {} in org {}".format(adapter, team_name, team_map[team_name]))
try:
Team.objects.create(name=team_name, organization_id=existing_orgs[team_map[team_name]])
except IntegrityError:
# If another process got here before us that is ok because we don't need the ID from this team or anything
pass
# End move some day
# ==============================================================================================================
def get_or_create_org_with_default_galaxy_cred(**kwargs):
from awx.main.models import Organization, Credential
(org, org_created) = Organization.objects.get_or_create(**kwargs)
if org_created:
logger.debug("Created org {} (id {}) from {}".format(org.name, org.id, kwargs))
public_galaxy_credential = Credential.objects.filter(managed=True, name='Ansible Galaxy').first()
if public_galaxy_credential is not None:
org.galaxy_credentials.add(public_galaxy_credential)
logger.debug("Added default Ansible Galaxy credential to org")
else:
logger.debug("Could not find default Ansible Galaxy credential to add to org")
return org
def get_external_account(user):
account_type = None
if user.social_auth.all():
account_type = "social"
if user.enterprise_auth.all():
account_type = "enterprise"
return account_type
def is_remote_auth_enabled():
from django.conf import settings
settings_that_turn_on_remote_auth = []
# Also include any SOCAIL_AUTH_*KEY
for social_auth_key in dir(settings):
if social_auth_key.startswith('SOCIAL_AUTH_') and social_auth_key.endswith('_KEY'):
settings_that_turn_on_remote_auth.append(social_auth_key)
return any(getattr(settings, s, None) for s in settings_that_turn_on_remote_auth)

View File

@@ -1,180 +0,0 @@
# Python
import collections
import urllib.parse as urlparse
# Django
from django.conf import settings
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
# AWX
from awx.conf import register, fields
from awx.sso.fields import (
AuthenticationBackendsField,
SocialOrganizationMapField,
SocialTeamMapField,
)
class SocialAuthCallbackURL(object):
def __init__(self, provider):
self.provider = provider
def __call__(self):
path = reverse('social:complete', args=(self.provider,))
return urlparse.urljoin(settings.TOWER_URL_BASE, path)
SOCIAL_AUTH_ORGANIZATION_MAP_HELP_TEXT = _(
'''\
Mapping to organization admins/users from social auth accounts. This setting
controls which users are placed into which organizations based on their
username and email address. Configuration details are available in the
documentation.\
'''
)
# FIXME: /regex/gim (flags)
SOCIAL_AUTH_ORGANIZATION_MAP_PLACEHOLDER = collections.OrderedDict(
[
('Default', collections.OrderedDict([('users', True)])),
('Test Org', collections.OrderedDict([('admins', ['admin@example.com']), ('auditors', ['auditor@example.com']), ('users', True)])),
(
'Test Org 2',
collections.OrderedDict(
[
('admins', ['admin@example.com', r'/^tower-[^@]+*?@.*$/']),
('remove_admins', True),
('users', r'/^[^@].*?@example\.com$/i'),
('remove_users', True),
]
),
),
]
)
SOCIAL_AUTH_TEAM_MAP_HELP_TEXT = _(
'''\
Mapping of team members (users) from social auth accounts. Configuration
details are available in the documentation.\
'''
)
SOCIAL_AUTH_TEAM_MAP_PLACEHOLDER = collections.OrderedDict(
[
('My Team', collections.OrderedDict([('organization', 'Test Org'), ('users', [r'/^[^@]+?@test\.example\.com$/']), ('remove', True)])),
('Other Team', collections.OrderedDict([('organization', 'Test Org 2'), ('users', r'/^[^@]+?@test2\.example\.com$/i'), ('remove', False)])),
]
)
if settings.ALLOW_LOCAL_RESOURCE_MANAGEMENT:
###############################################################################
# AUTHENTICATION BACKENDS DYNAMIC SETTING
###############################################################################
register(
'AUTHENTICATION_BACKENDS',
field_class=AuthenticationBackendsField,
label=_('Authentication Backends'),
help_text=_('List of authentication backends that are enabled based on license features and other authentication settings.'),
read_only=True,
depends_on=AuthenticationBackendsField.get_all_required_settings(),
category=_('Authentication'),
category_slug='authentication',
)
register(
'SOCIAL_AUTH_ORGANIZATION_MAP',
field_class=SocialOrganizationMapField,
allow_null=True,
default=None,
label=_('Social Auth Organization Map'),
help_text=SOCIAL_AUTH_ORGANIZATION_MAP_HELP_TEXT,
category=_('Authentication'),
category_slug='authentication',
placeholder=SOCIAL_AUTH_ORGANIZATION_MAP_PLACEHOLDER,
)
register(
'SOCIAL_AUTH_TEAM_MAP',
field_class=SocialTeamMapField,
allow_null=True,
default=None,
label=_('Social Auth Team Map'),
help_text=SOCIAL_AUTH_TEAM_MAP_HELP_TEXT,
category=_('Authentication'),
category_slug='authentication',
placeholder=SOCIAL_AUTH_TEAM_MAP_PLACEHOLDER,
)
register(
'SOCIAL_AUTH_USER_FIELDS',
field_class=fields.StringListField,
allow_null=True,
default=None,
label=_('Social Auth User Fields'),
help_text=_(
'When set to an empty list `[]`, this setting prevents new user '
'accounts from being created. Only users who have previously '
'logged in using social auth or have a user account with a '
'matching email address will be able to login.'
),
category=_('Authentication'),
category_slug='authentication',
placeholder=['username', 'email'],
)
register(
'SOCIAL_AUTH_USERNAME_IS_FULL_EMAIL',
field_class=fields.BooleanField,
default=False,
label=_('Use Email address for usernames'),
help_text=_('Enabling this setting will tell social auth to use the full Email as username instead of the full name'),
category=_('Authentication'),
category_slug='authentication',
)
register(
'LOCAL_PASSWORD_MIN_LENGTH',
field_class=fields.IntegerField,
min_value=0,
default=0,
label=_('Minimum number of characters in local password'),
help_text=_('Minimum number of characters required in a local password. 0 means no minimum'),
category=_('Authentication'),
category_slug='authentication',
)
register(
'LOCAL_PASSWORD_MIN_DIGITS',
field_class=fields.IntegerField,
min_value=0,
default=0,
label=_('Minimum number of digit characters in local password'),
help_text=_('Minimum number of digit characters required in a local password. 0 means no minimum'),
category=_('Authentication'),
category_slug='authentication',
)
register(
'LOCAL_PASSWORD_MIN_UPPER',
field_class=fields.IntegerField,
min_value=0,
default=0,
label=_('Minimum number of uppercase characters in local password'),
help_text=_('Minimum number of uppercase characters required in a local password. 0 means no minimum'),
category=_('Authentication'),
category_slug='authentication',
)
register(
'LOCAL_PASSWORD_MIN_SPECIAL',
field_class=fields.IntegerField,
min_value=0,
default=0,
label=_('Minimum number of special characters in local password'),
help_text=_('Minimum number of special characters required in a local password. 0 means no minimum'),
category=_('Authentication'),
category_slug='authentication',
)

View File

@@ -1,229 +0,0 @@
import collections
import copy
import json
import re
import six
# Django
from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import ValidationError
from rest_framework.fields import empty, Field, SkipField
# AWX
from awx.conf import fields
def get_subclasses(cls):
for subclass in cls.__subclasses__():
for subsubclass in get_subclasses(subclass):
yield subsubclass
yield subclass
class DependsOnMixin:
def get_depends_on(self):
"""
Get the value of the dependent field.
First try to find the value in the request.
Then fall back to the raw value from the setting in the DB.
"""
from django.conf import settings
dependent_key = next(iter(self.depends_on))
if self.context:
request = self.context.get('request', None)
if request and request.data and request.data.get(dependent_key, None):
return request.data.get(dependent_key)
res = settings._get_local(dependent_key, validate=False)
return res
class _Forbidden(Field):
default_error_messages = {'invalid': _('Invalid field.')}
def run_validation(self, value):
self.fail('invalid')
class HybridDictField(fields.DictField):
"""A DictField, but with defined fixed Fields for certain keys."""
def __init__(self, *args, **kwargs):
self.allow_blank = kwargs.pop('allow_blank', False)
fields = [
sorted(
((field_name, obj) for field_name, obj in cls.__dict__.items() if isinstance(obj, Field) and field_name != 'child'),
key=lambda x: x[1]._creation_counter,
)
for cls in reversed(self.__class__.__mro__)
]
self._declared_fields = collections.OrderedDict(f for group in fields for f in group)
super().__init__(*args, **kwargs)
def to_representation(self, value):
fields = copy.deepcopy(self._declared_fields)
return {
key: field.to_representation(val) if val is not None else None
for key, val, field in ((six.text_type(key), val, fields.get(key, self.child)) for key, val in value.items())
if not field.write_only
}
def run_child_validation(self, data):
result = {}
if not data and self.allow_blank:
return result
errors = collections.OrderedDict()
fields = copy.deepcopy(self._declared_fields)
keys = set(fields.keys()) | set(data.keys())
for key in keys:
value = data.get(key, empty)
key = six.text_type(key)
field = fields.get(key, self.child)
try:
if field.read_only:
continue # Ignore read_only fields, as Serializer seems to do.
result[key] = field.run_validation(value)
except ValidationError as e:
errors[key] = e.detail
except SkipField:
pass
if not errors:
return result
raise ValidationError(errors)
class AuthenticationBackendsField(fields.StringListField):
# Mapping of settings that must be set in order to enable each
# authentication backend.
REQUIRED_BACKEND_SETTINGS = collections.OrderedDict(
[
('social_core.backends.open_id_connect.OpenIdConnectAuth', ['SOCIAL_AUTH_OIDC_KEY', 'SOCIAL_AUTH_OIDC_SECRET', 'SOCIAL_AUTH_OIDC_OIDC_ENDPOINT']),
('django.contrib.auth.backends.ModelBackend', []),
('awx.main.backends.AWXModelBackend', []),
]
)
@classmethod
def get_all_required_settings(cls):
all_required_settings = set(['LICENSE'])
for required_settings in cls.REQUIRED_BACKEND_SETTINGS.values():
all_required_settings.update(required_settings)
return all_required_settings
def __init__(self, *args, **kwargs):
kwargs.setdefault('default', self._default_from_required_settings)
super(AuthenticationBackendsField, self).__init__(*args, **kwargs)
def _default_from_required_settings(self):
from django.conf import settings
try:
backends = settings._awx_conf_settings._get_default('AUTHENTICATION_BACKENDS')
except AttributeError:
backends = self.REQUIRED_BACKEND_SETTINGS.keys()
# Filter which authentication backends are enabled based on their
# required settings being defined and non-empty.
for backend, required_settings in self.REQUIRED_BACKEND_SETTINGS.items():
if backend not in backends:
continue
if all([getattr(settings, rs, None) for rs in required_settings]):
continue
backends = [x for x in backends if x != backend]
return backends
class SocialMapStringRegexField(fields.CharField):
def to_representation(self, value):
if isinstance(value, type(re.compile(''))):
flags = []
if value.flags & re.I:
flags.append('i')
if value.flags & re.M:
flags.append('m')
return '/{}/{}'.format(value.pattern, ''.join(flags))
else:
return super(SocialMapStringRegexField, self).to_representation(value)
def to_internal_value(self, data):
data = super(SocialMapStringRegexField, self).to_internal_value(data)
match = re.match(r'^/(?P<pattern>.*)/(?P<flags>[im]+)?$', data)
if match:
flags = 0
if match.group('flags'):
if 'i' in match.group('flags'):
flags |= re.I
if 'm' in match.group('flags'):
flags |= re.M
try:
return re.compile(match.group('pattern'), flags)
except re.error as e:
raise ValidationError('{}: {}'.format(e, data))
return data
class SocialMapField(fields.ListField):
default_error_messages = {'type_error': _('Expected None, True, False, a string or list of strings but got {input_type} instead.')}
child = SocialMapStringRegexField()
def to_representation(self, value):
if isinstance(value, (list, tuple)):
return super(SocialMapField, self).to_representation(value)
elif value in fields.BooleanField.TRUE_VALUES:
return True
elif value in fields.BooleanField.FALSE_VALUES:
return False
elif value in fields.BooleanField.NULL_VALUES:
return None
elif isinstance(value, (str, type(re.compile('')))):
return self.child.to_representation(value)
else:
self.fail('type_error', input_type=type(value))
def to_internal_value(self, data):
if isinstance(data, (list, tuple)):
return super(SocialMapField, self).to_internal_value(data)
elif data in fields.BooleanField.TRUE_VALUES:
return True
elif data in fields.BooleanField.FALSE_VALUES:
return False
elif data in fields.BooleanField.NULL_VALUES:
return None
elif isinstance(data, str):
return self.child.run_validation(data)
else:
self.fail('type_error', input_type=type(data))
class SocialSingleOrganizationMapField(HybridDictField):
admins = SocialMapField(allow_null=True, required=False)
users = SocialMapField(allow_null=True, required=False)
remove_admins = fields.BooleanField(required=False)
remove_users = fields.BooleanField(required=False)
organization_alias = SocialMapField(allow_null=True, required=False)
child = _Forbidden()
class SocialOrganizationMapField(fields.DictField):
child = SocialSingleOrganizationMapField()
class SocialSingleTeamMapField(HybridDictField):
organization = fields.CharField()
users = SocialMapField(allow_null=True, required=False)
remove = fields.BooleanField(required=False)
child = _Forbidden()
class SocialTeamMapField(fields.DictField):
child = SocialSingleTeamMapField()

View File

@@ -1,80 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python
import urllib.parse
# Django
from django.conf import settings
from django.utils.functional import LazyObject
from django.shortcuts import redirect
# Python Social Auth
from social_core.exceptions import SocialAuthBaseException
from social_core.utils import social_logger
from social_django import utils
from social_django.middleware import SocialAuthExceptionMiddleware
class SocialAuthMiddleware(SocialAuthExceptionMiddleware):
def process_request(self, request):
if request.path.startswith('/sso'):
# See upgrade blocker note in requirements/README.md
utils.BACKENDS = settings.AUTHENTICATION_BACKENDS
token_key = request.COOKIES.get('token', '')
token_key = urllib.parse.quote(urllib.parse.unquote(token_key).strip('"'))
if not hasattr(request, 'successful_authenticator'):
request.successful_authenticator = None
if not request.path.startswith('/sso/') and 'migrations_notran' not in request.path:
if request.user and request.user.is_authenticated:
# The rest of the code base rely hevily on type/inheritance checks,
# LazyObject sent from Django auth middleware can be buggy if not
# converted back to its original object.
if isinstance(request.user, LazyObject) and request.user._wrapped:
request.user = request.user._wrapped
request.session.pop('social_auth_error', None)
request.session.pop('social_auth_last_backend', None)
return self.get_response(request)
def process_view(self, request, callback, callback_args, callback_kwargs):
if request.path.startswith('/sso/login/'):
request.session['social_auth_last_backend'] = callback_kwargs['backend']
def process_exception(self, request, exception):
strategy = getattr(request, 'social_strategy', None)
if strategy is None or self.raise_exception(request, exception):
return
if isinstance(exception, SocialAuthBaseException) or request.path.startswith('/sso/'):
backend = getattr(request, 'backend', None)
backend_name = getattr(backend, 'name', 'unknown-backend')
message = self.get_message(request, exception)
if request.session.get('social_auth_last_backend') != backend_name:
backend_name = request.session.get('social_auth_last_backend')
message = request.GET.get('error_description', message)
full_backend_name = backend_name
try:
idp_name = strategy.request_data()['RelayState']
full_backend_name = '%s:%s' % (backend_name, idp_name)
except KeyError:
pass
social_logger.error(message)
url = self.get_redirect_uri(request, exception)
request.session['social_auth_error'] = (full_backend_name, message)
return redirect(url)
def get_message(self, request, exception):
msg = str(exception)
if msg and msg[-1] not in '.?!':
msg = msg + '.'
return msg
def get_redirect_uri(self, request, exception):
strategy = getattr(request, 'social_strategy', None)
return strategy.session_get('next', '') or strategy.setting('LOGIN_ERROR_URL')

View File

@@ -1,21 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
from django.conf import settings
class Migration(migrations.Migration):
dependencies = [migrations.swappable_dependency(settings.AUTH_USER_MODEL)]
operations = [
migrations.CreateModel(
name='UserEnterpriseAuth',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('provider', models.CharField(max_length=32, choices=[(b'radius', 'RADIUS'), (b'tacacs+', 'TACACS+')])),
('user', models.ForeignKey(related_name='enterprise_auth', on_delete=models.CASCADE, to=settings.AUTH_USER_MODEL)),
],
),
migrations.AlterUniqueTogether(name='userenterpriseauth', unique_together=set([('user', 'provider')])),
]

View File

@@ -1,16 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [('sso', '0001_initial')]
operations = [
migrations.AlterField(
model_name='userenterpriseauth',
name='provider',
field=models.CharField(max_length=32, choices=[('radius', 'RADIUS'), ('tacacs+', 'TACACS+'), ('saml', 'SAML')]),
)
]

View File

@@ -1,9 +0,0 @@
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('sso', '0002_expand_provider_options'),
]
# NOOP, migration is kept to preserve integrity.
operations = []

View File

@@ -1,20 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Django
from django.db import models
from django.contrib.auth.models import User
from django.utils.translation import gettext_lazy as _
# todo: this model to be removed as part of sso removal issue AAP-28380
class UserEnterpriseAuth(models.Model):
"""Enterprise Auth association model"""
PROVIDER_CHOICES = (('radius', _('RADIUS')), ('tacacs+', _('TACACS+')))
class Meta:
unique_together = ('user', 'provider')
user = models.ForeignKey(User, related_name='enterprise_auth', on_delete=models.CASCADE)
provider = models.CharField(max_length=32, choices=PROVIDER_CHOICES)

View File

@@ -1,39 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python Social Auth
from social_core.exceptions import AuthException
# Django
from django.utils.translation import gettext_lazy as _
class AuthNotFound(AuthException):
def __init__(self, backend, email_or_uid, *args, **kwargs):
self.email_or_uid = email_or_uid
super(AuthNotFound, self).__init__(backend, *args, **kwargs)
def __str__(self):
return _('An account cannot be found for {0}').format(self.email_or_uid)
class AuthInactive(AuthException):
def __str__(self):
return _('Your account is inactive')
def check_user_found_or_created(backend, details, user=None, *args, **kwargs):
if not user:
email_or_uid = details.get('email') or kwargs.get('email') or kwargs.get('uid') or '???'
raise AuthNotFound(backend, email_or_uid)
def set_is_active_for_new_user(strategy, details, user=None, *args, **kwargs):
if kwargs.get('is_new', False):
details['is_active'] = True
return {'details': details}
def prevent_inactive_login(backend, details, user=None, *args, **kwargs):
if user and not user.is_active:
raise AuthInactive(backend)

View File

@@ -1,90 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python
import re
import logging
from awx.sso.common import get_or_create_org_with_default_galaxy_cred
logger = logging.getLogger('awx.sso.social_pipeline')
def _update_m2m_from_expression(user, related, expr, remove=True):
"""
Helper function to update m2m relationship based on user matching one or
more expressions.
"""
should_add = False
if expr is None:
return
elif not expr:
pass
elif expr is True:
should_add = True
else:
if isinstance(expr, (str, type(re.compile('')))):
expr = [expr]
for ex in expr:
if isinstance(ex, str):
if user.username == ex or user.email == ex:
should_add = True
elif isinstance(ex, type(re.compile(''))):
if ex.match(user.username) or ex.match(user.email):
should_add = True
if should_add:
related.add(user)
elif remove:
related.remove(user)
def update_user_orgs(backend, details, user=None, *args, **kwargs):
"""
Update organization memberships for the given user based on mapping rules
defined in settings.
"""
if not user:
return
org_map = backend.setting('ORGANIZATION_MAP') or {}
for org_name, org_opts in org_map.items():
organization_alias = org_opts.get('organization_alias')
if organization_alias:
organization_name = organization_alias
else:
organization_name = org_name
org = get_or_create_org_with_default_galaxy_cred(name=organization_name)
# Update org admins from expression(s).
remove = bool(org_opts.get('remove', True))
admins_expr = org_opts.get('admins', None)
remove_admins = bool(org_opts.get('remove_admins', remove))
_update_m2m_from_expression(user, org.admin_role.members, admins_expr, remove_admins)
# Update org users from expression(s).
users_expr = org_opts.get('users', None)
remove_users = bool(org_opts.get('remove_users', remove))
_update_m2m_from_expression(user, org.member_role.members, users_expr, remove_users)
def update_user_teams(backend, details, user=None, *args, **kwargs):
"""
Update team memberships for the given user based on mapping rules defined
in settings.
"""
if not user:
return
from awx.main.models import Team
team_map = backend.setting('TEAM_MAP') or {}
for team_name, team_opts in team_map.items():
# Get or create the org to update.
if 'organization' not in team_opts:
continue
org = get_or_create_org_with_default_galaxy_cred(name=team_opts['organization'])
# Update team members from expression(s).
team = Team.objects.get_or_create(name=team_name, organization=org)[0]
users_expr = team_opts.get('users', None)
remove = bool(team_opts.get('remove', True))
_update_m2m_from_expression(user, team.member_role.members, users_expr, remove)

View File

@@ -1,344 +0,0 @@
import pytest
from collections import Counter
from django.core.exceptions import FieldError
from django.utils.timezone import now
from django.test.utils import override_settings
from awx.main.models import Credential, CredentialType, Organization, Team, User
from awx.sso.common import (
get_orgs_by_ids,
reconcile_users_org_team_mappings,
create_org_and_teams,
get_or_create_org_with_default_galaxy_cred,
is_remote_auth_enabled,
get_external_account,
)
class MicroMockObject(object):
def all(self):
return True
@pytest.mark.django_db
class TestCommonFunctions:
@pytest.fixture
def orgs(self):
o1 = Organization.objects.create(name='Default1')
o2 = Organization.objects.create(name='Default2')
o3 = Organization.objects.create(name='Default3')
return (o1, o2, o3)
@pytest.fixture
def galaxy_credential(self):
galaxy_type = CredentialType.objects.create(kind='galaxy')
cred = Credential(
created=now(), modified=now(), name='Ansible Galaxy', managed=True, credential_type=galaxy_type, inputs={'url': 'https://galaxy.ansible.com/'}
)
cred.save()
def test_get_orgs_by_ids(self, orgs):
orgs_and_ids = get_orgs_by_ids()
o1, o2, o3 = orgs
assert Counter(orgs_and_ids.keys()) == Counter([o1.name, o2.name, o3.name])
assert Counter(orgs_and_ids.values()) == Counter([o1.id, o2.id, o3.id])
def test_reconcile_users_org_team_mappings(self):
# Create objects for us to play with
user = User.objects.create(username='user1@foo.com', last_name='foo', first_name='bar', email='user1@foo.com', is_active=True)
org1 = Organization.objects.create(name='Default1')
org2 = Organization.objects.create(name='Default2')
team1 = Team.objects.create(name='Team1', organization=org1)
team2 = Team.objects.create(name='Team1', organization=org2)
# Try adding nothing
reconcile_users_org_team_mappings(user, {}, {}, 'Nada')
assert list(user.roles.all()) == []
# Add a user to an org that does not exist (should have no affect)
reconcile_users_org_team_mappings(
user,
{
'junk': {'member_role': True},
},
{},
'Nada',
)
assert list(user.roles.all()) == []
# Remove a user to an org that does not exist (should have no affect)
reconcile_users_org_team_mappings(
user,
{
'junk': {'member_role': False},
},
{},
'Nada',
)
assert list(user.roles.all()) == []
# Add the user to the orgs
reconcile_users_org_team_mappings(user, {org1.name: {'member_role': True}, org2.name: {'member_role': True}}, {}, 'Nada')
assert len(user.roles.all()) == 2
assert user in org1.member_role
assert user in org2.member_role
# Remove the user from the orgs
reconcile_users_org_team_mappings(user, {org1.name: {'member_role': False}, org2.name: {'member_role': False}}, {}, 'Nada')
assert list(user.roles.all()) == []
assert user not in org1.member_role
assert user not in org2.member_role
# Remove the user from the orgs (again, should have no affect)
reconcile_users_org_team_mappings(user, {org1.name: {'member_role': False}, org2.name: {'member_role': False}}, {}, 'Nada')
assert list(user.roles.all()) == []
assert user not in org1.member_role
assert user not in org2.member_role
# Add a user back to the member role
reconcile_users_org_team_mappings(
user,
{
org1.name: {
'member_role': True,
},
},
{},
'Nada',
)
users_roles = set(user.roles.values_list('pk', flat=True))
assert len(users_roles) == 1
assert user in org1.member_role
# Add the user to additional roles
reconcile_users_org_team_mappings(
user,
{
org1.name: {'admin_role': True, 'auditor_role': True},
},
{},
'Nada',
)
assert len(user.roles.all()) == 3
assert user in org1.member_role
assert user in org1.admin_role
assert user in org1.auditor_role
# Add a user to a non-existent role (results in FieldError exception)
with pytest.raises(FieldError):
reconcile_users_org_team_mappings(
user,
{
org1.name: {
'dne_role': True,
},
},
{},
'Nada',
)
# Try adding a user to a role that should not exist on an org (technically this works at this time)
reconcile_users_org_team_mappings(
user,
{
org1.name: {
'read_role_id': True,
},
},
{},
'Nada',
)
assert len(user.roles.all()) == 4
assert user in org1.member_role
assert user in org1.admin_role
assert user in org1.auditor_role
# Remove all of the org perms to test team perms
reconcile_users_org_team_mappings(
user,
{
org1.name: {
'read_role_id': False,
'member_role': False,
'admin_role': False,
'auditor_role': False,
},
},
{},
'Nada',
)
assert list(user.roles.all()) == []
# Add the user as a member to one of the teams
reconcile_users_org_team_mappings(user, {}, {org1.name: {team1.name: {'member_role': True}}}, 'Nada')
assert len(user.roles.all()) == 1
assert user in team1.member_role
# Validate that the user did not become a member of a team with the same name in a different org
assert user not in team2.member_role
# Remove the user from the team
reconcile_users_org_team_mappings(user, {}, {org1.name: {team1.name: {'member_role': False}}}, 'Nada')
assert list(user.roles.all()) == []
assert user not in team1.member_role
# Remove the user from the team again
reconcile_users_org_team_mappings(user, {}, {org1.name: {team1.name: {'member_role': False}}}, 'Nada')
assert list(user.roles.all()) == []
# Add the user to a team that does not exist (should have no affect)
reconcile_users_org_team_mappings(user, {}, {org1.name: {'junk': {'member_role': True}}}, 'Nada')
assert list(user.roles.all()) == []
# Remove the user from a team that does not exist (should have no affect)
reconcile_users_org_team_mappings(user, {}, {org1.name: {'junk': {'member_role': False}}}, 'Nada')
assert list(user.roles.all()) == []
# Test a None setting
reconcile_users_org_team_mappings(user, {}, {org1.name: {'junk': {'member_role': None}}}, 'Nada')
assert list(user.roles.all()) == []
# Add the user multiple teams in different orgs
reconcile_users_org_team_mappings(user, {}, {org1.name: {team1.name: {'member_role': True}}, org2.name: {team2.name: {'member_role': True}}}, 'Nada')
assert len(user.roles.all()) == 2
assert user in team1.member_role
assert user in team2.member_role
# Remove the user from just one of the teams
reconcile_users_org_team_mappings(user, {}, {org2.name: {team2.name: {'member_role': False}}}, 'Nada')
assert len(user.roles.all()) == 1
assert user in team1.member_role
assert user not in team2.member_role
@pytest.mark.parametrize(
"org_list, team_map, can_create, org_count, team_count",
[
# In this case we will only pass in organizations
(
["org1", "org2"],
{},
True,
2,
0,
),
# In this case we will only pass in teams but the orgs will be created from the teams
(
[],
{"team1": "org1", "team2": "org2"},
True,
2,
2,
),
# In this case we will reuse an org
(
["org1"],
{"team1": "org1", "team2": "org1"},
True,
1,
2,
),
# In this case we have a combination of orgs, orgs reused and an org created by a team
(
["org1", "org2", "org3"],
{"team1": "org1", "team2": "org4"},
True,
4,
2,
),
# In this case we will test a case that the UI should prevent and have a team with no Org
# This should create org1/2 but only team1
(
["org1"],
{"team1": "org2", "team2": None},
True,
2,
1,
),
# Block any creation with the can_create flag
(
["org1"],
{"team1": "org2", "team2": None},
False,
0,
0,
),
],
)
def test_create_org_and_teams(self, galaxy_credential, org_list, team_map, can_create, org_count, team_count):
create_org_and_teams(org_list, team_map, 'py.test', can_create=can_create)
assert Organization.objects.count() == org_count
assert Team.objects.count() == team_count
def test_get_or_create_org_with_default_galaxy_cred_add_galaxy_cred(self, galaxy_credential):
# If this method creates the org it should get the default galaxy credential
num_orgs = 4
for number in range(1, (num_orgs + 1)):
get_or_create_org_with_default_galaxy_cred(name=f"Default {number}")
assert Organization.objects.count() == 4
for o in Organization.objects.all():
assert o.galaxy_credentials.count() == 1
assert o.galaxy_credentials.first().name == 'Ansible Galaxy'
def test_get_or_create_org_with_default_galaxy_cred_no_galaxy_cred(self, galaxy_credential):
# If the org is pre-created, we should not add the galaxy_credential
num_orgs = 4
for number in range(1, (num_orgs + 1)):
Organization.objects.create(name=f"Default {number}")
get_or_create_org_with_default_galaxy_cred(name=f"Default {number}")
assert Organization.objects.count() == 4
for o in Organization.objects.all():
assert o.galaxy_credentials.count() == 0
@pytest.mark.parametrize(
"enable_social, enable_enterprise, expected_results",
[
(False, False, None),
(True, False, 'social'),
(True, True, 'enterprise'),
(True, True, 'enterprise'),
(False, True, 'enterprise'),
(True, False, 'social'),
],
)
def test_get_external_account(self, enable_enterprise, expected_results):
try:
user = User.objects.get(username="external_tester")
except User.DoesNotExist:
user = User(username="external_tester")
user.set_unusable_password()
user.save()
if enable_enterprise:
from awx.sso.models import UserEnterpriseAuth
enterprise_auth = UserEnterpriseAuth(user=user, provider='saml')
enterprise_auth.save()
assert get_external_account(user) == expected_results
@pytest.mark.parametrize(
"setting, expected",
[
# Set none of the social auth settings
('JUNK_SETTING', False),
# Try a hypothetical future one
('SOCIAL_AUTH_GIBBERISH_KEY', True),
],
)
def test_is_remote_auth_enabled(self, setting, expected):
with override_settings(**{setting: True}):
assert is_remote_auth_enabled() == expected
@pytest.mark.parametrize(
"key_one, key_one_value, key_two, key_two_value, expected",
[
('JUNK_SETTING', True, 'JUNK2_SETTING', True, False),
],
)
def test_is_remote_auth_enabled_multiple_keys(self, key_one, key_one_value, key_two, key_two_value, expected):
with override_settings(**{key_one: key_one_value}):
with override_settings(**{key_two: key_two_value}):
assert is_remote_auth_enabled() == expected

View File

@@ -1,76 +0,0 @@
import pytest
from awx.main.models import User
from awx.sso.social_base_pipeline import AuthNotFound, check_user_found_or_created, set_is_active_for_new_user, prevent_inactive_login, AuthInactive
@pytest.mark.django_db
class TestSocialBasePipeline:
def test_check_user_found_or_created_no_exception(self):
# If we have a user (the True param, we should not get an exception)
try:
check_user_found_or_created(None, {}, True)
except AuthNotFound:
assert False, 'check_user_found_or_created should not have raised an exception with a user'
@pytest.mark.parametrize(
"details, kwargs, expected_id",
[
(
{},
{},
'???',
),
(
{},
{'uid': 'kwargs_uid'},
'kwargs_uid',
),
(
{},
{'uid': 'kwargs_uid', 'email': 'kwargs_email'},
'kwargs_email',
),
(
{'email': 'details_email'},
{'uid': 'kwargs_uid', 'email': 'kwargs_email'},
'details_email',
),
],
)
def test_check_user_found_or_created_exceptions(self, details, expected_id, kwargs):
with pytest.raises(AuthNotFound) as e:
check_user_found_or_created(None, details, False, None, **kwargs)
assert f'An account cannot be found for {expected_id}' == str(e.value)
@pytest.mark.parametrize(
"kwargs, expected_details, expected_response",
[
({}, {}, None),
({'is_new': False}, {}, None),
({'is_new': True}, {'is_active': True}, {'details': {'is_active': True}}),
],
)
def test_set_is_active_for_new_user(self, kwargs, expected_details, expected_response):
details = {}
response = set_is_active_for_new_user(None, details, None, None, **kwargs)
assert details == expected_details
assert response == expected_response
def test_prevent_inactive_login_no_exception_no_user(self):
try:
prevent_inactive_login(None, None, None, None, None)
except AuthInactive:
assert False, 'prevent_inactive_login should not have raised an exception with no user'
def test_prevent_inactive_login_no_exception_active_user(self):
user = User.objects.create(username='user1@foo.com', last_name='foo', first_name='bar', email='user1@foo.com', is_active=True)
try:
prevent_inactive_login(None, None, user, None, None)
except AuthInactive:
assert False, 'prevent_inactive_login should not have raised an exception with an active user'
def test_prevent_inactive_login_no_exception_inactive_user(self):
user = User.objects.create(username='user1@foo.com', last_name='foo', first_name='bar', email='user1@foo.com', is_active=False)
with pytest.raises(AuthInactive):
prevent_inactive_login(None, None, user, None, None)

View File

@@ -1,113 +0,0 @@
import pytest
import re
from awx.sso.social_pipeline import update_user_orgs, update_user_teams
from awx.main.models import User, Team, Organization
@pytest.fixture
def users():
u1 = User.objects.create(username='user1@foo.com', last_name='foo', first_name='bar', email='user1@foo.com')
u2 = User.objects.create(username='user2@foo.com', last_name='foo', first_name='bar', email='user2@foo.com')
u3 = User.objects.create(username='user3@foo.com', last_name='foo', first_name='bar', email='user3@foo.com')
return (u1, u2, u3)
@pytest.mark.django_db
class TestSocialPipeline:
@pytest.fixture
def backend(self):
class Backend:
s = {
'ORGANIZATION_MAP': {
'Default': {
'remove': True,
'admins': 'foobar',
'remove_admins': True,
'users': 'foo',
'remove_users': True,
'organization_alias': '',
}
},
'TEAM_MAP': {'Blue': {'organization': 'Default', 'remove': True, 'users': ''}, 'Red': {'organization': 'Default', 'remove': True, 'users': ''}},
}
def setting(self, key):
return self.s[key]
return Backend()
@pytest.fixture
def org(self):
return Organization.objects.create(name="Default")
def test_update_user_orgs(self, org, backend, users):
u1, u2, u3 = users
# Test user membership logic with regular expressions
backend.setting('ORGANIZATION_MAP')['Default']['admins'] = re.compile('.*')
backend.setting('ORGANIZATION_MAP')['Default']['users'] = re.compile('.*')
update_user_orgs(backend, None, u1)
update_user_orgs(backend, None, u2)
update_user_orgs(backend, None, u3)
assert org.admin_role.members.count() == 3
assert org.member_role.members.count() == 3
# Test remove feature enabled
backend.setting('ORGANIZATION_MAP')['Default']['admins'] = ''
backend.setting('ORGANIZATION_MAP')['Default']['users'] = ''
backend.setting('ORGANIZATION_MAP')['Default']['remove_admins'] = True
backend.setting('ORGANIZATION_MAP')['Default']['remove_users'] = True
update_user_orgs(backend, None, u1)
assert org.admin_role.members.count() == 2
assert org.member_role.members.count() == 2
# Test remove feature disabled
backend.setting('ORGANIZATION_MAP')['Default']['remove_admins'] = False
backend.setting('ORGANIZATION_MAP')['Default']['remove_users'] = False
update_user_orgs(backend, None, u2)
assert org.admin_role.members.count() == 2
assert org.member_role.members.count() == 2
# Test organization alias feature
backend.setting('ORGANIZATION_MAP')['Default']['organization_alias'] = 'Default_Alias'
update_user_orgs(backend, None, u1)
assert Organization.objects.get(name="Default_Alias") is not None
def test_update_user_teams(self, backend, users):
u1, u2, u3 = users
# Test user membership logic with regular expressions
backend.setting('TEAM_MAP')['Blue']['users'] = re.compile('.*')
backend.setting('TEAM_MAP')['Red']['users'] = re.compile('.*')
update_user_teams(backend, None, u1)
update_user_teams(backend, None, u2)
update_user_teams(backend, None, u3)
assert Team.objects.get(name="Red").member_role.members.count() == 3
assert Team.objects.get(name="Blue").member_role.members.count() == 3
# Test remove feature enabled
backend.setting('TEAM_MAP')['Blue']['remove'] = True
backend.setting('TEAM_MAP')['Red']['remove'] = True
backend.setting('TEAM_MAP')['Blue']['users'] = ''
backend.setting('TEAM_MAP')['Red']['users'] = ''
update_user_teams(backend, None, u1)
assert Team.objects.get(name="Red").member_role.members.count() == 2
assert Team.objects.get(name="Blue").member_role.members.count() == 2
# Test remove feature disabled
backend.setting('TEAM_MAP')['Blue']['remove'] = False
backend.setting('TEAM_MAP')['Red']['remove'] = False
update_user_teams(backend, None, u2)
assert Team.objects.get(name="Red").member_role.members.count() == 2
assert Team.objects.get(name="Blue").member_role.members.count() == 2

View File

@@ -1,4 +0,0 @@
# Ensure that our autouse overwrites are working
def test_cache(settings):
assert settings.CACHES['default']['BACKEND'] == 'django.core.cache.backends.locmem.LocMemCache'
assert settings.CACHES['default']['LOCATION'].startswith('unique-')

View File

@@ -1,4 +0,0 @@
import pytest
from rest_framework.exceptions import ValidationError

View File

@@ -1,11 +0,0 @@
import pytest
@pytest.mark.parametrize(
"lib",
[
("social_pipeline"),
],
)
def test_module_loads(lib):
module = __import__("awx.sso." + lib) # noqa

View File

@@ -1,14 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
from django.urls import re_path
from awx.sso.views import sso_complete, sso_error, sso_inactive
app_name = 'sso'
urlpatterns = [
re_path(r'^complete/$', sso_complete, name='sso_complete'),
re_path(r'^error/$', sso_error, name='sso_error'),
re_path(r'^inactive/$', sso_inactive, name='sso_inactive'),
]

View File

@@ -1,5 +0,0 @@
# Django
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _
__all__ = []

View File

@@ -1,46 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python
import urllib.parse
import logging
# Django
from django.urls import reverse
from django.views.generic.base import RedirectView
from django.utils.encoding import smart_str
from django.conf import settings
logger = logging.getLogger('awx.sso.views')
class BaseRedirectView(RedirectView):
permanent = True
def get_redirect_url(self, *args, **kwargs):
last_path = self.request.COOKIES.get('lastPath', '')
last_path = urllib.parse.quote(urllib.parse.unquote(last_path).strip('"'))
url = reverse('ui:index')
if last_path:
return '%s#%s' % (url, last_path)
else:
return url
sso_error = BaseRedirectView.as_view()
sso_inactive = BaseRedirectView.as_view()
class CompleteView(BaseRedirectView):
def dispatch(self, request, *args, **kwargs):
response = super(CompleteView, self).dispatch(request, *args, **kwargs)
if self.request.user and self.request.user.is_authenticated:
logger.info(smart_str(u"User {} logged in".format(self.request.user.username)))
response.set_cookie(
'userLoggedIn', 'true', secure=getattr(settings, 'SESSION_COOKIE_SECURE', False), samesite=getattr(settings, 'USER_COOKIE_SAMESITE', 'Lax')
)
response.setdefault('X-API-Session-Cookie-Name', getattr(settings, 'SESSION_COOKIE_NAME', 'awx_sessionid'))
return response
sso_complete = CompleteView.as_view()

View File

@@ -26,8 +26,6 @@ def get_urlpatterns(prefix=None):
path(f'api{prefix}v2/', include(api_version_urls)),
path(f'api{prefix}', include(api_urls)),
path('', include(root_urls)),
re_path(r'^sso/', include('awx.sso.urls', namespace='sso')),
re_path(r'^sso/', include('social_django.urls', namespace='social')),
re_path(r'^(?:api/)?400.html$', handle_400),
re_path(r'^(?:api/)?403.html$', handle_403),
re_path(r'^(?:api/)?404.html$', handle_404),
@@ -36,7 +34,7 @@ def get_urlpatterns(prefix=None):
re_path(r'^login/', handle_login_redirect),
# want api/v2/doesnotexist to return a 404, not match the ui urls,
# so use a negative lookahead assertion here
re_path(r'^(?!api/|sso/).*', include('awx.ui.urls', namespace='ui')),
re_path(r'^(?!api/).*', include('awx.ui.urls', namespace='ui')),
]
if settings.SETTINGS_MODULE == 'awx.settings.development':

View File

@@ -13,7 +13,6 @@ import django # NOQA
from django.conf import settings # NOQA
from django.urls import resolve # NOQA
from django.core.wsgi import get_wsgi_application # NOQA
import social_django # NOQA
"""

View File

@@ -1,48 +0,0 @@
PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2
--------------------------------------------
1. This LICENSE AGREEMENT is between the Python Software Foundation
("PSF"), and the Individual or Organization ("Licensee") accessing and
otherwise using this software ("Python") in source or binary form and
its associated documentation.
2. Subject to the terms and conditions of this License Agreement, PSF
hereby grants Licensee a nonexclusive, royalty-free, world-wide
license to reproduce, analyze, test, perform and/or display publicly,
prepare derivative works, distribute, and otherwise use Python
alone or in any derivative version, provided, however, that PSF's
License Agreement and PSF's notice of copyright, i.e., "Copyright (c)
2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008 Python Software Foundation;
All Rights Reserved" are retained in Python alone or in any derivative
version prepared by Licensee.
3. In the event Licensee prepares a derivative work that is based on
or incorporates Python or any part thereof, and wants to make
the derivative work available to others as provided herein, then
Licensee hereby agrees to include in any such work a brief summary of
the changes made to Python.
4. PSF is making Python available to Licensee on an "AS IS"
basis. PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND
DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL NOT
INFRINGE ANY THIRD PARTY RIGHTS.
5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS
A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON,
OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
6. This License Agreement will automatically terminate upon a material
breach of its terms and conditions.
7. Nothing in this License Agreement shall be deemed to create any
relationship of agency, partnership, or joint venture between PSF and
Licensee. This License Agreement does not grant permission to use PSF
trademarks or trade name in a trademark sense to endorse or promote
products or services of Licensee, or any third party.
8. By copying, installing or otherwise using Python, Licensee
agrees to be bound by the terms and conditions of this License
Agreement.

View File

@@ -1,21 +0,0 @@
The MIT License (MIT)
Copyright (c) 2015 Michael Davis
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -1,27 +0,0 @@
Copyright (c) 2012-2016, Matías Aguirre
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of this project nor the names of its contributors may be
used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@@ -1,27 +0,0 @@
Copyright (c) 2012-2016, Matías Aguirre
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of this project nor the names of its contributors may be
used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@@ -53,8 +53,6 @@ python-tss-sdk>=1.2.1
pyyaml>=6.0.1
pyzstd # otel collector log file compression library
receptorctl
social-auth-core[openidconnect]==4.4.2 # see UPGRADE BLOCKERs
social-auth-app-django==5.4.0 # see UPGRADE BLOCKERs
sqlparse>=0.4.4 # Required by django https://github.com/ansible/awx/security/dependabot/96
redis[hiredis]
requests

View File

@@ -106,17 +106,12 @@ cryptography==41.0.7
# pyjwt
# pyopenssl
# service-identity
# social-auth-core
cython==0.29.37
# via -r /awx_devel/requirements/requirements.in
daphne==3.0.2
# via
# -r /awx_devel/requirements/requirements.in
# channels
defusedxml==0.7.1
# via
# python3-openid
# social-auth-core
deprecated==1.2.14
# via
# opentelemetry-api
@@ -137,7 +132,6 @@ django==4.2.10
# django-polymorphic
# django-solo
# djangorestframework
# social-auth-app-django
# via -r /awx_devel/requirements/requirements_git.txt
django-cors-headers==4.3.1
# via -r /awx_devel/requirements/requirements.in
@@ -295,7 +289,6 @@ oauthlib==3.2.2
# django-oauth-toolkit
# kubernetes
# requests-oauthlib
# social-auth-core
openshift==0.13.2
# via -r /awx_devel/requirements/requirements.in
opentelemetry-api==1.24.0
@@ -382,7 +375,6 @@ pyjwt[crypto]==2.8.0
# adal
# django-ansible-base
# msal
# social-auth-core
# twilio
pyopenssl==24.0.0
# via
@@ -402,14 +394,11 @@ python-dateutil==2.8.2
# receptorctl
python-dsv-sdk==1.0.4
# via -r /awx_devel/requirements/requirements.in
python-jose==3.3.0
# via social-auth-core
python-string-utils==1.0.0
# via openshift
python-tss-sdk==1.2.2
# via -r /awx_devel/requirements/requirements.in
python3-openid==3.2.0
# via social-auth-core
# via -r /awx_devel/requirements/requirements_git.txt
pytz==2024.1
# via
@@ -448,13 +437,11 @@ requests==2.31.0
# python-dsv-sdk
# python-tss-sdk
# requests-oauthlib
# social-auth-core
# twilio
requests-oauthlib==1.3.1
# via
# kubernetes
# msrest
# social-auth-core
rpds-py==0.18.0
# via
# jsonschema
@@ -490,12 +477,6 @@ slack-sdk==3.27.0
# via -r /awx_devel/requirements/requirements.in
smmap==5.0.1
# via gitdb
social-auth-app-django==5.4.0
# via -r /awx_devel/requirements/requirements.in
social-auth-core[openidconnect]==4.4.2
# via
# -r /awx_devel/requirements/requirements.in
# social-auth-app-django
sqlparse==0.4.4
# via
# -r /awx_devel/requirements/requirements.in