Configure Tower in Tower:

* Add separate Django app for configuration: awx.conf.
* Migrate from existing main.TowerSettings model to conf.Setting.
* Add settings wrapper to allow get/set/del via django.conf.settings.
* Update existing references to tower_settings to use django.conf.settings.
* Add a settings registry to allow for each Django app to register configurable settings.
* Support setting validation and conversion using Django REST Framework fields.
* Add /api/v1/settings/ to display a list of setting categories.
* Add /api/v1/settings/<slug>/ to display all settings in a category as a single object.
* Allow PUT/PATCH to update setting singleton, DELETE to reset to defaults.
* Add "all" category to display all settings across categories.
* Add "changed" category to display only settings configured in the database.
* Support per-user settings via "user" category (/api/v1/settings/user/).
* Support defaults for user settings via "user-defaults" category (/api/v1/settings/user-defaults/).
* Update serializer metadata to support category, category_slug and placeholder on OPTIONS responses.
* Update serializer metadata to handle child fields of a list/dict.
* Hide raw data form in browsable API for OPTIONS and DELETE.
* Combine existing licensing code into single "TaskEnhancer" class.
* Move license helper functions from awx.api.license into awx.conf.license.
* Update /api/v1/config/ to read/verify/update license using TaskEnhancer and settings wrapper.
* Add support for caching settings accessed via settings wrapper.
* Invalidate cached settings when Setting model changes or is deleted.
* Preload all database settings into cache on first access via settings wrapper.
* Add support for read-only settings than can update their value depending on other settings.
* Use setting_changed signal whenever a setting changes.
* Register configurable authentication, jobs, system and ui settings.
* Register configurable LDAP, RADIUS and social auth settings.
* Add custom fields and validators for URL, LDAP, RADIUS and social auth settings.
* Rewrite existing validator for Credential ssh_private_key to support validating private keys, certs or combinations of both.
* Get all unit/functional tests working with above changes.
* Add "migrate_to_database_settings" command to determine settings to be migrated into the database and comment them out when set in Python settings files.
* Add support for migrating license key from file to database.
* Remove database-configuable settings from local_settings.py example files.
* Update setup role to no longer install files for database-configurable settings.

f 94ff6ee More settings work.
f af4c4e0 Even more db settings stuff.
f 96ea9c0 More settings, attempt at singleton serializer for settings.
f 937c760 More work on singleton/category views in API, add code to comment out settings in Python files, work on command to migrate settings to database.
f 425b0d3 Minor fixes for sprint demo.
f ea402a4 Add support for read-only settings, cleanup license engine, get license support working with DB settings.
f ec289e4 Rename migration, minor fixmes, update setup role.
f 603640b Rewrite key/cert validator, finish adding social auth fields, hook up signals for setting_changed, use None to imply a setting is not set.
f 67d1b5a Get functional/unit tests passing.
f 2919b62 Flake8 fixes.
f e62f421 Add redbaron to requirements, get file to database migration working (except for license).
f c564508 Add support for migrating license file.
f 982f767 Add support for regex in social map fields.
This commit is contained in:
Chris Church
2016-09-26 22:14:47 -04:00
parent 609a3e6f2f
commit 6ebe45b1bd
92 changed files with 4401 additions and 1791 deletions

View File

