Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/run_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
virtualenvs-in-project: true

- name: Install dependencies
run: poetry install -v -E toml -E yaml -E azure -E aws -E gcp
run: poetry install -v -E toml -E yaml -E azure -E aws -E gcp -E vault

- name: Run pytest
run: poetry run pytest --cov=./ --cov-report=xml
Expand Down
2 changes: 1 addition & 1 deletion config/contrib/aws.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Configuration from AWS Secrets Manager."""
"""Configuration instances from AWS Secrets Manager."""

import json
import time
Expand Down
4 changes: 2 additions & 2 deletions config/contrib/azure.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Configuration from Azure KeyVaults."""
"""Configuration instances from Azure KeyVaults."""

import time
from typing import Any, Dict, ItemsView, KeysView, Optional, Union, ValuesView, cast
Expand Down Expand Up @@ -88,7 +88,7 @@ def __getitem__(self, item: str) -> Any: # noqa: D105
def __getattr__(self, item: str) -> Any: # noqa: D105
secret = self._get_secret(item)
if secret is None:
raise KeyError(item)
raise AttributeError(item)
else:
return secret

Expand Down
4 changes: 2 additions & 2 deletions config/contrib/gcp.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Configuration from GCP Secret Manager."""
"""Configuration instances from GCP Secret Manager."""

import time
from typing import Any, Dict, ItemsView, KeysView, Optional, Union, ValuesView, cast
Expand Down Expand Up @@ -89,7 +89,7 @@ def __getitem__(self, item: str) -> Any: # noqa: D105
def __getattr__(self, item: str) -> Any: # noqa: D105
secret = self._get_secret(item)
if secret is None:
raise KeyError(item)
raise AttributeError(item)
else:
return secret

Expand Down
157 changes: 157 additions & 0 deletions config/contrib/vault.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""Configuration instances from Hashicorp Vault."""

import time
from typing import (
Any,
Dict,
ItemsView,
KeysView,
Mapping,
Optional,
Union,
ValuesView,
cast,
)

import hvac
from hvac.exceptions import InvalidPath


from .. import Configuration, InterpolateType, config_from_dict


class Cache:
"""Cache class."""

def __init__(self, value: Dict[str, Any], ts: float): # noqa: D107
self.value = value
self.ts = ts


class HashicorpVaultConfiguration(Configuration):
"""
Hashicorp Vault Configuration class.

The Hashicorp Vault Configuration class takes Vault credentials and
behaves like a drop-in replacement for the regular Configuration class.

The following limitations apply to the Hashicorp Vault Configurations:
- only works with KV version 2
- only supports the latest secret version
- assumes that secrets are named as <engine name>/<path>/<field>
"""

def __init__(
self,
engine: str,
cache_expiration: int = 5 * 60,
interpolate: InterpolateType = False,
**kwargs: Mapping[str, Any],
) -> None:
"""
Constructor.

See https://developer.hashicorp.com/vault/docs/get-started/developer-qs.
""" # noqa: E501
self._client = hvac.Client(**kwargs)
self._cache_expiration = cache_expiration
self._cache: Dict[str, Cache] = {}
self._engine = engine
self._interpolate = {} if interpolate is True else interpolate
self._default_levels = None

def _get_secret(self, secret: str) -> Optional[Dict[str, Any]]:
now = time.time()
from_cache = self._cache.get(secret)
if from_cache and from_cache.ts + self._cache_expiration > now:
return from_cache.value
try:
data = cast(
Dict[str, Any],
self._client.kv.v2.read_secret(secret, mount_point=self._engine)[
"data"
]["data"],
)
self._cache[secret] = Cache(value=data, ts=now)
return data
except (InvalidPath, KeyError):
if secret in self._cache:
del self._cache[secret]
return None

def __getitem__(self, item: str) -> Any: # noqa: D105
path, *rest = item.split(".", 1)
secret = self._get_secret(path)
if secret is None:
raise KeyError(item)
else:
return (
Configuration(secret)[".".join(rest)] if rest else Configuration(secret)
)

def __getattr__(self, item: str) -> Any: # noqa: D105
secret = self._get_secret(item)
if secret is None:
raise AttributeError(item)
else:
return Configuration(secret)

def get(self, key: str, default: Any = None) -> Union[dict, Any]:
"""
Get the configuration values corresponding to :attr:`key`.

:param key: key to retrieve
:param default: default value in case the key is missing
:return: the value found or a default
"""
try:
return self[key]
except KeyError:
return default

def keys(
self, levels: Optional[int] = None
) -> Union["Configuration", Any, KeysView[str]]:
"""Return a set-like object providing a view on the configuration keys."""
assert not levels # Vault secrets don't support separators
return cast(
KeysView[str],
self._client.list(f"/{self._engine}/metadata")["data"]["keys"],
)

