mirror of
https://github.com/ansible/awx.git
synced 2026-01-09 23:12:08 -03:30
Move cred type unite tests to awx-plugins
This commit is contained in:
parent
bd96000494
commit
cf9e6796ea
@ -1,5 +1,4 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import configparser
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
@ -856,205 +855,6 @@ class TestJobCredentials(TestJobExecution):
|
||||
assert '--vault-id dev@prompt' in ' '.join(args)
|
||||
assert '--vault-id prod@prompt' in ' '.join(args)
|
||||
|
||||
@pytest.mark.parametrize("verify", (True, False))
|
||||
def test_k8s_credential(self, job, private_data_dir, verify, mock_me):
|
||||
k8s = CredentialType.defaults['kubernetes_bearer_token']()
|
||||
inputs = {
|
||||
'host': 'https://example.org/',
|
||||
'bearer_token': 'token123',
|
||||
}
|
||||
if verify:
|
||||
inputs['verify_ssl'] = True
|
||||
inputs['ssl_ca_cert'] = 'CERTDATA'
|
||||
credential = Credential(
|
||||
pk=1,
|
||||
credential_type=k8s,
|
||||
inputs=inputs,
|
||||
)
|
||||
credential.inputs['bearer_token'] = encrypt_field(credential, 'bearer_token')
|
||||
job.credentials.add(credential)
|
||||
|
||||
env = {}
|
||||
safe_env = {}
|
||||
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
|
||||
|
||||
assert env['K8S_AUTH_HOST'] == 'https://example.org/'
|
||||
assert env['K8S_AUTH_API_KEY'] == 'token123'
|
||||
|
||||
if verify:
|
||||
assert env['K8S_AUTH_VERIFY_SSL'] == 'True'
|
||||
local_path = to_host_path(env['K8S_AUTH_SSL_CA_CERT'], private_data_dir)
|
||||
with open(local_path, 'r') as f:
|
||||
cert = f.read()
|
||||
assert cert == 'CERTDATA'
|
||||
else:
|
||||
assert env['K8S_AUTH_VERIFY_SSL'] == 'False'
|
||||
assert 'K8S_AUTH_SSL_CA_CERT' not in env
|
||||
|
||||
assert safe_env['K8S_AUTH_API_KEY'] == HIDDEN_PASSWORD
|
||||
|
||||
def test_aws_cloud_credential(self, job, private_data_dir, mock_me):
|
||||
aws = CredentialType.defaults['aws']()
|
||||
credential = Credential(pk=1, credential_type=aws, inputs={'username': 'bob', 'password': 'secret'})
|
||||
credential.inputs['password'] = encrypt_field(credential, 'password')
|
||||
job.credentials.add(credential)
|
||||
|
||||
env = {}
|
||||
safe_env = {}
|
||||
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
|
||||
|
||||
assert env['AWS_ACCESS_KEY_ID'] == 'bob'
|
||||
assert env['AWS_SECRET_ACCESS_KEY'] == 'secret'
|
||||
assert 'AWS_SECURITY_TOKEN' not in env
|
||||
assert safe_env['AWS_SECRET_ACCESS_KEY'] == HIDDEN_PASSWORD
|
||||
|
||||
def test_aws_cloud_credential_with_sts_token(self, private_data_dir, job, mock_me):
|
||||
aws = CredentialType.defaults['aws']()
|
||||
credential = Credential(pk=1, credential_type=aws, inputs={'username': 'bob', 'password': 'secret', 'security_token': 'token'})
|
||||
for key in ('password', 'security_token'):
|
||||
credential.inputs[key] = encrypt_field(credential, key)
|
||||
job.credentials.add(credential)
|
||||
|
||||
env = {}
|
||||
safe_env = {}
|
||||
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
|
||||
|
||||
assert env['AWS_ACCESS_KEY_ID'] == 'bob'
|
||||
assert env['AWS_SECRET_ACCESS_KEY'] == 'secret'
|
||||
assert env['AWS_SECURITY_TOKEN'] == 'token'
|
||||
assert safe_env['AWS_SECRET_ACCESS_KEY'] == HIDDEN_PASSWORD
|
||||
|
||||
@pytest.mark.parametrize("cred_env_var", ['GCE_CREDENTIALS_FILE_PATH', 'GOOGLE_APPLICATION_CREDENTIALS'])
|
||||
def test_gce_credentials(self, cred_env_var, private_data_dir, job, mock_me):
|
||||
gce = CredentialType.defaults['gce']()
|
||||
credential = Credential(pk=1, credential_type=gce, inputs={'username': 'bob', 'project': 'some-project', 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY})
|
||||
credential.inputs['ssh_key_data'] = encrypt_field(credential, 'ssh_key_data')
|
||||
job.credentials.add(credential)
|
||||
|
||||
env = {}
|
||||
safe_env = {}
|
||||
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
|
||||
runner_path = env[cred_env_var]
|
||||
local_path = to_host_path(runner_path, private_data_dir)
|
||||
with open(local_path, 'rb') as f_host:
|
||||
json_data = json.load(f_host)
|
||||
assert json_data['type'] == 'service_account'
|
||||
assert json_data['private_key'] == self.EXAMPLE_PRIVATE_KEY
|
||||
assert json_data['client_email'] == 'bob'
|
||||
assert json_data['project_id'] == 'some-project'
|
||||
|
||||
def test_azure_rm_with_tenant(self, private_data_dir, job, mock_me):
|
||||
azure = CredentialType.defaults['azure_rm']()
|
||||
credential = Credential(
|
||||
pk=1, credential_type=azure, inputs={'client': 'some-client', 'secret': 'some-secret', 'tenant': 'some-tenant', 'subscription': 'some-subscription'}
|
||||
)
|
||||
credential.inputs['secret'] = encrypt_field(credential, 'secret')
|
||||
job.credentials.add(credential)
|
||||
|
||||
env = {}
|
||||
safe_env = {}
|
||||
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
|
||||
|
||||
assert env['AZURE_CLIENT_ID'] == 'some-client'
|
||||
assert env['AZURE_SECRET'] == 'some-secret'
|
||||
assert env['AZURE_TENANT'] == 'some-tenant'
|
||||
assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription'
|
||||
assert safe_env['AZURE_SECRET'] == HIDDEN_PASSWORD
|
||||
|
||||
def test_azure_rm_with_password(self, private_data_dir, job, mock_me):
|
||||
azure = CredentialType.defaults['azure_rm']()
|
||||
credential = Credential(
|
||||
pk=1, credential_type=azure, inputs={'subscription': 'some-subscription', 'username': 'bob', 'password': 'secret', 'cloud_environment': 'foobar'}
|
||||
)
|
||||
credential.inputs['password'] = encrypt_field(credential, 'password')
|
||||
job.credentials.add(credential)
|
||||
|
||||
env = {}
|
||||
safe_env = {}
|
||||
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
|
||||
|
||||
assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription'
|
||||
assert env['AZURE_AD_USER'] == 'bob'
|
||||
assert env['AZURE_PASSWORD'] == 'secret'
|
||||
assert env['AZURE_CLOUD_ENVIRONMENT'] == 'foobar'
|
||||
assert safe_env['AZURE_PASSWORD'] == HIDDEN_PASSWORD
|
||||
|
||||
def test_vmware_credentials(self, private_data_dir, job, mock_me):
|
||||
vmware = CredentialType.defaults['vmware']()
|
||||
credential = Credential(pk=1, credential_type=vmware, inputs={'username': 'bob', 'password': 'secret', 'host': 'https://example.org'})
|
||||
credential.inputs['password'] = encrypt_field(credential, 'password')
|
||||
job.credentials.add(credential)
|
||||
|
||||
env = {}
|
||||
safe_env = {}
|
||||
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
|
||||
|
||||
assert env['VMWARE_USER'] == 'bob'
|
||||
assert env['VMWARE_PASSWORD'] == 'secret'
|
||||
assert env['VMWARE_HOST'] == 'https://example.org'
|
||||
assert safe_env['VMWARE_PASSWORD'] == HIDDEN_PASSWORD
|
||||
|
||||
def test_openstack_credentials(self, private_data_dir, job, mock_me):
|
||||
task = jobs.RunJob()
|
||||
task.instance = job
|
||||
openstack = CredentialType.defaults['openstack']()
|
||||
credential = Credential(
|
||||
pk=1, credential_type=openstack, inputs={'username': 'bob', 'password': 'secret', 'project': 'tenant-name', 'host': 'https://keystone.example.org'}
|
||||
)
|
||||
credential.inputs['password'] = encrypt_field(credential, 'password')
|
||||
job.credentials.add(credential)
|
||||
|
||||
private_data_files, ssh_key_data = task.build_private_data_files(job, private_data_dir)
|
||||
env = task.build_env(job, private_data_dir, private_data_files=private_data_files)
|
||||
credential.credential_type.inject_credential(credential, env, {}, [], private_data_dir)
|
||||
|
||||
config_loc = to_host_path(env['OS_CLIENT_CONFIG_FILE'], private_data_dir)
|
||||
with open(config_loc, 'r') as f:
|
||||
shade_config = f.read()
|
||||
assert shade_config == '\n'.join(
|
||||
[
|
||||
'clouds:',
|
||||
' devstack:',
|
||||
' auth:',
|
||||
' auth_url: https://keystone.example.org',
|
||||
' password: secret',
|
||||
' project_name: tenant-name',
|
||||
' username: bob',
|
||||
' verify: true',
|
||||
'',
|
||||
]
|
||||
)
|
||||
|
||||
@pytest.mark.parametrize("ca_file", [None, '/path/to/some/file'])
|
||||
def test_rhv_credentials(self, private_data_dir, job, ca_file, mock_me):
|
||||
rhv = CredentialType.defaults['rhv']()
|
||||
inputs = {
|
||||
'host': 'some-ovirt-host.example.org',
|
||||
'username': 'bob',
|
||||
'password': 'some-pass',
|
||||
}
|
||||
if ca_file:
|
||||
inputs['ca_file'] = ca_file
|
||||
credential = Credential(pk=1, credential_type=rhv, inputs=inputs)
|
||||
credential.inputs['password'] = encrypt_field(credential, 'password')
|
||||
job.credentials.add(credential)
|
||||
|
||||
env = {}
|
||||
safe_env = {}
|
||||
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
host_path = to_host_path(env['OVIRT_INI_PATH'], private_data_dir)
|
||||
config.read(host_path)
|
||||
assert config.get('ovirt', 'ovirt_url') == 'some-ovirt-host.example.org'
|
||||
assert config.get('ovirt', 'ovirt_username') == 'bob'
|
||||
assert config.get('ovirt', 'ovirt_password') == 'some-pass'
|
||||
if ca_file:
|
||||
assert config.get('ovirt', 'ovirt_ca_file') == ca_file
|
||||
else:
|
||||
with pytest.raises(configparser.NoOptionError):
|
||||
config.get('ovirt', 'ovirt_ca_file')
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'authorize, expected_authorize',
|
||||
[
|
||||
@ -1089,68 +889,6 @@ class TestJobCredentials(TestJobExecution):
|
||||
assert f.read() == self.EXAMPLE_PRIVATE_KEY
|
||||
assert safe_env['ANSIBLE_NET_PASSWORD'] == HIDDEN_PASSWORD
|
||||
|
||||
def test_terraform_cloud_credentials(self, job, private_data_dir, mock_me):
|
||||
terraform = CredentialType.defaults['terraform']()
|
||||
hcl_config = '''
|
||||
backend "s3" {
|
||||
bucket = "s3_sample_bucket"
|
||||
key = "/tf_state/"
|
||||
region = "us-east-1"
|
||||
}
|
||||
'''
|
||||
credential = Credential(pk=1, credential_type=terraform, inputs={'configuration': hcl_config})
|
||||
credential.inputs['configuration'] = encrypt_field(credential, 'configuration')
|
||||
job.credentials.add(credential)
|
||||
|
||||
env = {}
|
||||
safe_env = {}
|
||||
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
|
||||
|
||||
local_path = to_host_path(env['TF_BACKEND_CONFIG_FILE'], private_data_dir)
|
||||
with open(local_path, 'r') as f:
|
||||
config = f.read()
|
||||
assert config == hcl_config
|
||||
|
||||
def test_terraform_gcs_backend_credentials(self, job, private_data_dir, mock_me):
|
||||
terraform = CredentialType.defaults['terraform']()
|
||||
hcl_config = '''
|
||||
backend "gcs" {
|
||||
bucket = "gce_storage"
|
||||
}
|
||||
'''
|
||||
gce_backend_credentials = '''
|
||||
{
|
||||
"type": "service_account",
|
||||
"project_id": "sample",
|
||||
"private_key_id": "eeeeeeeeeeeeeeeeeeeeeeeeeee",
|
||||
"private_key": "-----BEGIN PRIVATE KEY-----\naaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\n-----END PRIVATE KEY-----\n",
|
||||
"client_email": "sample@sample.iam.gserviceaccount.com",
|
||||
"client_id": "0123456789",
|
||||
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
|
||||
"token_uri": "https://oauth2.googleapis.com/token",
|
||||
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
|
||||
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/cloud-content-robot%40sample.iam.gserviceaccount.com",
|
||||
}
|
||||
'''
|
||||
credential = Credential(pk=1, credential_type=terraform, inputs={'configuration': hcl_config, 'gce_credentials': gce_backend_credentials})
|
||||
credential.inputs['configuration'] = encrypt_field(credential, 'configuration')
|
||||
credential.inputs['gce_credentials'] = encrypt_field(credential, 'gce_credentials')
|
||||
job.credentials.add(credential)
|
||||
|
||||
env = {}
|
||||
safe_env = {}
|
||||
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
|
||||
|
||||
local_path = to_host_path(env['TF_BACKEND_CONFIG_FILE'], private_data_dir)
|
||||
with open(local_path, 'r') as f:
|
||||
config = f.read()
|
||||
assert config == hcl_config
|
||||
|
||||
credentials_path = to_host_path(env['GOOGLE_BACKEND_CREDENTIALS'], private_data_dir)
|
||||
with open(credentials_path, 'r') as f:
|
||||
credentials = f.read()
|
||||
assert credentials == gce_backend_credentials
|
||||
|
||||
def test_multi_cloud(self, private_data_dir, mock_me):
|
||||
gce = CredentialType.defaults['gce']()
|
||||
gce_credential = Credential(pk=1, credential_type=gce, inputs={'username': 'bob', 'project': 'some-project', 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY})
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user