From d37dc5dee434726fd0944667cc3403d412493955 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Thu, 24 Apr 2025 14:50:52 +0900 Subject: [PATCH 01/10] feat(ws): add connection timeout for WebSocket connections --- apps/hermes/server/src/api/ws.rs | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/apps/hermes/server/src/api/ws.rs b/apps/hermes/server/src/api/ws.rs index 1106abdb6b..754f16f132 100644 --- a/apps/hermes/server/src/api/ws.rs +++ b/apps/hermes/server/src/api/ws.rs @@ -45,6 +45,7 @@ use { const PING_INTERVAL_DURATION: Duration = Duration::from_secs(30); const MAX_CLIENT_MESSAGE_SIZE: usize = 100 * 1024; // 100 KiB +const MAX_CONNECTION_DURATION: Duration = Duration::from_secs(24 * 60 * 60); // 24 hours /// The maximum number of bytes that can be sent per second per IP address. /// If the limit is exceeded, the connection is closed. @@ -252,6 +253,7 @@ pub struct Subscriber { sender: SplitSink, price_feeds_with_config: HashMap, ping_interval: tokio::time::Interval, + connection_timeout: tokio::time::Sleep, exit: watch::Receiver, responded_to_ping: bool, } @@ -280,6 +282,7 @@ where sender, price_feeds_with_config: HashMap::new(), ping_interval: tokio::time::interval(PING_INTERVAL_DURATION), + connection_timeout: tokio::time::sleep(MAX_CONNECTION_DURATION), exit: crate::EXIT.subscribe(), responded_to_ping: true, // We start with true so we don't close the connection immediately } @@ -325,6 +328,26 @@ where self.sender.send(Message::Ping(vec![])).await?; Ok(()) }, + _ = &mut self.connection_timeout => { + tracing::info!( + id = self.id, + ip = ?self.ip_addr, + "Connection timeout reached (24h). Closing connection.", + ); + self.sender + .send( + serde_json::to_string(&ServerMessage::Response( + ServerResponseMessage::Err { + error: "Connection timeout reached (24h)".to_string(), + }, + ))? + .into(), + ) + .await?; + self.sender.close().await?; + self.closed = true; + return Ok(()); + }, _ = self.exit.changed() => { self.sender.close().await?; self.closed = true; From ecfacae9d0d28bde1241e05992d58ee6ca3a2db4 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Thu, 24 Apr 2025 15:12:51 +0900 Subject: [PATCH 02/10] feat(ws): replace connection timeout with connection deadline for improved WebSocket management --- apps/hermes/server/src/api/ws.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/apps/hermes/server/src/api/ws.rs b/apps/hermes/server/src/api/ws.rs index 754f16f132..1089641921 100644 --- a/apps/hermes/server/src/api/ws.rs +++ b/apps/hermes/server/src/api/ws.rs @@ -40,7 +40,10 @@ use { }, time::Duration, }, - tokio::sync::{broadcast::Receiver, watch}, + tokio::{ + sync::{broadcast::Receiver, watch}, + time::Instant, + }, }; const PING_INTERVAL_DURATION: Duration = Duration::from_secs(30); @@ -253,7 +256,7 @@ pub struct Subscriber { sender: SplitSink, price_feeds_with_config: HashMap, ping_interval: tokio::time::Interval, - connection_timeout: tokio::time::Sleep, + connection_deadline: Instant, exit: watch::Receiver, responded_to_ping: bool, } @@ -282,7 +285,7 @@ where sender, price_feeds_with_config: HashMap::new(), ping_interval: tokio::time::interval(PING_INTERVAL_DURATION), - connection_timeout: tokio::time::sleep(MAX_CONNECTION_DURATION), + connection_deadline: Instant::now() + MAX_CONNECTION_DURATION, exit: crate::EXIT.subscribe(), responded_to_ping: true, // We start with true so we don't close the connection immediately } @@ -328,7 +331,7 @@ where self.sender.send(Message::Ping(vec![])).await?; Ok(()) }, - _ = &mut self.connection_timeout => { + _ = tokio::time::sleep_until(self.connection_deadline) => { tracing::info!( id = self.id, ip = ?self.ip_addr, From 1d28b8d59f638600967bdf1fc097760377ddac2a Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Thu, 24 Apr 2025 15:32:38 +0900 Subject: [PATCH 03/10] fix(ws): simplify return statement in WebSocket close handling --- apps/hermes/server/src/api/ws.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/hermes/server/src/api/ws.rs b/apps/hermes/server/src/api/ws.rs index 1089641921..aae25630d2 100644 --- a/apps/hermes/server/src/api/ws.rs +++ b/apps/hermes/server/src/api/ws.rs @@ -349,7 +349,7 @@ where .await?; self.sender.close().await?; self.closed = true; - return Ok(()); + Ok(()) }, _ = self.exit.changed() => { self.sender.close().await?; From 0967ecfda0c6dde7e3edc2f42e7662e97b2eb25b Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Thu, 24 Apr 2025 15:44:30 +0900 Subject: [PATCH 04/10] feat(sse): implement connection timeout for SSE streams with a maximum duration --- apps/hermes/server/src/api/rest/v2/sse.rs | 25 ++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/apps/hermes/server/src/api/rest/v2/sse.rs b/apps/hermes/server/src/api/rest/v2/sse.rs index 2fb9161dc1..659000a623 100644 --- a/apps/hermes/server/src/api/rest/v2/sse.rs +++ b/apps/hermes/server/src/api/rest/v2/sse.rs @@ -19,12 +19,15 @@ use { pyth_sdk::PriceIdentifier, serde::Deserialize, serde_qs::axum::QsQuery, - std::convert::Infallible, - tokio::sync::broadcast, + std::{convert::Infallible, time::Duration}, + tokio::{sync::broadcast, time::Instant}, tokio_stream::{wrappers::BroadcastStream, StreamExt as _}, utoipa::IntoParams, }; +// Constants +const MAX_CONNECTION_DURATION: Duration = Duration::from_secs(10); // 24 hours + #[derive(Debug, Deserialize, IntoParams)] #[into_params(parameter_in = Query)] pub struct StreamPriceUpdatesQueryParams { @@ -93,10 +96,17 @@ where // Convert the broadcast receiver into a Stream let stream = BroadcastStream::new(update_rx); + // Set connection deadline + let connection_deadline = Instant::now() + MAX_CONNECTION_DURATION; + let sse_stream = stream + .take_while(move |_| { + let now = Instant::now(); + now < connection_deadline + }) .then(move |message| { - let state_clone = state.clone(); // Clone again to use inside the async block - let price_ids_clone = price_ids.clone(); // Clone again for use inside the async block + let state_clone = state.clone(); + let price_ids_clone = price_ids.clone(); async move { match message { Ok(event) => { @@ -122,7 +132,12 @@ where } } }) - .filter_map(|x| x); + .filter_map(|x| x) + .chain(futures::stream::once(async { + Ok(Event::default() + .event("error") + .data("Connection timeout reached (24h)")) + })); Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default())) } From 59e0a42b0faa40739a4ae1412374ee54f24cc173 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Thu, 24 Apr 2025 15:46:48 +0900 Subject: [PATCH 05/10] feat(sse): update maximum connection duration for SSE streams to 24 hours --- apps/hermes/server/src/api/rest/v2/sse.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/hermes/server/src/api/rest/v2/sse.rs b/apps/hermes/server/src/api/rest/v2/sse.rs index 659000a623..b521abb621 100644 --- a/apps/hermes/server/src/api/rest/v2/sse.rs +++ b/apps/hermes/server/src/api/rest/v2/sse.rs @@ -26,7 +26,7 @@ use { }; // Constants -const MAX_CONNECTION_DURATION: Duration = Duration::from_secs(10); // 24 hours +const MAX_CONNECTION_DURATION: Duration = Duration::from_secs(24 * 60 * 60); // 24 hours #[derive(Debug, Deserialize, IntoParams)] #[into_params(parameter_in = Query)] From a9a9119f5ab382e691c145e76c37dcffac0cc97c Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Thu, 24 Apr 2025 15:49:22 +0900 Subject: [PATCH 06/10] refactor(sse): clone state and price_ids for use inside async block --- apps/hermes/server/src/api/rest/v2/sse.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/hermes/server/src/api/rest/v2/sse.rs b/apps/hermes/server/src/api/rest/v2/sse.rs index b521abb621..07d43fd111 100644 --- a/apps/hermes/server/src/api/rest/v2/sse.rs +++ b/apps/hermes/server/src/api/rest/v2/sse.rs @@ -105,8 +105,8 @@ where now < connection_deadline }) .then(move |message| { - let state_clone = state.clone(); - let price_ids_clone = price_ids.clone(); + let state_clone = state.clone(); // Clone again to use inside the async block + let price_ids_clone = price_ids.clone(); // Clone again for use inside the async block async move { match message { Ok(event) => { From ddf9bcd53474aba0401c89b97a0e654a30e336c0 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Thu, 24 Apr 2025 20:30:55 +0900 Subject: [PATCH 07/10] refactor(sse): replace connection deadline with start time for SSE stream duration check --- apps/hermes/server/src/api/rest/v2/sse.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/apps/hermes/server/src/api/rest/v2/sse.rs b/apps/hermes/server/src/api/rest/v2/sse.rs index 07d43fd111..bda040d23b 100644 --- a/apps/hermes/server/src/api/rest/v2/sse.rs +++ b/apps/hermes/server/src/api/rest/v2/sse.rs @@ -96,13 +96,12 @@ where // Convert the broadcast receiver into a Stream let stream = BroadcastStream::new(update_rx); - // Set connection deadline - let connection_deadline = Instant::now() + MAX_CONNECTION_DURATION; + // Set connection start time + let start_time = Instant::now(); let sse_stream = stream .take_while(move |_| { - let now = Instant::now(); - now < connection_deadline + start_time.elapsed() < MAX_CONNECTION_DURATION }) .then(move |message| { let state_clone = state.clone(); // Clone again to use inside the async block From e45ef898f8d2242ddc7148636b8e24e5221fa589 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Thu, 24 Apr 2025 20:31:44 +0900 Subject: [PATCH 08/10] chore: bump version to 0.8.6 in Cargo files --- apps/hermes/server/Cargo.lock | 2 +- apps/hermes/server/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/hermes/server/Cargo.lock b/apps/hermes/server/Cargo.lock index b853a87f6a..358f91f3f3 100644 --- a/apps/hermes/server/Cargo.lock +++ b/apps/hermes/server/Cargo.lock @@ -1868,7 +1868,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermes" -version = "0.8.5" +version = "0.8.6" dependencies = [ "anyhow", "async-trait", diff --git a/apps/hermes/server/Cargo.toml b/apps/hermes/server/Cargo.toml index 7c69e2a323..5c8f3921c9 100644 --- a/apps/hermes/server/Cargo.toml +++ b/apps/hermes/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hermes" -version = "0.8.5" +version = "0.8.6" description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle." edition = "2021" From 7e9c707af5c1f24aa8b1a2b3191eabc48179ce0b Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Thu, 24 Apr 2025 20:39:54 +0900 Subject: [PATCH 09/10] docs(sse): update price_stream_sse_handler documentation to clarify connection closure after 24 hours --- apps/hermes/server/src/api/rest/v2/sse.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/apps/hermes/server/src/api/rest/v2/sse.rs b/apps/hermes/server/src/api/rest/v2/sse.rs index bda040d23b..c5ee6e60cd 100644 --- a/apps/hermes/server/src/api/rest/v2/sse.rs +++ b/apps/hermes/server/src/api/rest/v2/sse.rs @@ -78,6 +78,9 @@ fn default_true() -> bool { params(StreamPriceUpdatesQueryParams) )] /// SSE route handler for streaming price updates. +/// +/// The connection will automatically close after 24 hours to prevent resource leaks. +/// Clients should implement reconnection logic to maintain continuous price updates. pub async fn price_stream_sse_handler( State(state): State>, QsQuery(params): QsQuery, From 2c26a12b350ccebd13935512683e8570b39116d3 Mon Sep 17 00:00:00 2001 From: Daniel Chew Date: Thu, 24 Apr 2025 20:44:07 +0900 Subject: [PATCH 10/10] refactor(sse): reorganize struct --- apps/hermes/server/src/api/rest/v2/sse.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/apps/hermes/server/src/api/rest/v2/sse.rs b/apps/hermes/server/src/api/rest/v2/sse.rs index c5ee6e60cd..ced0f6420d 100644 --- a/apps/hermes/server/src/api/rest/v2/sse.rs +++ b/apps/hermes/server/src/api/rest/v2/sse.rs @@ -103,9 +103,7 @@ where let start_time = Instant::now(); let sse_stream = stream - .take_while(move |_| { - start_time.elapsed() < MAX_CONNECTION_DURATION - }) + .take_while(move |_| start_time.elapsed() < MAX_CONNECTION_DURATION) .then(move |message| { let state_clone = state.clone(); // Clone again to use inside the async block let price_ids_clone = price_ids.clone(); // Clone again for use inside the async block