refactor and fix unit tests

* fixup task TestGenericRun
* make runner callback functions accessable to testing
* reduce isintance() usage in run() by using build_ pattern
* move process_isolation param building to build_ function so it can be
tested
This commit is contained in:
chris meyers
2019-03-13 09:30:36 -04:00
parent 827ad0fa75
commit f7842cf283
2 changed files with 265 additions and 293 deletions

View File

@@ -43,11 +43,45 @@ from awx.main.utils import encrypt_field, encrypt_value, OutputEventFilter
from awx.main.utils.safe_yaml import SafeLoader
@contextmanager
def apply_patches(_patches):
[p.start() for p in _patches]
yield
[p.stop() for p in _patches]
class TestJobExecution(object):
pass
@pytest.fixture
def private_data_dir():
private_data = tempfile.mkdtemp(prefix='awx_')
yield private_data
shutil.rmtree(private_data, True)
@pytest.fixture
def patch_Job():
with mock.patch.object(Job, 'cloud_credentials') as mock_cred:
mock_cred.__get__ = lambda *args, **kwargs: []
with mock.patch.object(Job, 'network_credentials') as mock_net:
mock_net.__get__ = lambda *args, **kwargs: []
yield
@pytest.fixture
def job():
return Job(pk=1, id=1, project=Project(), inventory=Inventory())
@pytest.fixture
def update_model_wrapper(job):
def fn(pk, **kwargs):
for k, v in kwargs.items():
setattr(job, k, v)
return job
return fn
@pytest.fixture
def patch_CallbackQueueDispatcher():
with mock.patch('awx.main.tasks.CallbackQueueDispatcher') as m:
m.return_value = m
yield m
def test_send_notifications_not_list():
@@ -209,109 +243,6 @@ def parse_extra_vars(args):
return extra_vars
class TestJobExecution(object):
"""
For job runs, test that `ansible-playbook` is invoked with the proper
arguments, environment variables, and pexpect passwords for a variety of
credential types.
"""
TASK_CLS = tasks.RunJob
EXAMPLE_PRIVATE_KEY = '-----BEGIN PRIVATE KEY-----\nxyz==\n-----END PRIVATE KEY-----'
INVENTORY_DATA = {
"all": {"hosts": ["localhost"]},
"_meta": {"localhost": {"ansible_connection": "local"}}
}
def setup_method(self, method):
if not os.path.exists(settings.PROJECTS_ROOT):
os.mkdir(settings.PROJECTS_ROOT)
self.project_path = tempfile.mkdtemp(prefix='awx_project_')
with open(os.path.join(self.project_path, 'helloworld.yml'), 'w') as f:
f.write('---')
# The primary goal of these tests is to mock our `run_pexpect` call
# and make assertions about the arguments and environment passed to it.
self.run_pexpect = mock.Mock()
self.run_pexpect.return_value = ['successful', 0]
self.patches = [
mock.patch.object(CallbackQueueDispatcher, 'dispatch', lambda self, obj: None),
mock.patch.object(Project, 'get_project_path', lambda *a, **kw: self.project_path),
# don't emit websocket statuses; they use the DB and complicate testing
mock.patch.object(UnifiedJob, 'websocket_emit_status', mock.Mock()),
mock.patch('awx.main.expect.run.run_pexpect', self.run_pexpect),
]
for cls in (Job, AdHocCommand):
self.patches.append(
mock.patch.object(cls, 'inventory', mock.Mock(
pk=1,
get_script_data=lambda *args, **kw: self.INVENTORY_DATA,
spec_set=['pk', 'get_script_data']
))
)
for p in self.patches:
p.start()
self.instance = self.get_instance()
def status_side_effect(pk, **kwargs):
# If `Job.update_model` is called, we're not actually persisting
# to the database; just update the status, which is usually
# the update we care about for testing purposes
if 'status' in kwargs:
self.instance.status = kwargs['status']
if 'job_env' in kwargs:
self.instance.job_env = kwargs['job_env']
return self.instance
self.task = self.TASK_CLS()
self.task.update_model = mock.Mock(side_effect=status_side_effect)
# ignore pre-run and post-run hooks, they complicate testing in a variety of ways
self.task.pre_run_hook = self.task.post_run_hook = self.task.final_run_hook = mock.Mock()
def teardown_method(self, method):
for p in self.patches:
p.stop()
shutil.rmtree(self.project_path, True)
def get_instance(self):
job = Job(
pk=1,
created=datetime.utcnow(),
status='new',
job_type='run',
cancel_flag=False,
project=Project(),
playbook='helloworld.yml',
verbosity=3,
job_template=JobTemplate(extra_vars='')
)
# mock the job.credentials M2M relation so we can avoid DB access
job._credentials = []
patch = mock.patch.object(UnifiedJob, 'credentials', mock.Mock(**{
'all': lambda: job._credentials,
'add': job._credentials.append,
'filter.return_value': mock.Mock(
__iter__ = lambda *args: iter(job._credentials),
first = lambda: job._credentials[0]
),
'spec_set': ['all', 'add', 'filter']
}))
self.patches.append(patch)
patch.start()
job.project = Project(organization=Organization())
return job
@property
def pk(self):
return self.instance.pk
class TestExtraVarSanitation(TestJobExecution):
# By default, extra vars are marked as `!unsafe` in the generated yaml
# _unless_ they've been specified on the JobTemplate's extra_vars (which
@@ -439,34 +370,73 @@ class TestExtraVarSanitation(TestJobExecution):
class TestGenericRun(TestJobExecution):
def test_generic_failure(self):
self.task.build_private_data_files = mock.Mock(side_effect=OSError())
def test_generic_failure(self, patch_Job):
job = Job(status='running', inventory=Inventory())
job.websocket_emit_status = mock.Mock()
task = tasks.RunJob()
task.update_model = mock.Mock(return_value=job)
task.build_private_data_files = mock.Mock(side_effect=OSError())
with pytest.raises(Exception):
self.task.run(self.pk)
update_model_call = self.task.update_model.call_args[1]
task.run(1)
update_model_call = task.update_model.call_args[1]
assert 'OSError' in update_model_call['result_traceback']
assert update_model_call['status'] == 'error'
assert update_model_call['emitted_events'] == 0
def test_cancel_flag(self):
self.instance.cancel_flag = True
def test_cancel_flag(self, job, update_model_wrapper):
job.status = 'running'
job.cancel_flag = True
job.websocket_emit_status = mock.Mock()
task = tasks.RunJob()
task.update_model = mock.Mock(wraps=update_model_wrapper)
task.build_private_data_files = mock.Mock()
with pytest.raises(Exception):
self.task.run(self.pk)
task.run(1)
for c in [
mock.call(self.pk, status='running', start_args=''),
mock.call(self.pk, status='canceled')
mock.call(1, status='running', start_args=''),
mock.call(1, status='canceled')
]:
assert c in self.task.update_model.call_args_list
assert c in task.update_model.call_args_list
def test_event_count(self):
with mock.patch.object(self.task, 'get_stdout_handle') as mock_stdout:
handle = OutputEventFilter(lambda event_data: None)
handle._counter = 334
mock_stdout.return_value = handle
self.task.run(self.pk)
def test_event_count(self, patch_CallbackQueueDispatcher):
task = tasks.RunJob()
task.instance = Job()
task.event_ct = 0
event_data = {}
assert self.task.update_model.call_args[-1]['emitted_events'] == 334
[task.event_handler(event_data) for i in range(20)]
assert 20 == task.event_ct
def test_finished_callback_eof(self, patch_CallbackQueueDispatcher):
task = tasks.RunJob()
task.instance = Job(pk=1, id=1)
task.event_ct = 17
task.finished_callback(None)
patch_CallbackQueueDispatcher.dispatch.assert_called_with({'event': 'EOF', 'final_counter': 17, 'job_id': 1})
def test_save_job_metadata(self, job, update_model_wrapper):
class MockMe():
pass
task = tasks.RunJob()
task.instance = job
task.update_model = mock.Mock(wraps=update_model_wrapper)
runner_config = MockMe()
runner_config.command = {'foo': 'bar'}
runner_config.cwd = '/foobar'
runner_config.env = { 'switch': 'blade', 'foot': 'ball' }
task.status_handler({'status': 'starting'}, runner_config)
task.update_model.assert_called_with(1, job_args=json.dumps({'foo': 'bar'}),
job_cwd='/foobar', job_env={'switch': 'blade', 'foot': 'ball'})
'''
def test_artifact_cleanup(self):
path = tempfile.NamedTemporaryFile(delete=False).name
try:
@@ -477,119 +447,109 @@ class TestGenericRun(TestJobExecution):
finally:
if os.path.exists(path):
os.remove(path)
'''
def test_uses_bubblewrap(self):
self.task.run(self.pk)
def test_uses_process_isolation(self, settings):
job = Job(project=Project(), inventory=Inventory())
task = tasks.RunJob()
task.should_use_proot = lambda instance: True
assert self.run_pexpect.call_count == 1
call_args, _ = self.run_pexpect.call_args_list[0]
args, cwd, env, stdout = call_args
assert args[0] == 'bwrap'
private_data_dir = '/foo'
cwd = '/bar'
def test_bwrap_virtualenvs_are_readonly(self):
self.task.run(self.pk)
settings.AWX_PROOT_HIDE_PATHS = ['/AWX_PROOT_HIDE_PATHS1', '/AWX_PROOT_HIDE_PATHS2']
settings.ANSIBLE_VENV_PATH = '/ANSIBLE_VENV_PATH'
settings.AWX_VENV_PATH = '/AWX_VENV_PATH'
assert self.run_pexpect.call_count == 1
call_args, _ = self.run_pexpect.call_args_list[0]
args, cwd, env, stdout = call_args
assert '--ro-bind %s %s' % (settings.ANSIBLE_VENV_PATH, settings.ANSIBLE_VENV_PATH) in ' '.join(args) # noqa
assert '--ro-bind %s %s' % (settings.AWX_VENV_PATH, settings.AWX_VENV_PATH) in ' '.join(args) # noqa
process_isolation_params = task.build_params_process_isolation(job, private_data_dir, cwd)
assert True is process_isolation_params['process_isolation']
assert settings.AWX_PROOT_BASE_PATH == process_isolation_params['process_isolation_path'], \
"Directory where a temp directory will be created for the remapping to take place"
assert private_data_dir in process_isolation_params['process_isolation_show_paths'], \
"The per-job private data dir should be in the list of directories the user can see."
assert cwd in process_isolation_params['process_isolation_show_paths'], \
"The current working directory should be in the list of directories the user can see."
for p in [settings.AWX_PROOT_BASE_PATH,
'/etc/tower',
'/var/lib/awx',
'/var/log',
settings.PROJECTS_ROOT,
settings.JOBOUTPUT_ROOT,
'/AWX_PROOT_HIDE_PATHS1',
'/AWX_PROOT_HIDE_PATHS2']:
assert p in process_isolation_params['process_isolation_hide_paths']
assert 8 == len(process_isolation_params['process_isolation_hide_paths'])
assert '/ANSIBLE_VENV_PATH' in process_isolation_params['process_isolation_ro_paths']
assert '/AWX_VENV_PATH' in process_isolation_params['process_isolation_ro_paths']
assert 2 == len(process_isolation_params['process_isolation_ro_paths'])
def test_created_by_extra_vars(self):
self.instance.created_by = User(pk=123, username='angry-spud')
job = Job(created_by=User(pk=123, username='angry-spud'))
def run_pexpect_side_effect(*args, **kwargs):
args, cwd, env, stdout = args
extra_vars = parse_extra_vars(args)
assert extra_vars['tower_user_id'] == 123
assert extra_vars['tower_user_name'] == "angry-spud"
assert extra_vars['awx_user_id'] == 123
assert extra_vars['awx_user_name'] == "angry-spud"
return ['successful', 0]
task = tasks.RunJob()
task._write_extra_vars_file = mock.Mock()
task.build_extra_vars_file(job, None, dict())
self.run_pexpect.side_effect = run_pexpect_side_effect
self.task.run(self.pk)
call_args, _ = task._write_extra_vars_file.call_args_list[0]
private_data_dir, extra_vars, safe_dict = call_args
assert extra_vars['tower_user_id'] == 123
assert extra_vars['tower_user_name'] == "angry-spud"
assert extra_vars['awx_user_id'] == 123
assert extra_vars['awx_user_name'] == "angry-spud"
def test_survey_extra_vars(self):
self.instance.extra_vars = json.dumps({
job = Job()
job.extra_vars = json.dumps({
'super_secret': encrypt_value('CLASSIFIED', pk=None)
})
self.instance.survey_passwords = {
job.survey_passwords = {
'super_secret': '$encrypted$'
}
def run_pexpect_side_effect(*args, **kwargs):
args, cwd, env, stdout = args
extra_vars = parse_extra_vars(args)
assert extra_vars['super_secret'] == "CLASSIFIED"
return ['successful', 0]
task = tasks.RunJob()
task._write_extra_vars_file = mock.Mock()
task.build_extra_vars_file(job, None, dict())
self.run_pexpect.side_effect = run_pexpect_side_effect
self.task.run(self.pk)
call_args, _ = task._write_extra_vars_file.call_args_list[0]
private_data_dir, extra_vars, safe_dict = call_args
assert extra_vars['super_secret'] == "CLASSIFIED"
def test_awx_task_env(self, patch_Job, private_data_dir):
job = Job(project=Project(), inventory=Inventory())
task = tasks.RunJob()
task._write_extra_vars_file = mock.Mock()
def test_awx_task_env(self):
with mock.patch('awx.main.tasks.settings.AWX_TASK_ENV', {'FOO': 'BAR'}):
self.task.run(self.pk)
assert self.run_pexpect.call_count == 1
call_args, _ = self.run_pexpect.call_args_list[0]
args, cwd, env, stdout = call_args
env = task.build_env(job, private_data_dir)
assert env['FOO'] == 'BAR'
def test_valid_custom_virtualenv(self):
def test_valid_custom_virtualenv(self, patch_Job, private_data_dir):
job = Job(project=Project(), inventory=Inventory())
with TemporaryDirectory(dir=settings.BASE_VENV_PATH) as tempdir:
self.instance.project.custom_virtualenv = tempdir
job.project.custom_virtualenv = tempdir
os.makedirs(os.path.join(tempdir, 'lib'))
os.makedirs(os.path.join(tempdir, 'bin', 'activate'))
self.task.run(self.pk)
assert self.run_pexpect.call_count == 1
call_args, _ = self.run_pexpect.call_args_list[0]
args, cwd, env, stdout = call_args
task = tasks.RunJob()
env = task.build_env(job, private_data_dir)
assert env['PATH'].startswith(os.path.join(tempdir, 'bin'))
assert env['VIRTUAL_ENV'] == tempdir
for path in (settings.ANSIBLE_VENV_PATH, tempdir):
assert '--ro-bind {} {}'.format(path, path) in ' '.join(args)
def test_invalid_custom_virtualenv(self):
with pytest.raises(Exception):
self.instance.project.custom_virtualenv = '/venv/missing'
self.task.run(self.pk)
tb = self.task.update_model.call_args[-1]['result_traceback']
assert 'a valid Python virtualenv does not exist at /venv/missing' in tb
def test_invalid_custom_virtualenv(self, patch_Job, private_data_dir):
job = Job(project=Project(), inventory=Inventory())
job.project.custom_virtualenv = '/venv/missing'
task = tasks.RunJob()
def test_fact_cache_usage(self):
self.instance.use_fact_cache = True
with pytest.raises(RuntimeError) as e:
env = task.build_env(job, private_data_dir)
start_mock = mock.Mock()
patch = mock.patch.object(Job, 'start_job_fact_cache', start_mock)
self.patches.append(patch)
patch.start()
self.task.run(self.pk)
call_args, _ = self.run_pexpect.call_args_list[0]
args, cwd, env, stdout = call_args
start_mock.assert_called_once()
tmpdir, _ = start_mock.call_args[0]
assert env['ANSIBLE_CACHE_PLUGIN'] == 'jsonfile'
assert env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] == os.path.join(tmpdir, 'facts')
@pytest.mark.parametrize('task_env, ansible_library_env', [
[{}, '/awx_devel/awx/plugins/library'],
[{'ANSIBLE_LIBRARY': '/foo/bar'}, '/foo/bar:/awx_devel/awx/plugins/library'],
])
def test_fact_cache_usage_with_ansible_library(self, task_env, ansible_library_env):
self.instance.use_fact_cache = True
with mock.patch('awx.main.tasks.settings.AWX_TASK_ENV', task_env):
start_mock = mock.Mock()
with mock.patch.object(Job, 'start_job_fact_cache', start_mock):
self.task.run(self.pk)
call_args, _ = self.run_pexpect.call_args_list[0]
args, cwd, env, stdout = call_args
assert env['ANSIBLE_LIBRARY'] == ansible_library_env
assert 'a valid Python virtualenv does not exist at /venv/missing' == str(e.value)
class TestAdhocRun(TestJobExecution):