Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions hermes/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use {
state::{
benchmarks::Benchmarks,
cache::{
AggregateCache,
Cache,
MessageState,
MessageStateFilter,
},
Expand Down Expand Up @@ -336,7 +336,7 @@ async fn get_verified_price_feeds<S>(
request_time: RequestTime,
) -> Result<PriceFeedsWithUpdateData>
where
S: AggregateCache,
S: Cache,
{
let messages = state
.fetch_message_states(
Expand Down Expand Up @@ -396,7 +396,7 @@ pub async fn get_price_feeds_with_update_data<S>(
request_time: RequestTime,
) -> Result<PriceFeedsWithUpdateData>
where
S: AggregateCache,
S: Cache,
S: Benchmarks,
{
match get_verified_price_feeds(state, price_ids, request_time.clone()).await {
Expand All @@ -412,7 +412,7 @@ where

pub async fn get_price_feed_ids<S>(state: &S) -> HashSet<PriceIdentifier>
where
S: AggregateCache,
S: Cache,
{
state
.message_state_keys()
Expand Down Expand Up @@ -468,10 +468,7 @@ mod test {
Accumulator,
},
hashers::keccak256_160::Keccak160,
messages::{
Message,
PriceFeedMessage,
},
messages::PriceFeedMessage,
wire::v1::{
AccumulatorUpdateData,
Proof,
Expand Down
8 changes: 4 additions & 4 deletions hermes/src/aggregate/wormhole_merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use {
crate::{
network::wormhole::VaaBytes,
state::cache::{
AggregateCache,
Cache,
MessageState,
},
},
Expand Down Expand Up @@ -70,14 +70,14 @@ impl From<MessageState> for RawMessageWithMerkleProof {
}

pub async fn store_wormhole_merkle_verified_message<S>(
store: &S,
state: &S,
root: WormholeMerkleRoot,
vaa: VaaBytes,
) -> Result<()>
where
S: AggregateCache,
S: Cache,
{
store
state
.store_wormhole_merkle_state(WormholeMerkleState { root, vaa })
.await?;
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions hermes/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! This module contains the global state of the application.

use {
self::cache::Cache,
self::cache::CacheState,
crate::{
aggregate::{
AggregateState,
Expand Down Expand Up @@ -31,7 +31,7 @@ pub mod cache;
pub struct State {
/// Storage is a short-lived cache of the state of all the updates that have been passed to the
/// store.
pub cache: Cache,
pub cache: CacheState,

/// Sequence numbers of lately observed Vaas. Store uses this set
/// to ignore the previously observed Vaas as a performance boost.
Expand Down Expand Up @@ -64,7 +64,7 @@ impl State {
) -> Arc<Self> {
let mut metrics_registry = Registry::default();
Arc::new(Self {
cache: Cache::new(cache_size),
cache: CacheState::new(cache_size),
observed_vaa_seqs: RwLock::new(Default::default()),
guardian_set: RwLock::new(Default::default()),
api_update_tx: update_tx,
Expand Down
178 changes: 90 additions & 88 deletions hermes/src/state/cache.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use {
super::State,
crate::aggregate::{
wormhole_merkle::WormholeMerkleState,
AccumulatorMessages,
Expand Down Expand Up @@ -96,79 +97,42 @@ pub enum MessageStateFilter {
Only(MessageType),
}

pub struct Cache {
/// Accumulator messages cache
///
/// We do not write to this cache much, so we can use a simple RwLock instead of a DashMap.
accumulator_messages_cache: Arc<RwLock<BTreeMap<Slot, AccumulatorMessages>>>,

/// Wormhole merkle state cache
///
/// We do not write to this cache much, so we can use a simple RwLock instead of a DashMap.
wormhole_merkle_state_cache: Arc<RwLock<BTreeMap<Slot, WormholeMerkleState>>>,
/// A Cache of AccumulatorMessage by slot. We do not write to this cache much, so we can use a simple RwLock instead of a DashMap.
type AccumulatorMessagesCache = Arc<RwLock<BTreeMap<Slot, AccumulatorMessages>>>;

message_cache: Arc<RwLock<HashMap<MessageStateKey, BTreeMap<MessageStateTime, MessageState>>>>,
cache_size: u64,
}

async fn retrieve_message_state(
cache: &Cache,
key: MessageStateKey,
request_time: RequestTime,
) -> Option<MessageState> {
match cache.message_cache.read().await.get(&key) {
Some(key_cache) => {
match request_time {
RequestTime::Latest => key_cache.last_key_value().map(|(_, v)| v).cloned(),
RequestTime::FirstAfter(time) => {
// If the requested time is before the first element in the vector, we are
// not sure that the first element is the closest one.
if let Some((_, oldest_record_value)) = key_cache.first_key_value() {
if time < oldest_record_value.time().publish_time {
return None;
}
}
/// A Cache of WormholeMerkleState by slot. We do not write to this cache much, so we can use a simple RwLock instead of a DashMap.
type WormholeMerkleStateCache = Arc<RwLock<BTreeMap<Slot, WormholeMerkleState>>>;

let lookup_time = MessageStateTime {
publish_time: time,
slot: 0,
};
/// A Cache of `Time<->MessageState` by feed id.
type MessageCache = Arc<RwLock<HashMap<MessageStateKey, BTreeMap<MessageStateTime, MessageState>>>>;

// Get the first element that is greater than or equal to the lookup time.
key_cache
.lower_bound(Bound::Included(&lookup_time))
.peek_next()
.map(|(_, v)| v)
.cloned()
}
RequestTime::AtSlot(slot) => {
// Get the state with slot equal to the lookup slot.
key_cache
.iter()
.rev() // Usually the slot lies at the end of the map
.find(|(k, _)| k.slot == slot)
.map(|(_, v)| v)
.cloned()
}
}
}
None => None,
}
/// A collection of caches for various program state.
pub struct CacheState {
accumulator_messages_cache: AccumulatorMessagesCache,
wormhole_merkle_state_cache: WormholeMerkleStateCache,
message_cache: MessageCache,
cache_size: u64,
}

impl Cache {
pub fn new(cache_size: u64) -> Self {
impl CacheState {
pub fn new(size: u64) -> Self {
Self {
message_cache: Arc::new(RwLock::new(HashMap::new())),
accumulator_messages_cache: Arc::new(RwLock::new(BTreeMap::new())),
accumulator_messages_cache: Arc::new(RwLock::new(BTreeMap::new())),
wormhole_merkle_state_cache: Arc::new(RwLock::new(BTreeMap::new())),
cache_size,
message_cache: Arc::new(RwLock::new(HashMap::new())),
cache_size: size,
}
}
}

#[async_trait::async_trait]
pub trait AggregateCache {
/// Allow downcasting State into CacheState for functions that depend on the `Cache` service.
impl<'a> From<&'a State> for &'a CacheState {
fn from(state: &'a State) -> &'a CacheState {
&state.cache
}
}

pub trait Cache {
async fn message_state_keys(&self) -> Vec<MessageStateKey>;
async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()>;
async fn prune_removed_keys(&self, current_keys: HashSet<MessageStateKey>);
Expand All @@ -190,10 +154,13 @@ pub trait AggregateCache {
async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>>;
}

#[async_trait::async_trait]
impl AggregateCache for crate::state::State {
impl<T> Cache for T
where
for<'a> &'a T: Into<&'a CacheState>,
T: Sync,
{
async fn message_state_keys(&self) -> Vec<MessageStateKey> {
self.cache
self.into()
.message_cache
.read()
.await
Expand All @@ -203,7 +170,7 @@ impl AggregateCache for crate::state::State {
}

async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()> {
let mut message_cache = self.cache.message_cache.write().await;
let mut message_cache = self.into().message_cache.write().await;

for message_state in message_states {
let key = message_state.key();
Expand All @@ -212,7 +179,7 @@ impl AggregateCache for crate::state::State {
cache.insert(time, message_state);

// Remove the earliest message states if the cache size is exceeded
while cache.len() > self.cache.cache_size as usize {
while cache.len() > self.into().cache_size as usize {
cache.pop_first();
}
}
Expand All @@ -227,7 +194,7 @@ impl AggregateCache for crate::state::State {
/// lose the cache for that key and cannot retrieve it for historical
/// price queries.
async fn prune_removed_keys(&self, current_keys: HashSet<MessageStateKey>) {
let mut message_cache = self.cache.message_cache.write().await;
let mut message_cache = self.into().message_cache.write().await;

// Sometimes, some keys are removed from the accumulator. We track which keys are not
// present in the message states and remove them from the cache.
Expand Down Expand Up @@ -262,7 +229,7 @@ impl AggregateCache for crate::state::State {
feed_id: id,
type_: message_type,
};
retrieve_message_state(&self.cache, key, request_time.clone())
retrieve_message_state(self.into(), key, request_time.clone())
})
}))
.await
Expand All @@ -275,60 +242,95 @@ impl AggregateCache for crate::state::State {
&self,
accumulator_messages: AccumulatorMessages,
) -> Result<()> {
let mut cache = self.cache.accumulator_messages_cache.write().await;
let mut cache = self.into().accumulator_messages_cache.write().await;
cache.insert(accumulator_messages.slot, accumulator_messages);
while cache.len() > self.cache.cache_size as usize {
while cache.len() > self.into().cache_size as usize {
cache.pop_first();
}
Ok(())
}

async fn fetch_accumulator_messages(&self, slot: Slot) -> Result<Option<AccumulatorMessages>> {
let cache = self.cache.accumulator_messages_cache.read().await;
let cache = self.into().accumulator_messages_cache.read().await;
Ok(cache.get(&slot).cloned())
}

async fn store_wormhole_merkle_state(
&self,
wormhole_merkle_state: WormholeMerkleState,
) -> Result<()> {
let mut cache = self.cache.wormhole_merkle_state_cache.write().await;
let mut cache = self.into().wormhole_merkle_state_cache.write().await;
cache.insert(wormhole_merkle_state.root.slot, wormhole_merkle_state);
while cache.len() > self.cache.cache_size as usize {
while cache.len() > self.into().cache_size as usize {
cache.pop_first();
}
Ok(())
}

async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>> {
let cache = self.cache.wormhole_merkle_state_cache.read().await;
let cache = self.into().wormhole_merkle_state_cache.read().await;
Ok(cache.get(&slot).cloned())
}
}

async fn retrieve_message_state(
cache: &CacheState,
key: MessageStateKey,
request_time: RequestTime,
) -> Option<MessageState> {
match cache.message_cache.read().await.get(&key) {
Some(key_cache) => {
match request_time {
RequestTime::Latest => key_cache.last_key_value().map(|(_, v)| v).cloned(),
RequestTime::FirstAfter(time) => {
// If the requested time is before the first element in the vector, we are
// not sure that the first element is the closest one.
if let Some((_, oldest_record_value)) = key_cache.first_key_value() {
if time < oldest_record_value.time().publish_time {
return None;
}
}

let lookup_time = MessageStateTime {
publish_time: time,
slot: 0,
};

// Get the first element that is greater than or equal to the lookup time.
key_cache
.lower_bound(Bound::Included(&lookup_time))
.peek_next()
.map(|(_, v)| v)
.cloned()
}
RequestTime::AtSlot(slot) => {
// Get the state with slot equal to the lookup slot.
key_cache
.iter()
.rev() // Usually the slot lies at the end of the map
.find(|(k, _)| k.slot == slot)
.map(|(_, v)| v)
.cloned()
}
}
}
None => None,
}
}

#[cfg(test)]
mod test {
use {
super::*,
crate::{
aggregate::{
wormhole_merkle::{
WormholeMerkleMessageProof,
WormholeMerkleState,
},
AccumulatorMessages,
ProofSet,
},
aggregate::wormhole_merkle::WormholeMerkleMessageProof,
state::test::setup_state,
},
pyth_sdk::UnixTimestamp,
pythnet_sdk::{
accumulators::merkle::MerklePath,
hashers::keccak256_160::Keccak160,
messages::{
Message,
PriceFeedMessage,
},
messages::PriceFeedMessage,
wire::v1::WormholeMerkleRoot,
},
};
Expand Down Expand Up @@ -369,7 +371,7 @@ mod test {
slot: Slot,
) -> MessageState
where
S: AggregateCache,
S: Cache,
{
let message_state = create_dummy_price_feed_message_state(feed_id, publish_time, slot);
state
Expand Down