|
| 1 | +"""Configuration instances from Hashicorp Vault.""" |
| 2 | + |
| 3 | +import time |
| 4 | +from typing import ( |
| 5 | + Any, |
| 6 | + Dict, |
| 7 | + ItemsView, |
| 8 | + KeysView, |
| 9 | + Mapping, |
| 10 | + Optional, |
| 11 | + Union, |
| 12 | + ValuesView, |
| 13 | + cast, |
| 14 | +) |
| 15 | + |
| 16 | +import hvac |
| 17 | +from hvac.exceptions import InvalidPath |
| 18 | + |
| 19 | + |
| 20 | +from .. import Configuration, InterpolateType, config_from_dict |
| 21 | + |
| 22 | + |
| 23 | +class Cache: |
| 24 | + """Cache class.""" |
| 25 | + |
| 26 | + def __init__(self, value: Dict[str, Any], ts: float): # noqa: D107 |
| 27 | + self.value = value |
| 28 | + self.ts = ts |
| 29 | + |
| 30 | + |
| 31 | +class HashicorpVaultConfiguration(Configuration): |
| 32 | + """ |
| 33 | + Hashicorp Vault Configuration class. |
| 34 | +
|
| 35 | + The Hashicorp Vault Configuration class takes Vault credentials and |
| 36 | + behaves like a drop-in replacement for the regular Configuration class. |
| 37 | +
|
| 38 | + The following limitations apply to the Hashicorp Vault Configurations: |
| 39 | + - only works with KV version 2 |
| 40 | + - only supports the latest secret version |
| 41 | + - assumes that secrets are named as <engine name>/<path>/<field> |
| 42 | + """ |
| 43 | + |
| 44 | + def __init__( |
| 45 | + self, |
| 46 | + engine: str, |
| 47 | + cache_expiration: int = 5 * 60, |
| 48 | + interpolate: InterpolateType = False, |
| 49 | + **kwargs: Mapping[str, Any], |
| 50 | + ) -> None: |
| 51 | + """ |
| 52 | + Constructor. |
| 53 | +
|
| 54 | + See https://developer.hashicorp.com/vault/docs/get-started/developer-qs. |
| 55 | + """ # noqa: E501 |
| 56 | + self._client = hvac.Client(**kwargs) |
| 57 | + self._cache_expiration = cache_expiration |
| 58 | + self._cache: Dict[str, Cache] = {} |
| 59 | + self._engine = engine |
| 60 | + self._interpolate = {} if interpolate is True else interpolate |
| 61 | + self._default_levels = None |
| 62 | + |
| 63 | + def _get_secret(self, secret: str) -> Optional[Dict[str, Any]]: |
| 64 | + now = time.time() |
| 65 | + from_cache = self._cache.get(secret) |
| 66 | + if from_cache and from_cache.ts + self._cache_expiration > now: |
| 67 | + return from_cache.value |
| 68 | + try: |
| 69 | + data = cast( |
| 70 | + Dict[str, Any], |
| 71 | + self._client.kv.v2.read_secret(secret, mount_point=self._engine)[ |
| 72 | + "data" |
| 73 | + ]["data"], |
| 74 | + ) |
| 75 | + self._cache[secret] = Cache(value=data, ts=now) |
| 76 | + return data |
| 77 | + except (InvalidPath, KeyError): |
| 78 | + if secret in self._cache: |
| 79 | + del self._cache[secret] |
| 80 | + return None |
| 81 | + |
| 82 | + def __getitem__(self, item: str) -> Any: # noqa: D105 |
| 83 | + path, *rest = item.split(".", 1) |
| 84 | + secret = self._get_secret(path) |
| 85 | + if secret is None: |
| 86 | + raise KeyError(item) |
| 87 | + else: |
| 88 | + return ( |
| 89 | + Configuration(secret)[".".join(rest)] if rest else Configuration(secret) |
| 90 | + ) |
| 91 | + |
| 92 | + def __getattr__(self, item: str) -> Any: # noqa: D105 |
| 93 | + secret = self._get_secret(item) |
| 94 | + if secret is None: |
| 95 | + raise AttributeError(item) |
| 96 | + else: |
| 97 | + return Configuration(secret) |
| 98 | + |
| 99 | + def get(self, key: str, default: Any = None) -> Union[dict, Any]: |
| 100 | + """ |
| 101 | + Get the configuration values corresponding to :attr:`key`. |
| 102 | +
|
| 103 | + :param key: key to retrieve |
| 104 | + :param default: default value in case the key is missing |
| 105 | + :return: the value found or a default |
| 106 | + """ |
| 107 | + try: |
| 108 | + return self[key] |
| 109 | + except KeyError: |
| 110 | + return default |
| 111 | + |
| 112 | + def keys( |
| 113 | + self, levels: Optional[int] = None |
| 114 | + ) -> Union["Configuration", Any, KeysView[str]]: |
| 115 | + """Return a set-like object providing a view on the configuration keys.""" |
| 116 | + assert not levels # Vault secrets don't support separators |
| 117 | + return cast( |
| 118 | + KeysView[str], |
| 119 | + self._client.list(f"/{self._engine}/metadata")["data"]["keys"], |
| 120 | + ) |
| 121 | + |
| 122 | + def values( |
| 123 | + self, levels: Optional[int] = None |
| 124 | + ) -> Union["Configuration", Any, ValuesView[Any]]: |
| 125 | + """Return a set-like object providing a view on the configuration values.""" |
| 126 | + assert not levels # GCP Secret Manager secrets don't support separators |
| 127 | + return cast( |
| 128 | + ValuesView[str], |
| 129 | + ( |
| 130 | + self._get_secret(k) |
| 131 | + for k in self._client.list(f"/{self._engine}/metadata")["data"]["keys"] |
| 132 | + ), |
| 133 | + ) |
| 134 | + |
| 135 | + def items( |
| 136 | + self, levels: Optional[int] = None |
| 137 | + ) -> Union["Configuration", Any, ItemsView[str, Any]]: |
| 138 | + """Return a set-like object providing a view on the configuration items.""" |
| 139 | + assert not levels # GCP Secret Manager secrets don't support separators |
| 140 | + return cast( |
| 141 | + ItemsView[str, Any], |
| 142 | + ( |
| 143 | + (k, self._get_secret(k)) |
| 144 | + for k in self._client.list(f"/{self._engine}/metadata")["data"]["keys"] |
| 145 | + ), |
| 146 | + ) |
| 147 | + |
| 148 | + def reload(self) -> None: |
| 149 | + """Reload the configuration.""" |
| 150 | + self._cache.clear() |
| 151 | + |
| 152 | + def __repr__(self) -> str: # noqa: D105 |
| 153 | + return "<HashicorpVaultConfiguration: %r>" % self._engine |
| 154 | + |
| 155 | + @property |
| 156 | + def _config(self) -> Dict[str, Any]: # type: ignore |
| 157 | + return config_from_dict(dict(self.items()))._config |
0 commit comments