From 0cd14599e9001dc26aaa92cae4ae2305c82f6153 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 5 Sep 2025 14:28:54 +0200 Subject: [PATCH 01/12] feat: Add concurrent.futures integration --- sentry_sdk/integrations/concurrent.py | 46 ++++++ .../concurrent/test_concurrent.py | 133 ++++++++++++++++++ tests/test_basics.py | 1 + 3 files changed, 180 insertions(+) create mode 100644 sentry_sdk/integrations/concurrent.py create mode 100644 tests/integrations/concurrent/test_concurrent.py diff --git a/sentry_sdk/integrations/concurrent.py b/sentry_sdk/integrations/concurrent.py new file mode 100644 index 0000000000..a0b76d1d32 --- /dev/null +++ b/sentry_sdk/integrations/concurrent.py @@ -0,0 +1,46 @@ +from functools import wraps + +from concurrent.futures import ThreadPoolExecutor + +import sentry_sdk +from sentry_sdk.integrations import Integration +from sentry_sdk.scope import use_isolation_scope, use_scope + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Any + from typing import Callable + + +class ConcurrentIntegration(Integration): + identifier = "concurrent" + + def __init__(self, record_exceptions_on_futures=True): + # type: (bool) -> None + self.record_exceptions_on_futures = record_exceptions_on_futures + + @staticmethod + def setup_once(): + # type: () -> None + old_submit = ThreadPoolExecutor.submit + + @wraps(old_submit) + def sentry_submit(self, fn, *args, **kwargs): + # type: (ThreadPoolExecutor, Callable, *Any, **Any) -> Any + integration = sentry_sdk.get_client().get_integration(ConcurrentIntegration) + if integration is None: + return old_submit(self, fn, *args, **kwargs) + + isolation_scope = sentry_sdk.get_isolation_scope().fork() + current_scope = sentry_sdk.get_current_scope().fork() + + def wrapped_fn(*args, **kwargs): + # type: (*Any, **Any) -> Any + with use_isolation_scope(isolation_scope): + with use_scope(current_scope): + return fn(*args, **kwargs) + + return old_submit(self, wrapped_fn, *args, **kwargs) + + ThreadPoolExecutor.submit = sentry_submit diff --git a/tests/integrations/concurrent/test_concurrent.py b/tests/integrations/concurrent/test_concurrent.py new file mode 100644 index 0000000000..74a0d0bcbc --- /dev/null +++ b/tests/integrations/concurrent/test_concurrent.py @@ -0,0 +1,133 @@ +from textwrap import dedent +from concurrent import futures +from concurrent.futures import Future, ThreadPoolExecutor + +import sentry_sdk + +from sentry_sdk.integrations.concurrent import ConcurrentIntegration + +original_submit = ThreadPoolExecutor.submit +original_set_exception = Future.set_exception + + +def test_propagates_threadpool_scope(sentry_init, capture_events): + sentry_init( + default_integrations=False, + traces_sample_rate=1.0, + integrations=[ConcurrentIntegration()], + ) + events = capture_events() + + def double(number): + with sentry_sdk.start_span(op="task", name=str(number)): + return number * 2 + + with sentry_sdk.start_transaction(name="test_handles_threadpool"): + with futures.ThreadPoolExecutor(max_workers=1) as executor: + tasks = [executor.submit(double, number) for number in [1, 2, 3, 4]] + for future in futures.as_completed(tasks): + print("Getting future value!", future.result()) + + sentry_sdk.flush() + + assert len(events) == 1 + (event,) = events + assert event["spans"][0]["trace_id"] == event["spans"][1]["trace_id"] + assert event["spans"][1]["trace_id"] == event["spans"][2]["trace_id"] + assert event["spans"][2]["trace_id"] == event["spans"][3]["trace_id"] + assert event["spans"][3]["trace_id"] == event["spans"][0]["trace_id"] + + +def test_propagates_threadpool_scope_in_map(sentry_init, capture_events): + sentry_init( + default_integrations=False, + traces_sample_rate=1.0, + integrations=[ConcurrentIntegration()], + ) + events = capture_events() + + def double(number): + with sentry_sdk.start_span(op="task", name=str(number)): + return number * 2 + + with sentry_sdk.start_transaction(name="test_handles_threadpool"): + with futures.ThreadPoolExecutor(max_workers=1) as executor: + for value in executor.map(double, [1, 2, 3, 4]): + print("Getting future value!", value) + + sentry_sdk.flush() + + assert len(events) == 1 + (event,) = events + assert event["spans"][0]["trace_id"] == event["spans"][1]["trace_id"] + assert event["spans"][1]["trace_id"] == event["spans"][2]["trace_id"] + assert event["spans"][2]["trace_id"] == event["spans"][3]["trace_id"] + assert event["spans"][3]["trace_id"] == event["spans"][0]["trace_id"] + + +def test_scope_data_not_leaked_in_executor(sentry_init): + sentry_init( + integrations=[ConcurrentIntegration()], + ) + + sentry_sdk.set_tag("initial_tag", "initial_value") + initial_iso_scope = sentry_sdk.get_isolation_scope() + + def do_some_work(): + # check if we have the initial scope data propagated into the thread + assert sentry_sdk.get_isolation_scope()._tags == { + "initial_tag": "initial_value" + } + + # change data in isolation scope in thread + sentry_sdk.set_tag("thread_tag", "thread_value") + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(do_some_work) + future.result() + + # check if the initial scope data is not modified by the started thread + assert initial_iso_scope._tags == { + "initial_tag": "initial_value" + }, "The isolation scope in the main thread should not be modified by the started thread." + + +def test_spans_from_multiple_threads(sentry_init, capture_events, render_span_tree): + sentry_init( + traces_sample_rate=1.0, + integrations=[ConcurrentIntegration()], + ) + events = capture_events() + + def do_some_work(number): + with sentry_sdk.start_span( + op=f"inner-run-{number}", name=f"Thread: child-{number}" + ): + pass + + with sentry_sdk.start_transaction(op="outer-trx"): + with futures.ThreadPoolExecutor(max_workers=1) as executor: + for number in range(5): + with sentry_sdk.start_span( + op=f"outer-submit-{number}", name="Thread: main" + ): + future = executor.submit(do_some_work, number) + future.result() + + (event,) = events + + assert render_span_tree(event) == dedent( + """\ + - op="outer-trx": description=null + - op="outer-submit-0": description="Thread: main" + - op="inner-run-0": description="Thread: child-0" + - op="outer-submit-1": description="Thread: main" + - op="inner-run-1": description="Thread: child-1" + - op="outer-submit-2": description="Thread: main" + - op="inner-run-2": description="Thread: child-2" + - op="outer-submit-3": description="Thread: main" + - op="inner-run-3": description="Thread: child-3" + - op="outer-submit-4": description="Thread: main" + - op="inner-run-4": description="Thread: child-4"\ +""" + ) diff --git a/tests/test_basics.py b/tests/test_basics.py index 45303c9a59..7314bcabe4 100644 --- a/tests/test_basics.py +++ b/tests/test_basics.py @@ -868,6 +868,7 @@ def foo(event, hint): (["atexit"], "sentry.python"), (["boto3"], "sentry.python"), (["celery"], "sentry.python"), + (["concurrent"], "sentry.python"), (["dedupe"], "sentry.python"), (["excepthook"], "sentry.python"), (["unraisablehook"], "sentry.python"), From 9002da6efaaec7bfbd2aad33c1b9783b1de4f832 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 5 Sep 2025 15:13:08 +0200 Subject: [PATCH 02/12] Fix typing --- sentry_sdk/integrations/concurrent.py | 46 ++++++++++++++++----------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/sentry_sdk/integrations/concurrent.py b/sentry_sdk/integrations/concurrent.py index a0b76d1d32..2a962fce91 100644 --- a/sentry_sdk/integrations/concurrent.py +++ b/sentry_sdk/integrations/concurrent.py @@ -1,6 +1,6 @@ from functools import wraps -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, Future import sentry_sdk from sentry_sdk.integrations import Integration @@ -9,8 +9,9 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from typing import Any - from typing import Callable + from typing import Any, Callable, TypeVar + + T = TypeVar("T", bound=Any) class ConcurrentIntegration(Integration): @@ -23,24 +24,31 @@ def __init__(self, record_exceptions_on_futures=True): @staticmethod def setup_once(): # type: () -> None - old_submit = ThreadPoolExecutor.submit + ThreadPoolExecutor.submit = _wrap_submit_call(ThreadPoolExecutor.submit) + + +def _wrap_submit_call(func): + # type: (Any) -> Any + """ + Wrap task call with a try catch to get exceptions. + """ - @wraps(old_submit) - def sentry_submit(self, fn, *args, **kwargs): - # type: (ThreadPoolExecutor, Callable, *Any, **Any) -> Any - integration = sentry_sdk.get_client().get_integration(ConcurrentIntegration) - if integration is None: - return old_submit(self, fn, *args, **kwargs) + @wraps(func) + def sentry_submit(self, fn, *args, **kwargs): + # type: (ThreadPoolExecutor, Callable[..., T], *Any, **Any) -> Future[T] + integration = sentry_sdk.get_client().get_integration(ConcurrentIntegration) + if integration is None: + return func(self, fn, *args, **kwargs) - isolation_scope = sentry_sdk.get_isolation_scope().fork() - current_scope = sentry_sdk.get_current_scope().fork() + isolation_scope = sentry_sdk.get_isolation_scope().fork() + current_scope = sentry_sdk.get_current_scope().fork() - def wrapped_fn(*args, **kwargs): - # type: (*Any, **Any) -> Any - with use_isolation_scope(isolation_scope): - with use_scope(current_scope): - return fn(*args, **kwargs) + def wrapped_fn(*args, **kwargs): + # type: (*Any, **Any) -> Any + with use_isolation_scope(isolation_scope): + with use_scope(current_scope): + return fn(*args, **kwargs) - return old_submit(self, wrapped_fn, *args, **kwargs) + return func(self, wrapped_fn, *args, **kwargs) - ThreadPoolExecutor.submit = sentry_submit + return sentry_submit From 8d0fa96e994fdcdd87642d1562ca84de4f482d80 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 5 Sep 2025 15:20:00 +0200 Subject: [PATCH 03/12] Fix typing --- sentry_sdk/integrations/concurrent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/concurrent.py b/sentry_sdk/integrations/concurrent.py index 2a962fce91..0bfe177854 100644 --- a/sentry_sdk/integrations/concurrent.py +++ b/sentry_sdk/integrations/concurrent.py @@ -24,7 +24,7 @@ def __init__(self, record_exceptions_on_futures=True): @staticmethod def setup_once(): # type: () -> None - ThreadPoolExecutor.submit = _wrap_submit_call(ThreadPoolExecutor.submit) + ThreadPoolExecutor.submit = _wrap_submit_call(ThreadPoolExecutor.submit) # type: ignore def _wrap_submit_call(func): From b3975606c91cc453caf164fa483a9178f3706dc2 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 5 Sep 2025 15:39:57 +0200 Subject: [PATCH 04/12] Update docstring --- sentry_sdk/integrations/concurrent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_sdk/integrations/concurrent.py b/sentry_sdk/integrations/concurrent.py index 0bfe177854..2bd1b0b793 100644 --- a/sentry_sdk/integrations/concurrent.py +++ b/sentry_sdk/integrations/concurrent.py @@ -30,7 +30,7 @@ def setup_once(): def _wrap_submit_call(func): # type: (Any) -> Any """ - Wrap task call with a try catch to get exceptions. + Wrap submit call to propagate scopes on task submission. """ @wraps(func) From e3dd4c511f0c2fb4931c21541d34cc024e8d3c0f Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Thu, 25 Sep 2025 11:57:34 +0200 Subject: [PATCH 05/12] Remove constructor param --- sentry_sdk/integrations/concurrent.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sentry_sdk/integrations/concurrent.py b/sentry_sdk/integrations/concurrent.py index 2bd1b0b793..35480e8feb 100644 --- a/sentry_sdk/integrations/concurrent.py +++ b/sentry_sdk/integrations/concurrent.py @@ -17,10 +17,6 @@ class ConcurrentIntegration(Integration): identifier = "concurrent" - def __init__(self, record_exceptions_on_futures=True): - # type: (bool) -> None - self.record_exceptions_on_futures = record_exceptions_on_futures - @staticmethod def setup_once(): # type: () -> None @@ -28,7 +24,7 @@ def setup_once(): def _wrap_submit_call(func): - # type: (Any) -> Any + # type: (Callable[..., Future[T]]) -> Callable[..., Future[T]] """ Wrap submit call to propagate scopes on task submission. """ From 463375ec6d1a37ecc4929a2d6052db8343af2bd2 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Thu, 25 Sep 2025 15:48:09 +0200 Subject: [PATCH 06/12] move into threading integration --- sentry_sdk/integrations/concurrent.py | 50 ------- sentry_sdk/integrations/threading.py | 34 +++++ .../concurrent/test_concurrent.py | 133 ------------------ .../integrations/threading/test_threading.py | 41 ++++++ tests/test_basics.py | 1 - 5 files changed, 75 insertions(+), 184 deletions(-) delete mode 100644 sentry_sdk/integrations/concurrent.py delete mode 100644 tests/integrations/concurrent/test_concurrent.py diff --git a/sentry_sdk/integrations/concurrent.py b/sentry_sdk/integrations/concurrent.py deleted file mode 100644 index 35480e8feb..0000000000 --- a/sentry_sdk/integrations/concurrent.py +++ /dev/null @@ -1,50 +0,0 @@ -from functools import wraps - -from concurrent.futures import ThreadPoolExecutor, Future - -import sentry_sdk -from sentry_sdk.integrations import Integration -from sentry_sdk.scope import use_isolation_scope, use_scope - -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from typing import Any, Callable, TypeVar - - T = TypeVar("T", bound=Any) - - -class ConcurrentIntegration(Integration): - identifier = "concurrent" - - @staticmethod - def setup_once(): - # type: () -> None - ThreadPoolExecutor.submit = _wrap_submit_call(ThreadPoolExecutor.submit) # type: ignore - - -def _wrap_submit_call(func): - # type: (Callable[..., Future[T]]) -> Callable[..., Future[T]] - """ - Wrap submit call to propagate scopes on task submission. - """ - - @wraps(func) - def sentry_submit(self, fn, *args, **kwargs): - # type: (ThreadPoolExecutor, Callable[..., T], *Any, **Any) -> Future[T] - integration = sentry_sdk.get_client().get_integration(ConcurrentIntegration) - if integration is None: - return func(self, fn, *args, **kwargs) - - isolation_scope = sentry_sdk.get_isolation_scope().fork() - current_scope = sentry_sdk.get_current_scope().fork() - - def wrapped_fn(*args, **kwargs): - # type: (*Any, **Any) -> Any - with use_isolation_scope(isolation_scope): - with use_scope(current_scope): - return fn(*args, **kwargs) - - return func(self, wrapped_fn, *args, **kwargs) - - return sentry_submit diff --git a/sentry_sdk/integrations/threading.py b/sentry_sdk/integrations/threading.py index c031c51f50..66617dabe5 100644 --- a/sentry_sdk/integrations/threading.py +++ b/sentry_sdk/integrations/threading.py @@ -2,6 +2,7 @@ import warnings from functools import wraps from threading import Thread, current_thread +from concurrent.futures import ThreadPoolExecutor, Future import sentry_sdk from sentry_sdk.integrations import Integration @@ -24,6 +25,7 @@ from sentry_sdk._types import ExcInfo F = TypeVar("F", bound=Callable[..., Any]) + T = TypeVar("T", bound=Any) class ThreadingIntegration(Integration): @@ -109,6 +111,7 @@ def sentry_start(self, *a, **kw): return old_start(self, *a, **kw) Thread.start = sentry_start # type: ignore + ThreadPoolExecutor.submit = _wrap_threadpool_executor_submit(ThreadPoolExecutor.submit) # type: ignore def _wrap_run(isolation_scope_to_use, current_scope_to_use, old_run_func): @@ -134,6 +137,37 @@ def _run_old_run_func(): return run # type: ignore +def _wrap_threadpool_executor_submit(func): + # type: (Callable[..., Future[T]]) -> Callable[..., Future[T]] + """ + Wrap submit call to propagate scopes on task submission. + """ + + @wraps(func) + def sentry_submit(self, fn, *args, **kwargs): + # type: (ThreadPoolExecutor, Callable[..., T], *Any, **Any) -> Future[T] + integration = sentry_sdk.get_client().get_integration(ThreadingIntegration) + if integration is None: + return func(self, fn, *args, **kwargs) + + if integration.propagate_scope: + isolation_scope = sentry_sdk.get_isolation_scope().fork() + current_scope = sentry_sdk.get_current_scope().fork() + else: + isolation_scope = None + current_scope = None + + def wrapped_fn(*args, **kwargs): + # type: (*Any, **Any) -> Any + with use_isolation_scope(isolation_scope): + with use_scope(current_scope): + return fn(*args, **kwargs) + + return func(self, wrapped_fn, *args, **kwargs) + + return sentry_submit + + def _capture_exception(): # type: () -> ExcInfo exc_info = sys.exc_info() diff --git a/tests/integrations/concurrent/test_concurrent.py b/tests/integrations/concurrent/test_concurrent.py deleted file mode 100644 index 74a0d0bcbc..0000000000 --- a/tests/integrations/concurrent/test_concurrent.py +++ /dev/null @@ -1,133 +0,0 @@ -from textwrap import dedent -from concurrent import futures -from concurrent.futures import Future, ThreadPoolExecutor - -import sentry_sdk - -from sentry_sdk.integrations.concurrent import ConcurrentIntegration - -original_submit = ThreadPoolExecutor.submit -original_set_exception = Future.set_exception - - -def test_propagates_threadpool_scope(sentry_init, capture_events): - sentry_init( - default_integrations=False, - traces_sample_rate=1.0, - integrations=[ConcurrentIntegration()], - ) - events = capture_events() - - def double(number): - with sentry_sdk.start_span(op="task", name=str(number)): - return number * 2 - - with sentry_sdk.start_transaction(name="test_handles_threadpool"): - with futures.ThreadPoolExecutor(max_workers=1) as executor: - tasks = [executor.submit(double, number) for number in [1, 2, 3, 4]] - for future in futures.as_completed(tasks): - print("Getting future value!", future.result()) - - sentry_sdk.flush() - - assert len(events) == 1 - (event,) = events - assert event["spans"][0]["trace_id"] == event["spans"][1]["trace_id"] - assert event["spans"][1]["trace_id"] == event["spans"][2]["trace_id"] - assert event["spans"][2]["trace_id"] == event["spans"][3]["trace_id"] - assert event["spans"][3]["trace_id"] == event["spans"][0]["trace_id"] - - -def test_propagates_threadpool_scope_in_map(sentry_init, capture_events): - sentry_init( - default_integrations=False, - traces_sample_rate=1.0, - integrations=[ConcurrentIntegration()], - ) - events = capture_events() - - def double(number): - with sentry_sdk.start_span(op="task", name=str(number)): - return number * 2 - - with sentry_sdk.start_transaction(name="test_handles_threadpool"): - with futures.ThreadPoolExecutor(max_workers=1) as executor: - for value in executor.map(double, [1, 2, 3, 4]): - print("Getting future value!", value) - - sentry_sdk.flush() - - assert len(events) == 1 - (event,) = events - assert event["spans"][0]["trace_id"] == event["spans"][1]["trace_id"] - assert event["spans"][1]["trace_id"] == event["spans"][2]["trace_id"] - assert event["spans"][2]["trace_id"] == event["spans"][3]["trace_id"] - assert event["spans"][3]["trace_id"] == event["spans"][0]["trace_id"] - - -def test_scope_data_not_leaked_in_executor(sentry_init): - sentry_init( - integrations=[ConcurrentIntegration()], - ) - - sentry_sdk.set_tag("initial_tag", "initial_value") - initial_iso_scope = sentry_sdk.get_isolation_scope() - - def do_some_work(): - # check if we have the initial scope data propagated into the thread - assert sentry_sdk.get_isolation_scope()._tags == { - "initial_tag": "initial_value" - } - - # change data in isolation scope in thread - sentry_sdk.set_tag("thread_tag", "thread_value") - - with futures.ThreadPoolExecutor(max_workers=1) as executor: - future = executor.submit(do_some_work) - future.result() - - # check if the initial scope data is not modified by the started thread - assert initial_iso_scope._tags == { - "initial_tag": "initial_value" - }, "The isolation scope in the main thread should not be modified by the started thread." - - -def test_spans_from_multiple_threads(sentry_init, capture_events, render_span_tree): - sentry_init( - traces_sample_rate=1.0, - integrations=[ConcurrentIntegration()], - ) - events = capture_events() - - def do_some_work(number): - with sentry_sdk.start_span( - op=f"inner-run-{number}", name=f"Thread: child-{number}" - ): - pass - - with sentry_sdk.start_transaction(op="outer-trx"): - with futures.ThreadPoolExecutor(max_workers=1) as executor: - for number in range(5): - with sentry_sdk.start_span( - op=f"outer-submit-{number}", name="Thread: main" - ): - future = executor.submit(do_some_work, number) - future.result() - - (event,) = events - - assert render_span_tree(event) == dedent( - """\ - - op="outer-trx": description=null - - op="outer-submit-0": description="Thread: main" - - op="inner-run-0": description="Thread: child-0" - - op="outer-submit-1": description="Thread: main" - - op="inner-run-1": description="Thread: child-1" - - op="outer-submit-2": description="Thread: main" - - op="inner-run-2": description="Thread: child-2" - - op="outer-submit-3": description="Thread: main" - - op="inner-run-3": description="Thread: child-3" - - op="outer-submit-4": description="Thread: main" - - op="inner-run-4": description="Thread: child-4"\ -""" - ) diff --git a/tests/integrations/threading/test_threading.py b/tests/integrations/threading/test_threading.py index 4577c846d8..0fbe072fe1 100644 --- a/tests/integrations/threading/test_threading.py +++ b/tests/integrations/threading/test_threading.py @@ -276,3 +276,44 @@ def do_some_work(number): - op="outer-submit-4": description="Thread: main"\ """ ) + + +def test_spans_from_threadpool(sentry_init, capture_events, render_span_tree): + sentry_init( + traces_sample_rate=1.0, + integrations=[ThreadingIntegration()], + ) + events = capture_events() + + def do_some_work(number): + with sentry_sdk.start_span( + op=f"inner-run-{number}", name=f"Thread: child-{number}" + ): + pass + + with sentry_sdk.start_transaction(op="outer-trx"): + with futures.ThreadPoolExecutor(max_workers=1) as executor: + for number in range(5): + with sentry_sdk.start_span( + op=f"outer-submit-{number}", name="Thread: main" + ): + future = executor.submit(do_some_work, number) + future.result() + + (event,) = events + + assert render_span_tree(event) == dedent( + """\ + - op="outer-trx": description=null + - op="outer-submit-0": description="Thread: main" + - op="inner-run-0": description="Thread: child-0" + - op="outer-submit-1": description="Thread: main" + - op="inner-run-1": description="Thread: child-1" + - op="outer-submit-2": description="Thread: main" + - op="inner-run-2": description="Thread: child-2" + - op="outer-submit-3": description="Thread: main" + - op="inner-run-3": description="Thread: child-3" + - op="outer-submit-4": description="Thread: main" + - op="inner-run-4": description="Thread: child-4"\ +""" + ) diff --git a/tests/test_basics.py b/tests/test_basics.py index 7314bcabe4..45303c9a59 100644 --- a/tests/test_basics.py +++ b/tests/test_basics.py @@ -868,7 +868,6 @@ def foo(event, hint): (["atexit"], "sentry.python"), (["boto3"], "sentry.python"), (["celery"], "sentry.python"), - (["concurrent"], "sentry.python"), (["dedupe"], "sentry.python"), (["excepthook"], "sentry.python"), (["unraisablehook"], "sentry.python"), From 7423d8a966109073ddad7a2e11877475bbe97ef8 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Thu, 25 Sep 2025 15:54:09 +0200 Subject: [PATCH 07/12] mypy --- sentry_sdk/integrations/threading.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/sentry_sdk/integrations/threading.py b/sentry_sdk/integrations/threading.py index 66617dabe5..c853c0473b 100644 --- a/sentry_sdk/integrations/threading.py +++ b/sentry_sdk/integrations/threading.py @@ -150,15 +150,13 @@ def sentry_submit(self, fn, *args, **kwargs): if integration is None: return func(self, fn, *args, **kwargs) - if integration.propagate_scope: - isolation_scope = sentry_sdk.get_isolation_scope().fork() - current_scope = sentry_sdk.get_current_scope().fork() - else: - isolation_scope = None - current_scope = None - def wrapped_fn(*args, **kwargs): # type: (*Any, **Any) -> Any + if not integration.propagate_scope: + return fn(*args, **kwargs) + + isolation_scope = sentry_sdk.get_isolation_scope().fork() + current_scope = sentry_sdk.get_current_scope().fork() with use_isolation_scope(isolation_scope): with use_scope(current_scope): return fn(*args, **kwargs) From 6894efc3efb16ac70421d2bedac58275922a4f2a Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Thu, 25 Sep 2025 15:59:25 +0200 Subject: [PATCH 08/12] use correct scopes --- sentry_sdk/integrations/threading.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/sentry_sdk/integrations/threading.py b/sentry_sdk/integrations/threading.py index c853c0473b..e9c4b7b779 100644 --- a/sentry_sdk/integrations/threading.py +++ b/sentry_sdk/integrations/threading.py @@ -150,16 +150,19 @@ def sentry_submit(self, fn, *args, **kwargs): if integration is None: return func(self, fn, *args, **kwargs) - def wrapped_fn(*args, **kwargs): - # type: (*Any, **Any) -> Any - if not integration.propagate_scope: - return fn(*args, **kwargs) - + if integration.propagate_scope: isolation_scope = sentry_sdk.get_isolation_scope().fork() current_scope = sentry_sdk.get_current_scope().fork() - with use_isolation_scope(isolation_scope): - with use_scope(current_scope): - return fn(*args, **kwargs) + else: + isolation_scope = None + current_scope = None + + def wrapped_fn(*args, **kwargs): + # type: (*Any, **Any) -> Any + if isolation_scope is not None and current_scope is not None: + with use_isolation_scope(isolation_scope): + with use_scope(current_scope): + return fn(*args, **kwargs) return func(self, wrapped_fn, *args, **kwargs) From 31128923a60d41ef3d22e0e6248d20bf2f59e7b1 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Thu, 25 Sep 2025 16:09:47 +0200 Subject: [PATCH 09/12] always run original function --- sentry_sdk/integrations/threading.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sentry_sdk/integrations/threading.py b/sentry_sdk/integrations/threading.py index e9c4b7b779..3359384fb8 100644 --- a/sentry_sdk/integrations/threading.py +++ b/sentry_sdk/integrations/threading.py @@ -164,6 +164,8 @@ def wrapped_fn(*args, **kwargs): with use_scope(current_scope): return fn(*args, **kwargs) + return fn(*args, **kwargs) + return func(self, wrapped_fn, *args, **kwargs) return sentry_submit From f51b4f0881b85f6c9f29382f469b2ea306485340 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Thu, 25 Sep 2025 16:10:46 +0200 Subject: [PATCH 10/12] also add new test --- .../integrations/threading/test_threading.py | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/tests/integrations/threading/test_threading.py b/tests/integrations/threading/test_threading.py index 0fbe072fe1..2472787e7c 100644 --- a/tests/integrations/threading/test_threading.py +++ b/tests/integrations/threading/test_threading.py @@ -278,10 +278,17 @@ def do_some_work(number): ) -def test_spans_from_threadpool(sentry_init, capture_events, render_span_tree): +@pytest.mark.parametrize( + "propagate_scope", + (True, False), + ids=["propagate_scope=True", "propagate_scope=False"], +) +def test_spans_from_threadpool( + sentry_init, capture_events, render_span_tree, propagate_scope +): sentry_init( traces_sample_rate=1.0, - integrations=[ThreadingIntegration()], + integrations=[ThreadingIntegration(propagate_scope=propagate_scope)], ) events = capture_events() @@ -302,8 +309,9 @@ def do_some_work(number): (event,) = events - assert render_span_tree(event) == dedent( - """\ + if propagate_scope: + assert render_span_tree(event) == dedent( + """\ - op="outer-trx": description=null - op="outer-submit-0": description="Thread: main" - op="inner-run-0": description="Thread: child-0" @@ -316,4 +324,16 @@ def do_some_work(number): - op="outer-submit-4": description="Thread: main" - op="inner-run-4": description="Thread: child-4"\ """ - ) + ) + + elif not propagate_scope: + assert render_span_tree(event) == dedent( + """\ + - op="outer-trx": description=null + - op="outer-submit-0": description="Thread: main" + - op="outer-submit-1": description="Thread: main" + - op="outer-submit-2": description="Thread: main" + - op="outer-submit-3": description="Thread: main" + - op="outer-submit-4": description="Thread: main"\ +""" + ) From 6ebc29a9b1a9531d8badec19f3c6372714305b37 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 7 Oct 2025 14:58:31 +0200 Subject: [PATCH 11/12] use same version gating as Thread.start --- sentry_sdk/integrations/threading.py | 31 +++++++++++++++++----------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/sentry_sdk/integrations/threading.py b/sentry_sdk/integrations/threading.py index 3359384fb8..4723d05a42 100644 --- a/sentry_sdk/integrations/threading.py +++ b/sentry_sdk/integrations/threading.py @@ -61,6 +61,15 @@ def setup_once(): django_version = None channels_version = None + is_async_emulated_with_threads = ( + sys.version_info < (3, 9) + and channels_version is not None + and channels_version < "4.0.0" + and django_version is not None + and django_version >= (3, 0) + and django_version < (4, 0) + ) + @wraps(old_start) def sentry_start(self, *a, **kw): # type: (Thread, *Any, **Any) -> Any @@ -69,14 +78,7 @@ def sentry_start(self, *a, **kw): return old_start(self, *a, **kw) if integration.propagate_scope: - if ( - sys.version_info < (3, 9) - and channels_version is not None - and channels_version < "4.0.0" - and django_version is not None - and django_version >= (3, 0) - and django_version < (4, 0) - ): + if is_async_emulated_with_threads: warnings.warn( "There is a known issue with Django channels 2.x and 3.x when using Python 3.8 or older. " "(Async support is emulated using threads and some Sentry data may be leaked between those threads.) " @@ -111,7 +113,9 @@ def sentry_start(self, *a, **kw): return old_start(self, *a, **kw) Thread.start = sentry_start # type: ignore - ThreadPoolExecutor.submit = _wrap_threadpool_executor_submit(ThreadPoolExecutor.submit) # type: ignore + ThreadPoolExecutor.submit = _wrap_threadpool_executor_submit( + ThreadPoolExecutor.submit, is_async_emulated_with_threads + ) # type: ignore def _wrap_run(isolation_scope_to_use, current_scope_to_use, old_run_func): @@ -137,8 +141,8 @@ def _run_old_run_func(): return run # type: ignore -def _wrap_threadpool_executor_submit(func): - # type: (Callable[..., Future[T]]) -> Callable[..., Future[T]] +def _wrap_threadpool_executor_submit(func, is_async_emulated_with_threads): + # type: (Callable[..., Future[T]], bool) -> Callable[..., Future[T]] """ Wrap submit call to propagate scopes on task submission. """ @@ -150,7 +154,10 @@ def sentry_submit(self, fn, *args, **kwargs): if integration is None: return func(self, fn, *args, **kwargs) - if integration.propagate_scope: + if integration.propagate_scope and is_async_emulated_with_threads: + isolation_scope = sentry_sdk.get_isolation_scope() + current_scope = sentry_sdk.get_current_scope() + elif integration.propagate_scope: isolation_scope = sentry_sdk.get_isolation_scope().fork() current_scope = sentry_sdk.get_current_scope().fork() else: From 1c5cc93a03e1eaf781f0212588222ec539933a03 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Tue, 7 Oct 2025 15:01:49 +0200 Subject: [PATCH 12/12] move type:ignore --- sentry_sdk/integrations/threading.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/threading.py b/sentry_sdk/integrations/threading.py index 4723d05a42..cfe54c829c 100644 --- a/sentry_sdk/integrations/threading.py +++ b/sentry_sdk/integrations/threading.py @@ -113,9 +113,9 @@ def sentry_start(self, *a, **kw): return old_start(self, *a, **kw) Thread.start = sentry_start # type: ignore - ThreadPoolExecutor.submit = _wrap_threadpool_executor_submit( + ThreadPoolExecutor.submit = _wrap_threadpool_executor_submit( # type: ignore ThreadPoolExecutor.submit, is_async_emulated_with_threads - ) # type: ignore + ) def _wrap_run(isolation_scope_to_use, current_scope_to_use, old_run_func):