Resolve actions conflicts and delete unwanted files

Bump migrations and delete some files

Resolve remaining conflicts

Fix requirements

Flake8 fixes

Prefer devel changes for schema

Use correct versions

Remove sso connected stuff

Update to modern actions and collection fixes

Remove unwanted alias

Version problems in actions

Fix more versioning problems

Update warning string

Messed it up again

Shorten exception

More removals

Remove pbr license

Remove tests deleted in devel

Remove unexpected files

Remove some content missed in the rebase

Use sleep_task from devel

Restore devel live conftest file

Add in settings that got missed

Prefer devel version of collection test

Finish repairing .github path

Remove unintended test file duplication

Undo more unintended file additions
This commit is contained in:
AlanCoding
2025-09-12 12:10:50 -04:00
parent 38f858303d
commit 55a7591f89
83 changed files with 103 additions and 13609 deletions

View File

@@ -1,6 +0,0 @@
{
"VMWARE_HOST": "https://foo.invalid",
"VMWARE_PASSWORD": "fooo",
"VMWARE_USER": "fooo",
"VMWARE_VALIDATE_CERTS": "False"
}

View File

@@ -1,4 +0,0 @@
---
{
"demo.query.example": ""
}

View File

@@ -1,17 +1,57 @@
import time
import logging
from dispatcherd.publish import task
from django.db import connection
from awx.main.dispatch import get_task_queuename
from awx.main.dispatch.publish import task
from awx.main.dispatch.publish import task as old_task
from ansible_base.lib.utils.db import advisory_lock
logger = logging.getLogger(__name__)
@task(queue=get_task_queuename)
@old_task(queue=get_task_queuename)
def sleep_task(seconds=10, log=False):
    """Block the worker for ``seconds`` seconds.

    A trivial task used to occupy dispatcher workers in tests. Logging is
    opt-in so that bulk submissions stay quiet by default.
    """
    def _note(message):
        if log:
            logger.info(message)

    _note('starting sleep_task')
    time.sleep(seconds)
    _note('finished sleep_task')
@task()
def sleep_break_connection(seconds=0.2):
    """
    Interact with the database in an intentionally breaking way.
    After this finishes, queries made by this connection are expected to error
    with "the connection is closed"
    This is obviously a problem for any task that comes afterwards.
    So this is used to break things so that the fixes may be demonstrated.
    """
    # Ask the server to kill this session after it idles for half the sleep
    # time, then sleep past that deadline so the session is severed.
    # NOTE(review): assumes the server accepts a fractional-seconds value like
    # '0.1s' for idle_session_timeout — confirm against the PostgreSQL version
    # in use.
    with connection.cursor() as cursor:
        cursor.execute(f"SET idle_session_timeout = '{seconds / 2}s';")
    logger.info(f'sleeping for {seconds}s > {seconds / 2}s session timeout')
    time.sleep(seconds)
    # Both follow-up queries are expected to raise, per the docstring above;
    # the log output records what actually happened for later inspection.
    for i in range(1, 3):
        logger.info(f'\nRunning query number {i}')
        try:
            with connection.cursor() as cursor:
                cursor.execute("SELECT 1;")
            logger.info(' query worked, not expected')
        except Exception as exc:
            logger.info(f' query errored as expected\ntype: {type(exc)}\nstr: {str(exc)}')
    # Record the final state of the Django connection wrapper's underlying
    # DB-API connection object.
    logger.info(f'Connection present: {bool(connection.connection)}, reports closed: {getattr(connection.connection, "closed", "not_found")}')
@task()
def advisory_lock_exception():
    """Raise a deliberate error while holding a short-lived advisory lock.

    The RuntimeError is intentional (see its message); presumably this exists
    to exercise cleanup behavior when a task fails inside an advisory lock —
    confirm with the test that submits it.
    """
    time.sleep(0.2)  # so it can fill up all the workers... hacky for now
    with advisory_lock('advisory_lock_exception', lock_session_timeout_milliseconds=20):
        raise RuntimeError('this is an intentional error')

View File

@@ -1,344 +0,0 @@
"""Tests for GitHub App Installation access token extraction plugin."""
from typing import TypedDict
import pytest
from pytest_mock import MockerFixture
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.rsa import (
RSAPrivateKey,
RSAPublicKey,
generate_private_key,
)
from cryptography.hazmat.primitives.serialization import (
Encoding,
NoEncryption,
PrivateFormat,
PublicFormat,
)
from github.Auth import AppInstallationAuth
from github.Consts import DEFAULT_JWT_ALGORITHM
from github.GithubException import (
BadAttributeException,
GithubException,
UnknownObjectException,
)
from jwt import decode as decode_jwt
from awx.main.credential_plugins import github_app
github_app_jwt_client_id_unsupported = pytest.mark.xfail(
raises=(AssertionError, ValueError),
reason='Client ID in JWT is not currently supported by ' 'PyGitHub and is disabled.\n\n' 'Ref: https://github.com/PyGithub/PyGithub/issues/3213',
)
RSA_PUBLIC_EXPONENT = 65_537 # noqa: WPS303
MINIMUM_RSA_KEY_SIZE = 1024 # the lowest value chosen for performance in tests
@pytest.fixture(scope='module')
def rsa_private_key() -> RSAPrivateKey:
"""Generate an RSA private key."""
return generate_private_key(
public_exponent=RSA_PUBLIC_EXPONENT,
key_size=MINIMUM_RSA_KEY_SIZE, # would be 4096 or higher in production
backend=default_backend(),
)
@pytest.fixture(scope='module')
def rsa_public_key(rsa_private_key: RSAPrivateKey) -> RSAPublicKey:
"""Extract a public key out of the private one."""
return rsa_private_key.public_key()
@pytest.fixture(scope='module')
def rsa_private_key_bytes(rsa_private_key: RSAPrivateKey) -> bytes:
r"""Generate an unencrypted PKCS#1 formatted RSA private key.
Encoded as PEM-bytes.
This is what the GitHub-downloaded PEM files contain.
Ref: https://developer.github.com/apps/building-github-apps/\
authenticating-with-github-apps/
"""
return rsa_private_key.private_bytes(
encoding=Encoding.PEM,
format=PrivateFormat.TraditionalOpenSSL, # A.K.A. PKCS#1
encryption_algorithm=NoEncryption(),
)
@pytest.fixture(scope='module')
def rsa_private_key_str(rsa_private_key_bytes: bytes) -> str:
"""Return private key as an instance of string."""
return rsa_private_key_bytes.decode('utf-8')
@pytest.fixture(scope='module')
def rsa_public_key_bytes(rsa_public_key: RSAPublicKey) -> bytes:
"""Return a PKCS#1 formatted RSA public key encoded as PEM."""
return rsa_public_key.public_bytes(
encoding=Encoding.PEM,
format=PublicFormat.PKCS1,
)
class AppInstallIds(TypedDict):
"""Schema for augmented extractor function keyword args."""
app_or_client_id: str
install_id: str
@pytest.mark.parametrize(
('extract_github_app_install_token_args', 'expected_error_msg'),
(
pytest.param(
{
'app_or_client_id': 'invalid',
'install_id': '666',
},
'^Expected GitHub App or Client ID to be an integer or a string ' r'starting with `Iv1\.` followed by 16 hexadecimal digits, but got' " 'invalid'$",
id='gh-app-id-broken-text',
),
pytest.param(
{
'app_or_client_id': 'Iv1.bbbbbbbbbbbbbbb',
'install_id': '666',
},
'^Expected GitHub App or Client ID to be an integer or a string '
r'starting with `Iv1\.` followed by 16 hexadecimal digits, but got'
" 'Iv1.bbbbbbbbbbbbbbb'$",
id='gh-app-id-client-id-not-enough-chars',
),
pytest.param(
{
'app_or_client_id': 'Iv1.bbbbbbbbbbbbbbbx',
'install_id': '666',
},
'^Expected GitHub App or Client ID to be an integer or a string '
r'starting with `Iv1\.` followed by 16 hexadecimal digits, but got'
" 'Iv1.bbbbbbbbbbbbbbbx'$",
id='gh-app-id-client-id-broken-hex',
),
pytest.param(
{
'app_or_client_id': 'Iv1.bbbbbbbbbbbbbbbbb',
'install_id': '666',
},
'^Expected GitHub App or Client ID to be an integer or a string '
r'starting with `Iv1\.` followed by 16 hexadecimal digits, but got'
" 'Iv1.bbbbbbbbbbbbbbbbb'$",
id='gh-app-id-client-id-too-many-chars',
),
pytest.param(
{
'app_or_client_id': 999,
'install_id': 'invalid',
},
'^Expected GitHub App Installation ID to be an integer ' "but got 'invalid'$",
id='gh-app-invalid-install-id-with-int-app-id',
),
pytest.param(
{
'app_or_client_id': '999',
'install_id': 'invalid',
},
'^Expected GitHub App Installation ID to be an integer ' "but got 'invalid'$",
id='gh-app-invalid-install-id-with-str-digit-app-id',
),
pytest.param(
{
'app_or_client_id': 'Iv1.cccccccccccccccc',
'install_id': 'invalid',
},
'^Expected GitHub App Installation ID to be an integer ' "but got 'invalid'$",
id='gh-app-invalid-install-id-with-client-id',
marks=github_app_jwt_client_id_unsupported,
),
),
)
def test_github_app_invalid_args(
extract_github_app_install_token_args: AppInstallIds,
expected_error_msg: str,
) -> None:
"""Test that invalid arguments make token extractor bail early."""
with pytest.raises(ValueError, match=expected_error_msg):
github_app.extract_github_app_install_token(
github_api_url='https://github.com',
private_rsa_key='key',
**extract_github_app_install_token_args,
)
@pytest.mark.parametrize(
(
'github_exception',
'transformed_exception',
'error_msg',
),
(
(
BadAttributeException(
'',
{},
Exception(),
),
RuntimeError,
(
r'^Broken GitHub @ https://github\.com with '
r'app_or_client_id: 123, install_id: 456\. It is a bug, '
'please report it to the '
r"developers\.\n\n\('', \{\}, Exception\(\)\)$"
),
),
(
GithubException(-1),
RuntimeError,
(
'^An unexpected error happened while talking to GitHub API '
r'@ https://github\.com '
r'\(app_or_client_id: 123, install_id: 456\)\. '
r'Is the app or client ID correct\? '
r'And the private RSA key\? '
r'See https://docs\.github\.com/rest/reference/apps'
r'#create-an-installation-access-token-for-an-app\.'
r'\n\n-1$'
),
),
(
UnknownObjectException(-1),
ValueError,
(
'^Failed to retrieve a GitHub installation token from '
r'https://github\.com using '
r'app_or_client_id: 123, install_id: 456\. '
r'Is the app installed\? See '
r'https://docs\.github\.com/rest/reference/apps'
r'#create-an-installation-access-token-for-an-app\.'
r'\n\n-1$'
),
),
),
ids=(
'github-broken',
'unexpected-error',
'no-install',
),
)
def test_github_app_api_errors(
mocker: MockerFixture,
github_exception: Exception,
transformed_exception: type[Exception],
error_msg: str,
) -> None:
"""Test successful GitHub authentication."""
application_id = 123
installation_id = 456
mocker.patch.object(
github_app.Auth.AppInstallationAuth,
'token',
new_callable=mocker.PropertyMock,
side_effect=github_exception,
)
with pytest.raises(transformed_exception, match=error_msg):
github_app.extract_github_app_install_token(
github_api_url='https://github.com',
app_or_client_id=application_id,
install_id=installation_id,
private_rsa_key='key',
)
class _FakeAppInstallationAuth(AppInstallationAuth):
@property
def token(self: '_FakeAppInstallationAuth') -> str:
return 'token-sentinel'
@pytest.mark.parametrize(
'application_id',
(
123,
'123',
pytest.param(
'Iv1.aaaaaaaaaaaaaaaa',
marks=github_app_jwt_client_id_unsupported,
),
),
ids=('app-id-int', 'app-id-str', 'client-id'),
)
@pytest.mark.parametrize(
'installation_id',
(456, '456'),
ids=('install-id-int', 'install-id-str'),
)
# pylint: disable-next=too-many-arguments,too-many-positional-arguments
def test_github_app_github_authentication( # noqa: WPS211
application_id: int | str,
installation_id: int | str,
mocker: MockerFixture,
monkeypatch: pytest.MonkeyPatch,
rsa_private_key_str: str,
rsa_public_key_bytes: bytes,
) -> None:
"""Test successful GitHub authentication."""
monkeypatch.setattr(
github_app.Auth,
'AppInstallationAuth',
_FakeAppInstallationAuth,
)
get_installation_auth_spy = mocker.spy(
github_app.Auth,
'AppInstallationAuth',
)
github_initializer_spy = mocker.spy(github_app, 'Github')
token = github_app.extract_github_app_install_token(
github_api_url='https://github.com',
app_or_client_id=application_id,
install_id=installation_id,
private_rsa_key=rsa_private_key_str,
)
observed_pygithub_obj = github_initializer_spy.spy_return
observed_gh_install_auth_obj = get_installation_auth_spy.spy_return
# pylint: disable-next=protected-access
signed_jwt = observed_gh_install_auth_obj._app_auth.token # noqa: WPS437
assert token == 'token-sentinel'
assert observed_pygithub_obj.requester.base_url == 'https://github.com'
assert observed_gh_install_auth_obj.installation_id == int(installation_id)
assert isinstance(observed_gh_install_auth_obj, _FakeAppInstallationAuth)
# NOTE: The `decode_jwt()` call asserts that no
# NOTE: `jwt.exceptions.InvalidSignatureError()` exception gets raised
# NOTE: which would indicate incorrect RSA key or corrupted payload if
# NOTE: that was to happen. This verifies that JWT is signed with the
# NOTE: private RSA key we passed by using its public counterpart.
decode_jwt(
signed_jwt,
key=rsa_public_key_bytes,
algorithms=[DEFAULT_JWT_ALGORITHM],
options={
'require': ['exp', 'iat', 'iss'],
'strict_aud': False,
'verify_aud': True,
'verify_exp': True,
'verify_signature': True,
'verify_nbf': True,
},
audience=None, # GH App JWT don't set the audience claim
issuer=str(application_id),
leeway=0.001, # noqa: WPS432
)

