Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
- **[CHANGE]**: change(build): add doc string check to flake8 [#14](https://github.com/intergral/deep/pull/14) [@Umaaz](https://github.com/Umaaz)
- **[FEATURE]**: feat(logging): initial implementation of log points [#3](https://github.com/intergral/deep/pull/3) [@Umaaz](https://github.com/Umaaz)
- **[FEATURE]**: feat(plugins): change plugins to allow better customisation [#22](https://github.com/intergral/deep/pull/22) [@Umaaz](https://github.com/Umaaz)
- **[FEATURE]**: feat(metrics): initial implementation of metric points [#21](https://github.com/intergral/deep/pull/21) [@Umaaz](https://github.com/Umaaz)
- **[ENHANCEMENT]**: enhancement(trigger): change tracepoint handling to use triggers [#16](https://github.com/intergral/deep/pull/16) [@Umaaz](https://github.com/Umaaz)
- **[BUGFIX]**: feat(api): add api function to register tracepoint directly [#8](https://github.com/intergral/deep/pull/8) [@Umaaz](https://github.com/Umaaz)

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ it-test:

.PHONY: coverage
coverage:
pytest tests/unit_tests --cov=deep --cov-report term --cov-fail-under=82 --cov-report html --cov-branch
pytest tests/unit_tests --cov=deep --cov-report term --cov-fail-under=83 --cov-report html --cov-branch

.PHONY: lint
lint:
Expand Down
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ certifi>=2023.7.22 # not directly required, pinned by Snyk to avoid a vulnerabil
setuptools>=65.5.1 # not directly required, pinned by Snyk to avoid a vulnerability
opentelemetry-api
opentelemetry-sdk
prometheus-client
9 changes: 7 additions & 2 deletions examples/simple-app-metrics/src/simple-app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
from prometheus_client import Summary, start_http_server

import deep
from deep.api.tracepoint.constants import FIRE_COUNT
from deep.api.tracepoint.tracepoint_config import MetricDefinition, LabelExpression
from simple_test import SimpleTest


Expand Down Expand Up @@ -80,12 +82,15 @@ def process_request(t):

if __name__ == '__main__':
start_http_server(8000)
d = deep.start({
_deep = deep.start({
'SERVICE_URL': 'localhost:43315',
'SERVICE_SECURE': 'False',
})

d.register_tracepoint("simple_test.py", 31)
_deep.register_tracepoint("simple_test.py", 43, {FIRE_COUNT: '-1'}, [], [
MetricDefinition(name="simple_test", metric_type="histogram", expression="len(info)",
labels=[LabelExpression("test_name", expression="self.test_name")])
])

print("app running")
main()
6 changes: 5 additions & 1 deletion src/deep/api/deep.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import deep.logging
from deep.api.plugin import load_plugins
from deep.api.resource import Resource
from deep.api.tracepoint.tracepoint_config import MetricDefinition
from deep.config import ConfigService
from deep.config.tracepoint_config import TracepointConfigService
from deep.grpc import GRPCService
Expand Down Expand Up @@ -78,11 +79,14 @@ def shutdown(self):
self.trigger_handler.shutdown()
self.task_handler.flush()
self.poll.shutdown()
for plugin in self.config.plugins:
plugin.shutdown()
deep.logging.info("Deep is shutdown.")
self.started = False

def register_tracepoint(self, path: str, line: int, args: Dict[str, str] = None,
watches: List[str] = None, metrics=None) -> 'TracepointRegistration':
watches: List[str] = None,
metrics: List[MetricDefinition] = None) -> 'TracepointRegistration':
"""
Register a new tracepoint.

Expand Down
5 changes: 5 additions & 0 deletions src/deep/api/plugin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
DEEP_PLUGINS = [
'deep.api.plugin.otel.OTelPlugin',
'deep.api.plugin.python.PythonPlugin',
'deep.api.plugin.metric.prometheus_metrics.PrometheusPlugin',
]
"""System provided default plugins."""

Expand Down Expand Up @@ -111,6 +112,10 @@ def is_active(self) -> bool:
return True
return str2bool(attr)

def shutdown(self):
"""Clean up and shutdown the plugin."""
pass

def order(self) -> int:
"""
Order of precedence when multiple versions of providers are available.
Expand Down
86 changes: 86 additions & 0 deletions src/deep/api/plugin/metric/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright (C) 2024 Intergral GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Definition of metric processor.

Metric processor gives the ability to attach dynamic metrics to metric providers.
"""

import abc
from typing import Dict


class MetricProcessor(abc.ABC):
"""Metric processor connects Deep to a metric provider."""

@abc.abstractmethod
def counter(self, name: str, labels: Dict[str, str], namespace: str, help_string: str, unit: str, value: float):
"""
Create a counter type value in the provider.

:param name: the metric name
:param labels: the metric labels
:param namespace: the metric namespace
:param help_string: the metric help string
:param unit: the metric unit
:param value: the metric value
"""
pass

@abc.abstractmethod
def gauge(self, name: str, labels: Dict[str, str], namespace: str, help_string: str, unit: str, value: float):
"""
Create a gauge type value in the provider.

:param name: the metric name
:param labels: the metric labels
:param namespace: the metric namespace
:param help_string: the metric help string
:param unit: the metric unit
:param value: the metric value
"""
pass

@abc.abstractmethod
def histogram(self, name: str, labels: Dict[str, str], namespace: str, help_string: str, unit: str, value: float):
"""
Create a histogram type value in the provider.

:param name: the metric name
:param labels: the metric labels
:param namespace: the metric namespace
:param help_string: the metric help string
:param unit: the metric unit
:param value: the metric value
"""
pass

@abc.abstractmethod
def summary(self, name: str, labels: Dict[str, str], namespace: str, help_string: str, unit: str, value: float):
"""
Create a summary type value in the provider.

:param name: the metric name
:param labels: the metric labels
:param namespace: the metric namespace
:param help_string: the metric help string
:param unit: the metric unit
:param value: the metric value
"""
pass

def clear(self):
"""Remove any registrations."""
pass
157 changes: 157 additions & 0 deletions src/deep/api/plugin/metric/prometheus_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# Copyright (C) 2024 Intergral GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Add support for prometheus metrics."""

import threading
from typing import Dict

import deep.logging
from deep.api.plugin import DidNotEnable, Plugin
from deep.api.plugin.metric import MetricProcessor

try:
from prometheus_client import Summary, Counter, REGISTRY, Histogram, Gauge
except ImportError as e:
raise DidNotEnable("opentelemetry is not installed", e)


class PrometheusPlugin(Plugin, MetricProcessor):
"""Connect Deep to prometheus."""

def __init__(self, config):
"""Create new plugin."""
super().__init__("PrometheusPlugin", config)
self.__cache = {}
self.__lock = threading.Lock()

def __check_cache(self, name, type_name, from_default):
cache_key = f'{name}_{type_name}'
if cache_key in self.__cache:
return self.__cache[cache_key]
default = from_default()
self.__cache[cache_key] = default
return default

def counter(self, name: str, labels: Dict[str, str], namespace: str, help_string: str, unit: str, value: float):
"""
Create a counter type value in the provider.

:param name: the metric name
:param labels: the metric labels
:param namespace: the metric namespace
:param help_string: the metric help string
:param unit: the metric unit
:param value: the metric value
"""
try:
with self.__lock:
label_keys = list(labels.keys())
counter: Counter = self.__check_cache(name, "counter",
lambda: Counter(name=name, documentation=help_string or "",
labelnames=label_keys,
namespace=namespace, unit=unit))
if len(labels) > 0:
counter = counter.labels(**labels)
counter.inc(value)
except Exception:
deep.logging.exception(f"Error registering metric counter {namespace}_{name}")
pass

def gauge(self, name: str, labels: Dict[str, str], namespace: str, help_string: str, unit: str, value: float):
"""
Create a gauge type value in the provider.

:param name: the metric name
:param labels: the metric labels
:param namespace: the metric namespace
:param help_string: the metric help string
:param unit: the metric unit
:param value: the metric value
"""
try:
with self.__lock:
label_keys = list(labels.keys())
gauge: Gauge = self.__check_cache(name, "gauge",
lambda: Gauge(name=name, documentation=help_string or "",
labelnames=label_keys,
namespace=namespace, unit=unit))
if len(labels) > 0:
gauge = gauge.labels(**labels)
gauge.inc(value)
except Exception:
deep.logging.exception(f"Error registering metric gauge {namespace}_{name}")
pass

def histogram(self, name: str, labels: Dict[str, str], namespace: str, help_string: str, unit: str, value: float):
"""
Create a histogram type value in the provider.

:param name: the metric name
:param labels: the metric labels
:param namespace: the metric namespace
:param help_string: the metric help string
:param unit: the metric unit
:param value: the metric value
"""
try:
with self.__lock:
label_keys = list(labels.keys())
histogram: Histogram = self.__check_cache(name, "histogram",
lambda: Histogram(name=name, documentation=help_string or "",
labelnames=label_keys,
namespace=namespace, unit=unit))
if len(labels) > 0:
histogram = histogram.labels(**labels)
histogram.observe(value)
except Exception:
deep.logging.exception(f"Error registering metric histogram {namespace}_{name}")
pass

def summary(self, name: str, labels: Dict[str, str], namespace: str, help_string: str, unit: str, value: float):
"""
Create a summary type value in the provider.

:param name: the metric name
:param labels: the metric labels
:param namespace: the metric namespace
:param help_string: the metric help string
:param unit: the metric unit
:param value: the metric value
"""
try:
with self.__lock:
label_keys = list(labels.keys())
summary: Summary = self.__check_cache(name, "summary",
lambda: Summary(name=name, documentation=help_string or "",
labelnames=label_keys,
namespace=namespace, unit=unit))
if len(labels) > 0:
summary = summary.labels(**labels)
summary.observe(value)
except Exception:
deep.logging.exception(f"Error registering metric summary {namespace}_{name}")
pass

def shutdown(self):
"""Clean up and shutdown the plugin."""
self.clear()

def clear(self):
"""Remove any registrations."""
with self.__lock:
for metric in self.__cache.values():
REGISTRY.unregister(metric)
self.__cache = {}
Loading