diff --git a/apps/quorum/Cargo.lock b/apps/quorum/Cargo.lock index 50069a4b78..f21bb3f7fb 100644 --- a/apps/quorum/Cargo.lock +++ b/apps/quorum/Cargo.lock @@ -413,6 +413,26 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-prometheus" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e1a6651f119707ec6c416f38fdb5be223707627fe6d47f912850634c106215" +dependencies = [ + "axum", + "bytes", + "futures-core", + "http 1.3.1", + "http-body 1.0.1", + "matchit", + "metrics", + "metrics-exporter-prometheus", + "pin-project", + "tokio", + "tower", + "tower-http", +] + [[package]] name = "backtrace" version = "0.3.75" @@ -1300,6 +1320,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "foreign-types" version = "0.3.2" @@ -1544,6 +1570,9 @@ name = "hashbrown" version = "0.15.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "84b26c544d002229e640969970a2e74021aadf6e2f96372b9c58eff97de08eb3" +dependencies = [ + "foldhash", +] [[package]] name = "heck" @@ -2132,6 +2161,46 @@ dependencies = [ "autocfg", ] +[[package]] +name = "metrics" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25dea7ac8057892855ec285c440160265225438c3c45072613c25a4b26e98ef5" +dependencies = [ + "ahash", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd7399781913e5393588a8d8c6a2867bf85fb38eaf2502fdce465aad2dc6f034" +dependencies = [ + "base64 0.22.1", + "indexmap", + "metrics", + "metrics-util", + "quanta", + "thiserror 1.0.69", +] + +[[package]] +name = "metrics-util" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8496cc523d1f94c1385dd8f0f0c2c480b2b8aeccb5b7e4485ad6365523ae376" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.15.3", + "metrics", + "quanta", + "rand 0.9.1", + "rand_xoshiro", + "sketches-ddsketch", +] + [[package]] name = "mime" version = "0.3.17" @@ -2513,6 +2582,26 @@ dependencies = [ "num", ] +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -2671,10 +2760,11 @@ dependencies = [ [[package]] name = "quorum" -version = "0.1.3" +version = "0.2.0" dependencies = [ "anyhow", "axum", + "axum-prometheus", "borsh 1.5.7", "clap", "futures", @@ -2809,6 +2899,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rand_xoshiro" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" +dependencies = [ + "rand_core 0.9.3", +] + [[package]] name = "raw-cpuid" version = "11.5.0" @@ -3443,6 +3542,12 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" +[[package]] +name = "sketches-ddsketch" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e9a774a6c28142ac54bb25d25562e6bcf957493a184f15ad4eebccb23e410a" + [[package]] name = "slab" version = "0.4.9" @@ -5775,6 +5880,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower-http" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" +dependencies = [ + "bitflags 2.9.1", + "bytes", + "http 1.3.1", + "pin-project-lite", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.3" diff --git a/apps/quorum/Cargo.toml b/apps/quorum/Cargo.toml index 6a37434cff..8287d728b7 100644 --- a/apps/quorum/Cargo.toml +++ b/apps/quorum/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "quorum" -version = "0.1.3" +version = "0.2.0" edition = "2021" [dependencies] @@ -23,3 +23,4 @@ time = "0.3.41" serde_json = "1.0.140" futures = "0.3.31" serde_wormhole = "0.1.0" +axum-prometheus = "0.8.0" diff --git a/apps/quorum/src/api.rs b/apps/quorum/src/api.rs index 192bfdc493..f068252bfa 100644 --- a/apps/quorum/src/api.rs +++ b/apps/quorum/src/api.rs @@ -3,6 +3,7 @@ use axum::{ routing::{get, post}, Json, Router, }; +use axum_prometheus::{EndpointLabel, PrometheusMetricLayerBuilder}; use secp256k1::{ ecdsa::{RecoverableSignature, RecoveryId}, Message, Secp256k1, @@ -26,16 +27,24 @@ pub type Payload<'a> = &'a RawMessage; pub async fn run(listen_address: SocketAddr, state: State) -> anyhow::Result<()> { tracing::info!("Starting server..."); + let (prometheus_layer, _) = PrometheusMetricLayerBuilder::new() + .with_metrics_from_fn(|| state.metrics_recorder.clone()) + .with_endpoint_label_type(EndpointLabel::MatchedPathWithFallbackFn(|_| { + "unknown".to_string() + })) + .build_pair(); + let routes = Router::new() .route("/live", get(|| async { "OK" })) .route("/observation", post(post_observation)) .route("/ws", get(ws_route_handler)) + .layer(prometheus_layer) .with_state(state); let listener = tokio::net::TcpListener::bind(&listen_address).await?; axum::serve(listener, routes) .with_graceful_shutdown(async { - let _ = crate::server::EXIT.subscribe().changed().await; + crate::server::wait_for_exit().await; tracing::info!("Shutting down server..."); }) .await?; diff --git a/apps/quorum/src/main.rs b/apps/quorum/src/main.rs index 63ce067bf6..567a7e0c6c 100644 --- a/apps/quorum/src/main.rs +++ b/apps/quorum/src/main.rs @@ -4,6 +4,7 @@ use std::io::IsTerminal; use crate::server::RunOptions; mod api; +mod metrics_server; mod pythnet; mod server; mod ws; diff --git a/apps/quorum/src/metrics_server.rs b/apps/quorum/src/metrics_server.rs new file mode 100644 index 0000000000..2063996885 --- /dev/null +++ b/apps/quorum/src/metrics_server.rs @@ -0,0 +1,38 @@ +use axum::{routing::get, Router}; +use axum_prometheus::{ + metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}, + PrometheusMetricLayerBuilder, +}; + +use crate::server::{wait_for_exit, RunOptions, State}; + +pub const DEFAULT_METRICS_BUCKET: &[f64; 20] = &[ + 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.25, 1.5, 2.0, + 3.0, 5.0, 10.0, +]; + +pub fn setup_metrics_recorder() -> anyhow::Result { + PrometheusBuilder::new() + .set_buckets(DEFAULT_METRICS_BUCKET)? + .install_recorder() + .map_err(|err| anyhow::anyhow!("Failed to set up metrics recorder: {:?}", err)) +} + +pub async fn run(run_options: RunOptions, state: State) -> anyhow::Result<()> { + tracing::info!("Starting Metrics Server..."); + + let (_, metric_handle) = PrometheusMetricLayerBuilder::new() + .with_metrics_from_fn(|| state.metrics_recorder.clone()) + .build_pair(); + let app = Router::new(); + let app = app.route("/metrics", get(|| async move { metric_handle.render() })); + + let listener = tokio::net::TcpListener::bind(&run_options.server.metrics_addr).await?; + axum::serve(listener, app) + .with_graceful_shutdown(async { + let _ = wait_for_exit().await; + tracing::info!("Shutting down metrics server..."); + }) + .await?; + Ok(()) +} diff --git a/apps/quorum/src/server.rs b/apps/quorum/src/server.rs index 5ae593fc14..73fc82a159 100644 --- a/apps/quorum/src/server.rs +++ b/apps/quorum/src/server.rs @@ -1,18 +1,26 @@ +use axum_prometheus::metrics_exporter_prometheus::PrometheusHandle; use clap::{crate_authors, crate_description, crate_name, crate_version, Args, Parser}; use lazy_static::lazy_static; use solana_client::client_error::reqwest::Url; use solana_sdk::pubkey::Pubkey; -use std::{collections::HashMap, net::SocketAddr, ops::Deref, sync::Arc}; -use tokio::sync::{watch, RwLock}; +use std::{ + collections::HashMap, future::Future, net::SocketAddr, ops::Deref, sync::Arc, time::Duration, +}; +use tokio::{ + sync::{watch, RwLock}, + time::sleep, +}; use wormhole_sdk::{vaa::Signature, GuardianSetInfo}; use crate::{ api::{self}, + metrics_server::{self, setup_metrics_recorder}, pythnet::fetch_guardian_set, ws::WsState, }; const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:9000"; +const DEFAULT_METRICS_ADDR: &str = "127.0.0.1:9001"; #[derive(Args, Clone, Debug)] #[command(next_help_heading = "Server Options")] @@ -23,6 +31,11 @@ pub struct ServerOptions { #[arg(default_value = DEFAULT_LISTEN_ADDR)] #[arg(env = "LISTEN_ADDR")] pub listen_addr: SocketAddr, + /// Address and port the metrics will bind to. + #[arg(long = "metrics-addr")] + #[arg(default_value = DEFAULT_METRICS_ADDR)] + #[arg(env = "METRICS_ADDR")] + pub metrics_addr: SocketAddr, } // `Options` is a structup definition to provide clean command-line args for Hermes. @@ -66,7 +79,17 @@ lazy_static! { /// - The `Receiver` side of a watch channel performs the detection based on if the change /// happened after the subscribe, so it means all listeners should always be notified /// correctly. - pub static ref EXIT: watch::Sender = watch::channel(false).0; + + static ref EXIT: watch::Sender = watch::channel(false).0; +} + +pub async fn wait_for_exit() { + let mut rx = EXIT.subscribe(); + // Check if the exit flag is already set, if so, we don't need to wait. + if !(*rx.borrow()) { + // Wait until the exit flag is set. + let _ = rx.changed().await; + } } #[derive(Clone)] @@ -81,6 +104,8 @@ pub struct StateInner { pub observation_lifetime: u32, pub ws: WsState, + + pub metrics_recorder: PrometheusHandle, } impl Deref for State { type Target = Arc; @@ -93,17 +118,42 @@ impl Deref for State { const DEFAULT_OBSERVATION_LIFETIME: u32 = 10; // In seconds const WEBSOCKET_NOTIFICATION_CHANNEL_SIZE: usize = 1000; +async fn fault_tolerant_handler(name: String, f: F) +where + F: Fn() -> Fut, + Fut: Future> + Send + 'static, + Fut::Output: Send + 'static, +{ + loop { + let res = tokio::spawn(f()).await; + match res { + Ok(result) => match result { + Ok(_) => break, // This will happen on graceful shutdown + Err(err) => { + tracing::error!("{} returned error: {:?}", name, err); + sleep(Duration::from_millis(500)).await; + } + }, + Err(err) => { + tracing::error!("{} is panicked or canceled: {:?}", name, err); + EXIT.send_modify(|exit| *exit = true); + break; + } + } + } +} + pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { // Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown. tokio::spawn(async move { tracing::info!("Registered shutdown signal handler..."); tokio::signal::ctrl_c().await.unwrap(); tracing::info!("Shut down signal received, waiting for tasks..."); - let _ = EXIT.send(true); + EXIT.send_modify(|exit| *exit = true); }); let guardian_set = fetch_guardian_set( - run_options.pythnet_url, + run_options.pythnet_url.clone(), run_options.wormhole_pid, run_options.guardian_set_index, ) @@ -118,19 +168,28 @@ pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { observation_lifetime: run_options.observation_lifetime, ws: WsState::new(WEBSOCKET_NOTIFICATION_CHANNEL_SIZE), + + metrics_recorder: setup_metrics_recorder()?, })); - tokio::join!(async { - if let Err(e) = api::run(run_options.server.listen_addr, state).await { - tracing::error!(error = ?e, "Failed to start API server"); - } - }); + tokio::join!( + fault_tolerant_handler("API server".to_string(), || api::run( + run_options.server.listen_addr, + state.clone() + )), + fault_tolerant_handler("metrics server".to_string(), || metrics_server::run( + run_options.clone(), + state.clone() + )), + ); Ok(()) } #[cfg(test)] pub mod tests { + use axum_prometheus::metrics_exporter_prometheus::PrometheusBuilder; + use super::*; pub fn get_state( @@ -146,6 +205,8 @@ pub mod tests { guardian_set_index: 0, ws: WsState::new(1), + + metrics_recorder: PrometheusBuilder::new().build_recorder().handle(), })) } } diff --git a/apps/quorum/src/ws.rs b/apps/quorum/src/ws.rs index a2ec0c1a5c..90f492da6e 100644 --- a/apps/quorum/src/ws.rs +++ b/apps/quorum/src/ws.rs @@ -1,5 +1,5 @@ use { - crate::server::{State, EXIT}, + crate::server::{wait_for_exit, State}, anyhow::{anyhow, Result}, axum::{ extract::{ @@ -16,7 +16,7 @@ use { sync::atomic::{AtomicUsize, Ordering}, time::Duration, }, - tokio::sync::{broadcast, watch}, + tokio::sync::broadcast, }; pub struct WsState { @@ -68,7 +68,6 @@ pub struct Subscriber { sender: SplitSink, ping_interval: tokio::time::Interval, responded_to_ping: bool, - exit: watch::Receiver, } const PING_INTERVAL_DURATION: Duration = Duration::from_secs(30); @@ -88,7 +87,6 @@ impl Subscriber { sender, ping_interval: tokio::time::interval(PING_INTERVAL_DURATION), responded_to_ping: true, // We start with true so we don't close the connection immediately - exit: EXIT.subscribe(), } } @@ -122,7 +120,7 @@ impl Subscriber { self.sender.send(Message::Ping(vec![].into())).await?; Ok(()) }, - _ = self.exit.changed() => { + _ = wait_for_exit() => { self.sender.close().await?; self.closed = true; Err(anyhow!("Application is shutting down. Closing connection."))