diff --git a/.gitignore b/.gitignore index 0b53c76..66cbc50 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ __pycache__/ *.egg-info/ *cache* *.pyc +venv/ # Ommitted files and directories management/ diff --git a/pyproject.toml b/pyproject.toml index 35744b2..1952436 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,10 @@ emails = "^0.6" unidecode = "^1.1.2" rapidfuzz = "^0.14.2" poetry = "^1.1.5" +prometheus-client = "^0.9.0" +opentelemetry-exporter-jaeger = "^1.0.0" +opentelemetry-sdk = "^1.0.0" +opentelemetry-api = "^1.0.0" [tool.poetry.dev-dependencies] black = "^20.8b1" diff --git a/requirements.txt b/requirements.txt index 3bbcfa6..9a02505 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,11 @@ -alembic==1.5.7; (python_version >= "2.7" and python_full_version < "3.0.0") or (python_full_version >= "3.6.0") +alembic==1.5.8; (python_version >= "2.7" and python_full_version < "3.0.0") or (python_full_version >= "3.6.0") appdirs==1.4.4; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" ariadne==0.12.0 atomicwrites==1.4.0; python_version >= "3.6" and python_full_version < "3.0.0" and sys_platform == "win32" or sys_platform == "win32" and python_version >= "3.6" and python_full_version >= "3.4.0" attrs==20.3.0; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" and python_version >= "3.6" bcrypt==3.2.0; python_version >= "3.6" -boto3==1.17.29; (python_version >= "2.7" and python_full_version < "3.0.0") or (python_full_version >= "3.6.0") -botocore==1.20.29; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.6.0" +boto3==1.17.39; (python_version >= "2.7" and python_full_version < "3.0.0") or (python_full_version >= "3.6.0") +botocore==1.20.39; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.6.0" cachecontrol==0.12.6; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" cachetools==4.2.1; python_version >= "3.5" and python_version < "4.0" cachy==0.3.0; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" @@ -17,7 +17,7 @@ click==7.1.2; python_version >= "2.7" and python_full_version < "3.0.0" or pytho clikit==0.6.2; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" colorama==0.4.4; python_version >= "3.6" and python_full_version < "3.0.0" and sys_platform == "win32" or sys_platform == "win32" and python_full_version >= "3.5.0" and python_version >= "3.6" crashtest==0.3.1; python_version >= "3.6" and python_version < "4.0" and (python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0") -cryptography==3.4.6; python_version >= "3.6" and python_version < "4.0" and (python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0") and sys_platform == "linux" +cryptography==3.4.7; python_version >= "3.6" and python_version < "4.0" and (python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0") and sys_platform == "linux" cssselect==1.1.0; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" cssutils==2.2.0; python_version >= "3.6" distlib==0.3.1; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" @@ -25,8 +25,10 @@ emails==0.6 faker==5.8.0; python_version >= "3.6" filelock==3.0.12; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" flake8==3.9.0; (python_version >= "2.7" and python_full_version < "3.0.0") or (python_full_version >= "3.5.0") +googleapis-common-protos==1.52.0; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" and python_version >= "3.6" graphql-core==3.0.5; python_version >= "3.6" and python_version < "4" greenlet==1.0.0; python_version >= "3" and python_full_version < "3.0.0" or python_full_version >= "3.6.0" and python_version >= "3" +grpcio==1.36.1; python_version >= "3.6" h11==0.12.0; python_version >= "3.6" html5lib==1.1; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" httptools==0.1.1; sys_platform != "win32" and sys_platform != "cygwin" and platform_python_implementation != "PyPy" @@ -37,13 +39,18 @@ jeepney==0.6.0; python_version >= "3.6" and python_version < "4.0" and (python_v jmespath==0.10.0; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.6.0" keyring==21.8.0; python_version >= "3.6" and python_version < "4.0" and (python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0") lockfile==0.12.2; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" -lxml==4.6.2; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" +lxml==4.6.3; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" mako==1.1.4; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.6.0" markupsafe==1.1.1; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.6.0" mccabe==0.6.1; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" msgpack==1.0.2; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" mypy-extensions==0.4.3; python_version >= "3.5" mypy==0.790; python_version >= "3.5" +opentelemetry-api==1.0.0; python_version >= "3.6" +opentelemetry-exporter-jaeger-proto-grpc==1.0.0; python_version >= "3.6" +opentelemetry-exporter-jaeger-thrift==1.0.0; python_version >= "3.6" +opentelemetry-exporter-jaeger==1.0.0; python_version >= "3.6" +opentelemetry-sdk==1.0.0; python_version >= "3.6" packaging==20.9; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" and python_version >= "3.6" passlib==1.7.4 pastel==0.2.1; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" @@ -53,19 +60,21 @@ pluggy==0.13.1; python_version >= "3.6" and python_full_version < "3.0.0" or pyt poetry-core==1.0.2; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" poetry==1.1.5; (python_version >= "2.7" and python_full_version < "3.0.0") or (python_full_version >= "3.5.0") premailer==3.7.0 +prometheus-client==0.9.0 +protobuf==3.15.6; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" and python_version >= "3.6" psycopg2-binary==2.8.6; (python_version >= "2.7" and python_full_version < "3.0.0") or (python_full_version >= "3.4.0") ptyprocess==0.7.0; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" py==1.10.0; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" and python_version >= "3.6" pycodestyle==2.7.0; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" pycparser==2.20; python_version >= "3.6" and python_full_version < "3.0.0" or python_version >= "3.6" and python_full_version >= "3.4.0" -pyflakes==2.3.0; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" +pyflakes==2.3.1; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" pyjwt==1.7.1 pylev==1.3.0; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" pyparsing==2.4.7; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.4.0" and python_version >= "3.6" pytest-asyncio==0.14.0; python_version >= "3.5" pytest==6.2.2; python_version >= "3.6" python-dateutil==2.8.1; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.6.0" and python_version >= "3.6" -python-dotenv==0.15.0 +python-dotenv==0.16.0 python-editor==1.0.4; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.6.0" python-multipart==0.0.5 pywin32-ctypes==0.2.0; python_version >= "3.6" and python_version < "4.0" and (python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0") and sys_platform == "win32" @@ -73,14 +82,15 @@ pyyaml==5.4.1; python_version >= "2.7" and python_full_version < "3.0.0" or pyth rapidfuzz==0.14.2; python_version >= "2.7" requests-toolbelt==0.9.1; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" requests==2.25.1; (python_version >= "2.7" and python_full_version < "3.0.0") or (python_full_version >= "3.5.0") -s3transfer==0.3.4; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.6.0" +s3transfer==0.3.6; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.6.0" secretstorage==3.3.1; python_version >= "3.6" and python_version < "4.0" and (python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0") and sys_platform == "linux" shellingham==1.4.0; python_version >= "2.7" and python_version < "3.0" and python_full_version < "3.0.0" or python_version >= "2.6" and python_version < "3.0" and python_full_version >= "3.5.0" or python_version > "3.0" and python_version < "3.1" and python_full_version < "3.0.0" or python_version > "3.0" and python_version < "3.1" and python_full_version >= "3.5.0" or python_version > "3.1" and python_version < "3.2" and python_full_version < "3.0.0" or python_version > "3.1" and python_version < "3.2" and python_full_version >= "3.5.0" or python_version > "3.2" and python_version < "3.3" and python_full_version < "3.0.0" or python_version > "3.2" and python_version < "3.3" and python_full_version >= "3.5.0" or python_version > "3.3" and python_full_version < "3.0.0" or python_version > "3.3" and python_full_version >= "3.5.0" -six==1.15.0; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" -sqlalchemy==1.4.0; (python_version >= "2.7" and python_full_version < "3.0.0") or (python_full_version >= "3.6.0") +six==1.15.0; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" and python_version >= "3.6" +sqlalchemy==1.4.3; (python_version >= "2.7" and python_full_version < "3.0.0") or (python_full_version >= "3.6.0") starlette==0.13.8; python_version >= "3.6" tenacity==6.3.1 text-unidecode==1.3; python_version >= "3.6" +thrift==0.13.0; python_version >= "3.6" toml==0.10.2; python_version >= "3.6" and python_full_version < "3.0.0" or python_full_version >= "3.3.0" and python_version >= "3.6" tomlkit==0.7.0; python_version >= "2.7" and python_full_version < "3.0.0" or python_full_version >= "3.5.0" typed-ast==1.4.2; python_version >= "3.5" diff --git a/src/observability/__init__.py b/src/observability/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/observability/decorator.py b/src/observability/decorator.py new file mode 100644 index 0000000..d137f20 --- /dev/null +++ b/src/observability/decorator.py @@ -0,0 +1,69 @@ +from contextvars import ContextVar +from functools import wraps +from threading import get_ident +from typing import Callable + +from src.observability.measure.prometheus import PrometheusMeasurer +from src.observability.trace.jaeger import JaegerTracer + +default_tracer = JaegerTracer() +default_measurer = PrometheusMeasurer() +ctx = ContextVar('parent-trace', default=str(get_ident())) + + +def trace(name: str = ""): + def block(fn: Callable): + @wraps(fn) + def wrapper(*args, **kwargs): + ctx.set(str(get_ident())) + operation = name if name.strip() else fn.__name__ + with default_tracer.for_context(ctx) as t: + with t.operation_name(operation) as tracer: + return tracer.trace(operation, fn, *args, **kwargs) + + return wrapper + return block + + +def _measuring(name: str, description: str, measure: Callable): + def block(fn: Callable): + @wraps(fn) + def wrapper(*args, **kwargs): + operation = name if name.strip() else fn.__name__ + with default_measurer.track_call(operation, description) as delegate: + measure(delegate) + return fn(*args, **kwargs) + return wrapper + return block + + +def _collecting(name: str, description: str, collect: Callable): + def block(fn: Callable): + @wraps(fn) + def wrapper(*args, **kwargs): + operation = name if name.strip() else fn.__name__ + with default_measurer.track_call(operation, description) as delegate: + with collect(delegate): + return fn(*args, **kwargs) + return wrapper + return block + + +def count(name: str = "", description: str = ""): + return _measuring(name, description, lambda d: d.count()) + + +def inc(name: str = "", description: str = ""): + return _measuring(name, description, lambda d: d.inc()) + + +def dec(name: str = "", description: str = ""): + return _measuring(name, description, lambda d: d.dec()) + + +def observe(name: str = "", description: str = ""): + return _collecting(name, description, lambda d: d.observe()) + + +def observe_bucket(name: str = "", description: str = ""): + return _collecting(name, description, lambda d: d.observe_bucket()) diff --git a/src/observability/measure/__init__.py b/src/observability/measure/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/observability/measure/prometheus.py b/src/observability/measure/prometheus.py new file mode 100644 index 0000000..9637062 --- /dev/null +++ b/src/observability/measure/prometheus.py @@ -0,0 +1,84 @@ +from typing import Optional + +from prometheus_client import Counter, Gauge, Summary, Histogram, CollectorRegistry +from prometheus_client.exposition import choose_encoder + +from src.observability.measurer import AbstractMeasurer, AbstractMeasurerDelegate + + +class PrometheusTracerDelegate(AbstractMeasurerDelegate): + name: str + description: str + counter: Optional[Counter] + gauge: Optional[Gauge] + summary: Optional[Summary] + histogram: Optional[Histogram] + registry: CollectorRegistry + + def __init__(self, name: str, description: str, registry: CollectorRegistry): + self.name = name + self.description = description + self.registry = registry + self.counter = None + self.gauge = None + self.summary = None + self.histogram = None + + def count(self): + if self.counter is None: + self.counter = Counter(self.name, self.description) + self.registry.register(self.counter) + self.counter.inc() + + def inc(self): + if self.gauge is None: + self.gauge = Gauge(self.name, self.description) + self.registry.register(self.gauge) + self.gauge.inc() + + def dec(self): + if self.gauge is None: + self.gauge = Gauge(self.name, self.description) + self.registry.register(self.gauge) + self.gauge.dec() + + def observe(self): + if self.summary is None: + self.summary = Summary(self.name, self.description) + self.registry.register(self.summary) + return self.summary.time() + + def observe_bucket(self): + if self.histogram is None: + self.histogram = Histogram(self.name, self.description) + self.registry.register(self.histogram) + return self.histogram.time() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + +class PrometheusMeasurer(AbstractMeasurer): + context: dict + registry: CollectorRegistry + + def __init__(self): + self.context = {} + self.registry = CollectorRegistry() + + def track_call(self, name: str, description: str) -> AbstractMeasurerDelegate: + self.context[name] = self.context.get(name, PrometheusTracerDelegate(name, description, self.registry)) + return self.context[name] + + def export(self) -> bytes: + generate, _ = choose_encoder("") + return generate(self.registry) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + pass diff --git a/src/observability/measurer.py b/src/observability/measurer.py new file mode 100644 index 0000000..a72aa80 --- /dev/null +++ b/src/observability/measurer.py @@ -0,0 +1,59 @@ +import abc + + +class AbstractMeasurerDelegate(abc.ABC): + """ + Abstract class implemented by the actual measurer. + """ + + @abc.abstractmethod + def count(self): + raise NotImplementedError + + @abc.abstractmethod + def inc(self): + raise NotImplementedError + + @abc.abstractmethod + def dec(self): + raise NotImplementedError + + @abc.abstractmethod + def observe(self): + raise NotImplementedError + + @abc.abstractmethod + def observe_bucket(self): + raise NotImplementedError + + @abc.abstractmethod + def __enter__(self): + raise NotImplementedError + + @abc.abstractmethod + def __exit__(self, exc_type, exc_val, exc_tb): + raise NotImplementedError + + +class AbstractMeasurer(abc.ABC): + """ + Abstract class that must be implemented by tools used for measurement. + This is used to keep tracking of how many times a function is used and + to export this information. + """ + + @abc.abstractmethod + def track_call(self, name: str, description: str) -> AbstractMeasurerDelegate: + raise NotImplementedError + + @abc.abstractmethod + def export(self) -> bytes: + raise NotImplementedError + + @abc.abstractmethod + def __enter__(self): + raise NotImplementedError + + @abc.abstractmethod + def __exit__(self, exc_type, exc_val, exc_tb): + raise NotImplementedError diff --git a/src/observability/trace/__init__.py b/src/observability/trace/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/observability/trace/jaeger.py b/src/observability/trace/jaeger.py new file mode 100644 index 0000000..5f80679 --- /dev/null +++ b/src/observability/trace/jaeger.py @@ -0,0 +1,134 @@ +from contextvars import ContextVar +from queue import LifoQueue +from threading import get_ident +from typing import Callable, Optional + +from opentelemetry import trace +from opentelemetry.exporter.jaeger.proto.grpc import JaegerExporter +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import TracerProvider, Tracer, Span +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.trace.propagation import SPAN_KEY + +from src.observability.tracer import AbstractTracer, AbstractTracerDelegate + +jaeger_collector_endpoint = "localhost:14250" +application_name = "default-name" + + +class JaegerTracerDelegate(AbstractTracerDelegate): + tracer: Tracer + span: Optional[Span] + name: str + + def __init__(self, name: str, tracer: Tracer, parent: Optional['JaegerTracerDelegate']): + self.name = name + self.tracer = tracer + self.span = None if parent is None else parent.span + + def add_property(self, key: str, value: str): + if self.span is not None: + self.span.set_attribute(key, value) + + def trace(self, name: str, block: Callable, *args, **kwargs): + def execute(instance): + try: + return block(*args, **kwargs) + except Exception as ex: + instance.span_failure(ex, False) + + if self.span is None: + with self.tracer.start_as_current_span(name) as span: + self.span = span + return execute(self) + + ctx = ContextVar(SPAN_KEY) + ctx.set(self.span) + with self.tracer.start_span(name, context=ctx) as span: + self.span = span + return execute(self) + + def span_failure(self, ex: Exception, expected: bool): + if self.span is not None: + self.span.record_exception(ex) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.span.is_recording(): + self.span.end() + + +class JaegerTracer(AbstractTracer): + """ + The trace itself is tree, and each node is a single span inside the tree. Each delegate can access + its own span, while the JaegarTracer manage the whole span. + + For concurrent accesses, each jaeger instance will hold its own context. This context will keep + each span sub-tree isolation, where a sub-tree does not interfere with other sub-trees. If sub-trees + are spawned concurrently, they will have a common span as parent as will the context. + + If a thread pool is used, this context approach could lead to stranger behaviours? + """ + tracer: Tracer + context: dict[str, LifoQueue[AbstractTracerDelegate]] + root: ContextVar + + def __init__(self): + trace.set_tracer_provider( + TracerProvider( + resource=Resource.create({SERVICE_NAME: application_name}) + ) + ) + self.tracer = trace.get_tracer(application_name) + self.context = {} + self.root = ContextVar('parent-root', default=None) + jaeger_exporter = JaegerExporter(collector_endpoint=jaeger_collector_endpoint, insecure=True) + trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(jaeger_exporter)) + + def _current_name(self) -> str: + value = self.root.get() + if value is None: + return str(get_ident()) + return value + + def for_context(self, context: ContextVar): + """ + Used for context propagation, this must be handle careful when spawning + multiple threads and for coroutines, otherwise the spans for a trace could + be wrong. + """ + self.root = context + return self + + def _current_delegates(self) -> LifoQueue[AbstractTracerDelegate]: + key = self._current_name() + if key in self.context: + return self.context[key] + return LifoQueue() + + def operation_name(self, name: str) -> AbstractTracerDelegate: + delegates = self._current_delegates() + last = None if delegates.empty() else delegates.queue[0] + t = JaegerTracerDelegate(name, self.tracer, last) + delegates.put_nowait(t) + self.context[self._current_name()] = delegates + return t + + def add_property(self, key: str, value: str): + delegates = self._current_delegates() + if not delegates.empty(): + last = delegates.queue[delegates.qsize()-1] + last.add_property(key, value) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + delegates = self._current_delegates() + if not delegates.empty(): + delegates.get_nowait() + self.context[self._current_name()] = delegates + else: + del self.context[self._current_name()] diff --git a/src/observability/tracer.py b/src/observability/tracer.py new file mode 100644 index 0000000..991e59a --- /dev/null +++ b/src/observability/tracer.py @@ -0,0 +1,62 @@ +import abc +from contextvars import Context +from typing import Callable + + +class AbstractTracerDelegate(abc.ABC): + @abc.abstractmethod + def trace(self, name: str, block: Callable, *args, **kwargs): + """ + Calculate and records the execution for the given block. + """ + raise NotImplementedError + + @abc.abstractmethod + def add_property(self, key: str, value: str): + """ + Add a key/value pair to the current span. + """ + raise NotImplementedError + + @abc.abstractmethod + def span_failure(self, ex: Exception, expected: bool): + """ + Notify an error for the current span. + """ + raise NotImplementedError + + @abc.abstractmethod + def __enter__(self): + raise NotImplementedError + + @abc.abstractmethod + def __exit__(self, exc_type, exc_val, exc_tb): + raise NotImplementedError + + +class AbstractTracer(abc.ABC): + @abc.abstractmethod + def for_context(self, context: Context): + raise NotImplementedError + + @abc.abstractmethod + def operation_name(self, name: str) -> AbstractTracerDelegate: + """ + Defines the operation name for the current span. + """ + raise NotImplementedError + + @abc.abstractmethod + def add_property(self, key: str, value: str): + """ + Add a key/value pair to the current span. + """ + raise NotImplementedError + + @abc.abstractmethod + def __enter__(self): + raise NotImplementedError + + @abc.abstractmethod + def __exit__(self, exc_type, exc_val, exc_tb): + raise NotImplementedError