Skip to content

Commit 55273b7

Browse files
srothhsl0thentr0py
andauthored
Add experimental async transport (#4572)
Includes: * this PR: new shared base class for common sync/async logic * #4580 * #4591 * #4614 * #4615 * #4700 Fixes GH-4568 --------- Co-authored-by: Neel Shah <[email protected]>
1 parent d723eca commit 55273b7

File tree

15 files changed

+1508
-111
lines changed

15 files changed

+1508
-111
lines changed

requirements-testing.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ asttokens
1111
responses
1212
pysocks
1313
socksio
14-
httpcore[http2]
14+
httpcore[http2,asyncio]
1515
setuptools
1616
freezegun
1717
Brotli

scripts/populate_tox/config.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@
9696
"pytest-asyncio",
9797
"python-multipart",
9898
"requests",
99-
"anyio<4",
99+
"anyio>=3,<5",
100100
],
101101
# There's an incompatibility between FastAPI's TestClient, which is
102102
# actually Starlette's TestClient, which is actually httpx's Client.
@@ -106,6 +106,7 @@
106106
# FastAPI versions we use older httpx which still supports the
107107
# deprecated argument.
108108
"<0.110.1": ["httpx<0.28.0"],
109+
"<0.80": ["anyio<4"],
109110
"py3.6": ["aiocontextvars"],
110111
},
111112
},

scripts/populate_tox/tox.jinja

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ deps =
207207
httpx-v0.25: pytest-httpx==0.25.0
208208
httpx: pytest-httpx
209209
# anyio is a dep of httpx
210-
httpx: anyio<4.0.0
210+
httpx: anyio>=3,<5
211211
httpx-v0.16: httpx~=0.16.0
212212
httpx-v0.18: httpx~=0.18.0
213213
httpx-v0.20: httpx~=0.20.0

