Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ exclude =
scripts,
max-line-length = 120
per-file-ignores =
# ignore unused imports in __init__ files
*/__init__.py: F401
# supress some docstring requirements in tests
tests/unit_tests/*.py: D
tests/unit_tests/**/*.py: D
Expand All @@ -22,6 +20,8 @@ per-file-ignores =
*/deep/api/attributes/__init__.py: NCF102
tests/unit_tests/api/attributes/*.py: NCF102,D
tests/unit_tests/api/resource/*.py: NCF102,D
examples/**/*: DB100
dev/**/*: DB100

detailed-output = True
copyright-regex =
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

- **[CHANGE]**: change(build): add doc string check to flake8 [#14](https://github.com/intergral/deep/pull/14) [@Umaaz](https://github.com/Umaaz)
- **[FEATURE]**: feat(logging): initial implementation of log points [#3](https://github.com/intergral/deep/pull/3) [@Umaaz](https://github.com/Umaaz)
- **[FEATURE]**: feat(plugins): change plugins to allow better customisation [#22](https://github.com/intergral/deep/pull/22) [@Umaaz](https://github.com/Umaaz)
- **[ENHANCEMENT]**: enhancement(trigger): change tracepoint handling to use triggers [#16](https://github.com/intergral/deep/pull/16) [@Umaaz](https://github.com/Umaaz)
- **[BUGFIX]**: feat(api): add api function to register tracepoint directly [#8](https://github.com/intergral/deep/pull/8) [@Umaaz](https://github.com/Umaaz)

Expand Down
20 changes: 14 additions & 6 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
# linting deps
flake8
flake8-docstrings
flake8-debug
flake8-header-validator>=0.0.3

# test deps
parameterized
pytest
pytest-cov
mockito

# doc deps
mkdocs-material
pdoc3
mkdocstrings-python
pdoc3

# build deps
build
twine
flake8-docstrings
certifi>=2023.7.22 # not directly required, pinned by Snyk to avoid a vulnerability
flake8-header-validator>=0.0.3
setuptools>=65.5.1 # not directly required, pinned by Snyk to avoid a vulnerability
pytest-cov
mockito
opentelemetry-api
opentelemetry-sdk
opentelemetry-sdk
2 changes: 1 addition & 1 deletion src/deep/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@

"""Main api for Deep."""

from deep.api.deep import Deep
from deep.api.deep import Deep # noqa: F401
16 changes: 12 additions & 4 deletions src/deep/api/deep.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,24 @@ def __init__(self, config: 'ConfigService'):
self.task_handler = TaskHandler()
self.config.set_task_handler(self.task_handler)
self.poll = LongPoll(self.config, self.grpc)
self.push = PushService(self.config, self.grpc, self.task_handler)
self.push = PushService(self.grpc, self.task_handler)
self.trigger_handler = TriggerHandler(config, self.push)

def start(self):
"""Start Deep."""
if self.started:
return
plugins, attributes = load_plugins()
self.config.plugins = plugins
self.config.resource = Resource.create(attributes.copy())
self.config.plugins = load_plugins(self.config, self.config.PLUGINS)
default_resource = Resource.create()
for provider in self.config.resource_providers:
try:
plugin_resource = provider.resource()
if plugin_resource:
default_resource = default_resource.merge(plugin_resource)
except Exception:
deep.logging.exception("Failed to process plugin resource {}", provider.name)

self.config.resource = default_resource
self.trigger_handler.start()
self.grpc.start()
self.poll.start()
Expand Down
100 changes: 76 additions & 24 deletions src/deep/api/plugin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
"""Load and handle plugins."""

import abc
import os
from importlib import import_module
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional, TypeVar, List

from deep.api.resource import Resource
from deep.processor.context.action_context import ActionContext

if TYPE_CHECKING:
from typing import Tuple
from deep.config import ConfigService

from deep import logging
from deep.api.attributes import BoundedAttributes
Expand All @@ -31,6 +33,7 @@
'deep.api.plugin.otel.OTelPlugin',
'deep.api.plugin.python.PythonPlugin',
]
"""System provided default plugins."""


def __plugin_generator(configured):
Expand All @@ -45,7 +48,7 @@ def __plugin_generator(configured):
)


def load_plugins(custom=None) -> 'Tuple[list[Plugin], BoundedAttributes]':
def load_plugins(config: 'ConfigService', custom=None) -> List['Plugin']:
"""
Load all the deep plugins.

Expand All @@ -55,21 +58,19 @@ def load_plugins(custom=None) -> 'Tuple[list[Plugin], BoundedAttributes]':
"""
if custom is None:
custom = []
bounded_attributes = BoundedAttributes(immutable=False)
loaded = []
for plugin in __plugin_generator(DEEP_PLUGINS + custom):
try:
plugin_instance = plugin()
plugin_instance = plugin(config=config)
if not plugin_instance.is_active():
logging.debug("Plugin %s is not active.", plugin_instance.name)
continue
attributes = plugin_instance.load_plugin()
if attributes is not None:
bounded_attributes.merge_in(attributes)
loaded.append(plugin_instance)
except Exception as e:
logging.debug("Could not load plugin %s: %s", plugin, e)
return loaded, bounded_attributes

loaded.sort(key=lambda pl: pl.order() or 0)
return loaded


class Plugin(abc.ABC):
Expand All @@ -79,9 +80,15 @@ class Plugin(abc.ABC):
This type defines a plugin for deep, these plugins allow for extensions to how deep decorates data.
"""

def __init__(self, name=None):
"""Create a new."""
def __init__(self, name: str = None, config: 'ConfigService' = None):
"""
Create a new plugin.

:param name: the name of the plugin (default to class name)
:param config: the deep config service
"""
super(Plugin, self).__init__()
self.config = config
if name is None:
self._name = self.__class__.__name__
else:
Expand All @@ -96,29 +103,74 @@ def is_active(self) -> bool:
"""
Is the plugin active.

Check the value of the environment value of the module name + class name. It set to
'false' this plugin is not active.
Check the value of the config element plugin_{name}. If it is set to
'False' this plugin is not active.
"""
attr = getattr(self.config, f'plugin_{self.name}'.upper(), 'True')
if attr is None:
return True
return str2bool(attr)

def order(self) -> int:
"""
getenv = os.getenv("{0}.{1}".format(self.__class__.__module__, self.__class__.__name__), 'True')
return str2bool(getenv)
Order of precedence when multiple versions of providers are available.

Order=1 will run after a provider with order=0.

:return: the provider order
"""
return 0


PLUGIN_TYPE = TypeVar('PLUGIN_TYPE', bound=Plugin)


class ResourceProvider(Plugin, abc.ABC):
"""Implement this to have the plugin provide resource attributes to Deep."""

@abc.abstractmethod
def load_plugin(self) -> BoundedAttributes:
def resource(self) -> Optional[Resource]:
"""
Load the plugin.
Provide resource.

:return: any values to attach to the client resource.
:return: the provided resource
"""
raise NotImplementedError()
pass


class SnapshotDecorator(Plugin, abc.ABC):
"""Implement this to decorate collected snapshots with attributes."""

@abc.abstractmethod
def decorate(self, context: ActionContext) -> Optional[BoundedAttributes]:
"""
Decorate a snapshot with additional data.

:param context: the action context for this action

:return: the additional attributes to attach
"""
pass


class TracepointLogger(Plugin, abc.ABC):
"""
This defines how a tracepoint logger should interact with Deep.

This can be registered with Deep to provide customization to the way Deep will log dynamic log
messages injected via tracepoints.
"""

@abc.abstractmethod
def collect_attributes(self) -> BoundedAttributes:
def log_tracepoint(self, log_msg: str, tp_id: str, ctx_id: str):
"""
Collect attributes to attach to snapshot.
Log the dynamic log message.

:return: the attributes to attach.
:param (str) log_msg: the log message to log
:param (str) tp_id: the id of the tracepoint that generated this log
:param (str) ctx_id: the id of the context that was created by this tracepoint
"""
raise NotImplementedError()
pass


class DidNotEnable(Exception):
Expand Down
22 changes: 13 additions & 9 deletions src/deep/api/plugin/otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
from typing import Optional

from deep.api.attributes import BoundedAttributes
from deep.api.plugin import Plugin, DidNotEnable
from deep.api.plugin import DidNotEnable, SnapshotDecorator, ResourceProvider
from deep.api.resource import Resource
from deep.processor.context.action_context import ActionContext

try:
from opentelemetry import trace
Expand All @@ -28,32 +30,34 @@
raise DidNotEnable("opentelemetry is not installed", e)


class OTelPlugin(Plugin):
class OTelPlugin(ResourceProvider, SnapshotDecorator):
"""
Deep Otel plugin.

Provide span and trace information to the snapshot.
"""

def load_plugin(self) -> Optional[BoundedAttributes]:
def resource(self) -> Optional[Resource]:
"""
Load the plugin.
Provide resource.

:return: any values to attach to the client resource.
:return: the provided resource
"""
provider = trace.get_tracer_provider()
if isinstance(provider, TracerProvider):
# noinspection PyUnresolvedReferences
resource = provider.resource
attributes = dict(resource.attributes)
return BoundedAttributes(attributes=attributes)
return Resource.create(attributes=attributes)
return None

def collect_attributes(self) -> Optional[BoundedAttributes]:
def decorate(self, context: ActionContext) -> Optional[BoundedAttributes]:
"""
Collect attributes to attach to snapshot.
Decorate a snapshot with additional data.

:return: the attributes to attach.
:param context: the action context for this action

:return: the additional attributes to attach
"""
span = OTelPlugin.__get_span()
if span is not None:
Expand Down
42 changes: 29 additions & 13 deletions src/deep/api/plugin/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,52 @@

import platform
import threading
from typing import Optional

from deep import logging
from deep.api.attributes import BoundedAttributes
from deep.api.plugin import Plugin
from deep.api.plugin import ResourceProvider, TracepointLogger, SnapshotDecorator
from deep.api.resource import Resource
from deep.processor.context.action_context import ActionContext


class PythonPlugin(Plugin):
class PythonPlugin(ResourceProvider, SnapshotDecorator, TracepointLogger):
"""
Deep python plugin.

This plugin provides the python version to the resource, and the thread name to the attributes.
"""

def load_plugin(self):
def decorate(self, context: ActionContext) -> Optional[BoundedAttributes]:
"""
Load the plugin.
Decorate a snapshot with additional data.

:return: any values to attach to the client resource.
:param context: the action context for this action

:return: the additional attributes to attach
"""
thread = threading.current_thread()

return BoundedAttributes(attributes={
"python_version": platform.python_version(),
'thread_name': thread.name
})

def collect_attributes(self):
def resource(self) -> Optional[Resource]:
"""
Collect attributes to attach to snapshot.
Provide resource.

:return: the attributes to attach.
:return: the provided resource
"""
thread = threading.current_thread()

return BoundedAttributes(attributes={
'thread_name': thread.name
return Resource.create({
"python_version": platform.python_version(),
})

def log_tracepoint(self, log_msg: str, tp_id: str, ctx_id: str):
"""
Log the dynamic log message.

:param (str) log_msg: the log message to log
:param (str) tp_id: the id of the tracepoint that generated this log
:param (str) ctx_id: the id of the context that was created by this tracepoint
"""
logging.info(log_msg + " ctx=%s tracepoint=%s" % (ctx_id, tp_id))
Loading