mirror of
https://github.com/ansible/awx.git
synced 2026-01-21 22:48:02 -03:30
Merge pull request #2840 from anoek/test-porting
Ported several old tests to our new test system
This commit is contained in:
commit
33547259e2
43
awx/main/tests/URI.py
Normal file
43
awx/main/tests/URI.py
Normal file
@ -0,0 +1,43 @@
|
||||
# Helps with test cases.
|
||||
# Save all components of a uri (i.e. scheme, username, password, etc.) so that
|
||||
# when we construct a uri string and decompose it, we can verify the decomposition
|
||||
class URI(object):
|
||||
DEFAULTS = {
|
||||
'scheme' : 'http',
|
||||
'username' : 'MYUSERNAME',
|
||||
'password' : 'MYPASSWORD',
|
||||
'host' : 'host.com',
|
||||
}
|
||||
|
||||
def __init__(self, description='N/A', scheme=DEFAULTS['scheme'], username=DEFAULTS['username'], password=DEFAULTS['password'], host=DEFAULTS['host']):
|
||||
self.description = description
|
||||
self.scheme = scheme
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.host = host
|
||||
|
||||
def get_uri(self):
|
||||
uri = "%s://" % self.scheme
|
||||
if self.username:
|
||||
uri += "%s" % self.username
|
||||
if self.password:
|
||||
uri += ":%s" % self.password
|
||||
if (self.username or self.password) and self.host is not None:
|
||||
uri += "@%s" % self.host
|
||||
elif self.host is not None:
|
||||
uri += "%s" % self.host
|
||||
return uri
|
||||
|
||||
def get_secret_count(self):
|
||||
secret_count = 0
|
||||
if self.username:
|
||||
secret_count += 1
|
||||
if self.password:
|
||||
secret_count += 1
|
||||
return secret_count
|
||||
|
||||
def __string__(self):
|
||||
return self.get_uri()
|
||||
|
||||
def __repr__(self):
|
||||
return self.get_uri()
|
||||
@ -35,6 +35,7 @@ from awx.main.management.commands.run_task_system import run_taskmanager
|
||||
from awx.main.utils import get_ansible_version
|
||||
from awx.main.task_engine import TaskEngager as LicenseWriter
|
||||
from awx.sso.backends import LDAPSettings
|
||||
from awx.main.tests.URI import URI # noqa
|
||||
|
||||
TEST_PLAYBOOK = '''- hosts: mygroup
|
||||
gather_facts: false
|
||||
@ -732,48 +733,3 @@ class BaseJobExecutionTest(QueueStartStopTestMixin, BaseLiveServerTest):
|
||||
'''
|
||||
Base class for celery task tests.
|
||||
'''
|
||||
|
||||
# Helps with test cases.
|
||||
# Save all components of a uri (i.e. scheme, username, password, etc.) so that
|
||||
# when we construct a uri string and decompose it, we can verify the decomposition
|
||||
class URI(object):
|
||||
DEFAULTS = {
|
||||
'scheme' : 'http',
|
||||
'username' : 'MYUSERNAME',
|
||||
'password' : 'MYPASSWORD',
|
||||
'host' : 'host.com',
|
||||
}
|
||||
|
||||
def __init__(self, description='N/A', scheme=DEFAULTS['scheme'], username=DEFAULTS['username'], password=DEFAULTS['password'], host=DEFAULTS['host']):
|
||||
self.description = description
|
||||
self.scheme = scheme
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.host = host
|
||||
|
||||
def get_uri(self):
|
||||
uri = "%s://" % self.scheme
|
||||
if self.username:
|
||||
uri += "%s" % self.username
|
||||
if self.password:
|
||||
uri += ":%s" % self.password
|
||||
if (self.username or self.password) and self.host is not None:
|
||||
uri += "@%s" % self.host
|
||||
elif self.host is not None:
|
||||
uri += "%s" % self.host
|
||||
return uri
|
||||
|
||||
def get_secret_count(self):
|
||||
secret_count = 0
|
||||
if self.username:
|
||||
secret_count += 1
|
||||
if self.password:
|
||||
secret_count += 1
|
||||
return secret_count
|
||||
|
||||
def __string__(self):
|
||||
return self.get_uri()
|
||||
|
||||
def __repr__(self):
|
||||
return self.get_uri()
|
||||
|
||||
|
||||
190
awx/main/tests/functional/api/test_organizations.py
Normal file
190
awx/main/tests/functional/api/test_organizations.py
Normal file
@ -0,0 +1,190 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved.
|
||||
|
||||
# Python
|
||||
import pytest
|
||||
import mock
|
||||
|
||||
|
||||
# Django
|
||||
from django.core.urlresolvers import reverse
|
||||
|
||||
# AWX
|
||||
from awx.main.models import * # noqa
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_organization_list_access_tests(options, head, get, admin, alice):
|
||||
options(reverse('api:organization_list'), user=admin, expect=200)
|
||||
head(reverse('api:organization_list'), user=admin, expect=200)
|
||||
get(reverse('api:organization_list'), user=admin, expect=200)
|
||||
options(reverse('api:organization_list'), user=alice, expect=200)
|
||||
head(reverse('api:organization_list'), user=alice, expect=200)
|
||||
get(reverse('api:organization_list'), user=alice, expect=200)
|
||||
options(reverse('api:organization_list'), user=None, expect=401)
|
||||
head(reverse('api:organization_list'), user=None, expect=401)
|
||||
get(reverse('api:organization_list'), user=None, expect=401)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_organization_access_tests(organization, get, admin, alice, bob):
|
||||
organization.member_role.members.add(alice)
|
||||
get(reverse('api:organization_detail', args=(organization.id,)), user=admin, expect=200)
|
||||
get(reverse('api:organization_detail', args=(organization.id,)), user=alice, expect=200)
|
||||
get(reverse('api:organization_detail', args=(organization.id,)), user=bob, expect=403)
|
||||
get(reverse('api:organization_detail', args=(organization.id,)), user=None, expect=401)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_organization_list_integrity(organization, get, admin, alice):
|
||||
res = get(reverse('api:organization_list'), user=admin)
|
||||
for field in ['id', 'url', 'name', 'description', 'created']:
|
||||
assert field in res.data['results'][0]
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_organization_list_visibility(organizations, get, admin, alice):
|
||||
orgs = organizations(2)
|
||||
|
||||
res = get(reverse('api:organization_list'), user=admin)
|
||||
assert res.data['count'] == 2
|
||||
assert len(res.data['results']) == 2
|
||||
|
||||
res = get(reverse('api:organization_list'), user=alice)
|
||||
assert res.data['count'] == 0
|
||||
|
||||
orgs[1].member_role.members.add(alice)
|
||||
|
||||
res = get(reverse('api:organization_list'), user=alice)
|
||||
assert res.data['count'] == 1
|
||||
assert len(res.data['results']) == 1
|
||||
assert res.data['results'][0]['id'] == orgs[1].id
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_organization_project_list(organization, project_factory, get, alice, bob, rando):
|
||||
prj1 = project_factory('project-one')
|
||||
project_factory('project-two')
|
||||
organization.admin_role.members.add(alice)
|
||||
organization.member_role.members.add(bob)
|
||||
prj1.use_role.members.add(bob)
|
||||
assert get(reverse('api:organization_projects_list', args=(organization.id,)), user=alice).data['count'] == 2
|
||||
assert get(reverse('api:organization_projects_list', args=(organization.id,)), user=bob).data['count'] == 1
|
||||
assert get(reverse('api:organization_projects_list', args=(organization.id,)), user=rando).status_code == 403
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_organization_user_list(organization, get, admin, alice, bob):
|
||||
organization.admin_role.members.add(alice)
|
||||
organization.member_role.members.add(alice)
|
||||
organization.member_role.members.add(bob)
|
||||
assert get(reverse('api:organization_users_list', args=(organization.id,)), user=admin).data['count'] == 2
|
||||
assert get(reverse('api:organization_users_list', args=(organization.id,)), user=alice).data['count'] == 2
|
||||
assert get(reverse('api:organization_users_list', args=(organization.id,)), user=bob).data['count'] == 2
|
||||
assert get(reverse('api:organization_admins_list', args=(organization.id,)), user=admin).data['count'] == 1
|
||||
assert get(reverse('api:organization_admins_list', args=(organization.id,)), user=alice).data['count'] == 1
|
||||
assert get(reverse('api:organization_admins_list', args=(organization.id,)), user=bob).data['count'] == 1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_organization_inventory_list(organization, inventory_factory, get, alice, bob, rando):
|
||||
inv1 = inventory_factory('inventory-one')
|
||||
inventory_factory('inventory-two')
|
||||
organization.admin_role.members.add(alice)
|
||||
organization.member_role.members.add(bob)
|
||||
inv1.use_role.members.add(bob)
|
||||
assert get(reverse('api:organization_inventories_list', args=(organization.id,)), user=alice).data['count'] == 2
|
||||
assert get(reverse('api:organization_inventories_list', args=(organization.id,)), user=bob).data['count'] == 1
|
||||
get(reverse('api:organization_inventories_list', args=(organization.id,)), user=rando, expect=403)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@mock.patch('awx.api.views.feature_enabled', lambda feature,bypass_db=None: True)
|
||||
def test_create_organization(post, admin, alice):
|
||||
new_org = {
|
||||
'name': 'new org',
|
||||
'description': 'my description'
|
||||
}
|
||||
res = post(reverse('api:organization_list'), new_org, user=admin, expect=201)
|
||||
assert res.data['name'] == new_org['name']
|
||||
res = post(reverse('api:organization_list'), new_org, user=admin, expect=400)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@mock.patch('awx.api.views.feature_enabled', lambda feature,bypass_db=None: True)
|
||||
def test_create_organization_xfail(post, alice):
|
||||
new_org = {
|
||||
'name': 'new org',
|
||||
'description': 'my description'
|
||||
}
|
||||
post(reverse('api:organization_list'), new_org, user=alice, expect=403)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_add_user_to_organization(post, organization, alice, bob):
|
||||
organization.admin_role.members.add(alice)
|
||||
post(reverse('api:organization_users_list', args=(organization.id,)), {'id': bob.id}, user=alice, expect=204)
|
||||
assert bob in organization.member_role
|
||||
post(reverse('api:organization_users_list', args=(organization.id,)), {'id': bob.id, 'disassociate': True} , user=alice, expect=204)
|
||||
assert bob not in organization.member_role
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_add_user_to_organization_xfail(post, organization, alice, bob):
|
||||
organization.member_role.members.add(alice)
|
||||
post(reverse('api:organization_users_list', args=(organization.id,)), {'id': bob.id}, user=alice, expect=403)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_add_admin_to_organization(post, organization, alice, bob):
|
||||
organization.admin_role.members.add(alice)
|
||||
post(reverse('api:organization_admins_list', args=(organization.id,)), {'id': bob.id}, user=alice, expect=204)
|
||||
assert bob in organization.admin_role
|
||||
assert bob in organization.member_role
|
||||
post(reverse('api:organization_admins_list', args=(organization.id,)), {'id': bob.id, 'disassociate': True} , user=alice, expect=204)
|
||||
assert bob not in organization.admin_role
|
||||
assert bob not in organization.member_role
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_add_admin_to_organization_xfail(post, organization, alice, bob):
|
||||
organization.member_role.members.add(alice)
|
||||
post(reverse('api:organization_admins_list', args=(organization.id,)), {'id': bob.id}, user=alice, expect=403)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_update_organization(get, put, organization, alice, bob):
|
||||
organization.admin_role.members.add(alice)
|
||||
data = get(reverse('api:organization_detail', args=(organization.id,)), user=alice, expect=200).data
|
||||
data['description'] = 'hi'
|
||||
put(reverse('api:organization_detail', args=(organization.id,)), data, user=alice, expect=200)
|
||||
organization.refresh_from_db()
|
||||
assert organization.description == 'hi'
|
||||
data['description'] = 'bye'
|
||||
put(reverse('api:organization_detail', args=(organization.id,)), data, user=bob, expect=403)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@mock.patch('awx.main.access.BaseAccess.check_license', lambda *a, **kw: True)
|
||||
def test_delete_organization(delete, organization, admin):
|
||||
delete(reverse('api:organization_detail', args=(organization.id,)), user=admin, expect=204)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@mock.patch('awx.main.access.BaseAccess.check_license', lambda *a, **kw: True)
|
||||
def test_delete_organization2(delete, organization, alice):
|
||||
organization.admin_role.members.add(alice)
|
||||
delete(reverse('api:organization_detail', args=(organization.id,)), user=alice, expect=204)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@mock.patch('awx.main.access.BaseAccess.check_license', lambda *a, **kw: True)
|
||||
def test_delete_organization_xfail1(delete, organization, alice):
|
||||
organization.member_role.members.add(alice)
|
||||
delete(reverse('api:organization_detail', args=(organization.id,)), user=alice, expect=403)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@mock.patch('awx.main.access.BaseAccess.check_license', lambda *a, **kw: True)
|
||||
def test_delete_organization_xfail2(delete, organization):
|
||||
delete(reverse('api:organization_detail', args=(organization.id,)), user=None, expect=401)
|
||||
@ -1,8 +1,83 @@
|
||||
import pytest
|
||||
|
||||
from awx.main.models import UnifiedJob
|
||||
from django.core.urlresolvers import reverse
|
||||
|
||||
from awx.main.models import UnifiedJob, ProjectUpdate
|
||||
from awx.main.tests.base import URI
|
||||
|
||||
|
||||
TEST_STDOUTS = []
|
||||
uri = URI(scheme="https", username="Dhh3U47nmC26xk9PKscV", password="PXPfWW8YzYrgS@E5NbQ2H@", host="github.ginger.com/theirrepo.git/info/refs")
|
||||
TEST_STDOUTS.append({
|
||||
'description': 'uri in a plain text document',
|
||||
'uri' : uri,
|
||||
'text' : 'hello world %s goodbye world' % uri,
|
||||
'occurrences' : 1
|
||||
})
|
||||
|
||||
uri = URI(scheme="https", username="applepie@@@", password="thatyouknow@@@@", host="github.ginger.com/theirrepo.git/info/refs")
|
||||
TEST_STDOUTS.append({
|
||||
'description': 'uri appears twice in a multiline plain text document',
|
||||
'uri' : uri,
|
||||
'text' : 'hello world %s \n\nyoyo\n\nhello\n%s' % (uri, uri),
|
||||
'occurrences' : 2
|
||||
})
|
||||
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_cases(project):
|
||||
ret = []
|
||||
for e in TEST_STDOUTS:
|
||||
e['project'] = ProjectUpdate(project=project)
|
||||
e['project'].result_stdout_text = e['text']
|
||||
e['project'].save()
|
||||
ret.append(e)
|
||||
return ret
|
||||
|
||||
@pytest.fixture
|
||||
def negative_test_cases(job_factory):
|
||||
ret = []
|
||||
for e in TEST_STDOUTS:
|
||||
e['job'] = job_factory()
|
||||
e['job'].result_stdout_text = e['text']
|
||||
e['job'].save()
|
||||
ret.append(e)
|
||||
return ret
|
||||
|
||||
|
||||
formats = [
|
||||
('json', 'application/json'),
|
||||
('ansi', 'text/plain'),
|
||||
('txt', 'text/plain'),
|
||||
('html', 'text/html'),
|
||||
]
|
||||
|
||||
@pytest.mark.parametrize("format,content_type", formats)
|
||||
@pytest.mark.django_db
|
||||
def test_project_update_redaction_enabled(get, format, content_type, test_cases, admin):
|
||||
for test_data in test_cases:
|
||||
job = test_data['project']
|
||||
response = get(reverse("api:project_update_stdout", args=(job.pk,)) + "?format=" + format, user=admin, expect=200, accept=content_type)
|
||||
assert content_type in response['CONTENT-TYPE']
|
||||
assert response.data is not None
|
||||
content = response.data['content'] if format == 'json' else response.data
|
||||
assert test_data['uri'].username not in content
|
||||
assert test_data['uri'].password not in content
|
||||
assert content.count(test_data['uri'].host) == test_data['occurrences']
|
||||
|
||||
@pytest.mark.parametrize("format,content_type", formats)
|
||||
@pytest.mark.django_db
|
||||
def test_job_redaction_disabled(get, format, content_type, negative_test_cases, admin):
|
||||
for test_data in negative_test_cases:
|
||||
job = test_data['job']
|
||||
response = get(reverse("api:job_stdout", args=(job.pk,)) + "?format=" + format, user=admin, expect=200, format=format)
|
||||
content = response.data['content'] if format == 'json' else response.data
|
||||
assert response.data is not None
|
||||
assert test_data['uri'].username in content
|
||||
assert test_data['uri'].password in content
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_options_fields_choices(instance, options, user):
|
||||
|
||||
@ -15,3 +90,4 @@ def test_options_fields_choices(instance, options, user):
|
||||
assert 'choice' == response.data['actions']['GET']['status']['type']
|
||||
assert UnifiedJob.STATUS_CHOICES == response.data['actions']['GET']['status']['choices']
|
||||
|
||||
|
||||
|
||||
@ -124,6 +124,12 @@ def project_factory(organization):
|
||||
return prj
|
||||
return factory
|
||||
|
||||
@pytest.fixture
|
||||
def job_factory(job_template, admin):
|
||||
def factory(job_template=job_template, initial_state='new', created_by=admin):
|
||||
return job_template.create_job(created_by=created_by, status=initial_state)
|
||||
return factory
|
||||
|
||||
@pytest.fixture
|
||||
def team_factory(organization):
|
||||
def factory(name):
|
||||
@ -292,17 +298,23 @@ def permissions():
|
||||
'update':False, 'delete':False, 'scm_update':False, 'execute':False, 'use':True,},
|
||||
}
|
||||
|
||||
@pytest.fixture
|
||||
def post():
|
||||
def rf(url, data, user=None, middleware=None, expect=None, **kwargs):
|
||||
view, view_args, view_kwargs = resolve(urlparse(url)[2])
|
||||
|
||||
def _request(verb):
|
||||
def rf(url, data_or_user=None, user=None, middleware=None, expect=None, **kwargs):
|
||||
if type(data_or_user) is User and user is None:
|
||||
user = data_or_user
|
||||
elif 'data' not in kwargs:
|
||||
kwargs['data'] = data_or_user
|
||||
if 'format' not in kwargs:
|
||||
kwargs['format'] = 'json'
|
||||
request = APIRequestFactory().post(url, data, **kwargs)
|
||||
|
||||
view, view_args, view_kwargs = resolve(urlparse(url)[2])
|
||||
request = getattr(APIRequestFactory(), verb)(url, **kwargs)
|
||||
if middleware:
|
||||
middleware.process_request(request)
|
||||
if user:
|
||||
force_authenticate(request, user=user)
|
||||
|
||||
response = view(request, *view_args, **view_kwargs)
|
||||
if middleware:
|
||||
middleware.process_response(request, response)
|
||||
@ -310,134 +322,37 @@ def post():
|
||||
if response.status_code != expect:
|
||||
print(response.data)
|
||||
assert response.status_code == expect
|
||||
response.render()
|
||||
return response
|
||||
return rf
|
||||
|
||||
@pytest.fixture
|
||||
def post():
|
||||
return _request('post')
|
||||
|
||||
@pytest.fixture
|
||||
def get():
|
||||
def rf(url, user=None, middleware=None, expect=None, **kwargs):
|
||||
view, view_args, view_kwargs = resolve(urlparse(url)[2])
|
||||
if 'format' not in kwargs:
|
||||
kwargs['format'] = 'json'
|
||||
request = APIRequestFactory().get(url, **kwargs)
|
||||
if middleware:
|
||||
middleware.process_request(request)
|
||||
if user:
|
||||
force_authenticate(request, user=user)
|
||||
response = view(request, *view_args, **view_kwargs)
|
||||
if middleware:
|
||||
middleware.process_response(request, response)
|
||||
if expect:
|
||||
if response.status_code != expect:
|
||||
print(response.data)
|
||||
assert response.status_code == expect
|
||||
return response
|
||||
return rf
|
||||
return _request('get')
|
||||
|
||||
@pytest.fixture
|
||||
def put():
|
||||
def rf(url, data, user=None, middleware=None, expect=None, **kwargs):
|
||||
view, view_args, view_kwargs = resolve(urlparse(url)[2])
|
||||
if 'format' not in kwargs:
|
||||
kwargs['format'] = 'json'
|
||||
request = APIRequestFactory().put(url, data, **kwargs)
|
||||
if middleware:
|
||||
middleware.process_request(request)
|
||||
if user:
|
||||
force_authenticate(request, user=user)
|
||||
response = view(request, *view_args, **view_kwargs)
|
||||
if middleware:
|
||||
middleware.process_response(request, response)
|
||||
if expect:
|
||||
if response.status_code != expect:
|
||||
print(response.data)
|
||||
assert response.status_code == expect
|
||||
return response
|
||||
return rf
|
||||
return _request('put')
|
||||
|
||||
@pytest.fixture
|
||||
def patch():
|
||||
def rf(url, data, user=None, middleware=None, expect=None, **kwargs):
|
||||
view, view_args, view_kwargs = resolve(urlparse(url)[2])
|
||||
if 'format' not in kwargs:
|
||||
kwargs['format'] = 'json'
|
||||
request = APIRequestFactory().patch(url, data, **kwargs)
|
||||
if middleware:
|
||||
middleware.process_request(request)
|
||||
if user:
|
||||
force_authenticate(request, user=user)
|
||||
response = view(request, *view_args, **view_kwargs)
|
||||
if middleware:
|
||||
middleware.process_response(request, response)
|
||||
if expect:
|
||||
if response.status_code != expect:
|
||||
print(response.data)
|
||||
assert response.status_code == expect
|
||||
return response
|
||||
return rf
|
||||
return _request('patch')
|
||||
|
||||
@pytest.fixture
|
||||
def delete():
|
||||
def rf(url, user=None, middleware=None, expect=None, **kwargs):
|
||||
view, view_args, view_kwargs = resolve(urlparse(url)[2])
|
||||
if 'format' not in kwargs:
|
||||
kwargs['format'] = 'json'
|
||||
request = APIRequestFactory().delete(url, **kwargs)
|
||||
if middleware:
|
||||
middleware.process_request(request)
|
||||
if user:
|
||||
force_authenticate(request, user=user)
|
||||
response = view(request, *view_args, **view_kwargs)
|
||||
if middleware:
|
||||
middleware.process_response(request, response)
|
||||
if expect:
|
||||
if response.status_code != expect:
|
||||
print(response.data)
|
||||
assert response.status_code == expect
|
||||
return response
|
||||
return rf
|
||||
return _request('delete')
|
||||
|
||||
@pytest.fixture
|
||||
def head():
|
||||
def rf(url, user=None, middleware=None, expect=None, **kwargs):
|
||||
view, view_args, view_kwargs = resolve(urlparse(url)[2])
|
||||
if 'format' not in kwargs:
|
||||
kwargs['format'] = 'json'
|
||||
request = APIRequestFactory().head(url, **kwargs)
|
||||
if middleware:
|
||||
middleware.process_request(request)
|
||||
if user:
|
||||
force_authenticate(request, user=user)
|
||||
response = view(request, *view_args, **view_kwargs)
|
||||
if middleware:
|
||||
middleware.process_response(request, response)
|
||||
if expect:
|
||||
if response.status_code != expect:
|
||||
print(response.data)
|
||||
assert response.status_code == expect
|
||||
return response
|
||||
return rf
|
||||
return _request('head')
|
||||
|
||||
@pytest.fixture
|
||||
def options():
|
||||
def rf(url, data, user=None, middleware=None, expect=None, **kwargs):
|
||||
view, view_args, view_kwargs = resolve(urlparse(url)[2])
|
||||
if 'format' not in kwargs:
|
||||
kwargs['format'] = 'json'
|
||||
request = APIRequestFactory().options(url, data, **kwargs)
|
||||
if middleware:
|
||||
middleware.process_request(request)
|
||||
if user:
|
||||
force_authenticate(request, user=user)
|
||||
response = view(request, *view_args, **view_kwargs)
|
||||
if middleware:
|
||||
middleware.process_response(request, response)
|
||||
if expect:
|
||||
if response.status_code != expect:
|
||||
print(response.data)
|
||||
assert response.status_code == expect
|
||||
return response
|
||||
return rf
|
||||
return _request('options')
|
||||
|
||||
|
||||
|
||||
|
||||
132
awx/main/tests/functional/core/test_licenses.py
Normal file
132
awx/main/tests/functional/core/test_licenses.py
Normal file
@ -0,0 +1,132 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved.
|
||||
|
||||
import json
|
||||
import mock
|
||||
import os
|
||||
import tempfile
|
||||
import time
|
||||
import pytest
|
||||
from datetime import datetime
|
||||
|
||||
from awx.main.models import Host
|
||||
from awx.main.task_engine import TaskSerializer, TaskEngager
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_license_writer(inventory, admin):
|
||||
writer = TaskEngager(
|
||||
company_name='acmecorp',
|
||||
contact_name='Michael DeHaan',
|
||||
contact_email='michael@ansibleworks.com',
|
||||
license_date=25000, # seconds since epoch
|
||||
instance_count=500)
|
||||
|
||||
data = writer.get_data()
|
||||
|
||||
Host.objects.bulk_create(
|
||||
[
|
||||
Host(
|
||||
name='host.%d' % n,
|
||||
inventory=inventory,
|
||||
created_by=admin,
|
||||
modified=datetime.now(),
|
||||
created=datetime.now())
|
||||
for n in range(12)
|
||||
]
|
||||
)
|
||||
|
||||
assert data['instance_count'] == 500
|
||||
assert data['contact_name'] == 'Michael DeHaan'
|
||||
assert data['contact_email'] == 'michael@ansibleworks.com'
|
||||
assert data['license_date'] == 25000
|
||||
assert data['license_key'] == "11bae31f31c6a6cdcb483a278cdbe98bd8ac5761acd7163a50090b0f098b3a13"
|
||||
|
||||
strdata = writer.get_string()
|
||||
strdata_loaded = json.loads(strdata)
|
||||
assert strdata_loaded == data
|
||||
|
||||
reader = TaskSerializer()
|
||||
|
||||
vdata = reader.from_string(strdata)
|
||||
|
||||
assert vdata['available_instances'] == 500
|
||||
assert vdata['current_instances'] == 12
|
||||
assert vdata['free_instances'] == 488
|
||||
assert vdata['date_warning'] is True
|
||||
assert vdata['date_expired'] is True
|
||||
assert vdata['license_date'] == 25000
|
||||
assert vdata['time_remaining'] < 0
|
||||
assert vdata['valid_key'] is True
|
||||
assert vdata['compliant'] is False
|
||||
assert vdata['subscription_name']
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_expired_licenses():
|
||||
reader = TaskSerializer()
|
||||
writer = TaskEngager(
|
||||
company_name='Tower',
|
||||
contact_name='Tower Admin',
|
||||
contact_email='tower@ansible.com',
|
||||
license_date=int(time.time() - 3600),
|
||||
instance_count=100,
|
||||
trial=True)
|
||||
strdata = writer.get_string()
|
||||
vdata = reader.from_string(strdata)
|
||||
|
||||
assert vdata['compliant'] is False
|
||||
assert vdata['grace_period_remaining'] < 0
|
||||
|
||||
writer = TaskEngager(
|
||||
company_name='Tower',
|
||||
contact_name='Tower Admin',
|
||||
contact_email='tower@ansible.com',
|
||||
license_date=int(time.time() - 2592001),
|
||||
instance_count=100,
|
||||
trial=False)
|
||||
strdata = writer.get_string()
|
||||
vdata = reader.from_string(strdata)
|
||||
|
||||
assert vdata['compliant'] is False
|
||||
assert vdata['grace_period_remaining'] < 0
|
||||
|
||||
writer = TaskEngager(
|
||||
company_name='Tower',
|
||||
contact_name='Tower Admin',
|
||||
contact_email='tower@ansible.com',
|
||||
license_date=int(time.time() - 3600),
|
||||
instance_count=100,
|
||||
trial=False)
|
||||
strdata = writer.get_string()
|
||||
vdata = reader.from_string(strdata)
|
||||
|
||||
assert vdata['compliant'] is False
|
||||
assert vdata['grace_period_remaining'] > 0
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_aws_license():
|
||||
os.environ['AWX_LICENSE_FILE'] = 'non-existent-license-file.json'
|
||||
|
||||
h, path = tempfile.mkstemp()
|
||||
with os.fdopen(h, 'w') as f:
|
||||
json.dump({'instance_count': 100}, f)
|
||||
|
||||
def fetch_ami(_self):
|
||||
_self.attributes['ami-id'] = 'ami-00000000'
|
||||
return True
|
||||
|
||||
def fetch_instance(_self):
|
||||
_self.attributes['instance-id'] = 'i-00000000'
|
||||
return True
|
||||
|
||||
with mock.patch('awx.main.task_engine.TEMPORARY_TASK_FILE', path):
|
||||
with mock.patch('awx.main.task_engine.TemporaryTaskEngine.fetch_ami', fetch_ami):
|
||||
with mock.patch('awx.main.task_engine.TemporaryTaskEngine.fetch_instance', fetch_instance):
|
||||
reader = TaskSerializer()
|
||||
license = reader.from_file()
|
||||
assert license['is_aws']
|
||||
assert license['time_remaining']
|
||||
assert license['free_instances'] > 0
|
||||
assert license['grace_period_remaining'] > 0
|
||||
|
||||
os.unlink(path)
|
||||
39
awx/main/tests/functional/test_auth_token_limit.py
Normal file
39
awx/main/tests/functional/test_auth_token_limit.py
Normal file
@ -0,0 +1,39 @@
|
||||
import pytest
|
||||
from datetime import timedelta
|
||||
|
||||
from django.utils.timezone import now as tz_now
|
||||
from django.test.utils import override_settings
|
||||
|
||||
from awx.main.models import AuthToken, User
|
||||
|
||||
|
||||
@override_settings(AUTH_TOKEN_PER_USER=3)
|
||||
@pytest.mark.django_db
|
||||
def test_get_tokens_over_limit():
|
||||
now = tz_now()
|
||||
# Times are relative to now
|
||||
# (key, created on in seconds , expiration in seconds)
|
||||
test_data = [
|
||||
# a is implicitly expired
|
||||
("a", -1000, -10),
|
||||
# b's are invalid due to session limit of 3
|
||||
("b", -100, 60),
|
||||
("bb", -100, 60),
|
||||
("c", -90, 70),
|
||||
("d", -80, 80),
|
||||
("e", -70, 90),
|
||||
]
|
||||
user = User.objects.create_superuser('admin', 'foo@bar.com', 'password')
|
||||
for key, t_create, t_expire in test_data:
|
||||
AuthToken.objects.create(
|
||||
user=user,
|
||||
key=key,
|
||||
request_hash='this_is_a_hash',
|
||||
created=now + timedelta(seconds=t_create),
|
||||
expires=now + timedelta(seconds=t_expire),
|
||||
)
|
||||
invalid_tokens = AuthToken.get_tokens_over_limit(user, now=now)
|
||||
invalid_keys = [x.key for x in invalid_tokens]
|
||||
assert len(invalid_keys) == 2
|
||||
assert 'b' in invalid_keys
|
||||
assert 'bb' in invalid_keys
|
||||
@ -1,24 +0,0 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
|
||||
# Python
|
||||
import mock
|
||||
|
||||
# Django
|
||||
from django.test import SimpleTestCase
|
||||
|
||||
# AWX
|
||||
from awx.main.models import * # noqa
|
||||
from awx.main.ha import * # noqa
|
||||
|
||||
__all__ = ['HAUnitTest',]
|
||||
|
||||
class HAUnitTest(SimpleTestCase):
|
||||
|
||||
@mock.patch('awx.main.models.Instance.objects.count', return_value=2)
|
||||
def test_multiple_instances(self, ignore):
|
||||
self.assertTrue(is_ha_environment())
|
||||
|
||||
@mock.patch('awx.main.models.Instance.objects.count', return_value=1)
|
||||
def test_db_localhost(self, ignore):
|
||||
self.assertFalse(is_ha_environment())
|
||||
|
||||
@ -1,147 +0,0 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved.
|
||||
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
|
||||
from awx.main.models import Host, Inventory, Organization
|
||||
from awx.main.tests.base import BaseTest
|
||||
import awx.main.task_engine
|
||||
from awx.main.task_engine import * # noqa
|
||||
|
||||
class LicenseTests(BaseTest):
|
||||
|
||||
def setUp(self):
|
||||
self.start_redis()
|
||||
self.setup_instances()
|
||||
super(LicenseTests, self).setUp()
|
||||
self.setup_users()
|
||||
u = self.super_django_user
|
||||
org = Organization.objects.create(name='o1', created_by=u)
|
||||
org.admin_role.members.add(self.normal_django_user)
|
||||
self.inventory = Inventory.objects.create(name='hi', organization=org, created_by=u)
|
||||
Host.objects.create(name='a1', inventory=self.inventory, created_by=u)
|
||||
Host.objects.create(name='a2', inventory=self.inventory, created_by=u)
|
||||
Host.objects.create(name='a3', inventory=self.inventory, created_by=u)
|
||||
Host.objects.create(name='a4', inventory=self.inventory, created_by=u)
|
||||
Host.objects.create(name='a5', inventory=self.inventory, created_by=u)
|
||||
Host.objects.create(name='a6', inventory=self.inventory, created_by=u)
|
||||
Host.objects.create(name='a7', inventory=self.inventory, created_by=u)
|
||||
Host.objects.create(name='a8', inventory=self.inventory, created_by=u)
|
||||
Host.objects.create(name='a9', inventory=self.inventory, created_by=u)
|
||||
Host.objects.create(name='a10', inventory=self.inventory, created_by=u)
|
||||
Host.objects.create(name='a11', inventory=self.inventory, created_by=u)
|
||||
Host.objects.create(name='a12', inventory=self.inventory, created_by=u)
|
||||
self._temp_task_file = awx.main.task_engine.TEMPORARY_TASK_FILE
|
||||
self._temp_task_fetch_ami = awx.main.task_engine.TemporaryTaskEngine.fetch_ami
|
||||
self._temp_task_fetch_instance = awx.main.task_engine.TemporaryTaskEngine.fetch_instance
|
||||
|
||||
def tearDown(self):
|
||||
awx.main.task_engine.TEMPORARY_TASK_FILE = self._temp_task_file
|
||||
awx.main.task_engine.TemporaryTaskEngine.fetch_ami = self._temp_task_fetch_ami
|
||||
awx.main.task_engine.TemporaryTaskEngine.fetch_instance = self._temp_task_fetch_instance
|
||||
super(LicenseTests, self).tearDown()
|
||||
self.stop_redis()
|
||||
|
||||
def test_license_writer(self):
|
||||
|
||||
writer = TaskEngager(
|
||||
company_name='acmecorp',
|
||||
contact_name='Michael DeHaan',
|
||||
contact_email='michael@ansibleworks.com',
|
||||
license_date=25000, # seconds since epoch
|
||||
instance_count=500)
|
||||
|
||||
data = writer.get_data()
|
||||
|
||||
assert data['instance_count'] == 500
|
||||
assert data['contact_name'] == 'Michael DeHaan'
|
||||
assert data['contact_email'] == 'michael@ansibleworks.com'
|
||||
assert data['license_date'] == 25000
|
||||
assert data['license_key'] == "11bae31f31c6a6cdcb483a278cdbe98bd8ac5761acd7163a50090b0f098b3a13"
|
||||
|
||||
strdata = writer.get_string()
|
||||
strdata_loaded = json.loads(strdata)
|
||||
assert strdata_loaded == data
|
||||
|
||||
reader = TaskSerializer()
|
||||
|
||||
vdata = reader.from_string(strdata)
|
||||
|
||||
assert vdata['available_instances'] == 500
|
||||
assert vdata['current_instances'] == 12
|
||||
assert vdata['free_instances'] == 488
|
||||
assert vdata['date_warning'] is True
|
||||
assert vdata['date_expired'] is True
|
||||
assert vdata['license_date'] == 25000
|
||||
assert vdata['time_remaining'] < 0
|
||||
assert vdata['valid_key'] is True
|
||||
assert vdata['compliant'] is False
|
||||
assert vdata['subscription_name']
|
||||
|
||||
def test_expired_licenses(self):
|
||||
reader = TaskSerializer()
|
||||
writer = TaskEngager(
|
||||
company_name='Tower',
|
||||
contact_name='Tower Admin',
|
||||
contact_email='tower@ansible.com',
|
||||
license_date=int(time.time() - 3600),
|
||||
instance_count=100,
|
||||
trial=True)
|
||||
strdata = writer.get_string()
|
||||
vdata = reader.from_string(strdata)
|
||||
|
||||
assert vdata['compliant'] is False
|
||||
assert vdata['grace_period_remaining'] < 0
|
||||
|
||||
writer = TaskEngager(
|
||||
company_name='Tower',
|
||||
contact_name='Tower Admin',
|
||||
contact_email='tower@ansible.com',
|
||||
license_date=int(time.time() - 2592001),
|
||||
instance_count=100,
|
||||
trial=False)
|
||||
strdata = writer.get_string()
|
||||
vdata = reader.from_string(strdata)
|
||||
|
||||
assert vdata['compliant'] is False
|
||||
assert vdata['grace_period_remaining'] < 0
|
||||
|
||||
writer = TaskEngager(
|
||||
company_name='Tower',
|
||||
contact_name='Tower Admin',
|
||||
contact_email='tower@ansible.com',
|
||||
license_date=int(time.time() - 3600),
|
||||
instance_count=100,
|
||||
trial=False)
|
||||
strdata = writer.get_string()
|
||||
vdata = reader.from_string(strdata)
|
||||
|
||||
assert vdata['compliant'] is False
|
||||
assert vdata['grace_period_remaining'] > 0
|
||||
|
||||
def test_aws_license(self):
|
||||
os.environ['AWX_LICENSE_FILE'] = 'non-existent-license-file.json'
|
||||
h, path = tempfile.mkstemp()
|
||||
self._temp_paths.append(path)
|
||||
with os.fdopen(h, 'w') as f:
|
||||
json.dump({'instance_count': 100}, f)
|
||||
awx.main.task_engine.TEMPORARY_TASK_FILE = path
|
||||
|
||||
def fetch_ami(_self):
|
||||
_self.attributes['ami-id'] = 'ami-00000000'
|
||||
return True
|
||||
|
||||
def fetch_instance(_self):
|
||||
_self.attributes['instance-id'] = 'i-00000000'
|
||||
return True
|
||||
|
||||
awx.main.task_engine.TemporaryTaskEngine.fetch_ami = fetch_ami
|
||||
awx.main.task_engine.TemporaryTaskEngine.fetch_instance = fetch_instance
|
||||
reader = TaskSerializer()
|
||||
license = reader.from_file()
|
||||
self.assertTrue(license['is_aws'])
|
||||
self.assertTrue(license['time_remaining'])
|
||||
self.assertTrue(license['free_instances'] > 0)
|
||||
self.assertTrue(license['grace_period_remaining'] > 0)
|
||||
@ -1,413 +0,0 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved.
|
||||
|
||||
# Python
|
||||
from datetime import timedelta
|
||||
|
||||
# Django
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.test.utils import override_settings
|
||||
from django.contrib.auth.models import User
|
||||
from django.utils.timezone import now as tz_now
|
||||
|
||||
# AWX
|
||||
from awx.main.models import * # noqa
|
||||
from awx.main.tests.base import BaseTest
|
||||
|
||||
__all__ = ['AuthTokenLimitUnitTest', 'OrganizationsTest']
|
||||
|
||||
class AuthTokenLimitUnitTest(BaseTest):
|
||||
|
||||
def setUp(self):
|
||||
self.now = tz_now()
|
||||
# Times are relative to now
|
||||
# (key, created on in seconds , expiration in seconds)
|
||||
self.test_data = [
|
||||
# a is implicitly expired
|
||||
("a", -1000, -10),
|
||||
# b's are invalid due to session limit of 3
|
||||
("b", -100, 60),
|
||||
("bb", -100, 60),
|
||||
("c", -90, 70),
|
||||
("d", -80, 80),
|
||||
("e", -70, 90),
|
||||
]
|
||||
self.user = User.objects.create_superuser('admin', 'foo@bar.com', 'password')
|
||||
for key, t_create, t_expire in self.test_data:
|
||||
AuthToken.objects.create(
|
||||
user=self.user,
|
||||
key=key,
|
||||
request_hash='this_is_a_hash',
|
||||
created=self.now + timedelta(seconds=t_create),
|
||||
expires=self.now + timedelta(seconds=t_expire),
|
||||
)
|
||||
super(AuthTokenLimitUnitTest, self).setUp()
|
||||
|
||||
@override_settings(AUTH_TOKEN_PER_USER=3)
|
||||
def test_get_tokens_over_limit(self):
|
||||
invalid_tokens = AuthToken.get_tokens_over_limit(self.user, now=self.now)
|
||||
invalid_keys = [x.key for x in invalid_tokens]
|
||||
self.assertEqual(len(invalid_keys), 2)
|
||||
self.assertIn('b', invalid_keys)
|
||||
self.assertIn('bb', invalid_keys)
|
||||
|
||||
class OrganizationsTest(BaseTest):
|
||||
|
||||
def collection(self):
|
||||
return reverse('api:organization_list')
|
||||
|
||||
def setUp(self):
|
||||
super(OrganizationsTest, self).setUp()
|
||||
self.setup_instances()
|
||||
# TODO: Test non-enterprise license
|
||||
self.create_test_license_file()
|
||||
self.setup_users()
|
||||
|
||||
self.organizations = self.make_organizations(self.super_django_user, 10)
|
||||
self.projects = self.make_projects(self.normal_django_user, 10)
|
||||
|
||||
# add projects to organizations in a more or less arbitrary way
|
||||
for project in self.projects[0:2]:
|
||||
self.organizations[0].projects.add(project)
|
||||
for project in self.projects[3:8]:
|
||||
self.organizations[1].projects.add(project)
|
||||
for project in self.projects[9:10]:
|
||||
self.organizations[2].projects.add(project)
|
||||
self.organizations[0].projects.add(self.projects[-1])
|
||||
self.organizations[9].projects.add(self.projects[-2])
|
||||
|
||||
# get the URL for various organization records
|
||||
self.a_detail_url = "%s%s" % (self.collection(), self.organizations[0].pk)
|
||||
self.b_detail_url = "%s%s" % (self.collection(), self.organizations[1].pk)
|
||||
self.c_detail_url = "%s%s" % (self.collection(), self.organizations[2].pk)
|
||||
|
||||
# configuration:
|
||||
# admin_user is an admin and regular user in all organizations
|
||||
# other_user is all organizations
|
||||
# normal_user is a user in organization 0, and an admin of organization 1
|
||||
# nobody_user is a user not a member of any organizations
|
||||
|
||||
for x in self.organizations:
|
||||
x.admin_role.members.add(self.super_django_user)
|
||||
x.member_role.members.add(self.super_django_user)
|
||||
x.member_role.members.add(self.other_django_user)
|
||||
|
||||
self.organizations[0].member_role.members.add(self.normal_django_user)
|
||||
self.organizations[1].admin_role.members.add(self.normal_django_user)
|
||||
|
||||
def test_get_organization_list(self):
|
||||
url = reverse('api:organization_list')
|
||||
|
||||
# no credentials == 401
|
||||
self.options(url, expect=401)
|
||||
self.head(url, expect=401)
|
||||
self.get(url, expect=401)
|
||||
|
||||
# wrong credentials == 401
|
||||
with self.current_user(self.get_invalid_credentials()):
|
||||
self.options(url, expect=401)
|
||||
self.head(url, expect=401)
|
||||
self.get(url, expect=401)
|
||||
|
||||
# superuser credentials == 200, full list
|
||||
with self.current_user(self.super_django_user):
|
||||
self.options(url, expect=200)
|
||||
self.head(url, expect=200)
|
||||
response = self.get(url, expect=200)
|
||||
self.check_pagination_and_size(response, 10, previous=None, next=None)
|
||||
self.assertEqual(len(response['results']),
|
||||
Organization.objects.count())
|
||||
for field in ['id', 'url', 'name', 'description', 'created']:
|
||||
self.assertTrue(field in response['results'][0],
|
||||
'field %s not in result' % field)
|
||||
|
||||
# check that the related URL functionality works
|
||||
related = response['results'][0]['related']
|
||||
for x in ['projects', 'users', 'admins']:
|
||||
self.assertTrue(x in related and related[x].endswith("/%s/" % x), "looking for %s in related" % x)
|
||||
|
||||
# normal credentials == 200, get only organizations of which user is a member
|
||||
with self.current_user(self.normal_django_user):
|
||||
self.options(url, expect=200)
|
||||
self.head(url, expect=200)
|
||||
response = self.get(url, expect=200)
|
||||
self.check_pagination_and_size(response, 2, previous=None, next=None)
|
||||
|
||||
# no admin rights? get empty list
|
||||
with self.current_user(self.other_django_user):
|
||||
response = self.get(url, expect=200)
|
||||
self.check_pagination_and_size(response, len(self.organizations), previous=None, next=None)
|
||||
|
||||
# not a member of any orgs? get empty list
|
||||
with self.current_user(self.nobody_django_user):
|
||||
response = self.get(url, expect=200)
|
||||
self.check_pagination_and_size(response, 0, previous=None, next=None)
|
||||
|
||||
def test_get_item(self):
|
||||
|
||||
# first get all the URLs
|
||||
data = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
|
||||
urls = [item['url'] for item in data['results']]
|
||||
|
||||
# make sure super user can fetch records
|
||||
data = self.get(urls[0], expect=200, auth=self.get_super_credentials())
|
||||
[self.assertTrue(key in data) for key in ['name', 'description', 'url']]
|
||||
|
||||
# make sure invalid user cannot
|
||||
data = self.get(urls[0], expect=401, auth=self.get_invalid_credentials())
|
||||
|
||||
# normal user should be able to get org 0 and org 1 but not org 9 (as he's not a user or admin of it)
|
||||
data = self.get(urls[0], expect=200, auth=self.get_normal_credentials())
|
||||
data = self.get(urls[1], expect=200, auth=self.get_normal_credentials())
|
||||
data = self.get(urls[9], expect=403, auth=self.get_normal_credentials())
|
||||
|
||||
# other user is a member, but not admin, can access org
|
||||
data = self.get(urls[0], expect=200, auth=self.get_other_credentials())
|
||||
|
||||
# nobody user is not a member, cannot access org
|
||||
data = self.get(urls[0], expect=403, auth=self.get_nobody_credentials())
|
||||
|
||||
def test_get_item_subobjects_projects(self):
|
||||
|
||||
# first get all the orgs
|
||||
orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
|
||||
|
||||
# find projects attached to the first org
|
||||
projects0_url = orgs['results'][0]['related']['projects']
|
||||
projects1_url = orgs['results'][1]['related']['projects']
|
||||
projects9_url = orgs['results'][9]['related']['projects']
|
||||
|
||||
self.get(projects0_url, expect=401, auth=None)
|
||||
self.get(projects0_url, expect=401, auth=self.get_invalid_credentials())
|
||||
|
||||
# normal user is just a member of the first org, so can see all projects under the org
|
||||
self.get(projects0_url, expect=200, auth=self.get_normal_credentials())
|
||||
|
||||
# however in the second org, he's an admin and should see all of them
|
||||
projects1a = self.get(projects1_url, expect=200, auth=self.get_normal_credentials())
|
||||
self.assertEquals(projects1a['count'], 5)
|
||||
|
||||
# but the non-admin cannot access the list of projects in the org. He should use /projects/ instead!
|
||||
self.get(projects1_url, expect=200, auth=self.get_other_credentials())
|
||||
|
||||
# superuser should be able to read anything
|
||||
projects9a = self.get(projects9_url, expect=200, auth=self.get_super_credentials())
|
||||
self.assertEquals(projects9a['count'], 1)
|
||||
|
||||
# nobody user is not a member of any org, so can't see projects...
|
||||
self.get(projects0_url, expect=403, auth=self.get_nobody_credentials())
|
||||
projects1a = self.get(projects1_url, expect=403, auth=self.get_nobody_credentials())
|
||||
|
||||
def test_get_item_subobjects_users(self):
|
||||
|
||||
# see if we can list the users added to the organization
|
||||
orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
|
||||
org1_users_url = orgs['results'][1]['related']['users']
|
||||
org1_users = self.get(org1_users_url, expect=200, auth=self.get_normal_credentials())
|
||||
self.assertEquals(org1_users['count'], 2)
|
||||
org1_users = self.get(org1_users_url, expect=200, auth=self.get_super_credentials())
|
||||
self.assertEquals(org1_users['count'], 2)
|
||||
org1_users = self.get(org1_users_url, expect=200, auth=self.get_other_credentials())
|
||||
self.assertEquals(org1_users['count'], 2)
|
||||
|
||||
def test_get_item_subobjects_admins(self):
|
||||
|
||||
# see if we can list the users added to the organization
|
||||
orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
|
||||
org1_users_url = orgs['results'][1]['related']['admins']
|
||||
org1_users = self.get(org1_users_url, expect=200, auth=self.get_normal_credentials())
|
||||
self.assertEquals(org1_users['count'], 2)
|
||||
org1_users = self.get(org1_users_url, expect=200, auth=self.get_super_credentials())
|
||||
self.assertEquals(org1_users['count'], 2)
|
||||
|
||||
def test_get_organization_inventories_list(self):
|
||||
pass
|
||||
|
||||
def _test_get_item_subobjects_tags(self):
|
||||
# FIXME: Update to support taggit!
|
||||
|
||||
# put some tags on the org
|
||||
org1 = Organization.objects.get(pk=2)
|
||||
tag1 = Tag.objects.create(name='atag')
|
||||
tag2 = Tag.objects.create(name='btag')
|
||||
org1.tags.add(tag1)
|
||||
org1.tags.add(tag2)
|
||||
|
||||
# see if we can list the users added to the organization
|
||||
orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
|
||||
org1_tags_url = orgs['results'][1]['related']['tags']
|
||||
org1_tags = self.get(org1_tags_url, expect=200, auth=self.get_normal_credentials())
|
||||
self.assertEquals(org1_tags['count'], 2)
|
||||
org1_tags = self.get(org1_tags_url, expect=200, auth=self.get_super_credentials())
|
||||
self.assertEquals(org1_tags['count'], 2)
|
||||
org1_tags = self.get(org1_tags_url, expect=403, auth=self.get_other_credentials())
|
||||
|
||||
def _test_get_item_subobjects_audit_trail(self):
|
||||
# FIXME: Update to support whatever audit trail framework is used.
|
||||
url = '/api/v1/organizations/2/audit_trail/'
|
||||
self.get(url, expect=200, auth=self.get_normal_credentials())
|
||||
# FIXME: verify that some audit trail records are auto-created on save AND post
|
||||
|
||||
def test_post_item(self):
|
||||
|
||||
new_org = dict(name='magic test org', description='8675309')
|
||||
|
||||
# need to be a valid user
|
||||
self.post(self.collection(), new_org, expect=401, auth=None)
|
||||
self.post(self.collection(), new_org, expect=401, auth=self.get_invalid_credentials())
|
||||
|
||||
# only super users can create organizations
|
||||
self.post(self.collection(), new_org, expect=403, auth=self.get_normal_credentials())
|
||||
self.post(self.collection(), new_org, expect=403, auth=self.get_other_credentials())
|
||||
data1 = self.post(self.collection(), new_org, expect=201, auth=self.get_super_credentials())
|
||||
|
||||
# duplicate post results in 400
|
||||
response = self.post(self.collection(), new_org, expect=400, auth=self.get_super_credentials())
|
||||
self.assertTrue('name' in response, response)
|
||||
self.assertTrue('Name' in response['name'][0], response)
|
||||
|
||||
# look at what we got back from the post, make sure we added an org
|
||||
last_org = Organization.objects.order_by('-pk')[0]
|
||||
self.assertTrue(data1['url'].endswith("/%d/" % last_org.pk))
|
||||
|
||||
# Test that not even super users can create an organization with a basic license
|
||||
self.create_basic_license_file()
|
||||
cant_org = dict(name='silly user org', description='4815162342')
|
||||
self.post(self.collection(), cant_org, expect=402, auth=self.get_super_credentials())
|
||||
|
||||
def test_post_item_subobjects_users(self):
|
||||
|
||||
url = reverse('api:organization_users_list', args=(self.organizations[1].pk,))
|
||||
users = self.get(url, expect=200, auth=self.get_normal_credentials())
|
||||
self.assertEqual(users['count'], 2)
|
||||
self.post(url, dict(id=self.normal_django_user.pk), expect=204, auth=self.get_normal_credentials())
|
||||
users = self.get(url, expect=200, auth=self.get_normal_credentials())
|
||||
self.assertEqual(users['count'], 3)
|
||||
self.post(url, dict(id=self.normal_django_user.pk, disassociate=True), expect=204, auth=self.get_normal_credentials())
|
||||
users = self.get(url, expect=200, auth=self.get_normal_credentials())
|
||||
self.assertEqual(users['count'], 2)
|
||||
|
||||
# post a completely new user to verify we can add users to the subcollection directly
|
||||
new_user = dict(username='NewUser9000', password='NewPassword9000')
|
||||
which_org = Organization.accessible_objects(self.normal_django_user, 'admin_role')[0]
|
||||
url = reverse('api:organization_users_list', args=(which_org.pk,))
|
||||
self.post(url, new_user, expect=201, auth=self.get_normal_credentials())
|
||||
|
||||
all_users = self.get(url, expect=200, auth=self.get_normal_credentials())
|
||||
self.assertEqual(all_users['count'], 3)
|
||||
|
||||
def test_post_item_subobjects_admins(self):
|
||||
|
||||
url = reverse('api:organization_admins_list', args=(self.organizations[1].pk,))
|
||||
admins = self.get(url, expect=200, auth=self.get_normal_credentials())
|
||||
self.assertEqual(admins['count'], 2)
|
||||
self.post(url, dict(id=self.other_django_user.pk), expect=204, auth=self.get_normal_credentials())
|
||||
admins = self.get(url, expect=200, auth=self.get_normal_credentials())
|
||||
self.assertEqual(admins['count'], 3)
|
||||
self.post(url, dict(id=self.other_django_user.pk, disassociate=1), expect=204, auth=self.get_normal_credentials())
|
||||
admins = self.get(url, expect=200, auth=self.get_normal_credentials())
|
||||
self.assertEqual(admins['count'], 2)
|
||||
|
||||
def _test_post_item_subobjects_tags(self):
|
||||
# FIXME: Update to support taggit!
|
||||
|
||||
tag = Tag.objects.create(name='blippy')
|
||||
url = '/api/v1/organizations/2/tags/'
|
||||
tags = self.get(url, expect=200, auth=self.get_normal_credentials())
|
||||
self.assertEqual(tags['count'], 0)
|
||||
self.post(url, dict(id=tag.pk), expect=204, auth=self.get_normal_credentials())
|
||||
tags = self.get(url, expect=200, auth=self.get_normal_credentials())
|
||||
self.assertEqual(tags['count'], 1)
|
||||
self.assertEqual(tags['results'][0]['id'], tag.pk)
|
||||
self.post(url, dict(id=tag.pk, disassociate=1), expect=204, auth=self.get_normal_credentials())
|
||||
tags = self.get(url, expect=200, auth=self.get_normal_credentials())
|
||||
self.assertEqual(tags['count'], 0)
|
||||
|
||||
def _test_post_item_subobjects_audit_trail(self):
|
||||
# FIXME: Update to support whatever audit trail framework is used.
|
||||
# audit trails are system things, and no user can post to them.
|
||||
url = '/api/v1/organizations/2/audit_trail/'
|
||||
self.post(url, dict(id=1), expect=405, auth=self.get_super_credentials())
|
||||
|
||||
def test_put_item(self):
|
||||
|
||||
# first get some urls and data to put back to them
|
||||
urls = self.get_urls(self.collection(), auth=self.get_super_credentials())
|
||||
self.get(urls[0], expect=200, auth=self.get_super_credentials())
|
||||
data1 = self.get(urls[1], expect=200, auth=self.get_super_credentials())
|
||||
|
||||
# test that an unauthenticated user cannot do a put
|
||||
new_data1 = data1.copy()
|
||||
new_data1['description'] = 'updated description'
|
||||
self.put(urls[0], new_data1, expect=401, auth=None)
|
||||
self.put(urls[0], new_data1, expect=401, auth=self.get_invalid_credentials())
|
||||
|
||||
# user normal is an admin of org 0 and a member of org 1 so should be able to put only org 1
|
||||
self.put(urls[0], new_data1, expect=403, auth=self.get_normal_credentials())
|
||||
self.put(urls[1], new_data1, expect=200, auth=self.get_normal_credentials())
|
||||
|
||||
# get back org 1 and see if it changed
|
||||
get_result = self.get(urls[1], expect=200, auth=self.get_normal_credentials())
|
||||
self.assertEquals(get_result['description'], 'updated description')
|
||||
|
||||
# super user can also put even though they aren't added to the org users or admins list
|
||||
self.put(urls[1], new_data1, expect=200, auth=self.get_super_credentials())
|
||||
|
||||
# make sure posting to this URL is not supported
|
||||
self.post(urls[1], new_data1, expect=405, auth=self.get_super_credentials())
|
||||
|
||||
def test_put_item_subobjects_projects(self):
|
||||
|
||||
# any attempt to put a subobject should be a 405, edit the actual resource or POST with 'disassociate' to delete
|
||||
# this is against a collection URL anyway, so we really need not repeat this test for other object types
|
||||
# as a PUT against a collection doesn't make much sense.
|
||||
|
||||
orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
|
||||
projects0_url = orgs['results'][0]['related']['projects']
|
||||
sub_projects = self.get(projects0_url, expect=200, auth=self.get_super_credentials())
|
||||
self.assertEquals(sub_projects['count'], 3)
|
||||
first_sub_project = sub_projects['results'][0]
|
||||
self.put(projects0_url, first_sub_project, expect=405, auth=self.get_super_credentials())
|
||||
|
||||
def test_delete_item(self):
|
||||
|
||||
# first get some urls
|
||||
urls = self.get_urls(self.collection(), auth=self.get_super_credentials())
|
||||
urldata1 = self.get(urls[1], auth=self.get_super_credentials())
|
||||
|
||||
# check authentication -- admins of the org and superusers can delete objects only
|
||||
self.delete(urls[0], expect=401, auth=None)
|
||||
self.delete(urls[0], expect=401, auth=self.get_invalid_credentials())
|
||||
self.delete(urls[8], expect=403, auth=self.get_normal_credentials())
|
||||
self.delete(urls[1], expect=204, auth=self.get_normal_credentials())
|
||||
self.delete(urls[0], expect=204, auth=self.get_super_credentials())
|
||||
|
||||
# check that when we have deleted an object it comes back 404 via GET
|
||||
self.get(urls[1], expect=404, auth=self.get_normal_credentials())
|
||||
assert Organization.objects.filter(pk=urldata1['id']).count() == 0
|
||||
|
||||
# also check that DELETE on the collection doesn't work
|
||||
self.delete(self.collection(), expect=405, auth=self.get_super_credentials())
|
||||
|
||||
# Test that not even super users can delete an organization with a basic license
|
||||
self.create_basic_license_file()
|
||||
self.delete(urls[2], expect=402, auth=self.get_super_credentials())
|
||||
|
||||
def test_invalid_post_data(self):
|
||||
url = reverse('api:organization_list')
|
||||
# API should gracefully handle data of an invalid type.
|
||||
self.post(url, expect=400, data=None, auth=self.get_super_credentials())
|
||||
self.post(url, expect=400, data=99, auth=self.get_super_credentials())
|
||||
self.post(url, expect=400, data='abcd', auth=self.get_super_credentials())
|
||||
self.post(url, expect=400, data=3.14, auth=self.get_super_credentials())
|
||||
self.post(url, expect=400, data=True, auth=self.get_super_credentials())
|
||||
self.post(url, expect=400, data=[1,2,3], auth=self.get_super_credentials())
|
||||
url = reverse('api:organization_users_list', args=(self.organizations[0].pk,))
|
||||
self.post(url, expect=400, data=None, auth=self.get_super_credentials())
|
||||
self.post(url, expect=400, data=99, auth=self.get_super_credentials())
|
||||
self.post(url, expect=400, data='abcd', auth=self.get_super_credentials())
|
||||
self.post(url, expect=400, data=3.14, auth=self.get_super_credentials())
|
||||
self.post(url, expect=400, data=True, auth=self.get_super_credentials())
|
||||
self.post(url, expect=400, data=[1,2,3], auth=self.get_super_credentials())
|
||||
|
||||
# TODO: tests for tag disassociation
|
||||
@ -1,55 +0,0 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved
|
||||
|
||||
# Python
|
||||
import mock
|
||||
from mock import Mock
|
||||
from StringIO import StringIO
|
||||
from django.utils.timezone import now
|
||||
|
||||
# Django
|
||||
from django.test import SimpleTestCase
|
||||
|
||||
# AWX
|
||||
from awx.main.models import * # noqa
|
||||
|
||||
__all__ = ['UnifiedJobsUnitTest',]
|
||||
|
||||
class UnifiedJobsUnitTest(SimpleTestCase):
|
||||
|
||||
# stdout file present
|
||||
@mock.patch('os.path.exists', return_value=True)
|
||||
@mock.patch('codecs.open', return_value='my_file_handler')
|
||||
def test_result_stdout_raw_handle_file__found(self, exists, open):
|
||||
unified_job = UnifiedJob()
|
||||
unified_job.result_stdout_file = 'dummy'
|
||||
|
||||
with mock.patch('os.stat', return_value=Mock(st_size=1)):
|
||||
result = unified_job.result_stdout_raw_handle()
|
||||
|
||||
self.assertEqual(result, 'my_file_handler')
|
||||
|
||||
# stdout file missing, job finished
|
||||
@mock.patch('os.path.exists', return_value=False)
|
||||
def test_result_stdout_raw_handle__missing(self, exists):
|
||||
unified_job = UnifiedJob()
|
||||
unified_job.result_stdout_file = 'dummy'
|
||||
unified_job.finished = now()
|
||||
|
||||
result = unified_job.result_stdout_raw_handle()
|
||||
|
||||
self.assertIsInstance(result, StringIO)
|
||||
self.assertEqual(result.read(), 'stdout capture is missing')
|
||||
|
||||
# stdout file missing, job not finished
|
||||
@mock.patch('os.path.exists', return_value=False)
|
||||
def test_result_stdout_raw_handle__pending(self, exists):
|
||||
unified_job = UnifiedJob()
|
||||
unified_job.result_stdout_file = 'dummy'
|
||||
unified_job.finished = None
|
||||
|
||||
result = unified_job.result_stdout_raw_handle()
|
||||
|
||||
self.assertIsInstance(result, StringIO)
|
||||
self.assertEqual(result.read(), 'Waiting for results...')
|
||||
|
||||
@ -1,113 +0,0 @@
|
||||
# Django
|
||||
from django.core.urlresolvers import reverse
|
||||
|
||||
# Reuse Test code
|
||||
from awx.main.tests.base import (
|
||||
BaseLiveServerTest,
|
||||
QueueStartStopTestMixin,
|
||||
URI,
|
||||
)
|
||||
from awx.main.models.projects import * # noqa
|
||||
|
||||
__all__ = ['UnifiedJobStdoutRedactedTests']
|
||||
|
||||
|
||||
TEST_STDOUTS = []
|
||||
uri = URI(scheme="https", username="Dhh3U47nmC26xk9PKscV", password="PXPfWW8YzYrgS@E5NbQ2H@", host="github.ginger.com/theirrepo.git/info/refs")
|
||||
TEST_STDOUTS.append({
|
||||
'description': 'uri in a plain text document',
|
||||
'uri' : uri,
|
||||
'text' : 'hello world %s goodbye world' % uri,
|
||||
'occurrences' : 1
|
||||
})
|
||||
|
||||
uri = URI(scheme="https", username="applepie@@@", password="thatyouknow@@@@", host="github.ginger.com/theirrepo.git/info/refs")
|
||||
TEST_STDOUTS.append({
|
||||
'description': 'uri appears twice in a multiline plain text document',
|
||||
'uri' : uri,
|
||||
'text' : 'hello world %s \n\nyoyo\n\nhello\n%s' % (uri, uri),
|
||||
'occurrences' : 2
|
||||
})
|
||||
|
||||
class UnifiedJobStdoutRedactedTests(BaseLiveServerTest, QueueStartStopTestMixin):
|
||||
|
||||
def setUp(self):
|
||||
super(UnifiedJobStdoutRedactedTests, self).setUp()
|
||||
self.setup_instances()
|
||||
self.setup_users()
|
||||
self.test_cases = []
|
||||
self.negative_test_cases = []
|
||||
|
||||
proj = self.make_project()
|
||||
|
||||
for e in TEST_STDOUTS:
|
||||
e['project'] = ProjectUpdate(project=proj)
|
||||
e['project'].result_stdout_text = e['text']
|
||||
e['project'].save()
|
||||
self.test_cases.append(e)
|
||||
for d in TEST_STDOUTS:
|
||||
d['job'] = self.make_job()
|
||||
d['job'].result_stdout_text = d['text']
|
||||
d['job'].save()
|
||||
self.negative_test_cases.append(d)
|
||||
|
||||
# This is more of a functional test than a unit test.
|
||||
# should filter out username and password
|
||||
def check_sensitive_redacted(self, test_data, response):
|
||||
uri = test_data['uri']
|
||||
self.assertIsNotNone(response['content'])
|
||||
self.check_not_found(response['content'], uri.username, test_data['description'])
|
||||
self.check_not_found(response['content'], uri.password, test_data['description'])
|
||||
# Ensure the host didn't get redacted
|
||||
self.check_found(response['content'], uri.host, test_data['occurrences'], test_data['description'])
|
||||
|
||||
def check_sensitive_not_redacted(self, test_data, response):
|
||||
uri = test_data['uri']
|
||||
self.assertIsNotNone(response['content'])
|
||||
self.check_found(response['content'], uri.username, description=test_data['description'])
|
||||
self.check_found(response['content'], uri.password, description=test_data['description'])
|
||||
|
||||
def _get_url_job_stdout(self, job, url_base, format='json'):
|
||||
formats = {
|
||||
'json': 'application/json',
|
||||
'ansi': 'text/plain',
|
||||
'txt': 'text/plain',
|
||||
'html': 'text/html',
|
||||
}
|
||||
content_type = formats[format]
|
||||
project_update_stdout_url = reverse(url_base, args=(job.pk,)) + "?format=" + format
|
||||
return self.get(project_update_stdout_url, expect=200, auth=self.get_super_credentials(), accept=content_type)
|
||||
|
||||
def _test_redaction_enabled(self, format):
|
||||
for test_data in self.test_cases:
|
||||
response = self._get_url_job_stdout(test_data['project'], "api:project_update_stdout", format=format)
|
||||
self.check_sensitive_redacted(test_data, response)
|
||||
|
||||
def _test_redaction_disabled(self, format):
|
||||
for test_data in self.negative_test_cases:
|
||||
response = self._get_url_job_stdout(test_data['job'], "api:job_stdout", format=format)
|
||||
self.check_sensitive_not_redacted(test_data, response)
|
||||
|
||||
def test_project_update_redaction_enabled_json(self):
|
||||
self._test_redaction_enabled('json')
|
||||
|
||||
def test_project_update_redaction_enabled_ansi(self):
|
||||
self._test_redaction_enabled('ansi')
|
||||
|
||||
def test_project_update_redaction_enabled_html(self):
|
||||
self._test_redaction_enabled('html')
|
||||
|
||||
def test_project_update_redaction_enabled_txt(self):
|
||||
self._test_redaction_enabled('txt')
|
||||
|
||||
def test_job_redaction_disabled_json(self):
|
||||
self._test_redaction_disabled('json')
|
||||
|
||||
def test_job_redaction_disabled_ansi(self):
|
||||
self._test_redaction_disabled('ansi')
|
||||
|
||||
def test_job_redaction_disabled_html(self):
|
||||
self._test_redaction_disabled('html')
|
||||
|
||||
def test_job_redaction_disabled_txt(self):
|
||||
self._test_redaction_disabled('txt')
|
||||
15
awx/main/tests/unit/test_ha.py
Normal file
15
awx/main/tests/unit/test_ha.py
Normal file
@ -0,0 +1,15 @@
|
||||
# Copyright (c) 2016 Ansible, Inc.
|
||||
|
||||
# Python
|
||||
import mock
|
||||
|
||||
# AWX
|
||||
from awx.main.ha import is_ha_environment
|
||||
|
||||
@mock.patch('awx.main.models.Instance.objects.count', lambda: 2)
|
||||
def test_multiple_instances():
|
||||
assert is_ha_environment()
|
||||
|
||||
@mock.patch('awx.main.models.Instance.objects.count', lambda: 1)
|
||||
def test_db_localhost():
|
||||
assert is_ha_environment() is False
|
||||
@ -1,11 +1,8 @@
|
||||
|
||||
import textwrap
|
||||
|
||||
# AWX
|
||||
from awx.main.redact import UriCleaner
|
||||
from awx.main.tests.base import BaseTest, URI
|
||||
|
||||
__all__ = ['UriCleanTests']
|
||||
from awx.main.tests.URI import URI
|
||||
|
||||
TEST_URIS = [
|
||||
URI('no host', scheme='https', username='myusername', password='mypass', host=None),
|
||||
@ -80,59 +77,58 @@ TEST_CLEARTEXT.append({
|
||||
'host_occurrences' : 4
|
||||
})
|
||||
|
||||
class UriCleanTests(BaseTest):
|
||||
|
||||
# should redact sensitive usernames and passwords
|
||||
def test_uri_scm_simple_redacted(self):
|
||||
for uri in TEST_URIS:
|
||||
redacted_str = UriCleaner.remove_sensitive(str(uri))
|
||||
if uri.username:
|
||||
self.check_not_found(redacted_str, uri.username, uri.description)
|
||||
if uri.password:
|
||||
self.check_not_found(redacted_str, uri.password, uri.description)
|
||||
|
||||
# should replace secret data with safe string, UriCleaner.REPLACE_STR
|
||||
def test_uri_scm_simple_replaced(self):
|
||||
for uri in TEST_URIS:
|
||||
redacted_str = UriCleaner.remove_sensitive(str(uri))
|
||||
self.check_found(redacted_str, UriCleaner.REPLACE_STR, uri.get_secret_count())
|
||||
|
||||
# should redact multiple uris in text
|
||||
def test_uri_scm_multiple(self):
|
||||
cleartext = ''
|
||||
for uri in TEST_URIS:
|
||||
cleartext += str(uri) + ' '
|
||||
for uri in TEST_URIS:
|
||||
cleartext += str(uri) + '\n'
|
||||
|
||||
# should redact sensitive usernames and passwords
|
||||
def test_uri_scm_simple_redacted():
|
||||
for uri in TEST_URIS:
|
||||
redacted_str = UriCleaner.remove_sensitive(str(uri))
|
||||
if uri.username:
|
||||
self.check_not_found(redacted_str, uri.username, uri.description)
|
||||
assert uri.username not in redacted_str
|
||||
if uri.password:
|
||||
self.check_not_found(redacted_str, uri.password, uri.description)
|
||||
assert uri.username not in redacted_str
|
||||
|
||||
# should replace multiple secret data with safe string
|
||||
def test_uri_scm_multiple_replaced(self):
|
||||
cleartext = ''
|
||||
find_count = 0
|
||||
for uri in TEST_URIS:
|
||||
cleartext += str(uri) + ' '
|
||||
find_count += uri.get_secret_count()
|
||||
# should replace secret data with safe string, UriCleaner.REPLACE_STR
|
||||
def test_uri_scm_simple_replaced():
|
||||
for uri in TEST_URIS:
|
||||
redacted_str = UriCleaner.remove_sensitive(str(uri))
|
||||
assert redacted_str.count(UriCleaner.REPLACE_STR) == uri.get_secret_count()
|
||||
|
||||
for uri in TEST_URIS:
|
||||
cleartext += str(uri) + '\n'
|
||||
find_count += uri.get_secret_count()
|
||||
# should redact multiple uris in text
|
||||
def test_uri_scm_multiple():
|
||||
cleartext = ''
|
||||
for uri in TEST_URIS:
|
||||
cleartext += str(uri) + ' '
|
||||
for uri in TEST_URIS:
|
||||
cleartext += str(uri) + '\n'
|
||||
|
||||
redacted_str = UriCleaner.remove_sensitive(cleartext)
|
||||
self.check_found(redacted_str, UriCleaner.REPLACE_STR, find_count)
|
||||
redacted_str = UriCleaner.remove_sensitive(str(uri))
|
||||
if uri.username:
|
||||
assert uri.username not in redacted_str
|
||||
if uri.password:
|
||||
assert uri.username not in redacted_str
|
||||
|
||||
# should redact and replace multiple secret data within a complex cleartext blob
|
||||
def test_uri_scm_cleartext_redact_and_replace(self):
|
||||
for test_data in TEST_CLEARTEXT:
|
||||
uri = test_data['uri']
|
||||
redacted_str = UriCleaner.remove_sensitive(test_data['text'])
|
||||
self.check_not_found(redacted_str, uri.username, uri.description)
|
||||
self.check_not_found(redacted_str, uri.password, uri.description)
|
||||
# Ensure the host didn't get redacted
|
||||
self.check_found(redacted_str, uri.host, test_data['host_occurrences'], uri.description)
|
||||
# should replace multiple secret data with safe string
|
||||
def test_uri_scm_multiple_replaced():
|
||||
cleartext = ''
|
||||
find_count = 0
|
||||
for uri in TEST_URIS:
|
||||
cleartext += str(uri) + ' '
|
||||
find_count += uri.get_secret_count()
|
||||
|
||||
for uri in TEST_URIS:
|
||||
cleartext += str(uri) + '\n'
|
||||
find_count += uri.get_secret_count()
|
||||
|
||||
redacted_str = UriCleaner.remove_sensitive(cleartext)
|
||||
assert redacted_str.count(UriCleaner.REPLACE_STR) == find_count
|
||||
|
||||
# should redact and replace multiple secret data within a complex cleartext blob
|
||||
def test_uri_scm_cleartext_redact_and_replace():
|
||||
for test_data in TEST_CLEARTEXT:
|
||||
uri = test_data['uri']
|
||||
redacted_str = UriCleaner.remove_sensitive(test_data['text'])
|
||||
assert uri.username not in redacted_str
|
||||
assert uri.password not in redacted_str
|
||||
# Ensure the host didn't get redacted
|
||||
assert redacted_str.count(uri.host) == test_data['host_occurrences']
|
||||
48
awx/main/tests/unit/test_unified_jobs.py
Normal file
48
awx/main/tests/unit/test_unified_jobs.py
Normal file
@ -0,0 +1,48 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved
|
||||
|
||||
# Python
|
||||
import mock
|
||||
from mock import Mock
|
||||
from StringIO import StringIO
|
||||
from django.utils.timezone import now
|
||||
|
||||
# AWX
|
||||
from awx.main.models import UnifiedJob
|
||||
|
||||
|
||||
# stdout file present
|
||||
@mock.patch('os.path.exists', return_value=True)
|
||||
@mock.patch('codecs.open', return_value='my_file_handler')
|
||||
def test_result_stdout_raw_handle_file__found(exists, open):
|
||||
unified_job = UnifiedJob()
|
||||
unified_job.result_stdout_file = 'dummy'
|
||||
|
||||
with mock.patch('os.stat', return_value=Mock(st_size=1)):
|
||||
result = unified_job.result_stdout_raw_handle()
|
||||
|
||||
assert result == 'my_file_handler'
|
||||
|
||||
# stdout file missing, job finished
|
||||
@mock.patch('os.path.exists', return_value=False)
|
||||
def test_result_stdout_raw_handle__missing(exists):
|
||||
unified_job = UnifiedJob()
|
||||
unified_job.result_stdout_file = 'dummy'
|
||||
unified_job.finished = now()
|
||||
|
||||
result = unified_job.result_stdout_raw_handle()
|
||||
|
||||
assert isinstance(result, StringIO)
|
||||
assert result.read() == 'stdout capture is missing'
|
||||
|
||||
# stdout file missing, job not finished
|
||||
@mock.patch('os.path.exists', return_value=False)
|
||||
def test_result_stdout_raw_handle__pending(exists):
|
||||
unified_job = UnifiedJob()
|
||||
unified_job.result_stdout_file = 'dummy'
|
||||
unified_job.finished = None
|
||||
|
||||
result = unified_job.result_stdout_raw_handle()
|
||||
|
||||
assert isinstance(result, StringIO)
|
||||
assert result.read() == 'Waiting for results...'
|
||||
Loading…
x
Reference in New Issue
Block a user