[backport][4.6] Update Azure Key Vault plugin to use Managed Identity (#6939)

* Bug on file name. Commiting to remove it.

* Update azure_kv plugin to use ManagedIdentity. Add testing.
This commit is contained in:
jessicamack
2025-05-13 10:05:24 -04:00
committed by GitHub
parent 825a48bb32
commit ba7ee23298
3 changed files with 139 additions and 181 deletions

View File

@@ -1,5 +1,10 @@
from azure.keyvault.secrets import SecretClient
from azure.identity import ClientSecretCredential
from azure.identity import (
ClientSecretCredential,
CredentialUnavailableError,
ManagedIdentityCredential,
)
from azure.core.credentials import TokenCredential
from msrestazure import azure_cloud
from .plugin import CredentialPlugin
@@ -50,14 +55,66 @@ azure_keyvault_inputs = {
'help_text': _('Used to specify a specific secret version (if left empty, the latest version will be used).'),
},
],
'required': ['url', 'client', 'secret', 'tenant', 'secret_field'],
'required': ['url', 'secret_field'],
}
def azure_keyvault_backend(**kwargs):
csc = ClientSecretCredential(tenant_id=kwargs['tenant'], client_id=kwargs['client'], client_secret=kwargs['secret'])
kv = SecretClient(credential=csc, vault_url=kwargs['url'])
return kv.get_secret(name=kwargs['secret_field'], version=kwargs.get('secret_version', '')).value
def _initialize_credential(
tenant: str = '',
client: str = '',
secret: str = '',
) -> TokenCredential:
explicit_credentials_provided = all((tenant, client, secret))
if explicit_credentials_provided:
return ClientSecretCredential(
tenant_id=tenant,
client_id=client,
client_secret=secret,
)
return ManagedIdentityCredential()
def azure_keyvault_backend(
*,
url: str,
client: str = '',
secret: str = '',
tenant: str = '',
secret_field: str,
secret_version: str = '',
) -> str | None:
"""Get a credential and retrieve a secret from an Azure Key Vault.
An empty string for an optional parameter counts as not provided.
:param url: An Azure Key Vault URI.
:param client: The Client ID (optional).
:param secret: The Client Secret (optional).
:param tenant: The Tenant ID (optional).
:param secret_field: The name of the secret to retrieve from the
vault.
:param secret_version: The version of the secret to retrieve
(optional).
:returns: The secret from the Key Vault.
:raises RuntimeError: If the software is not being run on an Azure
VM.
"""
chosen_credential = _initialize_credential(tenant, client, secret)
keyvault = SecretClient(credential=chosen_credential, vault_url=url)
try:
keyvault_secret = keyvault.get_secret(
name=secret_field,
version=secret_version,
)
except CredentialUnavailableError as secret_lookup_err:
raise RuntimeError(
'You are not operating on an Azure VM, so the Managed Identity '
'feature is unavailable. Please provide the full Client ID, '
'Client Secret, and Tenant ID or run the software on an Azure VM.',
) from secret_lookup_err
return keyvault_secret.value
azure_keyvault_plugin = CredentialPlugin('Microsoft Azure Key Vault', inputs=azure_keyvault_inputs, backend=azure_keyvault_backend)

View File

@@ -1,6 +1,12 @@
import pytest
from unittest import mock
from awx.main.credential_plugins import hashivault
from awx.main.credential_plugins import hashivault, azure_kv
from azure.keyvault.secrets import (
KeyVaultSecret,
SecretClient,
SecretProperties,
)
def test_imported_azure_cloud_sdk_vars():
@@ -140,3 +146,72 @@ class TestDelineaImports:
for cls in (DomainPasswordGrantAuthorizer, PasswordGrantAuthorizer, SecretServer, ServerSecret):
# assert this module as opposed to older thycotic.secrets.server
assert cls.__module__ == 'delinea.secrets.server'
class _FakeSecretClient(SecretClient):
def get_secret(
self: '_FakeSecretClient',
name: str,
version: str | None = None,
**kwargs: str,
) -> KeyVaultSecret:
props = SecretProperties(None, None)
return KeyVaultSecret(properties=props, value='test-secret')
def test_azure_kv_invalid_env() -> None:
"""Test running outside of Azure raises error."""
error_msg = (
'You are not operating on an Azure VM, so the Managed Identity '
'feature is unavailable. Please provide the full Client ID, '
'Client Secret, and Tenant ID or run the software on an Azure VM.'
)
with pytest.raises(
RuntimeError,
match=error_msg,
):
azure_kv.azure_keyvault_backend(
url='https://test.vault.azure.net',
client='',
secret='client-secret',
tenant='tenant-id',
secret_field='secret',
secret_version='',
)
@pytest.mark.parametrize(
('client', 'secret', 'tenant'),
(
pytest.param('', '', '', id='managed-identity'),
pytest.param(
'client-id',
'client-secret',
'tenant-id',
id='client-secret-credential',
),
),
)
def test_azure_kv_valid_auth(
monkeypatch: pytest.MonkeyPatch,
client: str,
secret: str,
tenant: str,
) -> None:
"""Test successful Azure authentication via Managed Identity and credentials."""
monkeypatch.setattr(
azure_kv,
'SecretClient',
_FakeSecretClient,
)
keyvault_secret = azure_kv.azure_keyvault_backend(
url='https://test.vault.azure.net',
client=client,
secret=secret,
tenant=tenant,
secret_field='secret',
secret_version='',
)
assert keyvault_secret == 'test-secret'