support specifying multiple vault IDs for a playbook run

see: https://github.com/ansible/awx/issues/352
This commit is contained in:
Ryan Petrello
2017-11-30 12:49:54 -05:00
parent fde5a8850d
commit a1f8f65add
8 changed files with 190 additions and 24 deletions

View File

@@ -450,6 +450,97 @@ class TestJobCredentials(TestJobExecution):
] == 'vault-me'
assert '--ask-vault-pass' in ' '.join(args)
def test_vault_password_ask(self):
vault = CredentialType.defaults['vault']()
credential = Credential(
pk=1,
credential_type=vault,
inputs={'vault_password': 'ASK'}
)
credential.inputs['vault_password'] = encrypt_field(credential, 'vault_password')
self.instance.credentials.add(credential)
self.task.run(self.pk, vault_password='provided-at-launch')
assert self.run_pexpect.call_count == 1
call_args, call_kwargs = self.run_pexpect.call_args_list[0]
args, cwd, env, stdout = call_args
assert call_kwargs.get('expect_passwords')[
re.compile(r'Vault password:\s*?$', re.M)
] == 'provided-at-launch'
assert '--ask-vault-pass' in ' '.join(args)
def test_multi_vault_password(self):
vault = CredentialType.defaults['vault']()
for i, label in enumerate(['dev', 'prod']):
credential = Credential(
pk=i,
credential_type=vault,
inputs={'vault_password': 'pass@{}'.format(label), 'vault_id': label}
)
credential.inputs['vault_password'] = encrypt_field(credential, 'vault_password')
self.instance.credentials.add(credential)
self.task.run(self.pk)
assert self.run_pexpect.call_count == 1
call_args, call_kwargs = self.run_pexpect.call_args_list[0]
args, cwd, env, stdout = call_args
vault_passwords = dict(
(k.pattern, v) for k, v in call_kwargs['expect_passwords'].items()
if 'Vault' in k.pattern
)
assert vault_passwords['Vault password \(prod\):\\s*?$'] == 'pass@prod'
assert vault_passwords['Vault password \(dev\):\\s*?$'] == 'pass@dev'
assert vault_passwords['Vault password:\\s*?$'] == ''
assert '--ask-vault-pass' not in ' '.join(args)
assert '--vault-id dev@prompt' in ' '.join(args)
assert '--vault-id prod@prompt' in ' '.join(args)
def test_multi_vault_id_conflict(self):
vault = CredentialType.defaults['vault']()
for i in range(2):
credential = Credential(
pk=i,
credential_type=vault,
inputs={'vault_password': 'some-pass', 'vault_id': 'conflict'}
)
credential.inputs['vault_password'] = encrypt_field(credential, 'vault_password')
self.instance.credentials.add(credential)
with pytest.raises(Exception):
self.task.run(self.pk)
def test_multi_vault_password_ask(self):
vault = CredentialType.defaults['vault']()
for i, label in enumerate(['dev', 'prod']):
credential = Credential(
pk=i,
credential_type=vault,
inputs={'vault_password': 'ASK', 'vault_id': label}
)
credential.inputs['vault_password'] = encrypt_field(credential, 'vault_password')
self.instance.credentials.add(credential)
self.task.run(self.pk, **{
'vault_password.dev': 'provided-at-launch@dev',
'vault_password.prod': 'provided-at-launch@prod'
})
assert self.run_pexpect.call_count == 1
call_args, call_kwargs = self.run_pexpect.call_args_list[0]
args, cwd, env, stdout = call_args
vault_passwords = dict(
(k.pattern, v) for k, v in call_kwargs['expect_passwords'].items()
if 'Vault' in k.pattern
)
assert vault_passwords['Vault password \(prod\):\\s*?$'] == 'provided-at-launch@prod'
assert vault_passwords['Vault password \(dev\):\\s*?$'] == 'provided-at-launch@dev'
assert vault_passwords['Vault password:\\s*?$'] == ''
assert '--ask-vault-pass' not in ' '.join(args)
assert '--vault-id dev@prompt' in ' '.join(args)
assert '--vault-id prod@prompt' in ' '.join(args)
def test_ssh_key_with_agent(self):
ssh = CredentialType.defaults['ssh']()
credential = Credential(