View File

@@ -1,217 +0,0 @@
import pytest
from unittest import mock
from awx.main.credential_plugins import hashivault, azure_kv
from azure.keyvault.secrets import (
KeyVaultSecret,
SecretClient,
SecretProperties,
)
def test_imported_azure_cloud_sdk_vars():
from awx.main.credential_plugins import azure_kv
assert len(azure_kv.clouds) > 0
assert all([hasattr(c, 'name') for c in azure_kv.clouds])
assert all([hasattr(c, 'suffixes') for c in azure_kv.clouds])
assert all([hasattr(c.suffixes, 'keyvault_dns') for c in azure_kv.clouds])
def test_hashivault_approle_auth():
    """AppRole credentials should be passed through unchanged."""
    creds = {
        'role_id': 'the_role_id',
        'secret_id': 'the_secret_id',
    }
    assert hashivault.approle_auth(**creds) == creds
def test_hashivault_kubernetes_auth():
kwargs = {
'kubernetes_role': 'the_kubernetes_role',
}
expected_res = {
'role': 'the_kubernetes_role',
'jwt': 'the_jwt',
}
with mock.patch('pathlib.Path') as path_mock:
mock.mock_open(path_mock.return_value.open, read_data='the_jwt')
res = hashivault.kubernetes_auth(**kwargs)
path_mock.assert_called_with('/var/run/secrets/kubernetes.io/serviceaccount/token')
assert res == expected_res
def test_hashivault_client_cert_auth_explicit_role():
kwargs = {
'client_cert_role': 'test-cert-1',
}
expected_res = {
'name': 'test-cert-1',
}
res = hashivault.client_cert_auth(**kwargs)
assert res == expected_res
def test_hashivault_client_cert_auth_no_role():
kwargs = {}
expected_res = {
'name': None,
}
res = hashivault.client_cert_auth(**kwargs)
assert res == expected_res
def test_hashivault_userpass_auth():
kwargs = {'username': 'the_username', 'password': 'the_password'}
expected_res = {'username': 'the_username', 'password': 'the_password'}
res = hashivault.userpass_auth(**kwargs)
assert res == expected_res
def test_hashivault_handle_auth_token():
kwargs = {
'token': 'the_token',
}
token = hashivault.handle_auth(**kwargs)
assert token == kwargs['token']
def test_hashivault_handle_auth_approle():
kwargs = {
'role_id': 'the_role_id',
'secret_id': 'the_secret_id',
}
with mock.patch.object(hashivault, 'method_auth') as method_mock:
method_mock.return_value = 'the_token'
token = hashivault.handle_auth(**kwargs)
method_mock.assert_called_with(**kwargs, auth_param=kwargs)
assert token == 'the_token'
def test_hashivault_handle_auth_kubernetes():
kwargs = {
'kubernetes_role': 'the_kubernetes_role',
}
with mock.patch.object(hashivault, 'method_auth') as method_mock:
with mock.patch('pathlib.Path') as path_mock:
mock.mock_open(path_mock.return_value.open, read_data='the_jwt')
method_mock.return_value = 'the_token'
token = hashivault.handle_auth(**kwargs)
method_mock.assert_called_with(**kwargs, auth_param={'role': 'the_kubernetes_role', 'jwt': 'the_jwt'})
assert token == 'the_token'
def test_hashivault_handle_auth_client_cert():
kwargs = {
'client_cert_public': "foo",
'client_cert_private': "bar",
'client_cert_role': 'test-cert-1',
}
auth_params = {
'name': 'test-cert-1',
}
with mock.patch.object(hashivault, 'method_auth') as method_mock:
method_mock.return_value = 'the_token'
token = hashivault.handle_auth(**kwargs)
method_mock.assert_called_with(**kwargs, auth_param=auth_params)
assert token == 'the_token'
def test_hashivault_handle_auth_not_enough_args():
with pytest.raises(Exception):
hashivault.handle_auth()
class TestDelineaImports:
"""
These module have a try-except for ImportError which will allow using the older library
but we do not want the awx_devel image to have the older library,
so these tests are designed to fail if these wind up using the fallback import
"""
def test_dsv_import(self):
from awx.main.credential_plugins.dsv import SecretsVault # noqa
# assert this module as opposed to older thycotic.secrets.vault
assert SecretsVault.__module__ == 'delinea.secrets.vault'
def test_tss_import(self):
from awx.main.credential_plugins.tss import DomainPasswordGrantAuthorizer, PasswordGrantAuthorizer, SecretServer, ServerSecret # noqa
for cls in (DomainPasswordGrantAuthorizer, PasswordGrantAuthorizer, SecretServer, ServerSecret):
# assert this module as opposed to older thycotic.secrets.server
assert cls.__module__ == 'delinea.secrets.server'
class _FakeSecretClient(SecretClient):
def get_secret(
self: '_FakeSecretClient',
name: str,
version: str | None = None,
**kwargs: str,
) -> KeyVaultSecret:
props = SecretProperties(None, None)
return KeyVaultSecret(properties=props, value='test-secret')
def test_azure_kv_invalid_env() -> None:
"""Test running outside of Azure raises error."""
error_msg = (
'You are not operating on an Azure VM, so the Managed Identity '
'feature is unavailable. Please provide the full Client ID, '
'Client Secret, and Tenant ID or run the software on an Azure VM.'
)
with pytest.raises(
RuntimeError,
match=error_msg,
):
azure_kv.azure_keyvault_backend(
url='https://test.vault.azure.net',
client='',
secret='client-secret',
tenant='tenant-id',
secret_field='secret',
secret_version='',
)
@pytest.mark.parametrize(
('client', 'secret', 'tenant'),
(
pytest.param('', '', '', id='managed-identity'),
pytest.param(
'client-id',
'client-secret',
'tenant-id',
id='client-secret-credential',
),
),
)
def test_azure_kv_valid_auth(
monkeypatch: pytest.MonkeyPatch,
client: str,
secret: str,
tenant: str,
) -> None:
"""Test successful Azure authentication via Managed Identity and credentials."""
monkeypatch.setattr(
azure_kv,
'SecretClient',
_FakeSecretClient,
)
keyvault_secret = azure_kv.azure_keyvault_backend(
url='https://test.vault.azure.net',
client=client,
secret=secret,
tenant=tenant,
secret_field='secret',
secret_version='',
)
assert keyvault_secret == 'test-secret'

View File

@@ -1,45 +0,0 @@
import pytest
# AWX
from awx.main.ha import is_ha_environment
from awx.main.models.ha import Instance
from awx.main.dispatch.pool import get_auto_max_workers
# Django
from django.test.utils import override_settings
@pytest.mark.django_db
def test_multiple_instances():
    """Two hybrid nodes should be detected as an HA environment."""
    for i in range(2):
        Instance.objects.create(hostname=f'foo{i}', node_type='hybrid')
    assert is_ha_environment()
@pytest.mark.django_db
def test_db_localhost():
    """A hybrid node plus an execution node is not considered HA.

    NOTE(review): the name suggests a database-locality check, but the body
    only exercises node-type composition — confirm the intended scenario.
    """
    Instance.objects.create(hostname='foo', node_type='hybrid')
    Instance.objects.create(hostname='bar', node_type='execution')
    assert is_ha_environment() is False
@pytest.mark.django_db
@pytest.mark.parametrize(
'settings',
[
dict(SYSTEM_TASK_ABS_MEM='16Gi', SYSTEM_TASK_ABS_CPU='24', SYSTEM_TASK_FORKS_MEM=400, SYSTEM_TASK_FORKS_CPU=4),
dict(SYSTEM_TASK_ABS_MEM='124Gi', SYSTEM_TASK_ABS_CPU='2', SYSTEM_TASK_FORKS_MEM=None, SYSTEM_TASK_FORKS_CPU=None),
],
ids=['cpu_dominated', 'memory_dominated'],
)
def test_dispatcher_max_workers_reserve(settings, fake_redis):
"""This tests that the dispatcher max_workers matches instance capacity
Assumes capacity_adjustment is 1,
plus reserve worker count
"""
with override_settings(**settings):
i = Instance.objects.create(hostname='test-1', node_type='hybrid')
i.local_health_check()
assert get_auto_max_workers() == i.capacity + 7, (i.cpu, i.memory, i.cpu_capacity, i.mem_capacity)

View File

@@ -1,56 +0,0 @@
import pytest
from awx.main.migrations._db_constraints import _rename_duplicates
from awx.main.models import JobTemplate
@pytest.mark.django_db
def test_rename_job_template_duplicates(organization, project):
    """Duplicate JT names get a ``_dup<i>`` suffix in creation order.

    The oldest duplicate keeps the original name; later ones are renamed by
    ``_rename_duplicates``.
    """
    ids = []
    for i in range(5):
        jt = JobTemplate.objects.create(name=f'jt-{i}', organization=organization, project=project)
        ids.append(jt.id)  # saved in order of creation
    # Hack to first allow duplicate names of JT to test migration
    JobTemplate.objects.filter(id__in=ids).update(org_unique=False)
    # Set all JTs to the same name
    JobTemplate.objects.filter(id__in=ids).update(name='same_name_for_test')
    _rename_duplicates(JobTemplate)
    first_jt = JobTemplate.objects.get(id=ids[0])
    assert first_jt.name == 'same_name_for_test'
    for i, pk in enumerate(ids):
        if i == 0:
            continue
        jt = JobTemplate.objects.get(id=pk)
        # Name should be set based on creation order
        assert jt.name == f'same_name_for_test_dup{i}'
@pytest.mark.django_db
def test_rename_job_template_name_too_long(organization, project):
ids = []
for i in range(3):
jt = JobTemplate.objects.create(name=f'jt-{i}', organization=organization, project=project)
ids.append(jt.id) # saved in order of creation
JobTemplate.objects.filter(id__in=ids).update(org_unique=False)
chars = 512
# Set all JTs to the same reaaaaaaly long name
JobTemplate.objects.filter(id__in=ids).update(name='A' * chars)
_rename_duplicates(JobTemplate)
first_jt = JobTemplate.objects.get(id=ids[0])
assert first_jt.name == 'A' * chars
for i, pk in enumerate(ids):
if i == 0:
continue
jt = JobTemplate.objects.get(id=pk)
assert jt.name.endswith(f'dup{i}')
assert len(jt.name) <= 512

View File

