Delete cred and inv plugins

This commit is contained in:
Chris Meyers
2024-08-28 15:02:05 -04:00
committed by Chris Meyers
parent 376cc35a92
commit 16d1f34179
13 changed files with 0 additions and 2351 deletions

View File

@@ -1,126 +0,0 @@
from .plugin import CredentialPlugin, CertFiles, raise_for_status
from urllib.parse import quote, urlencode, urljoin
from .plugin import translate_function as _
import requests
aim_inputs = {
'fields': [
{
'id': 'url',
'label': _('CyberArk CCP URL'),
'type': 'string',
'format': 'url',
},
{
'id': 'webservice_id',
'label': _('Web Service ID'),
'type': 'string',
'help_text': _('The CCP Web Service ID. Leave blank to default to AIMWebService.'),
},
{
'id': 'app_id',
'label': _('Application ID'),
'type': 'string',
'secret': True,
},
{
'id': 'client_key',
'label': _('Client Key'),
'type': 'string',
'secret': True,
'multiline': True,
},
{
'id': 'client_cert',
'label': _('Client Certificate'),
'type': 'string',
'secret': True,
'multiline': True,
},
{
'id': 'verify',
'label': _('Verify SSL Certificates'),
'type': 'boolean',
'default': True,
},
],
'metadata': [
{
'id': 'object_query',
'label': _('Object Query'),
'type': 'string',
'help_text': _('Lookup query for the object. Ex: Safe=TestSafe;Object=testAccountName123'),
},
{'id': 'object_query_format', 'label': _('Object Query Format'), 'type': 'string', 'default': 'Exact', 'choices': ['Exact', 'Regexp']},
{
'id': 'object_property',
'label': _('Object Property'),
'type': 'string',
'help_text': _('The property of the object to return. Available properties: Username, Password and Address.'),
},
{
'id': 'reason',
'label': _('Reason'),
'type': 'string',
'help_text': _('Object request reason. This is only needed if it is required by the object\'s policy.'),
},
],
'required': ['url', 'app_id', 'object_query'],
}
def aim_backend(**kwargs):
url = kwargs['url']
client_cert = kwargs.get('client_cert', None)
client_key = kwargs.get('client_key', None)
verify = kwargs['verify']
webservice_id = kwargs.get('webservice_id', '')
app_id = kwargs['app_id']
object_query = kwargs['object_query']
object_query_format = kwargs['object_query_format']
object_property = kwargs.get('object_property', '')
reason = kwargs.get('reason', None)
if webservice_id == '':
webservice_id = 'AIMWebService'
query_params = {
'AppId': app_id,
'Query': object_query,
'QueryFormat': object_query_format,
}
if reason:
query_params['reason'] = reason
request_qs = '?' + urlencode(query_params, quote_via=quote)
request_url = urljoin(url, '/'.join([webservice_id, 'api', 'Accounts']))
with CertFiles(client_cert, client_key) as cert:
res = requests.get(
request_url + request_qs,
timeout=30,
cert=cert,
verify=verify,
allow_redirects=False,
)
raise_for_status(res)
# CCP returns the property name capitalized, username is camel case
# so we need to handle that case
if object_property == '':
object_property = 'Content'
elif object_property.lower() == 'username':
object_property = 'UserName'
elif object_property.lower() == 'password':
object_property = 'Content'
elif object_property.lower() == 'address':
object_property = 'Address'
elif object_property not in res:
raise KeyError('Property {} not found in object, available properties: Username, Password and Address'.format(object_property))
else:
object_property = object_property.capitalize()
return res.json()[object_property]
aim_plugin = CredentialPlugin('CyberArk Central Credential Provider Lookup', inputs=aim_inputs, backend=aim_backend)

View File

@@ -1,65 +0,0 @@
import boto3
from botocore.exceptions import ClientError
from .plugin import CredentialPlugin
from .plugin import translate_function as _
secrets_manager_inputs = {
'fields': [
{
'id': 'aws_access_key',
'label': _('AWS Access Key'),
'type': 'string',
},
{
'id': 'aws_secret_key',
'label': _('AWS Secret Key'),
'type': 'string',
'secret': True,
},
],
'metadata': [
{
'id': 'region_name',
'label': _('AWS Secrets Manager Region'),
'type': 'string',
'help_text': _('Region which the secrets manager is located'),
},
{
'id': 'secret_name',
'label': _('AWS Secret Name'),
'type': 'string',
},
],
'required': ['aws_access_key', 'aws_secret_key', 'region_name', 'secret_name'],
}
def aws_secretsmanager_backend(**kwargs):
secret_name = kwargs['secret_name']
region_name = kwargs['region_name']
aws_secret_access_key = kwargs['aws_secret_key']
aws_access_key_id = kwargs['aws_access_key']
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager', region_name=region_name, aws_secret_access_key=aws_secret_access_key, aws_access_key_id=aws_access_key_id
)
try:
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
except ClientError as e:
raise e
# Secrets Manager decrypts the secret value using the associated KMS CMK
# Depending on whether the secret was a string or binary, only one of these fields will be populated
if 'SecretString' in get_secret_value_response:
secret = get_secret_value_response['SecretString']
else:
secret = get_secret_value_response['SecretBinary']
return secret
aws_secretmanager_plugin = CredentialPlugin('AWS Secrets Manager lookup', inputs=secrets_manager_inputs, backend=aws_secretsmanager_backend)

View File

@@ -1,63 +0,0 @@
from azure.keyvault.secrets import SecretClient
from azure.identity import ClientSecretCredential
from msrestazure import azure_cloud
from .plugin import CredentialPlugin
from .plugin import translate_function as _
# https://github.com/Azure/msrestazure-for-python/blob/master/msrestazure/azure_cloud.py
clouds = [vars(azure_cloud)[n] for n in dir(azure_cloud) if n.startswith("AZURE_") and n.endswith("_CLOUD")]
default_cloud = vars(azure_cloud)["AZURE_PUBLIC_CLOUD"]
azure_keyvault_inputs = {
'fields': [
{
'id': 'url',
'label': _('Vault URL (DNS Name)'),
'type': 'string',
'format': 'url',
},
{'id': 'client', 'label': _('Client ID'), 'type': 'string'},
{
'id': 'secret',
'label': _('Client Secret'),
'type': 'string',
'secret': True,
},
{'id': 'tenant', 'label': _('Tenant ID'), 'type': 'string'},
{
'id': 'cloud_name',
'label': _('Cloud Environment'),
'help_text': _('Specify which azure cloud environment to use.'),
'choices': list(set([default_cloud.name] + [c.name for c in clouds])),
'default': default_cloud.name,
},
],
'metadata': [
{
'id': 'secret_field',
'label': _('Secret Name'),
'type': 'string',
'help_text': _('The name of the secret to look up.'),
},
{
'id': 'secret_version',
'label': _('Secret Version'),
'type': 'string',
'help_text': _('Used to specify a specific secret version (if left empty, the latest version will be used).'),
},
],
'required': ['url', 'client', 'secret', 'tenant', 'secret_field'],
}
def azure_keyvault_backend(**kwargs):
csc = ClientSecretCredential(tenant_id=kwargs['tenant'], client_id=kwargs['client'], client_secret=kwargs['secret'])
kv = SecretClient(credential=csc, vault_url=kwargs['url'])
return kv.get_secret(name=kwargs['secret_field'], version=kwargs.get('secret_version', '')).value
azure_keyvault_plugin = CredentialPlugin('Microsoft Azure Key Vault', inputs=azure_keyvault_inputs, backend=azure_keyvault_backend)

View File

