diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 204ef80352..5b2fed17d8 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -1,17 +1,29 @@ from contextlib import contextmanager +from datetime import datetime +from functools import partial +import ConfigParser +import tempfile import pytest import yaml import mock from awx.main.models import ( - UnifiedJob, + Credential, + CredentialType, + Inventory, + InventorySource, + InventoryUpdate, + Job, Notification, - ProjectUpdate + Project, + ProjectUpdate, + UnifiedJob, ) from awx.main import tasks from awx.main.task_engine import TaskEnhancer +from awx.main.utils.common import encrypt_field @contextmanager @@ -152,3 +164,682 @@ def test_openstack_client_config_generation_with_private_source_vars(mocker, sou 'private': expected } } + + +def pytest_generate_tests(metafunc): + # pytest.mark.parametrize doesn't work on unittest.TestCase methods + # see: https://docs.pytest.org/en/latest/example/parametrize.html#parametrizing-test-methods-through-per-class-configuration + if metafunc.cls and hasattr(metafunc.cls, 'parametrize'): + funcarglist = metafunc.cls.parametrize.get(metafunc.function.__name__) + if funcarglist: + argnames = sorted(funcarglist[0]) + metafunc.parametrize( + argnames, + [[funcargs[name] for name in argnames] for funcargs in funcarglist] + ) + + +class TestJobExecution: + """ + For job runs, test that `ansible-playbook` is invoked with the proper + arguments, environment variables, and pexpect passwords for a variety of + credential types. + """ + + TASK_CLS = tasks.RunJob + EXAMPLE_PRIVATE_KEY = '-----BEGIN PRIVATE KEY-----\nxyz==\n-----END PRIVATE KEY-----' + + def setup_method(self, method): + self.patches = [ + mock.patch.object(Project, 'get_project_path', lambda *a, **kw: '/tmp/'), + # don't emit websocket statuses; they use the DB and complicate testing + mock.patch.object(UnifiedJob, 'websocket_emit_status', mock.Mock()), + mock.patch.object(Job, 'inventory', mock.Mock(pk=1, spec_set=['pk'])) + ] + for p in self.patches: + p.start() + + self.instance = self.get_instance() + + def status_side_effect(pk, **kwargs): + # If `Job.update_model` is called, we're not actually persisting + # to the database; just update the status, which is usually + # the update we care about for testing purposes + if 'status' in kwargs: + self.instance.status = kwargs['status'] + return self.instance + + self.task = self.TASK_CLS() + self.task.update_model = mock.Mock(side_effect=status_side_effect) + + # The primary goal of these tests is to mock our `run_pexpect` call + # and make assertions about the arguments and environment passed to it. + self.task.run_pexpect = mock.Mock(return_value=['successful', 0]) + + # ignore pre-run and post-run hooks, they complicate testing in a variety of ways + self.task.pre_run_hook = self.task.post_run_hook = mock.Mock() + + def teardown_method(self, method): + for p in self.patches: + p.stop() + + def get_instance(self): + return Job( + pk=1, + created=datetime.utcnow(), + status='new', + job_type='run', + cancel_flag=False, + credential=None, + cloud_credential=None, + network_credential=None, + project=Project() + ) + + @property + def pk(self): + return self.instance.pk + + +class TestGenericRun(TestJobExecution): + + def test_cancel_flag(self): + self.instance.cancel_flag = True + with pytest.raises(Exception): + self.task.run(self.pk) + for c in [ + mock.call(self.pk, celery_task_id='', status='running'), + mock.call(self.pk, output_replacements=[], result_traceback=mock.ANY, status='canceled') + ]: + assert c in self.task.update_model.call_args_list + + def test_uses_bubblewrap(self): + self.task.run(self.pk) + + assert self.task.run_pexpect.call_count == 1 + call_args, _ = self.task.run_pexpect.call_args_list[0] + job, args, cwd, env, passwords, stdout = call_args + assert args[0] == 'bwrap' + + +class TestJobCredentials(TestJobExecution): + + parametrize = { + 'test_ssh_passwords': [ + dict(field='password', password_name='ssh_password', expected_flag='--ask-pass'), + dict(field='ssh_key_unlock', password_name='ssh_key_unlock', expected_flag=None), + dict(field='become_password', password_name='become_password', expected_flag='--ask-become-pass'), + dict(field='vault_password', password_name='vault_password', expected_flag='--ask-vault-pass'), + ] + } + + def test_ssh_passwords(self, field, password_name, expected_flag): + ssh = CredentialType.defaults['ssh']() + self.instance.credential = Credential( + credential_type=ssh, + inputs = {'username': 'bob', field: 'secret'} + ) + self.instance.credential.inputs[field] = encrypt_field( + self.instance.credential, field + ) + self.task.run(self.pk) + + assert self.task.run_pexpect.call_count == 1 + call_args, _ = self.task.run_pexpect.call_args_list[0] + job, args, cwd, env, passwords, stdout = call_args + + assert passwords[password_name] == 'secret' + assert '-u bob' in ' '.join(args) + if expected_flag: + assert expected_flag in ' '.join(args) + + def test_ssh_key_with_agent(self): + ssh = CredentialType.defaults['ssh']() + self.instance.credential = Credential( + credential_type=ssh, + inputs = { + 'username': 'bob', + 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY + } + ) + self.instance.credential.inputs['ssh_key_data'] = encrypt_field( + self.instance.credential, 'ssh_key_data' + ) + + def run_pexpect_side_effect(private_data, *args, **kwargs): + job, args, cwd, env, passwords, stdout = args + ssh_key_data_fifo = '/'.join([private_data, 'credential']) + assert open(ssh_key_data_fifo, 'r').read() == self.EXAMPLE_PRIVATE_KEY + assert ' '.join(args).startswith( + 'ssh-agent -a %s sh -c ssh-add %s && rm -f %s' % ( + '/'.join([private_data, 'ssh_auth.sock']), + ssh_key_data_fifo, + ssh_key_data_fifo + ) + ) + return ['successful', 0] + + private_data = tempfile.mkdtemp(prefix='ansible_tower_') + self.task.build_private_data_dir = mock.Mock(return_value=private_data) + self.task.run_pexpect = mock.Mock( + side_effect=partial(run_pexpect_side_effect, private_data) + ) + self.task.run(self.pk, private_data_dir=private_data) + + def test_aws_cloud_credential(self): + aws = CredentialType.defaults['aws']() + self.instance.cloud_credential = Credential( + credential_type=aws, + inputs = {'username': 'bob', 'password': 'secret'} + ) + self.instance.cloud_credential.inputs['password'] = encrypt_field( + self.instance.cloud_credential, 'password' + ) + self.task.run(self.pk) + + assert self.task.run_pexpect.call_count == 1 + call_args, _ = self.task.run_pexpect.call_args_list[0] + job, args, cwd, env, passwords, stdout = call_args + + assert env['AWS_ACCESS_KEY'] == 'bob' + assert env['AWS_SECRET_KEY'] == 'secret' + assert 'AWS_SECURITY_TOKEN' not in env + + def test_aws_cloud_credential_with_sts_token(self): + aws = CredentialType.defaults['aws']() + self.instance.cloud_credential = Credential( + credential_type=aws, + inputs = {'username': 'bob', 'password': 'secret', 'security_token': 'token'} + ) + for key in ('password', 'security_token'): + self.instance.cloud_credential.inputs[key] = encrypt_field( + self.instance.cloud_credential, key + ) + self.task.run(self.pk) + + assert self.task.run_pexpect.call_count == 1 + call_args, _ = self.task.run_pexpect.call_args_list[0] + job, args, cwd, env, passwords, stdout = call_args + + assert env['AWS_ACCESS_KEY'] == 'bob' + assert env['AWS_SECRET_KEY'] == 'secret' + assert env['AWS_SECURITY_TOKEN'] == 'token' + + def test_rax_credential(self): + rax = CredentialType.defaults['rackspace']() + self.instance.cloud_credential = Credential( + credential_type=rax, + inputs = {'username': 'bob', 'password': 'secret'} + ) + self.instance.cloud_credential.inputs['password'] = encrypt_field( + self.instance.cloud_credential, 'password' + ) + self.task.run(self.pk) + + assert self.task.run_pexpect.call_count == 1 + call_args, _ = self.task.run_pexpect.call_args_list[0] + job, args, cwd, env, passwords, stdout = call_args + + assert env['RAX_USERNAME'] == 'bob' + assert env['RAX_API_KEY'] == 'secret' + assert env['CLOUD_VERIFY_SSL'] == 'False' + + def test_gce_credentials(self): + gce = CredentialType.defaults['gce']() + self.instance.cloud_credential = Credential( + credential_type=gce, + inputs = { + 'username': 'bob', + 'project': 'some-project', + 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY + } + ) + self.instance.cloud_credential.inputs['ssh_key_data'] = encrypt_field( + self.instance.cloud_credential, 'ssh_key_data' + ) + + def run_pexpect_side_effect(*args, **kwargs): + job, args, cwd, env, passwords, stdout = args + assert env['GCE_EMAIL'] == 'bob' + assert env['GCE_PROJECT'] == 'some-project' + ssh_key_data = env['GCE_PEM_FILE_PATH'] + assert open(ssh_key_data, 'rb').read() == self.EXAMPLE_PRIVATE_KEY + return ['successful', 0] + + self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.task.run(self.pk) + + def test_azure_credentials(self): + azure = CredentialType.defaults['azure']() + self.instance.cloud_credential = Credential( + credential_type=azure, + inputs = { + 'username': 'bob', + 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY + } + ) + self.instance.cloud_credential.inputs['ssh_key_data'] = encrypt_field( + self.instance.cloud_credential, 'ssh_key_data' + ) + + def run_pexpect_side_effect(*args, **kwargs): + job, args, cwd, env, passwords, stdout = args + assert env['AZURE_SUBSCRIPTION_ID'] == 'bob' + ssh_key_data = env['AZURE_CERT_PATH'] + assert open(ssh_key_data, 'rb').read() == self.EXAMPLE_PRIVATE_KEY + return ['successful', 0] + + self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.task.run(self.pk) + + def test_azure_rm_with_tenant(self): + azure = CredentialType.defaults['azure_rm']() + self.instance.cloud_credential = Credential( + credential_type=azure, + inputs = { + 'client': 'some-client', + 'secret': 'some-secret', + 'tenant': 'some-tenant', + 'subscription': 'some-subscription' + } + ) + self.instance.cloud_credential.inputs['secret'] = encrypt_field( + self.instance.cloud_credential, 'secret' + ) + + self.task.run(self.pk) + + assert self.task.run_pexpect.call_count == 1 + call_args, _ = self.task.run_pexpect.call_args_list[0] + job, args, cwd, env, passwords, stdout = call_args + + assert env['AZURE_CLIENT_ID'] == 'some-client' + assert env['AZURE_SECRET'] == 'some-secret' + assert env['AZURE_TENANT'] == 'some-tenant' + assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription' + + def test_azure_rm_with_password(self): + azure = CredentialType.defaults['azure_rm']() + self.instance.cloud_credential = Credential( + credential_type=azure, + inputs = { + 'subscription': 'some-subscription', + 'username': 'bob', + 'password': 'secret' + } + ) + self.instance.cloud_credential.inputs['password'] = encrypt_field( + self.instance.cloud_credential, 'password' + ) + + self.task.run(self.pk) + + assert self.task.run_pexpect.call_count == 1 + call_args, _ = self.task.run_pexpect.call_args_list[0] + job, args, cwd, env, passwords, stdout = call_args + + assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription' + assert env['AZURE_AD_USER'] == 'bob' + assert env['AZURE_PASSWORD'] == 'secret' + + def test_vmware_credentials(self): + vmware = CredentialType.defaults['vmware']() + self.instance.cloud_credential = Credential( + credential_type=vmware, + inputs = {'username': 'bob', 'password': 'secret', 'host': 'https://example.org'} + ) + self.instance.cloud_credential.inputs['password'] = encrypt_field( + self.instance.cloud_credential, 'password' + ) + self.task.run(self.pk) + + assert self.task.run_pexpect.call_count == 1 + call_args, _ = self.task.run_pexpect.call_args_list[0] + job, args, cwd, env, passwords, stdout = call_args + + assert env['VMWARE_USER'] == 'bob' + assert env['VMWARE_PASSWORD'] == 'secret' + assert env['VMWARE_HOST'] == 'https://example.org' + + def test_openstack_credentials(self): + openstack = CredentialType.defaults['openstack']() + self.instance.cloud_credential = Credential( + credential_type=openstack, + inputs = { + 'username': 'bob', + 'password': 'secret', + 'project': 'tenant-name', + 'host': 'https://keystone.example.org' + } + ) + self.instance.cloud_credential.inputs['password'] = encrypt_field( + self.instance.cloud_credential, 'password' + ) + + def run_pexpect_side_effect(*args, **kwargs): + job, args, cwd, env, passwords, stdout = args + shade_config = open(env['OS_CLIENT_CONFIG_FILE'], 'rb').read() + assert shade_config == '\n'.join([ + 'clouds:', + ' devstack:', + ' auth:', + ' auth_url: https://keystone.example.org', + ' password: secret', + ' project_name: tenant-name', + ' username: bob', + '' + ]) + return ['successful', 0] + + self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.task.run(self.pk) + + def test_net_credentials(self): + net = CredentialType.defaults['net']() + self.instance.network_credential = Credential( + credential_type=net, + inputs = { + 'username': 'bob', + 'password': 'secret', + 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY, + 'authorize': True, + 'authorize_password': 'authorizeme' + } + ) + for field in ('password', 'ssh_key_data', 'authorize_password'): + self.instance.network_credential.inputs[field] = encrypt_field( + self.instance.network_credential, field + ) + + def run_pexpect_side_effect(*args, **kwargs): + job, args, cwd, env, passwords, stdout = args + assert env['ANSIBLE_NET_USERNAME'] == 'bob' + assert env['ANSIBLE_NET_PASSWORD'] == 'secret' + assert env['ANSIBLE_NET_AUTHORIZE'] == '1' + assert env['ANSIBLE_NET_AUTH_PASS'] == 'authorizeme' + assert open(env['ANSIBLE_NET_SSH_KEYFILE'], 'rb').read() == self.EXAMPLE_PRIVATE_KEY + return ['successful', 0] + + self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.task.run(self.pk) + + +class TestProjectUpdateCredentials(TestJobExecution): + + TASK_CLS = tasks.RunProjectUpdate + + def get_instance(self): + return ProjectUpdate( + pk=1, + project=Project() + ) + + parametrize = { + 'test_username_and_password_auth': [ + dict(scm_type='git'), + dict(scm_type='hg'), + dict(scm_type='svn'), + ], + 'test_ssh_key_auth': [ + dict(scm_type='git'), + dict(scm_type='hg'), + dict(scm_type='svn'), + ] + } + + def test_username_and_password_auth(self, scm_type): + ssh = CredentialType.defaults['ssh']() + self.instance.scm_type = scm_type + self.instance.credential = Credential( + credential_type=ssh, + inputs = {'username': 'bob', 'password': 'secret'} + ) + self.instance.credential.inputs['password'] = encrypt_field( + self.instance.credential, 'password' + ) + self.task.run(self.pk) + + assert self.task.run_pexpect.call_count == 1 + call_args, _ = self.task.run_pexpect.call_args_list[0] + job, args, cwd, env, passwords, stdout = call_args + + assert passwords.get('scm_username') == 'bob' + assert passwords.get('scm_password') == 'secret' + + def test_ssh_key_auth(self, scm_type): + ssh = CredentialType.defaults['ssh']() + self.instance.scm_type = scm_type + self.instance.credential = Credential( + credential_type=ssh, + inputs = { + 'username': 'bob', + 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY + } + ) + self.instance.credential.inputs['ssh_key_data'] = encrypt_field( + self.instance.credential, 'ssh_key_data' + ) + + def run_pexpect_side_effect(private_data, *args, **kwargs): + job, args, cwd, env, passwords, stdout = args + ssh_key_data_fifo = '/'.join([private_data, 'scm_credential']) + assert open(ssh_key_data_fifo, 'r').read() == self.EXAMPLE_PRIVATE_KEY + assert ' '.join(args).startswith( + 'ssh-agent -a %s sh -c ssh-add %s && rm -f %s' % ( + '/'.join([private_data, 'ssh_auth.sock']), + ssh_key_data_fifo, + ssh_key_data_fifo + ) + ) + assert passwords.get('scm_username') == 'bob' + return ['successful', 0] + + private_data = tempfile.mkdtemp(prefix='ansible_tower_') + self.task.build_private_data_dir = mock.Mock(return_value=private_data) + self.task.run_pexpect = mock.Mock( + side_effect=partial(run_pexpect_side_effect, private_data) + ) + self.task.run(self.pk) + + +class TestInventoryUpdateCredentials(TestJobExecution): + + TASK_CLS = tasks.RunInventoryUpdate + + def get_instance(self): + return InventoryUpdate( + pk=1, + inventory_source=InventorySource( + pk=1, + inventory=Inventory(pk=1) + ) + ) + + def test_ec2_source(self): + aws = CredentialType.defaults['aws']() + self.instance.source = 'ec2' + self.instance.credential = Credential( + credential_type=aws, + inputs = {'username': 'bob', 'password': 'secret'} + ) + self.instance.credential.inputs['password'] = encrypt_field( + self.instance.credential, 'password' + ) + + def run_pexpect_side_effect(*args, **kwargs): + job, args, cwd, env, passwords, stdout = args + + assert env['AWS_ACCESS_KEY_ID'] == 'bob' + assert env['AWS_SECRET_ACCESS_KEY'] == 'secret' + assert 'EC2_INI_PATH' in env + + config = ConfigParser.ConfigParser() + config.read(env['EC2_INI_PATH']) + assert 'ec2' in config.sections() + return ['successful', 0] + + self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.task.run(self.pk) + + def test_vmware_source(self): + vmware = CredentialType.defaults['vmware']() + self.instance.source = 'vmware' + self.instance.credential = Credential( + credential_type=vmware, + inputs = {'username': 'bob', 'password': 'secret', 'host': 'https://example.org'} + ) + self.instance.credential.inputs['password'] = encrypt_field( + self.instance.credential, 'password' + ) + + def run_pexpect_side_effect(*args, **kwargs): + job, args, cwd, env, passwords, stdout = args + + config = ConfigParser.ConfigParser() + config.read(env['VMWARE_INI_PATH']) + assert config.get('vmware', 'username') == 'bob' + assert config.get('vmware', 'password') == 'secret' + assert config.get('vmware', 'server') == 'https://example.org' + return ['successful', 0] + + self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.task.run(self.pk) + + def test_azure_source(self): + azure = CredentialType.defaults['azure']() + self.instance.source = 'azure' + self.instance.credential = Credential( + credential_type=azure, + inputs = { + 'username': 'bob', + 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY + } + ) + self.instance.credential.inputs['ssh_key_data'] = encrypt_field( + self.instance.credential, 'ssh_key_data' + ) + + def run_pexpect_side_effect(*args, **kwargs): + job, args, cwd, env, passwords, stdout = args + assert env['AZURE_SUBSCRIPTION_ID'] == 'bob' + ssh_key_data = env['AZURE_CERT_PATH'] + assert open(ssh_key_data, 'rb').read() == self.EXAMPLE_PRIVATE_KEY + return ['successful', 0] + + self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.task.run(self.pk) + + def test_gce_source(self): + gce = CredentialType.defaults['gce']() + self.instance.source = 'gce' + self.instance.credential = Credential( + credential_type=gce, + inputs = { + 'username': 'bob', + 'project': 'some-project', + 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY + } + ) + self.instance.credential.inputs['ssh_key_data'] = encrypt_field( + self.instance.credential, 'ssh_key_data' + ) + + def run_pexpect_side_effect(*args, **kwargs): + job, args, cwd, env, passwords, stdout = args + assert env['GCE_EMAIL'] == 'bob' + assert env['GCE_PROJECT'] == 'some-project' + ssh_key_data = env['GCE_PEM_FILE_PATH'] + assert open(ssh_key_data, 'rb').read() == self.EXAMPLE_PRIVATE_KEY + return ['successful', 0] + + self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.task.run(self.pk) + + def test_openstack_source(self): + openstack = CredentialType.defaults['openstack']() + self.instance.source = 'openstack' + self.instance.credential = Credential( + credential_type=openstack, + inputs = { + 'username': 'bob', + 'password': 'secret', + 'project': 'tenant-name', + 'host': 'https://keystone.example.org' + } + ) + self.instance.credential.inputs['ssh_key_data'] = encrypt_field( + self.instance.credential, 'ssh_key_data' + ) + + def run_pexpect_side_effect(*args, **kwargs): + job, args, cwd, env, passwords, stdout = args + shade_config = open(env['OS_CLIENT_CONFIG_FILE'], 'rb').read() + assert '\n'.join([ + 'clouds:', + ' devstack:', + ' auth:', + ' auth_url: https://keystone.example.org', + ' password: secret', + ' project_name: tenant-name', + ' username: bob', + '' + ]) in shade_config + return ['successful', 0] + + self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.task.run(self.pk) + + def test_satellite6_source(self): + satellite6 = CredentialType.defaults['satellite6']() + self.instance.source = 'satellite6' + self.instance.credential = Credential( + credential_type=satellite6, + inputs = { + 'username': 'bob', + 'password': 'secret', + 'host': 'https://example.org' + } + ) + self.instance.credential.inputs['password'] = encrypt_field( + self.instance.credential, 'password' + ) + + def run_pexpect_side_effect(*args, **kwargs): + job, args, cwd, env, passwords, stdout = args + config = ConfigParser.ConfigParser() + config.read(env['FOREMAN_INI_PATH']) + assert config.get('foreman', 'url') == 'https://example.org' + assert config.get('foreman', 'user') == 'bob' + assert config.get('foreman', 'password') == 'secret' + return ['successful', 0] + + self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.task.run(self.pk) + + def test_cloudforms_source(self): + cloudforms = CredentialType.defaults['cloudforms']() + self.instance.source = 'cloudforms' + self.instance.credential = Credential( + credential_type=cloudforms, + inputs = { + 'username': 'bob', + 'password': 'secret', + 'host': 'https://example.org' + } + ) + self.instance.credential.inputs['password'] = encrypt_field( + self.instance.credential, 'password' + ) + + def run_pexpect_side_effect(*args, **kwargs): + job, args, cwd, env, passwords, stdout = args + config = ConfigParser.ConfigParser() + config.read(env['CLOUDFORMS_INI_PATH']) + assert config.get('cloudforms', 'url') == 'https://example.org' + assert config.get('cloudforms', 'username') == 'bob' + assert config.get('cloudforms', 'password') == 'secret' + assert config.get('cloudforms', 'ssl_verify') == 'false' + return ['successful', 0] + + self.task.run_pexpect = mock.Mock(side_effect=run_pexpect_side_effect) + self.task.run(self.pk)