@@ -31,8 +31,8 @@ from django.utils.encoding import force_text
# AWX
from awx.main.models import * # noqa
from awx.main.management.commands.run_task_system import run_taskmanager
from awx.main.task_engine import TaskEnhancer
from awx.main.utils import get_ansible_version
from awx.main.task_engine import TaskEngager as LicenseWriter
from awx.sso.backends import LDAPSettings
from awx.main.tests.URI import URI # noqa
@@ -143,35 +143,25 @@ class BaseTestMixin(MockCommonlySlowTestMixin):
return __name__ + '-generated-' + string + rnd_str
def create_test_license_file(self, instance_count=10000, license_date=int(time.time() + 3600), features=None):
writer = LicenseWriter(
settings.LICENSE = TaskEnhancer(
company_name='AWX',
contact_name='AWX Admin',
contact_email='awx@example.com',
license_date=license_date,
instance_count=instance_count,
license_type='enterprise',
features=features)
handle, license_path = tempfile.mkstemp(suffix='.json')
os.close(handle)
writer.write_file(license_path)
self._temp_paths.append(license_path)
os.environ['AWX_LICENSE_FILE'] = license_path
cache.clear()
features=features,
).enhance()
def create_basic_license_file(self, instance_count=100, license_date=int(time.time() + 3600)):
writer = LicenseWriter(
settings.LICENSE = TaskEnhancer(
company_name='AWX',
contact_name='AWX Admin',
contact_email='awx@example.com',
license_date=license_date,
instance_count=instance_count,
license_type='basic')
handle, license_path = tempfile.mkstemp(suffix='.json')
os.close(handle)
writer.write_file(license_path)
self._temp_paths.append(license_path)
os.environ['AWX_LICENSE_FILE'] = license_path
cache.clear()
license_type='basic',
).enhance()
def create_expired_license_file(self, instance_count=1000, grace_period=False):
license_date = time.time() - 1

View File