@@ -1,115 +0,0 @@
from .plugin import CredentialPlugin, raise_for_status
from .plugin import translate_function as _
from urllib.parse import urljoin
import requests
pas_inputs = {
'fields': [
{
'id': 'url',
'label': _('Centrify Tenant URL'),
'type': 'string',
'help_text': _('Centrify Tenant URL'),
'format': 'url',
},
{
'id': 'client_id',
'label': _('Centrify API User'),
'type': 'string',
'help_text': _('Centrify API User, having necessary permissions as mentioned in support doc'),
},
{
'id': 'client_password',
'label': _('Centrify API Password'),
'type': 'string',
'help_text': _('Password of Centrify API User with necessary permissions'),
'secret': True,
},
{
'id': 'oauth_application_id',
'label': _('OAuth2 Application ID'),
'type': 'string',
'help_text': _('Application ID of the configured OAuth2 Client (defaults to \'awx\')'),
'default': 'awx',
},
{
'id': 'oauth_scope',
'label': _('OAuth2 Scope'),
'type': 'string',
'help_text': _('Scope of the configured OAuth2 Client (defaults to \'awx\')'),
'default': 'awx',
},
],
'metadata': [
{
'id': 'account-name',
'label': _('Account Name'),
'type': 'string',
'help_text': _('Local system account or Domain account name enrolled in Centrify Vault. eg. (root or DOMAIN/Administrator)'),
},
{
'id': 'system-name',
'label': _('System Name'),
'type': 'string',
'help_text': _('Machine Name enrolled with in Centrify Portal'),
},
],
'required': ['url', 'account-name', 'system-name', 'client_id', 'client_password'],
}
# generate bearer token to authenticate with PAS portal, Input : Client ID, Client Secret
def handle_auth(**kwargs):
post_data = {"grant_type": "client_credentials", "scope": kwargs['oauth_scope']}
response = requests.post(kwargs['endpoint'], data=post_data, auth=(kwargs['client_id'], kwargs['client_password']), verify=True, timeout=(5, 30))
raise_for_status(response)
try:
return response.json()['access_token']
except KeyError:
raise RuntimeError('OAuth request to tenant was unsuccessful')
# fetch the ID of system with RedRock query, Input : System Name, Account Name
def get_ID(**kwargs):
endpoint = urljoin(kwargs['url'], '/Redrock/query')
name = " Name='{0}' and User='{1}'".format(kwargs['system_name'], kwargs['acc_name'])
query = 'Select ID from VaultAccount where {0}'.format(name)
post_headers = {"Authorization": "Bearer " + kwargs['access_token'], "X-CENTRIFY-NATIVE-CLIENT": "true"}
response = requests.post(endpoint, json={'Script': query}, headers=post_headers, verify=True, timeout=(5, 30))
raise_for_status(response)
try:
result_str = response.json()["Result"]["Results"]
return result_str[0]["Row"]["ID"]
except (IndexError, KeyError):
raise RuntimeError("Error Detected!! Check the Inputs")
# CheckOut Password from Centrify Vault, Input : ID
def get_passwd(**kwargs):
endpoint = urljoin(kwargs['url'], '/ServerManage/CheckoutPassword')
post_headers = {"Authorization": "Bearer " + kwargs['access_token'], "X-CENTRIFY-NATIVE-CLIENT": "true"}
response = requests.post(endpoint, json={'ID': kwargs['acc_id']}, headers=post_headers, verify=True, timeout=(5, 30))
raise_for_status(response)
try:
return response.json()["Result"]["Password"]
except KeyError:
raise RuntimeError("Password Not Found")
def centrify_backend(**kwargs):
url = kwargs.get('url')
acc_name = kwargs.get('account-name')
system_name = kwargs.get('system-name')
client_id = kwargs.get('client_id')
client_password = kwargs.get('client_password')
app_id = kwargs.get('oauth_application_id', 'awx')
endpoint = urljoin(url, f'/oauth2/token/{app_id}')
endpoint = {'endpoint': endpoint, 'client_id': client_id, 'client_password': client_password, 'oauth_scope': kwargs.get('oauth_scope', 'awx')}
token = handle_auth(**endpoint)
get_id_args = {'system_name': system_name, 'acc_name': acc_name, 'url': url, 'access_token': token}
acc_id = get_ID(**get_id_args)
get_pwd_args = {'url': url, 'acc_id': acc_id, 'access_token': token}
return get_passwd(**get_pwd_args)
centrify_plugin = CredentialPlugin('Centrify Vault Credential Provider Lookup', inputs=pas_inputs, backend=centrify_backend)

View File

@@ -1,112 +0,0 @@
from .plugin import CredentialPlugin, CertFiles, raise_for_status
from urllib.parse import urljoin, quote
from .plugin import translate_function as _
import requests
import base64
import binascii
conjur_inputs = {
'fields': [
{
'id': 'url',
'label': _('Conjur URL'),
'type': 'string',
'format': 'url',
},
{
'id': 'api_key',
'label': _('API Key'),
'type': 'string',
'secret': True,
},
{
'id': 'account',
'label': _('Account'),
'type': 'string',
},
{
'id': 'username',
'label': _('Username'),
'type': 'string',
},
{'id': 'cacert', 'label': _('Public Key Certificate'), 'type': 'string', 'multiline': True},
],
'metadata': [
{
'id': 'secret_path',
'label': _('Secret Identifier'),
'type': 'string',
'help_text': _('The identifier for the secret e.g., /some/identifier'),
},
{
'id': 'secret_version',
'label': _('Secret Version'),
'type': 'string',
'help_text': _('Used to specify a specific secret version (if left empty, the latest version will be used).'),
},
],
'required': ['url', 'api_key', 'account', 'username'],
}
def _is_base64(s: str) -> bool:
try:
return base64.b64encode(base64.b64decode(s.encode("utf-8"))) == s.encode("utf-8")
except binascii.Error:
return False
def conjur_backend(**kwargs):
url = kwargs['url']
api_key = kwargs['api_key']
account = quote(kwargs['account'], safe='')
username = quote(kwargs['username'], safe='')
secret_path = quote(kwargs['secret_path'], safe='')
version = kwargs.get('secret_version')
cacert = kwargs.get('cacert', None)
auth_kwargs = {
'headers': {'Content-Type': 'text/plain', 'Accept-Encoding': 'base64'},
'data': api_key,
'allow_redirects': False,
}
with CertFiles(cacert) as cert:
# https://www.conjur.org/api.html#authentication-authenticate-post
auth_kwargs['verify'] = cert
try:
resp = requests.post(urljoin(url, '/'.join(['authn', account, username, 'authenticate'])), **auth_kwargs)
resp.raise_for_status()
except requests.exceptions.HTTPError:
resp = requests.post(urljoin(url, '/'.join(['api', 'authn', account, username, 'authenticate'])), **auth_kwargs)
raise_for_status(resp)
token = resp.content.decode('utf-8')
lookup_kwargs = {
'headers': {'Authorization': 'Token token="{}"'.format(token if _is_base64(token) else base64.b64encode(token.encode('utf-8')).decode('utf-8'))},
'allow_redirects': False,
}
# https://www.conjur.org/api.html#secrets-retrieve-a-secret-get
path = urljoin(url, '/'.join(['secrets', account, 'variable', secret_path]))
path_conjurcloud = urljoin(url, '/'.join(['api', 'secrets', account, 'variable', secret_path]))
if version:
ver = "version={}".format(version)
path = '?'.join([path, ver])
path_conjurcloud = '?'.join([path_conjurcloud, ver])
with CertFiles(cacert) as cert:
lookup_kwargs['verify'] = cert
try:
resp = requests.get(path, timeout=30, **lookup_kwargs)
resp.raise_for_status()
except requests.exceptions.HTTPError:
resp = requests.get(path_conjurcloud, timeout=30, **lookup_kwargs)
raise_for_status(resp)
return resp.text
conjur_plugin = CredentialPlugin('CyberArk Conjur Secrets Manager Lookup', inputs=conjur_inputs, backend=conjur_backend)

View File

@@ -1,94 +0,0 @@
from .plugin import CredentialPlugin
from .plugin import settings
from .plugin import translate_function as _
from delinea.secrets.vault import PasswordGrantAuthorizer, SecretsVault
from base64 import b64decode
dsv_inputs = {
'fields': [
{
'id': 'tenant',
'label': _('Tenant'),
'help_text': _('The tenant e.g. "ex" when the URL is https://ex.secretsvaultcloud.com'),
'type': 'string',
},
{
'id': 'tld',
'label': _('Top-level Domain (TLD)'),
'help_text': _('The TLD of the tenant e.g. "com" when the URL is https://ex.secretsvaultcloud.com'),
'choices': ['ca', 'com', 'com.au', 'eu'],
'default': 'com',
},
{
'id': 'client_id',
'label': _('Client ID'),
'type': 'string',
},
{
'id': 'client_secret',
'label': _('Client Secret'),
'type': 'string',
'secret': True,
},
],
'metadata': [
{
'id': 'path',
'label': _('Secret Path'),
'type': 'string',
'help_text': _('The secret path e.g. /test/secret1'),
},
{
'id': 'secret_field',
'label': _('Secret Field'),
'help_text': _('The field to extract from the secret'),
'type': 'string',
},
{
'id': 'secret_decoding',
'label': _('Should the secret be base64 decoded?'),
'help_text': _('Specify whether the secret should be base64 decoded, typically used for storing files, such as SSH keys'),
'choices': ['No Decoding', 'Decode Base64'],
'type': 'string',
'default': 'No Decoding',
},
],
'required': ['tenant', 'client_id', 'client_secret', 'path', 'secret_field', 'secret_decoding'],
}
if settings.DEBUG:
dsv_inputs['fields'].append(
{
'id': 'url_template',
'label': _('URL template'),
'type': 'string',
'default': 'https://{}.secretsvaultcloud.{}',
}
)
def dsv_backend(**kwargs):
tenant_name = kwargs['tenant']
tenant_tld = kwargs.get('tld', 'com')
tenant_url_template = kwargs.get('url_template', 'https://{}.secretsvaultcloud.{}')
client_id = kwargs['client_id']
client_secret = kwargs['client_secret']
secret_path = kwargs['path']
secret_field = kwargs['secret_field']
# providing a default value to remain backward compatible for secrets that have not specified this option
secret_decoding = kwargs.get('secret_decoding', 'No Decoding')
tenant_url = tenant_url_template.format(tenant_name, tenant_tld.strip("."))
authorizer = PasswordGrantAuthorizer(tenant_url, client_id, client_secret)
dsv_secret = SecretsVault(tenant_url, authorizer).get_secret(secret_path)
# files can be uploaded base64 decoded to DSV and thus decoding it only, when asked for
if secret_decoding == 'Decode Base64':
return b64decode(dsv_secret['data'][secret_field]).decode()
return dsv_secret['data'][secret_field]
dsv_plugin = CredentialPlugin(name='Thycotic DevOps Secrets Vault', inputs=dsv_inputs, backend=dsv_backend)

