Skip to content

Commit 6fcacc2

Browse files
committed
refactor(hermes): state->wormhole/metrics downcasting
1 parent cf90bff commit 6fcacc2

File tree

10 files changed

+427
-294
lines changed

10 files changed

+427
-294
lines changed

apps/hermes/src/api.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ mod ws;
2727
pub struct ApiState<S = State> {
2828
pub state: Arc<S>,
2929
pub ws: Arc<ws::WsState>,
30-
pub metrics: Arc<metrics_middleware::Metrics>,
30+
pub metrics: Arc<metrics_middleware::ApiMetrics>,
3131
}
3232

3333
/// Manually implement `Clone` as the derive macro will try and slap `Clone` on
@@ -49,7 +49,7 @@ impl ApiState<State> {
4949
requester_ip_header_name: String,
5050
) -> Self {
5151
Self {
52-
metrics: Arc::new(metrics_middleware::Metrics::new(state.clone())),
52+
metrics: Arc::new(metrics_middleware::ApiMetrics::new(state.clone())),
5353
ws: Arc::new(ws::WsState::new(
5454
ws_whitelist,
5555
requester_ip_header_name,

apps/hermes/src/api/metrics_middleware.rs

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use {
22
super::ApiState,
3-
crate::state::State as AppState,
3+
crate::state::metrics::Metrics,
44
axum::{
55
extract::{
66
MatchedPath,
@@ -22,13 +22,19 @@ use {
2222
tokio::time::Instant,
2323
};
2424

25-
pub struct Metrics {
25+
pub struct ApiMetrics {
2626
pub requests: Family<Labels, Counter>,
2727
pub latencies: Family<Labels, Histogram>,
2828
}
2929

30-
impl Metrics {
31-
pub fn new(state: Arc<AppState>) -> Self {
30+
impl ApiMetrics {
31+
pub fn new<S>(state: Arc<S>) -> Self
32+
where
33+
S: Metrics,
34+
S: Send,
35+
S: Sync,
36+
S: 'static,
37+
{
3238
let new = Self {
3339
requests: Family::default(),
3440
latencies: Family::new_with_constructor(|| {
@@ -46,15 +52,21 @@ impl Metrics {
4652
let latencies = new.latencies.clone();
4753

4854
tokio::spawn(async move {
49-
let mut metrics_registry = state.metrics_registry.write().await;
50-
51-
metrics_registry.register("api_requests", "Total number of API requests", requests);
55+
Metrics::register(
56+
&*state,
57+
("api_requests", "Total number of API requests", requests),
58+
)
59+
.await;
5260

53-
metrics_registry.register(
54-
"api_request_latency_seconds",
55-
"API request latency in seconds",
56-
latencies,
57-
);
61+
Metrics::register(
62+
&*state,
63+
(
64+
"api_request_latency_seconds",
65+
"API request latency in seconds",
66+
latencies,
67+
),
68+
)
69+
.await;
5870
});
5971
}
6072

@@ -80,21 +92,18 @@ pub async fn track_metrics<B>(
8092
} else {
8193
req.uri().path().to_owned()
8294
};
83-
let method = req.method().clone();
8495

96+
let method = req.method().clone();
8597
let response = next.run(req).await;
86-
8798
let latency = start.elapsed().as_secs_f64();
8899
let status = response.status().as_u16();
89-
90100
let labels = Labels {
91101
method: method.to_string(),
92102
path,
93103
status,
94104
};
95105

96106
api_state.metrics.requests.get_or_create(&labels).inc();
97-
98107
api_state
99108
.metrics
100109
.latencies

apps/hermes/src/api/ws.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use {
1212
AggregationEvent,
1313
RequestTime,
1414
},
15+
metrics::Metrics,
1516
State,
1617
},
1718
anyhow::{
@@ -115,12 +116,18 @@ pub struct Labels {
115116
pub status: Status,
116117
}
117118

118-
pub struct Metrics {
119+
pub struct WsMetrics {
119120
pub interactions: Family<Labels, Counter>,
120121
}
121122

122-
impl Metrics {
123-
pub fn new(state: Arc<State>) -> Self {
123+
impl WsMetrics {
124+
pub fn new<S>(state: Arc<S>) -> Self
125+
where
126+
S: Metrics,
127+
S: Send,
128+
S: Sync,
129+
S: 'static,
130+
{
124131
let new = Self {
125132
interactions: Family::default(),
126133
};
@@ -129,11 +136,15 @@ impl Metrics {
129136
let interactions = new.interactions.clone();
130137

131138
tokio::spawn(async move {
132-
state.metrics_registry.write().await.register(
133-
"ws_interactions",
134-
"Total number of websocket interactions",
135-
interactions,
136-
);
139+
Metrics::register(
140+
&*state,
141+
(
142+
"ws_interactions",
143+
"Total number of websocket interactions",
144+
interactions,
145+
),
146+
)
147+
.await;
137148
});
138149
}
139150

@@ -146,7 +157,7 @@ pub struct WsState {
146157
pub bytes_limit_whitelist: Vec<IpNet>,
147158
pub rate_limiter: DefaultKeyedRateLimiter<IpAddr>,
148159
pub requester_ip_header_name: String,
149-
pub metrics: Metrics,
160+
pub metrics: WsMetrics,
150161
}
151162

152163
impl WsState {
@@ -158,7 +169,7 @@ impl WsState {
158169
))),
159170
bytes_limit_whitelist: whitelist,
160171
requester_ip_header_name,
161-
metrics: Metrics::new(state.clone()),
172+
metrics: WsMetrics::new(state.clone()),
162173
}
163174
}
164175
}

apps/hermes/src/metrics_server.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
use {
66
crate::{
77
config::RunOptions,
8-
state::State as AppState,
8+
state::{
9+
metrics::Metrics,
10+
State as AppState,
11+
},
912
},
1013
anyhow::Result,
1114
axum::{
@@ -15,7 +18,6 @@ use {
1518
routing::get,
1619
Router,
1720
},
18-
prometheus_client::encoding::text::encode,
1921
std::sync::Arc,
2022
};
2123

@@ -43,13 +45,7 @@ pub async fn run(opts: RunOptions, state: Arc<AppState>) -> Result<()> {
4345
}
4446

4547
pub async fn metrics(State(state): State<Arc<AppState>>) -> impl IntoResponse {
46-
let registry = state.metrics_registry.read().await;
47-
let mut buffer = String::new();
48-
49-
// Should not fail if the metrics are valid and there is memory available
50-
// to write to the buffer.
51-
encode(&mut buffer, &registry).unwrap();
52-
48+
let buffer = Metrics::encode(&*state).await;
5349
(
5450
[(
5551
header::CONTENT_TYPE,

apps/hermes/src/network/pythnet.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use {
77
api::types::PriceFeedMetadata,
88
config::RunOptions,
99
network::wormhole::{
10-
update_guardian_set,
1110
BridgeData,
1211
GuardianSet,
1312
GuardianSetData,
@@ -22,6 +21,7 @@ use {
2221
Aggregates,
2322
Update,
2423
},
24+
wormhole::Wormhole,
2525
State,
2626
},
2727
},
@@ -215,11 +215,14 @@ pub async fn run(store: Arc<State>, pythnet_ws_endpoint: String) -> Result<!> {
215215
/// This method performs the necessary work to pull down the bridge state and associated guardian
216216
/// sets from a deployed Wormhole contract. Note that we only fetch the last two accounts due to
217217
/// the fact that during a Wormhole upgrade, there will only be messages produces from those two.
218-
async fn fetch_existing_guardian_sets(
219-
state: Arc<State>,
218+
async fn fetch_existing_guardian_sets<S>(
219+
state: Arc<S>,
220220
pythnet_http_endpoint: String,
221221
wormhole_contract_addr: Pubkey,
222-
) -> Result<()> {
222+
) -> Result<()>
223+
where
224+
S: Wormhole,
225+
{
223226
let client = RpcClient::new(pythnet_http_endpoint.to_string());
224227
let bridge = fetch_bridge_data(&client, &wormhole_contract_addr).await?;
225228

@@ -233,7 +236,7 @@ async fn fetch_existing_guardian_sets(
233236
"Retrieved Current GuardianSet.",
234237
);
235238

236-
update_guardian_set(&state, bridge.guardian_set_index, current).await;
239+
Wormhole::update_guardian_set(&*state, bridge.guardian_set_index, current).await;
237240

238241
// If there are more than one guardian set, we want to fetch the previous one as well as it
239242
// may still be in transition phase if a guardian upgrade has just occurred.
@@ -251,7 +254,7 @@ async fn fetch_existing_guardian_sets(
251254
"Retrieved Previous GuardianSet.",
252255
);
253256

254-
update_guardian_set(&state, bridge.guardian_set_index - 1, previous).await;
257+
Wormhole::update_guardian_set(&*state, bridge.guardian_set_index - 1, previous).await;
255258
}
256259

257260
Ok(())

0 commit comments

Comments
 (0)