Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hermes/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion hermes/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hermes"
version = "0.4.3"
version = "0.4.4"
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
edition = "2021"

Expand Down
109 changes: 96 additions & 13 deletions hermes/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,15 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> {

// Once the accumulator reaches a complete state for a specific slot
// we can build the message states
build_message_states(state, accumulator_messages, wormhole_merkle_state).await?;
let message_states = build_message_states(accumulator_messages, wormhole_merkle_state)?;

let message_state_keys = message_states
.iter()
.map(|message_state| message_state.key())
.collect::<HashSet<_>>();

tracing::info!(len = message_states.len(), "Storing Message States.");
state.store_message_states(message_states).await?;

// Update the aggregate state
let mut aggregate_state = state.aggregate_state.write().await;
Expand All @@ -266,6 +274,7 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> {
.await?;
}
Some(latest) if slot > latest => {
state.prune_removed_keys(message_state_keys).await;
aggregate_state.latest_completed_slot.replace(slot);
state
.api_update_tx
Expand Down Expand Up @@ -296,18 +305,17 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> {
Ok(())
}

#[tracing::instrument(skip(state, accumulator_messages, wormhole_merkle_state))]
async fn build_message_states(
state: &State,
#[tracing::instrument(skip(accumulator_messages, wormhole_merkle_state))]
fn build_message_states(
accumulator_messages: AccumulatorMessages,
wormhole_merkle_state: WormholeMerkleState,
) -> Result<()> {
) -> Result<Vec<MessageState>> {
let wormhole_merkle_message_states_proofs =
construct_message_states_proofs(&accumulator_messages, &wormhole_merkle_state)?;

let current_time: UnixTimestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as _;

let message_states = accumulator_messages
accumulator_messages
.raw_messages
.into_iter()
.enumerate()
Expand All @@ -326,13 +334,7 @@ async fn build_message_states(
current_time,
))
})
.collect::<Result<Vec<_>>>()?;

tracing::info!(len = message_states.len(), "Storing Message States.");

state.store_message_states(message_states).await?;

Ok(())
.collect::<Result<Vec<_>>>()
}

async fn get_verified_price_feeds<S>(
Expand Down Expand Up @@ -677,6 +679,87 @@ mod test {
}
}

/// On this test we will initially have two price feeds. Then we will send an update with only
/// price feed 1 (without price feed 2) and make sure that price feed 2 is not stored anymore.
#[tokio::test]
pub async fn test_getting_price_ids_works_fine_after_price_removal() {
let (state, mut update_rx) = setup_state(10).await;

let price_feed_1 = create_dummy_price_feed_message(100, 10, 9);
let price_feed_2 = create_dummy_price_feed_message(200, 10, 9);

// Populate the state
store_multiple_concurrent_valid_updates(
state.clone(),
generate_update(
vec![
Message::PriceFeedMessage(price_feed_1),
Message::PriceFeedMessage(price_feed_2),
],
10,
20,
),
)
.await;

// Check that the update_rx channel has received a message
assert_eq!(
update_rx.recv().await,
Some(AggregationEvent::New { slot: 10 })
);

// Check the price ids are stored correctly
assert_eq!(
get_price_feed_ids(&*state).await,
vec![
PriceIdentifier::new([100; 32]),
PriceIdentifier::new([200; 32])
]
.into_iter()
.collect()
);

// Check that price feed 2 exists
assert!(get_price_feeds_with_update_data(
&*state,
&[PriceIdentifier::new([200; 32])],
RequestTime::Latest,
)
.await
.is_ok());

// Now send an update with only price feed 1 (without price feed 2)
// and make sure that price feed 2 is not stored anymore.
let price_feed_1 = create_dummy_price_feed_message(100, 12, 10);

// Populate the state
store_multiple_concurrent_valid_updates(
state.clone(),
generate_update(vec![Message::PriceFeedMessage(price_feed_1)], 15, 30),
)
.await;

// Check that the update_rx channel has received a message
assert_eq!(
update_rx.recv().await,
Some(AggregationEvent::New { slot: 15 })
);

// Check that price feed 2 does not exist anymore
assert_eq!(
get_price_feed_ids(&*state).await,
vec![PriceIdentifier::new([100; 32]),].into_iter().collect()
);

assert!(get_price_feeds_with_update_data(
&*state,
&[PriceIdentifier::new([200; 32])],
RequestTime::Latest,
)
.await
.is_err());
}