View File

@@ -1,384 +0,0 @@
import copy
import os
import pathlib
import time
from urllib.parse import urljoin
from .plugin import CredentialPlugin, CertFiles, raise_for_status
import requests
from .plugin import translate_function as _
base_inputs = {
'fields': [
{
'id': 'url',
'label': _('Server URL'),
'type': 'string',
'format': 'url',
'help_text': _('The URL to the HashiCorp Vault'),
},
{
'id': 'token',
'label': _('Token'),
'type': 'string',
'secret': True,
'help_text': _('The access token used to authenticate to the Vault server'),
},
{
'id': 'cacert',
'label': _('CA Certificate'),
'type': 'string',
'multiline': True,
'help_text': _('The CA certificate used to verify the SSL certificate of the Vault server'),
},
{'id': 'role_id', 'label': _('AppRole role_id'), 'type': 'string', 'multiline': False, 'help_text': _('The Role ID for AppRole Authentication')},
{
'id': 'secret_id',
'label': _('AppRole secret_id'),
'type': 'string',
'multiline': False,
'secret': True,
'help_text': _('The Secret ID for AppRole Authentication'),
},
{
'id': 'client_cert_public',
'label': _('Client Certificate'),
'type': 'string',
'multiline': True,
'help_text': _(
'The PEM-encoded client certificate used for TLS client authentication.'
' This should include the certificate and any intermediate certififcates.'
),
},
{
'id': 'client_cert_private',
'label': _('Client Certificate Key'),
'type': 'string',
'multiline': True,
'secret': True,
'help_text': _('The certificate private key used for TLS client authentication.'),
},
{
'id': 'client_cert_role',
'label': _('TLS Authentication Role'),
'type': 'string',
'multiline': False,
'help_text': _(
'The role configured in Hashicorp Vault for TLS client authentication.'
' If not provided, Hashicorp Vault may assign roles based on the certificate used.'
),
},
{
'id': 'namespace',
'label': _('Namespace name (Vault Enterprise only)'),
'type': 'string',
'multiline': False,
'help_text': _('Name of the namespace to use when authenticate and retrieve secrets'),
},
{
'id': 'kubernetes_role',
'label': _('Kubernetes role'),
'type': 'string',
'multiline': False,
'help_text': _(
'The Role for Kubernetes Authentication.'
' This is the named role, configured in Vault server, for AWX pod auth policies.'
' see https://www.vaultproject.io/docs/auth/kubernetes#configuration'
),
},
{
'id': 'username',
'label': _('Username'),
'type': 'string',
'secret': False,
'help_text': _('Username for user authentication.'),
},
{
'id': 'password',
'label': _('Password'),
'type': 'string',
'secret': True,
'help_text': _('Password for user authentication.'),
},
{
'id': 'default_auth_path',
'label': _('Path to Auth'),
'type': 'string',
'multiline': False,
'default': 'approle',
'help_text': _('The Authentication path to use if one isn\'t provided in the metadata when linking to an input field. Defaults to \'approle\''),
},
],
'metadata': [
{
'id': 'secret_path',
'label': _('Path to Secret'),
'type': 'string',
'help_text': _(
(
'The path to the secret stored in the secret backend e.g, /some/secret/. It is recommended'
' that you use the secret backend field to identify the storage backend and to use this field'
' for locating a specific secret within that store. However, if you prefer to fully identify'
' both the secret backend and one of its secrets using only this field, join their locations'
' into a single path without any additional separators, e.g, /location/of/backend/some/secret.'
)
),
},
{
'id': 'auth_path',
'label': _('Path to Auth'),
'type': 'string',
'multiline': False,
'help_text': _('The path where the Authentication method is mounted e.g, approle'),
},
],
'required': ['url', 'secret_path'],
}
hashi_kv_inputs = copy.deepcopy(base_inputs)
hashi_kv_inputs['fields'].append(
{
'id': 'api_version',
'label': _('API Version'),
'choices': ['v1', 'v2'],
'help_text': _('API v1 is for static key/value lookups. API v2 is for versioned key/value lookups.'),
'default': 'v1',
}
)
hashi_kv_inputs['metadata'] = (
[
{
'id': 'secret_backend',
'label': _('Name of Secret Backend'),
'type': 'string',
'help_text': _('The name of the kv secret backend (if left empty, the first segment of the secret path will be used).'),
}
]
+ hashi_kv_inputs['metadata']
+ [
{
'id': 'secret_key',
'label': _('Key Name'),
'type': 'string',
'help_text': _('The name of the key to look up in the secret.'),
},
{
'id': 'secret_version',
'label': _('Secret Version (v2 only)'),
'type': 'string',
'help_text': _('Used to specify a specific secret version (if left empty, the latest version will be used).'),
},
]
)
hashi_kv_inputs['required'].extend(['api_version', 'secret_key'])
hashi_ssh_inputs = copy.deepcopy(base_inputs)
hashi_ssh_inputs['metadata'] = (
[
{
'id': 'public_key',
'label': _('Unsigned Public Key'),
'type': 'string',
'multiline': True,
}
]
+ hashi_ssh_inputs['metadata']
+ [
{'id': 'role', 'label': _('Role Name'), 'type': 'string', 'help_text': _('The name of the role used to sign.')},
{
'id': 'valid_principals',
'label': _('Valid Principals'),
'type': 'string',
'help_text': _('Valid principals (either usernames or hostnames) that the certificate should be signed for.'),
},
]
)
hashi_ssh_inputs['required'].extend(['public_key', 'role'])
def handle_auth(**kwargs):
token = None
if kwargs.get('token'):
token = kwargs['token']
elif kwargs.get('username') and kwargs.get('password'):
token = method_auth(**kwargs, auth_param=userpass_auth(**kwargs))
elif kwargs.get('role_id') and kwargs.get('secret_id'):
token = method_auth(**kwargs, auth_param=approle_auth(**kwargs))
elif kwargs.get('kubernetes_role'):
token = method_auth(**kwargs, auth_param=kubernetes_auth(**kwargs))
elif kwargs.get('client_cert_public') and kwargs.get('client_cert_private'):
token = method_auth(**kwargs, auth_param=client_cert_auth(**kwargs))
else:
raise Exception('Token, Username/Password, AppRole, Kubernetes, or TLS authentication parameters must be set')
return token
def userpass_auth(**kwargs):
return {'username': kwargs['username'], 'password': kwargs['password']}
def approle_auth(**kwargs):
return {'role_id': kwargs['role_id'], 'secret_id': kwargs['secret_id']}
def kubernetes_auth(**kwargs):
jwt_file = pathlib.Path('/var/run/secrets/kubernetes.io/serviceaccount/token')
with jwt_file.open('r') as jwt_fo:
jwt = jwt_fo.read().rstrip()
return {'role': kwargs['kubernetes_role'], 'jwt': jwt}
def client_cert_auth(**kwargs):
return {'name': kwargs.get('client_cert_role')}
def method_auth(**kwargs):
# get auth method specific params
request_kwargs = {'json': kwargs['auth_param'], 'timeout': 30}
# we first try to use the 'auth_path' from the metadata
# if not found we try to fetch the 'default_auth_path' from inputs
auth_path = kwargs.get('auth_path') or kwargs['default_auth_path']
url = urljoin(kwargs['url'], 'v1')
cacert = kwargs.get('cacert', None)
sess = requests.Session()
sess.mount(url, requests.adapters.HTTPAdapter(max_retries=5))
# Namespace support
if kwargs.get('namespace'):
sess.headers['X-Vault-Namespace'] = kwargs['namespace']
request_url = '/'.join([url, 'auth', auth_path, 'login']).rstrip('/')
if kwargs['auth_param'].get('username'):
request_url = request_url + '/' + (kwargs['username'])
with CertFiles(cacert) as cert:
request_kwargs['verify'] = cert
# TLS client certificate support
if kwargs.get('client_cert_public') and kwargs.get('client_cert_private'):
# Add client cert to requests Session before making call
with CertFiles(kwargs['client_cert_public'], key=kwargs['client_cert_private']) as client_cert:
sess.cert = client_cert
resp = sess.post(request_url, **request_kwargs)
else:
# Make call without client certificate
resp = sess.post(request_url, **request_kwargs)
resp.raise_for_status()
token = resp.json()['auth']['client_token']
return token
def kv_backend(**kwargs):
token = handle_auth(**kwargs)
url = kwargs['url']
secret_path = kwargs['secret_path']
secret_backend = kwargs.get('secret_backend', None)
secret_key = kwargs.get('secret_key', None)
cacert = kwargs.get('cacert', None)
api_version = kwargs['api_version']
request_kwargs = {
'timeout': 30,
'allow_redirects': False,
}
sess = requests.Session()
sess.mount(url, requests.adapters.HTTPAdapter(max_retries=5))
sess.headers['Authorization'] = 'Bearer {}'.format(token)
# Compatibility header for older installs of Hashicorp Vault
sess.headers['X-Vault-Token'] = token
if kwargs.get('namespace'):
sess.headers['X-Vault-Namespace'] = kwargs['namespace']
if api_version == 'v2':
if kwargs.get('secret_version'):
request_kwargs['params'] = {'version': kwargs['secret_version']}
if secret_backend:
path_segments = [secret_backend, 'data', secret_path]
else:
try:
mount_point, *path = pathlib.Path(secret_path.lstrip(os.sep)).parts
'/'.join(path)
except Exception:
mount_point, path = secret_path, []
# https://www.vaultproject.io/api/secret/kv/kv-v2.html#read-secret-version
path_segments = [mount_point, 'data'] + path
else:
if secret_backend:
path_segments = [secret_backend, secret_path]
else:
path_segments = [secret_path]
request_url = urljoin(url, '/'.join(['v1'] + path_segments)).rstrip('/')
with CertFiles(cacert) as cert:
request_kwargs['verify'] = cert
request_retries = 0
while request_retries < 5:
response = sess.get(request_url, **request_kwargs)
# https://developer.hashicorp.com/vault/docs/enterprise/consistency
if response.status_code == 412:
request_retries += 1
time.sleep(1)
else:
break
raise_for_status(response)
json = response.json()
if api_version == 'v2':
json = json['data']
if secret_key:
try:
if (secret_key != 'data') and (secret_key not in json['data']) and ('data' in json['data']):
return json['data']['data'][secret_key]
return json['data'][secret_key]
except KeyError:
raise RuntimeError('{} is not present at {}'.format(secret_key, secret_path))
return json['data']
def ssh_backend(**kwargs):
token = handle_auth(**kwargs)
url = urljoin(kwargs['url'], 'v1')
secret_path = kwargs['secret_path']
role = kwargs['role']
cacert = kwargs.get('cacert', None)
request_kwargs = {
'timeout': 30,
'allow_redirects': False,
}
request_kwargs['json'] = {'public_key': kwargs['public_key']}
if kwargs.get('valid_principals'):
request_kwargs['json']['valid_principals'] = kwargs['valid_principals']
sess = requests.Session()
sess.mount(url, requests.adapters.HTTPAdapter(max_retries=5))
sess.headers['Authorization'] = 'Bearer {}'.format(token)
if kwargs.get('namespace'):
sess.headers['X-Vault-Namespace'] = kwargs['namespace']
# Compatability header for older installs of Hashicorp Vault
sess.headers['X-Vault-Token'] = token
# https://www.vaultproject.io/api/secret/ssh/index.html#sign-ssh-key
request_url = '/'.join([url, secret_path, 'sign', role]).rstrip('/')
with CertFiles(cacert) as cert:
request_kwargs['verify'] = cert
request_retries = 0
while request_retries < 5:
resp = sess.post(request_url, **request_kwargs)
# https://developer.hashicorp.com/vault/docs/enterprise/consistency
if resp.status_code == 412:
request_retries += 1
time.sleep(1)
else:
break
raise_for_status(resp)
return resp.json()['data']['signed_key']
hashivault_kv_plugin = CredentialPlugin('HashiCorp Vault Secret Lookup', inputs=hashi_kv_inputs, backend=kv_backend)
hashivault_ssh_plugin = CredentialPlugin('HashiCorp Vault Signed SSH', inputs=hashi_ssh_inputs, backend=ssh_backend)

