SAML enhancements (#13316)

* Moving reconcile_users_org_team_mappings into common library

* Renaming pipeline to social_pipeline

* Breaking out SAML and generic Social Auth

* Optimizing SMAL login process

* Moving extraction of org in teams from backends into sso/common.create_orgs_and_teams

* Altering saml_pipeline from testing

Prefixing all internal functions with _
Modified subfunctions to not return values but instead manipulate multable objects
Modified all functions to not add duplicate orgs to the orgs_to_create list

* Updating the common function to respect a teams organization name

* Added can_create flag to create_org_and_teams

This made testing easier and allows for any adapter with a flag the ability to simply pass it into a function

* Multiple changes to SAML pipeline

Removed orgs_to_create from being passed into user_team functions, common create orgs code will add any team orgs to list of orgs automatically

Passed SAML_AUTO_CREATE_OBJECTS flag into create_org_and_teams

Fix bug where we were looking at values instead of keys

Added loading of all teams if remove flag is set in update_user_teams_by_saml_attr

* Moving common items between SAML and Social into a 'base'

* Updating and adding testing

* Renamed get_or_create_with_default_galaxy_cred to get_or_create_org_...
This commit is contained in:
John Westcott IV
2023-01-27 09:49:16 -05:00
committed by GitHub
parent d7025a919c
commit 8fb831d3de
13 changed files with 1541 additions and 864 deletions

View File

@@ -0,0 +1,280 @@
import pytest
from collections import Counter
from django.core.exceptions import FieldError
from django.utils.timezone import now
from awx.main.models import Credential, CredentialType, Organization, Team, User
from awx.sso.common import get_orgs_by_ids, reconcile_users_org_team_mappings, create_org_and_teams, get_or_create_org_with_default_galaxy_cred
@pytest.mark.django_db
class TestCommonFunctions:
@pytest.fixture
def orgs(self):
o1 = Organization.objects.create(name='Default1')
o2 = Organization.objects.create(name='Default2')
o3 = Organization.objects.create(name='Default3')
return (o1, o2, o3)
@pytest.fixture
def galaxy_credential(self):
galaxy_type = CredentialType.objects.create(kind='galaxy')
cred = Credential(
created=now(), modified=now(), name='Ansible Galaxy', managed=True, credential_type=galaxy_type, inputs={'url': 'https://galaxy.ansible.com/'}
)
cred.save()
def test_get_orgs_by_ids(self, orgs):
orgs_and_ids = get_orgs_by_ids()
o1, o2, o3 = orgs
assert Counter(orgs_and_ids.keys()) == Counter([o1.name, o2.name, o3.name])
assert Counter(orgs_and_ids.values()) == Counter([o1.id, o2.id, o3.id])
def test_reconcile_users_org_team_mappings(self):
# Create objects for us to play with
user = User.objects.create(username='user1@foo.com', last_name='foo', first_name='bar', email='user1@foo.com', is_active=True)
org1 = Organization.objects.create(name='Default1')
org2 = Organization.objects.create(name='Default2')
team1 = Team.objects.create(name='Team1', organization=org1)
team2 = Team.objects.create(name='Team1', organization=org2)
# Try adding nothing
reconcile_users_org_team_mappings(user, {}, {}, 'Nada')
assert list(user.roles.all()) == []
# Add a user to an org that does not exist (should have no affect)
reconcile_users_org_team_mappings(
user,
{
'junk': {'member_role': True},
},
{},
'Nada',
)
assert list(user.roles.all()) == []
# Remove a user to an org that does not exist (should have no affect)
reconcile_users_org_team_mappings(
user,
{
'junk': {'member_role': False},
},
{},
'Nada',
)
assert list(user.roles.all()) == []
# Add the user to the orgs
reconcile_users_org_team_mappings(user, {org1.name: {'member_role': True}, org2.name: {'member_role': True}}, {}, 'Nada')
assert len(user.roles.all()) == 2
assert user in org1.member_role
assert user in org2.member_role
# Remove the user from the orgs
reconcile_users_org_team_mappings(user, {org1.name: {'member_role': False}, org2.name: {'member_role': False}}, {}, 'Nada')
assert list(user.roles.all()) == []
assert user not in org1.member_role
assert user not in org2.member_role
# Remove the user from the orgs (again, should have no affect)
reconcile_users_org_team_mappings(user, {org1.name: {'member_role': False}, org2.name: {'member_role': False}}, {}, 'Nada')
assert list(user.roles.all()) == []
assert user not in org1.member_role
assert user not in org2.member_role
# Add a user back to the member role
reconcile_users_org_team_mappings(
user,
{
org1.name: {
'member_role': True,
},
},
{},
'Nada',
)
users_roles = set(user.roles.values_list('pk', flat=True))
assert len(users_roles) == 1
assert user in org1.member_role
# Add the user to additional roles
reconcile_users_org_team_mappings(
user,
{
org1.name: {'admin_role': True, 'auditor_role': True},
},
{},
'Nada',
)
assert len(user.roles.all()) == 3
assert user in org1.member_role
assert user in org1.admin_role
assert user in org1.auditor_role
# Add a user to a non-existent role (results in FieldError exception)
with pytest.raises(FieldError):
reconcile_users_org_team_mappings(
user,
{
org1.name: {
'dne_role': True,
},
},
{},
'Nada',
)
# Try adding a user to a role that should not exist on an org (technically this works at this time)
reconcile_users_org_team_mappings(
user,
{
org1.name: {
'read_role_id': True,
},
},
{},
'Nada',
)
assert len(user.roles.all()) == 4
assert user in org1.member_role
assert user in org1.admin_role
assert user in org1.auditor_role
# Remove all of the org perms to test team perms
reconcile_users_org_team_mappings(
user,
{
org1.name: {
'read_role_id': False,
'member_role': False,
'admin_role': False,
'auditor_role': False,
},
},
{},
'Nada',
)
assert list(user.roles.all()) == []
# Add the user as a member to one of the teams
reconcile_users_org_team_mappings(user, {}, {org1.name: {team1.name: {'member_role': True}}}, 'Nada')
assert len(user.roles.all()) == 1
assert user in team1.member_role
# Validate that the user did not become a member of a team with the same name in a different org
assert user not in team2.member_role
# Remove the user from the team
reconcile_users_org_team_mappings(user, {}, {org1.name: {team1.name: {'member_role': False}}}, 'Nada')
assert list(user.roles.all()) == []
assert user not in team1.member_role
# Remove the user from the team again
reconcile_users_org_team_mappings(user, {}, {org1.name: {team1.name: {'member_role': False}}}, 'Nada')
assert list(user.roles.all()) == []
# Add the user to a team that does not exist (should have no affect)
reconcile_users_org_team_mappings(user, {}, {org1.name: {'junk': {'member_role': True}}}, 'Nada')
assert list(user.roles.all()) == []
# Remove the user from a team that does not exist (should have no affect)
reconcile_users_org_team_mappings(user, {}, {org1.name: {'junk': {'member_role': False}}}, 'Nada')
assert list(user.roles.all()) == []
# Test a None setting
reconcile_users_org_team_mappings(user, {}, {org1.name: {'junk': {'member_role': None}}}, 'Nada')
assert list(user.roles.all()) == []
# Add the user multiple teams in different orgs
reconcile_users_org_team_mappings(user, {}, {org1.name: {team1.name: {'member_role': True}}, org2.name: {team2.name: {'member_role': True}}}, 'Nada')
assert len(user.roles.all()) == 2
assert user in team1.member_role
assert user in team2.member_role
# Remove the user from just one of the teams
reconcile_users_org_team_mappings(user, {}, {org2.name: {team2.name: {'member_role': False}}}, 'Nada')
assert len(user.roles.all()) == 1
assert user in team1.member_role
assert user not in team2.member_role
@pytest.mark.parametrize(
"org_list, team_map, can_create, org_count, team_count",
[
# In this case we will only pass in organizations
(
["org1", "org2"],
{},
True,
2,
0,
),
# In this case we will only pass in teams but the orgs will be created from the teams
(
[],
{"team1": "org1", "team2": "org2"},
True,
2,
2,
),
# In this case we will reuse an org
(
["org1"],
{"team1": "org1", "team2": "org1"},
True,
1,
2,
),
# In this case we have a combination of orgs, orgs reused and an org created by a team
(
["org1", "org2", "org3"],
{"team1": "org1", "team2": "org4"},
True,
4,
2,
),
# In this case we will test a case that the UI should prevent and have a team with no Org
# This should create org1/2 but only team1
(
["org1"],
{"team1": "org2", "team2": None},
True,
2,
1,
),
# Block any creation with the can_create flag
(
["org1"],
{"team1": "org2", "team2": None},
False,
0,
0,
),
],
)
def test_create_org_and_teams(self, galaxy_credential, org_list, team_map, can_create, org_count, team_count):
create_org_and_teams(org_list, team_map, 'py.test', can_create=can_create)
assert Organization.objects.count() == org_count
assert Team.objects.count() == team_count
def test_get_or_create_org_with_default_galaxy_cred_add_galaxy_cred(self, galaxy_credential):
# If this method creates the org it should get the default galaxy credential
num_orgs = 4
for number in range(1, (num_orgs + 1)):
get_or_create_org_with_default_galaxy_cred(name=f"Default {number}")
assert Organization.objects.count() == 4
for o in Organization.objects.all():
assert o.galaxy_credentials.count() == 1
assert o.galaxy_credentials.first().name == 'Ansible Galaxy'
def test_get_or_create_org_with_default_galaxy_cred_no_galaxy_cred(self, galaxy_credential):
# If the org is pre-created, we should not add the galaxy_credential
num_orgs = 4
for number in range(1, (num_orgs + 1)):
Organization.objects.create(name=f"Default {number}")
get_or_create_org_with_default_galaxy_cred(name=f"Default {number}")
assert Organization.objects.count() == 4
for o in Organization.objects.all():
assert o.galaxy_credentials.count() == 0

View File

@@ -1,566 +0,0 @@
import pytest
import re
from unittest import mock
from django.utils.timezone import now
from awx.conf.registry import settings_registry
from awx.sso.pipeline import update_user_orgs, update_user_teams, update_user_orgs_by_saml_attr, update_user_teams_by_saml_attr, _check_flag
from awx.main.models import User, Team, Organization, Credential, CredentialType
@pytest.fixture
def galaxy_credential():
galaxy_type = CredentialType.objects.create(kind='galaxy')
cred = Credential(
created=now(), modified=now(), name='Ansible Galaxy', managed=True, credential_type=galaxy_type, inputs={'url': 'https://galaxy.ansible.com/'}
)
cred.save()
@pytest.fixture
def users():
u1 = User.objects.create(username='user1@foo.com', last_name='foo', first_name='bar', email='user1@foo.com')
u2 = User.objects.create(username='user2@foo.com', last_name='foo', first_name='bar', email='user2@foo.com')
u3 = User.objects.create(username='user3@foo.com', last_name='foo', first_name='bar', email='user3@foo.com')
return (u1, u2, u3)
@pytest.mark.django_db
class TestSAMLMap:
@pytest.fixture
def backend(self):
class Backend:
s = {
'ORGANIZATION_MAP': {
'Default': {
'remove': True,
'admins': 'foobar',
'remove_admins': True,
'users': 'foo',
'remove_users': True,
'organization_alias': '',
}
},
'TEAM_MAP': {'Blue': {'organization': 'Default', 'remove': True, 'users': ''}, 'Red': {'organization': 'Default', 'remove': True, 'users': ''}},
}
def setting(self, key):
return self.s[key]
return Backend()
@pytest.fixture
def org(self):
return Organization.objects.create(name="Default")
def test_update_user_orgs(self, org, backend, users, galaxy_credential):
u1, u2, u3 = users
# Test user membership logic with regular expressions
backend.setting('ORGANIZATION_MAP')['Default']['admins'] = re.compile('.*')
backend.setting('ORGANIZATION_MAP')['Default']['users'] = re.compile('.*')
update_user_orgs(backend, None, u1)
update_user_orgs(backend, None, u2)
update_user_orgs(backend, None, u3)
assert org.admin_role.members.count() == 3
assert org.member_role.members.count() == 3
# Test remove feature enabled
backend.setting('ORGANIZATION_MAP')['Default']['admins'] = ''
backend.setting('ORGANIZATION_MAP')['Default']['users'] = ''
backend.setting('ORGANIZATION_MAP')['Default']['remove_admins'] = True
backend.setting('ORGANIZATION_MAP')['Default']['remove_users'] = True
update_user_orgs(backend, None, u1)
assert org.admin_role.members.count() == 2
assert org.member_role.members.count() == 2
# Test remove feature disabled
backend.setting('ORGANIZATION_MAP')['Default']['remove_admins'] = False
backend.setting('ORGANIZATION_MAP')['Default']['remove_users'] = False
update_user_orgs(backend, None, u2)
assert org.admin_role.members.count() == 2
assert org.member_role.members.count() == 2
# Test organization alias feature
backend.setting('ORGANIZATION_MAP')['Default']['organization_alias'] = 'Default_Alias'
update_user_orgs(backend, None, u1)
assert Organization.objects.get(name="Default_Alias") is not None
for o in Organization.objects.all():
if o.name == 'Default':
# The default org was already created and should not have a galaxy credential
assert o.galaxy_credentials.count() == 0
else:
# The Default_Alias was created by SAML and should get the galaxy credential
assert o.galaxy_credentials.count() == 1
assert o.galaxy_credentials.first().name == 'Ansible Galaxy'
def test_update_user_teams(self, backend, users, galaxy_credential):
u1, u2, u3 = users
# Test user membership logic with regular expressions
backend.setting('TEAM_MAP')['Blue']['users'] = re.compile('.*')
backend.setting('TEAM_MAP')['Red']['users'] = re.compile('.*')
update_user_teams(backend, None, u1)
update_user_teams(backend, None, u2)
update_user_teams(backend, None, u3)
assert Team.objects.get(name="Red").member_role.members.count() == 3
assert Team.objects.get(name="Blue").member_role.members.count() == 3
# Test remove feature enabled
backend.setting('TEAM_MAP')['Blue']['remove'] = True
backend.setting('TEAM_MAP')['Red']['remove'] = True
backend.setting('TEAM_MAP')['Blue']['users'] = ''
backend.setting('TEAM_MAP')['Red']['users'] = ''
update_user_teams(backend, None, u1)
assert Team.objects.get(name="Red").member_role.members.count() == 2
assert Team.objects.get(name="Blue").member_role.members.count() == 2
# Test remove feature disabled
backend.setting('TEAM_MAP')['Blue']['remove'] = False
backend.setting('TEAM_MAP')['Red']['remove'] = False
update_user_teams(backend, None, u2)
assert Team.objects.get(name="Red").member_role.members.count() == 2
assert Team.objects.get(name="Blue").member_role.members.count() == 2
for o in Organization.objects.all():
assert o.galaxy_credentials.count() == 1
assert o.galaxy_credentials.first().name == 'Ansible Galaxy'
@pytest.mark.django_db
class TestSAMLAttr:
@pytest.fixture
def kwargs(self):
return {
'username': u'cmeyers@redhat.com',
'uid': 'idp:cmeyers@redhat.com',
'request': {u'SAMLResponse': [], u'RelayState': [u'idp']},
'is_new': False,
'response': {
'session_index': '_0728f0e0-b766-0135-75fa-02842b07c044',
'idp_name': u'idp',
'attributes': {
'memberOf': ['Default1', 'Default2'],
'admins': ['Default3'],
'auditors': ['Default4'],
'groups': ['Blue', 'Red'],
'User.email': ['cmeyers@redhat.com'],
'User.LastName': ['Meyers'],
'name_id': 'cmeyers@redhat.com',
'User.FirstName': ['Chris'],
'PersonImmutableID': [],
},
},
# 'social': <UserSocialAuth: cmeyers@redhat.com>,
'social': None,
# 'strategy': <awx.sso.strategies.django_strategy.AWXDjangoStrategy object at 0x8523a10>,
'strategy': None,
'new_association': False,
}
@pytest.fixture
def orgs(self):
o1 = Organization.objects.create(name='Default1')
o2 = Organization.objects.create(name='Default2')
o3 = Organization.objects.create(name='Default3')
return (o1, o2, o3)
@pytest.fixture
def mock_settings(self, request):
fixture_args = request.node.get_closest_marker('fixture_args')
if fixture_args and 'autocreate' in fixture_args.kwargs:
autocreate = fixture_args.kwargs['autocreate']
else:
autocreate = True
class MockSettings:
SAML_AUTO_CREATE_OBJECTS = autocreate
SOCIAL_AUTH_SAML_ORGANIZATION_ATTR = {
'saml_attr': 'memberOf',
'saml_admin_attr': 'admins',
'saml_auditor_attr': 'auditors',
'remove': True,
'remove_admins': True,
}
SOCIAL_AUTH_SAML_TEAM_ATTR = {
'saml_attr': 'groups',
'remove': True,
'team_org_map': [
{'team': 'Blue', 'organization': 'Default1'},
{'team': 'Blue', 'organization': 'Default2'},
{'team': 'Blue', 'organization': 'Default3'},
{'team': 'Red', 'organization': 'Default1'},
{'team': 'Green', 'organization': 'Default1'},
{'team': 'Green', 'organization': 'Default3'},
{'team': 'Yellow', 'team_alias': 'Yellow_Alias', 'organization': 'Default4', 'organization_alias': 'Default4_Alias'},
],
}
mock_settings_obj = MockSettings()
for key in settings_registry.get_registered_settings(category_slug='logging'):
value = settings_registry.get_setting_field(key).get_default()
setattr(mock_settings_obj, key, value)
setattr(mock_settings_obj, 'DEBUG', True)
return mock_settings_obj
@pytest.fixture
def backend(self):
class Backend:
s = {
'ORGANIZATION_MAP': {
'Default1': {
'remove': True,
'admins': 'foobar',
'remove_admins': True,
'users': 'foo',
'remove_users': True,
'organization_alias': 'o1_alias',
}
}
}
def setting(self, key):
return self.s[key]
return Backend()
def test_update_user_orgs_by_saml_attr(self, orgs, users, galaxy_credential, kwargs, mock_settings, backend):
with mock.patch('django.conf.settings', mock_settings):
o1, o2, o3 = orgs
u1, u2, u3 = users
# Test getting orgs from attribute
update_user_orgs_by_saml_attr(None, None, u1, **kwargs)
update_user_orgs_by_saml_attr(None, None, u2, **kwargs)
update_user_orgs_by_saml_attr(None, None, u3, **kwargs)
assert o1.member_role.members.count() == 3
assert o2.member_role.members.count() == 3
assert o3.member_role.members.count() == 0
# Test remove logic enabled
kwargs['response']['attributes']['memberOf'] = ['Default3']
update_user_orgs_by_saml_attr(None, None, u1, **kwargs)
assert o1.member_role.members.count() == 2
assert o2.member_role.members.count() == 2
assert o3.member_role.members.count() == 1
# Test remove logic disabled
mock_settings.SOCIAL_AUTH_SAML_ORGANIZATION_ATTR['remove'] = False
kwargs['response']['attributes']['memberOf'] = ['Default1', 'Default2']
update_user_orgs_by_saml_attr(None, None, u1, **kwargs)
assert o1.member_role.members.count() == 3
assert o2.member_role.members.count() == 3
assert o3.member_role.members.count() == 1
update_user_orgs_by_saml_attr(backend, None, u1, **kwargs)
assert Organization.objects.get(name="o1_alias").member_role.members.count() == 1
for o in Organization.objects.all():
if o.id in [o1.id, o2.id, o3.id]:
# o[123] were created without a default galaxy cred
assert o.galaxy_credentials.count() == 0
else:
# anything else created should have a default galaxy cred
assert o.galaxy_credentials.count() == 1
assert o.galaxy_credentials.first().name == 'Ansible Galaxy'
def test_update_user_teams_by_saml_attr(self, orgs, users, galaxy_credential, kwargs, mock_settings):
with mock.patch('django.conf.settings', mock_settings):
o1, o2, o3 = orgs
u1, u2, u3 = users
# Test getting teams from attribute with team->org mapping
kwargs['response']['attributes']['groups'] = ['Blue', 'Red', 'Green']
# Ensure basic functionality
update_user_teams_by_saml_attr(None, None, u1, **kwargs)
update_user_teams_by_saml_attr(None, None, u2, **kwargs)
update_user_teams_by_saml_attr(None, None, u3, **kwargs)
assert Team.objects.get(name='Blue', organization__name='Default1').member_role.members.count() == 3
assert Team.objects.get(name='Blue', organization__name='Default2').member_role.members.count() == 3
assert Team.objects.get(name='Blue', organization__name='Default3').member_role.members.count() == 3
assert Team.objects.get(name='Red', organization__name='Default1').member_role.members.count() == 3
assert Team.objects.get(name='Green', organization__name='Default1').member_role.members.count() == 3
assert Team.objects.get(name='Green', organization__name='Default3').member_role.members.count() == 3
# Test remove logic
kwargs['response']['attributes']['groups'] = ['Green']
update_user_teams_by_saml_attr(None, None, u1, **kwargs)
update_user_teams_by_saml_attr(None, None, u2, **kwargs)
update_user_teams_by_saml_attr(None, None, u3, **kwargs)
assert Team.objects.get(name='Blue', organization__name='Default1').member_role.members.count() == 0
assert Team.objects.get(name='Blue', organization__name='Default2').member_role.members.count() == 0
assert Team.objects.get(name='Blue', organization__name='Default3').member_role.members.count() == 0
assert Team.objects.get(name='Red', organization__name='Default1').member_role.members.count() == 0
assert Team.objects.get(name='Green', organization__name='Default1').member_role.members.count() == 3
assert Team.objects.get(name='Green', organization__name='Default3').member_role.members.count() == 3
# Test remove logic disabled
mock_settings.SOCIAL_AUTH_SAML_TEAM_ATTR['remove'] = False
kwargs['response']['attributes']['groups'] = ['Blue']
update_user_teams_by_saml_attr(None, None, u1, **kwargs)
update_user_teams_by_saml_attr(None, None, u2, **kwargs)
update_user_teams_by_saml_attr(None, None, u3, **kwargs)
assert Team.objects.get(name='Blue', organization__name='Default1').member_role.members.count() == 3
assert Team.objects.get(name='Blue', organization__name='Default2').member_role.members.count() == 3
assert Team.objects.get(name='Blue', organization__name='Default3').member_role.members.count() == 3
assert Team.objects.get(name='Red', organization__name='Default1').member_role.members.count() == 0
assert Team.objects.get(name='Green', organization__name='Default1').member_role.members.count() == 3
assert Team.objects.get(name='Green', organization__name='Default3').member_role.members.count() == 3
for o in Organization.objects.all():
if o.id in [o1.id, o2.id, o3.id]:
# o[123] were created without a default galaxy cred
assert o.galaxy_credentials.count() == 0
else:
# anything else created should have a default galaxy cred
assert o.galaxy_credentials.count() == 1
assert o.galaxy_credentials.first().name == 'Ansible Galaxy'
def test_update_user_teams_alias_by_saml_attr(self, orgs, users, galaxy_credential, kwargs, mock_settings):
with mock.patch('django.conf.settings', mock_settings):
u1 = users[0]
# Test getting teams from attribute with team->org mapping
kwargs['response']['attributes']['groups'] = ['Yellow']
# Ensure team and org will be created
update_user_teams_by_saml_attr(None, None, u1, **kwargs)
assert Team.objects.filter(name='Yellow', organization__name='Default4').count() == 0
assert Team.objects.filter(name='Yellow_Alias', organization__name='Default4').count() == 1
assert Team.objects.get(name='Yellow_Alias', organization__name='Default4').member_role.members.count() == 1
# only Org 4 got created/updated
org = Organization.objects.get(name='Default4')
assert org.galaxy_credentials.count() == 1
assert org.galaxy_credentials.first().name == 'Ansible Galaxy'
@pytest.mark.fixture_args(autocreate=False)
def test_autocreate_disabled(self, users, kwargs, mock_settings):
kwargs['response']['attributes']['memberOf'] = ['Default1', 'Default2', 'Default3']
kwargs['response']['attributes']['groups'] = ['Blue', 'Red', 'Green']
with mock.patch('django.conf.settings', mock_settings):
for u in users:
update_user_orgs_by_saml_attr(None, None, u, **kwargs)
update_user_teams_by_saml_attr(None, None, u, **kwargs)
assert Organization.objects.count() == 0
assert Team.objects.count() == 0
# precreate everything
o1 = Organization.objects.create(name='Default1')
o2 = Organization.objects.create(name='Default2')
o3 = Organization.objects.create(name='Default3')
Team.objects.create(name='Blue', organization_id=o1.id)
Team.objects.create(name='Blue', organization_id=o2.id)
Team.objects.create(name='Blue', organization_id=o3.id)
Team.objects.create(name='Red', organization_id=o1.id)
Team.objects.create(name='Green', organization_id=o1.id)
Team.objects.create(name='Green', organization_id=o3.id)
for u in users:
update_user_orgs_by_saml_attr(None, None, u, **kwargs)
update_user_teams_by_saml_attr(None, None, u, **kwargs)
assert o1.member_role.members.count() == 3
assert o2.member_role.members.count() == 3
assert o3.member_role.members.count() == 3
assert Team.objects.get(name='Blue', organization__name='Default1').member_role.members.count() == 3
assert Team.objects.get(name='Blue', organization__name='Default2').member_role.members.count() == 3
assert Team.objects.get(name='Blue', organization__name='Default3').member_role.members.count() == 3
assert Team.objects.get(name='Red', organization__name='Default1').member_role.members.count() == 3
assert Team.objects.get(name='Green', organization__name='Default1').member_role.members.count() == 3
assert Team.objects.get(name='Green', organization__name='Default3').member_role.members.count() == 3
def test_galaxy_credential_auto_assign(self, users, kwargs, galaxy_credential, mock_settings):
kwargs['response']['attributes']['memberOf'] = ['Default1', 'Default2', 'Default3']
kwargs['response']['attributes']['groups'] = ['Blue', 'Red', 'Green']
with mock.patch('django.conf.settings', mock_settings):
for u in users:
update_user_orgs_by_saml_attr(None, None, u, **kwargs)
update_user_teams_by_saml_attr(None, None, u, **kwargs)
assert Organization.objects.count() == 4
for o in Organization.objects.all():
assert o.galaxy_credentials.count() == 1
assert o.galaxy_credentials.first().name == 'Ansible Galaxy'
def test_galaxy_credential_no_auto_assign(self, users, kwargs, galaxy_credential, mock_settings):
# A Galaxy credential should not be added to an existing org
o = Organization.objects.create(name='Default1')
o = Organization.objects.create(name='Default2')
o = Organization.objects.create(name='Default3')
o = Organization.objects.create(name='Default4')
kwargs['response']['attributes']['memberOf'] = ['Default1']
kwargs['response']['attributes']['groups'] = ['Blue']
with mock.patch('django.conf.settings', mock_settings):
for u in users:
update_user_orgs_by_saml_attr(None, None, u, **kwargs)
update_user_teams_by_saml_attr(None, None, u, **kwargs)
assert Organization.objects.count() == 4
for o in Organization.objects.all():
assert o.galaxy_credentials.count() == 0
@pytest.mark.django_db
class TestSAMLUserFlags:
@pytest.mark.parametrize(
"user_flags_settings, expected, is_superuser",
[
# In this case we will pass no user flags so new_flag should be false and changed will def be false
(
{},
(False, False),
False,
),
# NOTE: The first handful of tests test role/value as string instead of lists.
# This was from the initial implementation of these fields but the code should be able to handle this
# There are a couple tests at the end of this which will validate arrays in these values.
#
# In this case we will give the user a group to make them an admin
(
{'is_superuser_role': 'test-role-1'},
(True, True),
False,
),
# In this case we will give the user a flag that will make then an admin
(
{'is_superuser_attr': 'is_superuser'},
(True, True),
False,
),
# In this case we will give the user a flag but the wrong value
(
{'is_superuser_attr': 'is_superuser', 'is_superuser_value': 'junk'},
(False, False),
False,
),
# In this case we will give the user a flag and the right value
(
{'is_superuser_attr': 'is_superuser', 'is_superuser_value': 'true'},
(True, True),
False,
),
# In this case we will give the user a proper role and an is_superuser_attr role that they dont have, this should make them an admin
(
{'is_superuser_role': 'test-role-1', 'is_superuser_attr': 'gibberish', 'is_superuser_value': 'true'},
(True, True),
False,
),
# In this case we will give the user a proper role and an is_superuser_attr role that they have, this should make them an admin
(
{'is_superuser_role': 'test-role-1', 'is_superuser_attr': 'test-role-1'},
(True, True),
False,
),
# In this case we will give the user a proper role and an is_superuser_attr role that they have but a bad value, this should make them an admin
(
{'is_superuser_role': 'test-role-1', 'is_superuser_attr': 'is_superuser', 'is_superuser_value': 'junk'},
(False, False),
False,
),
# In this case we will give the user everything
(
{'is_superuser_role': 'test-role-1', 'is_superuser_attr': 'is_superuser', 'is_superuser_value': 'true'},
(True, True),
False,
),
# In this test case we will validate that a single attribute (instead of a list) still works
(
{'is_superuser_attr': 'name_id', 'is_superuser_value': 'test_id'},
(True, True),
False,
),
# This will be a negative test for a single atrribute
(
{'is_superuser_attr': 'name_id', 'is_superuser_value': 'junk'},
(False, False),
False,
),
# The user is already a superuser so we should remove them
(
{'is_superuser_attr': 'name_id', 'is_superuser_value': 'junk', 'remove_superusers': True},
(False, True),
True,
),
# The user is already a superuser but we don't have a remove field
(
{'is_superuser_attr': 'name_id', 'is_superuser_value': 'junk', 'remove_superusers': False},
(True, False),
True,
),
# Positive test for multiple values for is_superuser_value
(
{'is_superuser_attr': 'is_superuser', 'is_superuser_value': ['junk', 'junk2', 'else', 'junk']},
(True, True),
False,
),
# Negative test for multiple values for is_superuser_value
(
{'is_superuser_attr': 'is_superuser', 'is_superuser_value': ['junk', 'junk2', 'junk']},
(False, True),
True,
),
# Positive test for multiple values of is_superuser_role
(
{'is_superuser_role': ['junk', 'junk2', 'something', 'junk']},
(True, True),
False,
),
# Negative test for multiple values of is_superuser_role
(
{'is_superuser_role': ['junk', 'junk2', 'junk']},
(False, True),
True,
),
],
)
def test__check_flag(self, user_flags_settings, expected, is_superuser):
user = User()
user.username = 'John'
user.is_superuser = is_superuser
attributes = {
'email': ['noone@nowhere.com'],
'last_name': ['Westcott'],
'is_superuser': ['something', 'else', 'true'],
'username': ['test_id'],
'first_name': ['John'],
'Role': ['test-role-1', 'something', 'different'],
'name_id': 'test_id',
}
assert expected == _check_flag(user, 'superuser', attributes, user_flags_settings)

View File

@@ -0,0 +1,639 @@
import pytest
import re
from django.test.utils import override_settings
from awx.main.models import User, Organization, Team
from awx.sso.saml_pipeline import (
_update_m2m_from_expression,
_update_user_orgs,
_update_user_teams,
_update_user_orgs_by_saml_attr,
_update_user_teams_by_saml_attr,
_check_flag,
)
# from unittest import mock
# from django.utils.timezone import now
# , Credential, CredentialType
@pytest.fixture
def users():
u1 = User.objects.create(username='user1@foo.com', last_name='foo', first_name='bar', email='user1@foo.com')
u2 = User.objects.create(username='user2@foo.com', last_name='foo', first_name='bar', email='user2@foo.com')
u3 = User.objects.create(username='user3@foo.com', last_name='foo', first_name='bar', email='user3@foo.com')
return (u1, u2, u3)
@pytest.mark.django_db
class TestSAMLPopulateUser:
# The main populate_user does not need to be tested since its just a conglomeration of other functions that we test
# This test is here in case someone alters the code in the future in a way that does require testing
def test_populate_user(self):
assert True
@pytest.mark.django_db
class TestSAMLSimpleMaps:
# This tests __update_user_orgs and __update_user_teams
@pytest.fixture
def backend(self):
class Backend:
s = {
'ORGANIZATION_MAP': {
'Default': {
'remove': True,
'admins': 'foobar',
'remove_admins': True,
'users': 'foo',
'remove_users': True,
'organization_alias': '',
}
},
'TEAM_MAP': {'Blue': {'organization': 'Default', 'remove': True, 'users': ''}, 'Red': {'organization': 'Default', 'remove': True, 'users': ''}},
}
def setting(self, key):
return self.s[key]
return Backend()
def test__update_user_orgs(self, backend, users):
u1, u2, u3 = users
# Test user membership logic with regular expressions
backend.setting('ORGANIZATION_MAP')['Default']['admins'] = re.compile('.*')
backend.setting('ORGANIZATION_MAP')['Default']['users'] = re.compile('.*')
desired_org_state = {}
orgs_to_create = []
_update_user_orgs(backend, desired_org_state, orgs_to_create, u1)
_update_user_orgs(backend, desired_org_state, orgs_to_create, u2)
_update_user_orgs(backend, desired_org_state, orgs_to_create, u3)
assert desired_org_state == {'Default': {'member_role': True, 'admin_role': True, 'auditor_role': False}}
assert orgs_to_create == ['Default']
# Test remove feature enabled
backend.setting('ORGANIZATION_MAP')['Default']['admins'] = ''
backend.setting('ORGANIZATION_MAP')['Default']['users'] = ''
backend.setting('ORGANIZATION_MAP')['Default']['remove_admins'] = True
backend.setting('ORGANIZATION_MAP')['Default']['remove_users'] = True
desired_org_state = {}
orgs_to_create = []
_update_user_orgs(backend, desired_org_state, orgs_to_create, u1)
assert desired_org_state == {'Default': {'member_role': False, 'admin_role': False, 'auditor_role': False}}
assert orgs_to_create == ['Default']
# Test remove feature disabled
backend.setting('ORGANIZATION_MAP')['Default']['remove_admins'] = False
backend.setting('ORGANIZATION_MAP')['Default']['remove_users'] = False
desired_org_state = {}
orgs_to_create = []
_update_user_orgs(backend, desired_org_state, orgs_to_create, u2)
assert desired_org_state == {'Default': {'member_role': None, 'admin_role': None, 'auditor_role': False}}
assert orgs_to_create == ['Default']
# Test organization alias feature
backend.setting('ORGANIZATION_MAP')['Default']['organization_alias'] = 'Default_Alias'
orgs_to_create = []
_update_user_orgs(backend, {}, orgs_to_create, u1)
assert orgs_to_create == ['Default_Alias']
def test__update_user_teams(self, backend, users):
u1, u2, u3 = users
# Test user membership logic with regular expressions
backend.setting('TEAM_MAP')['Blue']['users'] = re.compile('.*')
backend.setting('TEAM_MAP')['Red']['users'] = re.compile('.*')
desired_team_state = {}
teams_to_create = {}
_update_user_teams(backend, desired_team_state, teams_to_create, u1)
assert teams_to_create == {'Red': 'Default', 'Blue': 'Default'}
assert desired_team_state == {'Default': {'Blue': {'member_role': True}, 'Red': {'member_role': True}}}
# Test remove feature enabled
backend.setting('TEAM_MAP')['Blue']['remove'] = True
backend.setting('TEAM_MAP')['Red']['remove'] = True
backend.setting('TEAM_MAP')['Blue']['users'] = ''
backend.setting('TEAM_MAP')['Red']['users'] = ''
desired_team_state = {}
teams_to_create = {}
_update_user_teams(backend, desired_team_state, teams_to_create, u1)
assert teams_to_create == {'Red': 'Default', 'Blue': 'Default'}
assert desired_team_state == {'Default': {'Blue': {'member_role': False}, 'Red': {'member_role': False}}}
# Test remove feature disabled
backend.setting('TEAM_MAP')['Blue']['remove'] = False
backend.setting('TEAM_MAP')['Red']['remove'] = False
desired_team_state = {}
teams_to_create = {}
_update_user_teams(backend, desired_team_state, teams_to_create, u2)
assert teams_to_create == {'Red': 'Default', 'Blue': 'Default'}
# If we don't care about team memberships we just don't add them to the hash so this would be an empty hash
assert desired_team_state == {}
@pytest.mark.django_db
class TestSAMLM2M:
@pytest.mark.parametrize(
"expression, remove, expected_return",
[
# No expression with no remove
(None, False, None),
("", False, None),
# No expression with remove
(None, True, False),
# True expression with and without remove
(True, False, True),
(True, True, True),
# Single string matching the user name
("user1", False, True),
# Single string matching the user email
("user1@foo.com", False, True),
# Single string not matching username or email, no remove
("user27", False, None),
# Single string not matching username or email, with remove
("user27", True, False),
# Same tests with arrays instead of strings
(["user1"], False, True),
(["user1@foo.com"], False, True),
(["user27"], False, None),
(["user27"], True, False),
# Arrays with nothing matching
(["user27", "user28"], False, None),
(["user27", "user28"], True, False),
# Arrays with all matches
(["user1", "user1@foo.com"], False, True),
# Arrays with some match, some not
(["user1", "user28", "user27"], False, True),
#
# Note: For RE's, usually settings takes care of the compilation for us, so we have to do it manually for testing.
# we also need to remove any / or flags for the compile to happen
#
# Matching username regex non-array
(re.compile("^user.*"), False, True),
(re.compile("^user.*"), True, True),
# Matching email regex non-array
(re.compile(".*@foo.com$"), False, True),
(re.compile(".*@foo.com$"), True, True),
# Non-array not matching username or email
(re.compile("^$"), False, None),
(re.compile("^$"), True, False),
# All re tests just in array form
([re.compile("^user.*")], False, True),
([re.compile("^user.*")], True, True),
([re.compile(".*@foo.com$")], False, True),
([re.compile(".*@foo.com$")], True, True),
([re.compile("^$")], False, None),
([re.compile("^$")], True, False),
# An re with username matching but not email
([re.compile("^user.*"), re.compile(".*@bar.com$")], False, True),
# An re with email matching but not username
([re.compile("^user27$"), re.compile(".*@foo.com$")], False, True),
# An re array with no matching
([re.compile("^user27$"), re.compile(".*@bar.com$")], False, None),
([re.compile("^user27$"), re.compile(".*@bar.com$")], True, False),
#
# A mix of re and strings
#
# String matches, re does not
(["user1", re.compile(".*@bar.com$")], False, True),
# String does not match, re does
(["user27", re.compile(".*@foo.com$")], False, True),
# Nothing matches
(["user27", re.compile(".*@bar.com$")], False, None),
(["user27", re.compile(".*@bar.com$")], True, False),
],
)
def test__update_m2m_from_expression(self, expression, remove, expected_return):
user = User.objects.create(username='user1', last_name='foo', first_name='bar', email='user1@foo.com')
return_val = _update_m2m_from_expression(user, expression, remove)
assert return_val == expected_return
@pytest.mark.django_db
class TestSAMLAttrMaps:
@pytest.fixture
def backend(self):
class Backend:
s = {
'ORGANIZATION_MAP': {
'Default1': {
'remove': True,
'admins': 'foobar',
'remove_admins': True,
'users': 'foo',
'remove_users': True,
'organization_alias': 'o1_alias',
}
}
}
def setting(self, key):
return self.s[key]
return Backend()
@pytest.mark.parametrize(
"setting, expected_state, expected_orgs_to_create, kwargs_member_of_mods",
[
(
# Default test, make sure that our roles get applied and removed as specified (with an alias)
{
'saml_attr': 'memberOf',
'saml_admin_attr': 'admins',
'saml_auditor_attr': 'auditors',
'remove': True,
'remove_admins': True,
},
{
'Default2': {'member_role': True},
'Default3': {'admin_role': True},
'Default4': {'auditor_role': True},
'o1_alias': {'member_role': True},
'Rando1': {'admin_role': False, 'auditor_role': False, 'member_role': False},
},
[
'o1_alias',
'Default2',
'Default3',
'Default4',
],
None,
),
(
# Similar test, we are just going to override the values "coming from the IdP" to limit the teams
{
'saml_attr': 'memberOf',
'saml_admin_attr': 'admins',
'saml_auditor_attr': 'auditors',
'remove': True,
'remove_admins': True,
},
{
'Default3': {'admin_role': True, 'member_role': True},
'Default4': {'auditor_role': True},
'Rando1': {'admin_role': False, 'auditor_role': False, 'member_role': False},
},
[
'Default3',
'Default4',
],
['Default3'],
),
(
# Test to make sure the remove logic is working
{
'saml_attr': 'memberOf',
'saml_admin_attr': 'admins',
'saml_auditor_attr': 'auditors',
'remove': False,
'remove_admins': False,
'remove_auditors': False,
},
{
'Default2': {'member_role': True},
'Default3': {'admin_role': True},
'Default4': {'auditor_role': True},
'o1_alias': {'member_role': True},
},
[
'o1_alias',
'Default2',
'Default3',
'Default4',
],
['Default1', 'Default2'],
),
],
)
def test__update_user_orgs_by_saml_attr(self, backend, setting, expected_state, expected_orgs_to_create, kwargs_member_of_mods):
kwargs = {
'username': u'cmeyers@redhat.com',
'uid': 'idp:cmeyers@redhat.com',
'request': {u'SAMLResponse': [], u'RelayState': [u'idp']},
'is_new': False,
'response': {
'session_index': '_0728f0e0-b766-0135-75fa-02842b07c044',
'idp_name': u'idp',
'attributes': {
'memberOf': ['Default1', 'Default2'],
'admins': ['Default3'],
'auditors': ['Default4'],
'groups': ['Blue', 'Red'],
'User.email': ['cmeyers@redhat.com'],
'User.LastName': ['Meyers'],
'name_id': 'cmeyers@redhat.com',
'User.FirstName': ['Chris'],
'PersonImmutableID': [],
},
},
'social': None,
'strategy': None,
'new_association': False,
}
if kwargs_member_of_mods:
kwargs['response']['attributes']['memberOf'] = kwargs_member_of_mods
# Create a random organization in the database for testing
Organization.objects.create(name='Rando1')
with override_settings(SOCIAL_AUTH_SAML_ORGANIZATION_ATTR=setting):
desired_org_state = {}
orgs_to_create = []
_update_user_orgs_by_saml_attr(backend, desired_org_state, orgs_to_create, **kwargs)
assert desired_org_state == expected_state
assert orgs_to_create == expected_orgs_to_create
@pytest.mark.parametrize(
"setting, expected_team_state, expected_teams_to_create, kwargs_group_override",
[
(
{
'saml_attr': 'groups',
'remove': False,
'team_org_map': [
{'team': 'Blue', 'organization': 'Default1'},
{'team': 'Blue', 'organization': 'Default2'},
{'team': 'Blue', 'organization': 'Default3'},
{'team': 'Red', 'organization': 'Default1'},
{'team': 'Green', 'organization': 'Default1'},
{'team': 'Green', 'organization': 'Default3'},
{'team': 'Yellow', 'team_alias': 'Yellow_Alias', 'organization': 'Default4', 'organization_alias': 'Default4_Alias'},
],
},
{
'Default1': {
'Blue': {'member_role': True},
'Green': {'member_role': False},
'Red': {'member_role': True},
},
'Default2': {
'Blue': {'member_role': True},
},
'Default3': {
'Blue': {'member_role': True},
'Green': {'member_role': False},
},
'Default4': {
'Yellow': {'member_role': False},
},
},
{
'Blue': 'Default3',
'Red': 'Default1',
},
None,
),
(
{
'saml_attr': 'groups',
'remove': False,
'team_org_map': [
{'team': 'Blue', 'organization': 'Default1'},
{'team': 'Blue', 'organization': 'Default2'},
{'team': 'Blue', 'organization': 'Default3'},
{'team': 'Red', 'organization': 'Default1'},
{'team': 'Green', 'organization': 'Default1'},
{'team': 'Green', 'organization': 'Default3'},
{'team': 'Yellow', 'team_alias': 'Yellow_Alias', 'organization': 'Default4', 'organization_alias': 'Default4_Alias'},
],
},
{
'Default1': {
'Blue': {'member_role': True},
'Green': {'member_role': True},
'Red': {'member_role': True},
},
'Default2': {
'Blue': {'member_role': True},
},
'Default3': {
'Blue': {'member_role': True},
'Green': {'member_role': True},
},
'Default4': {
'Yellow': {'member_role': False},
},
},
{
'Blue': 'Default3',
'Red': 'Default1',
'Green': 'Default3',
},
['Blue', 'Red', 'Green'],
),
(
{
'saml_attr': 'groups',
'remove': True,
'team_org_map': [
{'team': 'Blue', 'organization': 'Default1'},
{'team': 'Blue', 'organization': 'Default2'},
{'team': 'Blue', 'organization': 'Default3'},
{'team': 'Red', 'organization': 'Default1'},
{'team': 'Green', 'organization': 'Default1'},
{'team': 'Green', 'organization': 'Default3'},
{'team': 'Yellow', 'team_alias': 'Yellow_Alias', 'organization': 'Default4', 'organization_alias': 'Default4_Alias'},
],
},
{
'Default1': {
'Blue': {'member_role': False},
'Green': {'member_role': True},
'Red': {'member_role': False},
},
'Default2': {
'Blue': {'member_role': False},
},
'Default3': {
'Blue': {'member_role': False},
'Green': {'member_role': True},
},
'Default4': {
'Yellow': {'member_role': False},
},
'Rando1': {
'Rando1': {'member_role': False},
},
},
{
'Green': 'Default3',
},
['Green'],
),
],
)
def test__update_user_teams_by_saml_attr(self, setting, expected_team_state, expected_teams_to_create, kwargs_group_override):
kwargs = {
'username': u'cmeyers@redhat.com',
'uid': 'idp:cmeyers@redhat.com',
'request': {u'SAMLResponse': [], u'RelayState': [u'idp']},
'is_new': False,
'response': {
'session_index': '_0728f0e0-b766-0135-75fa-02842b07c044',
'idp_name': u'idp',
'attributes': {
'memberOf': ['Default1', 'Default2'],
'admins': ['Default3'],
'auditors': ['Default4'],
'groups': ['Blue', 'Red'],
'User.email': ['cmeyers@redhat.com'],
'User.LastName': ['Meyers'],
'name_id': 'cmeyers@redhat.com',
'User.FirstName': ['Chris'],
'PersonImmutableID': [],
},
},
'social': None,
'strategy': None,
'new_association': False,
}
if kwargs_group_override:
kwargs['response']['attributes']['groups'] = kwargs_group_override
o = Organization.objects.create(name='Rando1')
Team.objects.create(name='Rando1', organization_id=o.id)
with override_settings(SOCIAL_AUTH_SAML_TEAM_ATTR=setting):
desired_team_state = {}
teams_to_create = {}
_update_user_teams_by_saml_attr(desired_team_state, teams_to_create, **kwargs)
assert desired_team_state == expected_team_state
assert teams_to_create == expected_teams_to_create
@pytest.mark.django_db
class TestSAMLUserFlags:
@pytest.mark.parametrize(
"user_flags_settings, expected, is_superuser",
[
# In this case we will pass no user flags so new_flag should be false and changed will def be false
(
{},
(False, False),
False,
),
# NOTE: The first handful of tests test role/value as string instead of lists.
# This was from the initial implementation of these fields but the code should be able to handle this
# There are a couple tests at the end of this which will validate arrays in these values.
#
# In this case we will give the user a group to make them an admin
(
{'is_superuser_role': 'test-role-1'},
(True, True),
False,
),
# In this case we will give the user a flag that will make then an admin
(
{'is_superuser_attr': 'is_superuser'},
(True, True),
False,
),
# In this case we will give the user a flag but the wrong value
(
{'is_superuser_attr': 'is_superuser', 'is_superuser_value': 'junk'},
(False, False),
False,
),
# In this case we will give the user a flag and the right value
(
{'is_superuser_attr': 'is_superuser', 'is_superuser_value': 'true'},
(True, True),
False,
),
# In this case we will give the user a proper role and an is_superuser_attr role that they don't have, this should make them an admin
(
{'is_superuser_role': 'test-role-1', 'is_superuser_attr': 'gibberish', 'is_superuser_value': 'true'},
(True, True),
False,
),
# In this case we will give the user a proper role and an is_superuser_attr role that they have, this should make them an admin
(
{'is_superuser_role': 'test-role-1', 'is_superuser_attr': 'test-role-1'},
(True, True),
False,
),
# In this case we will give the user a proper role and an is_superuser_attr role that they have but a bad value, this should make them an admin
(
{'is_superuser_role': 'test-role-1', 'is_superuser_attr': 'is_superuser', 'is_superuser_value': 'junk'},
(False, False),
False,
),
# In this case we will give the user everything
(
{'is_superuser_role': 'test-role-1', 'is_superuser_attr': 'is_superuser', 'is_superuser_value': 'true'},
(True, True),
False,
),
# In this test case we will validate that a single attribute (instead of a list) still works
(
{'is_superuser_attr': 'name_id', 'is_superuser_value': 'test_id'},
(True, True),
False,
),
# This will be a negative test for a single attribute
(
{'is_superuser_attr': 'name_id', 'is_superuser_value': 'junk'},
(False, False),
False,
),
# The user is already a superuser so we should remove them
(
{'is_superuser_attr': 'name_id', 'is_superuser_value': 'junk', 'remove_superusers': True},
(False, True),
True,
),
# The user is already a superuser but we don't have a remove field
(
{'is_superuser_attr': 'name_id', 'is_superuser_value': 'junk', 'remove_superusers': False},
(True, False),
True,
),
# Positive test for multiple values for is_superuser_value
(
{'is_superuser_attr': 'is_superuser', 'is_superuser_value': ['junk', 'junk2', 'else', 'junk']},
(True, True),
False,
),
# Negative test for multiple values for is_superuser_value
(
{'is_superuser_attr': 'is_superuser', 'is_superuser_value': ['junk', 'junk2', 'junk']},
(False, True),
True,
),
# Positive test for multiple values of is_superuser_role
(
{'is_superuser_role': ['junk', 'junk2', 'something', 'junk']},
(True, True),
False,
),
# Negative test for multiple values of is_superuser_role
(
{'is_superuser_role': ['junk', 'junk2', 'junk']},
(False, True),
True,
),
],
)
def test__check_flag(self, user_flags_settings, expected, is_superuser):
user = User()
user.username = 'John'
user.is_superuser = is_superuser
attributes = {
'email': ['noone@nowhere.com'],
'last_name': ['Westcott'],
'is_superuser': ['something', 'else', 'true'],
'username': ['test_id'],
'first_name': ['John'],
'Role': ['test-role-1', 'something', 'different'],
'name_id': 'test_id',
}
assert expected == _check_flag(user, 'superuser', attributes, user_flags_settings)

View File

@@ -0,0 +1,76 @@
import pytest
from awx.main.models import User
from awx.sso.social_base_pipeline import AuthNotFound, check_user_found_or_created, set_is_active_for_new_user, prevent_inactive_login, AuthInactive
@pytest.mark.django_db
class TestSocialBasePipeline:
def test_check_user_found_or_created_no_exception(self):
# If we have a user (the True param, we should not get an exception)
try:
check_user_found_or_created(None, {}, True)
except AuthNotFound:
assert False, 'check_user_found_or_created should not have raised an exception with a user'
@pytest.mark.parametrize(
"details, kwargs, expected_id",
[
(
{},
{},
'???',
),
(
{},
{'uid': 'kwargs_uid'},
'kwargs_uid',
),
(
{},
{'uid': 'kwargs_uid', 'email': 'kwargs_email'},
'kwargs_email',
),
(
{'email': 'details_email'},
{'uid': 'kwargs_uid', 'email': 'kwargs_email'},
'details_email',
),
],
)
def test_check_user_found_or_created_exceptions(self, details, expected_id, kwargs):
with pytest.raises(AuthNotFound) as e:
check_user_found_or_created(None, details, False, None, **kwargs)
assert f'An account cannot be found for {expected_id}' == str(e.value)
@pytest.mark.parametrize(
"kwargs, expected_details, expected_response",
[
({}, {}, None),
({'is_new': False}, {}, None),
({'is_new': True}, {'is_active': True}, {'details': {'is_active': True}}),
],
)
def test_set_is_active_for_new_user(self, kwargs, expected_details, expected_response):
details = {}
response = set_is_active_for_new_user(None, details, None, None, **kwargs)
assert details == expected_details
assert response == expected_response
def test_prevent_inactive_login_no_exception_no_user(self):
try:
prevent_inactive_login(None, None, None, None, None)
except AuthInactive:
assert False, 'prevent_inactive_login should not have raised an exception with no user'
def test_prevent_inactive_login_no_exception_active_user(self):
user = User.objects.create(username='user1@foo.com', last_name='foo', first_name='bar', email='user1@foo.com', is_active=True)
try:
prevent_inactive_login(None, None, user, None, None)
except AuthInactive:
assert False, 'prevent_inactive_login should not have raised an exception with an active user'
def test_prevent_inactive_login_no_exception_inactive_user(self):
user = User.objects.create(username='user1@foo.com', last_name='foo', first_name='bar', email='user1@foo.com', is_active=False)
with pytest.raises(AuthInactive):
prevent_inactive_login(None, None, user, None, None)

View File

@@ -0,0 +1,113 @@
import pytest
import re
from awx.sso.social_pipeline import update_user_orgs, update_user_teams
from awx.main.models import User, Team, Organization
@pytest.fixture
def users():
u1 = User.objects.create(username='user1@foo.com', last_name='foo', first_name='bar', email='user1@foo.com')
u2 = User.objects.create(username='user2@foo.com', last_name='foo', first_name='bar', email='user2@foo.com')
u3 = User.objects.create(username='user3@foo.com', last_name='foo', first_name='bar', email='user3@foo.com')
return (u1, u2, u3)
@pytest.mark.django_db
class TestSocialPipeline:
@pytest.fixture
def backend(self):
class Backend:
s = {
'ORGANIZATION_MAP': {
'Default': {
'remove': True,
'admins': 'foobar',
'remove_admins': True,
'users': 'foo',
'remove_users': True,
'organization_alias': '',
}
},
'TEAM_MAP': {'Blue': {'organization': 'Default', 'remove': True, 'users': ''}, 'Red': {'organization': 'Default', 'remove': True, 'users': ''}},
}
def setting(self, key):
return self.s[key]
return Backend()
@pytest.fixture
def org(self):
return Organization.objects.create(name="Default")
def test_update_user_orgs(self, org, backend, users):
u1, u2, u3 = users
# Test user membership logic with regular expressions
backend.setting('ORGANIZATION_MAP')['Default']['admins'] = re.compile('.*')
backend.setting('ORGANIZATION_MAP')['Default']['users'] = re.compile('.*')
update_user_orgs(backend, None, u1)
update_user_orgs(backend, None, u2)
update_user_orgs(backend, None, u3)
assert org.admin_role.members.count() == 3
assert org.member_role.members.count() == 3
# Test remove feature enabled
backend.setting('ORGANIZATION_MAP')['Default']['admins'] = ''
backend.setting('ORGANIZATION_MAP')['Default']['users'] = ''
backend.setting('ORGANIZATION_MAP')['Default']['remove_admins'] = True
backend.setting('ORGANIZATION_MAP')['Default']['remove_users'] = True
update_user_orgs(backend, None, u1)
assert org.admin_role.members.count() == 2
assert org.member_role.members.count() == 2
# Test remove feature disabled
backend.setting('ORGANIZATION_MAP')['Default']['remove_admins'] = False
backend.setting('ORGANIZATION_MAP')['Default']['remove_users'] = False
update_user_orgs(backend, None, u2)
assert org.admin_role.members.count() == 2
assert org.member_role.members.count() == 2
# Test organization alias feature
backend.setting('ORGANIZATION_MAP')['Default']['organization_alias'] = 'Default_Alias'
update_user_orgs(backend, None, u1)
assert Organization.objects.get(name="Default_Alias") is not None
def test_update_user_teams(self, backend, users):
u1, u2, u3 = users
# Test user membership logic with regular expressions
backend.setting('TEAM_MAP')['Blue']['users'] = re.compile('.*')
backend.setting('TEAM_MAP')['Red']['users'] = re.compile('.*')
update_user_teams(backend, None, u1)
update_user_teams(backend, None, u2)
update_user_teams(backend, None, u3)
assert Team.objects.get(name="Red").member_role.members.count() == 3
assert Team.objects.get(name="Blue").member_role.members.count() == 3
# Test remove feature enabled
backend.setting('TEAM_MAP')['Blue']['remove'] = True
backend.setting('TEAM_MAP')['Red']['remove'] = True
backend.setting('TEAM_MAP')['Blue']['users'] = ''
backend.setting('TEAM_MAP')['Red']['users'] = ''
update_user_teams(backend, None, u1)
assert Team.objects.get(name="Red").member_role.members.count() == 2
assert Team.objects.get(name="Blue").member_role.members.count() == 2
# Test remove feature disabled
backend.setting('TEAM_MAP')['Blue']['remove'] = False
backend.setting('TEAM_MAP')['Red']['remove'] = False
update_user_teams(backend, None, u2)
assert Team.objects.get(name="Red").member_role.members.count() == 2
assert Team.objects.get(name="Blue").member_role.members.count() == 2