#[tokio::test]
pub async fn test_metadata_times_and_readiness_work() {
// The receiver channel should stay open for the state to work
Expand Down
41 changes: 31 additions & 10 deletions hermes/src/api/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,22 +350,43 @@ impl Subscriber {
.keys()
.cloned()
.collect::<Vec<_>>();
for update in crate::aggregate::get_price_feeds_with_update_data(

let updates = match crate::aggregate::get_price_feeds_with_update_data(
&*self.store,
&price_feed_ids,
RequestTime::AtSlot(event.slot()),
)
.await
.map_err(|e| {
tracing::warn!(
"Failed to get price feeds {:?} with update data: {:?}",
price_feed_ids,
e
);
e
})?
.price_feeds
{
Ok(updates) => updates,
Err(_) => {
// The error can only happen when a price feed was available
// and is no longer there as we check the price feed ids upon
// subscription. In this case we just remove the non-existing
// price feed from the list and will keep sending updates for
// the rest.
let available_price_feed_ids =
crate::aggregate::get_price_feed_ids(&*self.store).await;

self.price_feeds_with_config
.retain(|price_feed_id, _| available_price_feed_ids.contains(price_feed_id));

let price_feed_ids = self
.price_feeds_with_config
.keys()
.cloned()
.collect::<Vec<_>>();

crate::aggregate::get_price_feeds_with_update_data(
&*self.store,
&price_feed_ids,
RequestTime::AtSlot(event.slot()),
)
.await?
}
};

for update in updates.price_feeds {
let config = self
.price_feeds_with_config
.get(&update.price_feed.id)
Expand Down
28 changes: 27 additions & 1 deletion hermes/src/state/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use {
collections::{
BTreeMap,
HashMap,
HashSet,
},
ops::Bound,
sync::Arc,
Expand Down Expand Up @@ -169,6 +170,7 @@ impl Cache {
pub trait AggregateCache {
async fn message_state_keys(&self) -> Vec<MessageStateKey>;
async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()>;
async fn prune_removed_keys(&self, current_keys: HashSet<MessageStateKey>);
async fn fetch_message_states(
&self,
ids: Vec<FeedId>,
Expand Down Expand Up @@ -206,17 +208,41 @@ impl AggregateCache for crate::state::State {
let key = message_state.key();
let time = message_state.time();
let cache = message_cache.entry(key).or_insert_with(BTreeMap::new);

cache.insert(time, message_state);

// Remove the earliest message states if the cache size is exceeded
while cache.len() > self.cache.cache_size as usize {
cache.pop_first();
}
}

Ok(())
}

/// This method takes the current feed ids and prunes the cache for the keys
/// that are not present in the current feed ids.
///
/// There is a side-effect of this: if a key gets removed, we will
/// lose the cache for that key and cannot retrieve it for historical
/// price queries.
async fn prune_removed_keys(&self, current_keys: HashSet<MessageStateKey>) {
let mut message_cache = self.cache.message_cache.write().await;

// Sometimes, some keys are removed from the accumulator. We track which keys are not
// present in the message states and remove them from the cache.
let keys_in_cache = message_cache
.iter()
.map(|(key, _)| key.clone())
.collect::<HashSet<_>>();

for key in keys_in_cache {
if !current_keys.contains(&key) {
tracing::info!("Feed {:?} seems to be removed. Removing it from cache", key);
message_cache.remove(&key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is not very often, it would be great if we add some log here to see it in action once live.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a info log level for it.

}
}
}

async fn fetch_message_states(
&self,
ids: Vec<FeedId>,
Expand Down