View File

@@ -1,139 +0,0 @@
import json
import yaml
import os
import stat
import tempfile
from django.conf import settings
from awx.main.utils.execution_environments import to_container_path
def aws(cred, env, private_data_dir):
env['AWS_ACCESS_KEY_ID'] = cred.get_input('username', default='')
env['AWS_SECRET_ACCESS_KEY'] = cred.get_input('password', default='')
if cred.has_input('security_token'):
env['AWS_SECURITY_TOKEN'] = cred.get_input('security_token', default='')
env['AWS_SESSION_TOKEN'] = env['AWS_SECURITY_TOKEN']
def gce(cred, env, private_data_dir):
project = cred.get_input('project', default='')
username = cred.get_input('username', default='')
json_cred = {'type': 'service_account', 'private_key': cred.get_input('ssh_key_data', default=''), 'client_email': username, 'project_id': project}
if 'INVENTORY_UPDATE_ID' not in env:
env['GCE_EMAIL'] = username
env['GCE_PROJECT'] = project
json_cred['token_uri'] = 'https://oauth2.googleapis.com/token'
handle, path = tempfile.mkstemp(dir=os.path.join(private_data_dir, 'env'))
f = os.fdopen(handle, 'w')
json.dump(json_cred, f, indent=2)
f.close()
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
container_path = to_container_path(path, private_data_dir)
env['GCE_CREDENTIALS_FILE_PATH'] = container_path
env['GCP_SERVICE_ACCOUNT_FILE'] = container_path
env['GOOGLE_APPLICATION_CREDENTIALS'] = container_path
# Handle env variables for new module types.
# This includes gcp_compute inventory plugin and
# all new gcp_* modules.
env['GCP_AUTH_KIND'] = 'serviceaccount'
env['GCP_PROJECT'] = project
env['GCP_ENV_TYPE'] = 'tower'
return path
def azure_rm(cred, env, private_data_dir):
client = cred.get_input('client', default='')
tenant = cred.get_input('tenant', default='')
env['AZURE_SUBSCRIPTION_ID'] = cred.get_input('subscription', default='')
if len(client) and len(tenant):
env['AZURE_CLIENT_ID'] = client
env['AZURE_TENANT'] = tenant
env['AZURE_SECRET'] = cred.get_input('secret', default='')
else:
env['AZURE_AD_USER'] = cred.get_input('username', default='')
env['AZURE_PASSWORD'] = cred.get_input('password', default='')
if cred.has_input('cloud_environment'):
env['AZURE_CLOUD_ENVIRONMENT'] = cred.get_input('cloud_environment')
def vmware(cred, env, private_data_dir):
env['VMWARE_USER'] = cred.get_input('username', default='')
env['VMWARE_PASSWORD'] = cred.get_input('password', default='')
env['VMWARE_HOST'] = cred.get_input('host', default='')
env['VMWARE_VALIDATE_CERTS'] = str(settings.VMWARE_VALIDATE_CERTS)
def _openstack_data(cred):
openstack_auth = dict(
auth_url=cred.get_input('host', default=''),
username=cred.get_input('username', default=''),
password=cred.get_input('password', default=''),
project_name=cred.get_input('project', default=''),
)
if cred.has_input('project_domain_name'):
openstack_auth['project_domain_name'] = cred.get_input('project_domain_name', default='')
if cred.has_input('domain'):
openstack_auth['domain_name'] = cred.get_input('domain', default='')
verify_state = cred.get_input('verify_ssl', default=True)
openstack_data = {
'clouds': {
'devstack': {
'auth': openstack_auth,
'verify': verify_state,
},
},
}
if cred.has_input('region'):
openstack_data['clouds']['devstack']['region_name'] = cred.get_input('region', default='')
return openstack_data
def openstack(cred, env, private_data_dir):
handle, path = tempfile.mkstemp(dir=os.path.join(private_data_dir, 'env'))
f = os.fdopen(handle, 'w')
openstack_data = _openstack_data(cred)
yaml.safe_dump(openstack_data, f, default_flow_style=False, allow_unicode=True)
f.close()
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
env['OS_CLIENT_CONFIG_FILE'] = to_container_path(path, private_data_dir)
def kubernetes_bearer_token(cred, env, private_data_dir):
env['K8S_AUTH_HOST'] = cred.get_input('host', default='')
env['K8S_AUTH_API_KEY'] = cred.get_input('bearer_token', default='')
if cred.get_input('verify_ssl') and 'ssl_ca_cert' in cred.inputs:
env['K8S_AUTH_VERIFY_SSL'] = 'True'
handle, path = tempfile.mkstemp(dir=os.path.join(private_data_dir, 'env'))
with os.fdopen(handle, 'w') as f:
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
f.write(cred.get_input('ssl_ca_cert'))
env['K8S_AUTH_SSL_CA_CERT'] = to_container_path(path, private_data_dir)
else:
env['K8S_AUTH_VERIFY_SSL'] = 'False'
def terraform(cred, env, private_data_dir):
handle, path = tempfile.mkstemp(dir=os.path.join(private_data_dir, 'env'))
with os.fdopen(handle, 'w') as f:
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
f.write(cred.get_input('configuration'))
env['TF_BACKEND_CONFIG_FILE'] = to_container_path(path, private_data_dir)
# Handle env variables for GCP account credentials
if 'gce_credentials' in cred.inputs:
handle, path = tempfile.mkstemp(dir=os.path.join(private_data_dir, 'env'))
with os.fdopen(handle, 'w') as f:
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
f.write(cred.get_input('gce_credentials'))
env['GOOGLE_BACKEND_CREDENTIALS'] = to_container_path(path, private_data_dir)

View File