def values(
self, levels: Optional[int] = None
) -> Union["Configuration", Any, ValuesView[Any]]:
"""Return a set-like object providing a view on the configuration values."""
assert not levels # GCP Secret Manager secrets don't support separators
return cast(
ValuesView[str],
(
self._get_secret(k)
for k in self._client.list(f"/{self._engine}/metadata")["data"]["keys"]
),
)

def items(
self, levels: Optional[int] = None
) -> Union["Configuration", Any, ItemsView[str, Any]]:
"""Return a set-like object providing a view on the configuration items."""
assert not levels # GCP Secret Manager secrets don't support separators
return cast(
ItemsView[str, Any],
(
(k, self._get_secret(k))
for k in self._client.list(f"/{self._engine}/metadata")["data"]["keys"]
),
)

def reload(self) -> None:
"""Reload the configuration."""
self._cache.clear()

def __repr__(self) -> str: # noqa: D105
return "<HashicorpVaultConfiguration: %r>" % self._engine

@property
def _config(self) -> Dict[str, Any]: # type: ignore
return config_from_dict(dict(self.items()))._config
66 changes: 65 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ azure-identity = { version = "^1.13.0", optional = true }
azure-keyvault = { version = "^4.2.0", optional = true }
boto3 = { version = "^1.28.20", optional = true }
google-cloud-secret-manager = { version = "^2.16.3", optional = true }
hvac = { version ="^1.1.1", optional = true }
pyyaml = { version = "^6.0", optional = true }
toml = { version = "^0.10.0", optional = true }

Expand Down Expand Up @@ -44,6 +45,7 @@ aws = ["boto3"]
azure = ["azure-keyvault", "azure-identity"]
gcp = ["google-cloud-secret-manager"]
toml = ["toml"]
vault = ["hvac"]
yaml = ["pyyaml"]

[tool.black]
Expand All @@ -58,10 +60,72 @@ envlist = py38, py39, py310, py311
[testenv]
allowlist_externals = poetry
commands =
poetry install -v -E toml -E yaml -E azure -E aws -E gcp
poetry install -v -E toml -E yaml -E azure -E aws -E gcp -E vault
poetry run pytest
"""

[tool.mypy]
warn_return_any = true
warn_unused_configs = true
disallow_untyped_calls = true
disallow_untyped_defs = true
disallow_subclassing_any = true
disallow_any_decorated = true
disallow_incomplete_defs = true
disallow_untyped_decorators = true
no_implicit_optional = true
warn_unused_ignores = true
warn_redundant_casts = true
exclude = [
'tests'
]

[[tool.mypy.overrides]]
module= [
'google.auth.credentials',
'yaml',
'toml',
'boto3',
'botocore.exceptions',
'hvac',
'hvac.exceptions',
]
ignore_missing_imports = true

[tool.coverage.run]
branch = true
include = [
'config/*'
]

[tool.coverage.html]
directory = 'cover'

[tool.pytest.ini_options]
minversion = "6.0"
addopts = '--cov --cov-report=html --cov-report term-missing --flake8 --mypy --black'
flake8-max-line-length = 88
flake8-extensions =[
'flake8-docstrings',
'flake8-comprehensions',
'flake8-import-order',
'flake8-bugbear',
'flake8-blind-except',
'flake8-builtins',
'flake8-logging-format',
'flake8-black'
]
flake8-ignore = [
'* E203',
'tests/* ALL',
'docs/* ALL'
]
filterwarnings =[
'ignore::pytest.PytestDeprecationWarning',
'ignore::DeprecationWarning',
'ignore::pytest.PytestWarning'
]

[build-system]
build-backend = "poetry.masonry.api"
requires = ["poetry>=1.5.0"]
70 changes: 0 additions & 70 deletions setup.cfg

This file was deleted.

4 changes: 2 additions & 2 deletions tests/contrib/test_azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ def test_get_attr(): # type: ignore

assert cfg.foo == "foo_val"

with raises(KeyError):
assert cfg.foo_missing is KeyError
with raises(AttributeError):
assert cfg.foo_missing is AttributeError


@pytest.mark.skipif("azure is None")
Expand Down
7 changes: 2 additions & 5 deletions tests/contrib/test_gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ def call(*args: list, **kwargs: dict) -> FakeSecretClient:
def test_load_dict(): # type: ignore
secretmanager_v1.SecretManagerServiceClient = fake_client(DICT)
cfg = GCPSecretManagerConfiguration("fake_id")

print(cfg)

assert cfg["foo"] == "foo_val"
assert cfg["with_underscore"] == "works"
assert cfg.get("foo", "default") == "foo_val"
Expand Down Expand Up @@ -120,8 +117,8 @@ def test_get_attr(): # type: ignore

assert cfg.foo == "foo_val"

with raises(KeyError):
assert cfg.foo_missing is KeyError
with raises(AttributeError):
assert cfg.foo_missing is AttributeError


@pytest.mark.skipif("secretmanager_v1 is None")
Expand Down
Loading