sentry_sdk/api.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,14 @@ def flush(
229229
return get_client().flush(timeout=timeout, callback=callback)
230230

231231

232+
@clientmethod
233+
async def flush_async(
234+
timeout: Optional[float] = None,
235+
callback: Optional[Callable[[int, float], None]] = None,
236+
) -> None:
237+
return await get_client().flush_async(timeout=timeout, callback=callback)
238+
239+
232240
def start_span(**kwargs: Any) -> Span:
233241
"""
234242
Start and return a span.

sentry_sdk/client.py

Lines changed: 90 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
)
2626
from sentry_sdk.serializer import serialize
2727
from sentry_sdk.tracing import trace
28-
from sentry_sdk.transport import BaseHttpTransport, make_transport
28+
from sentry_sdk.transport import HttpTransportCore, make_transport, AsyncHttpTransport
2929
from sentry_sdk.consts import (
3030
SPANDATA,
3131
DEFAULT_MAX_VALUE_LENGTH,
@@ -214,6 +214,12 @@ def close(self, *args: Any, **kwargs: Any) -> None:
214214
def flush(self, *args: Any, **kwargs: Any) -> None:
215215
return None
216216

217+
async def close_async(self, *args: Any, **kwargs: Any) -> None:
218+
return None
219+
220+
async def flush_async(self, *args: Any, **kwargs: Any) -> None:
221+
return None
222+
217223
def __enter__(self) -> BaseClient:
218224
return self
219225

@@ -406,7 +412,7 @@ def _capture_envelope(envelope: Envelope) -> None:
406412
self.monitor
407413
or self.log_batcher
408414
or has_profiling_enabled(self.options)
409-
or isinstance(self.transport, BaseHttpTransport)
415+
or isinstance(self.transport, HttpTransportCore)
410416
):
411417
# If we have anything on that could spawn a background thread, we
412418
# need to check if it's safe to use them.
@@ -918,6 +924,14 @@ def get_integration(
918924

919925
return self.integrations.get(integration_name)
920926

927+
def _close_components(self) -> None:
928+
"""Kill all client components in the correct order."""
929+
self.session_flusher.kill()
930+
if self.log_batcher is not None:
931+
self.log_batcher.kill()
932+
if self.monitor:
933+
self.monitor.kill()
934+
921935
def close(
922936
self,
923937
timeout: Optional[float] = None,
@@ -928,19 +942,43 @@ def close(
928942
semantics as :py:meth:`Client.flush`.
929943
"""
930944
if self.transport is not None:
945+
if isinstance(self.transport, AsyncHttpTransport) and hasattr(
946+
self.transport, "loop"
947+
):
948+
logger.debug(
949+
"close() used with AsyncHttpTransport, aborting. Please use close_async() instead."
950+
)
951+
return
931952
self.flush(timeout=timeout, callback=callback)
932-
933-
self.session_flusher.kill()
934-
935-
if self.log_batcher is not None:
936-
self.log_batcher.kill()
937-
938-
if self.monitor:
939-
self.monitor.kill()
940-
953+
self._close_components()
941954
self.transport.kill()
942955
self.transport = None
943956

957+
async def close_async(
958+
self,
959+
timeout: Optional[float] = None,
960+
callback: Optional[Callable[[int, float], None]] = None,
961+
) -> None:
962+
"""
963+
Asynchronously close the client and shut down the transport. Arguments have the same
964+
semantics as :py:meth:`Client.flush_async`.
965+
"""
966+
if self.transport is not None:
967+
if not (
968+
isinstance(self.transport, AsyncHttpTransport)
969+
and hasattr(self.transport, "loop")
970+
):
971+
logger.debug(
972+
"close_async() used with non-async transport, aborting. Please use close() instead."
973+
)
974+
return
975+
await self.flush_async(timeout=timeout, callback=callback)
976+
self._close_components()
977+
kill_task = self.transport.kill() # type: ignore
978+
if kill_task is not None:
979+
await kill_task
980+
self.transport = None
981+
944982
def flush(
945983
self,
946984
timeout: Optional[float] = None,
@@ -954,15 +992,52 @@ def flush(
954992
:param callback: Is invoked with the number of pending events and the configured timeout.
955993
"""
956994
if self.transport is not None:
995+
if isinstance(self.transport, AsyncHttpTransport) and hasattr(
996+
self.transport, "loop"
997+
):
998+
logger.debug(
999+
"flush() used with AsyncHttpTransport, aborting. Please use flush_async() instead."
1000+
)
1001+
return
9571002
if timeout is None:
9581003
timeout = self.options["shutdown_timeout"]
959-
self.session_flusher.flush()
960-
961-
if self.log_batcher is not None:
962-
self.log_batcher.flush()
1004+
self._flush_components()
9631005

9641006
self.transport.flush(timeout=timeout, callback=callback)
9651007

1008+
async def flush_async(
1009+
self,
1010+
timeout: Optional[float] = None,
1011+
callback: Optional[Callable[[int, float], None]] = None,
1012+
) -> None:
1013+
"""
1014+
Asynchronously wait for the current events to be sent.
1015+
1016+
:param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used.
1017+
1018+
:param callback: Is invoked with the number of pending events and the configured timeout.
1019+
"""
1020+
if self.transport is not None:
1021+
if not (
1022+
isinstance(self.transport, AsyncHttpTransport)
1023+
and hasattr(self.transport, "loop")
1024+
):
1025+
logger.debug(
1026+
"flush_async() used with non-async transport, aborting. Please use flush() instead."
1027+
)
1028+
return
1029+
if timeout is None:
1030+
timeout = self.options["shutdown_timeout"]
1031+
self._flush_components()
1032+
flush_task = self.transport.flush(timeout=timeout, callback=callback) # type: ignore
1033+
if flush_task is not None:
1034+
await flush_task
1035+
1036+
def _flush_components(self) -> None:
1037+
self.session_flusher.flush()
1038+
if self.log_batcher is not None:
1039+
self.log_batcher.flush()
1040+
9661041
def __enter__(self) -> _Client:
9671042
return self
9681043

sentry_sdk/consts.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ class CompressionAlgo(Enum):
7878
"transport_compression_algo": Optional[CompressionAlgo],
7979
"transport_num_pools": Optional[int],
8080
"transport_http2": Optional[bool],
81+
"transport_async": Optional[bool],
8182
},
8283
total=False,
8384
)

sentry_sdk/integrations/asyncio.py

Lines changed: 85 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@
44
import sentry_sdk
55
from sentry_sdk.consts import OP
66
from sentry_sdk.integrations import Integration, DidNotEnable
7-
from sentry_sdk.utils import event_from_exception, logger, reraise
7+
from sentry_sdk.utils import (
8+
event_from_exception,
9+
logger,
10+
reraise,
11+
is_internal_task,
12+
)
13+
from sentry_sdk.transport import AsyncHttpTransport
814

915
try:
1016
import asyncio
@@ -29,6 +35,72 @@ def get_name(coro: Any) -> str:
2935
)
3036

3137

38+
def patch_loop_close() -> None:
39+
"""Patch loop.close to flush pending events before shutdown."""
40+
# Atexit shutdown hook happens after the event loop is closed.
41+
# Therefore, it is necessary to patch the loop.close method to ensure
42+
# that pending events are flushed before the interpreter shuts down.
43+
try:
44+
loop = asyncio.get_running_loop()
45+
except RuntimeError:
46+
# No running loop → cannot patch now
47+
return
48+
49+
if getattr(loop, "_sentry_flush_patched", False):
50+
return
51+
52+
async def _flush() -> None:
53+
client = sentry_sdk.get_client()
54+
if not client:
55+
return
56+
57+
try:
58+
if not isinstance(client.transport, AsyncHttpTransport):
59+
return
60+
61+
await client.close_async()
62+
except Exception:
63+
logger.warning("Sentry flush failed during loop shutdown", exc_info=True)
64+
65+
orig_close = loop.close
66+
67+
def _patched_close() -> None:
68+
try:
69+
loop.run_until_complete(_flush())
70+
finally:
71+
orig_close()
72+
73+
loop.close = _patched_close # type: ignore
74+
loop._sentry_flush_patched = True # type: ignore
75+
76+
77+
def _create_task_with_factory(
78+
orig_task_factory: Any,
79+
loop: asyncio.AbstractEventLoop,
80+
coro: Coroutine[Any, Any, Any],
81+
**kwargs: Any,
82+
) -> asyncio.Task[Any]:
83+
task = None
84+
85+
# Trying to use user set task factory (if there is one)
86+
if orig_task_factory:
87+
task = orig_task_factory(loop, coro, **kwargs)
88+
89+
if task is None:
90+
# The default task factory in `asyncio` does not have its own function
91+
# but is just a couple of lines in `asyncio.base_events.create_task()`
92+
# Those lines are copied here.
93+
94+
# WARNING:
95+
# If the default behavior of the task creation in asyncio changes,
96+
# this will break!
97+
task = Task(coro, loop=loop, **kwargs)
98+
if task._source_traceback: # type: ignore
99+
del task._source_traceback[-1] # type: ignore
100+
101+
return task
102+
103+
32104
def patch_asyncio() -> None:
33105
orig_task_factory = None
34106
try:
@@ -41,6 +113,14 @@ def _sentry_task_factory(
41113
**kwargs: Any,
42114
) -> asyncio.Future[Any]:
43115

116+
# Check if this is an internal Sentry task
117+
is_internal = is_internal_task()
118+
119+
if is_internal:
120+
return _create_task_with_factory(
121+
orig_task_factory, loop, coro, **kwargs
122+
)
123+
44124
async def _task_with_sentry_span_creation() -> Any:
45125
result = None
46126

@@ -58,25 +138,9 @@ async def _task_with_sentry_span_creation() -> Any:
58138

59139
return result
60140

61-
task = None
62-
63-
# Trying to use user set task factory (if there is one)
64-
if orig_task_factory:
65-
task = orig_task_factory(
66-
loop, _task_with_sentry_span_creation(), **kwargs
67-
)
68-
69-
if task is None:
70-
# The default task factory in `asyncio` does not have its own function
71-
# but is just a couple of lines in `asyncio.base_events.create_task()`
72-
# Those lines are copied here.
73-
74-
# WARNING:
75-
# If the default behavior of the task creation in asyncio changes,
76-
# this will break!
77-
task = Task(_task_with_sentry_span_creation(), loop=loop, **kwargs)
78-
if task._source_traceback: # type: ignore
79-
del task._source_traceback[-1] # type: ignore
141+
task = _create_task_with_factory(
142+
orig_task_factory, loop, _task_with_sentry_span_creation(), **kwargs
143+
)
80144

81145
# Set the task name to include the original coroutine's name
82146
try:
@@ -124,3 +188,4 @@ class AsyncioIntegration(Integration):
124188
@staticmethod
125189
def setup_once() -> None:
126190
patch_asyncio()
191+
patch_loop_close()

0 commit comments

Comments
 (0)