Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hermes"
version = "0.8.5"
version = "0.8.6"
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
edition = "2021"

Expand Down
21 changes: 18 additions & 3 deletions apps/hermes/server/src/api/rest/v2/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ use {
pyth_sdk::PriceIdentifier,
serde::Deserialize,
serde_qs::axum::QsQuery,
std::convert::Infallible,
tokio::sync::broadcast,
std::{convert::Infallible, time::Duration},
tokio::{sync::broadcast, time::Instant},
tokio_stream::{wrappers::BroadcastStream, StreamExt as _},
utoipa::IntoParams,
};

// Constants
const MAX_CONNECTION_DURATION: Duration = Duration::from_secs(24 * 60 * 60); // 24 hours

#[derive(Debug, Deserialize, IntoParams)]
#[into_params(parameter_in = Query)]
pub struct StreamPriceUpdatesQueryParams {
Expand Down Expand Up @@ -75,6 +78,9 @@ fn default_true() -> bool {
params(StreamPriceUpdatesQueryParams)
)]
/// SSE route handler for streaming price updates.
///
/// The connection will automatically close after 24 hours to prevent resource leaks.
/// Clients should implement reconnection logic to maintain continuous price updates.
pub async fn price_stream_sse_handler<S>(
State(state): State<ApiState<S>>,
QsQuery(params): QsQuery<StreamPriceUpdatesQueryParams>,
Expand All @@ -93,7 +99,11 @@ where
// Convert the broadcast receiver into a Stream
let stream = BroadcastStream::new(update_rx);

// Set connection start time
let start_time = Instant::now();

let sse_stream = stream
.take_while(move |_| start_time.elapsed() < MAX_CONNECTION_DURATION)
.then(move |message| {
let state_clone = state.clone(); // Clone again to use inside the async block
let price_ids_clone = price_ids.clone(); // Clone again for use inside the async block
Expand Down Expand Up @@ -122,7 +132,12 @@ where
}
}
})
.filter_map(|x| x);
.filter_map(|x| x)
.chain(futures::stream::once(async {
Ok(Event::default()
.event("error")
.data("Connection timeout reached (24h)"))
}));

Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
}
Expand Down
28 changes: 27 additions & 1 deletion apps/hermes/server/src/api/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,15 @@ use {
},
time::Duration,
},
tokio::sync::{broadcast::Receiver, watch},
tokio::{
sync::{broadcast::Receiver, watch},
time::Instant,
},
};

const PING_INTERVAL_DURATION: Duration = Duration::from_secs(30);
const MAX_CLIENT_MESSAGE_SIZE: usize = 100 * 1024; // 100 KiB
const MAX_CONNECTION_DURATION: Duration = Duration::from_secs(24 * 60 * 60); // 24 hours

/// The maximum number of bytes that can be sent per second per IP address.
/// If the limit is exceeded, the connection is closed.
Expand Down Expand Up @@ -252,6 +256,7 @@ pub struct Subscriber<S> {
sender: SplitSink<WebSocket, Message>,
price_feeds_with_config: HashMap<PriceIdentifier, PriceFeedClientConfig>,
ping_interval: tokio::time::Interval,
connection_deadline: Instant,
exit: watch::Receiver<bool>,
responded_to_ping: bool,
}
Expand Down Expand Up @@ -280,6 +285,7 @@ where
sender,
price_feeds_with_config: HashMap::new(),
ping_interval: tokio::time::interval(PING_INTERVAL_DURATION),
connection_deadline: Instant::now() + MAX_CONNECTION_DURATION,
exit: crate::EXIT.subscribe(),
responded_to_ping: true, // We start with true so we don't close the connection immediately
}
Expand Down Expand Up @@ -325,6 +331,26 @@ where
self.sender.send(Message::Ping(vec![])).await?;
Ok(())
},
_ = tokio::time::sleep_until(self.connection_deadline) => {
tracing::info!(
id = self.id,
ip = ?self.ip_addr,
"Connection timeout reached (24h). Closing connection.",
);
self.sender
.send(
serde_json::to_string(&ServerMessage::Response(
ServerResponseMessage::Err {
error: "Connection timeout reached (24h)".to_string(),
},
))?
.into(),
)
.await?;
self.sender.close().await?;
self.closed = true;
Ok(())
},
_ = self.exit.changed() => {
self.sender.close().await?;
self.closed = true;
Expand Down
Loading