@@ -1,3 +1,31 @@
TEST_SSH_RSA1_KEY_DATA = '''-----BEGIN PRIVATE KEY-----
uFZFyag7VVqI+q/oGnQu+wj/pMi5ox+Qz5L3W0D745DzwgDXOeObAfNlr9NtIKbn
sZ5E0+rYB4Q/U0CYr5juNJQV1dbxq2Em1160axboe2QbvX6wE6Sm6wW9b9cr+PoF
MoYQebUnCY0ObrLbrRugSfZc17lyxK0ZGRgPXKhpMg6Ecv8XpvhjUYU9Esyqfuco
/p26Q140/HsHeHYNma0dQHCEjMr/qEzOY1qguHj+hRf3SARtM9Q+YNgpxchcDDVS
O+n+8Ljd/p82bpEJwxmpXealeWbI6gB9/R6wcCL+ZyCZpnHJd/NJ809Vtu47ZdDi
E6jvqS/3AQhuQKhJlLSDIzezB2VKKrHwOvHkg/+uLoCqHN34Gk6Qio7x69SvXy88
a7q9D1l/Zx60o08FyZyqlo7l0l/r8EY+36cuI/lvAvfxc5VHVEOvKseUjFRBiCv9
MkKNxaScoYsPwY7SIS6gD93tg3eM5pA0nfMfya9u1+uq/QCM1gNG3mm6Zd8YG4c/
Dx4bmsj8cp5ni/Ffl/sKzKYq1THunJEFGXOZRibdxk/Fal3SQrRAwy7CgLQL8SMh
IWqcFm25OtSOP1r1LE25t5pQsMdmp0IP2fEF0t/pXPm1ZfrTurPMqpo4FGm2hkki
U3sH/o6nrkSOjklOLWlwtTkkL4dWPlNwc8OYj8zFizXJkAfv1spzhv3lRouNkw4N
Mm22W7us2f3Ob0H5C07k26h6VuXX+0AybD4tIIcUXCLoNTqA0HvqhKpEuHu3Ck10
RaB8xHTxgwdhGVaNHMfy9B9l4tNs3Tb5k0LyeRRGVDhWCFo6axYULYebkj+hFLLY
+JE5RzPDFpTf1xbuT+e56H/lLFCUdDu0bn+D0W4ifXaVFegak4r6O4B53CbMqr+R
t6qDPKLUIuVJXK0J6Ay6XgmheXJGbgKh4OtDsc06gsTCE1nY4f/Z82AQahPBfTtF
J2z+NHdsLPn//HlxspGQtmLpuS7Wx0HYXZ+kPRSiE/vmITw85R2u8JSHQicVNN4C
2rlUo15TIU3tTx+WUIrHKHPidUNNotRb2p9n9FoSidU6upKnQHAT/JNv/zcvaia3
Bhl/wagheWTDnFKSmJ4HlKxplM/32h6MfHqsMVOl4F6eZWKaKgSgN8doXyFJo+sc
yAC6S0gJlD2gQI24iTI4Du1+UGh2MGb69eChvi5mbbdesaZrlR1dRqZpHG+6ob4H
nYLndRvobXS5l6pgGTDRYoUgSbQe21a7Uf3soGl5jHqLWc1zEPwrxV7Wr31mApr6
8VtGZcLSr0691Q1NLO3eIfuhbMN2mssX/Sl4t+4BibaucNIMfmhKQi8uHtwAXb47
+TMFlG2EQhZULFM4fLdF1vaizInU3cBk8lsz8i71tDc+5VQTEwoEB7Gksy/XZWEt
6SGHxXUDtNYa+G2O+sQhgqBjLIkVTV6KJOpvNZM+s8Vzv8qoFnD7isKBBrRvF1bP
GOXEG1jd7nSR0WSwcMCHGOrFEELDQPw3k5jqEdPFgVODoZPr+drZVnVz5SAGBk5Y
wsCNaDW+1dABYFlqRTepP5rrSu9wHnRAZ3ZGv+DHoGqenIC5IBR0sQ==
-----END PRIVATE KEY-----'''
TEST_SSH_KEY_DATA = '''-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAyQ8F5bbgjHvk4SZJsKI9OmJKMFxZqRhvx4LaqjLTKbBwRBsY
1/C00NPiZn70dKbeyV7RNVZxuzM6yd3D3lwTdbDu/eJ0x72t3ch+TdLt/aenyy10

View File

@@ -6,28 +6,27 @@ from awx.main.models.activity_stream import ActivityStream
from awx.main.access import ActivityStreamAccess
from django.core.urlresolvers import reverse
from django.conf import settings
def mock_feature_enabled(feature, bypass_database=None):
def mock_feature_enabled(feature):
return True
@pytest.fixture
def activity_stream_entry(organization, org_admin):
return ActivityStream.objects.filter(organization__pk=organization.pk, user=org_admin, operation='associate').first()
@pytest.mark.skipif(not getattr(settings, 'ACTIVITY_STREAM_ENABLED', True), reason="Activity stream not enabled")
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_get_activity_stream_list(monkeypatch, organization, get, user):
def test_get_activity_stream_list(monkeypatch, organization, get, user, settings):
settings.ACTIVITY_STREAM_ENABLED = True
url = reverse('api:activity_stream_list')
response = get(url, user('admin', True))
assert response.status_code == 200
@pytest.mark.skipif(not getattr(settings, 'ACTIVITY_STREAM_ENABLED', True), reason="Activity stream not enabled")
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_basic_fields(monkeypatch, organization, get, user):
def test_basic_fields(monkeypatch, organization, get, user, settings):
settings.ACTIVITY_STREAM_ENABLED = True
u = user('admin', True)
activity_stream = ActivityStream.objects.filter(organization=organization).latest('pk')
activity_stream.actor = u
@@ -44,10 +43,10 @@ def test_basic_fields(monkeypatch, organization, get, user):
assert 'organization' in response.data['summary_fields']
assert response.data['summary_fields']['organization'][0]['name'] == 'test-org'
@pytest.mark.skipif(not getattr(settings, 'ACTIVITY_STREAM_ENABLED', True), reason="Activity stream not enabled")
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_middleware_actor_added(monkeypatch, post, get, user):
def test_middleware_actor_added(monkeypatch, post, get, user, settings):
settings.ACTIVITY_STREAM_ENABLED = True
u = user('admin-poster', True)
url = reverse('api:organization_list')
@@ -66,21 +65,19 @@ def test_middleware_actor_added(monkeypatch, post, get, user):
assert response.status_code == 200
assert response.data['summary_fields']['actor']['username'] == 'admin-poster'
@pytest.mark.skipif(not getattr(settings, 'ACTIVITY_STREAM_ENABLED', True), reason="Activity stream not enabled")
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_rbac_stream_resource_roles(activity_stream_entry, organization, org_admin):
def test_rbac_stream_resource_roles(activity_stream_entry, organization, org_admin, settings):
settings.ACTIVITY_STREAM_ENABLED = True
assert activity_stream_entry.user.first() == org_admin
assert activity_stream_entry.organization.first() == organization
assert activity_stream_entry.role.first() == organization.admin_role
assert activity_stream_entry.object_relationship_type == 'awx.main.models.organization.Organization.admin_role'
@pytest.mark.skipif(not getattr(settings, 'ACTIVITY_STREAM_ENABLED', True), reason="Activity stream not enabled")
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_rbac_stream_user_roles(activity_stream_entry, organization, org_admin):
def test_rbac_stream_user_roles(activity_stream_entry, organization, org_admin, settings):
settings.ACTIVITY_STREAM_ENABLED = True
assert activity_stream_entry.user.first() == org_admin
assert activity_stream_entry.organization.first() == organization
assert activity_stream_entry.role.first() == organization.admin_role
@@ -88,9 +85,9 @@ def test_rbac_stream_user_roles(activity_stream_entry, organization, org_admin):
@pytest.mark.django_db
@pytest.mark.activity_stream_access
@pytest.mark.skipif(not getattr(settings, 'ACTIVITY_STREAM_ENABLED', True), reason="Activity stream not enabled")
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
def test_stream_access_cant_change(activity_stream_entry, organization, org_admin):
def test_stream_access_cant_change(activity_stream_entry, organization, org_admin, settings):
settings.ACTIVITY_STREAM_ENABLED = True
access = ActivityStreamAccess(org_admin)
# These should always return false because the activity stream can not be edited
assert not access.can_add(activity_stream_entry)
@@ -99,12 +96,12 @@ def test_stream_access_cant_change(activity_stream_entry, organization, org_admi
@pytest.mark.django_db
@pytest.mark.activity_stream_access
@pytest.mark.skipif(not getattr(settings, 'ACTIVITY_STREAM_ENABLED', True), reason="Activity stream not enabled")
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
def test_stream_queryset_hides_shows_items(
activity_stream_entry, organization, user, org_admin,
project, org_credential, inventory, label, deploy_jobtemplate,
notification_template, group, host, team):
notification_template, group, host, team, settings):
settings.ACTIVITY_STREAM_ENABLED = True
# this user is not in any organizations and should not see any resource activity
no_access_user = user('no-access-user', False)
queryset = ActivityStreamAccess(no_access_user).get_queryset()

View File

@@ -13,10 +13,10 @@ from awx.main.utils import timestamp_apiformat
from django.core.urlresolvers import reverse
from django.utils import timezone
def mock_feature_enabled(feature, bypass_database=None):
def mock_feature_enabled(feature):
return True
def mock_feature_disabled(feature, bypass_database=None):
def mock_feature_disabled(feature):
return False
def setup_common(hosts, fact_scans, get, user, epoch=timezone.now(), get_params={}, host_count=1):

View File

@@ -6,10 +6,10 @@ from awx.main.utils import timestamp_apiformat
from django.core.urlresolvers import reverse
from django.utils import timezone
def mock_feature_enabled(feature, bypass_database=None):
def mock_feature_enabled(feature):
return True
def mock_feature_disabled(feature, bypass_database=None):
def mock_feature_disabled(feature):
return False
# TODO: Consider making the fact_scan() fixture a Class, instead of a function, and move this method into it

View File

@@ -99,7 +99,7 @@ def test_organization_inventory_list(organization, inventory_factory, get, alice
@pytest.mark.django_db
@mock.patch('awx.api.views.feature_enabled', lambda feature,bypass_db=None: True)
@mock.patch('awx.api.views.feature_enabled', lambda feature: True)
def test_create_organization(post, admin, alice):
new_org = {
'name': 'new org',
@@ -111,7 +111,7 @@ def test_create_organization(post, admin, alice):
@pytest.mark.django_db
@mock.patch('awx.api.views.feature_enabled', lambda feature,bypass_db=None: True)
@mock.patch('awx.api.views.feature_enabled', lambda feature: True)
def test_create_organization_xfail(post, alice):
new_org = {
'name': 'new org',

View File

@@ -6,7 +6,7 @@ from django.core.urlresolvers import reverse
from awx.main.models.jobs import JobTemplate, Job
from awx.main.models.activity_stream import ActivityStream
from awx.api.license import LicenseForbids
from awx.conf.license import LicenseForbids
from awx.main.access import JobTemplateAccess

View File

@@ -16,10 +16,10 @@ from awx.main.management.commands.cleanup_facts import CleanupFacts, Command
from awx.main.models.fact import Fact
from awx.main.models.inventory import Host
def mock_feature_enabled(feature, bypass_database=None):
def mock_feature_enabled(feature):
return True
def mock_feature_disabled(feature, bypass_database=None):
def mock_feature_disabled(feature):
return False
@pytest.mark.django_db

View File

@@ -1,28 +1,23 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
import json
import mock
import os
import tempfile
import time
import pytest
from datetime import datetime
from awx.main.models import Host
from awx.main.task_engine import TaskSerializer, TaskEngager
from awx.main.task_engine import TaskEnhancer
@pytest.mark.django_db
def test_license_writer(inventory, admin):
writer = TaskEngager(
task_enhancer = TaskEnhancer(
company_name='acmecorp',
contact_name='Michael DeHaan',
contact_email='michael@ansibleworks.com',
license_date=25000, # seconds since epoch
instance_count=500)
data = writer.get_data()
data = task_enhancer.enhance()
Host.objects.bulk_create(
[
@@ -42,13 +37,7 @@ def test_license_writer(inventory, admin):
assert data['license_date'] == 25000
assert data['license_key'] == "11bae31f31c6a6cdcb483a278cdbe98bd8ac5761acd7163a50090b0f098b3a13"
strdata = writer.get_string()
strdata_loaded = json.loads(strdata)
assert strdata_loaded == data
reader = TaskSerializer()
vdata = reader.from_string(strdata)
vdata = task_enhancer.validate_enhancements()
assert vdata['available_instances'] == 500
assert vdata['current_instances'] == 12
@@ -63,70 +52,41 @@ def test_license_writer(inventory, admin):
@pytest.mark.django_db
def test_expired_licenses():
reader = TaskSerializer()
writer = TaskEngager(
task_enhancer = TaskEnhancer(
company_name='Tower',
contact_name='Tower Admin',
contact_email='tower@ansible.com',
license_date=int(time.time() - 3600),
instance_count=100,
trial=True)
strdata = writer.get_string()
vdata = reader.from_string(strdata)
task_enhancer.enhance()
vdata = task_enhancer.validate_enhancements()
assert vdata['compliant'] is False
assert vdata['grace_period_remaining'] < 0
writer = TaskEngager(
task_enhancer = TaskEnhancer(
company_name='Tower',
contact_name='Tower Admin',
contact_email='tower@ansible.com',
license_date=int(time.time() - 2592001),
instance_count=100,
trial=False)
strdata = writer.get_string()
vdata = reader.from_string(strdata)
task_enhancer.enhance()
vdata = task_enhancer.validate_enhancements()
assert vdata['compliant'] is False
assert vdata['grace_period_remaining'] < 0
writer = TaskEngager(
task_enhancer = TaskEnhancer(
company_name='Tower',
contact_name='Tower Admin',
contact_email='tower@ansible.com',
license_date=int(time.time() - 3600),
instance_count=100,
trial=False)
strdata = writer.get_string()
vdata = reader.from_string(strdata)
task_enhancer.enhance()
vdata = task_enhancer.validate_enhancements()
assert vdata['compliant'] is False
assert vdata['grace_period_remaining'] > 0
@pytest.mark.django_db
def test_aws_license():
os.environ['AWX_LICENSE_FILE'] = 'non-existent-license-file.json'
h, path = tempfile.mkstemp()
with os.fdopen(h, 'w') as f:
json.dump({'instance_count': 100}, f)
def fetch_ami(_self):
_self.attributes['ami-id'] = 'ami-00000000'
return True
def fetch_instance(_self):
_self.attributes['instance-id'] = 'i-00000000'
return True
with mock.patch('awx.main.task_engine.TEMPORARY_TASK_FILE', path):
with mock.patch('awx.main.task_engine.TemporaryTaskEngine.fetch_ami', fetch_ami):
with mock.patch('awx.main.task_engine.TemporaryTaskEngine.fetch_instance', fetch_instance):
reader = TaskSerializer()
license = reader.from_file()
assert license['is_aws']
assert license['time_remaining']
assert license['free_instances'] > 0
assert license['grace_period_remaining'] > 0
os.unlink(path)

View File

@@ -5,7 +5,7 @@ from django.db import transaction
from django.core.urlresolvers import reverse
from awx.main.models.rbac import Role, ROLE_SINGLETON_SYSTEM_ADMINISTRATOR
def mock_feature_enabled(feature, bypass_database=None):
def mock_feature_enabled(feature):
return True
#@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)

View File

@@ -20,7 +20,6 @@ from crum import impersonate
# AWX
from awx.main.utils import * # noqa
from awx.main.models import * # noqa
from awx.main.conf import tower_settings
from awx.main.tests.base import BaseJobExecutionTest
from awx.main.tests.data.ssh import (
TEST_SSH_KEY_DATA,
@@ -572,14 +571,14 @@ class AdHocCommandApiTest(BaseAdHocCommandTest):
# Try to relaunch ad hoc command when module has been removed from
# allowed list of modules.
try:
ad_hoc_commands = tower_settings.AD_HOC_COMMANDS
tower_settings.AD_HOC_COMMANDS = []
ad_hoc_commands = settings.AD_HOC_COMMANDS
settings.AD_HOC_COMMANDS = []
with self.current_user('admin'):
response = self.get(url, expect=200)
self.assertEqual(response['passwords_needed_to_start'], [])
response = self.post(url, {}, expect=400)
finally:
tower_settings.AD_HOC_COMMANDS = ad_hoc_commands
settings.AD_HOC_COMMANDS = ad_hoc_commands
# Try to relaunch after the inventory has been marked inactive.
self.inventory.delete()

View File

@@ -15,7 +15,6 @@ from django.test.utils import override_settings
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest
from awx.main.conf import tower_settings
__all__ = ['AuthTokenTimeoutTest', 'AuthTokenLimitTest', 'AuthTokenProxyTest', 'UsersTest', 'LdapTest']
@@ -38,7 +37,7 @@ class AuthTokenTimeoutTest(BaseTest):
response = self._generic_rest(dashboard_url, expect=200, method='get', return_response_object=True, client_kwargs=kwargs)
self.assertIn('Auth-Token-Timeout', response)
self.assertEqual(response['Auth-Token-Timeout'], str(tower_settings.AUTH_TOKEN_EXPIRATION))
self.assertEqual(response['Auth-Token-Timeout'], str(settings.AUTH_TOKEN_EXPIRATION))
class AuthTokenLimitTest(BaseTest):
def setUp(self):

View File

@@ -0,0 +1,6 @@
import pytest
@pytest.fixture(autouse=True)
def _disable_database_settings(mocker):
mocker.patch('awx.conf.settings.SettingsWrapper._get_supported_settings', return_value=[])

View File

@@ -1,56 +0,0 @@
from django.core.exceptions import ValidationError
from awx.main.models.credential import validate_ssh_private_key
import pytest
def test_valid_rsa_key():
begin = """-----BEGIN RSA PRIVATE KEY-----"""
end = """-----END RSA PRIVATE KEY-----"""
unvalidated_key = build_key(begin, body, end)
key_data = validate_ssh_private_key(unvalidated_key)
assert key_data['key_type'] == 'rsa'
def test_invalid_key():
unvalidated_key = build_key(key_begin, body, "END KEY")
with pytest.raises(ValidationError):
validate_ssh_private_key(unvalidated_key)
def test_key_type_empty():
unvalidated_key = build_key(key_begin, body, key_end)
key_data = validate_ssh_private_key(unvalidated_key)
assert key_data['key_type'] == 'rsa1'
def build_key(begin, body, end):
return """%s%s%s""" % (begin, body, end)
key_begin = """-----BEGIN PRIVATE KEY-----"""
key_end = """-----END PRIVATE KEY-----"""
body = """
uFZFyag7VVqI+q/oGnQu+wj/pMi5ox+Qz5L3W0D745DzwgDXOeObAfNlr9NtIKbn
sZ5E0+rYB4Q/U0CYr5juNJQV1dbxq2Em1160axboe2QbvX6wE6Sm6wW9b9cr+PoF
MoYQebUnCY0ObrLbrRugSfZc17lyxK0ZGRgPXKhpMg6Ecv8XpvhjUYU9Esyqfuco
/p26Q140/HsHeHYNma0dQHCEjMr/qEzOY1qguHj+hRf3SARtM9Q+YNgpxchcDDVS
O+n+8Ljd/p82bpEJwxmpXealeWbI6gB9/R6wcCL+ZyCZpnHJd/NJ809Vtu47ZdDi
E6jvqS/3AQhuQKhJlLSDIzezB2VKKrHwOvHkg/+uLoCqHN34Gk6Qio7x69SvXy88
a7q9D1l/Zx60o08FyZyqlo7l0l/r8EY+36cuI/lvAvfxc5VHVEOvKseUjFRBiCv9
MkKNxaScoYsPwY7SIS6gD93tg3eM5pA0nfMfya9u1+uq/QCM1gNG3mm6Zd8YG4c/
Dx4bmsj8cp5ni/Ffl/sKzKYq1THunJEFGXOZRibdxk/Fal3SQrRAwy7CgLQL8SMh
IWqcFm25OtSOP1r1LE25t5pQsMdmp0IP2fEF0t/pXPm1ZfrTurPMqpo4FGm2hkki
U3sH/o6nrkSOjklOLWlwtTkkL4dWPlNwc8OYj8zFizXJkAfv1spzhv3lRouNkw4N
Mm22W7us2f3Ob0H5C07k26h6VuXX+0AybD4tIIcUXCLoNTqA0HvqhKpEuHu3Ck10
RaB8xHTxgwdhGVaNHMfy9B9l4tNs3Tb5k0LyeRRGVDhWCFo6axYULYebkj+hFLLY
+JE5RzPDFpTf1xbuT+e56H/lLFCUdDu0bn+D0W4ifXaVFegak4r6O4B53CbMqr+R
t6qDPKLUIuVJXK0J6Ay6XgmheXJGbgKh4OtDsc06gsTCE1nY4f/Z82AQahPBfTtF
J2z+NHdsLPn//HlxspGQtmLpuS7Wx0HYXZ+kPRSiE/vmITw85R2u8JSHQicVNN4C
2rlUo15TIU3tTx+WUIrHKHPidUNNotRb2p9n9FoSidU6upKnQHAT/JNv/zcvaia3
Bhl/wagheWTDnFKSmJ4HlKxplM/32h6MfHqsMVOl4F6eZWKaKgSgN8doXyFJo+sc
yAC6S0gJlD2gQI24iTI4Du1+UGh2MGb69eChvi5mbbdesaZrlR1dRqZpHG+6ob4H
nYLndRvobXS5l6pgGTDRYoUgSbQe21a7Uf3soGl5jHqLWc1zEPwrxV7Wr31mApr6
8VtGZcLSr0691Q1NLO3eIfuhbMN2mssX/Sl4t+4BibaucNIMfmhKQi8uHtwAXb47
+TMFlG2EQhZULFM4fLdF1vaizInU3cBk8lsz8i71tDc+5VQTEwoEB7Gksy/XZWEt
6SGHxXUDtNYa+G2O+sQhgqBjLIkVTV6KJOpvNZM+s8Vzv8qoFnD7isKBBrRvF1bP
GOXEG1jd7nSR0WSwcMCHGOrFEELDQPw3k5jqEdPFgVODoZPr+drZVnVz5SAGBk5Y
wsCNaDW+1dABYFlqRTepP5rrSu9wHnRAZ3ZGv+DHoGqenIC5IBR0sQ==
"""

View File

@@ -10,9 +10,7 @@ from awx.main.tasks import (
send_notifications,
run_administrative_checks,
)
from awx.main.task_engine import TaskSerializer
from awx.main.task_engine import TaskEnhancer
@contextmanager
def apply_patches(_patches):
@@ -51,12 +49,11 @@ def test_send_notifications_list(mocker):
@pytest.mark.parametrize("current_instances,call_count", [(91, 2), (89,1)])
def test_run_admin_checks_usage(mocker, current_instances, call_count):
patches = list()
patches.append(mocker.patch('awx.main.tasks.tower_settings'))
patches.append(mocker.patch('awx.main.tasks.User'))
mock_ts = mocker.Mock(spec=TaskSerializer)
mock_ts.from_database.return_value = {'instance_count': 100, 'current_instances': current_instances}
patches.append(mocker.patch('awx.main.tasks.TaskSerializer', return_value=mock_ts))
mock_te = mocker.Mock(spec=TaskEnhancer)
mock_te.validate_enhancements.return_value = {'instance_count': 100, 'current_instances': current_instances, 'date_warning': True}
patches.append(mocker.patch('awx.main.tasks.TaskEnhancer', return_value=mock_te))
mock_sm = mocker.Mock()
patches.append(mocker.patch('awx.main.tasks.send_mail', wraps=mock_sm))

View File

@@ -0,0 +1,91 @@
from django.core.exceptions import ValidationError
from awx.main.validators import (
validate_private_key,
validate_certificate,
validate_ssh_private_key,
)
from awx.main.tests.data.ssh import (
TEST_SSH_RSA1_KEY_DATA,
TEST_SSH_KEY_DATA,
TEST_SSH_KEY_DATA_LOCKED,
TEST_OPENSSH_KEY_DATA,
TEST_OPENSSH_KEY_DATA_LOCKED,
TEST_SSH_CERT_KEY,
)
import pytest
def test_valid_rsa_key():
valid_key = TEST_SSH_KEY_DATA
pem_objects = validate_private_key(valid_key)
assert pem_objects[0]['key_type'] == 'rsa'
assert not pem_objects[0]['key_enc']
with pytest.raises(ValidationError):
validate_certificate(valid_key)
pem_objects = validate_ssh_private_key(valid_key)
assert pem_objects[0]['key_type'] == 'rsa'
assert not pem_objects[0]['key_enc']
def test_valid_locked_rsa_key():
valid_key = TEST_SSH_KEY_DATA_LOCKED
pem_objects = validate_private_key(valid_key)
assert pem_objects[0]['key_type'] == 'rsa'
assert pem_objects[0]['key_enc']
with pytest.raises(ValidationError):
validate_certificate(valid_key)
pem_objects = validate_ssh_private_key(valid_key)
assert pem_objects[0]['key_type'] == 'rsa'
assert pem_objects[0]['key_enc']
def test_invalid_rsa_key():
invalid_key = TEST_SSH_KEY_DATA.replace('-----END', '----END')
with pytest.raises(ValidationError):
validate_private_key(invalid_key)
with pytest.raises(ValidationError):
validate_certificate(invalid_key)
with pytest.raises(ValidationError):
validate_ssh_private_key(invalid_key)
def test_valid_openssh_key():
valid_key = TEST_OPENSSH_KEY_DATA
pem_objects = validate_private_key(valid_key)
assert pem_objects[0]['key_type'] == 'ed25519'
assert not pem_objects[0]['key_enc']
with pytest.raises(ValidationError):
validate_certificate(valid_key)
pem_objects = validate_ssh_private_key(valid_key)
assert pem_objects[0]['key_type'] == 'ed25519'
assert not pem_objects[0]['key_enc']
def test_valid_locked_openssh_key():
valid_key = TEST_OPENSSH_KEY_DATA_LOCKED
pem_objects = validate_private_key(valid_key)
assert pem_objects[0]['key_type'] == 'ed25519'
assert pem_objects[0]['key_enc']
with pytest.raises(ValidationError):
validate_certificate(valid_key)
pem_objects = validate_ssh_private_key(valid_key)
assert pem_objects[0]['key_type'] == 'ed25519'
assert pem_objects[0]['key_enc']
def test_valid_rsa1_key():
valid_key = TEST_SSH_RSA1_KEY_DATA
pem_objects = validate_ssh_private_key(valid_key)
assert pem_objects[0]['key_type'] == 'rsa1'
assert not pem_objects[0]['key_enc']
with pytest.raises(ValidationError):
validate_certificate(valid_key)
pem_objects = validate_ssh_private_key(valid_key)
assert pem_objects[0]['key_type'] == 'rsa1'
assert not pem_objects[0]['key_enc']
def test_cert_with_key():
cert_with_key = TEST_SSH_CERT_KEY
with pytest.raises(ValidationError):
validate_private_key(cert_with_key)
with pytest.raises(ValidationError):
validate_certificate(cert_with_key)
pem_objects = validate_ssh_private_key(cert_with_key)
assert pem_objects[0]['type'] == 'CERTIFICATE'
assert pem_objects[1]['key_type'] == 'rsa'
assert not pem_objects[1]['key_enc']