Merge pull request #1234 from ryanpetrello/hey-hey-hey-goodbye

remove old crusty test fixtures
This commit is contained in:
Ryan Petrello 2018-04-04 09:48:09 -04:00 committed by GitHub
commit 808267d3fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 1 additions and 1229 deletions

View File

@ -1,687 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python
import base64
import contextlib
import json
import os
import random
import shutil
import sys
import tempfile
import time
import urllib
import re
import mock
# PyYAML
import yaml
# Django
import django.test
from django.conf import settings, UserSettingsHolder
from django.contrib.auth.models import User
from django.core.cache import cache
from django.test.client import Client
from django.test.utils import override_settings
from django.utils.encoding import force_text
# AWX
from awx.main.models import * # noqa
from awx.main.utils import get_ansible_version
from awx.sso.backends import LDAPSettings
from awx.main.tests.URI import URI # noqa
TEST_PLAYBOOK = '''- hosts: mygroup
gather_facts: false
tasks:
- name: woohoo
command: test 1 = 1
'''
class QueueTestMixin(object):
def start_queue(self):
self.start_rabbit()
receiver = CallbackReceiver()
self.queue_process = Process(target=receiver.run_subscriber,
args=(False,))
self.queue_process.start()
def terminate_queue(self):
if hasattr(self, 'queue_process'):
self.queue_process.terminate()
self.stop_rabbit()
def start_rabbit(self):
if not getattr(self, 'redis_process', None):
# Centos 6.5 redis is runnable by non-root user but is not in a normal users path by default
env = dict(os.environ)
env['PATH'] = '%s:/usr/sbin/' % env['PATH']
env['RABBITMQ_NODENAME'] = 'towerunittest'
env['RABBITMQ_NODE_PORT'] = '55672'
self.redis_process = Popen('rabbitmq-server > /dev/null',
shell=True, executable='/bin/bash',
env=env)
def stop_rabbit(self):
if getattr(self, 'redis_process', None):
self.redis_process.kill()
self.redis_process = None
# The observed effect of not calling terminate_queue() if you call start_queue() are
# an hang on test cleanup database delete. Thus, to ensure terminate_queue() is called
# whenever start_queue() is called just inherit from this class when you want to use the queue.
class QueueStartStopTestMixin(QueueTestMixin):
def setUp(self):
super(QueueStartStopTestMixin, self).setUp()
self.start_queue()
def tearDown(self):
super(QueueStartStopTestMixin, self).tearDown()
self.terminate_queue()
class MockCommonlySlowTestMixin(object):
def __init__(self, *args, **kwargs):
from awx.api import generics
mock.patch.object(generics, 'get_view_description', return_value=None).start()
super(MockCommonlySlowTestMixin, self).__init__(*args, **kwargs)
ansible_version = get_ansible_version()
class BaseTestMixin(MockCommonlySlowTestMixin):
'''
Mixin with shared code for use by all test cases.
'''
def setUp(self):
super(BaseTestMixin, self).setUp()
global ansible_version
self.object_ctr = 0
# Save sys.path before tests.
self._sys_path = [x for x in sys.path]
# Save os.environ before tests.
self._environ = dict(os.environ.items())
# Capture current directory to change back after each test.
self._cwd = os.getcwd()
# Capture list of temp files/directories created during tests.
self._temp_paths = []
self._current_auth = None
self._user_passwords = {}
self.ansible_version = ansible_version
self.assertNotEqual(self.ansible_version, 'unknown')
# Wrap settings so we can redefine them within each test.
self._wrapped = settings._wrapped
settings._wrapped = UserSettingsHolder(settings._wrapped)
# Set all AUTH_LDAP_* settings to defaults to avoid using LDAP for
# tests unless expicitly configured.
for name, value in LDAPSettings.defaults.items():
if name == 'SERVER_URI':
value = ''
setattr(settings, 'AUTH_LDAP_%s' % name, value)
# Pass test database settings in environment for use by any management
# commands that run from tests.
for opt in ('ENGINE', 'NAME', 'USER', 'PASSWORD', 'HOST', 'PORT'):
os.environ['AWX_TEST_DATABASE_%s' % opt] = settings.DATABASES['default'][opt]
# Set flag so that task chain works with unit tests.
settings.CELERY_UNIT_TEST = True
settings.SYSTEM_UUID='00000000-0000-0000-0000-000000000000'
settings.CELERY_BROKER_URL='redis://localhost:55672/'
settings.CALLBACK_QUEUE = 'callback_tasks_unit'
# Disable socket notifications for unit tests.
settings.SOCKETIO_NOTIFICATION_PORT = None
# Make temp job status directory for unit tests.
job_status_dir = tempfile.mkdtemp()
self._temp_paths.append(job_status_dir)
settings.JOBOUTPUT_ROOT = os.path.abspath(job_status_dir)
settings.CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
'LOCATION': 'unittests'
}
}
cache.clear()
self._start_time = time.time()
def tearDown(self):
super(BaseTestMixin, self).tearDown()
# Restore sys.path after tests.
sys.path = self._sys_path
# Restore os.environ after tests.
for k,v in self._environ.items():
if os.environ.get(k, None) != v:
os.environ[k] = v
for k,v in os.environ.items():
if k not in self._environ.keys():
del os.environ[k]
# Restore current directory after each test.
os.chdir(self._cwd)
# Cleanup temp files/directories created during tests.
for project_dir in self._temp_paths:
if os.path.exists(project_dir):
if os.path.isdir(project_dir):
shutil.rmtree(project_dir, True)
else:
os.remove(project_dir)
# Restore previous settings after each test.
settings._wrapped = self._wrapped
def unique_name(self, string):
rnd_str = '____' + str(random.randint(1, 9999999))
return __name__ + '-generated-' + string + rnd_str
def assertElapsedLessThan(self, seconds):
elapsed = time.time() - self._start_time
self.assertTrue(elapsed < seconds, 'elapsed time of %0.3fs is greater than %0.3fs' % (elapsed, seconds))
@contextlib.contextmanager
def current_user(self, user_or_username, password=None):
try:
if isinstance(user_or_username, User):
username = user_or_username.username
else:
username = user_or_username
password = password or self._user_passwords.get(username)
previous_auth = self._current_auth
if username is None:
self._current_auth = None
else:
self._current_auth = (username, password)
yield
finally:
self._current_auth = previous_auth
def make_user(self, username, password=None, super_user=False):
user = None
password = password or username
if super_user:
user = User.objects.create_superuser(username, "%s@example.com", password)
else:
user = User.objects.create_user(username, "%s@example.com", password)
self._user_passwords[user.username] = password
return user
def make_organizations(self, created_by, count=1):
results = []
for x in range(0, count):
results.append(self.make_organization(created_by=created_by, count=x))
return results
def make_organization(self, created_by, count=1):
self.object_ctr = self.object_ctr + 1
return Organization.objects.create(
name="org%s-%s" % (count, self.object_ctr), description="org%s" % count, created_by=created_by
)
def make_project(self, name=None, description='', created_by=None,
playbook_content='', role_playbooks=None, unicode_prefix=True):
if not name:
name = self.unique_name('Project')
if not os.path.exists(settings.PROJECTS_ROOT):
os.makedirs(settings.PROJECTS_ROOT)
# Create temp project directory.
if unicode_prefix:
tmp_prefix = u'\u2620tmp'
else:
tmp_prefix = 'tmp'
project_dir = tempfile.mkdtemp(prefix=tmp_prefix, dir=settings.PROJECTS_ROOT)
self._temp_paths.append(project_dir)
# Create temp playbook in project (if playbook content is given).
if playbook_content:
handle, playbook_path = tempfile.mkstemp(suffix=u'\u2620.yml',
dir=project_dir)
test_playbook_file = os.fdopen(handle, 'w')
test_playbook_file.write(playbook_content.encode('utf-8'))
test_playbook_file.close()
# Role playbooks are specified as a dict of role name and the
# content of tasks/main.yml playbook.
role_playbooks = role_playbooks or {}
for role_name, role_playbook_content in role_playbooks.items():
role_tasks_dir = os.path.join(project_dir, 'roles', role_name, 'tasks')
if not os.path.exists(role_tasks_dir):
os.makedirs(role_tasks_dir)
role_tasks_playbook_path = os.path.join(role_tasks_dir, 'main.yml')
with open(role_tasks_playbook_path, 'w') as f:
f.write(role_playbook_content)
return Project.objects.create(
name=name, description=description,
local_path=os.path.basename(project_dir), created_by=created_by,
#scm_type='git', default_playbook='foo.yml',
)
def make_projects(self, created_by, count=1, playbook_content='',
role_playbooks=None, unicode_prefix=False):
results = []
for x in range(0, count):
self.object_ctr = self.object_ctr + 1
results.append(self.make_project(
name="proj%s-%s" % (x, self.object_ctr),
description=u"proj%s" % x,
created_by=created_by,
playbook_content=playbook_content,
role_playbooks=role_playbooks,
unicode_prefix=unicode_prefix
))
return results
def decide_created_by(self, created_by=None):
if created_by:
return created_by
if self.super_django_user:
return self.super_django_user
raise RuntimeError('please call setup_users() or specify a user')
def make_inventory(self, organization=None, name=None, created_by=None):
created_by = self.decide_created_by(created_by)
if not organization:
organization = self.make_organization(created_by=created_by)
return Inventory.objects.create(name=name or self.unique_name('Inventory'), organization=organization, created_by=created_by)
def make_job_template(self, name=None, created_by=None, organization=None, inventory=None, project=None, playbook=None, **kwargs):
created_by = self.decide_created_by(created_by)
if not inventory:
inventory = self.make_inventory(organization=organization, created_by=created_by)
if not organization:
organization = inventory.organization
if not project:
project = self.make_project(self.unique_name('Project'), created_by=created_by, playbook_content=playbook if playbook else TEST_PLAYBOOK)
if project and project.playbooks and len(project.playbooks) > 0:
playbook = project.playbooks[0]
else:
raise RuntimeError('Expected project to have at least one playbook')
if project not in organization.projects.all():
organization.projects.add(project)
opts = {
'name' : name or self.unique_name('JobTemplate'),
'job_type': 'check',
'inventory': inventory,
'project': project,
'host_config_key': settings.SYSTEM_UUID,
'created_by': created_by,
'playbook': playbook,
'ask_credential_on_launch': True,
}
opts.update(kwargs)
return JobTemplate.objects.create(**opts)
def make_job(self, job_template=None, created_by=None, inital_state='new', **kwargs):
created_by = self.decide_created_by(created_by)
if not job_template:
job_template = self.make_job_template(created_by=created_by)
opts = {
'created_by': created_by,
'status': inital_state,
}
opts.update(kwargs)
return job_template.create_job(**opts)
def make_credential(self, **kwargs):
opts = {
'name': self.unique_name('Credential'),
'kind': 'ssh',
'user': self.super_django_user,
'username': '',
'ssh_key_data': '',
'ssh_key_unlock': '',
'password': '',
'become_method': '',
'become_username': '',
'become_password': '',
'vault_password': '',
}
opts.update(kwargs)
user = opts['user']
del opts['user']
cred = Credential.objects.create(**opts)
cred.admin_role.members.add(user)
return cred
def setup_instances(self):
instance = Instance(uuid=settings.SYSTEM_UUID, hostname='127.0.0.1')
instance.save()
def setup_users(self, just_super_user=False):
# Create a user.
self.super_username = 'admin'
self.super_password = 'admin'
self.normal_username = 'normal'
self.normal_password = 'normal'
self.other_username = 'other'
self.other_password = 'other'
self.nobody_username = 'nobody'
self.nobody_password = 'nobody'
self.super_django_user = self.make_user(self.super_username, self.super_password, super_user=True)
if not just_super_user:
self.normal_django_user = self.make_user(self.normal_username, self.normal_password, super_user=False)
self.other_django_user = self.make_user(self.other_username, self.other_password, super_user=False)
self.nobody_django_user = self.make_user(self.nobody_username, self.nobody_password, super_user=False)
def get_super_credentials(self):
return (self.super_username, self.super_password)
def get_normal_credentials(self):
return (self.normal_username, self.normal_password)
def get_other_credentials(self):
return (self.other_username, self.other_password)
def get_nobody_credentials(self):
# here is a user without any permissions...
return (self.nobody_username, self.nobody_password)
def get_invalid_credentials(self):
return ('random', 'combination')
def _generic_rest(self, url, data=None, expect=204, auth=None, method=None,
data_type=None, accept=None, remote_addr=None,
return_response_object=False, client_kwargs=None):
assert method is not None
method_name = method.lower()
client_kwargs = client_kwargs or {}
if accept:
client_kwargs['HTTP_ACCEPT'] = accept
if remote_addr is not None:
client_kwargs['REMOTE_ADDR'] = remote_addr
auth = auth or self._current_auth
if auth:
# Dict is only used to test case when both Authorization and
# X-Auth-Token headers are passed.
if isinstance(auth, dict):
basic = auth.get('basic', ())
if basic:
basic_auth = base64.b64encode('%s:%s' % (basic[0], basic[1]))
basic_auth = basic_auth.decode('ascii')
client_kwargs['HTTP_AUTHORIZATION'] = 'Basic %s' % basic_auth
token = auth.get('token', '')
if token and not basic:
client_kwargs['HTTP_AUTHORIZATION'] = 'Token %s' % token
elif token:
client_kwargs['HTTP_X_AUTH_TOKEN'] = 'Token %s' % token
elif isinstance(auth, (list, tuple)):
#client.login(username=auth[0], password=auth[1])
basic_auth = base64.b64encode('%s:%s' % (auth[0], auth[1]))
basic_auth = basic_auth.decode('ascii')
client_kwargs['HTTP_AUTHORIZATION'] = 'Basic %s' % basic_auth
elif isinstance(auth, basestring):
client_kwargs['HTTP_AUTHORIZATION'] = 'Token %s' % auth
client = Client(**client_kwargs)
method = getattr(client, method_name)
response = None
if method_name not in ('options', 'head', 'get', 'delete'):
data_type = data_type or 'json'
if data_type == 'json':
response = method(url, json.dumps(data), 'application/json')
elif data_type == 'yaml':
response = method(url, yaml.safe_dump(data), 'application/yaml')
elif data_type == 'form':
response = method(url, urllib.urlencode(data), 'application/x-www-form-urlencoded')
else:
self.fail('Unsupported data_type %s' % data_type)
else:
response = method(url)
self.assertFalse(response.status_code == 500 and expect != 500,
'Failed (500): %s' % force_text(response.content))
if expect is not None:
assert response.status_code == expect, u"expected status %s, got %s for url=%s as auth=%s: %s" % (
expect, response.status_code, url, auth, force_text(response.content)
)
if method_name == 'head':
self.assertFalse(response.content)
if return_response_object:
return response
if response.status_code not in [204, 405] and method_name != 'head' and response.content:
# no JSON responses in these at least for now, 409 should probably return some (FIXME)
if response['Content-Type'].startswith('application/json'):
obj = json.loads(force_text(response.content))
elif response['Content-Type'].startswith('application/yaml'):
obj = yaml.safe_load(force_text(response.content))
elif response['Content-Type'].startswith('text/plain'):
obj = {
'content': force_text(response.content)
}
elif response['Content-Type'].startswith('text/html'):
obj = {
'content': force_text(response.content)
}
else:
self.fail('Unsupport response content type %s' % response['Content-Type'])
else:
obj = {}
# Create a new subclass of object type and attach the response instance
# to it (to allow for checking response headers).
if isinstance(obj, dict):
return type('DICT', (dict,), {'response': response})(obj.items())
elif isinstance(obj, (tuple, list)):
return type('LIST', (list,), {'response': response})(iter(obj))
else:
return obj
def options(self, url, expect=200, auth=None, accept=None,
remote_addr=None):
return self._generic_rest(url, data=None, expect=expect, auth=auth,
method='options', accept=accept,
remote_addr=remote_addr)
def head(self, url, expect=200, auth=None, accept=None, remote_addr=None):
return self._generic_rest(url, data=None, expect=expect, auth=auth,
method='head', accept=accept,
remote_addr=remote_addr)
def get(self, url, expect=200, auth=None, accept=None, remote_addr=None, client_kwargs={}):
return self._generic_rest(url, data=None, expect=expect, auth=auth,
method='get', accept=accept,
remote_addr=remote_addr,
client_kwargs=client_kwargs)
def post(self, url, data, expect=204, auth=None, data_type=None,
accept=None, remote_addr=None, client_kwargs={}):
return self._generic_rest(url, data=data, expect=expect, auth=auth,
method='post', data_type=data_type,
accept=accept,
remote_addr=remote_addr,
client_kwargs=client_kwargs)
def put(self, url, data, expect=200, auth=None, data_type=None,
accept=None, remote_addr=None):
return self._generic_rest(url, data=data, expect=expect, auth=auth,
method='put', data_type=data_type,
accept=accept, remote_addr=remote_addr)
def patch(self, url, data, expect=200, auth=None, data_type=None,
accept=None, remote_addr=None):
return self._generic_rest(url, data=data, expect=expect, auth=auth,
method='patch', data_type=data_type,
accept=accept, remote_addr=remote_addr)
def delete(self, url, expect=201, auth=None, data_type=None, accept=None,
remote_addr=None):
return self._generic_rest(url, data=None, expect=expect, auth=auth,
method='delete', accept=accept,
remote_addr=remote_addr)
def get_urls(self, collection_url, auth=None):
# TODO: this test helper function doesn't support pagination
data = self.get(collection_url, expect=200, auth=auth)
return [item['url'] for item in data['results']]
def check_invalid_auth(self, url, data=None, methods=None):
'''
Check various methods of accessing the given URL with invalid
authentication credentials.
'''
data = data or {}
methods = methods or ('options', 'head', 'get')
for auth in [(None,), ('invalid', 'password')]:
with self.current_user(*auth):
for method in methods:
f = getattr(self, method)
if method in ('post', 'put', 'patch'):
f(url, data, expect=401)
else:
f(url, expect=401)
def check_pagination_and_size(self, data, desired_count, previous=False,
next=False):
self.assertTrue('results' in data)
self.assertEqual(data['count'], desired_count)
if previous:
self.assertTrue(data['previous'])
else:
self.assertFalse(data['previous'])
if next:
self.assertTrue(data['next'])
else:
self.assertFalse(data['next'])
def check_list_ids(self, data, queryset, check_order=False):
data_ids = [x['id'] for x in data['results']]
qs_ids = queryset.values_list('pk', flat=True)
if check_order:
self.assertEqual(tuple(data_ids), tuple(qs_ids))
else:
self.assertEqual(set(data_ids), set(qs_ids))
def check_get_list(self, url, user, qs, fields=None, expect=200,
check_order=False, offset=None, limit=None):
'''
Check that the given list view URL returns results for the given user
that match the given queryset.
'''
offset = offset or 0
with self.current_user(user):
if expect == 400:
self.options(url, expect=200)
else:
self.options(url, expect=expect)
self.head(url, expect=expect)
response = self.get(url, expect=expect)
if expect != 200:
return
total = qs.count()
if limit is not None:
if limit > 0:
qs = qs[offset:offset + limit]
else:
qs = qs.none()
self.check_pagination_and_size(response, total, offset > 0,
limit and ((offset + limit) < total))
self.check_list_ids(response, qs, check_order)
if fields:
for obj in response['results']:
returned_fields = set(obj.keys())
expected_fields = set(fields)
msg = ''
not_expected = returned_fields - expected_fields
if not_expected:
msg += 'fields %s not expected ' % ', '.join(not_expected)
not_returned = expected_fields - returned_fields
if not_returned:
msg += 'fields %s not returned ' % ', '.join(not_returned)
self.assertTrue(set(obj.keys()) <= set(fields), msg)
def check_not_found(self, string, substr, description=None, word_boundary=False):
if word_boundary:
count = len(re.findall(r'\b%s\b' % re.escape(substr), string))
else:
count = string.find(substr)
if count == -1:
count = 0
msg = ''
if description:
msg = 'Test "%s".\n' % description
msg += '"%s" found in: "%s"' % (substr, string)
self.assertEqual(count, 0, msg)
def check_found(self, string, substr, count=-1, description=None, word_boundary=False):
if word_boundary:
count_actual = len(re.findall(r'\b%s\b' % re.escape(substr), string))
else:
count_actual = string.count(substr)
msg = ''
if description:
msg = 'Test "%s".\n' % description
if count == -1:
self.assertTrue(count_actual > 0)
else:
msg += 'Found %d occurances of "%s" instead of %d in: "%s"' % (count_actual, substr, count, string)
self.assertEqual(count_actual, count, msg)
def check_job_result(self, job, expected='successful', expect_stdout=True,
expect_traceback=False):
msg = u'job status is %s, expected %s' % (job.status, expected)
msg = u'%s\nargs:\n%s' % (msg, job.job_args)
msg = u'%s\nenv:\n%s' % (msg, job.job_env)
if job.result_traceback:
msg = u'%s\ngot traceback:\n%s' % (msg, job.result_traceback)
if job.result_stdout:
msg = u'%s\ngot stdout:\n%s' % (msg, job.result_stdout)
if isinstance(expected, (list, tuple)):
self.assertTrue(job.status in expected)
else:
self.assertEqual(job.status, expected, msg)
if expect_stdout:
self.assertTrue(job.result_stdout)
else:
self.assertTrue(job.result_stdout in ('', 'stdout capture is missing'),
u'expected no stdout, got:\n%s' %
job.result_stdout)
if expect_traceback:
self.assertTrue(job.result_traceback)
else:
self.assertFalse(job.result_traceback,
u'expected no traceback, got:\n%s' %
job.result_traceback)
class BaseTest(BaseTestMixin, django.test.TestCase):
'''
Base class for unit tests.
'''
class BaseTransactionTest(BaseTestMixin, django.test.TransactionTestCase):
'''
Base class for tests requiring transactions (or where the test database
needs to be accessed by subprocesses).
'''
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
ANSIBLE_TRANSPORT='local')
class BaseLiveServerTest(BaseTestMixin, django.test.LiveServerTestCase):
'''
Base class for tests requiring a live test server.
'''
def setUp(self):
super(BaseLiveServerTest, self).setUp()
settings.INTERNAL_API_URL = self.live_server_url
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
ANSIBLE_TRANSPORT='local',
DEBUG=True)
class BaseJobExecutionTest(BaseLiveServerTest):
'''
Base class for celery task tests.
'''

