mirror of
https://github.com/ansible/awx.git
synced 2026-03-11 06:29:31 -02:30
Bump migrations and delete some files Resolve remaining conflicts Fix requirements Flake8 fixes Prefer devel changes for schema Use correct versions Remove sso connected stuff Update to modern actions and collection fixes Remove unwated alias Version problems in actions Fix more versioning problems Update warning string Messed it up again Shorten exception More removals Remove pbr license Remove tests deleted in devel Remove unexpected files Remove some content missed in the rebase Use sleep_task from devel Restore devel live conftest file Add in settings that got missed Prefer devel version of collection test Finish repairing .github path Remove unintended test file duplication Undo more unintended file additions
391 lines
13 KiB
Python
391 lines
13 KiB
Python
from __future__ import absolute_import, division, print_function
|
|
|
|
__metaclass__ = type
|
|
|
|
import io
|
|
import os
|
|
import json
|
|
import datetime
|
|
import importlib
|
|
from contextlib import redirect_stdout, suppress
|
|
from unittest import mock
|
|
import logging
|
|
|
|
from requests.models import Response, PreparedRequest
|
|
|
|
import pytest
|
|
|
|
from ansible.module_utils.six import raise_from
|
|
|
|
from ansible_base.rbac.models import RoleDefinition, DABPermission
|
|
from ansible_base.rbac import permission_registry
|
|
|
|
from awx.main.tests.conftest import load_all_credentials # noqa: F401; pylint: disable=unused-import
|
|
from awx.main.tests.functional.conftest import _request
|
|
from awx.main.tests.functional.conftest import credentialtype_scm, credentialtype_ssh # noqa: F401; pylint: disable=unused-import
|
|
from awx.main.models import (
|
|
Organization,
|
|
Project,
|
|
Inventory,
|
|
JobTemplate,
|
|
Credential,
|
|
CredentialType,
|
|
ExecutionEnvironment,
|
|
UnifiedJob,
|
|
WorkflowJobTemplate,
|
|
NotificationTemplate,
|
|
Schedule,
|
|
Team,
|
|
)
|
|
|
|
from django.db import transaction
|
|
|
|
|
|
HAS_TOWER_CLI = False
|
|
HAS_AWX_KIT = False
|
|
logger = logging.getLogger('awx.main.tests')
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def awxkit_path_set(monkeypatch):
|
|
"""Monkey patch sys.path, insert awxkit source code so that
|
|
the package does not need to be installed.
|
|
"""
|
|
base_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir, 'awxkit'))
|
|
monkeypatch.syspath_prepend(base_folder)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def import_awxkit():
|
|
global HAS_TOWER_CLI
|
|
global HAS_AWX_KIT
|
|
try:
|
|
import tower_cli # noqa
|
|
HAS_TOWER_CLI = True
|
|
except ImportError:
|
|
HAS_TOWER_CLI = False
|
|
|
|
try:
|
|
import awxkit # noqa
|
|
HAS_AWX_KIT = True
|
|
except ImportError:
|
|
HAS_AWX_KIT = False
|
|
|
|
|
|
def sanitize_dict(din):
|
|
"""Sanitize Django response data to purge it of internal types
|
|
so it may be used to cast a requests response object
|
|
"""
|
|
if isinstance(din, (int, str, type(None), bool)):
|
|
return din # native JSON types, no problem
|
|
elif isinstance(din, datetime.datetime):
|
|
return din.isoformat()
|
|
elif isinstance(din, list):
|
|
for i in range(len(din)):
|
|
din[i] = sanitize_dict(din[i])
|
|
return din
|
|
elif isinstance(din, dict):
|
|
for k in din.copy().keys():
|
|
din[k] = sanitize_dict(din[k])
|
|
return din
|
|
else:
|
|
return str(din) # translation proxies often not string but stringlike
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def collection_path_set(monkeypatch):
|
|
"""Monkey patch sys.path, insert the root of the collection folder
|
|
so that content can be imported without being fully packaged
|
|
"""
|
|
base_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
|
|
monkeypatch.syspath_prepend(base_folder)
|
|
|
|
|
|
@pytest.fixture
|
|
def collection_import():
|
|
"""These tests run assuming that the awx_collection folder is inserted
|
|
into the PATH before-hand by collection_path_set.
|
|
But all imports internally to the collection
|
|
go through this fixture so that can be changed if needed.
|
|
For instance, we could switch to fully-qualified import paths.
|
|
"""
|
|
|
|
def rf(path):
|
|
return importlib.import_module(path)
|
|
|
|
return rf
|
|
|
|
|
|
def _process_request_data(kwargs_copy, kwargs):
|
|
"""Helper to process 'data' in request kwargs."""
|
|
if 'data' in kwargs:
|
|
if isinstance(kwargs['data'], dict):
|
|
kwargs_copy['data'] = kwargs['data']
|
|
elif kwargs['data'] is None:
|
|
pass
|
|
elif isinstance(kwargs['data'], str):
|
|
kwargs_copy['data'] = json.loads(kwargs['data'])
|
|
else:
|
|
raise RuntimeError('Expected data to be dict or str, got {0}, data: {1}'.format(type(kwargs['data']), kwargs['data']))
|
|
|
|
|
|
def _process_request_params(kwargs_copy, kwargs, method):
|
|
"""Helper to process 'params' in request kwargs."""
|
|
if 'params' in kwargs and method == 'GET':
|
|
if not kwargs_copy.get('data'):
|
|
kwargs_copy['data'] = {}
|
|
if isinstance(kwargs['params'], dict):
|
|
kwargs_copy['data'].update(kwargs['params'])
|
|
elif isinstance(kwargs['params'], list):
|
|
for k, v in kwargs['params']:
|
|
kwargs_copy['data'][k] = v
|
|
|
|
|
|
def _get_resource_class(resource_module):
|
|
"""Helper to determine the Ansible module resource class."""
|
|
if getattr(resource_module, 'ControllerAWXKitModule', None):
|
|
return resource_module.ControllerAWXKitModule
|
|
elif getattr(resource_module, 'ControllerAPIModule', None):
|
|
return resource_module.ControllerAPIModule
|
|
else:
|
|
raise RuntimeError("The module has neither a ControllerAWXKitModule or a ControllerAPIModule")
|
|
|
|
|
|
def _get_tower_cli_mgr(new_request):
|
|
"""Helper to get the appropriate tower_cli mock context manager."""
|
|
if HAS_TOWER_CLI:
|
|
return mock.patch('tower_cli.api.Session.request', new=new_request)
|
|
elif HAS_AWX_KIT:
|
|
return mock.patch('awxkit.api.client.requests.Session.request', new=new_request)
|
|
else:
|
|
return suppress()
|
|
|
|
|
|
def _run_and_capture_module_output(resource_module, stdout_buffer):
|
|
"""Helper to run the module and capture its stdout."""
|
|
try:
|
|
with redirect_stdout(stdout_buffer):
|
|
resource_module.main()
|
|
except SystemExit:
|
|
pass # A system exit indicates successful execution
|
|
except Exception:
|
|
# dump the stdout back to console for debugging
|
|
print(stdout_buffer.getvalue())
|
|
raise
|
|
|
|
|
|
def _parse_and_handle_module_result(module_stdout):
|
|
"""Helper to parse module output and handle exceptions."""
|
|
try:
|
|
result = json.loads(module_stdout)
|
|
except Exception as e:
|
|
raise_from(Exception('Module did not write valid JSON, error: {0}, stdout:\n{1}'.format(str(e), module_stdout)), e)
|
|
|
|
if 'exception' in result:
|
|
if "ModuleNotFoundError: No module named 'tower_cli'" in result['exception']:
|
|
pytest.skip('The tower-cli library is needed to run this test, module no longer supported.')
|
|
raise Exception('Module encountered error:\n{0}'.format(result['exception']))
|
|
return result
|
|
|
|
|
|
@pytest.fixture
|
|
def run_module(request, collection_import, mocker):
|
|
def rf(module_name, module_params, request_user):
|
|
|
|
def new_request(self, method, url, **kwargs):
|
|
kwargs_copy = kwargs.copy()
|
|
_process_request_data(kwargs_copy, kwargs)
|
|
_process_request_params(kwargs_copy, kwargs, method)
|
|
|
|
# make request
|
|
with transaction.atomic():
|
|
rf_django = _request(method.lower()) # Renamed rf to avoid conflict with outer rf
|
|
django_response = rf_django(url, user=request_user, expect=None, **kwargs_copy)
|
|
|
|
# requests library response object is different from the Django response, but they are the same concept
|
|
# this converts the Django response object into a requests response object for consumption
|
|
resp = Response()
|
|
py_data = django_response.data
|
|
sanitize_dict(py_data)
|
|
resp._content = bytes(json.dumps(django_response.data), encoding='utf8')
|
|
resp.status_code = django_response.status_code
|
|
resp.headers = dict(django_response.headers)
|
|
|
|
if request.config.getoption('verbose') > 0:
|
|
logger.info('%s %s by %s, code:%s', method, '/api/' + url.split('/api/')[1], request_user.username, resp.status_code)
|
|
|
|
resp.request = PreparedRequest()
|
|
resp.request.prepare(method=method, url=url)
|
|
return resp
|
|
|
|
def new_open(self, method, url, **kwargs):
|
|
r = new_request(self, method, url, **kwargs)
|
|
m = mock.MagicMock(read=mock.MagicMock(return_value=r._content), status=r.status_code, getheader=mock.MagicMock(side_effect=r.headers.get))
|
|
return m
|
|
|
|
stdout_buffer = io.StringIO()
|
|
resource_module = collection_import('plugins.modules.{0}'.format(module_name))
|
|
|
|
if not isinstance(module_params, dict):
|
|
raise RuntimeError('Module params must be dict, got {0}'.format(type(module_params)))
|
|
|
|
def mock_load_params(self):
|
|
self.params = module_params
|
|
|
|
resource_class = _get_resource_class(resource_module)
|
|
|
|
with mock.patch.object(resource_class, '_load_params', new=mock_load_params):
|
|
mocker.patch('ansible.module_utils.basic._ANSIBLE_PROFILE', 'legacy')
|
|
|
|
with mock.patch('ansible.module_utils.urls.Request.open', new=new_open):
|
|
with _get_tower_cli_mgr(new_request):
|
|
_run_and_capture_module_output(resource_module, stdout_buffer)
|
|
|
|
module_stdout = stdout_buffer.getvalue().strip()
|
|
result = _parse_and_handle_module_result(module_stdout)
|
|
return result
|
|
|
|
return rf
|
|
|
|
|
|
@pytest.fixture
|
|
def survey_spec():
|
|
return {
|
|
"spec": [{"index": 0, "question_name": "my question?", "default": "mydef", "variable": "myvar", "type": "text", "required": False}],
|
|
"description": "test",
|
|
"name": "test",
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
def organization():
|
|
return Organization.objects.create(name='Default')
|
|
|
|
|
|
@pytest.fixture
|
|
def project(organization):
|
|
return Project.objects.create(
|
|
name="test-proj",
|
|
description="test-proj-desc",
|
|
organization=organization,
|
|
playbook_files=['helloworld.yml'],
|
|
local_path='_92__test_proj',
|
|
scm_revision='1234567890123456789012345678901234567890',
|
|
scm_url='localhost',
|
|
scm_type='git',
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def inventory(organization):
|
|
return Inventory.objects.create(name='test-inv', organization=organization)
|
|
|
|
|
|
@pytest.fixture
|
|
def job_template(project, inventory):
|
|
return JobTemplate.objects.create(name='test-jt', project=project, inventory=inventory, playbook='helloworld.yml')
|
|
|
|
|
|
@pytest.fixture
|
|
def team(organization):
|
|
return Team.objects.create(name='test-team', organization=organization)
|
|
|
|
|
|
@pytest.fixture
|
|
def machine_credential(credentialtype_ssh, organization): # noqa: F811
|
|
return Credential.objects.create(credential_type=credentialtype_ssh, name='machine-cred', inputs={'username': 'test_user', 'password': 'pas4word'})
|
|
|
|
|
|
@pytest.fixture
|
|
def vault_credential(organization):
|
|
ct = CredentialType.defaults['vault']()
|
|
ct.save()
|
|
return Credential.objects.create(credential_type=ct, name='vault-cred', inputs={'vault_id': 'foo', 'vault_password': 'pas4word'})
|
|
|
|
|
|
@pytest.fixture
|
|
def kube_credential():
|
|
ct = CredentialType.defaults['kubernetes_bearer_token']()
|
|
ct.save()
|
|
return Credential.objects.create(credential_type=ct, name='kube-cred', inputs={'host': 'my.cluster', 'bearer_token': 'my-token', 'verify_ssl': False})
|
|
|
|
|
|
@pytest.fixture
|
|
def silence_deprecation():
|
|
"""The deprecation warnings are stored in a global variable
|
|
they will create cross-test interference. Use this to turn them off.
|
|
"""
|
|
with mock.patch('ansible.module_utils.basic.AnsibleModule.deprecate') as this_mock:
|
|
yield this_mock
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def silence_warning():
|
|
"""Warnings use global variable, same as deprecations."""
|
|
with mock.patch('ansible.module_utils.basic.AnsibleModule.warn') as this_mock:
|
|
yield this_mock
|
|
|
|
|
|
@pytest.fixture
|
|
def execution_environment():
|
|
return ExecutionEnvironment.objects.create(name="test-ee", description="test-ee", managed=False)
|
|
|
|
|
|
@pytest.fixture(scope='session', autouse=True)
|
|
def mock_has_unpartitioned_events():
|
|
# has_unpartitioned_events determines if there are any events still
|
|
# left in the old, unpartitioned job events table. In order to work,
|
|
# this method looks up when the partition migration occurred. When
|
|
# Django's unit tests run, however, there will be no record of the migration.
|
|
# We mock this out to circumvent the migration query.
|
|
with mock.patch.object(UnifiedJob, 'has_unpartitioned_events', new=False) as _fixture:
|
|
yield _fixture
|
|
|
|
|
|
@pytest.fixture
|
|
def workflow_job_template(organization, inventory):
|
|
return WorkflowJobTemplate.objects.create(name='test-workflow_job_template', organization=organization, inventory=inventory)
|
|
|
|
|
|
@pytest.fixture
|
|
def notification_template(organization):
|
|
return NotificationTemplate.objects.create(
|
|
name='test-notification_template',
|
|
organization=organization,
|
|
notification_type="webhook",
|
|
notification_configuration=dict(
|
|
url="http://localhost",
|
|
username="",
|
|
password="",
|
|
headers={
|
|
"Test": "Header",
|
|
},
|
|
),
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def job_template_role_definition():
|
|
rd = RoleDefinition.objects.create(name='test_view_jt', content_type=permission_registry.content_type_model.objects.get_for_model(JobTemplate))
|
|
permission_codenames = ['view_jobtemplate', 'execute_jobtemplate']
|
|
permissions = DABPermission.objects.filter(codename__in=permission_codenames)
|
|
rd.permissions.add(*permissions)
|
|
return rd
|
|
|
|
|
|
@pytest.fixture
|
|
def scm_credential(credentialtype_scm, organization): # noqa: F811
|
|
return Credential.objects.create(
|
|
credential_type=credentialtype_scm, name='scm-cred', inputs={'username': 'optimus', 'password': 'prime'}, organization=organization
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def rrule():
|
|
return 'DTSTART:20151117T050000Z RRULE:FREQ=DAILY;INTERVAL=1;COUNT=1'
|
|
|
|
|
|
@pytest.fixture
|
|
def schedule(job_template, rrule):
|
|
return Schedule.objects.create(unified_job_template=job_template, name='test-sched', rrule=rrule)
|