Skip to content

Commit fb251d0

Browse files
authored
Add support for synchronous Transport sniffing
1 parent 3a65e8b commit fb251d0

File tree

11 files changed

+457
-54
lines changed

11 files changed

+457
-54
lines changed

elastic_transport/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
NodeConfig,
4949
QueryParams,
5050
RequestOptions,
51+
SniffOptions,
5152
)
5253
from ._node import AiohttpHttpNode, BaseNode, RequestsHttpNode, Urllib3HttpNode
5354
from ._node_pool import NodePool, NodeSelector, RandomSelector, RoundRobinSelector
@@ -95,6 +96,7 @@
9596
"SerializationError",
9697
"Serializer",
9798
"ServiceUnavailableError",
99+
"SniffOptions",
98100
"TextSerializer",
99101
"TlsError",
100102
"TooManyRequestsError",

elastic_transport/_async_transport.py

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -168,22 +168,23 @@ async def perform_request(
168168

169169
except TransportError as e:
170170
retry = False
171+
node_failure = e.status not in NOT_DEAD_NODE_HTTP_STATUSES
171172
if isinstance(e, ConnectionTimeout):
172173
retry = self.retry_on_timeout
174+
node_failure = True
173175
elif isinstance(e, ConnectionError):
174176
retry = True
177+
node_failure = True
175178
elif e.status in self.retry_on_status:
176179
retry = True
177180

181+
# If the error was determined to be a node failure
182+
# we mark it dead in the node pool to allow for
183+
# other nodes to be retried.
184+
if node_failure:
185+
self.node_pool.mark_dead(node)
186+
178187
if retry:
179-
try:
180-
# only mark as dead if we are retrying
181-
if e.status not in NOT_DEAD_NODE_HTTP_STATUSES:
182-
await self.mark_dead(node)
183-
except TransportError:
184-
# If sniffing on failure, it could fail too. Catch the
185-
# exception not to interrupt the retries.
186-
pass
187188
# raise exception on last retry
188189
if attempt == self.max_retries:
189190
e.errors = tuple(errors)
@@ -199,10 +200,6 @@ async def perform_request(
199200
self.node_pool.mark_live(node)
200201
return response, data
201202

202-
async def mark_dead(self, node: BaseNode) -> None:
203-
"""Marks a node as dead and optionally starts sniffing for additional nodes if enabled"""
204-
self.node_pool.mark_dead(node)
205-
206203
async def close(self) -> None:
207204
"""
208205
Explicitly closes all nodes in the transport's pool

elastic_transport/_compat.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
# under the License.
1717

1818
import asyncio
19+
import inspect
1920
import sys
21+
from pathlib import Path
2022
from urllib.parse import quote as _quote
2123
from urllib.parse import urlencode, urlparse
2224

@@ -67,6 +69,50 @@ def __enter__(self) -> None:
6769
def __exit__(self, *_) -> None:
6870
pass
6971

72+
def acquire(self, blocking: bool = True) -> bool:
73+
return True
74+
75+
def release(self) -> None:
76+
pass
77+
78+
79+
def warn_stacklevel() -> int:
80+
"""Dynamically determine warning stacklevel for warnings based on the call stack"""
81+
try:
82+
# Grab the root module from the current module '__name__'
83+
module_name = __name__.partition(".")[0]
84+
module_path = Path(sys.modules[module_name].__file__)
85+
86+
# If the module is a folder we're looking at
87+
# subdirectories, otherwise we're looking for
88+
# an exact match.
89+
module_is_folder = module_path.name == "__init__.py"
90+
if module_is_folder:
91+
module_path = module_path.parent
92+
93+
# Look through frames until we find a file that
94+
# isn't a part of our module, then return that stacklevel.
95+
for level, frame in enumerate(inspect.stack()):
96+
# Garbage collecting frames
97+
frame_filename = Path(frame.filename)
98+
del frame
99+
100+
if (
101+
# If the module is a folder we look at subdirectory
102+
module_is_folder
103+
and module_path not in frame_filename.parents
104+
) or (
105+
# Otherwise we're looking for an exact match.
106+
not module_is_folder
107+
and module_path != frame_filename
108+
):
109+
return level
110+
except KeyError:
111+
pass
112+
except Exception:
113+
return 2
114+
return 0
115+
70116

71117
__all__ = [
72118
"get_running_loop",

elastic_transport/_models.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -210,24 +210,24 @@ def _normalize_key(self, key: str) -> str:
210210
class ApiResponseMeta:
211211
"""Metadata that is returned from Transport.perform_request()"""
212212

213-
#: Node which handled the request
214-
node: "NodeConfig"
215-
216-
#: Number of seconds from start of request to start of response
217-
duration: float
213+
#: HTTP status code
214+
status: int
218215

219216
#: HTTP version being used
220217
http_version: str
221218

222-
#: HTTP status code
223-
status: int
224-
225219
#: HTTP headers
226220
headers: HttpHeaders
227221

222+
#: Number of seconds from start of request to start of response
223+
duration: float
224+
225+
#: Node which handled the request
226+
node: "NodeConfig"
227+
228228
#: Extras that can be set to anything, typically used by third-parties.
229229
#: Third-party keys should start with an underscore and prefix.
230-
_extras: Dict[str, Any] = field(default_factory=dict)
230+
_extras: Dict[str, Any] = field(default_factory=dict, repr=False)
231231

232232
@property
233233
def mimetype(self) -> Optional[str]:
@@ -388,6 +388,14 @@ def __hash__(self) -> int:
388388
)
389389

390390

391+
@dataclass()
392+
class SniffOptions:
393+
"""Options which are passed to Transport.sniff_callback"""
394+
395+
is_initial_sniff: bool
396+
sniff_timeout: Optional[float]
397+
398+
391399
@dataclass(frozen=True, repr=True)
392400
class RequestOptions:
393401
"""Options which can be passed per request to the Transport"""

elastic_transport/_node/_http_aiohttp.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import warnings
2525
from typing import Tuple
2626

27-
from .._compat import get_running_loop
27+
from .._compat import get_running_loop, warn_stacklevel
2828
from .._exceptions import ConnectionError, ConnectionTimeout, SecurityWarning, TlsError
2929
from .._models import ApiResponseMeta, HttpHeaders, NodeConfig
3030
from ..client_utils import DEFAULT, client_meta_version, normalize_headers
@@ -77,7 +77,7 @@ def __init__(self, config: NodeConfig):
7777
if config.ssl_show_warn:
7878
warnings.warn(
7979
f"Connecting to {self.base_url!r} using TLS with verify_certs=False is insecure",
80-
stacklevel=2,
80+
stacklevel=warn_stacklevel(),
8181
category=SecurityWarning,
8282
)
8383

elastic_transport/_node/_http_requests.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import urllib3
2525

26+
from .._compat import warn_stacklevel
2627
from .._exceptions import ConnectionError, ConnectionTimeout, SecurityWarning, TlsError
2728
from .._models import ApiResponseMeta, HttpHeaders, NodeConfig
2829
from ..client_utils import DEFAULT, client_meta_version
@@ -97,7 +98,7 @@ def __init__(self, config: NodeConfig):
9798
):
9899
warnings.warn(
99100
f"Connecting to {self.base_url!r} using TLS with verify_certs=False is insecure",
100-
stacklevel=2,
101+
stacklevel=warn_stacklevel(),
101102
category=SecurityWarning,
102103
)
103104

elastic_transport/_node/_http_urllib3.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from urllib3.exceptions import ConnectTimeoutError, ReadTimeoutError
2626
from urllib3.util.retry import Retry
2727

28+
from .._compat import warn_stacklevel
2829
from .._exceptions import ConnectionError, ConnectionTimeout, SecurityWarning, TlsError
2930
from .._models import ApiResponseMeta, HttpHeaders, NodeConfig
3031
from ..client_utils import DEFAULT, client_meta_version
@@ -87,7 +88,7 @@ def __init__(self, config: NodeConfig):
8788
if config.ssl_show_warn:
8889
warnings.warn(
8990
f"Connecting to {self.base_url!r} using TLS with verify_certs=False is insecure",
90-
stacklevel=2,
91+
stacklevel=warn_stacklevel(),
9192
category=SecurityWarning,
9293
)
9394
else:

elastic_transport/_node_pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ def add(self, node_config: NodeConfig) -> None:
289289
# Start the node as dead because 'dead_nodes' is thread-safe.
290290
# The node will be resurrected on the next call to .get()
291291
self.dead_consecutive_failures[node.config] = 0
292-
self.dead_nodes.put((0.0, node))
292+
self.dead_nodes.put((time.time(), node))
293293

294294
def remove(self, node_config: NodeConfig) -> None:
295295
# Can't mark a seed node as removed.

0 commit comments

Comments
 (0)