@@ -1,68 +0,0 @@
import os
import tempfile
from collections import namedtuple
from requests.exceptions import HTTPError
CredentialPlugin = namedtuple('CredentialPlugin', ['name', 'inputs', 'backend'])
try:
from django.utils.translation import gettext_lazy as translate_function
except ModuleNotFoundError:
translate_function = lambda *args, **kwargs: None
class Settings():
DEBUG = False
settings = Settings()
def raise_for_status(resp):
resp.raise_for_status()
if resp.status_code >= 300:
exc = HTTPError()
setattr(exc, 'response', resp)
raise exc
class CertFiles:
"""
A context manager used for writing a certificate and (optional) key
to $TMPDIR, and cleaning up afterwards.
This is particularly useful as a shared resource for credential plugins
that want to pull cert/key data out of the database and persist it
temporarily to the file system so that it can loaded into the openssl
certificate chain (generally, for HTTPS requests plugins make via the
Python requests library)
with CertFiles(cert_data, key_data) as cert:
# cert is string representing a path to the cert or pemfile
# temporarily written to disk
requests.post(..., cert=cert)
"""
certfile = None
def __init__(self, cert, key=None):
self.cert = cert
self.key = key
def __enter__(self):
if not self.cert:
return None
self.certfile = tempfile.NamedTemporaryFile('wb', delete=False)
self.certfile.write(self.cert.encode())
if self.key:
self.certfile.write(b'\n')
self.certfile.write(self.key.encode())
self.certfile.flush()
return str(self.certfile.name)
def __exit__(self, *args):
if self.certfile and os.path.exists(self.certfile.name):
os.remove(self.certfile.name)

View File

