Add support for Kubernetes authentication method in Hashicorp Vault secret lookup

Signed-off-by: liortamary <lior.tamary@houzz.com>
This commit is contained in:
liortamary 2022-02-21 17:41:14 +02:00
parent b7d0ec53e8
commit e0ce4c49f3

View File

@ -47,14 +47,21 @@ base_inputs = {
'multiline': False,
'help_text': _('Name of the namespace to use when authenticate and retrieve secrets'),
},
{
'id': 'kubernetes_role',
'label': _('Kubernetes role'),
'type': 'string',
'multiline': False,
'help_text': _('The Role for Kubernetes Authentication'),
},
{
'id': 'default_auth_path',
'label': _('Path to Approle Auth'),
'label': _('Path to Auth'),
'type': 'string',
'multiline': False,
'default': 'approle',
'help_text': _(
'The AppRole Authentication path to use if one isn\'t provided in the metadata when linking to an input field. Defaults to \'approle\''
'The Authentication path to use if one isn\'t provided in the metadata when linking to an input field. Defaults to \'approle\''
),
},
],
@ -151,9 +158,11 @@ def handle_auth(**kwargs):
if kwargs.get('token'):
token = kwargs['token']
elif kwargs.get('role_id') and kwargs.get('secret_id'):
token = approle_auth(**kwargs)
token = method_auth(**kwargs, auth_param=approle_auth(**kwargs))
elif kwargs.get('kubernetes_role'):
token = method_auth(**kwargs, auth_param=kubernetes_auth(**kwargs))
else:
raise Exception('Either token or AppRole parameters must be set')
raise Exception('Either token or AppRole/Kubernetes authentication parameters must be set')
return token
@ -161,6 +170,23 @@ def handle_auth(**kwargs):
def approle_auth(**kwargs):
role_id = kwargs['role_id']
secret_id = kwargs['secret_id']
# AppRole Login
return {'role_id': role_id, 'secret_id': secret_id}
def kubernetes_auth(**kwargs):
role = kwargs['kubernetes_role']
jwt_file = pathlib.Path('/var/run/secrets/kubernetes.io/serviceaccount/token')
with jwt_file.open('r') as jwt_fo:
jwt = jwt_fo.read().rstrip()
# Kubernetes Login
return {'role': role, 'jwt': jwt}
def method_auth(**kwargs):
# get auth method specific params
request_kwargs = {'json': kwargs['auth_param'], 'timeout': 30}
# we first try to use the 'auth_path' from the metadata
# if not found we try to fetch the 'default_auth_path' from inputs
auth_path = kwargs.get('auth_path') or kwargs['default_auth_path']
@ -168,9 +194,6 @@ def approle_auth(**kwargs):
url = urljoin(kwargs['url'], 'v1')
cacert = kwargs.get('cacert', None)
request_kwargs = {'timeout': 30}
# AppRole Login
request_kwargs['json'] = {'role_id': role_id, 'secret_id': secret_id}
sess = requests.Session()
# Namespace support
if kwargs.get('namespace'):