@@ -2,7 +2,6 @@ import pytest
from django_test_migrations.plan import all_migrations, nodes_to_tuples
from django.utils.timezone import now
from django.utils import timezone
"""
Most tests that live in here can probably be deleted at some point. They are mainly

View File

@@ -1,96 +0,0 @@
import pytest
import os
import tempfile
import shutil
from awx.main.tasks.jobs import RunJob
from awx.main.tasks.system import CleanupImagesAndFiles, execution_node_health_check
from awx.main.models import Instance, Job
@pytest.fixture
def scm_revision_file(tmpdir_factory):
# Returns path to temporary testing revision file
revision_file = tmpdir_factory.mktemp('revisions').join('revision.txt')
with open(str(revision_file), 'w') as f:
f.write('1234567890123456789012345678901234567890')
return os.path.join(revision_file.dirname, 'revision.txt')
@pytest.mark.django_db
# BUG FIX: the argvalues were written as ('control. hybrid') — a single
# parenthesized string, not a tuple — so pytest iterated it character by
# character, generating one test per letter. The intent is the two
# AWX-controlled node types.
@pytest.mark.parametrize('node_type', ('control', 'hybrid'))
def test_no_worker_info_on_AWX_nodes(node_type):
    """Health check of a control-plane node should return no worker info."""
    hostname = 'us-south-3-compute.invalid'
    Instance.objects.create(hostname=hostname, node_type=node_type)
    assert execution_node_health_check(hostname) is None
@pytest.fixture
def job_folder_factory(request):
def _rf(job_id='1234'):
pdd_path = tempfile.mkdtemp(prefix=f'awx_{job_id}_')
def test_folder_cleanup():
if os.path.exists(pdd_path):
shutil.rmtree(pdd_path)
request.addfinalizer(test_folder_cleanup)
return pdd_path
return _rf
@pytest.fixture
def mock_job_folder(job_folder_factory):
return job_folder_factory()
@pytest.mark.django_db
def test_folder_cleanup_stale_file(mock_job_folder, mock_me):
CleanupImagesAndFiles.run()
assert os.path.exists(mock_job_folder) # grace period should protect folder from deletion
CleanupImagesAndFiles.run(grace_period=0)
assert not os.path.exists(mock_job_folder) # should be deleted
@pytest.mark.django_db
def test_folder_cleanup_running_job(mock_job_folder, me_inst):
job = Job.objects.create(id=1234, controller_node=me_inst.hostname, status='running')
CleanupImagesAndFiles.run(grace_period=0)
assert os.path.exists(mock_job_folder) # running job should prevent folder from getting deleted
job.status = 'failed'
job.save(update_fields=['status'])
CleanupImagesAndFiles.run(grace_period=0)
assert not os.path.exists(mock_job_folder) # job is finished and no grace period, should delete
@pytest.mark.django_db
def test_folder_cleanup_multiple_running_jobs(job_folder_factory, me_inst):
jobs = []
dirs = []
num_jobs = 3
for i in range(num_jobs):
job = Job.objects.create(controller_node=me_inst.hostname, status='running')
dirs.append(job_folder_factory(job.id))
jobs.append(job)
CleanupImagesAndFiles.run(grace_period=0)
assert [os.path.exists(d) for d in dirs] == [True for i in range(num_jobs)]
@pytest.mark.django_db
def test_does_not_run_reaped_job(mocker, mock_me):
job = Job.objects.create(status='failed', job_explanation='This job has been reaped.')
mock_run = mocker.patch('awx.main.tasks.jobs.ansible_runner.interface.run')
try:
RunJob().run(job.id)
except Exception:
pass
job.refresh_from_db()
assert job.status == 'failed'
mock_run.assert_not_called()

View File

@@ -3,6 +3,7 @@ import time
import os
import shutil
import tempfile
import logging
import pytest
@@ -13,11 +14,15 @@ from awx.api.versioning import reverse
# These tests are invoked from the awx/main/tests/live/ subfolder
# so any fixtures from higher-up conftest files must be explicitly included
from awx.main.tests.functional.conftest import * # noqa
from awx.main.tests.conftest import load_all_credentials # noqa: F401; pylint: disable=unused-import
from awx.main.tests import data
from awx.main.models import Project, JobTemplate, Organization, Inventory
logger = logging.getLogger(__name__)
PROJ_DATA = os.path.join(os.path.dirname(data.__file__), 'projects')
@@ -133,30 +138,29 @@ def podman_image_generator():
@pytest.fixture
def run_job_from_playbook(default_org, demo_inv, post, admin):
def _rf(test_name, playbook, local_path=None, scm_url=None, jt_params=None):
project_name = f'{test_name} project'
jt_name = f'{test_name} JT: {playbook}'
old_proj = Project.objects.filter(name=project_name).first()
if old_proj:
old_proj.delete()
old_jt = JobTemplate.objects.filter(name=jt_name).first()
if old_jt:
old_jt.delete()
proj_kwargs = {'name': project_name, 'organization': default_org.id}
def project_factory(post, default_org, admin):
def _rf(scm_url=None, local_path=None):
proj_kwargs = {}
if local_path:
# manual path
project_name = f'Manual roject {local_path}'
proj_kwargs['scm_type'] = ''
proj_kwargs['local_path'] = local_path
elif scm_url:
project_name = f'Project {scm_url}'
proj_kwargs['scm_type'] = 'git'
proj_kwargs['scm_url'] = scm_url
else:
raise RuntimeError('Need to provide scm_url or local_path')
proj_kwargs['name'] = project_name
proj_kwargs['organization'] = default_org.id
old_proj = Project.objects.filter(name=project_name).first()
if old_proj:
logger.info(f'Deleting existing project {project_name}')
old_proj.delete()
result = post(
reverse('api:project_list'),
proj_kwargs,
@@ -164,6 +168,23 @@ def run_job_from_playbook(default_org, demo_inv, post, admin):
expect=201,
)
proj = Project.objects.get(id=result.data['id'])
return proj
return _rf
@pytest.fixture
def run_job_from_playbook(demo_inv, post, admin, project_factory):
def _rf(test_name, playbook, local_path=None, scm_url=None, jt_params=None, proj=None, wait=True):
jt_name = f'{test_name} JT: {playbook}'
if not proj:
proj = project_factory(scm_url=scm_url, local_path=local_path)
old_jt = JobTemplate.objects.filter(name=jt_name).first()
if old_jt:
logger.info(f'Deleting existing JT {jt_name}')
old_jt.delete()
if proj.current_job:
wait_for_job(proj.current_job)
@@ -185,7 +206,9 @@ def run_job_from_playbook(default_org, demo_inv, post, admin):
job = jt.create_unified_job()
job.signal_start()
wait_for_job(job)
assert job.status == 'successful'
if wait:
wait_for_job(job)
assert job.status == 'successful'
return {'job': job, 'job_template': jt, 'project': proj}
return _rf

View File

@@ -1,581 +0,0 @@
import os
import pytest
from unittest.mock import patch, Mock, call, DEFAULT
from io import StringIO
from unittest import TestCase
from awx.main.management.commands.import_auth_config_to_gateway import Command
from awx.main.utils.gateway_client import GatewayAPIError
class TestImportAuthConfigToGatewayCommand(TestCase):
def setUp(self):
self.command = Command()
def options_basic_auth_full_send(self):
return {
'basic_auth': True,
'skip_all_authenticators': False,
'skip_oidc': False,
'skip_github': False,
'skip_ldap': False,
'skip_ad': False,
'skip_saml': False,
'skip_radius': False,
'skip_tacacs': False,
'skip_google': False,
'skip_settings': False,
'force': False,
}
def options_basic_auth_skip_all_individual(self):
return {
'basic_auth': True,
'skip_all_authenticators': False,
'skip_oidc': True,
'skip_github': True,
'skip_ldap': True,
'skip_ad': True,
'skip_saml': True,
'skip_radius': True,
'skip_tacacs': True,
'skip_google': True,
'skip_settings': True,
'force': False,
}
def options_svc_token_full_send(self):
options = self.options_basic_auth_full_send()
options['basic_auth'] = False
return options
def options_svc_token_skip_all(self):
options = self.options_basic_auth_skip_all_individual()
options['basic_auth'] = False
return options
def create_mock_migrator(
self,
mock_migrator_class,
authenticator_type="TestAuth",
created=0,
updated=0,
unchanged=0,
failed=0,
mappers_created=0,
mappers_updated=0,
mappers_failed=0,
settings_created=0,
settings_updated=0,
settings_unchanged=0,
settings_failed=0,
):
"""Helper method to create a mock migrator with specified return values."""
mock_migrator = Mock()
mock_migrator.get_authenticator_type.return_value = authenticator_type
mock_migrator.migrate.return_value = {
'created': created,
'updated': updated,
'unchanged': unchanged,
'failed': failed,
'mappers_created': mappers_created,
'mappers_updated': mappers_updated,
'mappers_failed': mappers_failed,
}
mock_migrator_class.return_value = mock_migrator
return mock_migrator
def test_add_arguments(self):
"""Test that all expected arguments are properly added to the parser."""
parser = Mock()
self.command.add_arguments(parser)
expected_calls = [
call('--basic-auth', action='store_true', help='Use HTTP Basic Authentication between Controller and Gateway'),
call(
'--skip-all-authenticators',
action='store_true',
help='Skip importing all authenticators [GitHub, OIDC, SAML, Azure AD, LDAP, RADIUS, TACACS+, Google OAuth2]',
),
call('--skip-oidc', action='store_true', help='Skip importing generic OIDC authenticators'),
call('--skip-github', action='store_true', help='Skip importing GitHub authenticator'),
call('--skip-ldap', action='store_true', help='Skip importing LDAP authenticators'),
call('--skip-ad', action='store_true', help='Skip importing Azure AD authenticator'),
call('--skip-saml', action='store_true', help='Skip importing SAML authenticator'),
call('--skip-radius', action='store_true', help='Skip importing RADIUS authenticator'),
call('--skip-tacacs', action='store_true', help='Skip importing TACACS+ authenticator'),
call('--skip-google', action='store_true', help='Skip importing Google OAuth2 authenticator'),
call('--skip-settings', action='store_true', help='Skip importing settings'),
call(
'--force',
action='store_true',
help='Force migration even if configurations already exist. Does not apply to skipped authenticators nor skipped settings.',
),
]
parser.add_argument.assert_has_calls(expected_calls, any_order=True)
@patch.dict(os.environ, {}, clear=True)
@patch('sys.stdout', new_callable=StringIO)
def test_handle_missing_env_vars_basic_auth(self, mock_stdout):
"""Test that missing environment variables cause clean exit when using basic auth."""
with patch.object(self.command, 'stdout', mock_stdout):
with pytest.raises(SystemExit) as exc_info:
self.command.handle(**self.options_basic_auth_full_send())
# Should exit with code 0 for successful early validation
assert exc_info.value.code == 0
output = mock_stdout.getvalue()
self.assertIn('Missing required environment variables:', output)
self.assertIn('GATEWAY_BASE_URL', output)
self.assertIn('GATEWAY_USER', output)
self.assertIn('GATEWAY_PASSWORD', output)
@patch.dict(
    os.environ,
    {'GATEWAY_BASE_URL': 'https://gateway.example.com', 'GATEWAY_USER': 'testuser', 'GATEWAY_PASSWORD': 'testpass', 'GATEWAY_SKIP_VERIFY': 'true'},
)
@patch('awx.main.management.commands.import_auth_config_to_gateway.SettingsMigrator')
@patch.multiple(
    'awx.main.management.commands.import_auth_config_to_gateway',
    GitHubMigrator=DEFAULT,
    OIDCMigrator=DEFAULT,
    SAMLMigrator=DEFAULT,
    AzureADMigrator=DEFAULT,
    LDAPMigrator=DEFAULT,
    RADIUSMigrator=DEFAULT,
    TACACSMigrator=DEFAULT,
    GoogleOAuth2Migrator=DEFAULT,
)
@patch('awx.main.management.commands.import_auth_config_to_gateway.GatewayClient')
@patch('sys.stdout', new_callable=StringIO)
def test_handle_basic_auth_success(self, mock_stdout, mock_gateway_client, mock_settings_migrator, **mock_migrators):
    """Test successful execution with basic auth."""
    # @patch decorators apply bottom-up, so positional mocks arrive in reverse
    # decorator order; @patch.multiple delivers its mocks via **mock_migrators.
    # Mock gateway client context manager
    mock_client_instance = Mock()
    mock_gateway_client.return_value.__enter__.return_value = mock_client_instance
    mock_gateway_client.return_value.__exit__.return_value = None
    for mock_migrator_class in mock_migrators.values():
        self.create_mock_migrator(mock_migrator_class, created=1, mappers_created=2)
    self.create_mock_migrator(mock_settings_migrator, settings_created=1, settings_updated=0, settings_unchanged=2, settings_failed=0)
    with patch.object(self.command, 'stdout', mock_stdout):
        with pytest.raises(SystemExit) as exc_info:
            self.command.handle(**self.options_basic_auth_full_send())
    # Should exit with code 0 for success
    assert exc_info.value.code == 0
    # Verify gateway client was created with correct parameters
    mock_gateway_client.assert_called_once_with(
        base_url='https://gateway.example.com', username='testuser', password='testpass', skip_verify=True, command=self.command
    )
    # Verify all migrators were created
    for mock_migrator in mock_migrators.values():
        mock_migrator.assert_called_once_with(mock_client_instance, self.command, force=False)
    mock_settings_migrator.assert_called_once_with(mock_client_instance, self.command, force=False)
    # Verify output contains success messages
    output = mock_stdout.getvalue()
    self.assertIn('HTTP Basic Auth: true', output)
    self.assertIn('Successfully connected to Gateway', output)
    self.assertIn('Migration Summary', output)
    self.assertIn('authenticators', output)
    self.assertIn('mappers', output)
    self.assertIn('settings', output)
@patch.dict(os.environ, {'GATEWAY_SKIP_VERIFY': 'false'}, clear=True)  # Ensure verify_https=True
@patch('awx.main.management.commands.import_auth_config_to_gateway.create_api_client')
@patch('awx.main.management.commands.import_auth_config_to_gateway.GatewayClientSVCToken')
@patch('awx.main.management.commands.import_auth_config_to_gateway.urlparse')
@patch('awx.main.management.commands.import_auth_config_to_gateway.urlunparse')
@patch('sys.stdout', new_callable=StringIO)
def test_handle_service_token_success(self, mock_stdout, mock_urlunparse, mock_urlparse, mock_gateway_client_svc, mock_create_api_client):
    """Test successful execution with service token."""
    # Mock resource API client
    mock_resource_client = Mock()
    mock_resource_client.base_url = 'https://gateway.example.com/api/v1'
    mock_resource_client.jwt_user_id = 'test-user'
    mock_resource_client.jwt_expiration = '2024-12-31'
    mock_resource_client.verify_https = True
    # A 200 from get_service_metadata marks the connection as validated.
    mock_response = Mock()
    mock_response.status_code = 200
    mock_resource_client.get_service_metadata.return_value = mock_response
    mock_create_api_client.return_value = mock_resource_client
    # Mock URL parsing
    mock_parsed = Mock()
    mock_parsed.scheme = 'https'
    mock_parsed.netloc = 'gateway.example.com'
    mock_urlparse.return_value = mock_parsed
    mock_urlunparse.return_value = 'https://gateway.example.com/'
    # Mock gateway client context manager
    mock_client_instance = Mock()
    mock_gateway_client_svc.return_value.__enter__.return_value = mock_client_instance
    mock_gateway_client_svc.return_value.__exit__.return_value = None
    with patch.object(self.command, 'stdout', mock_stdout):
        with patch('sys.exit'):
            self.command.handle(**self.options_svc_token_skip_all())
    # Should call sys.exit(0) for success, but may not due to test setup
    # Just verify the command completed without raising an exception
    # Verify resource API client was created and configured
    mock_create_api_client.assert_called_once()
    self.assertTrue(mock_resource_client.verify_https)  # Should be True when GATEWAY_SKIP_VERIFY='false'
    mock_resource_client.get_service_metadata.assert_called_once()
    # Verify service token client was created
    mock_gateway_client_svc.assert_called_once_with(resource_api_client=mock_resource_client, command=self.command)
    # Verify output contains service token messages
    output = mock_stdout.getvalue()
    self.assertIn('Gateway Service Token: true', output)
    self.assertIn('Connection Validated: True', output)
    self.assertIn('No authentication configurations found to migrate.', output)
@patch.dict(os.environ, {'GATEWAY_BASE_URL': 'https://gateway.example.com', 'GATEWAY_USER': 'testuser', 'GATEWAY_PASSWORD': 'testpass'})
@patch.multiple(
    'awx.main.management.commands.import_auth_config_to_gateway',
    GitHubMigrator=DEFAULT,
    OIDCMigrator=DEFAULT,
    SAMLMigrator=DEFAULT,
    AzureADMigrator=DEFAULT,
    LDAPMigrator=DEFAULT,
    RADIUSMigrator=DEFAULT,
    TACACSMigrator=DEFAULT,
    GoogleOAuth2Migrator=DEFAULT,
    SettingsMigrator=DEFAULT,
)
@patch('awx.main.management.commands.import_auth_config_to_gateway.GatewayClient')
@patch('sys.stdout', new_callable=StringIO)
def test_skip_flags_prevent_authenticator_individual_and_settings_migration(self, mock_stdout, mock_gateway_client, **mock_migrators):
    """Test that skip flags prevent corresponding migrators from being created."""
    # All per-authenticator skips AND skip-settings: nothing should run at all.
    # Mock gateway client context manager
    mock_client_instance = Mock()
    mock_gateway_client.return_value.__enter__.return_value = mock_client_instance
    mock_gateway_client.return_value.__exit__.return_value = None
    with patch.object(self.command, 'stdout', mock_stdout):
        with patch('sys.exit'):
            self.command.handle(**self.options_basic_auth_skip_all_individual())
    # Should call sys.exit(0) for success, but may not due to test setup
    # Just verify the command completed without raising an exception
    # Verify no migrators were created
    for mock_migrator in mock_migrators.values():
        mock_migrator.assert_not_called()
    # Verify warning message about no configurations
    output = mock_stdout.getvalue()
    self.assertIn('No authentication configurations found to migrate.', output)
    self.assertIn('Settings migration will not execute.', output)
    self.assertIn('NO MIGRATIONS WILL EXECUTE.', output)
@patch.dict(os.environ, {'GATEWAY_BASE_URL': 'https://gateway.example.com', 'GATEWAY_USER': 'testuser', 'GATEWAY_PASSWORD': 'testpass'})
@patch.multiple(
    'awx.main.management.commands.import_auth_config_to_gateway',
    GitHubMigrator=DEFAULT,
    OIDCMigrator=DEFAULT,
    SAMLMigrator=DEFAULT,
    AzureADMigrator=DEFAULT,
    LDAPMigrator=DEFAULT,
    RADIUSMigrator=DEFAULT,
    TACACSMigrator=DEFAULT,
    GoogleOAuth2Migrator=DEFAULT,
)
@patch('awx.main.management.commands.import_auth_config_to_gateway.GatewayClient')
@patch('sys.stdout', new_callable=StringIO)
def test_skip_flags_prevent_authenticator_migration(self, mock_stdout, mock_gateway_client, **mock_migrators):
    """Test that skip flags prevent corresponding migrators from being created."""
    # Unlike the combined test above, settings migration stays enabled here, so
    # the settings-specific warnings must NOT appear.
    # Mock gateway client context manager
    mock_client_instance = Mock()
    mock_gateway_client.return_value.__enter__.return_value = mock_client_instance
    mock_gateway_client.return_value.__exit__.return_value = None
    options = self.options_basic_auth_full_send()
    options['skip_all_authenticators'] = True
    with patch.object(self.command, 'stdout', mock_stdout):
        with pytest.raises(SystemExit) as exc_info:
            self.command.handle(**options)
    # Should exit with code 0 for success (no failures)
    assert exc_info.value.code == 0
    # Verify no migrators were created
    for mock_migrator in mock_migrators.values():
        mock_migrator.assert_not_called()
    # Verify warning message about no configurations
    output = mock_stdout.getvalue()
    self.assertIn('No authentication configurations found to migrate.', output)
    self.assertNotIn('Settings migration will not execute.', output)
    self.assertNotIn('NO MIGRATIONS WILL EXECUTE.', output)
@patch.dict(os.environ, {'GATEWAY_BASE_URL': 'https://gateway.example.com', 'GATEWAY_USER': 'testuser', 'GATEWAY_PASSWORD': 'testpass'})
@patch('awx.main.management.commands.import_auth_config_to_gateway.GatewayClient')
@patch('sys.stdout', new_callable=StringIO)
def test_handle_gateway_api_error(self, mock_stdout, mock_gateway_client):
    """Test handling of GatewayAPIError exceptions."""
    # Mock gateway client to raise GatewayAPIError on construction
    mock_gateway_client.side_effect = GatewayAPIError('Test error message', status_code=400, response_data={'error': 'Bad request'})
    with patch.object(self.command, 'stdout', mock_stdout):
        with pytest.raises(SystemExit) as exc_info:
            self.command.handle(**self.options_basic_auth_full_send())
    # Should exit with code 1 for errors
    assert exc_info.value.code == 1
    # Verify error message output includes message, status code and response body
    output = mock_stdout.getvalue()
    self.assertIn('Gateway API Error: Test error message', output)
    self.assertIn('Status Code: 400', output)
    self.assertIn("Response: {'error': 'Bad request'}", output)
@patch.dict(os.environ, {'GATEWAY_BASE_URL': 'https://gateway.example.com', 'GATEWAY_USER': 'testuser', 'GATEWAY_PASSWORD': 'testpass'})
@patch('awx.main.management.commands.import_auth_config_to_gateway.GatewayClient')
@patch('sys.stdout', new_callable=StringIO)
def test_handle_unexpected_error(self, mock_stdout, mock_gateway_client):
    """Test handling of unexpected exceptions."""
    # Mock gateway client to raise unexpected (non-GatewayAPIError) error
    mock_gateway_client.side_effect = ValueError('Unexpected error')
    with patch.object(self.command, 'stdout', mock_stdout):
        with pytest.raises(SystemExit) as exc_info:
            self.command.handle(**self.options_basic_auth_full_send())
    # Should exit with code 1 for errors
    assert exc_info.value.code == 1
    # Verify error message output
    output = mock_stdout.getvalue()
    self.assertIn('Unexpected error during migration: Unexpected error', output)
@patch.dict(os.environ, {'GATEWAY_BASE_URL': 'https://gateway.example.com', 'GATEWAY_USER': 'testuser', 'GATEWAY_PASSWORD': 'testpass'})
@patch('awx.main.management.commands.import_auth_config_to_gateway.GatewayClient')
@patch('awx.main.management.commands.import_auth_config_to_gateway.GitHubMigrator')
@patch('awx.main.management.commands.import_auth_config_to_gateway.SettingsMigrator')
@patch('sys.stdout', new_callable=StringIO)
def test_force_flag_passed_to_migrators(self, mock_stdout, mock_github, mock_settings_migrator, mock_gateway_client):
    """Test that force flag is properly passed to migrators."""
    # Mock gateway client context manager
    mock_client_instance = Mock()
    mock_gateway_client.return_value.__enter__.return_value = mock_client_instance
    mock_gateway_client.return_value.__exit__.return_value = None
    # Mock migrator
    self.create_mock_migrator(mock_github, authenticator_type="GitHub", created=0, mappers_created=2)
    self.create_mock_migrator(
        mock_settings_migrator, authenticator_type="Settings", settings_created=0, settings_updated=2, settings_unchanged=0, settings_failed=0
    )
    # Start from "skip everything" and re-enable only GitHub and settings, so
    # the force flag's propagation can be checked on exactly those two.
    options = self.options_basic_auth_skip_all_individual()
    options['force'] = True
    options['skip_github'] = False
    options['skip_settings'] = False
    with patch.object(self.command, 'stdout', mock_stdout):
        with pytest.raises(SystemExit) as exc_info:
            self.command.handle(**options)
    # Should exit with code 0 for success
    assert exc_info.value.code == 0
    # Verify migrator was created with force=True
    mock_github.assert_called_once_with(mock_client_instance, self.command, force=True)
    # Verify settings migrator was created with force=True
    mock_settings_migrator.assert_called_once_with(mock_client_instance, self.command, force=True)
@patch('sys.stdout', new_callable=StringIO)
def test_print_export_summary(self, mock_stdout):
    """Verify _print_export_summary renders authenticator and mapper counters."""
    summary = {
        'created': 2,
        'updated': 1,
        'unchanged': 3,
        'failed': 0,
        'mappers_created': 5,
        'mappers_updated': 2,
        'mappers_failed': 1,
    }
    with patch.object(self.command, 'stdout', mock_stdout):
        self.command._print_export_summary('SAML', summary)
    output = mock_stdout.getvalue()
    # Every counter from the result dict must appear in the rendered summary.
    expected_fragments = (
        '--- SAML Export Summary ---',
        'Authenticators created: 2',
        'Authenticators updated: 1',
        'Authenticators unchanged: 3',
        'Authenticators failed: 0',
        'Mappers created: 5',
        'Mappers updated: 2',
        'Mappers failed: 1',
    )
    for fragment in expected_fragments:
        self.assertIn(fragment, output)
@patch('sys.stdout', new_callable=StringIO)
def test_print_export_summary_settings(self, mock_stdout):
    """Verify _print_export_summary renders settings-specific counters."""
    summary = {
        'settings_created': 2,
        'settings_updated': 1,
        'settings_unchanged': 3,
        'settings_failed': 0,
    }
    with patch.object(self.command, 'stdout', mock_stdout):
        self.command._print_export_summary('Settings', summary)
    output = mock_stdout.getvalue()
    # Settings results use the settings_* keys rather than authenticator keys.
    expected_fragments = (
        '--- Settings Export Summary ---',
        'Settings created: 2',
        'Settings updated: 1',
        'Settings unchanged: 3',
        'Settings failed: 0',
    )
    for fragment in expected_fragments:
        self.assertIn(fragment, output)
@patch('sys.stdout', new_callable=StringIO)
def test_print_export_summary_missing_keys(self, mock_stdout):
    """Test _print_export_summary handles missing keys gracefully."""
    result = {
        'created': 1,
        'updated': 2,
        # Missing other keys
    }
    with patch.object(self.command, 'stdout', mock_stdout):
        self.command._print_export_summary('LDAP', result)
    output = mock_stdout.getvalue()
    # Absent counters are reported as 0 instead of raising KeyError.
    self.assertIn('--- LDAP Export Summary ---', output)
    self.assertIn('Authenticators created: 1', output)
    self.assertIn('Authenticators updated: 2', output)
    self.assertIn('Authenticators unchanged: 0', output)  # Default value
    self.assertIn('Mappers created: 0', output)  # Default value
@patch.dict(os.environ, {'GATEWAY_BASE_URL': 'https://gateway.example.com', 'GATEWAY_USER': 'testuser', 'GATEWAY_PASSWORD': 'testpass'})
@patch('awx.main.management.commands.import_auth_config_to_gateway.GatewayClient')
@patch('awx.main.management.commands.import_auth_config_to_gateway.GitHubMigrator')
@patch('awx.main.management.commands.import_auth_config_to_gateway.OIDCMigrator')
@patch('sys.stdout', new_callable=StringIO)
def test_total_results_accumulation(self, mock_stdout, mock_oidc, mock_github, mock_gateway_client):
    """Test that results from multiple migrators are properly accumulated."""
    # Mock gateway client context manager
    mock_client_instance = Mock()
    mock_gateway_client.return_value.__enter__.return_value = mock_client_instance
    mock_gateway_client.return_value.__exit__.return_value = None
    # Mock migrators with different results so the totals are distinguishable
    self.create_mock_migrator(mock_github, authenticator_type="GitHub", created=1, mappers_created=2)
    self.create_mock_migrator(mock_oidc, authenticator_type="OIDC", created=0, updated=1, unchanged=1, mappers_created=1, mappers_updated=1)
    # Enable only the two mocked migrators.
    options = self.options_basic_auth_skip_all_individual()
    options['skip_oidc'] = False
    options['skip_github'] = False
    with patch.object(self.command, 'stdout', mock_stdout):
        with pytest.raises(SystemExit) as exc_info:
            self.command.handle(**options)
    # Should exit with code 0 for success
    assert exc_info.value.code == 0
    # Verify total results are accumulated correctly (GitHub + OIDC)
    output = mock_stdout.getvalue()
    self.assertIn('Total authenticators created: 1', output)  # 1 + 0
    self.assertIn('Total authenticators updated: 1', output)  # 0 + 1
    self.assertIn('Total authenticators unchanged: 1', output)  # 0 + 1
    self.assertIn('Total authenticators failed: 0', output)  # 0 + 0
    self.assertIn('Total mappers created: 3', output)  # 2 + 1
    self.assertIn('Total mappers updated: 1', output)  # 0 + 1
    self.assertIn('Total mappers failed: 0', output)  # 0 + 0
@patch('sys.stdout', new_callable=StringIO)
def test_environment_variable_parsing(self, mock_stdout):
    """Test that environment variables are parsed correctly."""
    # (env value, expected skip_verify bool): common truthy spellings are
    # accepted case-insensitively; everything else — including empty and
    # unknown strings — parses as False.
    test_cases = [
        ('true', True),
        ('1', True),
        ('yes', True),
        ('on', True),
        ('TRUE', True),
        ('false', False),
        ('0', False),
        ('no', False),
        ('off', False),
        ('', False),
        ('random', False),
    ]
    for env_value, expected in test_cases:
        with patch.dict(
            os.environ,
            {
                'GATEWAY_BASE_URL': 'https://gateway.example.com',
                'GATEWAY_USER': 'testuser',
                'GATEWAY_PASSWORD': 'testpass',
                'GATEWAY_SKIP_VERIFY': env_value,
            },
        ):
            with patch('awx.main.management.commands.import_auth_config_to_gateway.GatewayClient') as mock_gateway_client:
                # Mock gateway client context manager
                mock_client_instance = Mock()
                mock_gateway_client.return_value.__enter__.return_value = mock_client_instance
                mock_gateway_client.return_value.__exit__.return_value = None
                with patch.object(self.command, 'stdout', mock_stdout):
                    with patch('sys.exit'):
                        self.command.handle(**self.options_basic_auth_skip_all_individual())
                # Verify gateway client was called with correct skip_verify value
                mock_gateway_client.assert_called_once_with(
                    base_url='https://gateway.example.com', username='testuser', password='testpass', skip_verify=expected, command=self.command
                )
                # Reset for next iteration so assert_called_once_with stays valid
                mock_gateway_client.reset_mock()
                mock_stdout.seek(0)
                mock_stdout.truncate(0)
@patch.dict(os.environ, {'GATEWAY_SKIP_VERIFY': 'false'})
@patch('awx.main.management.commands.import_auth_config_to_gateway.create_api_client')
@patch('awx.main.management.commands.import_auth_config_to_gateway.urlparse')
@patch('awx.main.management.commands.import_auth_config_to_gateway.urlunparse')
@patch('awx.main.management.commands.import_auth_config_to_gateway.SettingsMigrator')
@patch('sys.stdout', new_callable=StringIO)
def test_service_token_connection_validation_failure(self, mock_stdout, mock_settings_migrator, mock_urlunparse, mock_urlparse, mock_create_api_client):
    """Test that non-200 response from get_service_metadata causes error exit."""
    # Mock resource API client with failing response
    mock_resource_client = Mock()
    mock_resource_client.base_url = 'https://gateway.example.com/api/v1'
    mock_resource_client.jwt_user_id = 'test-user'
    mock_resource_client.jwt_expiration = '2024-12-31'
    mock_resource_client.verify_https = True
    mock_response = Mock()
    mock_response.status_code = 401  # Simulate unauthenticated error
    mock_resource_client.get_service_metadata.return_value = mock_response
    mock_create_api_client.return_value = mock_resource_client
    # Mock URL parsing (needed for the service token flow)
    mock_parsed = Mock()
    mock_parsed.scheme = 'https'
    mock_parsed.netloc = 'gateway.example.com'
    mock_urlparse.return_value = mock_parsed
    mock_urlunparse.return_value = 'https://gateway.example.com/'
    with patch.object(self.command, 'stdout', mock_stdout):
        with pytest.raises(SystemExit) as exc_info:
            self.command.handle(**self.options_svc_token_skip_all())
    # Should exit with code 1 for connection failure
    assert exc_info.value.code == 1
    # Verify error message is displayed
    # NOTE(review): 'Recieved' is misspelled but matches the production
    # message verbatim — fix both the command and this assertion together.
    output = mock_stdout.getvalue()
    self.assertIn(
        'Gateway Service Token is unable to connect to Gateway via the base URL https://gateway.example.com/. Recieved HTTP response code 401', output
    )
    self.assertIn('Connection Validated: False', output)

View File

@@ -871,314 +871,6 @@ class TestJobCredentials(TestJobExecution):
assert f.read() == self.EXAMPLE_PRIVATE_KEY
assert safe_env['ANSIBLE_NET_PASSWORD'] == HIDDEN_PASSWORD
def test_terraform_cloud_credentials(self, job, private_data_dir, mock_me):
    """Injecting a Terraform credential writes its backend HCL config to a file
    and exposes the path via TF_BACKEND_CONFIG_FILE."""
    terraform = CredentialType.defaults['terraform']()
    hcl_config = '''
    backend "s3" {
    bucket = "s3_sample_bucket"
    key = "/tf_state/"
    region = "us-east-1"
    }
    '''
    credential = Credential(pk=1, credential_type=terraform, inputs={'configuration': hcl_config})
    credential.inputs['configuration'] = encrypt_field(credential, 'configuration')
    job.credentials.add(credential)
    env = {}
    safe_env = {}
    credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
    # The env var holds a container-side path; translate to the host path.
    local_path = to_host_path(env['TF_BACKEND_CONFIG_FILE'], private_data_dir)
    # Use a context manager so the file handle is closed deterministically
    # (the original open(...).read() leaked the handle).
    with open(local_path, 'r') as f:
        config = f.read()
    assert config == hcl_config
def test_terraform_gcs_backend_credentials(self, job, private_data_dir, mock_me):
    """A Terraform credential with GCE credentials writes both the HCL config
    and the GCE service-account JSON to files referenced from env vars."""
    terraform = CredentialType.defaults['terraform']()
    hcl_config = '''
    backend "gcs" {
    bucket = "gce_storage"
    }
    '''
    gce_backend_credentials = '''
    {
    "type": "service_account",
    "project_id": "sample",
    "private_key_id": "eeeeeeeeeeeeeeeeeeeeeeeeeee",
    "private_key": "-----BEGIN PRIVATE KEY-----\naaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\n-----END PRIVATE KEY-----\n",
    "client_email": "sample@sample.iam.gserviceaccount.com",
    "client_id": "0123456789",
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "token_uri": "https://oauth2.googleapis.com/token",
    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/cloud-content-robot%40sample.iam.gserviceaccount.com",
    }
    '''
    credential = Credential(pk=1, credential_type=terraform, inputs={'configuration': hcl_config, 'gce_credentials': gce_backend_credentials})
    credential.inputs['configuration'] = encrypt_field(credential, 'configuration')
    credential.inputs['gce_credentials'] = encrypt_field(credential, 'gce_credentials')
    job.credentials.add(credential)
    env = {}
    safe_env = {}
    credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
    # Read both injected files via context managers so the handles are closed
    # (the original open(...).read() calls leaked both handles).
    local_path = to_host_path(env['TF_BACKEND_CONFIG_FILE'], private_data_dir)
    with open(local_path, 'r') as f:
        config = f.read()
    assert config == hcl_config
    credentials_path = to_host_path(env['GOOGLE_BACKEND_CREDENTIALS'], private_data_dir)
    with open(credentials_path, 'r') as f:
        credentials = f.read()
    assert credentials == gce_backend_credentials
def test_custom_environment_injectors_with_jinja_syntax_error(self, private_data_dir, mock_me):
    """An injector template that calls a method on an input value must raise
    jinja2 UndefinedError rather than injecting anything."""
    fields = [{'id': 'api_token', 'label': 'API Token', 'type': 'string'}]
    bad_injectors = {'env': {'MY_CLOUD_API_TOKEN': '{{api_token.foo()}}'}}
    some_cloud = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': fields},
        injectors=bad_injectors,
    )
    credential = Credential(pk=1, credential_type=some_cloud, inputs={'api_token': 'ABC123'})
    with pytest.raises(jinja2.exceptions.UndefinedError):
        credential.credential_type.inject_credential(credential, {}, {}, [], private_data_dir)
def test_custom_environment_injectors(self, private_data_dir, mock_me):
    """A plain string input is rendered into the configured env var."""
    token_field = {'id': 'api_token', 'label': 'API Token', 'type': 'string'}
    some_cloud = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': [token_field]},
        injectors={'env': {'MY_CLOUD_API_TOKEN': '{{api_token}}'}},
    )
    credential = Credential(pk=1, credential_type=some_cloud, inputs={'api_token': 'ABC123'})
    injected_env = {}
    credential.credential_type.inject_credential(credential, injected_env, {}, [], private_data_dir)
    assert injected_env['MY_CLOUD_API_TOKEN'] == 'ABC123'
def test_custom_environment_injectors_with_boolean_env_var(self, private_data_dir, mock_me):
    """A boolean input rendered into an env var becomes its str() form."""
    some_cloud = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': [{'id': 'turbo_button', 'label': 'Turbo Button', 'type': 'boolean'}]},
        injectors={'env': {'TURBO_BUTTON': '{{turbo_button}}'}},
    )
    credential = Credential(pk=1, credential_type=some_cloud, inputs={'turbo_button': True})
    env = {}
    credential.credential_type.inject_credential(credential, env, {}, [], private_data_dir)
    # Env vars are strings: True renders as "True".
    assert env['TURBO_BUTTON'] == str(True)
def test_custom_environment_injectors_with_reserved_env_var(self, private_data_dir, job, mock_me):
    """A custom injector must not be able to clobber reserved env vars (JOB_ID)."""
    task = jobs.RunJob()
    task.instance = job
    some_cloud = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': [{'id': 'api_token', 'label': 'API Token', 'type': 'string'}]},
        injectors={'env': {'JOB_ID': 'reserved'}},
    )
    credential = Credential(pk=1, credential_type=some_cloud, inputs={'api_token': 'ABC123'})
    job.credentials.add(credential)
    env = task.build_env(job, private_data_dir)
    # The task's own JOB_ID wins over the credential injector's value.
    assert env['JOB_ID'] == str(job.pk)
def test_custom_environment_injectors_with_secret_field(self, private_data_dir, mock_me):
    """Secret inputs reach env but are masked in the loggable safe_env."""
    some_cloud = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': [{'id': 'password', 'label': 'Password', 'type': 'string', 'secret': True}]},
        injectors={'env': {'MY_CLOUD_PRIVATE_VAR': '{{password}}'}},
    )
    credential = Credential(pk=1, credential_type=some_cloud, inputs={'password': 'SUPER-SECRET-123'})
    credential.inputs['password'] = encrypt_field(credential, 'password')
    env = {}
    safe_env = {}
    credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
    # Real value goes to the process env; safe_env only ever sees the mask.
    assert env['MY_CLOUD_PRIVATE_VAR'] == 'SUPER-SECRET-123'
    assert 'SUPER-SECRET-123' not in safe_env.values()
    assert safe_env['MY_CLOUD_PRIVATE_VAR'] == HIDDEN_PASSWORD
def test_custom_environment_injectors_with_extra_vars(self, private_data_dir, job, mock_me):
    """Injected extra_vars are passed through and marked unsafe for templating."""
    task = jobs.RunJob()
    some_cloud = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': [{'id': 'api_token', 'label': 'API Token', 'type': 'string'}]},
        injectors={'extra_vars': {'api_token': '{{api_token}}'}},
    )
    credential = Credential(pk=1, credential_type=some_cloud, inputs={'api_token': 'ABC123'})
    job.credentials.add(credential)
    args = task.build_args(job, private_data_dir, {})
    credential.credential_type.inject_credential(credential, {}, {}, args, private_data_dir)
    extra_vars = parse_extra_vars(args, private_data_dir)
    assert extra_vars["api_token"] == "ABC123"
    # __UNSAFE__ marks the value so Ansible will not re-template it.
    assert hasattr(extra_vars["api_token"], '__UNSAFE__')
def test_custom_environment_injectors_with_boolean_extra_vars(self, job, private_data_dir, mock_me):
    """A boolean input injected into extra_vars is rendered as the string "True"."""
    task = jobs.RunJob()
    some_cloud = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': [{'id': 'turbo_button', 'label': 'Turbo Button', 'type': 'boolean'}]},
        injectors={'extra_vars': {'turbo_button': '{{turbo_button}}'}},
    )
    credential = Credential(pk=1, credential_type=some_cloud, inputs={'turbo_button': True})
    job.credentials.add(credential)
    args = task.build_args(job, private_data_dir, {})
    credential.credential_type.inject_credential(credential, {}, {}, args, private_data_dir)
    extra_vars = parse_extra_vars(args, private_data_dir)
    # Template rendering stringifies the boolean.
    assert extra_vars["turbo_button"] == "True"
def test_custom_environment_injectors_with_nested_extra_vars(self, private_data_dir, job, mock_me):
    """Injector templates inside nested extra_vars dicts are rendered too."""
    task = jobs.RunJob()
    some_cloud = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': [{'id': 'host', 'label': 'Host', 'type': 'string'}]},
        injectors={'extra_vars': {'auth': {'host': '{{host}}'}}},
    )
    credential = Credential(pk=1, credential_type=some_cloud, inputs={'host': 'example.com'})
    job.credentials.add(credential)
    args = task.build_args(job, private_data_dir, {})
    credential.credential_type.inject_credential(credential, {}, {}, args, private_data_dir)
    extra_vars = parse_extra_vars(args, private_data_dir)
    assert extra_vars["auth"]["host"] == "example.com"
def test_custom_environment_injectors_with_templated_extra_vars_key(self, private_data_dir, job, mock_me):
    """Dictionary KEYS in extra_vars injectors may themselves be templates."""
    task = jobs.RunJob()
    some_cloud = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': [{'id': 'environment', 'label': 'Environment', 'type': 'string'}, {'id': 'host', 'label': 'Host', 'type': 'string'}]},
        injectors={'extra_vars': {'{{environment}}_auth': {'host': '{{host}}'}}},
    )
    credential = Credential(pk=1, credential_type=some_cloud, inputs={'environment': 'test', 'host': 'example.com'})
    job.credentials.add(credential)
    args = task.build_args(job, private_data_dir, {})
    credential.credential_type.inject_credential(credential, {}, {}, args, private_data_dir)
    extra_vars = parse_extra_vars(args, private_data_dir)
    # '{{environment}}_auth' renders to 'test_auth'.
    assert extra_vars["test_auth"]["host"] == "example.com"
def test_custom_environment_injectors_with_complicated_boolean_template(self, job, private_data_dir, mock_me):
    """Full Jinja control structures ({% if %}) are supported in injector templates."""
    task = jobs.RunJob()
    some_cloud = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': [{'id': 'turbo_button', 'label': 'Turbo Button', 'type': 'boolean'}]},
        injectors={'extra_vars': {'turbo_button': '{% if turbo_button %}FAST!{% else %}SLOW!{% endif %}'}},
    )
    credential = Credential(pk=1, credential_type=some_cloud, inputs={'turbo_button': True})
    job.credentials.add(credential)
    args = task.build_args(job, private_data_dir, {})
    credential.credential_type.inject_credential(credential, {}, {}, args, private_data_dir)
    extra_vars = parse_extra_vars(args, private_data_dir)
    assert extra_vars["turbo_button"] == "FAST!"
def test_custom_environment_injectors_with_secret_extra_vars(self, job, private_data_dir, mock_me):
    """
    extra_vars that contain secret field values should be censored in the DB
    """
    task = jobs.RunJob()
    some_cloud = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': [{'id': 'password', 'label': 'Password', 'type': 'string', 'secret': True}]},
        injectors={'extra_vars': {'password': '{{password}}'}},
    )
    credential = Credential(pk=1, credential_type=some_cloud, inputs={'password': 'SUPER-SECRET-123'})
    credential.inputs['password'] = encrypt_field(credential, 'password')
    job.credentials.add(credential)
    args = task.build_args(job, private_data_dir, {})
    credential.credential_type.inject_credential(credential, {}, {}, args, private_data_dir)
    # The decrypted value is still delivered to the playbook invocation itself.
    extra_vars = parse_extra_vars(args, private_data_dir)
    assert extra_vars["password"] == "SUPER-SECRET-123"
def test_custom_environment_injectors_with_file(self, private_data_dir, mock_me):
    """A 'file' injector writes the rendered template to disk and exposes its
    path through the {{tower.filename}} template variable."""
    some_cloud = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': [{'id': 'api_token', 'label': 'API Token', 'type': 'string'}]},
        injectors={'file': {'template': '[mycloud]\n{{api_token}}'}, 'env': {'MY_CLOUD_INI_FILE': '{{tower.filename}}'}},
    )
    credential = Credential(pk=1, credential_type=some_cloud, inputs={'api_token': 'ABC123'})
    env = {}
    credential.credential_type.inject_credential(credential, env, {}, [], private_data_dir)
    path = to_host_path(env['MY_CLOUD_INI_FILE'], private_data_dir)
    # Close the file deterministically (the original open(...).read() leaked it).
    with open(path, 'r') as f:
        assert f.read() == '[mycloud]\nABC123'
def test_custom_environment_injectors_with_unicode_content(self, private_data_dir, mock_me):
    """Non-ASCII template content round-trips through the injected file intact."""
    value = 'Iñtërnâtiônàlizætiøn'
    some_cloud = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': []},
        injectors={'file': {'template': value}, 'env': {'MY_CLOUD_INI_FILE': '{{tower.filename}}'}},
    )
    credential = Credential(
        pk=1,
        credential_type=some_cloud,
    )
    env = {}
    credential.credential_type.inject_credential(credential, env, {}, [], private_data_dir)
    path = to_host_path(env['MY_CLOUD_INI_FILE'], private_data_dir)
    # Close the file deterministically (the original open(...).read() leaked it).
    with open(path, 'r') as f:
        assert f.read() == value
def test_custom_environment_injectors_with_files(self, private_data_dir, mock_me):
    """Multiple named file templates ('template.cert', 'template.key') are each
    written to disk and addressable via {{tower.filename.<name>}}."""
    some_cloud = CredentialType(
        kind='cloud',
        name='SomeCloud',
        managed=False,
        inputs={'fields': [{'id': 'cert', 'label': 'Certificate', 'type': 'string'}, {'id': 'key', 'label': 'Key', 'type': 'string'}]},
        injectors={
            'file': {'template.cert': '[mycert]\n{{cert}}', 'template.key': '[mykey]\n{{key}}'},
            'env': {'MY_CERT_INI_FILE': '{{tower.filename.cert}}', 'MY_KEY_INI_FILE': '{{tower.filename.key}}'},
        },
    )
    credential = Credential(pk=1, credential_type=some_cloud, inputs={'cert': 'CERT123', 'key': 'KEY123'})
    env = {}
    credential.credential_type.inject_credential(credential, env, {}, [], private_data_dir)
    cert_path = to_host_path(env['MY_CERT_INI_FILE'], private_data_dir)
    key_path = to_host_path(env['MY_KEY_INI_FILE'], private_data_dir)
    # Close both files deterministically (the original open(...).read() calls leaked them).
    with open(cert_path, 'r') as f:
        assert f.read() == '[mycert]\nCERT123'
    with open(key_path, 'r') as f:
        assert f.read() == '[mykey]\nKEY123'
def test_multi_cloud(self, private_data_dir, mock_me):
gce = CredentialType.defaults['gce']()
gce_credential = Credential(pk=1, credential_type=gce, inputs={'username': 'bob', 'project': 'some-project', 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY})

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,124 +0,0 @@
"""
Unit tests for GitHub authenticator migrator functionality.
"""
from unittest.mock import Mock, patch
from awx.sso.utils.github_migrator import GitHubMigrator
class TestGitHubMigrator:
"""Tests for GitHubMigrator class."""
def setup_method(self):
    """Set up test fixtures."""
    # Gateway client and management command are plain Mocks; the migrator under
    # test only touches them through their call surface, so no spec is needed.
    self.gateway_client = Mock()
    self.command = Mock()
    self.migrator = GitHubMigrator(self.gateway_client, self.command)
def test_create_gateway_authenticator_returns_boolean_causes_crash(self):
    """
    Test that verifies create_gateway_authenticator returns proper dictionary
    structure instead of boolean when credentials are missing.
    This test verifies the fix for the bug.
    """
    # Mock the get_controller_config to return a GitHub config with missing credentials
    github_config_missing_creds = {
        'category': 'github',
        'settings': {'SOCIAL_AUTH_GITHUB_KEY': '', 'SOCIAL_AUTH_GITHUB_SECRET': 'test-secret'},  # Missing key
        'org_mappers': [],
        'team_mappers': [],
        'login_redirect_override': None,
    }
    with patch.object(self.migrator, 'get_controller_config', return_value=[github_config_missing_creds]):
        with patch.object(self.migrator, '_write_output'):  # Mock output to avoid noise
            # This should NOT crash now that the bug is fixed
            result = self.migrator.migrate()
    # Verify the migration ran successfully without crashing
    assert 'created' in result
    assert 'failed' in result
    # Should have failed=1 since the config has success=False (missing credentials)
    assert result['failed'] == 1
def test_create_gateway_authenticator_returns_boolean_with_unknown_category(self):
"""
Test that verifies create_gateway_authenticator returns proper dictionary
structure instead of boolean when category is unknown.
This test verifies the fix for the bug.
"""
# Mock the get_controller_config to return a GitHub config with unknown category
github_config_unknown_category = {
'category': 'unknown-category',
'settings': {'SOCIAL_AUTH_UNKNOWN_KEY': 'test-key', 'SOCIAL_AUTH_UNKNOWN_SECRET': 'test-secret'},
'org_mappers': [],
'team_mappers': [],
'login_redirect_override': None,
}
with patch.object(self.migrator, 'get_controller_config', return_value=[github_config_unknown_category]):
with patch.object(self.migrator, '_write_output'): # Mock output to avoid noise
# This should NOT crash now that the bug is fixed
result = self.migrator.migrate()
# Verify the migration ran successfully without crashing
assert 'created' in result
assert 'failed' in result
# Should have failed=1 since the config has success=False (unknown category)
assert result['failed'] == 1
def test_create_gateway_authenticator_direct_boolean_return_missing_creds(self):
"""
Test that directly calls create_gateway_authenticator and verifies it returns
proper dictionary structure instead of boolean for missing credentials.
"""
# Config with missing key (empty string)
config_missing_key = {
'category': 'github',
'settings': {'SOCIAL_AUTH_GITHUB_KEY': '', 'SOCIAL_AUTH_GITHUB_SECRET': 'test-secret'}, # Missing key
'org_mappers': [],
'team_mappers': [],
'login_redirect_override': None,
}
with patch.object(self.migrator, '_write_output'): # Mock output to avoid noise
result = self.migrator.create_gateway_authenticator(config_missing_key)
# Now the method should return a proper dictionary structure
assert isinstance(result, dict), f"Expected dict, got {type(result)} with value: {result}"
assert 'success' in result, f"Expected 'success' key in result: {result}"
assert 'action' in result, f"Expected 'action' key in result: {result}"
assert 'error' in result, f"Expected 'error' key in result: {result}"
# Verify the expected values
assert result['success'] is False
assert result['action'] == 'skipped'
assert 'Missing OAuth2 credentials' in result['error']
def test_create_gateway_authenticator_direct_boolean_return_unknown_category(self):
"""
Test that directly calls create_gateway_authenticator and verifies it returns
proper dictionary structure instead of boolean for unknown category.
"""
# Config with unknown category
config_unknown_category = {
'category': 'unknown-category',
'settings': {'SOCIAL_AUTH_UNKNOWN_KEY': 'test-key', 'SOCIAL_AUTH_UNKNOWN_SECRET': 'test-secret'},
'org_mappers': [],
'team_mappers': [],
'login_redirect_override': None,
}
with patch.object(self.migrator, '_write_output'): # Mock output to avoid noise
result = self.migrator.create_gateway_authenticator(config_unknown_category)
# Now the method should return a proper dictionary structure
assert isinstance(result, dict), f"Expected dict, got {type(result)} with value: {result}"
assert 'success' in result, f"Expected 'success' key in result: {result}"
assert 'action' in result, f"Expected 'action' key in result: {result}"
assert 'error' in result, f"Expected 'error' key in result: {result}"
# Verify the expected values
assert result['success'] is False
assert result['action'] == 'skipped'
assert 'Unknown category unknown-category' in result['error']

File diff suppressed because it is too large Load Diff

View File

@@ -1,614 +0,0 @@
"""
Unit tests for role mapping utilities.
"""
import pytest
from awx.main.utils.gateway_mapping import role_map_to_gateway_format
from awx.sso.utils.ldap_migrator import LDAPMigrator
def get_role_mappers(role_map, start_order=1):
    """Helper function to get just the mappers from role_map_to_gateway_format."""
    # Discard the next-order counter; these tests only inspect the mapper list.
    mappers, _next_order = role_map_to_gateway_format(role_map, start_order)
    return mappers
def ldap_group_allow_to_gateway_format(result, ldap_group, deny=False, start_order=1):
    """Helper function to test LDAP group allow mapping via LDAPMigrator."""
    # Delegate to a throwaway migrator instance's private formatter.
    return LDAPMigrator()._ldap_group_allow_to_gateway_format(result, ldap_group, deny, start_order)
class TestRoleMapToGatewayFormat:
    """Tests for role_map_to_gateway_format function."""

    # NOTE(review): the expected dicts below pin the exact Gateway mapper
    # literal format; keep them verbatim when editing.

    def test_none_input(self):
        """Test that None input returns empty list."""
        result, next_order = role_map_to_gateway_format(None)
        assert result == []
        assert next_order == 1  # Default start_order

    def test_empty_dict(self):
        """Test that empty dict returns empty list."""
        result, next_order = role_map_to_gateway_format({})
        assert result == []
        assert next_order == 1

    def test_is_superuser_single_group(self):
        """Test is_superuser with single group."""
        role_map = {"is_superuser": "cn=awx_super_users,OU=administration groups,DC=contoso,DC=com"}
        result, _ = role_map_to_gateway_format(role_map)
        expected = [
            {
                "name": "is_superuser - role",
                "authenticator": -1,
                "revoke": True,
                "map_type": "is_superuser",
                "team": None,
                "organization": None,
                "triggers": {
                    "groups": {
                        "has_or": ["cn=awx_super_users,OU=administration groups,DC=contoso,DC=com"],
                    }
                },
                "order": 1,
            }
        ]
        assert result == expected

    def test_is_superuser_multiple_groups(self):
        """Test is_superuser with multiple groups."""
        role_map = {"is_superuser": ["cn=super_users,dc=example,dc=com", "cn=admins,dc=example,dc=com"]}
        result, _ = role_map_to_gateway_format(role_map)
        expected = [
            {
                "name": "is_superuser - role",
                "authenticator": -1,
                "revoke": True,
                "map_type": "is_superuser",
                "team": None,
                "organization": None,
                "triggers": {
                    "groups": {
                        "has_or": ["cn=super_users,dc=example,dc=com", "cn=admins,dc=example,dc=com"],
                    }
                },
                "order": 1,
            }
        ]
        assert result == expected

    def test_is_system_auditor_single_group(self):
        """Test is_system_auditor with single group."""
        role_map = {"is_system_auditor": "cn=auditors,dc=example,dc=com"}
        result, _ = role_map_to_gateway_format(role_map)
        # Auditor flag maps to a "role" mapper granting "Platform Auditor".
        expected = [
            {
                "name": "is_system_auditor - role",
                "authenticator": -1,
                "revoke": True,
                "map_type": "role",
                "role": "Platform Auditor",
                "team": None,
                "organization": None,
                "triggers": {
                    "groups": {
                        "has_or": ["cn=auditors,dc=example,dc=com"],
                    }
                },
                "order": 1,
            }
        ]
        assert result == expected

    def test_is_system_auditor_multiple_groups(self):
        """Test is_system_auditor with multiple groups."""
        role_map = {"is_system_auditor": ["cn=auditors,dc=example,dc=com", "cn=viewers,dc=example,dc=com"]}
        result, _ = role_map_to_gateway_format(role_map)
        expected = [
            {
                "name": "is_system_auditor - role",
                "authenticator": -1,
                "revoke": True,
                "map_type": "role",
                "role": "Platform Auditor",
                "team": None,
                "organization": None,
                "triggers": {
                    "groups": {
                        "has_or": ["cn=auditors,dc=example,dc=com", "cn=viewers,dc=example,dc=com"],
                    }
                },
                "order": 1,
            }
        ]
        assert result == expected

    def test_multiple_roles(self):
        """Test multiple role mappings."""
        role_map = {"is_superuser": "cn=super_users,dc=example,dc=com", "is_system_auditor": "cn=auditors,dc=example,dc=com"}
        result, _ = role_map_to_gateway_format(role_map)
        expected = [
            {
                "name": "is_superuser - role",
                "authenticator": -1,
                "revoke": True,
                "map_type": "is_superuser",
                "team": None,
                "organization": None,
                "triggers": {
                    "groups": {
                        "has_or": ["cn=super_users,dc=example,dc=com"],
                    }
                },
                "order": 1,
            },
            {
                "name": "is_system_auditor - role",
                "authenticator": -1,
                "revoke": True,
                "map_type": "role",
                "role": "Platform Auditor",
                "team": None,
                "organization": None,
                "triggers": {
                    "groups": {
                        "has_or": ["cn=auditors,dc=example,dc=com"],
                    }
                },
                "order": 2,
            },
        ]
        assert result == expected

    def test_unsupported_role_flag(self):
        """Test that unsupported role flags are ignored."""
        role_map = {
            "is_superuser": "cn=super_users,dc=example,dc=com",
            "is_staff": "cn=staff,dc=example,dc=com",  # Unsupported flag
            "is_system_auditor": "cn=auditors,dc=example,dc=com",
        }
        result, _ = role_map_to_gateway_format(role_map)
        # Should only have 2 mappers (is_superuser and is_system_auditor)
        assert len(result) == 2
        assert result[0]["map_type"] == "is_superuser"
        assert result[1]["map_type"] == "role"
        assert result[1]["role"] == "Platform Auditor"

    def test_order_increments_correctly(self):
        """Test that order values increment correctly."""
        role_map = {"is_superuser": "cn=super_users,dc=example,dc=com", "is_system_auditor": "cn=auditors,dc=example,dc=com"}
        result, _ = role_map_to_gateway_format(role_map)
        assert len(result) == 2
        assert result[0]["order"] == 1
        assert result[1]["order"] == 2

    def test_start_order_parameter(self):
        """Test that start_order parameter is respected."""
        role_map = {"is_superuser": "cn=super_users,dc=example,dc=com"}
        result, next_order = role_map_to_gateway_format(role_map, start_order=5)
        assert result[0]["order"] == 5
        assert next_order == 6

    def test_string_to_list_conversion(self):
        """Test that string groups are converted to lists."""
        role_map = {"is_superuser": "single-group"}
        result, _ = role_map_to_gateway_format(role_map)
        # Should convert string to list for has_or
        assert result[0]["triggers"]["groups"]["has_or"] == ["single-group"]

    def test_triggers_format_validation(self):
        """Test that trigger formats match Gateway specification."""
        role_map = {"is_superuser": ["group1", "group2"]}
        result, _ = role_map_to_gateway_format(role_map)
        # Validate that triggers follow Gateway format
        triggers = result[0]["triggers"]
        assert "groups" in triggers
        assert "has_or" in triggers["groups"]
        assert isinstance(triggers["groups"]["has_or"], list)
        assert triggers["groups"]["has_or"] == ["group1", "group2"]

    def test_ldap_dn_format(self):
        """Test with realistic LDAP DN format."""
        role_map = {
            "is_superuser": "cn=awx_super_users,OU=administration groups,DC=contoso,DC=com",
            "is_system_auditor": "cn=awx_auditors,OU=administration groups,DC=contoso,DC=com",
        }
        result, _ = role_map_to_gateway_format(role_map)
        assert len(result) == 2
        assert result[0]["triggers"]["groups"]["has_or"] == ["cn=awx_super_users,OU=administration groups,DC=contoso,DC=com"]
        assert result[1]["triggers"]["groups"]["has_or"] == ["cn=awx_auditors,OU=administration groups,DC=contoso,DC=com"]

    def test_gateway_format_compliance(self):
        """Test that all results comply with Gateway role mapping format."""
        role_map = {"is_superuser": "cn=super_users,dc=example,dc=com", "is_system_auditor": "cn=auditors,dc=example,dc=com"}
        result, _ = role_map_to_gateway_format(role_map)
        for mapping in result:
            # Required fields per Gateway spec
            assert "name" in mapping
            assert "authenticator" in mapping
            assert "map_type" in mapping
            assert "organization" in mapping
            assert "team" in mapping
            assert "triggers" in mapping
            assert "order" in mapping
            assert "revoke" in mapping
            # Field types
            assert isinstance(mapping["name"], str)
            assert isinstance(mapping["authenticator"], int)
            assert mapping["map_type"] in ["is_superuser", "role"]
            assert mapping["organization"] is None
            assert mapping["team"] is None
            assert isinstance(mapping["triggers"], dict)
            assert isinstance(mapping["order"], int)
            assert isinstance(mapping["revoke"], bool)
            # Specific field validations based on map_type
            if mapping["map_type"] == "is_superuser":
                assert "role" not in mapping
            elif mapping["map_type"] == "role":
                assert "role" in mapping
                assert isinstance(mapping["role"], str)
                assert mapping["role"] == "Platform Auditor"
# Parametrized tests for role mappings
@pytest.mark.parametrize(
    "role_map,expected_length",
    [
        (None, 0),
        ({}, 0),
        ({"is_superuser": "group1"}, 1),
        ({"is_system_auditor": "group1"}, 1),
        ({"is_superuser": "group1", "is_system_auditor": "group2"}, 2),
        ({"is_staff": "group1"}, 0),  # Unsupported flag
        ({"is_superuser": "group1", "is_staff": "group2", "is_system_auditor": "group3"}, 2),  # Mixed supported/unsupported
    ],
)
def test_role_map_result_lengths(role_map, expected_length):
    """Test that role_map_to_gateway_format returns expected number of mappings."""
    # Only the count matters here; the next-order counter is ignored.
    mappers, _unused = role_map_to_gateway_format(role_map)
    assert len(mappers) == expected_length
# Edge case tests
def test_empty_groups_handling():
    """Test handling of empty group lists."""
    mappers, _ = role_map_to_gateway_format({"is_superuser": []})
    # A mapper is still emitted, just with an empty trigger group list.
    assert len(mappers) == 1
    assert mappers[0]["triggers"]["groups"]["has_or"] == []
def test_mixed_group_types():
    """Test handling of mixed group types (string and list)."""
    mappers, _ = role_map_to_gateway_format({"is_superuser": "single-group", "is_system_auditor": ["group1", "group2"]})
    assert len(mappers) == 2
    # The bare string is normalized into a one-element list; lists pass through.
    assert mappers[0]["triggers"]["groups"]["has_or"] == ["single-group"]
    assert mappers[1]["triggers"]["groups"]["has_or"] == ["group1", "group2"]
def test_realistic_ldap_user_flags_by_group():
    """Test with realistic LDAP USER_FLAGS_BY_GROUP data."""
    dn = "cn=awx_super_users,OU=administration groups,DC=contoso,DC=com"
    mappers, _ = role_map_to_gateway_format({"is_superuser": dn})
    # This is exactly the use case from the user's example.
    assert len(mappers) == 1
    superuser_mapper = mappers[0]
    assert superuser_mapper["map_type"] == "is_superuser"
    assert superuser_mapper["triggers"]["groups"]["has_or"] == [dn]
    assert superuser_mapper["revoke"] is True
    assert superuser_mapper["team"] is None
    assert superuser_mapper["organization"] is None
class TestLdapGroupAllowToGatewayFormat:
    """Tests for ldap_group_allow_to_gateway_format function."""

    # NOTE(review): REQUIRE_GROUP maps to triggers.groups.has_and with
    # revoke=False; DENY_GROUP maps to has_or with revoke=True. The expected
    # literals below pin that contract.

    def test_none_input_with_empty_result(self):
        """Test that None input with empty result returns unchanged result."""
        result = []
        output_result, next_order = ldap_group_allow_to_gateway_format(result, None, deny=False)
        assert output_result == []
        assert next_order == 1  # Default start_order

    def test_none_input_with_existing_result(self):
        """Test that None input with existing mappers returns unchanged result."""
        result = [{"existing": "mapper"}]
        output_result, next_order = ldap_group_allow_to_gateway_format(result, None, deny=False, start_order=5)
        assert output_result == [{"existing": "mapper"}]
        assert next_order == 5  # start_order unchanged

    def test_require_group_mapping(self):
        """Test LDAP REQUIRE_GROUP mapping (deny=False)."""
        result = []
        ldap_group = "cn=allowed_users,dc=example,dc=com"
        output_result, next_order = ldap_group_allow_to_gateway_format(result, ldap_group, deny=False, start_order=1)
        expected = [
            {
                "name": "LDAP-RequireGroup",
                "authenticator": -1,
                "map_type": "allow",
                "revoke": False,
                "triggers": {"groups": {"has_and": ["cn=allowed_users,dc=example,dc=com"]}},
                "order": 1,
            }
        ]
        assert output_result == expected
        assert next_order == 2

    def test_deny_group_mapping(self):
        """Test LDAP DENY_GROUP mapping (deny=True)."""
        result = []
        ldap_group = "cn=blocked_users,dc=example,dc=com"
        output_result, next_order = ldap_group_allow_to_gateway_format(result, ldap_group, deny=True, start_order=1)
        expected = [
            {
                "name": "LDAP-DenyGroup",
                "authenticator": -1,
                "map_type": "allow",
                "revoke": True,
                "triggers": {"groups": {"has_or": ["cn=blocked_users,dc=example,dc=com"]}},
                "order": 1,
            }
        ]
        assert output_result == expected
        assert next_order == 2

    def test_appending_to_existing_result(self):
        """Test appending to existing result list."""
        existing_mapper = {
            "name": "existing-mapper",
            "authenticator": -1,
            "map_type": "role",
            "order": 1,
        }
        result = [existing_mapper]
        ldap_group = "cn=new_group,dc=example,dc=com"
        output_result, next_order = ldap_group_allow_to_gateway_format(result, ldap_group, deny=False, start_order=2)
        assert len(output_result) == 2
        assert output_result[0] == existing_mapper  # Original mapper unchanged
        assert output_result[1]["name"] == "LDAP-RequireGroup"
        assert output_result[1]["order"] == 2
        assert next_order == 3

    def test_custom_start_order(self):
        """Test that custom start_order is respected."""
        result = []
        ldap_group = "cn=test_group,dc=example,dc=com"
        output_result, next_order = ldap_group_allow_to_gateway_format(result, ldap_group, deny=False, start_order=10)
        assert output_result[0]["order"] == 10
        assert next_order == 11

    def test_require_vs_deny_trigger_differences(self):
        """Test the difference between require and deny group triggers."""
        ldap_group = "cn=test_group,dc=example,dc=com"
        # Test require group (deny=False)
        require_result, _ = ldap_group_allow_to_gateway_format([], ldap_group, deny=False)
        # Test deny group (deny=True)
        deny_result, _ = ldap_group_allow_to_gateway_format([], ldap_group, deny=True)
        # Require group should use has_and
        assert require_result[0]["triggers"]["groups"]["has_and"] == ["cn=test_group,dc=example,dc=com"]
        assert require_result[0]["revoke"] is False
        assert require_result[0]["name"] == "LDAP-RequireGroup"
        # Deny group should use has_or
        assert deny_result[0]["triggers"]["groups"]["has_or"] == ["cn=test_group,dc=example,dc=com"]
        assert deny_result[0]["revoke"] is True
        assert deny_result[0]["name"] == "LDAP-DenyGroup"

    def test_realistic_ldap_dn_format(self):
        """Test with realistic LDAP DN format."""
        result = []
        # Test with require group
        require_group = "cn=awx_users,OU=application groups,DC=contoso,DC=com"
        output_result, next_order = ldap_group_allow_to_gateway_format(result, require_group, deny=False, start_order=1)
        assert len(output_result) == 1
        assert output_result[0]["triggers"]["groups"]["has_and"] == ["cn=awx_users,OU=application groups,DC=contoso,DC=com"]
        assert output_result[0]["name"] == "LDAP-RequireGroup"
        assert next_order == 2

    def test_multiple_sequential_calls(self):
        """Test multiple sequential calls to build complex allow mappers."""
        result = []
        # Add deny group first
        result, next_order = ldap_group_allow_to_gateway_format(result, "cn=blocked,dc=example,dc=com", deny=True, start_order=1)
        # Add require group second
        result, next_order = ldap_group_allow_to_gateway_format(result, "cn=allowed,dc=example,dc=com", deny=False, start_order=next_order)
        assert len(result) == 2
        # First mapper should be deny group
        assert result[0]["name"] == "LDAP-DenyGroup"
        assert result[0]["revoke"] is True
        assert result[0]["triggers"]["groups"]["has_or"] == ["cn=blocked,dc=example,dc=com"]
        assert result[0]["order"] == 1
        # Second mapper should be require group
        assert result[1]["name"] == "LDAP-RequireGroup"
        assert result[1]["revoke"] is False
        assert result[1]["triggers"]["groups"]["has_and"] == ["cn=allowed,dc=example,dc=com"]
        assert result[1]["order"] == 2
        assert next_order == 3

    def test_gateway_format_compliance(self):
        """Test that all results comply with Gateway allow mapping format."""
        result = []
        # Test both deny and require groups
        result, _ = ldap_group_allow_to_gateway_format(result, "cn=denied,dc=example,dc=com", deny=True, start_order=1)
        result, _ = ldap_group_allow_to_gateway_format(result, "cn=required,dc=example,dc=com", deny=False, start_order=2)
        for mapping in result:
            # Required fields per Gateway spec
            assert "name" in mapping
            assert "authenticator" in mapping
            assert "map_type" in mapping
            assert "triggers" in mapping
            assert "order" in mapping
            assert "revoke" in mapping
            # Field types
            assert isinstance(mapping["name"], str)
            assert isinstance(mapping["authenticator"], int)
            assert mapping["map_type"] == "allow"
            assert isinstance(mapping["triggers"], dict)
            assert isinstance(mapping["order"], int)
            assert isinstance(mapping["revoke"], bool)
            # Trigger format validation
            assert "groups" in mapping["triggers"]
            groups_trigger = mapping["triggers"]["groups"]
            # Should have either has_and or has_or, but not both
            has_and = "has_and" in groups_trigger
            has_or = "has_or" in groups_trigger
            assert has_and != has_or  # XOR - exactly one should be true
            if has_and:
                assert isinstance(groups_trigger["has_and"], list)
                assert len(groups_trigger["has_and"]) == 1
            if has_or:
                assert isinstance(groups_trigger["has_or"], list)
                assert len(groups_trigger["has_or"]) == 1

    def test_original_result_not_modified_when_none(self):
        """Test that original result list is not modified when ldap_group is None."""
        original_result = [{"original": "mapper"}]
        result_copy = original_result.copy()
        output_result, _ = ldap_group_allow_to_gateway_format(original_result, None, deny=False)
        # Original list should be unchanged
        assert original_result == result_copy
        # Output should be the same reference
        assert output_result is original_result

    def test_empty_string_group(self):
        """Test handling of empty string group."""
        result = []
        output_result, next_order = ldap_group_allow_to_gateway_format(result, "", deny=False, start_order=1)
        # Should still create a mapper even with empty string
        assert len(output_result) == 1
        assert output_result[0]["triggers"]["groups"]["has_and"] == [""]
        assert next_order == 2
# Parametrized tests for ldap_group_allow_to_gateway_format
@pytest.mark.parametrize(
    "ldap_group,deny,expected_name,expected_revoke,expected_trigger_type",
    [
        ("cn=test,dc=example,dc=com", True, "LDAP-DenyGroup", True, "has_or"),
        ("cn=test,dc=example,dc=com", False, "LDAP-RequireGroup", False, "has_and"),
        ("cn=users,ou=groups,dc=company,dc=com", True, "LDAP-DenyGroup", True, "has_or"),
        ("cn=users,ou=groups,dc=company,dc=com", False, "LDAP-RequireGroup", False, "has_and"),
    ],
)
def test_ldap_group_parametrized(ldap_group, deny, expected_name, expected_revoke, expected_trigger_type):
    """Parametrized test for various LDAP group configurations."""
    mappers, next_order = ldap_group_allow_to_gateway_format([], ldap_group, deny=deny, start_order=1)
    assert len(mappers) == 1
    (mapper,) = mappers
    assert mapper["name"] == expected_name
    assert mapper["revoke"] == expected_revoke
    # Require-groups trigger via has_and, deny-groups via has_or.
    assert expected_trigger_type in mapper["triggers"]["groups"]
    assert mapper["triggers"]["groups"][expected_trigger_type] == [ldap_group]
    assert next_order == 2
def test_realistic_awx_ldap_migration_scenario():
    """Test realistic scenario from AWX LDAP migration."""
    deny_group = "cn=blocked_users,OU=blocked groups,DC=contoso,DC=com"
    require_group = "cn=awx_users,OU=application groups,DC=contoso,DC=com"

    # Mirror the migrator's ordering: DENY_GROUP first, then REQUIRE_GROUP.
    mappers, next_order = ldap_group_allow_to_gateway_format([], deny_group, deny=True, start_order=1)
    mappers, next_order = ldap_group_allow_to_gateway_format(mappers, require_group, deny=False, start_order=next_order)

    # Two allow mappers, with the order counter advanced past both.
    assert len(mappers) == 2
    assert next_order == 3

    deny_mapper, require_mapper = mappers

    # Deny mapper: revoke on has_or match.
    assert deny_mapper["name"] == "LDAP-DenyGroup"
    assert deny_mapper["map_type"] == "allow"
    assert deny_mapper["revoke"] is True
    assert deny_mapper["triggers"]["groups"]["has_or"] == [deny_group]
    assert deny_mapper["order"] == 1

    # Require mapper: allow on has_and match.
    assert require_mapper["name"] == "LDAP-RequireGroup"
    assert require_mapper["map_type"] == "allow"
    assert require_mapper["revoke"] is False
    assert require_mapper["triggers"]["groups"]["has_and"] == [require_group]
    assert require_mapper["order"] == 2