@@ -1,665 +0,0 @@
# Django
from django.utils.translation import gettext_noop
# AWX
from awx.main.models.credential import ManagedCredentialType
ManagedCredentialType(
namespace='ssh',
kind='ssh',
name=gettext_noop('Machine'),
inputs={
'fields': [
{'id': 'username', 'label': gettext_noop('Username'), 'type': 'string'},
{'id': 'password', 'label': gettext_noop('Password'), 'type': 'string', 'secret': True, 'ask_at_runtime': True},
{'id': 'ssh_key_data', 'label': gettext_noop('SSH Private Key'), 'type': 'string', 'format': 'ssh_private_key', 'secret': True, 'multiline': True},
{
'id': 'ssh_public_key_data',
'label': gettext_noop('Signed SSH Certificate'),
'type': 'string',
'multiline': True,
'secret': True,
},
{'id': 'ssh_key_unlock', 'label': gettext_noop('Private Key Passphrase'), 'type': 'string', 'secret': True, 'ask_at_runtime': True},
{
'id': 'become_method',
'label': gettext_noop('Privilege Escalation Method'),
'type': 'string',
'help_text': gettext_noop('Specify a method for "become" operations. This is equivalent to specifying the --become-method Ansible parameter.'),
},
{
'id': 'become_username',
'label': gettext_noop('Privilege Escalation Username'),
'type': 'string',
},
{'id': 'become_password', 'label': gettext_noop('Privilege Escalation Password'), 'type': 'string', 'secret': True, 'ask_at_runtime': True},
],
},
)
ManagedCredentialType(
namespace='scm',
kind='scm',
name=gettext_noop('Source Control'),
managed=True,
inputs={
'fields': [
{'id': 'username', 'label': gettext_noop('Username'), 'type': 'string'},
{'id': 'password', 'label': gettext_noop('Password'), 'type': 'string', 'secret': True},
{'id': 'ssh_key_data', 'label': gettext_noop('SCM Private Key'), 'type': 'string', 'format': 'ssh_private_key', 'secret': True, 'multiline': True},
{'id': 'ssh_key_unlock', 'label': gettext_noop('Private Key Passphrase'), 'type': 'string', 'secret': True},
],
},
)
ManagedCredentialType(
namespace='vault',
kind='vault',
name=gettext_noop('Vault'),
managed=True,
inputs={
'fields': [
{'id': 'vault_password', 'label': gettext_noop('Vault Password'), 'type': 'string', 'secret': True, 'ask_at_runtime': True},
{
'id': 'vault_id',
'label': gettext_noop('Vault Identifier'),
'type': 'string',
'format': 'vault_id',
'help_text': gettext_noop(
'Specify an (optional) Vault ID. This is '
'equivalent to specifying the --vault-id '
'Ansible parameter for providing multiple Vault '
'passwords. Note: this feature only works in '
'Ansible 2.4+.'
),
},
],
'required': ['vault_password'],
},
)
ManagedCredentialType(
namespace='net',
kind='net',
name=gettext_noop('Network'),
managed=True,
inputs={
'fields': [
{'id': 'username', 'label': gettext_noop('Username'), 'type': 'string'},
{
'id': 'password',
'label': gettext_noop('Password'),
'type': 'string',
'secret': True,
},
{'id': 'ssh_key_data', 'label': gettext_noop('SSH Private Key'), 'type': 'string', 'format': 'ssh_private_key', 'secret': True, 'multiline': True},
{
'id': 'ssh_key_unlock',
'label': gettext_noop('Private Key Passphrase'),
'type': 'string',
'secret': True,
},
{
'id': 'authorize',
'label': gettext_noop('Authorize'),
'type': 'boolean',
},
{
'id': 'authorize_password',
'label': gettext_noop('Authorize Password'),
'type': 'string',
'secret': True,
},
],
'dependencies': {
'authorize_password': ['authorize'],
},
'required': ['username'],
},
)
ManagedCredentialType(
namespace='aws',
kind='cloud',
name=gettext_noop('Amazon Web Services'),
managed=True,
inputs={
'fields': [
{'id': 'username', 'label': gettext_noop('Access Key'), 'type': 'string'},
{
'id': 'password',
'label': gettext_noop('Secret Key'),
'type': 'string',
'secret': True,
},
{
'id': 'security_token',
'label': gettext_noop('STS Token'),
'type': 'string',
'secret': True,
'help_text': gettext_noop(
'Security Token Service (STS) is a web service '
'that enables you to request temporary, '
'limited-privilege credentials for AWS Identity '
'and Access Management (IAM) users.'
),
},
],
'required': ['username', 'password'],
},
)
ManagedCredentialType(
namespace='openstack',
kind='cloud',
name=gettext_noop('OpenStack'),
managed=True,
inputs={
'fields': [
{'id': 'username', 'label': gettext_noop('Username'), 'type': 'string'},
{
'id': 'password',
'label': gettext_noop('Password (API Key)'),
'type': 'string',
'secret': True,
},
{
'id': 'host',
'label': gettext_noop('Host (Authentication URL)'),
'type': 'string',
'help_text': gettext_noop('The host to authenticate with. For example, https://openstack.business.com/v2.0/'),
},
{
'id': 'project',
'label': gettext_noop('Project (Tenant Name)'),
'type': 'string',
},
{
'id': 'project_domain_name',
'label': gettext_noop('Project (Domain Name)'),
'type': 'string',
},
{
'id': 'domain',
'label': gettext_noop('Domain Name'),
'type': 'string',
'help_text': gettext_noop(
'OpenStack domains define administrative boundaries. '
'It is only needed for Keystone v3 authentication '
'URLs. Refer to the documentation for '
'common scenarios.'
),
},
{
'id': 'region',
'label': gettext_noop('Region Name'),
'type': 'string',
'help_text': gettext_noop('For some cloud providers, like OVH, region must be specified'),
},
{
'id': 'verify_ssl',
'label': gettext_noop('Verify SSL'),
'type': 'boolean',
'default': True,
},
],
'required': ['username', 'password', 'host', 'project'],
},
)
ManagedCredentialType(
namespace='vmware',
kind='cloud',
name=gettext_noop('VMware vCenter'),
managed=True,
inputs={
'fields': [
{
'id': 'host',
'label': gettext_noop('VCenter Host'),
'type': 'string',
'help_text': gettext_noop('Enter the hostname or IP address that corresponds to your VMware vCenter.'),
},
{'id': 'username', 'label': gettext_noop('Username'), 'type': 'string'},
{
'id': 'password',
'label': gettext_noop('Password'),
'type': 'string',
'secret': True,
},
],
'required': ['host', 'username', 'password'],
},
)
ManagedCredentialType(
namespace='satellite6',
kind='cloud',
name=gettext_noop('Red Hat Satellite 6'),
managed=True,
inputs={
'fields': [
{
'id': 'host',
'label': gettext_noop('Satellite 6 URL'),
'type': 'string',
'help_text': gettext_noop('Enter the URL that corresponds to your Red Hat Satellite 6 server. For example, https://satellite.example.org'),
},
{'id': 'username', 'label': gettext_noop('Username'), 'type': 'string'},
{
'id': 'password',
'label': gettext_noop('Password'),
'type': 'string',
'secret': True,
},
],
'required': ['host', 'username', 'password'],
},
)
ManagedCredentialType(
namespace='gce',
kind='cloud',
name=gettext_noop('Google Compute Engine'),
managed=True,
inputs={
'fields': [
{
'id': 'username',
'label': gettext_noop('Service Account Email Address'),
'type': 'string',
'help_text': gettext_noop('The email address assigned to the Google Compute Engine service account.'),
},
{
'id': 'project',
'label': 'Project',
'type': 'string',
'help_text': gettext_noop(
'The Project ID is the GCE assigned identification. '
'It is often constructed as three words or two words '
'followed by a three-digit number. Examples: project-id-000 '
'and another-project-id'
),
},
{
'id': 'ssh_key_data',
'label': gettext_noop('RSA Private Key'),
'type': 'string',
'format': 'ssh_private_key',
'secret': True,
'multiline': True,
'help_text': gettext_noop('Paste the contents of the PEM file associated with the service account email.'),
},
],
'required': ['username', 'ssh_key_data'],
},
)
ManagedCredentialType(
namespace='azure_rm',
kind='cloud',
name=gettext_noop('Microsoft Azure Resource Manager'),
managed=True,
inputs={
'fields': [
{
'id': 'subscription',
'label': gettext_noop('Subscription ID'),
'type': 'string',
'help_text': gettext_noop('Subscription ID is an Azure construct, which is mapped to a username.'),
},
{'id': 'username', 'label': gettext_noop('Username'), 'type': 'string'},
{
'id': 'password',
'label': gettext_noop('Password'),
'type': 'string',
'secret': True,
},
{'id': 'client', 'label': gettext_noop('Client ID'), 'type': 'string'},
{
'id': 'secret',
'label': gettext_noop('Client Secret'),
'type': 'string',
'secret': True,
},
{'id': 'tenant', 'label': gettext_noop('Tenant ID'), 'type': 'string'},
{
'id': 'cloud_environment',
'label': gettext_noop('Azure Cloud Environment'),
'type': 'string',
'help_text': gettext_noop('Environment variable AZURE_CLOUD_ENVIRONMENT when using Azure GovCloud or Azure stack.'),
},
],
'required': ['subscription'],
},
)
ManagedCredentialType(
namespace='github_token',
kind='token',
name=gettext_noop('GitHub Personal Access Token'),
managed=True,
inputs={
'fields': [
{
'id': 'token',
'label': gettext_noop('Token'),
'type': 'string',
'secret': True,
'help_text': gettext_noop('This token needs to come from your profile settings in GitHub'),
}
],
'required': ['token'],
},
)
ManagedCredentialType(
namespace='gitlab_token',
kind='token',
name=gettext_noop('GitLab Personal Access Token'),
managed=True,
inputs={
'fields': [
{
'id': 'token',
'label': gettext_noop('Token'),
'type': 'string',
'secret': True,
'help_text': gettext_noop('This token needs to come from your profile settings in GitLab'),
}
],
'required': ['token'],
},
)
ManagedCredentialType(
namespace='bitbucket_dc_token',
kind='token',
name=gettext_noop('Bitbucket Data Center HTTP Access Token'),
managed=True,
inputs={
'fields': [
{
'id': 'token',
'label': gettext_noop('Token'),
'type': 'string',
'secret': True,
'help_text': gettext_noop('This token needs to come from your user settings in Bitbucket'),
}
],
'required': ['token'],
},
)
ManagedCredentialType(
namespace='insights',
kind='insights',
name=gettext_noop('Insights'),
managed=True,
inputs={
'fields': [
{'id': 'username', 'label': gettext_noop('Username'), 'type': 'string'},
{'id': 'password', 'label': gettext_noop('Password'), 'type': 'string', 'secret': True},
],
'required': ['username', 'password'],
},
injectors={
'extra_vars': {
"scm_username": "{{username}}",
"scm_password": "{{password}}",
},
'env': {
'INSIGHTS_USER': '{{username}}',
'INSIGHTS_PASSWORD': '{{password}}',
},
},
)
ManagedCredentialType(
namespace='rhv',
kind='cloud',
name=gettext_noop('Red Hat Virtualization'),
managed=True,
inputs={
'fields': [
{'id': 'host', 'label': gettext_noop('Host (Authentication URL)'), 'type': 'string', 'help_text': gettext_noop('The host to authenticate with.')},
{'id': 'username', 'label': gettext_noop('Username'), 'type': 'string'},
{
'id': 'password',
'label': gettext_noop('Password'),
'type': 'string',
'secret': True,
},
{
'id': 'ca_file',
'label': gettext_noop('CA File'),
'type': 'string',
'help_text': gettext_noop('Absolute file path to the CA file to use (optional)'),
},
],
'required': ['host', 'username', 'password'],
},
injectors={
# The duplication here is intentional; the ovirt4 inventory plugin
# writes a .ini file for authentication, while the ansible modules for
# ovirt4 use a separate authentication process that support
# environment variables; by injecting both, we support both
'file': {
'template': '\n'.join(
[
'[ovirt]',
'ovirt_url={{host}}',
'ovirt_username={{username}}',
'ovirt_password={{password}}',
'{% if ca_file %}ovirt_ca_file={{ca_file}}{% endif %}',
]
)
},
'env': {'OVIRT_INI_PATH': '{{tower.filename}}', 'OVIRT_URL': '{{host}}', 'OVIRT_USERNAME': '{{username}}', 'OVIRT_PASSWORD': '{{password}}'},
},
)
ManagedCredentialType(
namespace='controller',
kind='cloud',
name=gettext_noop('Red Hat Ansible Automation Platform'),
managed=True,
inputs={
'fields': [
{
'id': 'host',
'label': gettext_noop('Red Hat Ansible Automation Platform'),
'type': 'string',
'help_text': gettext_noop('Red Hat Ansible Automation Platform base URL to authenticate with.'),
},
{
'id': 'username',
'label': gettext_noop('Username'),
'type': 'string',
'help_text': gettext_noop(
'Red Hat Ansible Automation Platform username id to authenticate as.This should not be set if an OAuth token is being used.'
),
},
{
'id': 'password',
'label': gettext_noop('Password'),
'type': 'string',
'secret': True,
},
{
'id': 'oauth_token',
'label': gettext_noop('OAuth Token'),
'type': 'string',
'secret': True,
'help_text': gettext_noop('An OAuth token to use to authenticate with.This should not be set if username/password are being used.'),
},
{'id': 'verify_ssl', 'label': gettext_noop('Verify SSL'), 'type': 'boolean', 'secret': False},
],
'required': ['host'],
},
injectors={
'env': {
'TOWER_HOST': '{{host}}',
'TOWER_USERNAME': '{{username}}',
'TOWER_PASSWORD': '{{password}}',
'TOWER_VERIFY_SSL': '{{verify_ssl}}',
'TOWER_OAUTH_TOKEN': '{{oauth_token}}',
'CONTROLLER_HOST': '{{host}}',
'CONTROLLER_USERNAME': '{{username}}',
'CONTROLLER_PASSWORD': '{{password}}',
'CONTROLLER_VERIFY_SSL': '{{verify_ssl}}',
'CONTROLLER_OAUTH_TOKEN': '{{oauth_token}}',
}
},
)
ManagedCredentialType(
namespace='kubernetes_bearer_token',
kind='kubernetes',
name=gettext_noop('OpenShift or Kubernetes API Bearer Token'),
inputs={
'fields': [
{
'id': 'host',
'label': gettext_noop('OpenShift or Kubernetes API Endpoint'),
'type': 'string',
'help_text': gettext_noop('The OpenShift or Kubernetes API Endpoint to authenticate with.'),
},
{
'id': 'bearer_token',
'label': gettext_noop('API authentication bearer token'),
'type': 'string',
'secret': True,
},
{
'id': 'verify_ssl',
'label': gettext_noop('Verify SSL'),
'type': 'boolean',
'default': True,
},
{
'id': 'ssl_ca_cert',
'label': gettext_noop('Certificate Authority data'),
'type': 'string',
'secret': True,
'multiline': True,
},
],
'required': ['host', 'bearer_token'],
},
)
ManagedCredentialType(
namespace='registry',
kind='registry',
name=gettext_noop('Container Registry'),
inputs={
'fields': [
{
'id': 'host',
'label': gettext_noop('Authentication URL'),
'type': 'string',
'help_text': gettext_noop('Authentication endpoint for the container registry.'),
'default': 'quay.io',
},
{
'id': 'username',
'label': gettext_noop('Username'),
'type': 'string',
},
{
'id': 'password',
'label': gettext_noop('Password or Token'),
'type': 'string',
'secret': True,
'help_text': gettext_noop('A password or token used to authenticate with'),
},
{
'id': 'verify_ssl',
'label': gettext_noop('Verify SSL'),
'type': 'boolean',
'default': True,
},
],
'required': ['host'],
},
)
ManagedCredentialType(
namespace='galaxy_api_token',
kind='galaxy',
name=gettext_noop('Ansible Galaxy/Automation Hub API Token'),
inputs={
'fields': [
{
'id': 'url',
'label': gettext_noop('Galaxy Server URL'),
'type': 'string',
'help_text': gettext_noop('The URL of the Galaxy instance to connect to.'),
},
{
'id': 'auth_url',
'label': gettext_noop('Auth Server URL'),
'type': 'string',
'help_text': gettext_noop('The URL of a Keycloak server token_endpoint, if using SSO auth.'),
},
{
'id': 'token',
'label': gettext_noop('API Token'),
'type': 'string',
'secret': True,
'help_text': gettext_noop('A token to use for authentication against the Galaxy instance.'),
},
],
'required': ['url'],
},
)
ManagedCredentialType(
namespace='gpg_public_key',
kind='cryptography',
name=gettext_noop('GPG Public Key'),
inputs={
'fields': [
{
'id': 'gpg_public_key',
'label': gettext_noop('GPG Public Key'),
'type': 'string',
'secret': True,
'multiline': True,
'help_text': gettext_noop('GPG Public Key used to validate content signatures.'),
},
],
'required': ['gpg_public_key'],
},
)
ManagedCredentialType(
namespace='terraform',
kind='cloud',
name=gettext_noop('Terraform backend configuration'),
managed=True,
inputs={
'fields': [
{
'id': 'configuration',
'label': gettext_noop('Backend configuration'),
'type': 'string',
'secret': True,
'multiline': True,
'help_text': gettext_noop('Terraform backend config as Hashicorp configuration language.'),
},
{
'id': 'gce_credentials',
'label': gettext_noop('Google Cloud Platform account credentials'),
'type': 'string',
'secret': True,
'multiline': True,
'help_text': gettext_noop('Google Cloud Platform account credentials in JSON format.'),
},
],
'required': ['configuration'],
},
)

