From 5f61ea680d86549220d6fce48738838b8b63f648 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurent=20Rivi=C3=A8re?= Date: Sat, 11 Oct 2025 18:23:21 +0200 Subject: [PATCH 1/2] fix: `is_reconnecting` case --- nats/src/nats/aio/client.py | 188 ++++++++++++++++++++++++++---------- 1 file changed, 136 insertions(+), 52 deletions(-) diff --git a/nats/src/nats/aio/client.py b/nats/src/nats/aio/client.py index 85bad94ce..690d9bf9d 100644 --- a/nats/src/nats/aio/client.py +++ b/nats/src/nats/aio/client.py @@ -174,7 +174,9 @@ def parse_version(self) -> None: self._prerelease_version = matches["prerelease"] or "" self._build_version = matches["buildmetadata"] or "" if self._build_version: - self._dev_version = "+".join([self._prerelease_version, self._build_version]) + self._dev_version = "+".join( + [self._prerelease_version, self._build_version] + ) else: self._dev_version = self._prerelease_version @@ -516,7 +518,11 @@ async def subscribe_handler(msg): if user or password or token or server_auth_configured: self._auth_configured = True - if self._user_credentials is not None or self._nkeys_seed is not None or self._nkeys_seed_str is not None: + if ( + self._user_credentials is not None + or self._nkeys_seed is not None + or self._nkeys_seed_str is not None + ): self._auth_configured = True self._setup_nkeys_connect() @@ -536,7 +542,9 @@ async def subscribe_handler(msg): try: await self._select_next_server() await self._process_connect_init() - assert self._current_server, "the current server must be set by _select_next_server" + assert ( + self._current_server + ), "the current server must be set by _select_next_server" self._current_server.reconnects = 0 break except errors.NoServersError as e: @@ -600,7 +608,11 @@ def sig_cb(nonce: str) -> bytes: return sig self._signature_cb = sig_cb - elif isinstance(creds, str) or isinstance(creds, UserString) or isinstance(creds, Path): + elif ( + isinstance(creds, str) + or isinstance(creds, UserString) + or isinstance(creds, Path) + ): # Define the functions to be able to sign things using nkeys. def user_cb() -> bytearray: return self._read_creds_user_jwt(creds) @@ -667,7 +679,9 @@ def get_user_jwt(f): return get_user_jwt(f) def _setup_nkeys_seed_connect(self) -> None: - assert self._nkeys_seed or self._nkeys_seed_str, "Client.connect must be called first" + assert ( + self._nkeys_seed or self._nkeys_seed_str + ), "Client.connect must be called first" import nkeys @@ -722,7 +736,10 @@ async def _close(self, status: int, do_cbs: bool = True) -> None: if self._reading_task is not None and not self._reading_task.cancelled(): self._reading_task.cancel() - if self._ping_interval_task is not None and not self._ping_interval_task.cancelled(): + if ( + self._ping_interval_task is not None + and not self._ping_interval_task.cancelled() + ): self._ping_interval_task.cancel() if self._flusher_task is not None and not self._flusher_task.cancelled(): @@ -733,7 +750,10 @@ async def _close(self, status: int, do_cbs: bool = True) -> None: # Wait for the reconnection task to be done which should be soon. 
try: - if self._reconnection_task_future is not None and not self._reconnection_task_future.cancelled(): + if ( + self._reconnection_task_future is not None + and not self._reconnection_task_future.cancelled() + ): await asyncio.wait_for( self._reconnection_task_future, self.options["reconnect_time_wait"], @@ -887,7 +907,10 @@ async def main(): payload_size = len(payload) if not self.is_connected: - if self._max_pending_size <= 0 or payload_size + self._pending_data_size > self._max_pending_size: + if ( + self._max_pending_size <= 0 + or payload_size + self._pending_data_size > self._max_pending_size + ): # Cannot publish during a reconnection when the buffering is disabled, # or if pending buffer is already full. raise errors.OutboundBufferLimitError @@ -1048,9 +1071,7 @@ async def request( if old_style: # FIXME: Support headers in old style requests. try: - return await self._request_old_style( - subject, payload, timeout=timeout - ) + return await self._request_old_style(subject, payload, timeout=timeout) except (errors.TimeoutError, asyncio.TimeoutError): await self._check_connection_health() raise errors.TimeoutError @@ -1059,7 +1080,9 @@ async def request( msg = await self._request_new_style( subject, payload, timeout=timeout, headers=headers ) - status = msg.headers.get(nats.js.api.Header.STATUS) if msg.headers else None + status = ( + msg.headers.get(nats.js.api.Header.STATUS) if msg.headers else None + ) if status == NO_RESPONDERS_STATUS: raise errors.NoRespondersError return msg @@ -1102,9 +1125,7 @@ def cleanup_resp_map(f): try: # Publish the request - await self.publish( - subject, payload, reply=inbox.decode(), headers=headers - ) + await self.publish(subject, payload, reply=inbox.decode(), headers=headers) if not self.is_connected: future.cancel() @@ -1140,7 +1161,9 @@ def new_inbox(self) -> str: next_inbox.extend(self._nuid.next()) return next_inbox.decode() - async def _request_old_style(self, subject: str, payload: bytes, timeout: float = 1) -> Msg: + async def _request_old_style( + self, subject: str, payload: bytes, timeout: float = 1 + ) -> Msg: """ Implements the request/response pattern via pub/sub using an ephemeral subscription which will be published @@ -1256,7 +1279,9 @@ def is_connecting(self) -> bool: @property def is_draining(self) -> bool: - return self._status == Client.DRAINING_SUBS or self._status == Client.DRAINING_PUBS + return ( + self._status == Client.DRAINING_SUBS or self._status == Client.DRAINING_PUBS + ) @property def is_draining_pubs(self) -> bool: @@ -1289,7 +1314,10 @@ async def _send_command(self, cmd: bytes, priority: bool = False) -> None: else: self._pending.append(cmd) self._pending_data_size += len(cmd) - if self._max_pending_size > 0 and self._pending_data_size > self._max_pending_size: + if ( + self._max_pending_size > 0 + and self._pending_data_size > self._max_pending_size + ): # Only flush force timeout on publish await self._flush_pending(force_flush=True) @@ -1355,10 +1383,16 @@ def _setup_server_pool(self, connect_url: Union[List[str]]) -> None: raise errors.Error("nats: invalid connect url option") # make sure protocols aren't mixed if not ( - all(server.uri.scheme in ("nats", "tls") for server in self._server_pool) - or all(server.uri.scheme in ("ws", "wss") for server in self._server_pool) + all( + server.uri.scheme in ("nats", "tls") for server in self._server_pool + ) + or all( + server.uri.scheme in ("ws", "wss") for server in self._server_pool + ) ): - raise errors.Error("nats: mixing of websocket and non websocket URLs is 
not allowed") + raise errors.Error( + "nats: mixing of websocket and non websocket URLs is not allowed" + ) else: raise errors.Error("nats: invalid connect url option") @@ -1383,7 +1417,10 @@ async def _select_next_server(self) -> None: # Not yet exceeded max_reconnect_attempts so can still use # this server in the future. self._server_pool.append(s) - if s.last_attempt is not None and now < s.last_attempt + self.options["reconnect_time_wait"]: + if ( + s.last_attempt is not None + and now < s.last_attempt + self.options["reconnect_time_wait"] + ): # Backoff connecting to server if we attempted recently. await asyncio.sleep(self.options["reconnect_time_wait"]) try: @@ -1456,24 +1493,33 @@ async def _check_connection_health(self) -> bool: bool: True if connection is healthy or was successfully reconnected, False otherwise """ if not self.is_connected: - if self.options[ - "allow_reconnect" - ] and not self.is_reconnecting and not self.is_closed: - self._status = Client.RECONNECTING - self._ps.reset() - - try: - if self._reconnection_task is not None and not self._reconnection_task.cancelled( - ): - self._reconnection_task.cancel() - - loop = asyncio.get_running_loop() - self._reconnection_task = loop.create_task(self._attempt_reconnect()) - + if self.options["allow_reconnect"]: + if self.is_reconnecting: await asyncio.sleep(self.options["reconnect_time_wait"]) return self.is_connected - except Exception: - return False + + if not self.is_closed: + self._status = Client.RECONNECTING + self._ps.reset() + + try: + if ( + self._reconnection_task is not None + and not self._reconnection_task.cancelled() + ): + self._reconnection_task.cancel() + + loop = asyncio.get_running_loop() + self._reconnection_task = loop.create_task( + self._attempt_reconnect() + ) + + await asyncio.sleep(self.options["reconnect_time_wait"]) + return self.is_connected + except Exception: + return False + + return False return False return True @@ -1491,11 +1537,16 @@ async def _process_op_err(self, e: Exception) -> None: self._status = Client.RECONNECTING self._ps.reset() - if self._reconnection_task is not None and not self._reconnection_task.cancelled(): + if ( + self._reconnection_task is not None + and not self._reconnection_task.cancelled() + ): # Cancel the previous task in case it may still be running. 
self._reconnection_task.cancel() - self._reconnection_task = asyncio.get_running_loop().create_task(self._attempt_reconnect()) + self._reconnection_task = asyncio.get_running_loop().create_task( + self._attempt_reconnect() + ) else: self._process_disconnect() self._err = e @@ -1506,7 +1557,10 @@ async def _attempt_reconnect(self) -> None: if self._reading_task is not None and not self._reading_task.cancelled(): self._reading_task.cancel() - if self._ping_interval_task is not None and not self._ping_interval_task.cancelled(): + if ( + self._ping_interval_task is not None + and not self._ping_interval_task.cancelled() + ): self._ping_interval_task.cancel() if self._flusher_task is not None and not self._flusher_task.cancelled(): @@ -1598,7 +1652,10 @@ async def _attempt_reconnect(self) -> None: except asyncio.CancelledError: break - if self._reconnection_task_future is not None and not self._reconnection_task_future.cancelled(): + if ( + self._reconnection_task_future is not None + and not self._reconnection_task_future.cancelled() + ): self._reconnection_task_future.set_result(True) def _connect_command(self) -> bytes: @@ -1632,7 +1689,10 @@ def _connect_command(self) -> bytes: options["nkey"] = self._public_nkey # In case there is no password, then consider handle # sending a token instead. - elif self.options["user"] is not None and self.options["password"] is not None: + elif ( + self.options["user"] is not None + and self.options["password"] is not None + ): options["user"] = self.options["user"] options["pass"] = self.options["password"] elif self.options["token"] is not None: @@ -1738,7 +1798,10 @@ async def _process_headers(self, headers) -> Optional[Dict[str, str]]: if parse_email: parsed_hdr = parse_email(raw_headers).headers else: - parsed_hdr = {k.strip(): v.strip() for k, v in self._hdr_parser.parsebytes(raw_headers).items()} + parsed_hdr = { + k.strip(): v.strip() + for k, v in self._hdr_parser.parsebytes(raw_headers).items() + } if hdr: hdr.update(parsed_hdr) else: @@ -1844,19 +1907,28 @@ async def _process_msg( try: sub._pending_size += payload_size # allow setting pending_bytes_limit to 0 to disable - if sub._pending_bytes_limit > 0 and sub._pending_size >= sub._pending_bytes_limit: + if ( + sub._pending_bytes_limit > 0 + and sub._pending_size >= sub._pending_bytes_limit + ): # Subtract the bytes since the message will be thrown away # so it would not be pending data. sub._pending_size -= payload_size await self._error_cb( - errors.SlowConsumerError(subject=msg.subject, reply=msg.reply, sid=sid, sub=sub) + errors.SlowConsumerError( + subject=msg.subject, reply=msg.reply, sid=sid, sub=sub + ) ) return sub._pending_queue.put_nowait(msg) except asyncio.QueueFull: sub._pending_size -= len(msg.data) - await self._error_cb(errors.SlowConsumerError(subject=msg.subject, reply=msg.reply, sid=sid, sub=sub)) + await self._error_cb( + errors.SlowConsumerError( + subject=msg.subject, reply=msg.reply, sid=sid, sub=sub + ) + ) # Store the ACK metadata from the message to # compare later on with the received heartbeat. @@ -1906,7 +1978,9 @@ def _process_disconnect(self) -> None: """ self._status = Client.DISCONNECTED - async def _process_info(self, info: Dict[str, Any], initial_connection: bool = False) -> None: + async def _process_info( + self, info: Dict[str, Any], initial_connection: bool = False + ) -> None: """ Process INFO lines sent by the server to reconfigure client with latest updates from cluster to enable server discovery. 
@@ -1947,7 +2021,11 @@ async def _process_info(self, info: Dict[str, Any], initial_connection: bool = F for srv in connect_urls: self._server_pool.append(srv) - if not initial_connection and connect_urls and self._discovered_server_cb: + if ( + not initial_connection + and connect_urls + and self._discovered_server_cb + ): await self._discovered_server_cb() def _host_is_ip(self, connect_url: Optional[str]) -> bool: @@ -1988,10 +2066,14 @@ async def _process_connect_init(self) -> None: ) connection_completed = self._transport.readline() - info_line = await asyncio.wait_for(connection_completed, self.options["connect_timeout"]) + info_line = await asyncio.wait_for( + connection_completed, self.options["connect_timeout"] + ) if INFO_OP not in info_line: # FIXME: Handle PING/PONG arriving first as well. - raise errors.Error("nats: empty response from server when expecting INFO message") + raise errors.Error( + "nats: empty response from server when expecting INFO message" + ) _, info = info_line.split(INFO_OP + _SPC_, 1) @@ -2078,7 +2160,9 @@ async def _process_connect_init(self) -> None: self._reading_task = asyncio.get_running_loop().create_task(self._read_loop()) self._pongs = [] self._pings_outstanding = 0 - self._ping_interval_task = asyncio.get_running_loop().create_task(self._ping_interval()) + self._ping_interval_task = asyncio.get_running_loop().create_task( + self._ping_interval() + ) # Task for kicking the flusher queue self._flusher_task = asyncio.get_running_loop().create_task(self._flusher()) From fcff317ba62a7d0a646e3150ee5f9819be23b1a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Laurent=20Rivi=C3=A8re?= Date: Sat, 11 Oct 2025 18:24:38 +0200 Subject: [PATCH 2/2] fix: revert style --- nats/src/nats/aio/client.py | 163 ++++++++++-------------------------- 1 file changed, 43 insertions(+), 120 deletions(-) diff --git a/nats/src/nats/aio/client.py b/nats/src/nats/aio/client.py index 690d9bf9d..3b281fbed 100644 --- a/nats/src/nats/aio/client.py +++ b/nats/src/nats/aio/client.py @@ -174,9 +174,7 @@ def parse_version(self) -> None: self._prerelease_version = matches["prerelease"] or "" self._build_version = matches["buildmetadata"] or "" if self._build_version: - self._dev_version = "+".join( - [self._prerelease_version, self._build_version] - ) + self._dev_version = "+".join([self._prerelease_version, self._build_version]) else: self._dev_version = self._prerelease_version @@ -518,11 +516,7 @@ async def subscribe_handler(msg): if user or password or token or server_auth_configured: self._auth_configured = True - if ( - self._user_credentials is not None - or self._nkeys_seed is not None - or self._nkeys_seed_str is not None - ): + if self._user_credentials is not None or self._nkeys_seed is not None or self._nkeys_seed_str is not None: self._auth_configured = True self._setup_nkeys_connect() @@ -542,9 +536,7 @@ async def subscribe_handler(msg): try: await self._select_next_server() await self._process_connect_init() - assert ( - self._current_server - ), "the current server must be set by _select_next_server" + assert self._current_server, "the current server must be set by _select_next_server" self._current_server.reconnects = 0 break except errors.NoServersError as e: @@ -608,11 +600,7 @@ def sig_cb(nonce: str) -> bytes: return sig self._signature_cb = sig_cb - elif ( - isinstance(creds, str) - or isinstance(creds, UserString) - or isinstance(creds, Path) - ): + elif isinstance(creds, str) or isinstance(creds, UserString) or isinstance(creds, Path): # Define the functions to be able to 
sign things using nkeys. def user_cb() -> bytearray: return self._read_creds_user_jwt(creds) @@ -679,9 +667,7 @@ def get_user_jwt(f): return get_user_jwt(f) def _setup_nkeys_seed_connect(self) -> None: - assert ( - self._nkeys_seed or self._nkeys_seed_str - ), "Client.connect must be called first" + assert self._nkeys_seed or self._nkeys_seed_str, "Client.connect must be called first" import nkeys @@ -736,10 +722,7 @@ async def _close(self, status: int, do_cbs: bool = True) -> None: if self._reading_task is not None and not self._reading_task.cancelled(): self._reading_task.cancel() - if ( - self._ping_interval_task is not None - and not self._ping_interval_task.cancelled() - ): + if self._ping_interval_task is not None and not self._ping_interval_task.cancelled(): self._ping_interval_task.cancel() if self._flusher_task is not None and not self._flusher_task.cancelled(): @@ -750,10 +733,7 @@ async def _close(self, status: int, do_cbs: bool = True) -> None: # Wait for the reconnection task to be done which should be soon. try: - if ( - self._reconnection_task_future is not None - and not self._reconnection_task_future.cancelled() - ): + if self._reconnection_task_future is not None and not self._reconnection_task_future.cancelled(): await asyncio.wait_for( self._reconnection_task_future, self.options["reconnect_time_wait"], @@ -907,10 +887,7 @@ async def main(): payload_size = len(payload) if not self.is_connected: - if ( - self._max_pending_size <= 0 - or payload_size + self._pending_data_size > self._max_pending_size - ): + if self._max_pending_size <= 0 or payload_size + self._pending_data_size > self._max_pending_size: # Cannot publish during a reconnection when the buffering is disabled, # or if pending buffer is already full. raise errors.OutboundBufferLimitError @@ -1071,7 +1048,9 @@ async def request( if old_style: # FIXME: Support headers in old style requests. 
try: - return await self._request_old_style(subject, payload, timeout=timeout) + return await self._request_old_style( + subject, payload, timeout=timeout + ) except (errors.TimeoutError, asyncio.TimeoutError): await self._check_connection_health() raise errors.TimeoutError @@ -1080,9 +1059,7 @@ async def request( msg = await self._request_new_style( subject, payload, timeout=timeout, headers=headers ) - status = ( - msg.headers.get(nats.js.api.Header.STATUS) if msg.headers else None - ) + status = msg.headers.get(nats.js.api.Header.STATUS) if msg.headers else None if status == NO_RESPONDERS_STATUS: raise errors.NoRespondersError return msg @@ -1125,7 +1102,9 @@ def cleanup_resp_map(f): try: # Publish the request - await self.publish(subject, payload, reply=inbox.decode(), headers=headers) + await self.publish( + subject, payload, reply=inbox.decode(), headers=headers + ) if not self.is_connected: future.cancel() @@ -1161,9 +1140,7 @@ def new_inbox(self) -> str: next_inbox.extend(self._nuid.next()) return next_inbox.decode() - async def _request_old_style( - self, subject: str, payload: bytes, timeout: float = 1 - ) -> Msg: + async def _request_old_style(self, subject: str, payload: bytes, timeout: float = 1) -> Msg: """ Implements the request/response pattern via pub/sub using an ephemeral subscription which will be published @@ -1279,9 +1256,7 @@ def is_connecting(self) -> bool: @property def is_draining(self) -> bool: - return ( - self._status == Client.DRAINING_SUBS or self._status == Client.DRAINING_PUBS - ) + return self._status == Client.DRAINING_SUBS or self._status == Client.DRAINING_PUBS @property def is_draining_pubs(self) -> bool: @@ -1314,10 +1289,7 @@ async def _send_command(self, cmd: bytes, priority: bool = False) -> None: else: self._pending.append(cmd) self._pending_data_size += len(cmd) - if ( - self._max_pending_size > 0 - and self._pending_data_size > self._max_pending_size - ): + if self._max_pending_size > 0 and self._pending_data_size > self._max_pending_size: # Only flush force timeout on publish await self._flush_pending(force_flush=True) @@ -1383,16 +1355,10 @@ def _setup_server_pool(self, connect_url: Union[List[str]]) -> None: raise errors.Error("nats: invalid connect url option") # make sure protocols aren't mixed if not ( - all( - server.uri.scheme in ("nats", "tls") for server in self._server_pool - ) - or all( - server.uri.scheme in ("ws", "wss") for server in self._server_pool - ) + all(server.uri.scheme in ("nats", "tls") for server in self._server_pool) + or all(server.uri.scheme in ("ws", "wss") for server in self._server_pool) ): - raise errors.Error( - "nats: mixing of websocket and non websocket URLs is not allowed" - ) + raise errors.Error("nats: mixing of websocket and non websocket URLs is not allowed") else: raise errors.Error("nats: invalid connect url option") @@ -1417,10 +1383,7 @@ async def _select_next_server(self) -> None: # Not yet exceeded max_reconnect_attempts so can still use # this server in the future. self._server_pool.append(s) - if ( - s.last_attempt is not None - and now < s.last_attempt + self.options["reconnect_time_wait"] - ): + if s.last_attempt is not None and now < s.last_attempt + self.options["reconnect_time_wait"]: # Backoff connecting to server if we attempted recently. 
await asyncio.sleep(self.options["reconnect_time_wait"]) try: @@ -1493,32 +1456,30 @@ async def _check_connection_health(self) -> bool: bool: True if connection is healthy or was successfully reconnected, False otherwise """ if not self.is_connected: - if self.options["allow_reconnect"]: + if self.options[ + "allow_reconnect" + ]: if self.is_reconnecting: await asyncio.sleep(self.options["reconnect_time_wait"]) return self.is_connected - + if not self.is_closed: self._status = Client.RECONNECTING self._ps.reset() try: - if ( - self._reconnection_task is not None - and not self._reconnection_task.cancelled() + if self._reconnection_task is not None and not self._reconnection_task.cancelled( ): self._reconnection_task.cancel() loop = asyncio.get_running_loop() - self._reconnection_task = loop.create_task( - self._attempt_reconnect() - ) + self._reconnection_task = loop.create_task(self._attempt_reconnect()) await asyncio.sleep(self.options["reconnect_time_wait"]) return self.is_connected except Exception: return False - + return False return False return True @@ -1537,16 +1498,11 @@ async def _process_op_err(self, e: Exception) -> None: self._status = Client.RECONNECTING self._ps.reset() - if ( - self._reconnection_task is not None - and not self._reconnection_task.cancelled() - ): + if self._reconnection_task is not None and not self._reconnection_task.cancelled(): # Cancel the previous task in case it may still be running. self._reconnection_task.cancel() - self._reconnection_task = asyncio.get_running_loop().create_task( - self._attempt_reconnect() - ) + self._reconnection_task = asyncio.get_running_loop().create_task(self._attempt_reconnect()) else: self._process_disconnect() self._err = e @@ -1557,10 +1513,7 @@ async def _attempt_reconnect(self) -> None: if self._reading_task is not None and not self._reading_task.cancelled(): self._reading_task.cancel() - if ( - self._ping_interval_task is not None - and not self._ping_interval_task.cancelled() - ): + if self._ping_interval_task is not None and not self._ping_interval_task.cancelled(): self._ping_interval_task.cancel() if self._flusher_task is not None and not self._flusher_task.cancelled(): @@ -1652,10 +1605,7 @@ async def _attempt_reconnect(self) -> None: except asyncio.CancelledError: break - if ( - self._reconnection_task_future is not None - and not self._reconnection_task_future.cancelled() - ): + if self._reconnection_task_future is not None and not self._reconnection_task_future.cancelled(): self._reconnection_task_future.set_result(True) def _connect_command(self) -> bytes: @@ -1689,10 +1639,7 @@ def _connect_command(self) -> bytes: options["nkey"] = self._public_nkey # In case there is no password, then consider handle # sending a token instead. 
- elif ( - self.options["user"] is not None - and self.options["password"] is not None - ): + elif self.options["user"] is not None and self.options["password"] is not None: options["user"] = self.options["user"] options["pass"] = self.options["password"] elif self.options["token"] is not None: @@ -1798,10 +1745,7 @@ async def _process_headers(self, headers) -> Optional[Dict[str, str]]: if parse_email: parsed_hdr = parse_email(raw_headers).headers else: - parsed_hdr = { - k.strip(): v.strip() - for k, v in self._hdr_parser.parsebytes(raw_headers).items() - } + parsed_hdr = {k.strip(): v.strip() for k, v in self._hdr_parser.parsebytes(raw_headers).items()} if hdr: hdr.update(parsed_hdr) else: @@ -1907,28 +1851,19 @@ async def _process_msg( try: sub._pending_size += payload_size # allow setting pending_bytes_limit to 0 to disable - if ( - sub._pending_bytes_limit > 0 - and sub._pending_size >= sub._pending_bytes_limit - ): + if sub._pending_bytes_limit > 0 and sub._pending_size >= sub._pending_bytes_limit: # Subtract the bytes since the message will be thrown away # so it would not be pending data. sub._pending_size -= payload_size await self._error_cb( - errors.SlowConsumerError( - subject=msg.subject, reply=msg.reply, sid=sid, sub=sub - ) + errors.SlowConsumerError(subject=msg.subject, reply=msg.reply, sid=sid, sub=sub) ) return sub._pending_queue.put_nowait(msg) except asyncio.QueueFull: sub._pending_size -= len(msg.data) - await self._error_cb( - errors.SlowConsumerError( - subject=msg.subject, reply=msg.reply, sid=sid, sub=sub - ) - ) + await self._error_cb(errors.SlowConsumerError(subject=msg.subject, reply=msg.reply, sid=sid, sub=sub)) # Store the ACK metadata from the message to # compare later on with the received heartbeat. @@ -1978,9 +1913,7 @@ def _process_disconnect(self) -> None: """ self._status = Client.DISCONNECTED - async def _process_info( - self, info: Dict[str, Any], initial_connection: bool = False - ) -> None: + async def _process_info(self, info: Dict[str, Any], initial_connection: bool = False) -> None: """ Process INFO lines sent by the server to reconfigure client with latest updates from cluster to enable server discovery. @@ -2021,11 +1954,7 @@ async def _process_info( for srv in connect_urls: self._server_pool.append(srv) - if ( - not initial_connection - and connect_urls - and self._discovered_server_cb - ): + if not initial_connection and connect_urls and self._discovered_server_cb: await self._discovered_server_cb() def _host_is_ip(self, connect_url: Optional[str]) -> bool: @@ -2066,14 +1995,10 @@ async def _process_connect_init(self) -> None: ) connection_completed = self._transport.readline() - info_line = await asyncio.wait_for( - connection_completed, self.options["connect_timeout"] - ) + info_line = await asyncio.wait_for(connection_completed, self.options["connect_timeout"]) if INFO_OP not in info_line: # FIXME: Handle PING/PONG arriving first as well. 
- raise errors.Error( - "nats: empty response from server when expecting INFO message" - ) + raise errors.Error("nats: empty response from server when expecting INFO message") _, info = info_line.split(INFO_OP + _SPC_, 1) @@ -2160,9 +2085,7 @@ async def _process_connect_init(self) -> None: self._reading_task = asyncio.get_running_loop().create_task(self._read_loop()) self._pongs = [] self._pings_outstanding = 0 - self._ping_interval_task = asyncio.get_running_loop().create_task( - self._ping_interval() - ) + self._ping_interval_task = asyncio.get_running_loop().create_task(self._ping_interval()) # Task for kicking the flusher queue self._flusher_task = asyncio.get_running_loop().create_task(self._flusher())
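
The two patches above net out to one behavioral change in `Client._check_connection_health`: when a reconnection is already in flight (`is_reconnecting`), the health check now waits one `reconnect_time_wait` period and reports the current connection state instead of immediately returning False; the second patch only reverts the incidental reformatting introduced by the first. Below is a minimal, self-contained toy that mirrors that control flow. It is not the real nats-py client: `FakeClient`, `_finish_reconnect`, and the `check_health_*` methods are hypothetical stand-ins, while the attribute and option names (`is_connected`, `is_reconnecting`, `is_closed`, `allow_reconnect`, `reconnect_time_wait`) come from the diff.

# Toy sketch of the old vs. new _check_connection_health control flow.
# Only the attribute/option names are taken from the diff; everything else
# here is a stand-in for illustration.

import asyncio


class FakeClient:
    def __init__(self) -> None:
        self.is_connected = False
        self.is_reconnecting = True  # a reconnection is already in flight
        self.is_closed = False
        self.options = {"allow_reconnect": True, "reconnect_time_wait": 0.1}

    async def _finish_reconnect(self) -> None:
        # Pretend the in-flight reconnection succeeds shortly.
        await asyncio.sleep(0.05)
        self.is_connected = True
        self.is_reconnecting = False

    async def check_health_old(self) -> bool:
        # Old logic: an in-flight reconnection makes the guard fail, so the
        # health check reports False immediately.
        if not self.is_connected:
            if (self.options["allow_reconnect"] and not self.is_reconnecting
                    and not self.is_closed):
                # (re)start the reconnection task, wait, then re-check
                return self.is_connected
            return False
        return True

    async def check_health_new(self) -> bool:
        # New logic: when a reconnection is already in progress, wait one
        # reconnect_time_wait period and report the current state instead of
        # giving up.
        if not self.is_connected:
            if self.options["allow_reconnect"]:
                if self.is_reconnecting:
                    await asyncio.sleep(self.options["reconnect_time_wait"])
                    return self.is_connected
                if not self.is_closed:
                    # start the reconnection task, wait, then re-check
                    return self.is_connected
                return False
            return False
        return True


async def main() -> None:
    client = FakeClient()
    reconnect = asyncio.get_running_loop().create_task(client._finish_reconnect())
    print("old:", await client.check_health_old())  # False, despite the in-flight reconnect
    print("new:", await client.check_health_new())  # True once that reconnect completes
    await reconnect


asyncio.run(main())

With the old guard, a request that times out while a reconnection is already underway would report an unhealthy connection right away; with this series, the caller gets one `reconnect_time_wait` grace period for the existing reconnection task to finish before the verdict is returned.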