use the m2m field for inventory source creds

This commit is contained in:
AlanCoding
2018-02-14 12:23:53 -05:00
parent 8505783350
commit 9c4d89f512
11 changed files with 409 additions and 169 deletions

View File

@@ -103,18 +103,22 @@ def test_safe_env_returns_new_copy():
def test_openstack_client_config_generation(mocker):
update = tasks.RunInventoryUpdate()
credential = mocker.Mock(**{
'host': 'https://keystone.openstack.example.org',
'username': 'demo',
'password': 'secrete',
'project': 'demo-project',
'domain': 'my-demo-domain',
})
cred_method = mocker.Mock(return_value=credential)
inventory_update = mocker.Mock(**{
'source': 'openstack',
'credential.host': 'https://keystone.openstack.example.org',
'credential.username': 'demo',
'credential.password': 'secrete',
'credential.project': 'demo-project',
'credential.domain': 'my-demo-domain',
'source_vars_dict': {}
'source_vars_dict': {},
'get_cloud_credential': cred_method
})
cloud_config = update.build_private_data(inventory_update)
cloud_credential = yaml.load(
cloud_config.get('credentials')[inventory_update.credential]
cloud_config.get('credentials')[credential]
)
assert cloud_credential['clouds'] == {
'devstack': {
@@ -135,18 +139,22 @@ def test_openstack_client_config_generation(mocker):
])
def test_openstack_client_config_generation_with_private_source_vars(mocker, source, expected):
update = tasks.RunInventoryUpdate()
credential = mocker.Mock(**{
'host': 'https://keystone.openstack.example.org',
'username': 'demo',
'password': 'secrete',
'project': 'demo-project',
'domain': None,
})
cred_method = mocker.Mock(return_value=credential)
inventory_update = mocker.Mock(**{
'source': 'openstack',
'credential.host': 'https://keystone.openstack.example.org',
'credential.username': 'demo',
'credential.password': 'secrete',
'credential.project': 'demo-project',
'credential.domain': None,
'source_vars_dict': {'private': source}
'source_vars_dict': {'private': source},
'get_cloud_credential': cred_method
})
cloud_config = update.build_private_data(inventory_update)
cloud_credential = yaml.load(
cloud_config.get('credentials')[inventory_update.credential]
cloud_config.get('credentials')[credential]
)
assert cloud_credential['clouds'] == {
'devstack': {
@@ -1519,8 +1527,9 @@ class TestInventoryUpdateCredentials(TestJobExecution):
)
)
def test_source_without_credential(self):
def test_source_without_credential(self, mocker):
self.instance.source = 'ec2'
self.instance.get_cloud_credential = mocker.Mock(return_value=None)
def run_pexpect_side_effect(*args, **kwargs):
args, cwd, env, stdout = args
@@ -1538,7 +1547,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
self.task.run(self.pk)
@pytest.mark.parametrize('with_credential', [True, False])
def test_custom_source(self, with_credential):
def test_custom_source(self, with_credential, mocker):
self.instance.source = 'custom'
self.instance.source_vars = '{"FOO": "BAR"}'
patch = mock.patch.object(InventoryUpdate, 'source_script', mock.Mock(
@@ -1549,16 +1558,22 @@ class TestInventoryUpdateCredentials(TestJobExecution):
if with_credential:
azure_rm = CredentialType.defaults['azure_rm']()
self.instance.credential = Credential(
pk=1,
credential_type=azure_rm,
inputs = {
'client': 'some-client',
'secret': 'some-secret',
'tenant': 'some-tenant',
'subscription': 'some-subscription',
}
)
def get_cred():
cred = Credential(
pk=1,
credential_type=azure_rm,
inputs = {
'client': 'some-client',
'secret': 'some-secret',
'tenant': 'some-tenant',
'subscription': 'some-subscription',
}
)
return cred
self.instance.get_cloud_credential = get_cred
else:
self.instance.get_cloud_credential = mocker.Mock(return_value=None)
def run_pexpect_side_effect(*args, **kwargs):
args, cwd, env, stdout = args
@@ -1580,14 +1595,16 @@ class TestInventoryUpdateCredentials(TestJobExecution):
def test_ec2_source(self):
aws = CredentialType.defaults['aws']()
self.instance.source = 'ec2'
self.instance.credential = Credential(
pk=1,
credential_type=aws,
inputs = {'username': 'bob', 'password': 'secret'}
)
self.instance.credential.inputs['password'] = encrypt_field(
self.instance.credential, 'password'
)
def get_cred():
cred = Credential(
pk=1,
credential_type=aws,
inputs = {'username': 'bob', 'password': 'secret'}
)
cred.inputs['password'] = encrypt_field(cred, 'password')
return cred
self.instance.get_cloud_credential = get_cred
def run_pexpect_side_effect(*args, **kwargs):
args, cwd, env, stdout = args
@@ -1608,14 +1625,16 @@ class TestInventoryUpdateCredentials(TestJobExecution):
def test_vmware_source(self):
vmware = CredentialType.defaults['vmware']()
self.instance.source = 'vmware'
self.instance.credential = Credential(
pk=1,
credential_type=vmware,
inputs = {'username': 'bob', 'password': 'secret', 'host': 'https://example.org'}
)
self.instance.credential.inputs['password'] = encrypt_field(
self.instance.credential, 'password'
)
def get_cred():
cred = Credential(
pk=1,
credential_type=vmware,
inputs = {'username': 'bob', 'password': 'secret', 'host': 'https://example.org'}
)
cred.inputs['password'] = encrypt_field(cred, 'password')
return cred
self.instance.get_cloud_credential = get_cred
def run_pexpect_side_effect(*args, **kwargs):
args, cwd, env, stdout = args
@@ -1634,17 +1653,21 @@ class TestInventoryUpdateCredentials(TestJobExecution):
azure_rm = CredentialType.defaults['azure_rm']()
self.instance.source = 'azure_rm'
self.instance.source_regions = 'north, south, east, west'
self.instance.credential = Credential(
pk=1,
credential_type=azure_rm,
inputs = {
'client': 'some-client',
'secret': 'some-secret',
'tenant': 'some-tenant',
'subscription': 'some-subscription',
'cloud_environment': 'foobar'
}
)
def get_cred():
cred = Credential(
pk=1,
credential_type=azure_rm,
inputs = {
'client': 'some-client',
'secret': 'some-secret',
'tenant': 'some-tenant',
'subscription': 'some-subscription',
'cloud_environment': 'foobar'
}
)
return cred
self.instance.get_cloud_credential = get_cred
def run_pexpect_side_effect(*args, **kwargs):
args, cwd, env, stdout = args
@@ -1671,16 +1694,20 @@ class TestInventoryUpdateCredentials(TestJobExecution):
azure_rm = CredentialType.defaults['azure_rm']()
self.instance.source = 'azure_rm'
self.instance.source_regions = 'all'
self.instance.credential = Credential(
pk=1,
credential_type=azure_rm,
inputs = {
'subscription': 'some-subscription',
'username': 'bob',
'password': 'secret',
'cloud_environment': 'foobar'
}
)
def get_cred():
cred = Credential(
pk=1,
credential_type=azure_rm,
inputs = {
'subscription': 'some-subscription',
'username': 'bob',
'password': 'secret',
'cloud_environment': 'foobar'
}
)
return cred
self.instance.get_cloud_credential = get_cred
def run_pexpect_side_effect(*args, **kwargs):
args, cwd, env, stdout = args
@@ -1706,18 +1733,23 @@ class TestInventoryUpdateCredentials(TestJobExecution):
gce = CredentialType.defaults['gce']()
self.instance.source = 'gce'
self.instance.source_regions = 'all'
self.instance.credential = Credential(
pk=1,
credential_type=gce,
inputs = {
'username': 'bob',
'project': 'some-project',
'ssh_key_data': self.EXAMPLE_PRIVATE_KEY
}
)
self.instance.credential.inputs['ssh_key_data'] = encrypt_field(
self.instance.credential, 'ssh_key_data'
)
def get_cred():
cred = Credential(
pk=1,
credential_type=gce,
inputs = {
'username': 'bob',
'project': 'some-project',
'ssh_key_data': self.EXAMPLE_PRIVATE_KEY
}
)
cred.inputs['ssh_key_data'] = encrypt_field(
cred, 'ssh_key_data'
)
return cred
self.instance.get_cloud_credential = get_cred
expected_gce_zone = ''
def run_pexpect_side_effect(*args, **kwargs):
@@ -1745,19 +1777,23 @@ class TestInventoryUpdateCredentials(TestJobExecution):
def test_openstack_source(self):
openstack = CredentialType.defaults['openstack']()
self.instance.source = 'openstack'
self.instance.credential = Credential(
pk=1,
credential_type=openstack,
inputs = {
'username': 'bob',
'password': 'secret',
'project': 'tenant-name',
'host': 'https://keystone.example.org'
}
)
self.instance.credential.inputs['ssh_key_data'] = encrypt_field(
self.instance.credential, 'ssh_key_data'
)
def get_cred():
cred = Credential(
pk=1,
credential_type=openstack,
inputs = {
'username': 'bob',
'password': 'secret',
'project': 'tenant-name',
'host': 'https://keystone.example.org'
}
)
cred.inputs['ssh_key_data'] = encrypt_field(
cred, 'ssh_key_data'
)
return cred
self.instance.get_cloud_credential = get_cred
def run_pexpect_side_effect(*args, **kwargs):
args, cwd, env, stdout = args
@@ -1780,18 +1816,23 @@ class TestInventoryUpdateCredentials(TestJobExecution):
def test_satellite6_source(self):
satellite6 = CredentialType.defaults['satellite6']()
self.instance.source = 'satellite6'
self.instance.credential = Credential(
pk=1,
credential_type=satellite6,
inputs = {
'username': 'bob',
'password': 'secret',
'host': 'https://example.org'
}
)
self.instance.credential.inputs['password'] = encrypt_field(
self.instance.credential, 'password'
)
def get_cred():
cred = Credential(
pk=1,
credential_type=satellite6,
inputs = {
'username': 'bob',
'password': 'secret',
'host': 'https://example.org'
}
)
cred.inputs['password'] = encrypt_field(
cred, 'password'
)
return cred
self.instance.get_cloud_credential = get_cred
self.instance.source_vars = '{"satellite6_group_patterns": "[a,b,c]", "satellite6_group_prefix": "hey_"}'
def run_pexpect_side_effect(*args, **kwargs):
@@ -1811,18 +1852,22 @@ class TestInventoryUpdateCredentials(TestJobExecution):
def test_cloudforms_source(self):
cloudforms = CredentialType.defaults['cloudforms']()
self.instance.source = 'cloudforms'
self.instance.credential = Credential(
pk=1,
credential_type=cloudforms,
inputs = {
'username': 'bob',
'password': 'secret',
'host': 'https://example.org'
}
)
self.instance.credential.inputs['password'] = encrypt_field(
self.instance.credential, 'password'
)
def get_cred():
cred = Credential(
pk=1,
credential_type=cloudforms,
inputs = {
'username': 'bob',
'password': 'secret',
'host': 'https://example.org'
}
)
cred.inputs['password'] = encrypt_field(
cred, 'password'
)
return cred
self.instance.get_cloud_credential = get_cred
self.instance.source_vars = '{"prefer_ipv4": True}'
@@ -1847,14 +1892,19 @@ class TestInventoryUpdateCredentials(TestJobExecution):
def test_awx_task_env(self):
gce = CredentialType.defaults['gce']()
self.instance.source = 'gce'
self.instance.credential = Credential(
pk=1,
credential_type=gce,
inputs = {
'username': 'bob',
'project': 'some-project',
}
)
def get_cred():
cred = Credential(
pk=1,
credential_type=gce,
inputs = {
'username': 'bob',
'project': 'some-project',
}
)
return cred
self.instance.get_cloud_credential = get_cred
patch = mock.patch('awx.main.tasks.settings.AWX_TASK_ENV', {'FOO': 'BAR'})
patch.start()