View File

@@ -1,76 +0,0 @@
from .plugin import CredentialPlugin
from .plugin import translate_function as _
try:
from delinea.secrets.server import DomainPasswordGrantAuthorizer, PasswordGrantAuthorizer, SecretServer, ServerSecret
except ImportError:
from thycotic.secrets.server import DomainPasswordGrantAuthorizer, PasswordGrantAuthorizer, SecretServer, ServerSecret
tss_inputs = {
'fields': [
{
'id': 'server_url',
'label': _('Secret Server URL'),
'help_text': _('The Base URL of Secret Server e.g. https://myserver/SecretServer or https://mytenant.secretservercloud.com'),
'type': 'string',
},
{
'id': 'username',
'label': _('Username'),
'help_text': _('The (Application) user username'),
'type': 'string',
},
{
'id': 'domain',
'label': _('Domain'),
'help_text': _('The (Application) user domain'),
'type': 'string',
},
{
'id': 'password',
'label': _('Password'),
'help_text': _('The corresponding password'),
'type': 'string',
'secret': True,
},
],
'metadata': [
{
'id': 'secret_id',
'label': _('Secret ID'),
'help_text': _('The integer ID of the secret'),
'type': 'string',
},
{
'id': 'secret_field',
'label': _('Secret Field'),
'help_text': _('The field to extract from the secret'),
'type': 'string',
},
],
'required': ['server_url', 'username', 'password', 'secret_id', 'secret_field'],
}
def tss_backend(**kwargs):
if kwargs.get("domain"):
authorizer = DomainPasswordGrantAuthorizer(
base_url=kwargs['server_url'], username=kwargs['username'], domain=kwargs['domain'], password=kwargs['password']
)
else:
authorizer = PasswordGrantAuthorizer(kwargs['server_url'], kwargs['username'], kwargs['password'])
secret_server = SecretServer(kwargs['server_url'], authorizer)
secret_dict = secret_server.get_secret(kwargs['secret_id'])
secret = ServerSecret(**secret_dict)
if isinstance(secret.fields[kwargs['secret_field']].value, str) == False:
return secret.fields[kwargs['secret_field']].value.text
else:
return secret.fields[kwargs['secret_field']].value
tss_plugin = CredentialPlugin(
'Thycotic Secret Server',
tss_inputs,
tss_backend,
)

View File

@@ -1,302 +0,0 @@
import yaml
import stat
import tempfile
import os.path
from awx_plugins.credentials.injectors import _openstack_data
from awx.main.utils.execution_environments import to_container_path
from awx.main.utils.licensing import server_product_name
class PluginFileInjector(object):
plugin_name = None # Ansible core name used to reference plugin
# base injector should be one of None, "managed", or "template"
# this dictates which logic to borrow from playbook injectors
base_injector = None
# every source should have collection, these are for the collection name
namespace = None
collection = None
collection_migration = '2.9' # Starting with this version, we use collections
use_fqcn = False # plugin: name versus plugin: namespace.collection.name
# TODO: delete this method and update unit tests
@classmethod
def get_proper_name(cls):
if cls.plugin_name is None:
return None
return f'{cls.namespace}.{cls.collection}.{cls.plugin_name}'
@property
def filename(self):
"""Inventory filename for using the inventory plugin
This is created dynamically, but the auto plugin requires this exact naming
"""
return '{0}.yml'.format(self.plugin_name)
def inventory_contents(self, inventory_update, private_data_dir):
"""Returns a string that is the content for the inventory file for the inventory plugin"""
return yaml.safe_dump(self.inventory_as_dict(inventory_update, private_data_dir), default_flow_style=False, width=1000)
def inventory_as_dict(self, inventory_update, private_data_dir):
source_vars = dict(inventory_update.source_vars_dict) # make a copy
'''
None conveys that we should use the user-provided plugin.
Note that a plugin value of '' should still be overridden.
'''
if self.plugin_name is not None:
if hasattr(self, 'downstream_namespace') and server_product_name() != 'AWX':
source_vars['plugin'] = f'{self.downstream_namespace}.{self.downstream_collection}.{self.plugin_name}'
elif self.use_fqcn:
source_vars['plugin'] = f'{self.namespace}.{self.collection}.{self.plugin_name}'
else:
source_vars['plugin'] = self.plugin_name
return source_vars
def build_env(self, inventory_update, env, private_data_dir, private_data_files):
injector_env = self.get_plugin_env(inventory_update, private_data_dir, private_data_files)
env.update(injector_env)
# All CLOUD_PROVIDERS sources implement as inventory plugin from collection
env['ANSIBLE_INVENTORY_ENABLED'] = 'auto'
return env
def _get_shared_env(self, inventory_update, private_data_dir, private_data_files):
"""By default, we will apply the standard managed injectors"""
injected_env = {}
credential = inventory_update.get_cloud_credential()
# some sources may have no credential, specifically ec2
if credential is None:
return injected_env
if self.base_injector in ('managed', 'template'):
injected_env['INVENTORY_UPDATE_ID'] = str(inventory_update.pk) # so injector knows this is inventory
if self.base_injector == 'managed':
from awx_plugins.credentials import injectors as builtin_injectors
cred_kind = inventory_update.source.replace('ec2', 'aws')
if cred_kind in dir(builtin_injectors):
getattr(builtin_injectors, cred_kind)(credential, injected_env, private_data_dir)
elif self.base_injector == 'template':
safe_env = injected_env.copy()
args = []
credential.credential_type.inject_credential(credential, injected_env, safe_env, args, private_data_dir)
# NOTE: safe_env is handled externally to injector class by build_safe_env static method
# that means that managed injectors must only inject detectable env keys
# enforcement of this is accomplished by tests
return injected_env
def get_plugin_env(self, inventory_update, private_data_dir, private_data_files):
env = self._get_shared_env(inventory_update, private_data_dir, private_data_files)
return env
def build_private_data(self, inventory_update, private_data_dir):
return self.build_plugin_private_data(inventory_update, private_data_dir)
def build_plugin_private_data(self, inventory_update, private_data_dir):
return None
class azure_rm(PluginFileInjector):
plugin_name = 'azure_rm'
base_injector = 'managed'
namespace = 'azure'
collection = 'azcollection'
def get_plugin_env(self, *args, **kwargs):
ret = super(azure_rm, self).get_plugin_env(*args, **kwargs)
# We need native jinja2 types so that tags can give JSON null value
ret['ANSIBLE_JINJA2_NATIVE'] = str(True)
return ret
class ec2(PluginFileInjector):
plugin_name = 'aws_ec2'
base_injector = 'managed'
namespace = 'amazon'
collection = 'aws'
def get_plugin_env(self, *args, **kwargs):
ret = super(ec2, self).get_plugin_env(*args, **kwargs)
# We need native jinja2 types so that ec2_state_code will give integer
ret['ANSIBLE_JINJA2_NATIVE'] = str(True)
return ret
class gce(PluginFileInjector):
plugin_name = 'gcp_compute'
base_injector = 'managed'
namespace = 'google'
collection = 'cloud'
def get_plugin_env(self, *args, **kwargs):
ret = super(gce, self).get_plugin_env(*args, **kwargs)
# We need native jinja2 types so that ip addresses can give JSON null value
ret['ANSIBLE_JINJA2_NATIVE'] = str(True)
return ret
def inventory_as_dict(self, inventory_update, private_data_dir):
ret = super().inventory_as_dict(inventory_update, private_data_dir)
credential = inventory_update.get_cloud_credential()
# InventorySource.source_vars take precedence over ENV vars
if 'projects' not in ret:
ret['projects'] = [credential.get_input('project', default='')]
return ret
class vmware(PluginFileInjector):
plugin_name = 'vmware_vm_inventory'
base_injector = 'managed'
namespace = 'community'
collection = 'vmware'
class openstack(PluginFileInjector):
plugin_name = 'openstack'
namespace = 'openstack'
collection = 'cloud'
def _get_clouds_dict(self, inventory_update, cred, private_data_dir):
openstack_data = _openstack_data(cred)
openstack_data['clouds']['devstack']['private'] = inventory_update.source_vars_dict.get('private', True)
ansible_variables = {
'use_hostnames': True,
'expand_hostvars': False,
'fail_on_errors': True,
}
provided_count = 0
for var_name in ansible_variables:
if var_name in inventory_update.source_vars_dict:
ansible_variables[var_name] = inventory_update.source_vars_dict[var_name]
provided_count += 1
if provided_count:
# Must we provide all 3 because the user provides any 1 of these??
# this probably results in some incorrect mangling of the defaults
openstack_data['ansible'] = ansible_variables
return openstack_data
def build_plugin_private_data(self, inventory_update, private_data_dir):
credential = inventory_update.get_cloud_credential()
private_data = {'credentials': {}}
openstack_data = self._get_clouds_dict(inventory_update, credential, private_data_dir)
private_data['credentials'][credential] = yaml.safe_dump(openstack_data, default_flow_style=False, allow_unicode=True)
return private_data
def get_plugin_env(self, inventory_update, private_data_dir, private_data_files):
env = super(openstack, self).get_plugin_env(inventory_update, private_data_dir, private_data_files)
credential = inventory_update.get_cloud_credential()
cred_data = private_data_files['credentials']
env['OS_CLIENT_CONFIG_FILE'] = to_container_path(cred_data[credential], private_data_dir)
return env
class rhv(PluginFileInjector):
"""ovirt uses the custom credential templating, and that is all"""
plugin_name = 'ovirt'
base_injector = 'template'
initial_version = '2.9'
namespace = 'ovirt'
collection = 'ovirt'
downstream_namespace = 'redhat'
downstream_collection = 'rhv'
use_fqcn = True
class satellite6(PluginFileInjector):
plugin_name = 'foreman'
namespace = 'theforeman'
collection = 'foreman'
downstream_namespace = 'redhat'
downstream_collection = 'satellite'
use_fqcn = True
def get_plugin_env(self, inventory_update, private_data_dir, private_data_files):
# this assumes that this is merged
# https://github.com/ansible/ansible/pull/52693
credential = inventory_update.get_cloud_credential()
ret = super(satellite6, self).get_plugin_env(inventory_update, private_data_dir, private_data_files)
if credential:
ret['FOREMAN_SERVER'] = credential.get_input('host', default='')
ret['FOREMAN_USER'] = credential.get_input('username', default='')
ret['FOREMAN_PASSWORD'] = credential.get_input('password', default='')
return ret
class terraform(PluginFileInjector):
plugin_name = 'terraform_state'
namespace = 'cloud'
collection = 'terraform'
use_fqcn = True
def inventory_as_dict(self, inventory_update, private_data_dir):
ret = super().inventory_as_dict(inventory_update, private_data_dir)
credential = inventory_update.get_cloud_credential()
config_cred = credential.get_input('configuration')
if config_cred:
handle, path = tempfile.mkstemp(dir=os.path.join(private_data_dir, 'env'))
with os.fdopen(handle, 'w') as f:
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
f.write(config_cred)
ret['backend_config_files'] = to_container_path(path, private_data_dir)
return ret
def build_plugin_private_data(self, inventory_update, private_data_dir):
credential = inventory_update.get_cloud_credential()
private_data = {'credentials': {}}
gce_cred = credential.get_input('gce_credentials', default=None)
if gce_cred:
private_data['credentials'][credential] = gce_cred
return private_data
def get_plugin_env(self, inventory_update, private_data_dir, private_data_files):
env = super(terraform, self).get_plugin_env(inventory_update, private_data_dir, private_data_files)
credential = inventory_update.get_cloud_credential()
cred_data = private_data_files['credentials']
if credential in cred_data:
env['GOOGLE_BACKEND_CREDENTIALS'] = to_container_path(cred_data[credential], private_data_dir)
return env
class controller(PluginFileInjector):
plugin_name = 'tower' # TODO: relying on routing for now, update after EEs pick up revised collection
base_injector = 'template'
namespace = 'awx'
collection = 'awx'
downstream_namespace = 'ansible'
downstream_collection = 'controller'
class insights(PluginFileInjector):
plugin_name = 'insights'
base_injector = 'template'
namespace = 'redhatinsights'
collection = 'insights'
downstream_namespace = 'redhat'
downstream_collection = 'insights'
use_fqcn = True
class openshift_virtualization(PluginFileInjector):
plugin_name = 'kubevirt'
base_injector = 'template'
namespace = 'kubevirt'
collection = 'core'
downstream_namespace = 'redhat'
downstream_collection = 'openshift_virtualization'
use_fqcn = True
class constructed(PluginFileInjector):
plugin_name = 'constructed'
namespace = 'ansible'
collection = 'builtin'
def build_env(self, *args, **kwargs):
env = super().build_env(*args, **kwargs)
# Enable script inventory plugin so we pick up the script files from source inventories
env['ANSIBLE_INVENTORY_ENABLED'] += ',script'
env['ANSIBLE_INVENTORY_ANY_UNPARSED_IS_FAILED'] = 'True'
return env

