Skip to content

Commit 371ca96

Browse files
srothhsl0thentr0py
andauthored
fix(asyncio integration): Filter SDK internal tasks from span creation (#4700)
Filter SDK internal tasks from span creation. Implements a new contextmanager for tasks spawned internally by the SDK, which filters it in the asyncio integration. GH-4699 --------- Co-authored-by: Neel Shah <[email protected]>
1 parent 9a2c80c commit 371ca96

File tree

5 files changed

+143
-30
lines changed

5 files changed

+143
-30
lines changed

sentry_sdk/integrations/asyncio.py

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@
44
import sentry_sdk
55
from sentry_sdk.consts import OP
66
from sentry_sdk.integrations import Integration, DidNotEnable
7-
from sentry_sdk.utils import event_from_exception, logger, reraise
7+
from sentry_sdk.utils import (
8+
event_from_exception,
9+
logger,
10+
reraise,
11+
is_internal_task,
12+
)
813
from sentry_sdk.transport import AsyncHttpTransport
914

1015
try:
@@ -69,6 +74,33 @@ def _patched_close() -> None:
6974
loop._sentry_flush_patched = True # type: ignore
7075

7176

77+
def _create_task_with_factory(
78+
orig_task_factory: Any,
79+
loop: asyncio.AbstractEventLoop,
80+
coro: Coroutine[Any, Any, Any],
81+
**kwargs: Any,
82+
) -> asyncio.Task[Any]:
83+
task = None
84+
85+
# Trying to use user set task factory (if there is one)
86+
if orig_task_factory:
87+
task = orig_task_factory(loop, coro, **kwargs)
88+
89+
if task is None:
90+
# The default task factory in `asyncio` does not have its own function
91+
# but is just a couple of lines in `asyncio.base_events.create_task()`
92+
# Those lines are copied here.
93+
94+
# WARNING:
95+
# If the default behavior of the task creation in asyncio changes,
96+
# this will break!
97+
task = Task(coro, loop=loop, **kwargs)
98+
if task._source_traceback: # type: ignore
99+
del task._source_traceback[-1] # type: ignore
100+
101+
return task
102+
103+
72104
def patch_asyncio() -> None:
73105
orig_task_factory = None
74106
try:
@@ -81,6 +113,14 @@ def _sentry_task_factory(
81113
**kwargs: Any,
82114
) -> asyncio.Future[Any]:
83115

116+
# Check if this is an internal Sentry task
117+
is_internal = is_internal_task()
118+
119+
if is_internal:
120+
return _create_task_with_factory(
121+
orig_task_factory, loop, coro, **kwargs
122+
)
123+
84124
async def _task_with_sentry_span_creation() -> Any:
85125
result = None
86126

@@ -98,25 +138,9 @@ async def _task_with_sentry_span_creation() -> Any:
98138

99139
return result
100140

101-
task = None
102-
103-
# Trying to use user set task factory (if there is one)
104-
if orig_task_factory:
105-
task = orig_task_factory(
106-
loop, _task_with_sentry_span_creation(), **kwargs
107-
)
108-
109-
if task is None:
110-
# The default task factory in `asyncio` does not have its own function
111-
# but is just a couple of lines in `asyncio.base_events.create_task()`
112-
# Those lines are copied here.
113-
114-
# WARNING:
115-
# If the default behavior of the task creation in asyncio changes,
116-
# this will break!
117-
task = Task(_task_with_sentry_span_creation(), loop=loop, **kwargs)
118-
if task._source_traceback: # type: ignore
119-
del task._source_traceback[-1] # type: ignore
141+
task = _create_task_with_factory(
142+
orig_task_factory, loop, _task_with_sentry_span_creation(), **kwargs
143+
)
120144

121145
# Set the task name to include the original coroutine's name
122146
try:

sentry_sdk/transport.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,12 @@
3939
import certifi
4040

4141
from sentry_sdk.consts import EndpointType
42-
from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions
42+
from sentry_sdk.utils import (
43+
Dsn,
44+
logger,
45+
capture_internal_exceptions,
46+
mark_sentry_task_internal,
47+
)
4348
from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker
4449
from sentry_sdk.envelope import Envelope, Item, PayloadRef
4550

@@ -901,7 +906,8 @@ def kill(self: Self) -> Optional[asyncio.Task[None]]: # type: ignore
901906
self._worker.kill()
902907
try:
903908
# Return the pool cleanup task so caller can await it if needed
904-
return self.loop.create_task(self._pool.aclose()) # type: ignore
909+
with mark_sentry_task_internal():
910+
return self.loop.create_task(self._pool.aclose()) # type: ignore
905911
except RuntimeError:
906912
logger.warning("Event loop not running, aborting kill.")
907913
return None

sentry_sdk/utils.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22
import base64
3+
import contextvars
34
import json
45
import linecache
56
import logging
@@ -12,6 +13,7 @@
1213
import threading
1314
import time
1415
from collections import namedtuple
16+
from contextlib import contextmanager
1517
from datetime import datetime, timezone
1618
from decimal import Decimal
1719
from functools import partial, partialmethod, wraps
@@ -72,6 +74,25 @@
7274

7375
_installed_modules = None
7476

77+
_is_sentry_internal_task = contextvars.ContextVar(
78+
"is_sentry_internal_task", default=False
79+
)
80+
81+
82+
def is_internal_task():
83+
return _is_sentry_internal_task.get()
84+
85+
86+
@contextmanager
87+
def mark_sentry_task_internal():
88+
"""Context manager to mark a task as Sentry internal."""
89+
token = _is_sentry_internal_task.set(True)
90+
try:
91+
yield
92+
finally:
93+
_is_sentry_internal_task.reset(token)
94+
95+
7596
BASE64_ALPHABET = re.compile(r"^[a-zA-Z0-9/+=]*$")
7697

7798
FALSY_ENV_VALUES = frozenset(("false", "f", "n", "no", "off", "0"))

sentry_sdk/worker.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from time import sleep, time
88
from sentry_sdk._queue import Queue, FullError
9-
from sentry_sdk.utils import logger
9+
from sentry_sdk.utils import logger, mark_sentry_task_internal
1010
from sentry_sdk.consts import DEFAULT_QUEUE_SIZE
1111

1212
from typing import TYPE_CHECKING
@@ -231,7 +231,8 @@ def start(self) -> None:
231231
self._loop = asyncio.get_running_loop()
232232
if self._queue is None:
233233
self._queue = asyncio.Queue(maxsize=self._queue_size)
234-
self._task = self._loop.create_task(self._target())
234+
with mark_sentry_task_internal():
235+
self._task = self._loop.create_task(self._target())
235236
self._task_for_pid = os.getpid()
236237
except RuntimeError:
237238
# There is no event loop running
@@ -273,7 +274,8 @@ async def _wait_flush(self, timeout: float, callback: Optional[Any] = None) -> N
273274

274275
def flush(self, timeout: float, callback: Optional[Any] = None) -> Optional[asyncio.Task[None]]: # type: ignore[override]
275276
if self.is_alive and timeout > 0.0 and self._loop and self._loop.is_running():
276-
return self._loop.create_task(self._wait_flush(timeout, callback))
277+
with mark_sentry_task_internal():
278+
return self._loop.create_task(self._wait_flush(timeout, callback))
277279
return None
278280

279281
def submit(self, callback: Callable[[], Any]) -> bool:
@@ -295,7 +297,8 @@ async def _target(self) -> None:
295297
self._queue.task_done()
296298
break
297299
# Firing tasks instead of awaiting them allows for concurrent requests
298-
task = asyncio.create_task(self._process_callback(callback))
300+
with mark_sentry_task_internal():
301+
task = asyncio.create_task(self._process_callback(callback))
299302
# Create a strong reference to the task so it can be cancelled on kill
300303
# and does not get garbage collected while running
301304
self._active_tasks.add(task)

tests/integrations/asyncio/test_asyncio.py

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import sentry_sdk
99
from sentry_sdk.consts import OP
1010
from sentry_sdk.integrations.asyncio import AsyncioIntegration, patch_asyncio
11+
from sentry_sdk.utils import mark_sentry_task_internal
12+
1113

1214
try:
1315
from contextvars import Context, ContextVar
@@ -379,6 +381,55 @@ async def test_span_origin(
379381
assert event["spans"][0]["origin"] == "auto.function.asyncio"
380382

381383

384+
@minimum_python_38
385+
@pytest.mark.asyncio(loop_scope="module")
386+
async def test_internal_tasks_not_wrapped(sentry_init, capture_events):
387+
388+
sentry_init(integrations=[AsyncioIntegration()], traces_sample_rate=1.0)
389+
events = capture_events()
390+
391+
# Create a user task that should be wrapped
392+
async def user_task():
393+
await asyncio.sleep(0.01)
394+
return "user_result"
395+
396+
# Create an internal task that should NOT be wrapped
397+
async def internal_task():
398+
await asyncio.sleep(0.01)
399+
return "internal_result"
400+
401+
with sentry_sdk.start_transaction(name="test_transaction"):
402+
user_task_obj = asyncio.create_task(user_task())
403+
404+
with mark_sentry_task_internal():
405+
internal_task_obj = asyncio.create_task(internal_task())
406+
407+
user_result = await user_task_obj
408+
internal_result = await internal_task_obj
409+
410+
assert user_result == "user_result"
411+
assert internal_result == "internal_result"
412+
413+
assert len(events) == 1
414+
transaction = events[0]
415+
416+
user_spans = []
417+
internal_spans = []
418+
419+
for span in transaction.get("spans", []):
420+
if "user_task" in span.get("description", ""):
421+
user_spans.append(span)
422+
elif "internal_task" in span.get("description", ""):
423+
internal_spans.append(span)
424+
425+
assert (
426+
len(user_spans) > 0
427+
), f"User task should have been traced. All spans: {[s.get('description') for s in transaction.get('spans', [])]}"
428+
assert (
429+
len(internal_spans) == 0
430+
), f"Internal task should NOT have been traced. All spans: {[s.get('description') for s in transaction.get('spans', [])]}"
431+
432+
382433
@minimum_python_38
383434
def test_loop_close_patching(sentry_init):
384435
sentry_init(integrations=[AsyncioIntegration()])
@@ -405,6 +456,12 @@ def test_loop_close_flushes_async_transport(sentry_init):
405456

406457
sentry_init(integrations=[AsyncioIntegration()])
407458

459+
# Save the current event loop to restore it later
460+
try:
461+
original_loop = asyncio.get_event_loop()
462+
except RuntimeError:
463+
original_loop = None
464+
408465
loop = asyncio.new_event_loop()
409466
asyncio.set_event_loop(loop)
410467

@@ -415,14 +472,16 @@ def test_loop_close_flushes_async_transport(sentry_init):
415472
mock_client = Mock()
416473
mock_transport = Mock(spec=AsyncHttpTransport)
417474
mock_client.transport = mock_transport
418-
mock_client.close = AsyncMock(return_value=None)
475+
mock_client.close_async = AsyncMock(return_value=None)
419476

420477
with patch("sentry_sdk.get_client", return_value=mock_client):
421478
loop.close()
422479

423-
mock_client.close.assert_called_once()
424-
mock_client.close.assert_awaited_once()
480+
mock_client.close_async.assert_called_once()
481+
mock_client.close_async.assert_awaited_once()
425482

426-
except Exception:
483+
finally:
427484
if not loop.is_closed():
428485
loop.close()
486+
if original_loop:
487+
asyncio.set_event_loop(original_loop)

0 commit comments

Comments
 (0)