Skip to content

Commit 657264a

Browse files
committed
[hermes] Add health probe
1 parent adadd33 commit 657264a

File tree

3 files changed

+56
-12
lines changed

3 files changed

+56
-12
lines changed

hermes/src/api.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ pub async fn run(store: Arc<Store>, mut update_rx: Receiver<()>, rpc_addr: Strin
4646
let app = app
4747
.route("/", get(rest::index))
4848
.route("/live", get(rest::live))
49+
.route("/ready", get(rest::ready))
50+
.route("/health", get(rest::health))
4951
.route("/ws", get(ws::ws_route_handler))
5052
.route("/api/latest_price_feeds", get(rest::latest_price_feeds))
5153
.route("/api/latest_vaas", get(rest::latest_vaas))

hermes/src/api/rest.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -250,17 +250,35 @@ pub async fn get_vaa_ccip(
250250
}))
251251
}
252252

253-
// This function implements the `/live` endpoint. It returns a `200` status code. This endpoint is
254-
// used by the Kubernetes liveness probe.
255-
pub async fn live() -> Result<impl IntoResponse, std::convert::Infallible> {
256-
Ok(())
253+
pub async fn live() -> Response {
254+
(StatusCode::OK, "OK").into_response()
255+
}
256+
257+
// This endpoint behaves identically to `health` and is provided for backward compatibility
258+
// with the price service.
259+
pub async fn ready(State(state): State<super::State>) -> Response {
260+
match state.store.is_healthy().await {
261+
true => (StatusCode::OK, "OK").into_response(),
262+
false => (StatusCode::SERVICE_UNAVAILABLE, "Service Unavailable").into_response(),
263+
}
264+
}
265+
266+
// This endpoint serves as a health check for this service. Previously `ready` was used
267+
// for this purpose, but now `health` is used because it is more standard.
268+
pub async fn health(State(state): State<super::State>) -> Response {
269+
match state.store.is_healthy().await {
270+
true => (StatusCode::OK, "OK").into_response(),
271+
false => (StatusCode::SERVICE_UNAVAILABLE, "Service Unavailable").into_response(),
272+
}
257273
}
258274

259275
// This is the index page for the REST service. It will list all the available endpoints.
260276
// TODO: Dynamically generate this list if possible.
261277
pub async fn index() -> impl IntoResponse {
262278
Json([
263279
"/live",
280+
"/ready",
281+
"/health",
264282
"/api/price_feed_ids",
265283
"/api/latest_price_feeds?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&..(&verbose=true)(&binary=true)",
266284
"/api/latest_vaas?ids[]=<price_feed_id>&ids[]=<price_feed_id_2>&...",

hermes/src/store.rs

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,19 @@ use {
5252
},
5353
sync::Arc,
5454
time::{
55-
Duration,
5655
SystemTime,
5756
UNIX_EPOCH,
5857
},
5958
},
60-
tokio::sync::{
61-
mpsc::Sender,
62-
RwLock,
59+
tokio::{
60+
sync::{
61+
mpsc::Sender,
62+
RwLock,
63+
},
64+
time::{
65+
Duration,
66+
Instant,
67+
},
6368
},
6469
wormhole_sdk::{
6570
Address,
@@ -74,10 +79,11 @@ pub mod types;
7479
pub mod wormhole;
7580

7681
pub struct Store {
77-
pub storage: StorageInstance,
78-
pub observed_vaa_seqs: Cache<u64, bool>,
79-
pub guardian_set: RwLock<BTreeMap<u32, GuardianSet>>,
80-
pub update_tx: Sender<()>,
82+
pub storage: StorageInstance,
83+
pub observed_vaa_seqs: Cache<u64, bool>,
84+
pub guardian_set: RwLock<BTreeMap<u32, GuardianSet>>,
85+
pub update_tx: Sender<()>,
86+
pub last_completed_update_at: RwLock<Option<Instant>>,
8187
}
8288

8389
impl Store {
@@ -90,6 +96,7 @@ impl Store {
9096
.build(),
9197
guardian_set: RwLock::new(Default::default()),
9298
update_tx,
99+
last_completed_update_at: RwLock::new(None),
93100
})
94101
}
95102

@@ -170,6 +177,11 @@ impl Store {
170177

171178
self.update_tx.send(()).await?;
172179

180+
self.last_completed_update_at
181+
.write()
182+
.await
183+
.replace(Instant::now());
184+
173185
Ok(())
174186
}
175187

@@ -258,4 +270,16 @@ impl Store {
258270
.map(|key| PriceIdentifier::new(key.id))
259271
.collect()
260272
}
273+
274+
pub async fn is_healthy(&self) -> bool {
275+
const STALENESS_THRESHOLD: Duration = Duration::from_secs(30);
276+
277+
let last_completed_update_at = self.last_completed_update_at.read().await;
278+
match last_completed_update_at.as_ref() {
279+
Some(last_completed_update_at) => {
280+
last_completed_update_at.elapsed() < STALENESS_THRESHOLD
281+
}
282+
None => false,
283+
}
284+
}
261285
}

0 commit comments

Comments
 (0)