Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions ddtrace/context.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import threading
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
from typing import Tuple

from .constants import ORIGIN_KEY
from .constants import SAMPLING_PRIORITY_KEY
Expand All @@ -7,6 +11,9 @@
from .utils.formats import get_env


if TYPE_CHECKING:
from ddtrace import Span

log = get_logger(__name__)


Expand All @@ -29,16 +36,23 @@ class Context(object):
_partial_flush_enabled = asbool(get_env("tracer", "partial_flush_enabled", default=False))
_partial_flush_min_spans = int(get_env("tracer", "partial_flush_min_spans", default=500))

def __init__(self, trace_id=None, span_id=None, sampling_priority=None, dd_origin=None):
def __init__(
self,
trace_id=None, # type: Optional[int]
span_id=None, # type: Optional[int]
sampling_priority=None, # type: Optional[int]
dd_origin=None, # type: Optional[str]
):
# type: (...) -> None
"""
Initialize a new thread-safe ``Context``.

:param int trace_id: trace_id of parent span
:param int span_id: span_id of parent span
"""
self._trace = []
self._trace = [] # type: List[Span]
self._finished_spans = 0
self._current_span = None
self._current_span = None # type: Optional[Span]
self._lock = threading.Lock()

self._parent_trace_id = trace_id
Expand Down Expand Up @@ -66,11 +80,13 @@ def sampling_priority(self):

@sampling_priority.setter
def sampling_priority(self, value):
# type: (int) -> None
"""Set sampling priority."""
with self._lock:
self._sampling_priority = value

def clone(self):
# type: () -> Context
"""
Partially clones the current context.
It copies everything EXCEPT the registered and finished spans.
Expand All @@ -85,12 +101,14 @@ def clone(self):
return new_ctx

def get_current_root_span(self):
# type: () -> Optional[Span]
"""
Return the root span of the context or None if it does not exist.
"""
return self._trace[0] if len(self._trace) > 0 else None

def get_current_span(self):
# type: () -> Optional[Span]
"""
Return the last active span that corresponds to the last inserted
item in the trace list. This cannot be considered as the current active
Expand All @@ -101,6 +119,7 @@ def get_current_span(self):
return self._current_span

def _set_current_span(self, span):
# type: (Optional[Span]) -> None
"""
Set current span internally.

Expand All @@ -114,16 +133,17 @@ def _set_current_span(self, span):
self._parent_span_id = None

def add_span(self, span):
# type: (Span) -> None
"""
Add a span to the context trace list, keeping it as the last active span.
"""
with self._lock:
self._set_current_span(span)

self._trace.append(span)
span._context = self

def close_span(self, span):
# type: (Span) -> Tuple[Optional[List[Span]], Optional[bool]]
"""
Mark a span as a finished, increasing the internal counter to prevent
cycles inside _trace list.
Expand Down
6 changes: 5 additions & 1 deletion ddtrace/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@
import re
from typing import List
from typing import Optional
from typing import TYPE_CHECKING

from ddtrace import Span
from ddtrace.vendor import six

from .ext import http


if TYPE_CHECKING:
from ddtrace import Span


class TraceFilter(six.with_metaclass(abc.ABCMeta)): # type: ignore[misc]
@abc.abstractmethod
def process_trace(self, trace):
Expand Down
9 changes: 9 additions & 0 deletions ddtrace/helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
from typing import Optional
from typing import TYPE_CHECKING
from typing import Tuple

import ddtrace


if TYPE_CHECKING:
from ddtrace.tracer import Tracer


def get_correlation_ids(tracer=None):
# type: (Optional[Tracer]) -> Tuple[Optional[int], Optional[int]]
"""Retrieves the Correlation Identifiers for the current active ``Trace``.
This helper method can be achieved manually and should be considered
only a shortcut. The main reason is to abstract the current ``Tracer``
Expand Down
6 changes: 5 additions & 1 deletion ddtrace/internal/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import threading
from typing import List
from typing import Optional
from typing import TYPE_CHECKING

import ddtrace
from ddtrace import Span
from ddtrace.vendor import six

from . import agent
Expand All @@ -29,6 +29,10 @@
from .sma import SimpleMovingAverage


if TYPE_CHECKING:
from ddtrace import Span


log = get_logger(__name__)

DEFAULT_SHUTDOWN_TIMEOUT = 5
Expand Down
12 changes: 11 additions & 1 deletion ddtrace/monkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
import os
import sys
import threading
from typing import Any
from typing import Callable
from typing import Dict
from typing import List

from ddtrace.vendor.wrapt.importer import when_imported

Expand Down Expand Up @@ -103,6 +107,7 @@ class ModuleNotFoundException(PatchException):


def _on_import_factory(module, raise_errors=True):
# type: (str, bool) -> Callable[[Any], None]
"""Factory to create an import hook for the provided module name"""

def on_import(hook):
Expand All @@ -115,6 +120,7 @@ def on_import(hook):


def patch_all(**patch_modules):
# type: (Dict[str, bool]) -> None
"""Automatically patches all available modules.

In addition to ``patch_modules``, an override can be specified via an
Expand Down Expand Up @@ -144,6 +150,7 @@ def patch_all(**patch_modules):


def patch(raise_errors=True, **patch_modules):
# type: (bool, Dict[str, bool]) -> None
"""Patch only a set of given modules.

:param bool raise_errors: Raise error if one patch fail.
Expand Down Expand Up @@ -179,6 +186,7 @@ def patch(raise_errors=True, **patch_modules):


def patch_module(module, raise_errors=True):
# type: (str, bool) -> bool
"""Patch a single module

Returns if the module got properly patched.
Expand All @@ -197,12 +205,14 @@ def patch_module(module, raise_errors=True):


def get_patched_modules():
# type: () -> List[str]
"""Get the list of patched modules"""
with _LOCK:
return sorted(_PATCHED_MODULES)


def _patch_module(module):
# type: (str) -> bool
"""_patch_module will attempt to monkey patch the module.

Returns if the module got patched.
Expand All @@ -225,6 +235,6 @@ def _patch_module(module):
if not hasattr(imported_module, "patch"):
raise ModuleNotFoundException("module '%s' not installed" % module)

imported_module.patch()
imported_module.patch() # type: ignore
_PATCHED_MODULES.add(module)
return True
7 changes: 6 additions & 1 deletion ddtrace/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ def _has_active_context(self):

@abc.abstractmethod
def activate(self, context):
# type: (Context) -> None
pass

@abc.abstractmethod
def active(self):
# type: () -> Context
pass

def __call__(self, *args, **kwargs):
Expand All @@ -46,6 +48,7 @@ class DefaultContextProvider(BaseContextProvider):
"""

def __init__(self):
# type: () -> None
_DD_CONTEXTVAR.set(None)

def _has_active_context(self):
Expand All @@ -59,12 +62,14 @@ def _has_active_context(self):
return ctx is not None

def activate(self, ctx):
# type: (Context) -> None
"""Makes the given ``context`` active, so that the provider calls
the thread-local storage implementation.
"""
_DD_CONTEXTVAR.set(ctx)
_DD_CONTEXTVAR.set(ctx) # type: ignore[arg-type]

def active(self):
# type: () -> Context
"""Returns the current active ``Context`` for this tracer. Returned
``Context`` must be thread-safe or thread-local for this specific
implementation.
Expand Down
Loading