View File

@ -2,7 +2,7 @@ import pytest
from awx.api.versioning import reverse
from awx.main.models import UnifiedJob, ProjectUpdate, InventoryUpdate
from awx.main.tests.base import URI
from awx.main.tests.URI import URI
from awx.main.constants import ACTIVE_STATES

View File

@ -1,541 +0,0 @@
# Python
import uuid
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTestMixin
TEST_PLAYBOOK = '''- hosts: all
gather_facts: false
tasks:
- name: woohoo
command: test 1 = 1
'''
class BaseJobTestMixin(BaseTestMixin):
def _create_inventory(self, name, organization, created_by,
groups_hosts_dict):
'''Helper method for creating inventory with groups and hosts.'''
inventory = organization.inventories.create(
name=name,
created_by=created_by,
)
for group_name, host_names in groups_hosts_dict.items():
group = inventory.groups.create(
name=group_name,
created_by=created_by,
)
for host_name in host_names:
host = inventory.hosts.create(
name=host_name,
created_by=created_by,
)
group.hosts.add(host)
return inventory
def populate(self):
# Here's a little story about the AWX Bread Company, or ABC. They
# make machines that make bread - bakers, slicers, and packagers - and
# these machines are each controlled by a Linux boxes, which is in turn
# managed by AWX.
# Sue is the super user. You don't mess with Sue or you're toast. Ha.
self.user_sue = self.make_user('sue', super_user=True)
# There are three organizations in ABC using Ansible, since it's the
# best thing for dev ops automation since, well, sliced bread.
# Engineering - They design and build the machines.
self.org_eng = Organization.objects.create(
name='engineering',
created_by=self.user_sue,
)
# Support - They fix it when it's not working.
self.org_sup = Organization.objects.create(
name='support',
created_by=self.user_sue,
)
# Operations - They implement the production lines using the machines.
self.org_ops = Organization.objects.create(
name='operations',
created_by=self.user_sue,
)
# Alex is Sue's IT assistant who can also administer all of the
# organizations.
self.user_alex = self.make_user('alex')
self.org_eng.admin_role.members.add(self.user_alex)
self.org_sup.admin_role.members.add(self.user_alex)
self.org_ops.admin_role.members.add(self.user_alex)
# Bob is the head of engineering. He's an admin for engineering, but
# also a user within the operations organization (so he can see the
# results if things go wrong in production).
self.user_bob = self.make_user('bob')
self.org_eng.admin_role.members.add(self.user_bob)
self.org_ops.member_role.members.add(self.user_bob)
# Chuck is the lead engineer. He has full reign over engineering, but
# no other organizations.
self.user_chuck = self.make_user('chuck')
self.org_eng.admin_role.members.add(self.user_chuck)
# Doug is the other engineer working under Chuck. He can write
# playbooks and check them, but Chuck doesn't quite think he's ready to
# run them yet. Poor Doug.
self.user_doug = self.make_user('doug')
self.org_eng.member_role.members.add(self.user_doug)
# Juan is another engineer working under Chuck. He has a little more freedom
# to run playbooks but can't create job templates
self.user_juan = self.make_user('juan')
self.org_eng.member_role.members.add(self.user_juan)
# Hannibal is Chuck's right-hand man. Chuck usually has him create the job
# templates that the rest of the team will use
self.user_hannibal = self.make_user('hannibal')
self.org_eng.member_role.members.add(self.user_hannibal)
# Eve is the head of support. She can also see what goes on in
# operations to help them troubleshoot problems.
self.user_eve = self.make_user('eve')
self.org_sup.admin_role.members.add(self.user_eve)
self.org_ops.member_role.members.add(self.user_eve)
# Frank is the other support guy.
self.user_frank = self.make_user('frank')
self.org_sup.member_role.members.add(self.user_frank)
# Greg is the head of operations.
self.user_greg = self.make_user('greg')
self.org_ops.admin_role.members.add(self.user_greg)
# Holly is an operations engineer.
self.user_holly = self.make_user('holly')
self.org_ops.member_role.members.add(self.user_holly)
# Iris is another operations engineer.
self.user_iris = self.make_user('iris')
self.org_ops.member_role.members.add(self.user_iris)
# Randall and Billybob are new ops interns that ops uses to test
# their playbooks and inventory
self.user_randall = self.make_user('randall')
self.org_ops.member_role.members.add(self.user_randall)
# He works with Randall
self.user_billybob = self.make_user('billybob')
self.org_ops.member_role.members.add(self.user_billybob)
# Jim is the newest intern. He can login, but can't do anything quite yet
# except make everyone else fresh coffee.
self.user_jim = self.make_user('jim')
# There are three main projects, one each for the development, test and
# production branches of the playbook repository. All three orgs can
# use the production branch, support can use the production and testing
# branches, and operations can only use the production branch.
self.proj_dev = self.make_project('dev', 'development branch',
self.user_sue, TEST_PLAYBOOK)
self.org_eng.projects.add(self.proj_dev)
self.proj_test = self.make_project('test', 'testing branch',
self.user_sue, TEST_PLAYBOOK)
#self.org_eng.projects.add(self.proj_test) # No more multi org projects
self.org_sup.projects.add(self.proj_test)
self.proj_prod = self.make_project('prod', 'production branch',
self.user_sue, TEST_PLAYBOOK)
#self.org_eng.projects.add(self.proj_prod) # No more multi org projects
#self.org_sup.projects.add(self.proj_prod) # No more multi org projects
self.org_ops.projects.add(self.proj_prod)
# Operations also has 2 additional projects specific to the east/west
# production environments.
self.proj_prod_east = self.make_project('prod-east',
'east production branch',
self.user_sue, TEST_PLAYBOOK)
self.org_ops.projects.add(self.proj_prod_east)
self.proj_prod_west = self.make_project('prod-west',
'west production branch',
self.user_sue, TEST_PLAYBOOK)
self.org_ops.projects.add(self.proj_prod_west)
# The engineering organization has a set of servers to use for
# development and testing (2 bakers, 1 slicer, 1 packager).
self.inv_eng = self._create_inventory(
name='engineering environment',
organization=self.org_eng,
created_by=self.user_sue,
groups_hosts_dict={
'bakers': ['eng-baker1', 'eng-baker2'],
'slicers': ['eng-slicer1'],
'packagers': ['eng-packager1'],
},
)
# The support organization has a set of servers to use for
# testing and reproducing problems from operations (1 baker, 1 slicer,
# 1 packager).
self.inv_sup = self._create_inventory(
name='support environment',
organization=self.org_sup,
created_by=self.user_sue,
groups_hosts_dict={
'bakers': ['sup-baker1'],
'slicers': ['sup-slicer1'],
'packagers': ['sup-packager1'],
},
)
# The operations organization manages multiple sets of servers for the
# east and west production facilities.
self.inv_ops_east = self._create_inventory(
name='east production environment',
organization=self.org_ops,
created_by=self.user_sue,
groups_hosts_dict={
'bakers': ['east-baker%d' % n for n in range(1, 4)],
'slicers': ['east-slicer%d' % n for n in range(1, 3)],
'packagers': ['east-packager%d' % n for n in range(1, 3)],
},
)
self.inv_ops_west = self._create_inventory(
name='west production environment',
organization=self.org_ops,
created_by=self.user_sue,
groups_hosts_dict={
'bakers': ['west-baker%d' % n for n in range(1, 6)],
'slicers': ['west-slicer%d' % n for n in range(1, 4)],
'packagers': ['west-packager%d' % n for n in range(1, 3)],
},
)
# Operations is divided into teams to work on the east/west servers.
# Greg and Holly work on east, Greg and iris work on west.
self.team_ops_east = self.org_ops.teams.create(
name='easterners',
created_by=self.user_sue)
self.team_ops_east.member_role.children.add(self.proj_prod.admin_role)
self.team_ops_east.member_role.children.add(self.proj_prod_east.admin_role)
self.team_ops_east.member_role.members.add(self.user_greg)
self.team_ops_east.member_role.members.add(self.user_holly)
self.team_ops_west = self.org_ops.teams.create(
name='westerners',
created_by=self.user_sue)
self.team_ops_west.member_role.children.add(self.proj_prod.admin_role)
self.team_ops_west.member_role.children.add(self.proj_prod_west.admin_role)
self.team_ops_west.member_role.members.add(self.user_greg)
self.team_ops_west.member_role.members.add(self.user_iris)
# The south team is no longer active having been folded into the east team
# FIXME: This code can be removed (probably)
# - this case has been removed as we've gotten rid of the active flag, keeping
# code around in case this has ramifications on some test failures.. if
# you find this message and all tests are passing, then feel free to remove this
# - anoek 2016-03-10
#self.team_ops_south = self.org_ops.teams.create(
# name='southerners',
# created_by=self.user_sue,
# active=False,
#)
#self.team_ops_south.member_role.children.add(self.proj_prod.admin_role)
#self.team_ops_south.member_role.members.add(self.user_greg)
# The north team is going to be deleted
self.team_ops_north = self.org_ops.teams.create(
name='northerners',
created_by=self.user_sue,
)
self.team_ops_north.member_role.children.add(self.proj_prod.admin_role)
self.team_ops_north.member_role.members.add(self.user_greg)
# The testers team are interns that can only check playbooks but can't
# run them
self.team_ops_testers = self.org_ops.teams.create(
name='testers',
created_by=self.user_sue,
)
self.team_ops_testers.member_role.children.add(self.proj_prod.admin_role)
self.team_ops_testers.member_role.members.add(self.user_randall)
self.team_ops_testers.member_role.members.add(self.user_billybob)
# Each user has his/her own set of credentials.
from awx.main.tests.data.ssh import (TEST_SSH_KEY_DATA,
TEST_SSH_KEY_DATA_LOCKED,
TEST_SSH_KEY_DATA_UNLOCK)
self.cred_sue = Credential.objects.create(
username='sue',
password=TEST_SSH_KEY_DATA,
created_by=self.user_sue,
)
self.cred_sue.admin_role.members.add(self.user_sue)
self.cred_sue_ask = Credential.objects.create(
username='sue',
password='ASK',
created_by=self.user_sue,
)
self.cred_sue_ask.admin_role.members.add(self.user_sue)
self.cred_sue_ask_many = Credential.objects.create(
username='sue',
password='ASK',
become_method='sudo',
become_username='root',
become_password='ASK',
ssh_key_data=TEST_SSH_KEY_DATA_LOCKED,
ssh_key_unlock='ASK',
created_by=self.user_sue,
)
self.cred_sue_ask_many.admin_role.members.add(self.user_sue)
self.cred_bob = Credential.objects.create(
username='bob',
password='ASK',
created_by=self.user_sue,
)
self.cred_bob.use_role.members.add(self.user_bob)
self.cred_chuck = Credential.objects.create(
username='chuck',
ssh_key_data=TEST_SSH_KEY_DATA,
created_by=self.user_sue,
)
self.cred_chuck.use_role.members.add(self.user_chuck)
self.cred_doug = Credential.objects.create(
username='doug',
password='doug doesn\'t mind his password being saved. this '
'is why we dont\'t let doug actually run jobs.',
created_by=self.user_sue,
)
self.cred_doug.use_role.members.add(self.user_doug)
self.cred_eve = Credential.objects.create(
username='eve',
password='ASK',
become_method='sudo',
become_username='root',
become_password='ASK',
created_by=self.user_sue,
)
self.cred_eve.use_role.members.add(self.user_eve)
self.cred_frank = Credential.objects.create(
username='frank',
password='fr@nk the t@nk',
created_by=self.user_sue,
)
self.cred_frank.use_role.members.add(self.user_frank)
self.cred_greg = Credential.objects.create(
username='greg',
ssh_key_data=TEST_SSH_KEY_DATA_LOCKED,
ssh_key_unlock='ASK',
created_by=self.user_sue,
)
self.cred_greg.use_role.members.add(self.user_greg)
self.cred_holly = Credential.objects.create(
username='holly',
password='holly rocks',
created_by=self.user_sue,
)
self.cred_holly.use_role.members.add(self.user_holly)
self.cred_iris = Credential.objects.create(
username='iris',
password='ASK',
created_by=self.user_sue,
)
self.cred_iris.use_role.members.add(self.user_iris)
# Each operations team also has shared credentials they can use.
self.cred_ops_east = Credential.objects.create(
username='east',
ssh_key_data=TEST_SSH_KEY_DATA_LOCKED,
ssh_key_unlock=TEST_SSH_KEY_DATA_UNLOCK,
created_by = self.user_sue,
)
self.team_ops_east.member_role.children.add(self.cred_ops_east.use_role)
self.cred_ops_west = Credential.objects.create(
username='west',
password='Heading270',
created_by = self.user_sue,
)
self.team_ops_west.member_role.children.add(self.cred_ops_west.use_role)
# FIXME: This code can be removed (probably)
# - this case has been removed as we've gotten rid of the active flag, keeping
# code around in case this has ramifications on some test failures.. if
# you find this message and all tests are passing, then feel free to remove this
# - anoek 2016-03-10
#self.cred_ops_south = self.team_ops_south.credentials.create(
# username='south',
# password='Heading180',
# created_by = self.user_sue,
#)
self.cred_ops_north = Credential.objects.create(
username='north',
password='Heading0',
created_by = self.user_sue,
)
self.team_ops_north.member_role.children.add(self.cred_ops_north.admin_role)
self.cred_ops_test = Credential.objects.create(
username='testers',
password='HeadingNone',
created_by = self.user_sue,
)
self.team_ops_testers.member_role.children.add(self.cred_ops_test.use_role)
# Engineering has job templates to check/run the dev project onto
# their own inventory.
self.jt_eng_check = JobTemplate.objects.create(
name='eng-dev-check',
job_type='check',
inventory= self.inv_eng,
project=self.proj_dev,
playbook=self.proj_dev.playbooks[0],
host_config_key=uuid.uuid4().hex,
created_by=self.user_sue,
)
# self.job_eng_check = self.jt_eng_check.create_job(
# created_by=self.user_sue,
# credential=self.cred_doug,
# )
self.jt_eng_run = JobTemplate.objects.create(
name='eng-dev-run',
job_type='run',
inventory= self.inv_eng,
project=self.proj_dev,
playbook=self.proj_dev.playbooks[0],
host_config_key=uuid.uuid4().hex,
created_by=self.user_sue,
ask_credential_on_launch=True,
)
# self.job_eng_run = self.jt_eng_run.create_job(
# created_by=self.user_sue,
# credential=self.cred_chuck,
# )
# Support has job templates to check/run the test project onto
# their own inventory.
self.jt_sup_check = JobTemplate.objects.create(
name='sup-test-check',
job_type='check',
inventory= self.inv_sup,
project=self.proj_test,
playbook=self.proj_test.playbooks[0],
host_config_key=uuid.uuid4().hex,
created_by=self.user_sue,
)
# self.job_sup_check = self.jt_sup_check.create_job(
# created_by=self.user_sue,
# credential=self.cred_frank,
# )
self.jt_sup_run = JobTemplate.objects.create(
name='sup-test-run',
job_type='run',
inventory= self.inv_sup,
project=self.proj_test,
playbook=self.proj_test.playbooks[0],
host_config_key=uuid.uuid4().hex,
credential=self.cred_eve,
created_by=self.user_sue,
)
# self.job_sup_run = self.jt_sup_run.create_job(
# created_by=self.user_sue,
# )
# Operations has job templates to check/run the prod project onto
# both east and west inventories, by default using the team credential.
self.jt_ops_east_check = JobTemplate.objects.create(
name='ops-east-prod-check',
job_type='check',
inventory= self.inv_ops_east,
project=self.proj_prod,
playbook=self.proj_prod.playbooks[0],
credential=self.cred_ops_east,
host_config_key=uuid.uuid4().hex,
created_by=self.user_sue,
)
# self.job_ops_east_check = self.jt_ops_east_check.create_job(
# created_by=self.user_sue,
# )
self.jt_ops_east_run = JobTemplate.objects.create(
name='ops-east-prod-run',
job_type='run',
inventory= self.inv_ops_east,
project=self.proj_prod,
playbook=self.proj_prod.playbooks[0],
credential=self.cred_ops_east,
host_config_key=uuid.uuid4().hex,
created_by=self.user_sue,
)
self.jt_ops_east_run_prod_east = JobTemplate.objects.create(
name='ops-east-prod-run-on-prod-east',
job_type='run',
inventory= self.inv_ops_east,
project=self.proj_prod_east,
playbook=self.proj_prod_east.playbooks[0],
credential=self.cred_ops_east,
host_config_key=uuid.uuid4().hex,
created_by=self.user_sue,
)
# self.job_ops_east_run = self.jt_ops_east_run.create_job(
# created_by=self.user_sue,
# )
self.jt_ops_west_check = JobTemplate.objects.create(
name='ops-west-prod-check',
job_type='check',
inventory= self.inv_ops_west,
project=self.proj_prod,
playbook=self.proj_prod.playbooks[0],
credential=self.cred_ops_west,
host_config_key=uuid.uuid4().hex,
created_by=self.user_sue,
)
self.jt_ops_west_check_test_team = JobTemplate.objects.create(
name='ops-west-prod-check-testers',
job_type='check',
inventory= self.inv_ops_west,
project=self.proj_prod,
playbook=self.proj_prod.playbooks[0],
credential=self.cred_ops_test,
host_config_key=uuid.uuid4().hex,
created_by=self.user_sue,
)
# self.job_ops_west_check = self.jt_ops_west_check.create_job(
# created_by=self.user_sue,
# )
self.jt_ops_west_run = JobTemplate.objects.create(
name='ops-west-prod-run',
job_type='run',
inventory= self.inv_ops_west,
project=self.proj_prod,
playbook=self.proj_prod.playbooks[0],
credential=self.cred_ops_west,
host_config_key=uuid.uuid4().hex,
created_by=self.user_sue,
)
# self.job_ops_west_run = self.jt_ops_west_run.create_job(
# created_by=self.user_sue,
# )
def setUp(self):
super(BaseJobTestMixin, self).setUp()
self.start_rabbit()
self.setup_instances()
self.populate()
self.start_queue()
def tearDown(self):
super(BaseJobTestMixin, self).tearDown()
self.stop_rabbit()
self.terminate_queue()