Skip to content

Commit 9c172d7

Browse files
committed
[hermes] Add health probe
1 parent 2b829f3 commit 9c172d7

File tree

3 files changed

+56
-12
lines changed

3 files changed

+56
-12
lines changed

hermes/src/api.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ pub async fn run(store: Arc<Store>, mut update_rx: Receiver<()>, rpc_addr: Strin
4545
let app = app
4646
.route("/", get(rest::index))
4747
.route("/live", get(rest::live))
48+
.route("/ready", get(rest::ready))
49+
.route("/health", get(rest::health))
4850
.route("/ws", get(ws::ws_route_handler))
4951
.route("/api/latest_price_feeds", get(rest::latest_price_feeds))
5052
.route("/api/latest_vaas", get(rest::latest_vaas))

hermes/src/api/rest.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -250,17 +250,35 @@ pub async fn get_vaa_ccip(
250250
}))
251251
}
252252

253-
// This function implements the `/live` endpoint. It returns a `200` status code. This endpoint is
254-
// used by the Kubernetes liveness probe.
255-
pub async fn live() -> Result<impl IntoResponse, std::convert::Infallible> {
256-
Ok(())
253+
pub async fn live() -> Response {
254+
(StatusCode::OK, "OK").into_response()
255+
}
256+
257+
// This endpoint is the same as health and is provided for backward compatibility
258+
// with the price service.
259+
pub async fn ready(State(state): State<super::State>) -> Response {
260+
match state.store.is_healthy().await {
261+
true => (StatusCode::OK, "OK").into_response(),
262+
false => (StatusCode::SERVICE_UNAVAILABLE, "Service Unavailable").into_response(),
263+
}
264+
}
265+
266+
// This endpoint serves as a health check for this service. Previously `ready` was used
267+
// for this purpose, but now health is used because it is more standard.
268+
pub async fn health(State(state): State<super::State>) -> Response {
269+
match state.store.is_healthy().await {
270+
true => (StatusCode::OK, "OK").into_response(),
271+
false => (StatusCode::SERVICE_UNAVAILABLE, "Service Unavailable").into_response(),
272+
}
257273
}
258274

259275
// This is the index page for the REST service. It will list all the available endpoints.
260276
// TODO: Dynamically generate this list if possible.
261277
pub async fn index() -> impl IntoResponse {
262278
Json([
263279
"/live",
280+
"/ready",
281+
"/health",
264282
"/api/price_feed_ids",
265283
"/api/latest_price_feeds?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&..(&verbose=true)(&binary=true)",
266284
"/api/latest_vaas?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&...",

hermes/src/store.rs

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,19 @@ use {
5050
},
5151
sync::Arc,
5252
time::{
53-
Duration,
5453
SystemTime,
5554
UNIX_EPOCH,
5655
},
5756
},
58-
tokio::sync::{
59-
mpsc::Sender,
60-
RwLock,
57+
tokio::{
58+
sync::{
59+
mpsc::Sender,
60+
RwLock,
61+
},
62+
time::{
63+
Duration,
64+
Instant,
65+
},
6166
},
6267
wormhole_sdk::{
6368
Address,
@@ -72,10 +77,11 @@ pub mod types;
7277
pub mod wormhole;
7378

7479
pub struct Store {
75-
pub storage: StorageInstance,
76-
pub observed_vaa_seqs: Cache<u64, bool>,
77-
pub guardian_set: RwLock<BTreeMap<u32, GuardianSet>>,
78-
pub update_tx: Sender<()>,
80+
pub storage: StorageInstance,
81+
pub observed_vaa_seqs: Cache<u64, bool>,
82+
pub guardian_set: RwLock<BTreeMap<u32, GuardianSet>>,
83+
pub update_tx: Sender<()>,
84+
pub last_completed_update_at: RwLock<Option<Instant>>,
7985
}
8086

8187
impl Store {
@@ -88,6 +94,7 @@ impl Store {
8894
.build(),
8995
guardian_set: RwLock::new(Default::default()),
9096
update_tx,
97+
last_completed_update_at: RwLock::new(None),
9198
})
9299
}
93100

@@ -168,6 +175,11 @@ impl Store {
168175

169176
self.update_tx.send(()).await?;
170177

178+
self.last_completed_update_at
179+
.write()
180+
.await
181+
.replace(Instant::now());
182+
171183
Ok(())
172184
}
173185

@@ -256,4 +268,16 @@ impl Store {
256268
.map(|key| PriceIdentifier::new(key.id))
257269
.collect()
258270
}
271+
272+
pub async fn is_healthy(&self) -> bool {
273+
const STALENESS_THRESHOLD: Duration = Duration::from_secs(30);
274+
275+
let last_completed_update_at = self.last_completed_update_at.read().await;
276+
match last_completed_update_at.as_ref() {
277+
Some(last_completed_update_at) => {
278+
last_completed_update_at.elapsed() < STALENESS_THRESHOLD
279+
}
280+
None => false,
281+
}
282+
}
259283
}

0 commit comments

Comments
 (0)