From df19be9fac13bebfffcdb4fd61932a7e7f99f9f3 Mon Sep 17 00:00:00 2001 From: Ben Donnelly Date: Thu, 18 Jan 2024 22:26:28 +0000 Subject: [PATCH 1/3] feat(plugins): change plugins to allow better customisable extensions. --- .flake8 | 10 +- dev-requirements.txt | 3 +- src/deep/api/deep.py | 16 ++- src/deep/api/plugin/__init__.py | 99 ++++++++++++++----- src/deep/api/plugin/otel.py | 22 +++-- src/deep/api/plugin/python.py | 42 +++++--- src/deep/config/__init__.py | 7 +- src/deep/config/config_service.py | 47 +++++---- src/deep/logging/tracepoint_logger.py | 54 ---------- src/deep/processor/context/action_context.py | 21 ++-- src/deep/processor/context/action_results.py | 13 +-- src/deep/processor/context/log_action.py | 20 ++-- src/deep/processor/context/snapshot_action.py | 69 +++++++------ src/deep/processor/context/trigger_context.py | 17 +++- src/deep/push/push_service.py | 16 +-- tests/unit_tests/api/plugin/test_otel.py | 6 +- tests/unit_tests/api/plugin/test_plugin.py | 14 ++- tests/unit_tests/api/plugin/test_python.py | 6 +- .../processor/test_trigger_handler.py | 14 +-- tests/unit_tests/push/test_push_service.py | 62 +----------- 20 files changed, 267 insertions(+), 291 deletions(-) delete mode 100644 src/deep/logging/tracepoint_logger.py diff --git a/.flake8 b/.flake8 index 74043ff..e2d7a76 100644 --- a/.flake8 +++ b/.flake8 @@ -12,16 +12,18 @@ per-file-ignores = # ignore unused imports in __init__ files */__init__.py: F401 # supress some docstring requirements in tests - tests/unit_tests/*.py: D - tests/unit_tests/**/*.py: D - tests/it_tests/*.py: D - tests/it_tests/**/*.py: D + tests/unit_tests/*.py: D,NP100 + tests/unit_tests/**/*.py: D,NP100 + tests/it_tests/*.py: D,NP100 + tests/it_tests/**/*.py: D,NP100 # these files are from OTEL so should use OTEL license. */deep/api/types.py: NCF102 */deep/api/resource/__init__.py: NCF102 */deep/api/attributes/__init__.py: NCF102 tests/unit_tests/api/attributes/*.py: NCF102,D tests/unit_tests/api/resource/*.py: NCF102,D + examples/**/*: NP100 + dev/**/*: NP100 detailed-output = True copyright-regex = diff --git a/dev-requirements.txt b/dev-requirements.txt index e93352a..0bb281b 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -13,4 +13,5 @@ setuptools>=65.5.1 # not directly required, pinned by Snyk to avoid a vulnerabil pytest-cov mockito opentelemetry-api -opentelemetry-sdk \ No newline at end of file +opentelemetry-sdk +flake8-no-print diff --git a/src/deep/api/deep.py b/src/deep/api/deep.py index 20f0994..35ce3dc 100644 --- a/src/deep/api/deep.py +++ b/src/deep/api/deep.py @@ -48,16 +48,24 @@ def __init__(self, config: 'ConfigService'): self.task_handler = TaskHandler() self.config.set_task_handler(self.task_handler) self.poll = LongPoll(self.config, self.grpc) - self.push = PushService(self.config, self.grpc, self.task_handler) + self.push = PushService(self.grpc, self.task_handler) self.trigger_handler = TriggerHandler(config, self.push) def start(self): """Start Deep.""" if self.started: return - plugins, attributes = load_plugins() - self.config.plugins = plugins - self.config.resource = Resource.create(attributes.copy()) + self.config.plugins = load_plugins(self.config, self.config.PLUGINS) + default_resource = Resource.create() + for provider in self.config.resource_providers: + try: + plugin_resource = provider.resource() + if plugin_resource: + default_resource = default_resource.merge(plugin_resource) + except Exception: + deep.logging.exception("Failed to process plugin resource {}", provider.name) + + self.config.resource = default_resource self.trigger_handler.start() self.grpc.start() self.poll.start() diff --git a/src/deep/api/plugin/__init__.py b/src/deep/api/plugin/__init__.py index 666f0e7..7a5b40e 100644 --- a/src/deep/api/plugin/__init__.py +++ b/src/deep/api/plugin/__init__.py @@ -18,10 +18,13 @@ import abc import os from importlib import import_module -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional, TypeVar, List + +from deep.api.resource import Resource +from deep.processor.context.action_context import ActionContext if TYPE_CHECKING: - from typing import Tuple + from deep.config import ConfigService from deep import logging from deep.api.attributes import BoundedAttributes @@ -31,6 +34,7 @@ 'deep.api.plugin.otel.OTelPlugin', 'deep.api.plugin.python.PythonPlugin', ] +"""System provided default plugins.""" def __plugin_generator(configured): @@ -45,7 +49,7 @@ def __plugin_generator(configured): ) -def load_plugins(custom=None) -> 'Tuple[list[Plugin], BoundedAttributes]': +def load_plugins(config: 'ConfigService', custom=None) -> List['Plugin']: """ Load all the deep plugins. @@ -55,21 +59,19 @@ def load_plugins(custom=None) -> 'Tuple[list[Plugin], BoundedAttributes]': """ if custom is None: custom = [] - bounded_attributes = BoundedAttributes(immutable=False) loaded = [] for plugin in __plugin_generator(DEEP_PLUGINS + custom): try: - plugin_instance = plugin() + plugin_instance = plugin(config=config) if not plugin_instance.is_active(): logging.debug("Plugin %s is not active.", plugin_instance.name) continue - attributes = plugin_instance.load_plugin() - if attributes is not None: - bounded_attributes.merge_in(attributes) loaded.append(plugin_instance) except Exception as e: logging.debug("Could not load plugin %s: %s", plugin, e) - return loaded, bounded_attributes + + loaded.sort(key=lambda pl: pl.order() or 0) + return loaded class Plugin(abc.ABC): @@ -79,9 +81,15 @@ class Plugin(abc.ABC): This type defines a plugin for deep, these plugins allow for extensions to how deep decorates data. """ - def __init__(self, name=None): - """Create a new.""" + def __init__(self, name: str = None, config: 'ConfigService' = None): + """ + Create a new plugin. + + :param name: the name of the plugin (default to class name) + :param config: the deep config service + """ super(Plugin, self).__init__() + self.config = config if name is None: self._name = self.__class__.__name__ else: @@ -96,29 +104,74 @@ def is_active(self) -> bool: """ Is the plugin active. - Check the value of the environment value of the module name + class name. It set to - 'false' this plugin is not active. + Check the value of the config element plugin_{name}. If it is set to + 'False' this plugin is not active. + """ + attr = getattr(self.config, f'plugin_{self.name}'.upper(), 'True') + if attr is None: + return True + return str2bool(attr) + + def order(self) -> int: """ - getenv = os.getenv("{0}.{1}".format(self.__class__.__module__, self.__class__.__name__), 'True') - return str2bool(getenv) + Order of precedence when multiple versions of providers are available. + + Order=1 will run after a provider with order=0. + + :return: the provider order + """ + return 0 + + +PLUGIN_TYPE = TypeVar('PLUGIN_TYPE', bound=Plugin) + + +class ResourceProvider(Plugin, abc.ABC): + """Implement this to have the plugin provide resource attributes to Deep.""" @abc.abstractmethod - def load_plugin(self) -> BoundedAttributes: + def resource(self) -> Optional[Resource]: """ - Load the plugin. + Provide resource. - :return: any values to attach to the client resource. + :return: the provided resource """ - raise NotImplementedError() + pass + + +class SnapshotDecorator(Plugin, abc.ABC): + """Implement this to decorate collected snapshots with attributes.""" + + @abc.abstractmethod + def decorate(self, context: ActionContext) -> Optional[BoundedAttributes]: + """ + Decorate a snapshot with additional data. + + :param context: the action context for this action + + :return: the additional attributes to attach + """ + pass + + +class TracepointLogger(Plugin, abc.ABC): + """ + This defines how a tracepoint logger should interact with Deep. + + This can be registered with Deep to provide customization to the way Deep will log dynamic log + messages injected via tracepoints. + """ @abc.abstractmethod - def collect_attributes(self) -> BoundedAttributes: + def log_tracepoint(self, log_msg: str, tp_id: str, ctx_id: str): """ - Collect attributes to attach to snapshot. + Log the dynamic log message. - :return: the attributes to attach. + :param (str) log_msg: the log message to log + :param (str) tp_id: the id of the tracepoint that generated this log + :param (str) ctx_id: the id of the context that was created by this tracepoint """ - raise NotImplementedError() + pass class DidNotEnable(Exception): diff --git a/src/deep/api/plugin/otel.py b/src/deep/api/plugin/otel.py index 43264af..b0ba8a0 100644 --- a/src/deep/api/plugin/otel.py +++ b/src/deep/api/plugin/otel.py @@ -18,7 +18,9 @@ from typing import Optional from deep.api.attributes import BoundedAttributes -from deep.api.plugin import Plugin, DidNotEnable +from deep.api.plugin import DidNotEnable, SnapshotDecorator, ResourceProvider +from deep.api.resource import Resource +from deep.processor.context.action_context import ActionContext try: from opentelemetry import trace @@ -28,32 +30,34 @@ raise DidNotEnable("opentelemetry is not installed", e) -class OTelPlugin(Plugin): +class OTelPlugin(ResourceProvider, SnapshotDecorator): """ Deep Otel plugin. Provide span and trace information to the snapshot. """ - def load_plugin(self) -> Optional[BoundedAttributes]: + def resource(self) -> Optional[Resource]: """ - Load the plugin. + Provide resource. - :return: any values to attach to the client resource. + :return: the provided resource """ provider = trace.get_tracer_provider() if isinstance(provider, TracerProvider): # noinspection PyUnresolvedReferences resource = provider.resource attributes = dict(resource.attributes) - return BoundedAttributes(attributes=attributes) + return Resource.create(attributes=attributes) return None - def collect_attributes(self) -> Optional[BoundedAttributes]: + def decorate(self, context: ActionContext) -> Optional[BoundedAttributes]: """ - Collect attributes to attach to snapshot. + Decorate a snapshot with additional data. - :return: the attributes to attach. + :param context: the action context for this action + + :return: the additional attributes to attach """ span = OTelPlugin.__get_span() if span is not None: diff --git a/src/deep/api/plugin/python.py b/src/deep/api/plugin/python.py index 2a08f31..22d7e38 100644 --- a/src/deep/api/plugin/python.py +++ b/src/deep/api/plugin/python.py @@ -19,36 +19,52 @@ import platform import threading +from typing import Optional +from deep import logging from deep.api.attributes import BoundedAttributes -from deep.api.plugin import Plugin +from deep.api.plugin import ResourceProvider, TracepointLogger, SnapshotDecorator +from deep.api.resource import Resource +from deep.processor.context.action_context import ActionContext -class PythonPlugin(Plugin): +class PythonPlugin(ResourceProvider, SnapshotDecorator, TracepointLogger): """ Deep python plugin. This plugin provides the python version to the resource, and the thread name to the attributes. """ - def load_plugin(self): + def decorate(self, context: ActionContext) -> Optional[BoundedAttributes]: """ - Load the plugin. + Decorate a snapshot with additional data. - :return: any values to attach to the client resource. + :param context: the action context for this action + + :return: the additional attributes to attach """ + thread = threading.current_thread() + return BoundedAttributes(attributes={ - "python_version": platform.python_version(), + 'thread_name': thread.name }) - def collect_attributes(self): + def resource(self) -> Optional[Resource]: """ - Collect attributes to attach to snapshot. + Provide resource. - :return: the attributes to attach. + :return: the provided resource """ - thread = threading.current_thread() - - return BoundedAttributes(attributes={ - 'thread_name': thread.name + return Resource.create({ + "python_version": platform.python_version(), }) + + def log_tracepoint(self, log_msg: str, tp_id: str, ctx_id: str): + """ + Log the dynamic log message. + + :param (str) log_msg: the log message to log + :param (str) tp_id: the id of the tracepoint that generated this log + :param (str) ctx_id: the id of the context that was created by this tracepoint + """ + logging.info(log_msg + " ctx=%s tracepoint=%s" % (ctx_id, tp_id)) diff --git a/src/deep/config/__init__.py b/src/deep/config/__init__.py index d9e0701..4db09f2 100644 --- a/src/deep/config/__init__.py +++ b/src/deep/config/__init__.py @@ -43,6 +43,9 @@ APP_ROOT = "" """App root sets the prefix that can be removed to generate shorter file names. This value is calculated.""" +PLUGINS = [] +"""User definable plugins.""" + # noinspection PyPep8Naming def IN_APP_INCLUDE(): @@ -78,7 +81,3 @@ def IN_APP_EXCLUDE(): user_defined.append(prefix) return user_defined - -# Config items can be functions -# def SERVICE_URL(): -# return os.getenv('SERVICE_URL', 'localhost:50051') diff --git a/src/deep/config/config_service.py b/src/deep/config/config_service.py index 21f3332..6e6e0fc 100644 --- a/src/deep/config/config_service.py +++ b/src/deep/config/config_service.py @@ -16,13 +16,12 @@ """Service for handling deep config.""" import os -from typing import Any, List, Dict, Tuple, Optional +from typing import Any, List, Dict, Tuple, Optional, Generator from deep import logging -from deep.api.plugin import Plugin +from deep.api.plugin import Plugin, ResourceProvider, PLUGIN_TYPE, SnapshotDecorator, TracepointLogger from deep.api.resource import Resource from deep.config.tracepoint_config import TracepointConfigService, ConfigUpdateListener -from deep.logging.tracepoint_logger import DefaultLogger, TracepointLogger class ConfigService: @@ -40,7 +39,6 @@ def __init__(self, custom: Dict[str, any] = None, tracepoints=TracepointConfigSe self.__custom = custom self._resource = None self._tracepoint_config = tracepoints - self._tracepoint_logger: 'TracepointLogger' = DefaultLogger() def __getattribute__(self, name: str) -> Any: """ @@ -129,26 +127,19 @@ def add_listener(self, listener: 'ConfigUpdateListener'): self._tracepoint_config.add_listener(listener) @property - def tracepoint_logger(self) -> 'TracepointLogger': + def tracepoint_logger(self) -> TracepointLogger: """Get the tracepoint logger.""" - return self._tracepoint_logger + return self._find_plugin(TracepointLogger) - @tracepoint_logger.setter - def tracepoint_logger(self, logger: 'TracepointLogger'): - """Set the tracepoint logger.""" - self._tracepoint_logger = logger - - def log_tracepoint(self, log_msg: str, tp_id: str, snap_id: str): - """ - Log the dynamic log message. - - Pass the processed log to the tracepoint logger. + @property + def resource_providers(self) -> Generator[ResourceProvider, None, None]: + """Generator for available resource providers.""" + return self.__plugin_generator(ResourceProvider) - :param (str) log_msg: the log message to log - :param (str) tp_id: the id of the tracepoint that generated this log - :param (str) snap_id: the is of the snapshot that was created by this tracepoint - """ - self._tracepoint_logger.log_tracepoint(log_msg, tp_id, snap_id) + @property + def snapshot_decorators(self) -> Generator[SnapshotDecorator, None, None]: + """Generator for snapshot decorators.""" + return self.__plugin_generator(SnapshotDecorator) def is_app_frame(self, filename: str) -> Tuple[bool, Optional[str]]: """ @@ -172,3 +163,17 @@ def is_app_frame(self, filename: str) -> Tuple[bool, Optional[str]]: return True, self.APP_ROOT return False, None + + def _find_plugin(self, plugin_type) -> PLUGIN_TYPE: + return next(self.__plugin_generator(plugin_type)) + + def _find_plugins(self, plugin_type) -> List[PLUGIN_TYPE]: + plugins = [] + for plugin in self.__plugin_generator(plugin_type): + plugins.append(plugin) + return plugins + + def __plugin_generator(self, plugin_type) -> Generator[PLUGIN_TYPE, None, None]: + for plugin in self._plugins: + if isinstance(plugin, plugin_type): + yield plugin diff --git a/src/deep/logging/tracepoint_logger.py b/src/deep/logging/tracepoint_logger.py deleted file mode 100644 index ac45a13..0000000 --- a/src/deep/logging/tracepoint_logger.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright (C) 2023 Intergral GmbH -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -"""Service for customizing the tracepoint logging.""" - -import abc - -from deep import logging - - -class TracepointLogger(abc.ABC): - """ - This defines how a tracepoint logger should interact with Deep. - - This can be registered with Deep to provide customization to the way Deep will log dynamic log - messages injected via tracepoints. - """ - - @abc.abstractmethod - def log_tracepoint(self, log_msg: str, tp_id: str, ctx_id: str): - """ - Log the dynamic log message. - - :param (str) log_msg: the log message to log - :param (str) tp_id: the id of the tracepoint that generated this log - :param (str) ctx_id: the id of the context that was created by this tracepoint - """ - pass - - -class DefaultLogger(TracepointLogger): - """The default tracepoint logger used by Deep.""" - - def log_tracepoint(self, log_msg: str, tp_id: str, ctx_id: str): - """ - Log the dynamic log message. - - :param (str) log_msg: the log message to log - :param (str) tp_id: the id of the tracepoint that generated this log - :param (str) ctx_id: the id of the context that was created by this tracepoint - """ - logging.info(log_msg + " ctx=%s tracepoint=%s" % (ctx_id, tp_id)) diff --git a/src/deep/processor/context/action_context.py b/src/deep/processor/context/action_context.py index d928318..33b4f03 100644 --- a/src/deep/processor/context/action_context.py +++ b/src/deep/processor/context/action_context.py @@ -31,6 +31,7 @@ import abc from typing import Tuple, TYPE_CHECKING, Dict +import deep.logging from deep.logging import logging from deep.api.tracepoint import WatchResult, Variable from deep.processor.variable_set_processor import VariableSetProcessor @@ -51,8 +52,8 @@ def __init__(self, parent: 'TriggerContext', action: 'LocationAction'): :param parent: the parent trigger :param action: the action config """ - self._parent: 'TriggerContext' = parent - self._action: 'LocationAction' = action + self.tigger_context: 'TriggerContext' = parent + self.location_action: 'LocationAction' = action self._triggered = False def __enter__(self): @@ -62,7 +63,7 @@ def __enter__(self): def __exit__(self, exception_type, exception_value, exception_traceback): """Exit and close the context.""" if self.has_triggered(): - self._action.record_triggered(self._parent.ts) + self.location_action.record_triggered(self.tigger_context.ts) def eval_watch(self, watch: str) -> Tuple[WatchResult, Dict[str, Variable], str]: """ @@ -71,10 +72,10 @@ def eval_watch(self, watch: str) -> Tuple[WatchResult, Dict[str, Variable], str] :param watch: The watch expression to evaluate. :return: Tuple with WatchResult, collected variables, and the log string for the expression """ - var_processor = VariableSetProcessor({}, self._parent.var_cache) + var_processor = VariableSetProcessor({}, self.tigger_context.var_cache) try: - result = self._parent.evaluate_expression(watch) + result = self.tigger_context.evaluate_expression(watch) variable_id, log_str = var_processor.process_variable(watch, result) return WatchResult(watch, variable_id), var_processor.var_lookup, log_str @@ -108,11 +109,11 @@ def can_trigger(self) -> bool: Combine checks for rate limits, windows and condition. :return: True, if the trigger can be triggered. """ - if not self._action.can_trigger(self._parent.ts): + if not self.location_action.can_trigger(self.tigger_context.ts): return False - if self._action.condition is None: + if self.location_action.condition is None: return True - result = self._parent.evaluate_expression(self._action.condition) + result = self.tigger_context.evaluate_expression(self.location_action.condition) return str2bool(str(result)) @@ -120,7 +121,6 @@ class MetricActionContext(ActionContext): """Action for metrics.""" def _process_action(self): - print("metric action") pass @@ -128,7 +128,6 @@ class SpanActionContext(ActionContext): """Action for spans.""" def _process_action(self): - print("span action") pass @@ -136,4 +135,4 @@ class NoActionContext(ActionContext): """Default context if no action can be determined.""" def _process_action(self): - print("Unsupported action type: %s" % self._action) + deep.logging.error("Unsupported action type: %s", self.location_action) diff --git a/src/deep/processor/context/action_results.py b/src/deep/processor/context/action_results.py index 1adb963..5a0b6a0 100644 --- a/src/deep/processor/context/action_results.py +++ b/src/deep/processor/context/action_results.py @@ -42,10 +42,10 @@ """Handler results of actions.""" import abc -from typing import Optional +from typing import Optional, TYPE_CHECKING -from deep.logging.tracepoint_logger import TracepointLogger -from deep.push import PushService +if TYPE_CHECKING: + from deep.processor.context.trigger_context import TriggerContext class ActionCallback: @@ -70,15 +70,12 @@ class ActionResult(abc.ABC): """ @abc.abstractmethod - def process(self, ctx_id: str, logger: TracepointLogger, service: PushService) -> Optional[ActionCallback]: + def process(self, ctx: 'TriggerContext') -> Optional[ActionCallback]: """ Process this result. - Either log or ship the collected data to an endpoint. + :param ctx: the triggering context - :param ctx_id: the triggering context id - :param logger: the log service - :param service:the push service :return: an action callback if we need to do something at the 'end', or None """ pass diff --git a/src/deep/processor/context/log_action.py b/src/deep/processor/context/log_action.py index ea9e35e..847b1de 100644 --- a/src/deep/processor/context/log_action.py +++ b/src/deep/processor/context/log_action.py @@ -34,22 +34,21 @@ from .action_results import ActionResult, ActionCallback from ...api.tracepoint.constants import LOG_MSG from ...api.tracepoint.trigger import LocationAction -from ...logging.tracepoint_logger import TracepointLogger -from ...push import PushService from typing import Tuple if TYPE_CHECKING: from ...api.tracepoint import WatchResult, Variable + from .trigger_context import TriggerContext class LogActionContext(ActionContext): """The context for processing a log action.""" def _process_action(self): - log_msg = self._action.config.get(LOG_MSG) + log_msg = self.location_action.config.get(LOG_MSG) log, watches, vars_ = self.process_log(log_msg) - self._parent.attach_result(LogActionResult(self._action, log)) + self.tigger_context.attach_result(LogActionResult(self.location_action, log)) def process_log(self, log_msg) -> Tuple[str, List['WatchResult'], Dict[str, 'Variable']]: """ @@ -91,7 +90,7 @@ def get_field(self, field_name, args, kwargs): return log_str, field_name - log_msg = "[deep] %s" % FormatExtractor().vformat(log_msg, (), FormatDict(self._parent.locals)) + log_msg = "[deep] %s" % FormatExtractor().vformat(log_msg, (), FormatDict(self.tigger_context.locals)) return log_msg, watch_results, _var_lookup @@ -108,16 +107,15 @@ def __init__(self, action: 'LocationAction', log: str): self.action = action self.log = log - def process(self, ctx_id: str, logger: TracepointLogger, service: PushService) -> Optional[ActionCallback]: + def process(self, ctx: 'TriggerContext') -> Optional[ActionCallback]: """ Process this result. - Either log or ship the collected data to an endpoint. + :param ctx: the triggering context - :param ctx_id: the triggering context id - :param logger: the log service - :param service:the push service :return: an action callback if we need to do something at the 'end', or None """ - logger.log_tracepoint(self.log, ctx_id, self.action.id) + tracepoint_logger = ctx.config.tracepoint_logger + if tracepoint_logger: + tracepoint_logger.log_tracepoint(self.log, ctx.id, self.action.id) return None diff --git a/src/deep/processor/context/snapshot_action.py b/src/deep/processor/context/snapshot_action.py index 55bb122..faf9b1f 100644 --- a/src/deep/processor/context/snapshot_action.py +++ b/src/deep/processor/context/snapshot_action.py @@ -28,19 +28,21 @@ """Handling for snapshot actions.""" -from typing import Tuple, Optional +from typing import Tuple, Optional, TYPE_CHECKING +import deep.logging from deep.api.attributes import BoundedAttributes from deep.api.tracepoint import EventSnapshot from deep.api.tracepoint.constants import FRAME_TYPE, SINGLE_FRAME_TYPE, NO_FRAME_TYPE, ALL_FRAME_TYPE from deep.api.tracepoint.trigger import LocationAction -from deep.logging.tracepoint_logger import TracepointLogger from deep.processor.context.action_context import ActionContext from deep.processor.context.action_results import ActionResult, ActionCallback from deep.processor.context.log_action import LOG_MSG, LogActionContext, LogActionResult from deep.processor.frame_collector import FrameCollectorContext, FrameCollector from deep.processor.variable_set_processor import VariableProcessorConfig -from deep.push import PushService + +if TYPE_CHECKING: + from deep.processor.context.trigger_context import TriggerContext class SnapshotActionContext(FrameCollectorContext, ActionContext): @@ -49,22 +51,24 @@ class SnapshotActionContext(FrameCollectorContext, ActionContext): @property def max_tp_process_time(self) -> int: """The max time to spend processing a tracepoint.""" - return self._action.config.get('MAX_TP_PROCESS_TIME', 100) + return self.location_action.config.get('MAX_TP_PROCESS_TIME', 100) @property def collection_config(self) -> VariableProcessorConfig: """The variable processing config.""" config = VariableProcessorConfig() - config.max_string_length = self._action.config.get('MAX_STRING_LENGTH', config.DEFAULT_MAX_STRING_LENGTH) - config.max_collection_size = self._action.config.get('MAX_COLLECTION_SIZE', config.DEFAULT_MAX_COLLECTION_SIZE) - config.max_variables = self._action.config.get('MAX_VARIABLES', config.DEFAULT_MAX_VARIABLES) - config.max_var_depth = self._action.config.get('MAX_VAR_DEPTH', config.DEFAULT_MAX_VAR_DEPTH) + config.max_string_length = self.location_action.config.get('MAX_STRING_LENGTH', + config.DEFAULT_MAX_STRING_LENGTH) + config.max_collection_size = self.location_action.config.get('MAX_COLLECTION_SIZE', + config.DEFAULT_MAX_COLLECTION_SIZE) + config.max_variables = self.location_action.config.get('MAX_VARIABLES', config.DEFAULT_MAX_VARIABLES) + config.max_var_depth = self.location_action.config.get('MAX_VAR_DEPTH', config.DEFAULT_MAX_VAR_DEPTH) return config @property def ts(self) -> int: """The timestamp in nanoseconds for this trigger.""" - return self._parent.ts + return self.tigger_context.ts def should_collect_vars(self, current_frame_index: int) -> bool: """ @@ -75,7 +79,7 @@ def should_collect_vars(self, current_frame_index: int) -> bool: :param (int) current_frame_index: the current frame index. :return (bool): if we should collect the frame vars. """ - config_type = self._action.config.get(FRAME_TYPE, SINGLE_FRAME_TYPE) + config_type = self.location_action.config.get(FRAME_TYPE, SINGLE_FRAME_TYPE) if config_type == NO_FRAME_TYPE: return False if config_type == ALL_FRAME_TYPE: @@ -89,24 +93,25 @@ def is_app_frame(self, filename: str) -> Tuple[bool, str]: :param filename: the frame file name :return: True if add frame, else False """ - return self._parent.config.is_app_frame(filename) + return self.tigger_context.config.is_app_frame(filename) @property def watches(self): """The configured watches.""" - return self._action.config.get("watches", []) + return self.location_action.config.get("watches", []) @property def log_msg(self): """The configured log message on the tracepoint.""" - return self._action.config.get(LOG_MSG, None) + return self.location_action.config.get(LOG_MSG, None) def _process_action(self): - collector = FrameCollector(self, self._parent.frame) + collector = FrameCollector(self, self.tigger_context.frame) - frames, variables = collector.collect(self._parent.vars, self._parent.var_cache) + frames, variables = collector.collect(self.tigger_context.vars, self.tigger_context.var_cache) - snapshot = EventSnapshot(self._action.tracepoint, self._parent.ts, self._parent.resource, frames, variables) + snapshot = EventSnapshot(self.location_action.tracepoint, self.tigger_context.ts, self.tigger_context.resource, + frames, variables) # process the snapshot watches for watch in self.watches: @@ -117,7 +122,7 @@ def _process_action(self): log_msg = self.log_msg if log_msg is not None: # create and process the log message - context = LogActionContext(self._parent, LocationAction(self._action.id, None, { + context = LogActionContext(self.tigger_context, LocationAction(self.location_action.id, None, { LOG_MSG: log_msg, }, LocationAction.ActionType.Log)) log, watches, log_vars = context.process_log(log_msg) @@ -125,35 +130,41 @@ def _process_action(self): for watch in watches: snapshot.add_watch_result(watch) snapshot.merge_var_lookup(log_vars) - self._parent.attach_result(LogActionResult(context._action, log)) + self.tigger_context.attach_result(LogActionResult(context.location_action, log)) - self._parent.attach_result(SendSnapshotActionResult(self._action, snapshot)) + self.tigger_context.attach_result(SendSnapshotActionResult(self, snapshot)) class SendSnapshotActionResult(ActionResult): """The result of a successful snapshot action.""" - def __init__(self, action: LocationAction, snapshot: EventSnapshot): + def __init__(self, action_context: ActionContext, snapshot: EventSnapshot): """ Create a new snapshot action result. - :param action: the action that created this result + :param action_context: the action context that created this result :param snapshot: the snapshot result """ - self.action = action + self.action_context = action_context self.snapshot = snapshot - def process(self, ctx_id: str, logger: TracepointLogger, service: PushService) -> Optional[ActionCallback]: + def process(self, ctx: 'TriggerContext') -> Optional[ActionCallback]: """ Process this result. - Either log or ship the collected data to an endpoint. + :param ctx: the triggering context - :param ctx_id: the triggering context id - :param logger: the log service - :param service:the push service :return: an action callback if we need to do something at the 'end', or None """ - self.snapshot.attributes.merge_in(BoundedAttributes(attributes={'ctx_id': ctx_id})) - service.push_snapshot(self.snapshot) + attributes = BoundedAttributes(attributes={'ctx_id': ctx.id}) + for decorator in ctx.config.snapshot_decorators: + try: + decorate = decorator.decorate(self.action_context) + if decorate is not None: + attributes = attributes.merge_in(decorate) + except Exception: + deep.logging.exception("Failed to decorate snapshot: %s", decorator) + + self.snapshot.attributes.merge_in(attributes) + ctx.push_service.push_snapshot(self.snapshot) return None diff --git a/src/deep/processor/context/trigger_context.py b/src/deep/processor/context/trigger_context.py index aa0d076..d882228 100644 --- a/src/deep/processor/context/trigger_context.py +++ b/src/deep/processor/context/trigger_context.py @@ -19,10 +19,11 @@ from types import FrameType from typing import Dict, Optional, List +import deep.logging +from deep.api.plugin import TracepointLogger from deep.api.tracepoint import Variable from deep.api.tracepoint.trigger import LocationAction from deep.config import ConfigService -from deep.logging.tracepoint_logger import TracepointLogger from deep.processor.context.action_context import MetricActionContext, SpanActionContext, NoActionContext, ActionContext from deep.processor.context.action_results import ActionResult, ActionCallback from deep.processor.context.log_action import LogActionContext @@ -69,9 +70,17 @@ def __enter__(self): def __exit__(self, exception_type, exception_value, exception_traceback): """Complete the 'with' statement, and close this context.""" for result in self.__results: - new_callback = result.process(self.__id, self.tracepoint_logger, self.push_service) - if new_callback is not None: - self.callbacks.append(new_callback) + try: + new_callback = result.process(self) + if new_callback is not None: + self.callbacks.append(new_callback) + except Exception: + deep.logging.exception("failed to process result {}", result) + + @property + def id(self): + """The trigger context id.""" + return self.__id @property def file_name(self): diff --git a/src/deep/push/push_service.py b/src/deep/push/push_service.py index 214503b..42b24c9 100644 --- a/src/deep/push/push_service.py +++ b/src/deep/push/push_service.py @@ -25,21 +25,18 @@ class PushService: """This service deals with pushing the snapshots to the service endpoints.""" - def __init__(self, config, grpc, task_handler): + def __init__(self, grpc, task_handler): """ Create a service to handle push events. - :param config: the current deep config :param grpc: the grpc service to use to send events :param task_handler: the task handler to offload tasks to """ - self.config = config self.grpc = grpc self.task_handler = task_handler def push_snapshot(self, snapshot: EventSnapshot): """Push a snapshot to the deep services.""" - self.__decorate(snapshot) task = self.task_handler.submit_task(self._push_task, snapshot) task.add_done_callback( lambda _: logging.debug("Completed uploading snapshot %s", snapshot_id_as_hex_str(snapshot.id))) @@ -55,14 +52,3 @@ def _push_task(self, snapshot): stub = SnapshotServiceStub(self.grpc.channel) stub.send(converted, metadata=self.grpc.metadata()) - - def __decorate(self, snapshot): - plugins = self.config.plugins - for plugin in plugins: - try: - attributes = plugin.collect_attributes() - if attributes is not None: - snapshot.attributes.merge_in(attributes) - except Exception: - logging.exception("Error processing plugin %s", plugin.name) - snapshot.complete() diff --git a/tests/unit_tests/api/plugin/test_otel.py b/tests/unit_tests/api/plugin/test_otel.py index ef2e14a..ef2e6f5 100644 --- a/tests/unit_tests/api/plugin/test_otel.py +++ b/tests/unit_tests/api/plugin/test_otel.py @@ -32,14 +32,14 @@ def setUp(self): def test_load_plugin(self): plugin = OTelPlugin() - load_plugin = plugin.load_plugin() + load_plugin = plugin.resource() self.assertIsNotNone(load_plugin) - self.assertEqual("your-service-name", load_plugin.get(SERVICE_NAME)) + self.assertEqual("your-service-name", load_plugin.attributes.get(SERVICE_NAME)) def test_collect_attributes(self): with trace.get_tracer_provider().get_tracer("test").start_as_current_span("test-span"): plugin = OTelPlugin() - attributes = plugin.collect_attributes() + attributes = plugin.decorate(None) self.assertIsNotNone(attributes) self.assertEqual("test-span", attributes.get("span_name")) self.assertIsNotNone(attributes.get("span_id")) diff --git a/tests/unit_tests/api/plugin/test_plugin.py b/tests/unit_tests/api/plugin/test_plugin.py index 85ab50c..f5f118b 100644 --- a/tests/unit_tests/api/plugin/test_plugin.py +++ b/tests/unit_tests/api/plugin/test_plugin.py @@ -15,17 +15,15 @@ import unittest import deep -from deep.api.attributes import BoundedAttributes from deep.api.plugin import load_plugins, Plugin from deep.config import ConfigService class BadPlugin(Plugin): - def load_plugin(self) -> BoundedAttributes: - raise Exception('test: failed load') + def __init__(self): + super().__init__() - def collect_attributes(self) -> BoundedAttributes: - raise Exception('test: failed collection') + raise Exception('test: failed load') class TestPluginLoader(unittest.TestCase): @@ -34,15 +32,15 @@ def setUp(self): deep.logging.init(ConfigService()) def test_load_plugins(self): - plugins = load_plugins() + plugins = load_plugins(None) self.assertIsNotNone(plugins) self.assertEqual(2, len(plugins)) def test_handle_bad_plugin(self): - plugins = load_plugins([BadPlugin.__qualname__]) + plugins = load_plugins(None, [BadPlugin.__qualname__]) self.assertEqual(2, len(plugins)) - plugins = load_plugins([BadPlugin.__module__ + '.' + BadPlugin.__name__]) + plugins = load_plugins(None, [BadPlugin.__module__ + '.' + BadPlugin.__name__]) self.assertEqual(2, len(plugins)) diff --git a/tests/unit_tests/api/plugin/test_python.py b/tests/unit_tests/api/plugin/test_python.py index 9c82c0e..6864539 100644 --- a/tests/unit_tests/api/plugin/test_python.py +++ b/tests/unit_tests/api/plugin/test_python.py @@ -21,12 +21,12 @@ class TestPython(unittest.TestCase): def test_load_plugin(self): plugin = PythonPlugin() - load_plugin = plugin.load_plugin() + load_plugin = plugin.resource() self.assertIsNotNone(load_plugin) - self.assertIsNotNone(load_plugin.get('python_version')) + self.assertIsNotNone(load_plugin.attributes.get('python_version')) def test_collect_attributes(self): plugin = PythonPlugin() - attributes = plugin.collect_attributes() + attributes = plugin.decorate(None) self.assertIsNotNone(attributes) self.assertEqual("MainThread", attributes.get("thread_name")) diff --git a/tests/unit_tests/processor/test_trigger_handler.py b/tests/unit_tests/processor/test_trigger_handler.py index bf48dd2..991c16e 100644 --- a/tests/unit_tests/processor/test_trigger_handler.py +++ b/tests/unit_tests/processor/test_trigger_handler.py @@ -32,21 +32,21 @@ from typing import List from deep import logging +from deep.api.plugin import TracepointLogger from deep.api.resource import Resource from deep.api.tracepoint.constants import LOG_MSG, WATCHES from deep.api.tracepoint.eventsnapshot import EventSnapshot from deep.api.tracepoint.trigger import Location, LocationAction, LineLocation, Trigger from deep.config import ConfigService -from deep.logging.tracepoint_logger import TracepointLogger from deep.processor.trigger_handler import TriggerHandler from deep.push.push_service import PushService from unit_tests.test_target import some_test_function class MockPushService(PushService): - def __init__(self, config, grpc, task_handler): - super().__init__(config, grpc, task_handler) + def __init__(self, grpc, task_handler): + super().__init__(grpc, task_handler) self.pushed: List[EventSnapshot] = [] def push_snapshot(self, snapshot: EventSnapshot): @@ -121,7 +121,7 @@ def call_and_capture(self, location, func, args, capture): def test_log_action(self): capture = TraceCallCapture() config = MockConfigService({}) - push = MockPushService(None, None, None) + push = MockPushService(None, None) handler = TriggerHandler(config, push) location = LineLocation('test_target.py', 27, Location.Position.START) @@ -139,7 +139,7 @@ def test_log_action(self): def test_log_action_with_watch(self): capture = TraceCallCapture() config = MockConfigService({}) - push = MockPushService(None, None, None) + push = MockPushService(None, None) handler = TriggerHandler(config, push) location = LineLocation('test_target.py', 27, Location.Position.START) @@ -157,7 +157,7 @@ def test_log_action_with_watch(self): def test_snapshot_action(self): capture = TraceCallCapture() config = MockConfigService({}) - push = MockPushService(None, None, None) + push = MockPushService(None, None) handler = TriggerHandler(config, push) location = LineLocation('test_target.py', 27, Location.Position.START) @@ -183,7 +183,7 @@ def test_snapshot_action(self): def test_snapshot_action_with_condition(self): capture = TraceCallCapture() config = MockConfigService({}) - push = MockPushService(None, None, None) + push = MockPushService(None, None) handler = TriggerHandler(config, push) location = LineLocation('test_target.py', 27, Location.Position.START) diff --git a/tests/unit_tests/push/test_push_service.py b/tests/unit_tests/push/test_push_service.py index 1c7cfa3..7cbd5a4 100644 --- a/tests/unit_tests/push/test_push_service.py +++ b/tests/unit_tests/push/test_push_service.py @@ -17,8 +17,6 @@ import mockito import deep.logging -from deep.api.attributes import BoundedAttributes -from deep.api.plugin import Plugin from deep.config import ConfigService from deep.push import PushService from utils import mock_snapshot, Captor @@ -38,13 +36,13 @@ def tearDown(self): mockito.unstub() def test_push_service(self): - service = PushService(self.config, self.grpc_service, self.handler) + service = PushService(self.grpc_service, self.handler) service.push_snapshot(mock_snapshot()) mockito.verify(self.handler).submit_task(mockito.ANY, mockito.ANY) def test_push_service_function(self): - service = PushService(self.config, self.grpc_service, self.handler) + service = PushService(self.grpc_service, self.handler) service.push_snapshot(mock_snapshot()) task_captor = Captor() @@ -75,7 +73,7 @@ def mock_send(snap, **kwargs): self.assertIsNotNone(self.sent_snap) def test_do_not_send_on_convert_failure(self): - service = PushService(self.config, self.grpc_service, self.handler) + service = PushService(self.grpc_service, self.handler) class FakeSnapshot: def complete(self): @@ -110,57 +108,3 @@ def mock_send(snap, **kwargs): task(snapshot) self.assertIsNone(self.sent_snap) - - def test_does_decorate(self): - class TestPlugin(Plugin): - def load_plugin(self) -> BoundedAttributes: - return BoundedAttributes() - - def collect_attributes(self) -> BoundedAttributes: - return BoundedAttributes(attributes={ - 'test': 'plugin' - }) - - self.config.plugins.append(TestPlugin()) - - service = PushService(self.config, self.grpc_service, self.handler) - - service.push_snapshot(mock_snapshot()) - - task_captor = Captor() - snapshot_captor = Captor() - - mockito.verify(self.handler).submit_task(task_captor, snapshot_captor) - - task = task_captor.get_value() - snapshot = snapshot_captor.get_value() - - self.assertIsNotNone(task) - self.assertIsNotNone(snapshot) - - self.assertEqual(snapshot.attributes['test'], 'plugin') - - def test_does__send_on_decorate_FAIL(self): - class TestPlugin(Plugin): - def load_plugin(self) -> BoundedAttributes: - return BoundedAttributes() - - def collect_attributes(self) -> BoundedAttributes: - raise Exception("test exception") - - self.config.plugins.append(TestPlugin()) - - service = PushService(self.config, self.grpc_service, self.handler) - - service.push_snapshot(mock_snapshot()) - - task_captor = Captor() - snapshot_captor = Captor() - - mockito.verify(self.handler).submit_task(task_captor, snapshot_captor) - - task = task_captor.get_value() - snapshot = snapshot_captor.get_value() - - self.assertIsNotNone(task) - self.assertIsNotNone(snapshot) From 6cb11756ecd4a5342fc5f878969071ec95550bff Mon Sep 17 00:00:00 2001 From: Ben Donnelly Date: Thu, 18 Jan 2024 22:27:41 +0000 Subject: [PATCH 2/3] feat(plugins): update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 73ccbbd..2a8ccfe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ - **[CHANGE]**: change(build): add doc string check to flake8 [#14](https://github.com/intergral/deep/pull/14) [@Umaaz](https://github.com/Umaaz) - **[FEATURE]**: feat(logging): initial implementation of log points [#3](https://github.com/intergral/deep/pull/3) [@Umaaz](https://github.com/Umaaz) +- **[FEATURE]**: feat(plugins): change plugins to allow better customisation [#22](https://github.com/intergral/deep/pull/22) [@Umaaz](https://github.com/Umaaz) - **[ENHANCEMENT]**: enhancement(trigger): change tracepoint handling to use triggers [#16](https://github.com/intergral/deep/pull/16) [@Umaaz](https://github.com/Umaaz) - **[BUGFIX]**: feat(api): add api function to register tracepoint directly [#8](https://github.com/intergral/deep/pull/8) [@Umaaz](https://github.com/Umaaz) From 51260371fced7491f40c31306cc50746e1c4c5b5 Mon Sep 17 00:00:00 2001 From: Ben Donnelly Date: Fri, 19 Jan 2024 10:36:16 +0000 Subject: [PATCH 3/3] fix(lint): add flake8-debug - remove project wide ignore for F401 in __init__ files --- .flake8 | 14 ++++++-------- dev-requirements.txt | 19 +++++++++++++------ src/deep/api/__init__.py | 2 +- src/deep/api/plugin/__init__.py | 1 - src/deep/config/__init__.py | 2 +- src/deep/grpc/__init__.py | 2 +- 6 files changed, 22 insertions(+), 18 deletions(-) diff --git a/.flake8 b/.flake8 index e2d7a76..43c6a8c 100644 --- a/.flake8 +++ b/.flake8 @@ -9,21 +9,19 @@ exclude = scripts, max-line-length = 120 per-file-ignores = - # ignore unused imports in __init__ files - */__init__.py: F401 # supress some docstring requirements in tests - tests/unit_tests/*.py: D,NP100 - tests/unit_tests/**/*.py: D,NP100 - tests/it_tests/*.py: D,NP100 - tests/it_tests/**/*.py: D,NP100 + tests/unit_tests/*.py: D + tests/unit_tests/**/*.py: D + tests/it_tests/*.py: D + tests/it_tests/**/*.py: D # these files are from OTEL so should use OTEL license. */deep/api/types.py: NCF102 */deep/api/resource/__init__.py: NCF102 */deep/api/attributes/__init__.py: NCF102 tests/unit_tests/api/attributes/*.py: NCF102,D tests/unit_tests/api/resource/*.py: NCF102,D - examples/**/*: NP100 - dev/**/*: NP100 + examples/**/*: DB100 + dev/**/*: DB100 detailed-output = True copyright-regex = diff --git a/dev-requirements.txt b/dev-requirements.txt index 0bb281b..3e92733 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,17 +1,24 @@ +# linting deps flake8 +flake8-docstrings +flake8-debug +flake8-header-validator>=0.0.3 + +# test deps parameterized pytest +pytest-cov +mockito + +# doc deps mkdocs-material -pdoc3 mkdocstrings-python +pdoc3 + +# build deps build twine -flake8-docstrings certifi>=2023.7.22 # not directly required, pinned by Snyk to avoid a vulnerability -flake8-header-validator>=0.0.3 setuptools>=65.5.1 # not directly required, pinned by Snyk to avoid a vulnerability -pytest-cov -mockito opentelemetry-api opentelemetry-sdk -flake8-no-print diff --git a/src/deep/api/__init__.py b/src/deep/api/__init__.py index 74e5c02..2dfb615 100644 --- a/src/deep/api/__init__.py +++ b/src/deep/api/__init__.py @@ -15,4 +15,4 @@ """Main api for Deep.""" -from deep.api.deep import Deep +from deep.api.deep import Deep # noqa: F401 diff --git a/src/deep/api/plugin/__init__.py b/src/deep/api/plugin/__init__.py index 7a5b40e..6017f93 100644 --- a/src/deep/api/plugin/__init__.py +++ b/src/deep/api/plugin/__init__.py @@ -16,7 +16,6 @@ """Load and handle plugins.""" import abc -import os from importlib import import_module from typing import TYPE_CHECKING, Optional, TypeVar, List diff --git a/src/deep/config/__init__.py b/src/deep/config/__init__.py index 4db09f2..a29a052 100644 --- a/src/deep/config/__init__.py +++ b/src/deep/config/__init__.py @@ -23,7 +23,7 @@ import os import sys -from .config_service import ConfigService +from .config_service import ConfigService # noqa: F401 LOGGING_CONF = os.getenv('DEEP_LOGGING_CONF', None) '''The path to the logging config file to use''' diff --git a/src/deep/grpc/__init__.py b/src/deep/grpc/__init__.py index 5bb1b5c..f1f815c 100644 --- a/src/deep/grpc/__init__.py +++ b/src/deep/grpc/__init__.py @@ -27,7 +27,7 @@ # noinspection PyUnresolvedReferences from deepproto.proto.resource.v1.resource_pb2 import Resource -from .grpc_service import GRPCService +from .grpc_service import GRPCService # noqa: F401 from ..api.tracepoint.tracepoint_config import LabelExpression, MetricDefinition from ..api.tracepoint.trigger import build_trigger, Trigger