From 16d1f34179596935d473975571749f3726c796ee Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Wed, 28 Aug 2024 15:02:05 -0400 Subject: [PATCH] Delete cred and inv plugins --- awx_plugins/credentials/aim.py | 126 ---- awx_plugins/credentials/aws_secretsmanager.py | 65 -- awx_plugins/credentials/azure_kv.py | 63 -- awx_plugins/credentials/centrify_vault.py | 115 --- awx_plugins/credentials/conjur.py | 112 --- awx_plugins/credentials/dsv.py | 94 --- awx_plugins/credentials/hashivault.py | 384 ---------- awx_plugins/credentials/injectors.py | 139 ---- awx_plugins/credentials/plugin.py | 68 -- awx_plugins/credentials/plugins.py | 665 ------------------ awx_plugins/credentials/tss.py | 76 -- awx_plugins/inventory/plugins.py | 302 -------- awx_plugins/tests/test_credential_plugins.py | 142 ---- 13 files changed, 2351 deletions(-) delete mode 100644 awx_plugins/credentials/aim.py delete mode 100644 awx_plugins/credentials/aws_secretsmanager.py delete mode 100644 awx_plugins/credentials/azure_kv.py delete mode 100644 awx_plugins/credentials/centrify_vault.py delete mode 100644 awx_plugins/credentials/conjur.py delete mode 100644 awx_plugins/credentials/dsv.py delete mode 100644 awx_plugins/credentials/hashivault.py delete mode 100644 awx_plugins/credentials/injectors.py delete mode 100644 awx_plugins/credentials/plugin.py delete mode 100644 awx_plugins/credentials/plugins.py delete mode 100644 awx_plugins/credentials/tss.py delete mode 100644 awx_plugins/inventory/plugins.py delete mode 100644 awx_plugins/tests/test_credential_plugins.py diff --git a/awx_plugins/credentials/aim.py b/awx_plugins/credentials/aim.py deleted file mode 100644 index dc06b0ea6f..0000000000 --- a/awx_plugins/credentials/aim.py +++ /dev/null @@ -1,126 +0,0 @@ -from .plugin import CredentialPlugin, CertFiles, raise_for_status - -from urllib.parse import quote, urlencode, urljoin - -from .plugin import translate_function as _ -import requests - -aim_inputs = { - 'fields': [ - { - 'id': 'url', - 'label': _('CyberArk CCP URL'), - 'type': 'string', - 'format': 'url', - }, - { - 'id': 'webservice_id', - 'label': _('Web Service ID'), - 'type': 'string', - 'help_text': _('The CCP Web Service ID. Leave blank to default to AIMWebService.'), - }, - { - 'id': 'app_id', - 'label': _('Application ID'), - 'type': 'string', - 'secret': True, - }, - { - 'id': 'client_key', - 'label': _('Client Key'), - 'type': 'string', - 'secret': True, - 'multiline': True, - }, - { - 'id': 'client_cert', - 'label': _('Client Certificate'), - 'type': 'string', - 'secret': True, - 'multiline': True, - }, - { - 'id': 'verify', - 'label': _('Verify SSL Certificates'), - 'type': 'boolean', - 'default': True, - }, - ], - 'metadata': [ - { - 'id': 'object_query', - 'label': _('Object Query'), - 'type': 'string', - 'help_text': _('Lookup query for the object. Ex: Safe=TestSafe;Object=testAccountName123'), - }, - {'id': 'object_query_format', 'label': _('Object Query Format'), 'type': 'string', 'default': 'Exact', 'choices': ['Exact', 'Regexp']}, - { - 'id': 'object_property', - 'label': _('Object Property'), - 'type': 'string', - 'help_text': _('The property of the object to return. Available properties: Username, Password and Address.'), - }, - { - 'id': 'reason', - 'label': _('Reason'), - 'type': 'string', - 'help_text': _('Object request reason. This is only needed if it is required by the object\'s policy.'), - }, - ], - 'required': ['url', 'app_id', 'object_query'], -} - - -def aim_backend(**kwargs): - url = kwargs['url'] - client_cert = kwargs.get('client_cert', None) - client_key = kwargs.get('client_key', None) - verify = kwargs['verify'] - webservice_id = kwargs.get('webservice_id', '') - app_id = kwargs['app_id'] - object_query = kwargs['object_query'] - object_query_format = kwargs['object_query_format'] - object_property = kwargs.get('object_property', '') - reason = kwargs.get('reason', None) - if webservice_id == '': - webservice_id = 'AIMWebService' - - query_params = { - 'AppId': app_id, - 'Query': object_query, - 'QueryFormat': object_query_format, - } - if reason: - query_params['reason'] = reason - - request_qs = '?' + urlencode(query_params, quote_via=quote) - request_url = urljoin(url, '/'.join([webservice_id, 'api', 'Accounts'])) - - with CertFiles(client_cert, client_key) as cert: - res = requests.get( - request_url + request_qs, - timeout=30, - cert=cert, - verify=verify, - allow_redirects=False, - ) - raise_for_status(res) - # CCP returns the property name capitalized, username is camel case - # so we need to handle that case - if object_property == '': - object_property = 'Content' - elif object_property.lower() == 'username': - object_property = 'UserName' - elif object_property.lower() == 'password': - object_property = 'Content' - elif object_property.lower() == 'address': - object_property = 'Address' - elif object_property not in res: - raise KeyError('Property {} not found in object, available properties: Username, Password and Address'.format(object_property)) - else: - object_property = object_property.capitalize() - - return res.json()[object_property] - - -aim_plugin = CredentialPlugin('CyberArk Central Credential Provider Lookup', inputs=aim_inputs, backend=aim_backend) diff --git a/awx_plugins/credentials/aws_secretsmanager.py b/awx_plugins/credentials/aws_secretsmanager.py deleted file mode 100644 index 335113b2a8..0000000000 --- a/awx_plugins/credentials/aws_secretsmanager.py +++ /dev/null @@ -1,65 +0,0 @@ -import boto3 -from botocore.exceptions import ClientError - -from .plugin import CredentialPlugin -from .plugin import translate_function as _ - - -secrets_manager_inputs = { - 'fields': [ - { - 'id': 'aws_access_key', - 'label': _('AWS Access Key'), - 'type': 'string', - }, - { - 'id': 'aws_secret_key', - 'label': _('AWS Secret Key'), - 'type': 'string', - 'secret': True, - }, - ], - 'metadata': [ - { - 'id': 'region_name', - 'label': _('AWS Secrets Manager Region'), - 'type': 'string', - 'help_text': _('Region which the secrets manager is located'), - }, - { - 'id': 'secret_name', - 'label': _('AWS Secret Name'), - 'type': 'string', - }, - ], - 'required': ['aws_access_key', 'aws_secret_key', 'region_name', 'secret_name'], -} - - -def aws_secretsmanager_backend(**kwargs): - secret_name = kwargs['secret_name'] - region_name = kwargs['region_name'] - aws_secret_access_key = kwargs['aws_secret_key'] - aws_access_key_id = kwargs['aws_access_key'] - - session = boto3.session.Session() - client = session.client( - service_name='secretsmanager', region_name=region_name, aws_secret_access_key=aws_secret_access_key, aws_access_key_id=aws_access_key_id - ) - - try: - get_secret_value_response = client.get_secret_value(SecretId=secret_name) - except ClientError as e: - raise e - # Secrets Manager decrypts the secret value using the associated KMS CMK - # Depending on whether the secret was a string or binary, only one of these fields will be populated - if 'SecretString' in get_secret_value_response: - secret = get_secret_value_response['SecretString'] - - else: - secret = get_secret_value_response['SecretBinary'] - - return secret - - -aws_secretmanager_plugin = CredentialPlugin('AWS Secrets Manager lookup', inputs=secrets_manager_inputs, backend=aws_secretsmanager_backend) diff --git a/awx_plugins/credentials/azure_kv.py b/awx_plugins/credentials/azure_kv.py deleted file mode 100644 index 7579dbee3d..0000000000 --- a/awx_plugins/credentials/azure_kv.py +++ /dev/null @@ -1,63 +0,0 @@ -from azure.keyvault.secrets import SecretClient -from azure.identity import ClientSecretCredential -from msrestazure import azure_cloud - -from .plugin import CredentialPlugin - -from .plugin import translate_function as _ - - -# https://github.com/Azure/msrestazure-for-python/blob/master/msrestazure/azure_cloud.py -clouds = [vars(azure_cloud)[n] for n in dir(azure_cloud) if n.startswith("AZURE_") and n.endswith("_CLOUD")] -default_cloud = vars(azure_cloud)["AZURE_PUBLIC_CLOUD"] - - -azure_keyvault_inputs = { - 'fields': [ - { - 'id': 'url', - 'label': _('Vault URL (DNS Name)'), - 'type': 'string', - 'format': 'url', - }, - {'id': 'client', 'label': _('Client ID'), 'type': 'string'}, - { - 'id': 'secret', - 'label': _('Client Secret'), - 'type': 'string', - 'secret': True, - }, - {'id': 'tenant', 'label': _('Tenant ID'), 'type': 'string'}, - { - 'id': 'cloud_name', - 'label': _('Cloud Environment'), - 'help_text': _('Specify which azure cloud environment to use.'), - 'choices': list(set([default_cloud.name] + [c.name for c in clouds])), - 'default': default_cloud.name, - }, - ], - 'metadata': [ - { - 'id': 'secret_field', - 'label': _('Secret Name'), - 'type': 'string', - 'help_text': _('The name of the secret to look up.'), - }, - { - 'id': 'secret_version', - 'label': _('Secret Version'), - 'type': 'string', - 'help_text': _('Used to specify a specific secret version (if left empty, the latest version will be used).'), - }, - ], - 'required': ['url', 'client', 'secret', 'tenant', 'secret_field'], -} - - -def azure_keyvault_backend(**kwargs): - csc = ClientSecretCredential(tenant_id=kwargs['tenant'], client_id=kwargs['client'], client_secret=kwargs['secret']) - kv = SecretClient(credential=csc, vault_url=kwargs['url']) - return kv.get_secret(name=kwargs['secret_field'], version=kwargs.get('secret_version', '')).value - - -azure_keyvault_plugin = CredentialPlugin('Microsoft Azure Key Vault', inputs=azure_keyvault_inputs, backend=azure_keyvault_backend) diff --git a/awx_plugins/credentials/centrify_vault.py b/awx_plugins/credentials/centrify_vault.py deleted file mode 100644 index b2d97a1db3..0000000000 --- a/awx_plugins/credentials/centrify_vault.py +++ /dev/null @@ -1,115 +0,0 @@ -from .plugin import CredentialPlugin, raise_for_status -from .plugin import translate_function as _ -from urllib.parse import urljoin -import requests - -pas_inputs = { - 'fields': [ - { - 'id': 'url', - 'label': _('Centrify Tenant URL'), - 'type': 'string', - 'help_text': _('Centrify Tenant URL'), - 'format': 'url', - }, - { - 'id': 'client_id', - 'label': _('Centrify API User'), - 'type': 'string', - 'help_text': _('Centrify API User, having necessary permissions as mentioned in support doc'), - }, - { - 'id': 'client_password', - 'label': _('Centrify API Password'), - 'type': 'string', - 'help_text': _('Password of Centrify API User with necessary permissions'), - 'secret': True, - }, - { - 'id': 'oauth_application_id', - 'label': _('OAuth2 Application ID'), - 'type': 'string', - 'help_text': _('Application ID of the configured OAuth2 Client (defaults to \'awx\')'), - 'default': 'awx', - }, - { - 'id': 'oauth_scope', - 'label': _('OAuth2 Scope'), - 'type': 'string', - 'help_text': _('Scope of the configured OAuth2 Client (defaults to \'awx\')'), - 'default': 'awx', - }, - ], - 'metadata': [ - { - 'id': 'account-name', - 'label': _('Account Name'), - 'type': 'string', - 'help_text': _('Local system account or Domain account name enrolled in Centrify Vault. eg. (root or DOMAIN/Administrator)'), - }, - { - 'id': 'system-name', - 'label': _('System Name'), - 'type': 'string', - 'help_text': _('Machine Name enrolled with in Centrify Portal'), - }, - ], - 'required': ['url', 'account-name', 'system-name', 'client_id', 'client_password'], -} - - -# generate bearer token to authenticate with PAS portal, Input : Client ID, Client Secret -def handle_auth(**kwargs): - post_data = {"grant_type": "client_credentials", "scope": kwargs['oauth_scope']} - response = requests.post(kwargs['endpoint'], data=post_data, auth=(kwargs['client_id'], kwargs['client_password']), verify=True, timeout=(5, 30)) - raise_for_status(response) - try: - return response.json()['access_token'] - except KeyError: - raise RuntimeError('OAuth request to tenant was unsuccessful') - - -# fetch the ID of system with RedRock query, Input : System Name, Account Name -def get_ID(**kwargs): - endpoint = urljoin(kwargs['url'], '/Redrock/query') - name = " Name='{0}' and User='{1}'".format(kwargs['system_name'], kwargs['acc_name']) - query = 'Select ID from VaultAccount where {0}'.format(name) - post_headers = {"Authorization": "Bearer " + kwargs['access_token'], "X-CENTRIFY-NATIVE-CLIENT": "true"} - response = requests.post(endpoint, json={'Script': query}, headers=post_headers, verify=True, timeout=(5, 30)) - raise_for_status(response) - try: - result_str = response.json()["Result"]["Results"] - return result_str[0]["Row"]["ID"] - except (IndexError, KeyError): - raise RuntimeError("Error Detected!! Check the Inputs") - - -# CheckOut Password from Centrify Vault, Input : ID -def get_passwd(**kwargs): - endpoint = urljoin(kwargs['url'], '/ServerManage/CheckoutPassword') - post_headers = {"Authorization": "Bearer " + kwargs['access_token'], "X-CENTRIFY-NATIVE-CLIENT": "true"} - response = requests.post(endpoint, json={'ID': kwargs['acc_id']}, headers=post_headers, verify=True, timeout=(5, 30)) - raise_for_status(response) - try: - return response.json()["Result"]["Password"] - except KeyError: - raise RuntimeError("Password Not Found") - - -def centrify_backend(**kwargs): - url = kwargs.get('url') - acc_name = kwargs.get('account-name') - system_name = kwargs.get('system-name') - client_id = kwargs.get('client_id') - client_password = kwargs.get('client_password') - app_id = kwargs.get('oauth_application_id', 'awx') - endpoint = urljoin(url, f'/oauth2/token/{app_id}') - endpoint = {'endpoint': endpoint, 'client_id': client_id, 'client_password': client_password, 'oauth_scope': kwargs.get('oauth_scope', 'awx')} - token = handle_auth(**endpoint) - get_id_args = {'system_name': system_name, 'acc_name': acc_name, 'url': url, 'access_token': token} - acc_id = get_ID(**get_id_args) - get_pwd_args = {'url': url, 'acc_id': acc_id, 'access_token': token} - return get_passwd(**get_pwd_args) - - -centrify_plugin = CredentialPlugin('Centrify Vault Credential Provider Lookup', inputs=pas_inputs, backend=centrify_backend) diff --git a/awx_plugins/credentials/conjur.py b/awx_plugins/credentials/conjur.py deleted file mode 100644 index a7fd3a3a65..0000000000 --- a/awx_plugins/credentials/conjur.py +++ /dev/null @@ -1,112 +0,0 @@ -from .plugin import CredentialPlugin, CertFiles, raise_for_status - -from urllib.parse import urljoin, quote - -from .plugin import translate_function as _ -import requests -import base64 -import binascii - - -conjur_inputs = { - 'fields': [ - { - 'id': 'url', - 'label': _('Conjur URL'), - 'type': 'string', - 'format': 'url', - }, - { - 'id': 'api_key', - 'label': _('API Key'), - 'type': 'string', - 'secret': True, - }, - { - 'id': 'account', - 'label': _('Account'), - 'type': 'string', - }, - { - 'id': 'username', - 'label': _('Username'), - 'type': 'string', - }, - {'id': 'cacert', 'label': _('Public Key Certificate'), 'type': 'string', 'multiline': True}, - ], - 'metadata': [ - { - 'id': 'secret_path', - 'label': _('Secret Identifier'), - 'type': 'string', - 'help_text': _('The identifier for the secret e.g., /some/identifier'), - }, - { - 'id': 'secret_version', - 'label': _('Secret Version'), - 'type': 'string', - 'help_text': _('Used to specify a specific secret version (if left empty, the latest version will be used).'), - }, - ], - 'required': ['url', 'api_key', 'account', 'username'], -} - - -def _is_base64(s: str) -> bool: - try: - return base64.b64encode(base64.b64decode(s.encode("utf-8"))) == s.encode("utf-8") - except binascii.Error: - return False - - -def conjur_backend(**kwargs): - url = kwargs['url'] - api_key = kwargs['api_key'] - account = quote(kwargs['account'], safe='') - username = quote(kwargs['username'], safe='') - secret_path = quote(kwargs['secret_path'], safe='') - version = kwargs.get('secret_version') - cacert = kwargs.get('cacert', None) - - auth_kwargs = { - 'headers': {'Content-Type': 'text/plain', 'Accept-Encoding': 'base64'}, - 'data': api_key, - 'allow_redirects': False, - } - - with CertFiles(cacert) as cert: - # https://www.conjur.org/api.html#authentication-authenticate-post - auth_kwargs['verify'] = cert - try: - resp = requests.post(urljoin(url, '/'.join(['authn', account, username, 'authenticate'])), **auth_kwargs) - resp.raise_for_status() - except requests.exceptions.HTTPError: - resp = requests.post(urljoin(url, '/'.join(['api', 'authn', account, username, 'authenticate'])), **auth_kwargs) - raise_for_status(resp) - token = resp.content.decode('utf-8') - - lookup_kwargs = { - 'headers': {'Authorization': 'Token token="{}"'.format(token if _is_base64(token) else base64.b64encode(token.encode('utf-8')).decode('utf-8'))}, - 'allow_redirects': False, - } - - # https://www.conjur.org/api.html#secrets-retrieve-a-secret-get - path = urljoin(url, '/'.join(['secrets', account, 'variable', secret_path])) - path_conjurcloud = urljoin(url, '/'.join(['api', 'secrets', account, 'variable', secret_path])) - if version: - ver = "version={}".format(version) - path = '?'.join([path, ver]) - path_conjurcloud = '?'.join([path_conjurcloud, ver]) - - with CertFiles(cacert) as cert: - lookup_kwargs['verify'] = cert - try: - resp = requests.get(path, timeout=30, **lookup_kwargs) - resp.raise_for_status() - except requests.exceptions.HTTPError: - resp = requests.get(path_conjurcloud, timeout=30, **lookup_kwargs) - raise_for_status(resp) - return resp.text - - -conjur_plugin = CredentialPlugin('CyberArk Conjur Secrets Manager Lookup', inputs=conjur_inputs, backend=conjur_backend) diff --git a/awx_plugins/credentials/dsv.py b/awx_plugins/credentials/dsv.py deleted file mode 100644 index 8296779bde..0000000000 --- a/awx_plugins/credentials/dsv.py +++ /dev/null @@ -1,94 +0,0 @@ -from .plugin import CredentialPlugin - -from .plugin import settings -from .plugin import translate_function as _ -from delinea.secrets.vault import PasswordGrantAuthorizer, SecretsVault -from base64 import b64decode - -dsv_inputs = { - 'fields': [ - { - 'id': 'tenant', - 'label': _('Tenant'), - 'help_text': _('The tenant e.g. "ex" when the URL is https://ex.secretsvaultcloud.com'), - 'type': 'string', - }, - { - 'id': 'tld', - 'label': _('Top-level Domain (TLD)'), - 'help_text': _('The TLD of the tenant e.g. "com" when the URL is https://ex.secretsvaultcloud.com'), - 'choices': ['ca', 'com', 'com.au', 'eu'], - 'default': 'com', - }, - { - 'id': 'client_id', - 'label': _('Client ID'), - 'type': 'string', - }, - { - 'id': 'client_secret', - 'label': _('Client Secret'), - 'type': 'string', - 'secret': True, - }, - ], - 'metadata': [ - { - 'id': 'path', - 'label': _('Secret Path'), - 'type': 'string', - 'help_text': _('The secret path e.g. /test/secret1'), - }, - { - 'id': 'secret_field', - 'label': _('Secret Field'), - 'help_text': _('The field to extract from the secret'), - 'type': 'string', - }, - { - 'id': 'secret_decoding', - 'label': _('Should the secret be base64 decoded?'), - 'help_text': _('Specify whether the secret should be base64 decoded, typically used for storing files, such as SSH keys'), - 'choices': ['No Decoding', 'Decode Base64'], - 'type': 'string', - 'default': 'No Decoding', - }, - ], - 'required': ['tenant', 'client_id', 'client_secret', 'path', 'secret_field', 'secret_decoding'], -} - -if settings.DEBUG: - dsv_inputs['fields'].append( - { - 'id': 'url_template', - 'label': _('URL template'), - 'type': 'string', - 'default': 'https://{}.secretsvaultcloud.{}', - } - ) - - -def dsv_backend(**kwargs): - tenant_name = kwargs['tenant'] - tenant_tld = kwargs.get('tld', 'com') - tenant_url_template = kwargs.get('url_template', 'https://{}.secretsvaultcloud.{}') - client_id = kwargs['client_id'] - client_secret = kwargs['client_secret'] - secret_path = kwargs['path'] - secret_field = kwargs['secret_field'] - # providing a default value to remain backward compatible for secrets that have not specified this option - secret_decoding = kwargs.get('secret_decoding', 'No Decoding') - - tenant_url = tenant_url_template.format(tenant_name, tenant_tld.strip(".")) - - authorizer = PasswordGrantAuthorizer(tenant_url, client_id, client_secret) - dsv_secret = SecretsVault(tenant_url, authorizer).get_secret(secret_path) - - # files can be uploaded base64 decoded to DSV and thus decoding it only, when asked for - if secret_decoding == 'Decode Base64': - return b64decode(dsv_secret['data'][secret_field]).decode() - - return dsv_secret['data'][secret_field] - - -dsv_plugin = CredentialPlugin(name='Thycotic DevOps Secrets Vault', inputs=dsv_inputs, backend=dsv_backend) diff --git a/awx_plugins/credentials/hashivault.py b/awx_plugins/credentials/hashivault.py deleted file mode 100644 index 81f7770f51..0000000000 --- a/awx_plugins/credentials/hashivault.py +++ /dev/null @@ -1,384 +0,0 @@ -import copy -import os -import pathlib -import time -from urllib.parse import urljoin - -from .plugin import CredentialPlugin, CertFiles, raise_for_status - -import requests -from .plugin import translate_function as _ - -base_inputs = { - 'fields': [ - { - 'id': 'url', - 'label': _('Server URL'), - 'type': 'string', - 'format': 'url', - 'help_text': _('The URL to the HashiCorp Vault'), - }, - { - 'id': 'token', - 'label': _('Token'), - 'type': 'string', - 'secret': True, - 'help_text': _('The access token used to authenticate to the Vault server'), - }, - { - 'id': 'cacert', - 'label': _('CA Certificate'), - 'type': 'string', - 'multiline': True, - 'help_text': _('The CA certificate used to verify the SSL certificate of the Vault server'), - }, - {'id': 'role_id', 'label': _('AppRole role_id'), 'type': 'string', 'multiline': False, 'help_text': _('The Role ID for AppRole Authentication')}, - { - 'id': 'secret_id', - 'label': _('AppRole secret_id'), - 'type': 'string', - 'multiline': False, - 'secret': True, - 'help_text': _('The Secret ID for AppRole Authentication'), - }, - { - 'id': 'client_cert_public', - 'label': _('Client Certificate'), - 'type': 'string', - 'multiline': True, - 'help_text': _( - 'The PEM-encoded client certificate used for TLS client authentication.' - ' This should include the certificate and any intermediate certififcates.' - ), - }, - { - 'id': 'client_cert_private', - 'label': _('Client Certificate Key'), - 'type': 'string', - 'multiline': True, - 'secret': True, - 'help_text': _('The certificate private key used for TLS client authentication.'), - }, - { - 'id': 'client_cert_role', - 'label': _('TLS Authentication Role'), - 'type': 'string', - 'multiline': False, - 'help_text': _( - 'The role configured in Hashicorp Vault for TLS client authentication.' - ' If not provided, Hashicorp Vault may assign roles based on the certificate used.' - ), - }, - { - 'id': 'namespace', - 'label': _('Namespace name (Vault Enterprise only)'), - 'type': 'string', - 'multiline': False, - 'help_text': _('Name of the namespace to use when authenticate and retrieve secrets'), - }, - { - 'id': 'kubernetes_role', - 'label': _('Kubernetes role'), - 'type': 'string', - 'multiline': False, - 'help_text': _( - 'The Role for Kubernetes Authentication.' - ' This is the named role, configured in Vault server, for AWX pod auth policies.' - ' see https://www.vaultproject.io/docs/auth/kubernetes#configuration' - ), - }, - { - 'id': 'username', - 'label': _('Username'), - 'type': 'string', - 'secret': False, - 'help_text': _('Username for user authentication.'), - }, - { - 'id': 'password', - 'label': _('Password'), - 'type': 'string', - 'secret': True, - 'help_text': _('Password for user authentication.'), - }, - { - 'id': 'default_auth_path', - 'label': _('Path to Auth'), - 'type': 'string', - 'multiline': False, - 'default': 'approle', - 'help_text': _('The Authentication path to use if one isn\'t provided in the metadata when linking to an input field. Defaults to \'approle\''), - }, - ], - 'metadata': [ - { - 'id': 'secret_path', - 'label': _('Path to Secret'), - 'type': 'string', - 'help_text': _( - ( - 'The path to the secret stored in the secret backend e.g, /some/secret/. It is recommended' - ' that you use the secret backend field to identify the storage backend and to use this field' - ' for locating a specific secret within that store. However, if you prefer to fully identify' - ' both the secret backend and one of its secrets using only this field, join their locations' - ' into a single path without any additional separators, e.g, /location/of/backend/some/secret.' - ) - ), - }, - { - 'id': 'auth_path', - 'label': _('Path to Auth'), - 'type': 'string', - 'multiline': False, - 'help_text': _('The path where the Authentication method is mounted e.g, approle'), - }, - ], - 'required': ['url', 'secret_path'], -} - -hashi_kv_inputs = copy.deepcopy(base_inputs) -hashi_kv_inputs['fields'].append( - { - 'id': 'api_version', - 'label': _('API Version'), - 'choices': ['v1', 'v2'], - 'help_text': _('API v1 is for static key/value lookups. API v2 is for versioned key/value lookups.'), - 'default': 'v1', - } -) -hashi_kv_inputs['metadata'] = ( - [ - { - 'id': 'secret_backend', - 'label': _('Name of Secret Backend'), - 'type': 'string', - 'help_text': _('The name of the kv secret backend (if left empty, the first segment of the secret path will be used).'), - } - ] - + hashi_kv_inputs['metadata'] - + [ - { - 'id': 'secret_key', - 'label': _('Key Name'), - 'type': 'string', - 'help_text': _('The name of the key to look up in the secret.'), - }, - { - 'id': 'secret_version', - 'label': _('Secret Version (v2 only)'), - 'type': 'string', - 'help_text': _('Used to specify a specific secret version (if left empty, the latest version will be used).'), - }, - ] -) -hashi_kv_inputs['required'].extend(['api_version', 'secret_key']) - -hashi_ssh_inputs = copy.deepcopy(base_inputs) -hashi_ssh_inputs['metadata'] = ( - [ - { - 'id': 'public_key', - 'label': _('Unsigned Public Key'), - 'type': 'string', - 'multiline': True, - } - ] - + hashi_ssh_inputs['metadata'] - + [ - {'id': 'role', 'label': _('Role Name'), 'type': 'string', 'help_text': _('The name of the role used to sign.')}, - { - 'id': 'valid_principals', - 'label': _('Valid Principals'), - 'type': 'string', - 'help_text': _('Valid principals (either usernames or hostnames) that the certificate should be signed for.'), - }, - ] -) -hashi_ssh_inputs['required'].extend(['public_key', 'role']) - - -def handle_auth(**kwargs): - token = None - if kwargs.get('token'): - token = kwargs['token'] - elif kwargs.get('username') and kwargs.get('password'): - token = method_auth(**kwargs, auth_param=userpass_auth(**kwargs)) - elif kwargs.get('role_id') and kwargs.get('secret_id'): - token = method_auth(**kwargs, auth_param=approle_auth(**kwargs)) - elif kwargs.get('kubernetes_role'): - token = method_auth(**kwargs, auth_param=kubernetes_auth(**kwargs)) - elif kwargs.get('client_cert_public') and kwargs.get('client_cert_private'): - token = method_auth(**kwargs, auth_param=client_cert_auth(**kwargs)) - else: - raise Exception('Token, Username/Password, AppRole, Kubernetes, or TLS authentication parameters must be set') - return token - - -def userpass_auth(**kwargs): - return {'username': kwargs['username'], 'password': kwargs['password']} - - -def approle_auth(**kwargs): - return {'role_id': kwargs['role_id'], 'secret_id': kwargs['secret_id']} - - -def kubernetes_auth(**kwargs): - jwt_file = pathlib.Path('/var/run/secrets/kubernetes.io/serviceaccount/token') - with jwt_file.open('r') as jwt_fo: - jwt = jwt_fo.read().rstrip() - return {'role': kwargs['kubernetes_role'], 'jwt': jwt} - - -def client_cert_auth(**kwargs): - return {'name': kwargs.get('client_cert_role')} - - -def method_auth(**kwargs): - # get auth method specific params - request_kwargs = {'json': kwargs['auth_param'], 'timeout': 30} - - # we first try to use the 'auth_path' from the metadata - # if not found we try to fetch the 'default_auth_path' from inputs - auth_path = kwargs.get('auth_path') or kwargs['default_auth_path'] - - url = urljoin(kwargs['url'], 'v1') - cacert = kwargs.get('cacert', None) - - sess = requests.Session() - sess.mount(url, requests.adapters.HTTPAdapter(max_retries=5)) - - # Namespace support - if kwargs.get('namespace'): - sess.headers['X-Vault-Namespace'] = kwargs['namespace'] - request_url = '/'.join([url, 'auth', auth_path, 'login']).rstrip('/') - if kwargs['auth_param'].get('username'): - request_url = request_url + '/' + (kwargs['username']) - with CertFiles(cacert) as cert: - request_kwargs['verify'] = cert - # TLS client certificate support - if kwargs.get('client_cert_public') and kwargs.get('client_cert_private'): - # Add client cert to requests Session before making call - with CertFiles(kwargs['client_cert_public'], key=kwargs['client_cert_private']) as client_cert: - sess.cert = client_cert - resp = sess.post(request_url, **request_kwargs) - else: - # Make call without client certificate - resp = sess.post(request_url, **request_kwargs) - resp.raise_for_status() - token = resp.json()['auth']['client_token'] - return token - - -def kv_backend(**kwargs): - token = handle_auth(**kwargs) - url = kwargs['url'] - secret_path = kwargs['secret_path'] - secret_backend = kwargs.get('secret_backend', None) - secret_key = kwargs.get('secret_key', None) - cacert = kwargs.get('cacert', None) - api_version = kwargs['api_version'] - - request_kwargs = { - 'timeout': 30, - 'allow_redirects': False, - } - - sess = requests.Session() - sess.mount(url, requests.adapters.HTTPAdapter(max_retries=5)) - sess.headers['Authorization'] = 'Bearer {}'.format(token) - # Compatibility header for older installs of Hashicorp Vault - sess.headers['X-Vault-Token'] = token - if kwargs.get('namespace'): - sess.headers['X-Vault-Namespace'] = kwargs['namespace'] - - if api_version == 'v2': - if kwargs.get('secret_version'): - request_kwargs['params'] = {'version': kwargs['secret_version']} - if secret_backend: - path_segments = [secret_backend, 'data', secret_path] - else: - try: - mount_point, *path = pathlib.Path(secret_path.lstrip(os.sep)).parts - '/'.join(path) - except Exception: - mount_point, path = secret_path, [] - # https://www.vaultproject.io/api/secret/kv/kv-v2.html#read-secret-version - path_segments = [mount_point, 'data'] + path - else: - if secret_backend: - path_segments = [secret_backend, secret_path] - else: - path_segments = [secret_path] - - request_url = urljoin(url, '/'.join(['v1'] + path_segments)).rstrip('/') - with CertFiles(cacert) as cert: - request_kwargs['verify'] = cert - request_retries = 0 - while request_retries < 5: - response = sess.get(request_url, **request_kwargs) - # https://developer.hashicorp.com/vault/docs/enterprise/consistency - if response.status_code == 412: - request_retries += 1 - time.sleep(1) - else: - break - raise_for_status(response) - - json = response.json() - if api_version == 'v2': - json = json['data'] - - if secret_key: - try: - if (secret_key != 'data') and (secret_key not in json['data']) and ('data' in json['data']): - return json['data']['data'][secret_key] - return json['data'][secret_key] - except KeyError: - raise RuntimeError('{} is not present at {}'.format(secret_key, secret_path)) - return json['data'] - - -def ssh_backend(**kwargs): - token = handle_auth(**kwargs) - url = urljoin(kwargs['url'], 'v1') - secret_path = kwargs['secret_path'] - role = kwargs['role'] - cacert = kwargs.get('cacert', None) - - request_kwargs = { - 'timeout': 30, - 'allow_redirects': False, - } - - request_kwargs['json'] = {'public_key': kwargs['public_key']} - if kwargs.get('valid_principals'): - request_kwargs['json']['valid_principals'] = kwargs['valid_principals'] - - sess = requests.Session() - sess.mount(url, requests.adapters.HTTPAdapter(max_retries=5)) - sess.headers['Authorization'] = 'Bearer {}'.format(token) - if kwargs.get('namespace'): - sess.headers['X-Vault-Namespace'] = kwargs['namespace'] - # Compatability header for older installs of Hashicorp Vault - sess.headers['X-Vault-Token'] = token - # https://www.vaultproject.io/api/secret/ssh/index.html#sign-ssh-key - request_url = '/'.join([url, secret_path, 'sign', role]).rstrip('/') - - with CertFiles(cacert) as cert: - request_kwargs['verify'] = cert - request_retries = 0 - while request_retries < 5: - resp = sess.post(request_url, **request_kwargs) - # https://developer.hashicorp.com/vault/docs/enterprise/consistency - if resp.status_code == 412: - request_retries += 1 - time.sleep(1) - else: - break - raise_for_status(resp) - return resp.json()['data']['signed_key'] - - -hashivault_kv_plugin = CredentialPlugin('HashiCorp Vault Secret Lookup', inputs=hashi_kv_inputs, backend=kv_backend) - -hashivault_ssh_plugin = CredentialPlugin('HashiCorp Vault Signed SSH', inputs=hashi_ssh_inputs, backend=ssh_backend) diff --git a/awx_plugins/credentials/injectors.py b/awx_plugins/credentials/injectors.py deleted file mode 100644 index 29a438f919..0000000000 --- a/awx_plugins/credentials/injectors.py +++ /dev/null @@ -1,139 +0,0 @@ -import json -import yaml -import os -import stat -import tempfile - -from django.conf import settings - -from awx.main.utils.execution_environments import to_container_path - - -def aws(cred, env, private_data_dir): - env['AWS_ACCESS_KEY_ID'] = cred.get_input('username', default='') - env['AWS_SECRET_ACCESS_KEY'] = cred.get_input('password', default='') - - if cred.has_input('security_token'): - env['AWS_SECURITY_TOKEN'] = cred.get_input('security_token', default='') - env['AWS_SESSION_TOKEN'] = env['AWS_SECURITY_TOKEN'] - - -def gce(cred, env, private_data_dir): - project = cred.get_input('project', default='') - username = cred.get_input('username', default='') - - json_cred = {'type': 'service_account', 'private_key': cred.get_input('ssh_key_data', default=''), 'client_email': username, 'project_id': project} - if 'INVENTORY_UPDATE_ID' not in env: - env['GCE_EMAIL'] = username - env['GCE_PROJECT'] = project - json_cred['token_uri'] = 'https://oauth2.googleapis.com/token' - - handle, path = tempfile.mkstemp(dir=os.path.join(private_data_dir, 'env')) - f = os.fdopen(handle, 'w') - json.dump(json_cred, f, indent=2) - f.close() - os.chmod(path, stat.S_IRUSR | stat.S_IWUSR) - container_path = to_container_path(path, private_data_dir) - env['GCE_CREDENTIALS_FILE_PATH'] = container_path - env['GCP_SERVICE_ACCOUNT_FILE'] = container_path - env['GOOGLE_APPLICATION_CREDENTIALS'] = container_path - - # Handle env variables for new module types. - # This includes gcp_compute inventory plugin and - # all new gcp_* modules. - env['GCP_AUTH_KIND'] = 'serviceaccount' - env['GCP_PROJECT'] = project - env['GCP_ENV_TYPE'] = 'tower' - return path - - -def azure_rm(cred, env, private_data_dir): - client = cred.get_input('client', default='') - tenant = cred.get_input('tenant', default='') - - env['AZURE_SUBSCRIPTION_ID'] = cred.get_input('subscription', default='') - - if len(client) and len(tenant): - env['AZURE_CLIENT_ID'] = client - env['AZURE_TENANT'] = tenant - env['AZURE_SECRET'] = cred.get_input('secret', default='') - else: - env['AZURE_AD_USER'] = cred.get_input('username', default='') - env['AZURE_PASSWORD'] = cred.get_input('password', default='') - - if cred.has_input('cloud_environment'): - env['AZURE_CLOUD_ENVIRONMENT'] = cred.get_input('cloud_environment') - - -def vmware(cred, env, private_data_dir): - env['VMWARE_USER'] = cred.get_input('username', default='') - env['VMWARE_PASSWORD'] = cred.get_input('password', default='') - env['VMWARE_HOST'] = cred.get_input('host', default='') - env['VMWARE_VALIDATE_CERTS'] = str(settings.VMWARE_VALIDATE_CERTS) - - -def _openstack_data(cred): - openstack_auth = dict( - auth_url=cred.get_input('host', default=''), - username=cred.get_input('username', default=''), - password=cred.get_input('password', default=''), - project_name=cred.get_input('project', default=''), - ) - if cred.has_input('project_domain_name'): - openstack_auth['project_domain_name'] = cred.get_input('project_domain_name', default='') - if cred.has_input('domain'): - openstack_auth['domain_name'] = cred.get_input('domain', default='') - verify_state = cred.get_input('verify_ssl', default=True) - - openstack_data = { - 'clouds': { - 'devstack': { - 'auth': openstack_auth, - 'verify': verify_state, - }, - }, - } - - if cred.has_input('region'): - openstack_data['clouds']['devstack']['region_name'] = cred.get_input('region', default='') - - return openstack_data - - -def openstack(cred, env, private_data_dir): - handle, path = tempfile.mkstemp(dir=os.path.join(private_data_dir, 'env')) - f = os.fdopen(handle, 'w') - openstack_data = _openstack_data(cred) - yaml.safe_dump(openstack_data, f, default_flow_style=False, allow_unicode=True) - f.close() - os.chmod(path, stat.S_IRUSR | stat.S_IWUSR) - env['OS_CLIENT_CONFIG_FILE'] = to_container_path(path, private_data_dir) - - -def kubernetes_bearer_token(cred, env, private_data_dir): - env['K8S_AUTH_HOST'] = cred.get_input('host', default='') - env['K8S_AUTH_API_KEY'] = cred.get_input('bearer_token', default='') - if cred.get_input('verify_ssl') and 'ssl_ca_cert' in cred.inputs: - env['K8S_AUTH_VERIFY_SSL'] = 'True' - handle, path = tempfile.mkstemp(dir=os.path.join(private_data_dir, 'env')) - with os.fdopen(handle, 'w') as f: - os.chmod(path, stat.S_IRUSR | stat.S_IWUSR) - f.write(cred.get_input('ssl_ca_cert')) - env['K8S_AUTH_SSL_CA_CERT'] = to_container_path(path, private_data_dir) - else: - env['K8S_AUTH_VERIFY_SSL'] = 'False' - - -def terraform(cred, env, private_data_dir): - handle, path = tempfile.mkstemp(dir=os.path.join(private_data_dir, 'env')) - with os.fdopen(handle, 'w') as f: - os.chmod(path, stat.S_IRUSR | stat.S_IWUSR) - f.write(cred.get_input('configuration')) - env['TF_BACKEND_CONFIG_FILE'] = to_container_path(path, private_data_dir) - # Handle env variables for GCP account credentials - if 'gce_credentials' in cred.inputs: - handle, path = tempfile.mkstemp(dir=os.path.join(private_data_dir, 'env')) - with os.fdopen(handle, 'w') as f: - os.chmod(path, stat.S_IRUSR | stat.S_IWUSR) - f.write(cred.get_input('gce_credentials')) - env['GOOGLE_BACKEND_CREDENTIALS'] = to_container_path(path, private_data_dir) diff --git a/awx_plugins/credentials/plugin.py b/awx_plugins/credentials/plugin.py deleted file mode 100644 index b8aa294544..0000000000 --- a/awx_plugins/credentials/plugin.py +++ /dev/null @@ -1,68 +0,0 @@ -import os -import tempfile - -from collections import namedtuple - -from requests.exceptions import HTTPError - -CredentialPlugin = namedtuple('CredentialPlugin', ['name', 'inputs', 'backend']) - - -try: - from django.utils.translation import gettext_lazy as translate_function -except ModuleNotFoundError: - translate_function = lambda *args, **kwargs: None - - -class Settings(): - DEBUG = False - - -settings = Settings() - - -def raise_for_status(resp): - resp.raise_for_status() - if resp.status_code >= 300: - exc = HTTPError() - setattr(exc, 'response', resp) - raise exc - - -class CertFiles: - """ - A context manager used for writing a certificate and (optional) key - to $TMPDIR, and cleaning up afterwards. - - This is particularly useful as a shared resource for credential plugins - that want to pull cert/key data out of the database and persist it - temporarily to the file system so that it can loaded into the openssl - certificate chain (generally, for HTTPS requests plugins make via the - Python requests library) - - with CertFiles(cert_data, key_data) as cert: - # cert is string representing a path to the cert or pemfile - # temporarily written to disk - requests.post(..., cert=cert) - """ - - certfile = None - - def __init__(self, cert, key=None): - self.cert = cert - self.key = key - - def __enter__(self): - if not self.cert: - return None - self.certfile = tempfile.NamedTemporaryFile('wb', delete=False) - self.certfile.write(self.cert.encode()) - if self.key: - self.certfile.write(b'\n') - self.certfile.write(self.key.encode()) - self.certfile.flush() - return str(self.certfile.name) - - def __exit__(self, *args): - if self.certfile and os.path.exists(self.certfile.name): - os.remove(self.certfile.name) diff --git a/awx_plugins/credentials/plugins.py b/awx_plugins/credentials/plugins.py deleted file mode 100644 index debc5c7032..0000000000 --- a/awx_plugins/credentials/plugins.py +++ /dev/null @@ -1,665 +0,0 @@ -# Django -from django.utils.translation import gettext_noop - -# AWX -from awx.main.models.credential import ManagedCredentialType - - -ManagedCredentialType( - namespace='ssh', - kind='ssh', - name=gettext_noop('Machine'), - inputs={ - 'fields': [ - {'id': 'username', 'label': gettext_noop('Username'), 'type': 'string'}, - {'id': 'password', 'label': gettext_noop('Password'), 'type': 'string', 'secret': True, 'ask_at_runtime': True}, - {'id': 'ssh_key_data', 'label': gettext_noop('SSH Private Key'), 'type': 'string', 'format': 'ssh_private_key', 'secret': True, 'multiline': True}, - { - 'id': 'ssh_public_key_data', - 'label': gettext_noop('Signed SSH Certificate'), - 'type': 'string', - 'multiline': True, - 'secret': True, - }, - {'id': 'ssh_key_unlock', 'label': gettext_noop('Private Key Passphrase'), 'type': 'string', 'secret': True, 'ask_at_runtime': True}, - { - 'id': 'become_method', - 'label': gettext_noop('Privilege Escalation Method'), - 'type': 'string', - 'help_text': gettext_noop('Specify a method for "become" operations. This is equivalent to specifying the --become-method Ansible parameter.'), - }, - { - 'id': 'become_username', - 'label': gettext_noop('Privilege Escalation Username'), - 'type': 'string', - }, - {'id': 'become_password', 'label': gettext_noop('Privilege Escalation Password'), 'type': 'string', 'secret': True, 'ask_at_runtime': True}, - ], - }, -) - -ManagedCredentialType( - namespace='scm', - kind='scm', - name=gettext_noop('Source Control'), - managed=True, - inputs={ - 'fields': [ - {'id': 'username', 'label': gettext_noop('Username'), 'type': 'string'}, - {'id': 'password', 'label': gettext_noop('Password'), 'type': 'string', 'secret': True}, - {'id': 'ssh_key_data', 'label': gettext_noop('SCM Private Key'), 'type': 'string', 'format': 'ssh_private_key', 'secret': True, 'multiline': True}, - {'id': 'ssh_key_unlock', 'label': gettext_noop('Private Key Passphrase'), 'type': 'string', 'secret': True}, - ], - }, -) - -ManagedCredentialType( - namespace='vault', - kind='vault', - name=gettext_noop('Vault'), - managed=True, - inputs={ - 'fields': [ - {'id': 'vault_password', 'label': gettext_noop('Vault Password'), 'type': 'string', 'secret': True, 'ask_at_runtime': True}, - { - 'id': 'vault_id', - 'label': gettext_noop('Vault Identifier'), - 'type': 'string', - 'format': 'vault_id', - 'help_text': gettext_noop( - 'Specify an (optional) Vault ID. This is ' - 'equivalent to specifying the --vault-id ' - 'Ansible parameter for providing multiple Vault ' - 'passwords. Note: this feature only works in ' - 'Ansible 2.4+.' - ), - }, - ], - 'required': ['vault_password'], - }, -) - -ManagedCredentialType( - namespace='net', - kind='net', - name=gettext_noop('Network'), - managed=True, - inputs={ - 'fields': [ - {'id': 'username', 'label': gettext_noop('Username'), 'type': 'string'}, - { - 'id': 'password', - 'label': gettext_noop('Password'), - 'type': 'string', - 'secret': True, - }, - {'id': 'ssh_key_data', 'label': gettext_noop('SSH Private Key'), 'type': 'string', 'format': 'ssh_private_key', 'secret': True, 'multiline': True}, - { - 'id': 'ssh_key_unlock', - 'label': gettext_noop('Private Key Passphrase'), - 'type': 'string', - 'secret': True, - }, - { - 'id': 'authorize', - 'label': gettext_noop('Authorize'), - 'type': 'boolean', - }, - { - 'id': 'authorize_password', - 'label': gettext_noop('Authorize Password'), - 'type': 'string', - 'secret': True, - }, - ], - 'dependencies': { - 'authorize_password': ['authorize'], - }, - 'required': ['username'], - }, -) - -ManagedCredentialType( - namespace='aws', - kind='cloud', - name=gettext_noop('Amazon Web Services'), - managed=True, - inputs={ - 'fields': [ - {'id': 'username', 'label': gettext_noop('Access Key'), 'type': 'string'}, - { - 'id': 'password', - 'label': gettext_noop('Secret Key'), - 'type': 'string', - 'secret': True, - }, - { - 'id': 'security_token', - 'label': gettext_noop('STS Token'), - 'type': 'string', - 'secret': True, - 'help_text': gettext_noop( - 'Security Token Service (STS) is a web service ' - 'that enables you to request temporary, ' - 'limited-privilege credentials for AWS Identity ' - 'and Access Management (IAM) users.' - ), - }, - ], - 'required': ['username', 'password'], - }, -) - -ManagedCredentialType( - namespace='openstack', - kind='cloud', - name=gettext_noop('OpenStack'), - managed=True, - inputs={ - 'fields': [ - {'id': 'username', 'label': gettext_noop('Username'), 'type': 'string'}, - { - 'id': 'password', - 'label': gettext_noop('Password (API Key)'), - 'type': 'string', - 'secret': True, - }, - { - 'id': 'host', - 'label': gettext_noop('Host (Authentication URL)'), - 'type': 'string', - 'help_text': gettext_noop('The host to authenticate with. For example, https://openstack.business.com/v2.0/'), - }, - { - 'id': 'project', - 'label': gettext_noop('Project (Tenant Name)'), - 'type': 'string', - }, - { - 'id': 'project_domain_name', - 'label': gettext_noop('Project (Domain Name)'), - 'type': 'string', - }, - { - 'id': 'domain', - 'label': gettext_noop('Domain Name'), - 'type': 'string', - 'help_text': gettext_noop( - 'OpenStack domains define administrative boundaries. ' - 'It is only needed for Keystone v3 authentication ' - 'URLs. Refer to the documentation for ' - 'common scenarios.' - ), - }, - { - 'id': 'region', - 'label': gettext_noop('Region Name'), - 'type': 'string', - 'help_text': gettext_noop('For some cloud providers, like OVH, region must be specified'), - }, - { - 'id': 'verify_ssl', - 'label': gettext_noop('Verify SSL'), - 'type': 'boolean', - 'default': True, - }, - ], - 'required': ['username', 'password', 'host', 'project'], - }, -) - -ManagedCredentialType( - namespace='vmware', - kind='cloud', - name=gettext_noop('VMware vCenter'), - managed=True, - inputs={ - 'fields': [ - { - 'id': 'host', - 'label': gettext_noop('VCenter Host'), - 'type': 'string', - 'help_text': gettext_noop('Enter the hostname or IP address that corresponds to your VMware vCenter.'), - }, - {'id': 'username', 'label': gettext_noop('Username'), 'type': 'string'}, - { - 'id': 'password', - 'label': gettext_noop('Password'), - 'type': 'string', - 'secret': True, - }, - ], - 'required': ['host', 'username', 'password'], - }, -) - -ManagedCredentialType( - namespace='satellite6', - kind='cloud', - name=gettext_noop('Red Hat Satellite 6'), - managed=True, - inputs={ - 'fields': [ - { - 'id': 'host', - 'label': gettext_noop('Satellite 6 URL'), - 'type': 'string', - 'help_text': gettext_noop('Enter the URL that corresponds to your Red Hat Satellite 6 server. For example, https://satellite.example.org'), - }, - {'id': 'username', 'label': gettext_noop('Username'), 'type': 'string'}, - { - 'id': 'password', - 'label': gettext_noop('Password'), - 'type': 'string', - 'secret': True, - }, - ], - 'required': ['host', 'username', 'password'], - }, -) - -ManagedCredentialType( - namespace='gce', - kind='cloud', - name=gettext_noop('Google Compute Engine'), - managed=True, - inputs={ - 'fields': [ - { - 'id': 'username', - 'label': gettext_noop('Service Account Email Address'), - 'type': 'string', - 'help_text': gettext_noop('The email address assigned to the Google Compute Engine service account.'), - }, - { - 'id': 'project', - 'label': 'Project', - 'type': 'string', - 'help_text': gettext_noop( - 'The Project ID is the GCE assigned identification. ' - 'It is often constructed as three words or two words ' - 'followed by a three-digit number. Examples: project-id-000 ' - 'and another-project-id' - ), - }, - { - 'id': 'ssh_key_data', - 'label': gettext_noop('RSA Private Key'), - 'type': 'string', - 'format': 'ssh_private_key', - 'secret': True, - 'multiline': True, - 'help_text': gettext_noop('Paste the contents of the PEM file associated with the service account email.'), - }, - ], - 'required': ['username', 'ssh_key_data'], - }, -) - -ManagedCredentialType( - namespace='azure_rm', - kind='cloud', - name=gettext_noop('Microsoft Azure Resource Manager'), - managed=True, - inputs={ - 'fields': [ - { - 'id': 'subscription', - 'label': gettext_noop('Subscription ID'), - 'type': 'string', - 'help_text': gettext_noop('Subscription ID is an Azure construct, which is mapped to a username.'), - }, - {'id': 'username', 'label': gettext_noop('Username'), 'type': 'string'}, - { - 'id': 'password', - 'label': gettext_noop('Password'), - 'type': 'string', - 'secret': True, - }, - {'id': 'client', 'label': gettext_noop('Client ID'), 'type': 'string'}, - { - 'id': 'secret', - 'label': gettext_noop('Client Secret'), - 'type': 'string', - 'secret': True, - }, - {'id': 'tenant', 'label': gettext_noop('Tenant ID'), 'type': 'string'}, - { - 'id': 'cloud_environment', - 'label': gettext_noop('Azure Cloud Environment'), - 'type': 'string', - 'help_text': gettext_noop('Environment variable AZURE_CLOUD_ENVIRONMENT when using Azure GovCloud or Azure stack.'), - }, - ], - 'required': ['subscription'], - }, -) - -ManagedCredentialType( - namespace='github_token', - kind='token', - name=gettext_noop('GitHub Personal Access Token'), - managed=True, - inputs={ - 'fields': [ - { - 'id': 'token', - 'label': gettext_noop('Token'), - 'type': 'string', - 'secret': True, - 'help_text': gettext_noop('This token needs to come from your profile settings in GitHub'), - } - ], - 'required': ['token'], - }, -) - -ManagedCredentialType( - namespace='gitlab_token', - kind='token', - name=gettext_noop('GitLab Personal Access Token'), - managed=True, - inputs={ - 'fields': [ - { - 'id': 'token', - 'label': gettext_noop('Token'), - 'type': 'string', - 'secret': True, - 'help_text': gettext_noop('This token needs to come from your profile settings in GitLab'), - } - ], - 'required': ['token'], - }, -) - -ManagedCredentialType( - namespace='bitbucket_dc_token', - kind='token', - name=gettext_noop('Bitbucket Data Center HTTP Access Token'), - managed=True, - inputs={ - 'fields': [ - { - 'id': 'token', - 'label': gettext_noop('Token'), - 'type': 'string', - 'secret': True, - 'help_text': gettext_noop('This token needs to come from your user settings in Bitbucket'), - } - ], - 'required': ['token'], - }, -) - -ManagedCredentialType( - namespace='insights', - kind='insights', - name=gettext_noop('Insights'), - managed=True, - inputs={ - 'fields': [ - {'id': 'username', 'label': gettext_noop('Username'), 'type': 'string'}, - {'id': 'password', 'label': gettext_noop('Password'), 'type': 'string', 'secret': True}, - ], - 'required': ['username', 'password'], - }, - injectors={ - 'extra_vars': { - "scm_username": "{{username}}", - "scm_password": "{{password}}", - }, - 'env': { - 'INSIGHTS_USER': '{{username}}', - 'INSIGHTS_PASSWORD': '{{password}}', - }, - }, -) - -ManagedCredentialType( - namespace='rhv', - kind='cloud', - name=gettext_noop('Red Hat Virtualization'), - managed=True, - inputs={ - 'fields': [ - {'id': 'host', 'label': gettext_noop('Host (Authentication URL)'), 'type': 'string', 'help_text': gettext_noop('The host to authenticate with.')}, - {'id': 'username', 'label': gettext_noop('Username'), 'type': 'string'}, - { - 'id': 'password', - 'label': gettext_noop('Password'), - 'type': 'string', - 'secret': True, - }, - { - 'id': 'ca_file', - 'label': gettext_noop('CA File'), - 'type': 'string', - 'help_text': gettext_noop('Absolute file path to the CA file to use (optional)'), - }, - ], - 'required': ['host', 'username', 'password'], - }, - injectors={ - # The duplication here is intentional; the ovirt4 inventory plugin - # writes a .ini file for authentication, while the ansible modules for - # ovirt4 use a separate authentication process that support - # environment variables; by injecting both, we support both - 'file': { - 'template': '\n'.join( - [ - '[ovirt]', - 'ovirt_url={{host}}', - 'ovirt_username={{username}}', - 'ovirt_password={{password}}', - '{% if ca_file %}ovirt_ca_file={{ca_file}}{% endif %}', - ] - ) - }, - 'env': {'OVIRT_INI_PATH': '{{tower.filename}}', 'OVIRT_URL': '{{host}}', 'OVIRT_USERNAME': '{{username}}', 'OVIRT_PASSWORD': '{{password}}'}, - }, -) - -ManagedCredentialType( - namespace='controller', - kind='cloud', - name=gettext_noop('Red Hat Ansible Automation Platform'), - managed=True, - inputs={ - 'fields': [ - { - 'id': 'host', - 'label': gettext_noop('Red Hat Ansible Automation Platform'), - 'type': 'string', - 'help_text': gettext_noop('Red Hat Ansible Automation Platform base URL to authenticate with.'), - }, - { - 'id': 'username', - 'label': gettext_noop('Username'), - 'type': 'string', - 'help_text': gettext_noop( - 'Red Hat Ansible Automation Platform username id to authenticate as.This should not be set if an OAuth token is being used.' - ), - }, - { - 'id': 'password', - 'label': gettext_noop('Password'), - 'type': 'string', - 'secret': True, - }, - { - 'id': 'oauth_token', - 'label': gettext_noop('OAuth Token'), - 'type': 'string', - 'secret': True, - 'help_text': gettext_noop('An OAuth token to use to authenticate with.This should not be set if username/password are being used.'), - }, - {'id': 'verify_ssl', 'label': gettext_noop('Verify SSL'), 'type': 'boolean', 'secret': False}, - ], - 'required': ['host'], - }, - injectors={ - 'env': { - 'TOWER_HOST': '{{host}}', - 'TOWER_USERNAME': '{{username}}', - 'TOWER_PASSWORD': '{{password}}', - 'TOWER_VERIFY_SSL': '{{verify_ssl}}', - 'TOWER_OAUTH_TOKEN': '{{oauth_token}}', - 'CONTROLLER_HOST': '{{host}}', - 'CONTROLLER_USERNAME': '{{username}}', - 'CONTROLLER_PASSWORD': '{{password}}', - 'CONTROLLER_VERIFY_SSL': '{{verify_ssl}}', - 'CONTROLLER_OAUTH_TOKEN': '{{oauth_token}}', - } - }, -) - -ManagedCredentialType( - namespace='kubernetes_bearer_token', - kind='kubernetes', - name=gettext_noop('OpenShift or Kubernetes API Bearer Token'), - inputs={ - 'fields': [ - { - 'id': 'host', - 'label': gettext_noop('OpenShift or Kubernetes API Endpoint'), - 'type': 'string', - 'help_text': gettext_noop('The OpenShift or Kubernetes API Endpoint to authenticate with.'), - }, - { - 'id': 'bearer_token', - 'label': gettext_noop('API authentication bearer token'), - 'type': 'string', - 'secret': True, - }, - { - 'id': 'verify_ssl', - 'label': gettext_noop('Verify SSL'), - 'type': 'boolean', - 'default': True, - }, - { - 'id': 'ssl_ca_cert', - 'label': gettext_noop('Certificate Authority data'), - 'type': 'string', - 'secret': True, - 'multiline': True, - }, - ], - 'required': ['host', 'bearer_token'], - }, -) - -ManagedCredentialType( - namespace='registry', - kind='registry', - name=gettext_noop('Container Registry'), - inputs={ - 'fields': [ - { - 'id': 'host', - 'label': gettext_noop('Authentication URL'), - 'type': 'string', - 'help_text': gettext_noop('Authentication endpoint for the container registry.'), - 'default': 'quay.io', - }, - { - 'id': 'username', - 'label': gettext_noop('Username'), - 'type': 'string', - }, - { - 'id': 'password', - 'label': gettext_noop('Password or Token'), - 'type': 'string', - 'secret': True, - 'help_text': gettext_noop('A password or token used to authenticate with'), - }, - { - 'id': 'verify_ssl', - 'label': gettext_noop('Verify SSL'), - 'type': 'boolean', - 'default': True, - }, - ], - 'required': ['host'], - }, -) - - -ManagedCredentialType( - namespace='galaxy_api_token', - kind='galaxy', - name=gettext_noop('Ansible Galaxy/Automation Hub API Token'), - inputs={ - 'fields': [ - { - 'id': 'url', - 'label': gettext_noop('Galaxy Server URL'), - 'type': 'string', - 'help_text': gettext_noop('The URL of the Galaxy instance to connect to.'), - }, - { - 'id': 'auth_url', - 'label': gettext_noop('Auth Server URL'), - 'type': 'string', - 'help_text': gettext_noop('The URL of a Keycloak server token_endpoint, if using SSO auth.'), - }, - { - 'id': 'token', - 'label': gettext_noop('API Token'), - 'type': 'string', - 'secret': True, - 'help_text': gettext_noop('A token to use for authentication against the Galaxy instance.'), - }, - ], - 'required': ['url'], - }, -) - -ManagedCredentialType( - namespace='gpg_public_key', - kind='cryptography', - name=gettext_noop('GPG Public Key'), - inputs={ - 'fields': [ - { - 'id': 'gpg_public_key', - 'label': gettext_noop('GPG Public Key'), - 'type': 'string', - 'secret': True, - 'multiline': True, - 'help_text': gettext_noop('GPG Public Key used to validate content signatures.'), - }, - ], - 'required': ['gpg_public_key'], - }, -) - -ManagedCredentialType( - namespace='terraform', - kind='cloud', - name=gettext_noop('Terraform backend configuration'), - managed=True, - inputs={ - 'fields': [ - { - 'id': 'configuration', - 'label': gettext_noop('Backend configuration'), - 'type': 'string', - 'secret': True, - 'multiline': True, - 'help_text': gettext_noop('Terraform backend config as Hashicorp configuration language.'), - }, - { - 'id': 'gce_credentials', - 'label': gettext_noop('Google Cloud Platform account credentials'), - 'type': 'string', - 'secret': True, - 'multiline': True, - 'help_text': gettext_noop('Google Cloud Platform account credentials in JSON format.'), - }, - ], - 'required': ['configuration'], - }, -) diff --git a/awx_plugins/credentials/tss.py b/awx_plugins/credentials/tss.py deleted file mode 100644 index e295072233..0000000000 --- a/awx_plugins/credentials/tss.py +++ /dev/null @@ -1,76 +0,0 @@ -from .plugin import CredentialPlugin -from .plugin import translate_function as _ - -try: - from delinea.secrets.server import DomainPasswordGrantAuthorizer, PasswordGrantAuthorizer, SecretServer, ServerSecret -except ImportError: - from thycotic.secrets.server import DomainPasswordGrantAuthorizer, PasswordGrantAuthorizer, SecretServer, ServerSecret - -tss_inputs = { - 'fields': [ - { - 'id': 'server_url', - 'label': _('Secret Server URL'), - 'help_text': _('The Base URL of Secret Server e.g. https://myserver/SecretServer or https://mytenant.secretservercloud.com'), - 'type': 'string', - }, - { - 'id': 'username', - 'label': _('Username'), - 'help_text': _('The (Application) user username'), - 'type': 'string', - }, - { - 'id': 'domain', - 'label': _('Domain'), - 'help_text': _('The (Application) user domain'), - 'type': 'string', - }, - { - 'id': 'password', - 'label': _('Password'), - 'help_text': _('The corresponding password'), - 'type': 'string', - 'secret': True, - }, - ], - 'metadata': [ - { - 'id': 'secret_id', - 'label': _('Secret ID'), - 'help_text': _('The integer ID of the secret'), - 'type': 'string', - }, - { - 'id': 'secret_field', - 'label': _('Secret Field'), - 'help_text': _('The field to extract from the secret'), - 'type': 'string', - }, - ], - 'required': ['server_url', 'username', 'password', 'secret_id', 'secret_field'], -} - - -def tss_backend(**kwargs): - if kwargs.get("domain"): - authorizer = DomainPasswordGrantAuthorizer( - base_url=kwargs['server_url'], username=kwargs['username'], domain=kwargs['domain'], password=kwargs['password'] - ) - else: - authorizer = PasswordGrantAuthorizer(kwargs['server_url'], kwargs['username'], kwargs['password']) - secret_server = SecretServer(kwargs['server_url'], authorizer) - secret_dict = secret_server.get_secret(kwargs['secret_id']) - secret = ServerSecret(**secret_dict) - - if isinstance(secret.fields[kwargs['secret_field']].value, str) == False: - return secret.fields[kwargs['secret_field']].value.text - else: - return secret.fields[kwargs['secret_field']].value - - -tss_plugin = CredentialPlugin( - 'Thycotic Secret Server', - tss_inputs, - tss_backend, -) diff --git a/awx_plugins/inventory/plugins.py b/awx_plugins/inventory/plugins.py deleted file mode 100644 index 9a42173482..0000000000 --- a/awx_plugins/inventory/plugins.py +++ /dev/null @@ -1,302 +0,0 @@ -import yaml -import stat -import tempfile - -import os.path - -from awx_plugins.credentials.injectors import _openstack_data -from awx.main.utils.execution_environments import to_container_path - -from awx.main.utils.licensing import server_product_name - - -class PluginFileInjector(object): - plugin_name = None # Ansible core name used to reference plugin - # base injector should be one of None, "managed", or "template" - # this dictates which logic to borrow from playbook injectors - base_injector = None - # every source should have collection, these are for the collection name - namespace = None - collection = None - collection_migration = '2.9' # Starting with this version, we use collections - use_fqcn = False # plugin: name versus plugin: namespace.collection.name - - # TODO: delete this method and update unit tests - @classmethod - def get_proper_name(cls): - if cls.plugin_name is None: - return None - return f'{cls.namespace}.{cls.collection}.{cls.plugin_name}' - - @property - def filename(self): - """Inventory filename for using the inventory plugin - This is created dynamically, but the auto plugin requires this exact naming - """ - return '{0}.yml'.format(self.plugin_name) - - def inventory_contents(self, inventory_update, private_data_dir): - """Returns a string that is the content for the inventory file for the inventory plugin""" - return yaml.safe_dump(self.inventory_as_dict(inventory_update, private_data_dir), default_flow_style=False, width=1000) - - def inventory_as_dict(self, inventory_update, private_data_dir): - source_vars = dict(inventory_update.source_vars_dict) # make a copy - ''' - None conveys that we should use the user-provided plugin. - Note that a plugin value of '' should still be overridden. - ''' - if self.plugin_name is not None: - if hasattr(self, 'downstream_namespace') and server_product_name() != 'AWX': - source_vars['plugin'] = f'{self.downstream_namespace}.{self.downstream_collection}.{self.plugin_name}' - elif self.use_fqcn: - source_vars['plugin'] = f'{self.namespace}.{self.collection}.{self.plugin_name}' - else: - source_vars['plugin'] = self.plugin_name - return source_vars - - def build_env(self, inventory_update, env, private_data_dir, private_data_files): - injector_env = self.get_plugin_env(inventory_update, private_data_dir, private_data_files) - env.update(injector_env) - # All CLOUD_PROVIDERS sources implement as inventory plugin from collection - env['ANSIBLE_INVENTORY_ENABLED'] = 'auto' - return env - - def _get_shared_env(self, inventory_update, private_data_dir, private_data_files): - """By default, we will apply the standard managed injectors""" - injected_env = {} - credential = inventory_update.get_cloud_credential() - # some sources may have no credential, specifically ec2 - if credential is None: - return injected_env - if self.base_injector in ('managed', 'template'): - injected_env['INVENTORY_UPDATE_ID'] = str(inventory_update.pk) # so injector knows this is inventory - if self.base_injector == 'managed': - from awx_plugins.credentials import injectors as builtin_injectors - - cred_kind = inventory_update.source.replace('ec2', 'aws') - if cred_kind in dir(builtin_injectors): - getattr(builtin_injectors, cred_kind)(credential, injected_env, private_data_dir) - elif self.base_injector == 'template': - safe_env = injected_env.copy() - args = [] - credential.credential_type.inject_credential(credential, injected_env, safe_env, args, private_data_dir) - # NOTE: safe_env is handled externally to injector class by build_safe_env static method - # that means that managed injectors must only inject detectable env keys - # enforcement of this is accomplished by tests - return injected_env - - def get_plugin_env(self, inventory_update, private_data_dir, private_data_files): - env = self._get_shared_env(inventory_update, private_data_dir, private_data_files) - return env - - def build_private_data(self, inventory_update, private_data_dir): - return self.build_plugin_private_data(inventory_update, private_data_dir) - - def build_plugin_private_data(self, inventory_update, private_data_dir): - return None - - -class azure_rm(PluginFileInjector): - plugin_name = 'azure_rm' - base_injector = 'managed' - namespace = 'azure' - collection = 'azcollection' - - def get_plugin_env(self, *args, **kwargs): - ret = super(azure_rm, self).get_plugin_env(*args, **kwargs) - # We need native jinja2 types so that tags can give JSON null value - ret['ANSIBLE_JINJA2_NATIVE'] = str(True) - return ret - - -class ec2(PluginFileInjector): - plugin_name = 'aws_ec2' - base_injector = 'managed' - namespace = 'amazon' - collection = 'aws' - - def get_plugin_env(self, *args, **kwargs): - ret = super(ec2, self).get_plugin_env(*args, **kwargs) - # We need native jinja2 types so that ec2_state_code will give integer - ret['ANSIBLE_JINJA2_NATIVE'] = str(True) - return ret - - -class gce(PluginFileInjector): - plugin_name = 'gcp_compute' - base_injector = 'managed' - namespace = 'google' - collection = 'cloud' - - def get_plugin_env(self, *args, **kwargs): - ret = super(gce, self).get_plugin_env(*args, **kwargs) - # We need native jinja2 types so that ip addresses can give JSON null value - ret['ANSIBLE_JINJA2_NATIVE'] = str(True) - return ret - - def inventory_as_dict(self, inventory_update, private_data_dir): - ret = super().inventory_as_dict(inventory_update, private_data_dir) - credential = inventory_update.get_cloud_credential() - # InventorySource.source_vars take precedence over ENV vars - if 'projects' not in ret: - ret['projects'] = [credential.get_input('project', default='')] - return ret - - -class vmware(PluginFileInjector): - plugin_name = 'vmware_vm_inventory' - base_injector = 'managed' - namespace = 'community' - collection = 'vmware' - - -class openstack(PluginFileInjector): - plugin_name = 'openstack' - namespace = 'openstack' - collection = 'cloud' - - def _get_clouds_dict(self, inventory_update, cred, private_data_dir): - openstack_data = _openstack_data(cred) - - openstack_data['clouds']['devstack']['private'] = inventory_update.source_vars_dict.get('private', True) - ansible_variables = { - 'use_hostnames': True, - 'expand_hostvars': False, - 'fail_on_errors': True, - } - provided_count = 0 - for var_name in ansible_variables: - if var_name in inventory_update.source_vars_dict: - ansible_variables[var_name] = inventory_update.source_vars_dict[var_name] - provided_count += 1 - if provided_count: - # Must we provide all 3 because the user provides any 1 of these?? - # this probably results in some incorrect mangling of the defaults - openstack_data['ansible'] = ansible_variables - return openstack_data - - def build_plugin_private_data(self, inventory_update, private_data_dir): - credential = inventory_update.get_cloud_credential() - private_data = {'credentials': {}} - - openstack_data = self._get_clouds_dict(inventory_update, credential, private_data_dir) - private_data['credentials'][credential] = yaml.safe_dump(openstack_data, default_flow_style=False, allow_unicode=True) - return private_data - - def get_plugin_env(self, inventory_update, private_data_dir, private_data_files): - env = super(openstack, self).get_plugin_env(inventory_update, private_data_dir, private_data_files) - credential = inventory_update.get_cloud_credential() - cred_data = private_data_files['credentials'] - env['OS_CLIENT_CONFIG_FILE'] = to_container_path(cred_data[credential], private_data_dir) - return env - - -class rhv(PluginFileInjector): - """ovirt uses the custom credential templating, and that is all""" - - plugin_name = 'ovirt' - base_injector = 'template' - initial_version = '2.9' - namespace = 'ovirt' - collection = 'ovirt' - downstream_namespace = 'redhat' - downstream_collection = 'rhv' - use_fqcn = True - - -class satellite6(PluginFileInjector): - plugin_name = 'foreman' - namespace = 'theforeman' - collection = 'foreman' - downstream_namespace = 'redhat' - downstream_collection = 'satellite' - use_fqcn = True - - def get_plugin_env(self, inventory_update, private_data_dir, private_data_files): - # this assumes that this is merged - # https://github.com/ansible/ansible/pull/52693 - credential = inventory_update.get_cloud_credential() - ret = super(satellite6, self).get_plugin_env(inventory_update, private_data_dir, private_data_files) - if credential: - ret['FOREMAN_SERVER'] = credential.get_input('host', default='') - ret['FOREMAN_USER'] = credential.get_input('username', default='') - ret['FOREMAN_PASSWORD'] = credential.get_input('password', default='') - return ret - - -class terraform(PluginFileInjector): - plugin_name = 'terraform_state' - namespace = 'cloud' - collection = 'terraform' - use_fqcn = True - - def inventory_as_dict(self, inventory_update, private_data_dir): - ret = super().inventory_as_dict(inventory_update, private_data_dir) - credential = inventory_update.get_cloud_credential() - config_cred = credential.get_input('configuration') - if config_cred: - handle, path = tempfile.mkstemp(dir=os.path.join(private_data_dir, 'env')) - with os.fdopen(handle, 'w') as f: - os.chmod(path, stat.S_IRUSR | stat.S_IWUSR) - f.write(config_cred) - ret['backend_config_files'] = to_container_path(path, private_data_dir) - return ret - - def build_plugin_private_data(self, inventory_update, private_data_dir): - credential = inventory_update.get_cloud_credential() - - private_data = {'credentials': {}} - gce_cred = credential.get_input('gce_credentials', default=None) - if gce_cred: - private_data['credentials'][credential] = gce_cred - return private_data - - def get_plugin_env(self, inventory_update, private_data_dir, private_data_files): - env = super(terraform, self).get_plugin_env(inventory_update, private_data_dir, private_data_files) - credential = inventory_update.get_cloud_credential() - cred_data = private_data_files['credentials'] - if credential in cred_data: - env['GOOGLE_BACKEND_CREDENTIALS'] = to_container_path(cred_data[credential], private_data_dir) - return env - - -class controller(PluginFileInjector): - plugin_name = 'tower' # TODO: relying on routing for now, update after EEs pick up revised collection - base_injector = 'template' - namespace = 'awx' - collection = 'awx' - downstream_namespace = 'ansible' - downstream_collection = 'controller' - - -class insights(PluginFileInjector): - plugin_name = 'insights' - base_injector = 'template' - namespace = 'redhatinsights' - collection = 'insights' - downstream_namespace = 'redhat' - downstream_collection = 'insights' - use_fqcn = True - - -class openshift_virtualization(PluginFileInjector): - plugin_name = 'kubevirt' - base_injector = 'template' - namespace = 'kubevirt' - collection = 'core' - downstream_namespace = 'redhat' - downstream_collection = 'openshift_virtualization' - use_fqcn = True - - -class constructed(PluginFileInjector): - plugin_name = 'constructed' - namespace = 'ansible' - collection = 'builtin' - - def build_env(self, *args, **kwargs): - env = super().build_env(*args, **kwargs) - # Enable script inventory plugin so we pick up the script files from source inventories - env['ANSIBLE_INVENTORY_ENABLED'] += ',script' - env['ANSIBLE_INVENTORY_ANY_UNPARSED_IS_FAILED'] = 'True' - return env diff --git a/awx_plugins/tests/test_credential_plugins.py b/awx_plugins/tests/test_credential_plugins.py deleted file mode 100644 index 660fdf756b..0000000000 --- a/awx_plugins/tests/test_credential_plugins.py +++ /dev/null @@ -1,142 +0,0 @@ -import pytest -from unittest import mock -from awx_plugins.credentials import hashivault - - -def test_imported_azure_cloud_sdk_vars(): - from awx_plugins.credentials import azure_kv - - assert len(azure_kv.clouds) > 0 - assert all([hasattr(c, 'name') for c in azure_kv.clouds]) - assert all([hasattr(c, 'suffixes') for c in azure_kv.clouds]) - assert all([hasattr(c.suffixes, 'keyvault_dns') for c in azure_kv.clouds]) - - -def test_hashivault_approle_auth(): - kwargs = { - 'role_id': 'the_role_id', - 'secret_id': 'the_secret_id', - } - expected_res = { - 'role_id': 'the_role_id', - 'secret_id': 'the_secret_id', - } - res = hashivault.approle_auth(**kwargs) - assert res == expected_res - - -def test_hashivault_kubernetes_auth(): - kwargs = { - 'kubernetes_role': 'the_kubernetes_role', - } - expected_res = { - 'role': 'the_kubernetes_role', - 'jwt': 'the_jwt', - } - with mock.patch('pathlib.Path') as path_mock: - mock.mock_open(path_mock.return_value.open, read_data='the_jwt') - res = hashivault.kubernetes_auth(**kwargs) - path_mock.assert_called_with('/var/run/secrets/kubernetes.io/serviceaccount/token') - assert res == expected_res - - -def test_hashivault_client_cert_auth_explicit_role(): - kwargs = { - 'client_cert_role': 'test-cert-1', - } - expected_res = { - 'name': 'test-cert-1', - } - res = hashivault.client_cert_auth(**kwargs) - assert res == expected_res - - -def test_hashivault_client_cert_auth_no_role(): - kwargs = {} - expected_res = { - 'name': None, - } - res = hashivault.client_cert_auth(**kwargs) - assert res == expected_res - - -def test_hashivault_userpass_auth(): - kwargs = {'username': 'the_username', 'password': 'the_password'} - expected_res = {'username': 'the_username', 'password': 'the_password'} - res = hashivault.userpass_auth(**kwargs) - assert res == expected_res - - -def test_hashivault_handle_auth_token(): - kwargs = { - 'token': 'the_token', - } - token = hashivault.handle_auth(**kwargs) - assert token == kwargs['token'] - - -def test_hashivault_handle_auth_approle(): - kwargs = { - 'role_id': 'the_role_id', - 'secret_id': 'the_secret_id', - } - with mock.patch.object(hashivault, 'method_auth') as method_mock: - method_mock.return_value = 'the_token' - token = hashivault.handle_auth(**kwargs) - method_mock.assert_called_with(**kwargs, auth_param=kwargs) - assert token == 'the_token' - - -def test_hashivault_handle_auth_kubernetes(): - kwargs = { - 'kubernetes_role': 'the_kubernetes_role', - } - with mock.patch.object(hashivault, 'method_auth') as method_mock: - with mock.patch('pathlib.Path') as path_mock: - mock.mock_open(path_mock.return_value.open, read_data='the_jwt') - method_mock.return_value = 'the_token' - token = hashivault.handle_auth(**kwargs) - method_mock.assert_called_with(**kwargs, auth_param={'role': 'the_kubernetes_role', 'jwt': 'the_jwt'}) - assert token == 'the_token' - - -def test_hashivault_handle_auth_client_cert(): - kwargs = { - 'client_cert_public': "foo", - 'client_cert_private': "bar", - 'client_cert_role': 'test-cert-1', - } - auth_params = { - 'name': 'test-cert-1', - } - with mock.patch.object(hashivault, 'method_auth') as method_mock: - method_mock.return_value = 'the_token' - token = hashivault.handle_auth(**kwargs) - method_mock.assert_called_with(**kwargs, auth_param=auth_params) - assert token == 'the_token' - - -def test_hashivault_handle_auth_not_enough_args(): - with pytest.raises(Exception): - hashivault.handle_auth() - - -class TestDelineaImports: - """ - These module have a try-except for ImportError which will allow using the older library - but we do not want the awx_devel image to have the older library, - so these tests are designed to fail if these wind up using the fallback import - """ - - def test_dsv_import(self): - from awx_plugins.credentials.dsv import SecretsVault # noqa - - # assert this module as opposed to older thycotic.secrets.vault - assert SecretsVault.__module__ == 'delinea.secrets.vault' - - def test_tss_import(self): - from awx_plugins.credentials.tss import DomainPasswordGrantAuthorizer, PasswordGrantAuthorizer, SecretServer, ServerSecret # noqa - - for cls in (DomainPasswordGrantAuthorizer, PasswordGrantAuthorizer, SecretServer, ServerSecret): - # assert this module as opposed to older thycotic.secrets.server - assert cls.__module__ == 'delinea.secrets.server'