Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion hermes/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,9 @@ where
Ok(price_feeds_with_update_data) => Ok(price_feeds_with_update_data),
Err(e) => {
if let RequestTime::FirstAfter(publish_time) = request_time {
return Benchmarks::get_verified_price_feeds(state, price_ids, publish_time).await;
return state
.get_verified_price_feeds(price_ids, publish_time)
.await;
}
Err(e)
}
Expand Down
28 changes: 15 additions & 13 deletions hermes/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
//! This module contains the global state of the application.

use {
self::cache::CacheState,
self::{
benchmarks::BenchmarksState,
cache::CacheState,
},
crate::{
aggregate::{
AggregateState,
Expand Down Expand Up @@ -29,10 +32,12 @@ pub mod benchmarks;
pub mod cache;

pub struct State {
/// Storage is a short-lived cache of the state of all the updates that have been passed to the
/// store.
/// State for the `Cache` service for short-lived storage of updates.
pub cache: CacheState,

/// State for the `Benchmarks` service for looking up historical updates.
pub benchmarks: BenchmarksState,

/// Sequence numbers of lately observed Vaas. Store uses this set
/// to ignore the previously observed Vaas as a performance boost.
pub observed_vaa_seqs: RwLock<BTreeSet<u64>>,
Expand All @@ -46,9 +51,6 @@ pub struct State {
/// The aggregate module state.
pub aggregate_state: RwLock<AggregateState>,

/// Benchmarks endpoint
pub benchmarks_endpoint: Option<Url>,

/// Metrics registry
pub metrics_registry: RwLock<Registry>,

Expand All @@ -64,13 +66,13 @@ impl State {
) -> Arc<Self> {
let mut metrics_registry = Registry::default();
Arc::new(Self {
cache: CacheState::new(cache_size),
observed_vaa_seqs: RwLock::new(Default::default()),
guardian_set: RwLock::new(Default::default()),
api_update_tx: update_tx,
aggregate_state: RwLock::new(AggregateState::new(&mut metrics_registry)),
benchmarks_endpoint,
metrics_registry: RwLock::new(metrics_registry),
cache: CacheState::new(cache_size),
benchmarks: BenchmarksState::new(benchmarks_endpoint),
observed_vaa_seqs: RwLock::new(Default::default()),
guardian_set: RwLock::new(Default::default()),
api_update_tx: update_tx,
aggregate_state: RwLock::new(AggregateState::new(&mut metrics_registry)),
metrics_registry: RwLock::new(metrics_registry),
price_feeds_metadata: RwLock::new(Default::default()),
})
}
Expand Down
33 changes: 27 additions & 6 deletions hermes/src/state/benchmarks.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! This module communicates with Pyth Benchmarks, an API for historical price feeds and their updates.

use {
super::State,
crate::{
aggregate::{
PriceFeedsWithUpdateData,
Expand All @@ -14,6 +15,7 @@ use {
Engine as _,
},
pyth_sdk::PriceIdentifier,
reqwest::Url,
serde::Deserialize,
};

Expand Down Expand Up @@ -50,7 +52,23 @@ impl TryFrom<BinaryBlob> for Vec<Vec<u8>> {
}
}

#[async_trait::async_trait]
pub struct BenchmarksState {
endpoint: Option<Url>,
}

impl BenchmarksState {
pub fn new(url: Option<Url>) -> Self {
Self { endpoint: url }
}
}

/// Allow downcasting State into BenchmarksState for functions that depend on the `Benchmarks` service.
impl<'a> From<&'a State> for &'a BenchmarksState {
fn from(state: &'a State) -> &'a BenchmarksState {
&state.benchmarks
}
}

pub trait Benchmarks {
async fn get_verified_price_feeds(
&self,
Expand All @@ -59,22 +77,25 @@ pub trait Benchmarks {
) -> Result<PriceFeedsWithUpdateData>;
}

#[async_trait::async_trait]
impl Benchmarks for crate::state::State {
impl<T> Benchmarks for T
where
for<'a> &'a T: Into<&'a BenchmarksState>,
T: Sync,
{
async fn get_verified_price_feeds(
&self,
price_ids: &[PriceIdentifier],
publish_time: UnixTimestamp,
) -> Result<PriceFeedsWithUpdateData> {
let endpoint = self
.benchmarks_endpoint
.into()
.endpoint
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Benchmarks endpoint is not set"))?
.join(&format!("/v1/updates/price/{}", publish_time))
.unwrap();

let client = reqwest::Client::new();
let mut request = client
let mut request = reqwest::Client::new()
.get(endpoint)
.timeout(BENCHMARKS_REQUEST_TIMEOUT)
.query(&[("encoding", "hex")])
Expand Down