View File

@@ -1,142 +0,0 @@
import pytest
from unittest import mock
from awx_plugins.credentials import hashivault
def test_imported_azure_cloud_sdk_vars():
from awx_plugins.credentials import azure_kv
assert len(azure_kv.clouds) > 0
assert all([hasattr(c, 'name') for c in azure_kv.clouds])
assert all([hasattr(c, 'suffixes') for c in azure_kv.clouds])
assert all([hasattr(c.suffixes, 'keyvault_dns') for c in azure_kv.clouds])
def test_hashivault_approle_auth():
kwargs = {
'role_id': 'the_role_id',
'secret_id': 'the_secret_id',
}
expected_res = {
'role_id': 'the_role_id',
'secret_id': 'the_secret_id',
}
res = hashivault.approle_auth(**kwargs)
assert res == expected_res
def test_hashivault_kubernetes_auth():
kwargs = {
'kubernetes_role': 'the_kubernetes_role',
}
expected_res = {
'role': 'the_kubernetes_role',
'jwt': 'the_jwt',
}
with mock.patch('pathlib.Path') as path_mock:
mock.mock_open(path_mock.return_value.open, read_data='the_jwt')
res = hashivault.kubernetes_auth(**kwargs)
path_mock.assert_called_with('/var/run/secrets/kubernetes.io/serviceaccount/token')
assert res == expected_res
def test_hashivault_client_cert_auth_explicit_role():
kwargs = {
'client_cert_role': 'test-cert-1',
}
expected_res = {
'name': 'test-cert-1',
}
res = hashivault.client_cert_auth(**kwargs)
assert res == expected_res
def test_hashivault_client_cert_auth_no_role():
kwargs = {}
expected_res = {
'name': None,
}
res = hashivault.client_cert_auth(**kwargs)
assert res == expected_res
def test_hashivault_userpass_auth():
kwargs = {'username': 'the_username', 'password': 'the_password'}
expected_res = {'username': 'the_username', 'password': 'the_password'}
res = hashivault.userpass_auth(**kwargs)
assert res == expected_res
def test_hashivault_handle_auth_token():
kwargs = {
'token': 'the_token',
}
token = hashivault.handle_auth(**kwargs)
assert token == kwargs['token']
def test_hashivault_handle_auth_approle():
kwargs = {
'role_id': 'the_role_id',
'secret_id': 'the_secret_id',
}
with mock.patch.object(hashivault, 'method_auth') as method_mock:
method_mock.return_value = 'the_token'
token = hashivault.handle_auth(**kwargs)
method_mock.assert_called_with(**kwargs, auth_param=kwargs)
assert token == 'the_token'
def test_hashivault_handle_auth_kubernetes():
kwargs = {
'kubernetes_role': 'the_kubernetes_role',
}
with mock.patch.object(hashivault, 'method_auth') as method_mock:
with mock.patch('pathlib.Path') as path_mock:
mock.mock_open(path_mock.return_value.open, read_data='the_jwt')
method_mock.return_value = 'the_token'
token = hashivault.handle_auth(**kwargs)
method_mock.assert_called_with(**kwargs, auth_param={'role': 'the_kubernetes_role', 'jwt': 'the_jwt'})
assert token == 'the_token'
def test_hashivault_handle_auth_client_cert():
kwargs = {
'client_cert_public': "foo",
'client_cert_private': "bar",
'client_cert_role': 'test-cert-1',
}
auth_params = {
'name': 'test-cert-1',
}
with mock.patch.object(hashivault, 'method_auth') as method_mock:
method_mock.return_value = 'the_token'
token = hashivault.handle_auth(**kwargs)
method_mock.assert_called_with(**kwargs, auth_param=auth_params)
assert token == 'the_token'
def test_hashivault_handle_auth_not_enough_args():
with pytest.raises(Exception):
hashivault.handle_auth()
class TestDelineaImports:
"""
These module have a try-except for ImportError which will allow using the older library
but we do not want the awx_devel image to have the older library,
so these tests are designed to fail if these wind up using the fallback import
"""
def test_dsv_import(self):
from awx_plugins.credentials.dsv import SecretsVault # noqa
# assert this module as opposed to older thycotic.secrets.vault
assert SecretsVault.__module__ == 'delinea.secrets.vault'
def test_tss_import(self):
from awx_plugins.credentials.tss import DomainPasswordGrantAuthorizer, PasswordGrantAuthorizer, SecretServer, ServerSecret # noqa
for cls in (DomainPasswordGrantAuthorizer, PasswordGrantAuthorizer, SecretServer, ServerSecret):
# assert this module as opposed to older thycotic.secrets.server
assert cls.__module__ == 'delinea.secrets.server'