From 7979435c150b89d7ad66a23bbc1b32198edd74c0 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Wed, 21 Jun 2023 16:14:41 +0100 Subject: [PATCH 1/3] [hermes] Add storage tests + refactor --- hermes/Cargo.lock | 202 +-------- hermes/Cargo.toml | 3 +- hermes/src/api/types.rs | 2 +- hermes/src/api/ws.rs | 2 +- hermes/src/store.rs | 25 +- hermes/src/store/storage.rs | 100 ++++- hermes/src/store/storage/local_storage.rs | 519 +++++++++++++++++++++- pythnet/pythnet_sdk/src/messages.rs | 13 +- 8 files changed, 628 insertions(+), 238 deletions(-) diff --git a/hermes/Cargo.lock b/hermes/Cargo.lock index 2eccaa22fc..bec56a3600 100644 --- a/hermes/Cargo.lock +++ b/hermes/Cargo.lock @@ -674,12 +674,6 @@ dependencies = [ "serde", ] -[[package]] -name = "bytecount" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" - [[package]] name = "bytemuck" version = "1.13.1" @@ -712,15 +706,6 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" -[[package]] -name = "camino" -version = "1.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c530edf18f37068ac2d977409ed5cd50d53d73bc653c7647b48eb78976ac9ae2" -dependencies = [ - "serde", -] - [[package]] name = "caps" version = "0.5.5" @@ -731,28 +716,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "cargo-platform" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbdb825da8a5df079a43676dbe042702f1707b1109f713a01420fbb4cc71fa27" -dependencies = [ - "serde", -] - -[[package]] -name = "cargo_metadata" -version = "0.14.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" -dependencies = [ - "camino", - "cargo-platform", - "semver 1.0.17", - "serde", - "serde_json", -] - [[package]] name = "cc" version = "1.0.79" @@ -1367,15 +1330,6 @@ dependencies = [ "libc", ] -[[package]] -name = "error-chain" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" -dependencies = [ - "version_check", -] - [[package]] name = "event-listener" version = "2.5.3" @@ -1639,12 +1593,6 @@ dependencies = [ "polyval", ] -[[package]] -name = "glob" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" - [[package]] name = "gloo-timers" version = "0.2.6" @@ -1711,7 +1659,7 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "hermes" -version = "0.1.2" +version = "0.1.3" dependencies = [ "anyhow", "async-trait", @@ -1730,7 +1678,6 @@ dependencies = [ "libc", "libp2p", "log", - "moka", "pyth-sdk", "pythnet-sdk", "rand 0.8.5", @@ -2861,15 +2808,6 @@ dependencies = [ "linked-hash-map", ] -[[package]] -name = "mach2" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" -dependencies = [ - "libc", -] - [[package]] name = "match_cfg" version = "0.1.0" @@ -2966,32 +2904,6 @@ dependencies = [ "windows-sys 0.45.0", ] -[[package]] -name = "moka" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "934030d03f6191edbb4ba16835ccdb80d560788ac686570a8e2986a0fb59ded8" -dependencies = [ - "async-io", - "async-lock", - "crossbeam-channel", - "crossbeam-epoch", - "crossbeam-utils", - "futures-util", - "num_cpus", - "once_cell", - "parking_lot 0.12.1", - "quanta", - "rustc_version 0.4.0", - "scheduled-thread-pool", - "skeptic", - "smallvec", - "tagptr", - "thiserror", - "triomphe", - "uuid", -] - [[package]] name = "multiaddr" version = "0.13.0" @@ -3776,17 +3688,6 @@ dependencies = [ "prost", ] -[[package]] -name = "pulldown-cmark" -version = "0.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d9cc634bc78768157b5cbfe988ffcd1dcba95cd2b2f03a88316c08c6d00ed63" -dependencies = [ - "bitflags", - "memchr", - "unicase", -] - [[package]] name = "pyth-sdk" version = "0.7.0" @@ -3827,22 +3728,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "quanta" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cc73c42f9314c4bdce450c77e6f09ecbddefbeddb1b5979ded332a3913ded33" -dependencies = [ - "crossbeam-utils", - "libc", - "mach2", - "once_cell", - "raw-cpuid", - "wasi 0.11.0+wasi-snapshot-preview1", - "web-sys", - "winapi", -] - [[package]] name = "quick-error" version = "1.2.3" @@ -4011,15 +3896,6 @@ dependencies = [ "rand_core 0.6.4", ] -[[package]] -name = "raw-cpuid" -version = "10.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" -dependencies = [ - "bitflags", -] - [[package]] name = "rayon" version = "1.7.0" @@ -4323,15 +4199,6 @@ dependencies = [ "cipher 0.4.4", ] -[[package]] -name = "same-file" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" -dependencies = [ - "winapi-util", -] - [[package]] name = "schannel" version = "0.1.21" @@ -4341,15 +4208,6 @@ dependencies = [ "windows-sys 0.42.0", ] -[[package]] -name = "scheduled-thread-pool" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" -dependencies = [ - "parking_lot 0.12.1", -] - [[package]] name = "schemars" version = "0.8.12" @@ -4447,9 +4305,6 @@ name = "semver" version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" -dependencies = [ - "serde", -] [[package]] name = "semver-parser" @@ -4681,21 +4536,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "skeptic" -version = "0.13.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" -dependencies = [ - "bytecount", - "cargo_metadata", - "error-chain", - "glob", - "pulldown-cmark", - "tempfile", - "walkdir", -] - [[package]] name = "slab" version = "0.4.8" @@ -5543,12 +5383,6 @@ dependencies = [ "libc", ] -[[package]] -name = "tagptr" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" - [[package]] name = "tempfile" version = "3.5.0" @@ -5881,12 +5715,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "triomphe" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1ee9bd9239c339d714d657fac840c6d2a4f9c45f4f9ec7b0975113458be78db" - [[package]] name = "trust-dns-proto" version = "0.20.4" @@ -6001,15 +5829,6 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "unicase" -version = "2.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" -dependencies = [ - "version_check", -] - [[package]] name = "unicode-bidi" version = "0.3.13" @@ -6110,15 +5929,6 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" -[[package]] -name = "uuid" -version = "1.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "345444e32442451b267fc254ae85a209c64be56d2890e601a0c37ff0c3c5ecd2" -dependencies = [ - "getrandom 0.2.9", -] - [[package]] name = "value-bag" version = "1.0.0-alpha.9" @@ -6159,16 +5969,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" -[[package]] -name = "walkdir" -version = "2.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36df944cda56c7d8d8b7496af378e6b16de9284591917d307c9b4d313c44e698" -dependencies = [ - "same-file", - "winapi-util", -] - [[package]] name = "want" version = "0.3.0" diff --git a/hermes/Cargo.toml b/hermes/Cargo.toml index bca0f76acb..2f246dc209 100644 --- a/hermes/Cargo.toml +++ b/hermes/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hermes" -version = "0.1.2" +version = "0.1.3" edition = "2021" [dependencies] @@ -35,7 +35,6 @@ libp2p = { version = "0.42.2", features = [ ]} log = { version = "0.4.17" } -moka = { version = "0.11.0", features = ["future"] } pyth-sdk = { version = "0.7.0" } # Parse Wormhole attester price attestations. diff --git a/hermes/src/api/types.rs b/hermes/src/api/types.rs index 03fd159ddd..4371452ad5 100644 --- a/hermes/src/api/types.rs +++ b/hermes/src/api/types.rs @@ -70,7 +70,7 @@ impl RpcPriceFeed { let price_feed_message = price_feed_update.price_feed; Self { - id: PriceIdentifier::new(price_feed_message.id), + id: PriceIdentifier::new(price_feed_message.feed_id), price: Price { price: price_feed_message.price, conf: price_feed_message.conf, diff --git a/hermes/src/api/ws.rs b/hermes/src/api/ws.rs index 8b1e3cf196..934460e532 100644 --- a/hermes/src/api/ws.rs +++ b/hermes/src/api/ws.rs @@ -165,7 +165,7 @@ impl Subscriber { { let config = self .price_feeds_with_config - .get(&PriceIdentifier::new(update.price_feed.id)) + .get(&PriceIdentifier::new(update.price_feed.feed_id)) .ok_or(anyhow::anyhow!( "Config missing, price feed list was poisoned during iteration." ))?; diff --git a/hermes/src/store.rs b/hermes/src/store.rs index 70c30a1615..786e2f3243 100644 --- a/hermes/src/store.rs +++ b/hermes/src/store.rs @@ -48,6 +48,7 @@ use { std::{ collections::{ BTreeMap, + BTreeSet, HashSet, }, sync::Arc, @@ -78,9 +79,11 @@ pub mod storage; pub mod types; pub mod wormhole; +const OBSERVED_CACHE_SIZE: usize = 1000; + pub struct Store { pub storage: StorageInstance, - pub observed_vaa_seqs: Cache, + pub observed_vaa_seqs: RwLock>, pub guardian_set: RwLock>, pub update_tx: Sender<()>, pub last_completed_update_at: RwLock>, @@ -90,10 +93,7 @@ impl Store { pub fn new_with_local_cache(update_tx: Sender<()>, cache_size: u64) -> Arc { Arc::new(Self { storage: storage::local_storage::LocalStorage::new_instance(cache_size), - observed_vaa_seqs: Cache::builder() - .max_capacity(cache_size) - .time_to_live(Duration::from_secs(60 * 5)) - .build(), + observed_vaa_seqs: RwLock::new(Default::default()), guardian_set: RwLock::new(Default::default()), update_tx, last_completed_update_at: RwLock::new(None), @@ -113,7 +113,7 @@ impl Store { return Ok(()); // Ignore VAA from other emitters } - if self.observed_vaa_seqs.get(&vaa.sequence).is_some() { + if self.observed_vaa_seqs.read().await.contains(&vaa.sequence) { return Ok(()); // Ignore VAA if we have already seen it } @@ -127,7 +127,11 @@ impl Store { } }; - self.observed_vaa_seqs.insert(vaa.sequence, true).await; + let mut observed_vaa_seqs = self.observed_vaa_seqs.write().await; + observed_vaa_seqs.insert(vaa.sequence); + if observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE { + observed_vaa_seqs.pop_first(); + } match WormholeMessage::try_from_bytes(vaa.payload)?.payload { WormholePayload::Merkle(proof) => { @@ -232,7 +236,10 @@ impl Store { let messages = self .storage .fetch_message_states( - price_ids, + price_ids + .iter() + .map(|price_id| price_id.to_bytes()) + .collect(), request_time, MessageStateFilter::Only(MessageType::PriceFeedMessage), ) @@ -267,7 +274,7 @@ impl Store { .message_state_keys() .await .iter() - .map(|key| PriceIdentifier::new(key.id)) + .map(|key| PriceIdentifier::new(key.feed_id)) .collect() } diff --git a/hermes/src/store/storage.rs b/hermes/src/store/storage.rs index 16542ca2dc..7a2c681fb8 100644 --- a/hermes/src/store/storage.rs +++ b/hermes/src/store/storage.rs @@ -14,8 +14,8 @@ use { Result, }, async_trait::async_trait, - pyth_sdk::PriceIdentifier, pythnet_sdk::messages::{ + FeedId, Message, MessageType, }, @@ -57,8 +57,8 @@ impl TryFrom for CompletedAccumulatorState { #[derive(Clone, PartialEq, Eq, Debug, Hash)] pub struct MessageStateKey { - pub id: [u8; 32], - pub type_: MessageType, + pub feed_id: FeedId, + pub type_: MessageType, } #[derive(Clone, PartialEq, Eq, Debug, PartialOrd, Ord)] @@ -85,8 +85,8 @@ impl MessageState { pub fn key(&self) -> MessageStateKey { MessageStateKey { - id: self.message.id(), - type_: self.message.into(), + feed_id: self.message.feed_id(), + type_: self.message.into(), } } @@ -125,13 +125,99 @@ pub trait Storage: Send + Sync { async fn store_message_states(&self, message_states: Vec) -> Result<()>; async fn fetch_message_states( &self, - ids: Vec, + ids: Vec, request_time: RequestTime, filter: MessageStateFilter, ) -> Result>; async fn store_accumulator_state(&self, state: AccumulatorState) -> Result<()>; - async fn fetch_accumulator_state(&self, slot: u64) -> Result>; + async fn fetch_accumulator_state(&self, slot: Slot) -> Result>; } pub type StorageInstance = Box; + +#[cfg(test)] +mod test { + use { + super::*, + pythnet_sdk::wire::v1::WormholeMerkleRoot, + }; + + #[test] + pub fn test_complete_accumulator_state_try_from_accumulator_state_works() { + let accumulator_state = AccumulatorState { + slot: 1, + accumulator_messages: None, + wormhole_merkle_state: None, + }; + + assert!(CompletedAccumulatorState::try_from(accumulator_state.clone()).is_err()); + + let accumulator_state = AccumulatorState { + slot: 1, + accumulator_messages: Some(AccumulatorMessages { + slot: 1, + magic: [0; 4], + ring_size: 10, + messages: vec![], + }), + wormhole_merkle_state: None, + }; + + assert!(CompletedAccumulatorState::try_from(accumulator_state.clone()).is_err()); + + let accumulator_state = AccumulatorState { + slot: 1, + accumulator_messages: None, + wormhole_merkle_state: Some(WormholeMerkleState { + vaa: vec![], + root: WormholeMerkleRoot { + slot: 1, + ring_size: 10, + root: [0; 20], + }, + }), + }; + + assert!(CompletedAccumulatorState::try_from(accumulator_state.clone()).is_err()); + + let accumulator_state = AccumulatorState { + slot: 1, + accumulator_messages: Some(AccumulatorMessages { + slot: 1, + magic: [0; 4], + ring_size: 10, + messages: vec![], + }), + wormhole_merkle_state: Some(WormholeMerkleState { + vaa: vec![], + root: WormholeMerkleRoot { + slot: 1, + ring_size: 10, + root: [0; 20], + }, + }), + }; + + assert_eq!( + CompletedAccumulatorState::try_from(accumulator_state.clone()).unwrap(), + CompletedAccumulatorState { + slot: 1, + accumulator_messages: AccumulatorMessages { + slot: 1, + magic: [0; 4], + ring_size: 10, + messages: vec![], + }, + wormhole_merkle_state: WormholeMerkleState { + vaa: vec![], + root: WormholeMerkleRoot { + slot: 1, + ring_size: 10, + root: [0; 20], + }, + }, + } + ); + } +} diff --git a/hermes/src/store/storage/local_storage.rs b/hermes/src/store/storage/local_storage.rs index 7760d13333..dfe1ffa935 100644 --- a/hermes/src/store/storage/local_storage.rs +++ b/hermes/src/store/storage/local_storage.rs @@ -16,20 +16,22 @@ use { }, async_trait::async_trait, dashmap::DashMap, - moka::sync::Cache, - pyth_sdk::PriceIdentifier, - pythnet_sdk::messages::MessageType, + pythnet_sdk::messages::{ + FeedId, + MessageType, + }, std::{ collections::VecDeque, sync::Arc, }, strum::IntoEnumIterator, + tokio::sync::RwLock, }; #[derive(Clone)] pub struct LocalStorage { message_cache: Arc>>, - accumulator_cache: Cache, + accumulator_cache: Arc>>, cache_size: u64, } @@ -37,7 +39,7 @@ impl LocalStorage { pub fn new_instance(cache_size: u64) -> StorageInstance { Box::new(Self { message_cache: Arc::new(DashMap::new()), - accumulator_cache: Cache::builder().max_capacity(cache_size).build(), + accumulator_cache: Arc::new(RwLock::new(VecDeque::new())), cache_size, }) } @@ -123,7 +125,7 @@ impl Storage for LocalStorage { async fn fetch_message_states( &self, - ids: Vec, + ids: Vec, request_time: RequestTime, filter: MessageStateFilter, ) -> Result> { @@ -137,8 +139,8 @@ impl Storage for LocalStorage { message_types.into_iter().map(move |message_type| { let key = MessageStateKey { - id: id.to_bytes(), - type_: message_type, + feed_id: id, + type_: message_type, }; self.retrieve_message_state(key, request_time.clone()) .ok_or(anyhow!("Message not found")) @@ -155,12 +157,507 @@ impl Storage for LocalStorage { } async fn store_accumulator_state(&self, state: super::AccumulatorState) -> Result<()> { - let key = state.slot; - self.accumulator_cache.insert(key, state); + let mut accumulator_cache = self.accumulator_cache.write().await; + accumulator_cache.push_back(state); + + let mut i = accumulator_cache.len().saturating_sub(1); + while i > 0 && accumulator_cache[i - 1].slot > accumulator_cache[i].slot { + accumulator_cache.swap(i - 1, i); + i -= 1; + } + + if accumulator_cache.len() > self.cache_size as usize { + accumulator_cache.pop_front(); + } + Ok(()) } async fn fetch_accumulator_state(&self, slot: Slot) -> Result> { - Ok(self.accumulator_cache.get(&slot)) + let accumulator_cache = self.accumulator_cache.read().await; + match accumulator_cache.binary_search_by_key(&slot, |state| state.slot) { + Ok(idx) => Ok(accumulator_cache.get(idx).cloned()), + Err(_) => Ok(None), + } + } +} + +#[cfg(test)] +mod test { + use { + super::*, + crate::store::{ + proof::wormhole_merkle::WormholeMerkleMessageProof, + types::{ + AccumulatorMessages, + ProofSet, + }, + }, + pyth_sdk::UnixTimestamp, + pythnet_sdk::{ + accumulators::merkle::MerklePath, + hashers::keccak256_160::Keccak160, + messages::{ + Message, + PriceFeedMessage, + }, + }, + }; + + pub fn create_dummy_price_feed_message_state( + feed_id: FeedId, + publish_time: i64, + slot: Slot, + ) -> MessageState { + MessageState { + slot, + message: Message::PriceFeedMessage(PriceFeedMessage { + feed_id, + publish_time, + price: 1, + conf: 2, + exponent: 3, + ema_price: 4, + ema_conf: 5, + prev_publish_time: 6, + }), + received_at: publish_time, + proof_set: ProofSet { + wormhole_merkle_proof: WormholeMerkleMessageProof { + vaa: vec![], + proof: MerklePath::::new(vec![]), + }, + }, + } + } + + pub async fn create_and_store_dummy_price_feed_message_state( + storage: &StorageInstance, + feed_id: FeedId, + publish_time: UnixTimestamp, + slot: Slot, + ) -> MessageState { + let message_state = create_dummy_price_feed_message_state(feed_id, publish_time, slot); + storage + .store_message_states(vec![message_state.clone()]) + .await + .unwrap(); + message_state + } + + #[tokio::test] + pub async fn test_store_and_retrieve_latest_message_state_works() { + // Initialize a storage with a cache size of 2 per key. + let storage = LocalStorage::new_instance(2); + + // Create and store a message state with feed id [1....] and publish time 10 at slot 5. + let message_state = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // The latest message state should be the one we just stored. + assert_eq!( + storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::Latest, + MessageStateFilter::Only(MessageType::PriceFeedMessage), + ) + .await + .unwrap(), + vec![message_state] + ); + } + + #[tokio::test] + pub async fn test_store_and_retrieve_latest_message_state_with_multiple_update_works() { + // Initialize a storage with a cache size of 2 per key. + let storage = LocalStorage::new_instance(2); + + // Create and store a message state with feed id [1....] and publish time 10 at slot 5. + let _old_message_state = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // Create and store a message state with feed id [1....] and publish time 20 at slot 10. + let new_message_state = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 20, 10).await; + + // The latest message state should be the one with publish time 20. + assert_eq!( + storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::Latest, + MessageStateFilter::Only(MessageType::PriceFeedMessage) + ) + .await + .unwrap(), + vec![new_message_state.clone()] + ); + } + + #[tokio::test] + pub async fn test_store_and_retrieve_latest_message_state_with_out_of_order_update_works() { + // Initialize a storage with a cache size of 2 per key. + let storage = LocalStorage::new_instance(2); + + // Create and store a message state with feed id [1....] and publish time 20 at slot 10. + let new_message_state = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 20, 10).await; + + // Create and store a message state with feed id [1....] and publish time 10 at slot 5. + let _old_message_state = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // The latest message state should be the one with publish time 20. + assert_eq!( + storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::Latest, + MessageStateFilter::Only(MessageType::PriceFeedMessage) + ) + .await + .unwrap(), + vec![new_message_state.clone()] + ); + } + + #[tokio::test] + pub async fn test_store_and_retrieve_first_after_message_state_works() { + // Initialize a storage with a cache size of 2 per key. + let storage = LocalStorage::new_instance(2); + + // Create and store a message state with feed id [1....] and publish time 10 at slot 5. + let old_message_state = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // Create and store a message state with feed id [1....] and publish time 13 at slot 10. + let new_message_state = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await; + + // The first message state after time 10 should be the old message state. + assert_eq!( + storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::FirstAfter(10), + MessageStateFilter::Only(MessageType::PriceFeedMessage) + ) + .await + .unwrap(), + vec![old_message_state] + ); + + // Querying the first after pub time 11, 12, 13 should all return the new message state. + for request_time in 11..14 { + assert_eq!( + storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::FirstAfter(request_time), + MessageStateFilter::Only(MessageType::PriceFeedMessage) + ) + .await + .unwrap(), + vec![new_message_state.clone()] + ); + } + } + + #[tokio::test] + pub async fn test_store_and_retrieve_latest_message_state_with_same_pubtime_works() { + // Initialize a storage with a cache size of 2 per key. + let storage = LocalStorage::new_instance(2); + + // Create and store a message state with feed id [1....] and publish time 10 at slot 5. + let slightly_older_message_state = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // Create and store a message state with feed id [1....] and publish time 10 at slot 7. + let slightly_newer_message_state = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 7).await; + + // The latest message state should be the one with the higher slot. + assert_eq!( + storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::Latest, + MessageStateFilter::Only(MessageType::PriceFeedMessage), + ) + .await + .unwrap(), + vec![slightly_newer_message_state] + ); + + // Querying the first message state after time 10 should return the one with the lower slot. + assert_eq!( + storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::FirstAfter(10), + MessageStateFilter::Only(MessageType::PriceFeedMessage), + ) + .await + .unwrap(), + vec![slightly_older_message_state] + ); + } + + + #[tokio::test] + pub async fn test_store_and_retrieve_first_after_message_state_fails_for_past_time() { + // Initialize a storage with a cache size of 2 per key. + let storage = LocalStorage::new_instance(2); + + // Create and store a message state with feed id [1....] and publish time 10 at slot 5. + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // Create and store a message state with feed id [1....] and publish time 13 at slot 10. + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await; + + // Query the message state before the available times should return an error. + // This is because we are not sure that the first available message is really the first. + assert!(storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::FirstAfter(9), + MessageStateFilter::Only(MessageType::PriceFeedMessage) + ) + .await + .is_err()); + } + + #[tokio::test] + pub async fn test_store_and_retrieve_first_after_message_state_fails_for_future_time() { + // Initialize a storage with a cache size of 2 per key. + let storage = LocalStorage::new_instance(2); + + // Create and store a message state with feed id [1....] and publish time 10 at slot 5. + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // Create and store a message state with feed id [1....] and publish time 13 at slot 10. + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await; + + // Query the message state after the available times should return an error. + assert!(storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::FirstAfter(14), + MessageStateFilter::Only(MessageType::PriceFeedMessage) + ) + .await + .is_err()); + } + + #[tokio::test] + pub async fn test_store_more_message_states_than_cache_size_evicts_old_messages() { + // Initialize a storage with a cache size of 2 per key. + let storage = LocalStorage::new_instance(2); + + // Create and store a message state with feed id [1....] and publish time 10 at slot 5. + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // Create and store a message state with feed id [1....] and publish time 13 at slot 10. + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await; + + // Create and store a message state with feed id [1....] and publish time 20 at slot 14. + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 20, 14).await; + + // The message at time 10 should be evicted and querying for it should return an error. + assert!(storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::FirstAfter(10), + MessageStateFilter::Only(MessageType::PriceFeedMessage) + ) + .await + .is_err()); + } + + #[tokio::test] + pub async fn test_store_and_receive_multiple_message_feed_ids_works() { + // Initialize a storage with a cache size of 1 per key. + let storage = LocalStorage::new_instance(1); + + // Create and store a message state with feed id [1....] and publish time 10 at slot 5. + let message_state_1 = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // Create and store a message state with feed id [2....] and publish time 13 at slot 10. + let message_state_2 = + create_and_store_dummy_price_feed_message_state(&storage, [2; 32], 10, 5).await; + + // Check both message states can be retrieved. + assert_eq!( + storage + .fetch_message_states( + vec![[1; 32], [2; 32]], + RequestTime::Latest, + MessageStateFilter::Only(MessageType::PriceFeedMessage), + ) + .await + .unwrap(), + vec![message_state_1, message_state_2] + ); + } + + #[tokio::test] + pub async fn test_receive_not_existent_message_fails() { + // Initialize a storage with a cache size of 2 per key. + let storage = LocalStorage::new_instance(2); + + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // Check both message states can be retrieved. + assert!(storage + .fetch_message_states( + vec![[2; 32]], + RequestTime::Latest, + MessageStateFilter::Only(MessageType::PriceFeedMessage), + ) + .await + .is_err()); + } + + pub fn create_empty_accumulator_state_at_slot(slot: Slot) -> AccumulatorState { + AccumulatorState { + slot, + accumulator_messages: None, + wormhole_merkle_state: None, + } + } + + #[tokio::test] + pub async fn test_store_and_receive_accumulator_state_works() { + // Initialize a storage with a cache size of 2 per key and the accumulator state. + let storage = LocalStorage::new_instance(2); + + // Create and store an accumulator state with slot 10. + let accumulator_state = create_empty_accumulator_state_at_slot(10); + + // Store the accumulator state. + storage + .store_accumulator_state(accumulator_state.clone()) + .await + .unwrap(); + + // Make sure the retrieved accumulator state is what we stored. + assert_eq!( + storage.fetch_accumulator_state(10).await.unwrap().unwrap(), + accumulator_state + ); + } + + #[tokio::test] + pub async fn test_store_and_receive_accumulator_state_works_on_overwrite() { + // Initialize a storage with a cache size of 2 per key and the accumulator state. + let storage = LocalStorage::new_instance(2); + + // Create and store an accumulator state with slot 10. + let mut accumulator_state = create_empty_accumulator_state_at_slot(10); + + // Store the accumulator state. + storage + .store_accumulator_state(accumulator_state.clone()) + .await + .unwrap(); + + // Retrieve the accumulator state and make sure it is what we stored. + assert_eq!( + storage.fetch_accumulator_state(10).await.unwrap().unwrap(), + accumulator_state + ); + + // Change the state to have accumulator messages + // We mutate the existing state because the normal flow is like this. + accumulator_state.accumulator_messages = Some(AccumulatorMessages { + magic: [0; 4], + slot: 10, + ring_size: 3, + messages: vec![], + }); + + // Store the accumulator state again. + storage + .store_accumulator_state(accumulator_state.clone()) + .await + .unwrap(); + + // Make sure the retrieved accumulator state is what we stored. + assert_eq!( + storage.fetch_accumulator_state(10).await.unwrap().unwrap(), + accumulator_state + ); + } + + #[tokio::test] + pub async fn test_store_and_receive_multiple_accumulator_state_works() { + // Initialize a storage with a cache size of 2 per key and the accumulator state. + let storage = LocalStorage::new_instance(2); + + let accumulator_state_at_slot_10 = create_empty_accumulator_state_at_slot(10); + let accumulator_state_at_slot_20 = create_empty_accumulator_state_at_slot(20); + + // Store the accumulator states. + storage + .store_accumulator_state(accumulator_state_at_slot_10.clone()) + .await + .unwrap(); + + storage + .store_accumulator_state(accumulator_state_at_slot_20.clone()) + .await + .unwrap(); + + // Retrieve the accumulator states and make sure it is what we stored. + assert_eq!( + storage.fetch_accumulator_state(10).await.unwrap().unwrap(), + accumulator_state_at_slot_10 + ); + + assert_eq!( + storage.fetch_accumulator_state(20).await.unwrap().unwrap(), + accumulator_state_at_slot_20 + ); + } + + #[tokio::test] + pub async fn test_store_and_receive_accumulator_state_evicts_cache() { + // Initialize a storage with a cache size of 2 per key and the accumulator state. + let storage = LocalStorage::new_instance(2); + + let accumulator_state_at_slot_10 = create_empty_accumulator_state_at_slot(10); + storage + .store_accumulator_state(accumulator_state_at_slot_10.clone()) + .await + .unwrap(); + + let accumulator_state_at_slot_20 = create_empty_accumulator_state_at_slot(20); + storage + .store_accumulator_state(accumulator_state_at_slot_20.clone()) + .await + .unwrap(); + + let accumulator_state_at_slot_30 = create_empty_accumulator_state_at_slot(30); + storage + .store_accumulator_state(accumulator_state_at_slot_30.clone()) + .await + .unwrap(); + + // The accumulator state at slot 10 should be evicted from the cache. + assert_eq!(storage.fetch_accumulator_state(10).await.unwrap(), None); + + + // Retrieve the rest of accumulator states and make sure it is what we stored. + assert_eq!( + storage.fetch_accumulator_state(20).await.unwrap().unwrap(), + accumulator_state_at_slot_20 + ); + + assert_eq!( + storage.fetch_accumulator_state(30).await.unwrap().unwrap(), + accumulator_state_at_slot_30 + ); } } diff --git a/pythnet/pythnet_sdk/src/messages.rs b/pythnet/pythnet_sdk/src/messages.rs index 3dc43f212a..3d66cb35a5 100644 --- a/pythnet/pythnet_sdk/src/messages.rs +++ b/pythnet/pythnet_sdk/src/messages.rs @@ -17,7 +17,6 @@ use serde::{ /// some of the methods for PriceFeedMessage and TwapMessage are not used by the oracle /// for the same reason. Rust compiler doesn't include the unused methods in the contract. /// Once we start using the unused structs and methods, the contract size will increase. - #[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)] #[cfg_attr( feature = "strum", @@ -47,10 +46,10 @@ impl Message { } } - pub fn id(&self) -> [u8; 32] { + pub fn feed_id(&self) -> FeedId { match self { - Self::PriceFeedMessage(msg) => msg.id, - Self::TwapMessage(msg) => msg.id, + Self::PriceFeedMessage(msg) => msg.feed_id, + Self::TwapMessage(msg) => msg.feed_id, } } } @@ -65,11 +64,13 @@ impl Arbitrary for Message { } } +/// Id of a feed producing the message. One feed produces one or more messages. +pub type FeedId = [u8; 32]; #[repr(C)] #[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)] pub struct PriceFeedMessage { - pub id: [u8; 32], + pub feed_id: FeedId, pub price: i64, pub conf: u64, pub exponent: i32, @@ -119,7 +120,7 @@ impl Arbitrary for PriceFeedMessage { #[repr(C)] #[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)] pub struct TwapMessage { - pub id: [u8; 32], + pub feed_id: FeedId, pub cumulative_price: i128, pub cumulative_conf: u128, pub num_down_slots: u64, From 80a48b7bbafa8610d58c9402c704ddb855e07344 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Wed, 21 Jun 2023 18:29:23 +0100 Subject: [PATCH 2/3] Bump pythnet_sdk version + update cosmwasm --- pythnet/pythnet_sdk/Cargo.toml | 2 +- target_chains/cosmwasm/Cargo.lock | 2 +- target_chains/cosmwasm/contracts/pyth/src/contract.rs | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pythnet/pythnet_sdk/Cargo.toml b/pythnet/pythnet_sdk/Cargo.toml index 93bf278a3c..c2e233dcd6 100644 --- a/pythnet/pythnet_sdk/Cargo.toml +++ b/pythnet/pythnet_sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pythnet-sdk" -version = "1.13.6" +version = "2.0.0" description = "Pyth Runtime for Solana" authors = ["Pyth Data Association"] repository = "https://github.com/pyth-network/pythnet" diff --git a/target_chains/cosmwasm/Cargo.lock b/target_chains/cosmwasm/Cargo.lock index 0a2b337ebf..70086bf84e 100644 --- a/target_chains/cosmwasm/Cargo.lock +++ b/target_chains/cosmwasm/Cargo.lock @@ -1490,7 +1490,7 @@ dependencies = [ [[package]] name = "pythnet-sdk" -version = "1.13.6" +version = "2.0.0" dependencies = [ "bincode", "borsh", diff --git a/target_chains/cosmwasm/contracts/pyth/src/contract.rs b/target_chains/cosmwasm/contracts/pyth/src/contract.rs index 781fc1296f..3839f6ac23 100644 --- a/target_chains/cosmwasm/contracts/pyth/src/contract.rs +++ b/target_chains/cosmwasm/contracts/pyth/src/contract.rs @@ -537,7 +537,7 @@ fn process_accumulator( match msg { Message::PriceFeedMessage(price_feed_message) => { let price_feed = PriceFeed::new( - PriceIdentifier::new(price_feed_message.id), + PriceIdentifier::new(price_feed_message.feed_id), Price { price: price_feed_message.price, conf: price_feed_message.conf, @@ -1116,7 +1116,7 @@ mod test { let mut dummy_id = [0; 32]; dummy_id[0] = value as u8; let msg = PriceFeedMessage { - id: dummy_id, + feed_id: dummy_id, price: value, conf: value as u64, exponent: value as i32, @@ -1195,7 +1195,7 @@ mod test { match msg { Message::PriceFeedMessage(feed_msg) => { let feed = price_feed_read_bucket(&deps.storage) - .load(&feed_msg.id) + .load(&feed_msg.feed_id) .unwrap(); let price = feed.get_price_unchecked(); let ema_price = feed.get_ema_price_unchecked(); @@ -1469,7 +1469,7 @@ mod test { // Although Twap Message is a valid message but it won't get stored on-chain via // `update_price_feeds` and (will be) used in other methods let feed1 = Message::TwapMessage(TwapMessage { - id: [0; 32], + feed_id: [0; 32], cumulative_price: 0, cumulative_conf: 0, num_down_slots: 0, From 48ecb6c079baf71773f94a86878f8b150166c8e2 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Thu, 22 Jun 2023 11:21:37 +0100 Subject: [PATCH 3/3] Address review feedbacks --- hermes/Cargo.lock | 2 +- hermes/Cargo.toml | 2 +- hermes/src/store.rs | 11 ++-- hermes/src/store/storage/local_storage.rs | 75 +++++++++-------------- 4 files changed, 36 insertions(+), 54 deletions(-) diff --git a/hermes/Cargo.lock b/hermes/Cargo.lock index bec56a3600..bfa025aa77 100644 --- a/hermes/Cargo.lock +++ b/hermes/Cargo.lock @@ -3703,7 +3703,7 @@ dependencies = [ [[package]] name = "pythnet-sdk" -version = "1.13.6" +version = "2.0.0" dependencies = [ "bincode", "borsh", diff --git a/hermes/Cargo.toml b/hermes/Cargo.toml index 2f246dc209..1bfc06eb96 100644 --- a/hermes/Cargo.toml +++ b/hermes/Cargo.toml @@ -38,7 +38,7 @@ log = { version = "0.4.17" } pyth-sdk = { version = "0.7.0" } # Parse Wormhole attester price attestations. -pythnet-sdk = { path = "../pythnet/pythnet_sdk/", version = "=1.13.6", features = ["strum"] } +pythnet-sdk = { path = "../pythnet/pythnet_sdk/", version = "2.0.0", features = ["strum"] } rand = { version = "0.8.5" } reqwest = { version = "0.11.14", features = ["blocking", "json"] } diff --git a/hermes/src/store.rs b/hermes/src/store.rs index 786e2f3243..e9e026e1fa 100644 --- a/hermes/src/store.rs +++ b/hermes/src/store.rs @@ -33,7 +33,6 @@ use { anyhow, Result, }, - moka::future::Cache, pyth_sdk::PriceIdentifier, pythnet_sdk::{ messages::{ @@ -127,10 +126,12 @@ impl Store { } }; - let mut observed_vaa_seqs = self.observed_vaa_seqs.write().await; - observed_vaa_seqs.insert(vaa.sequence); - if observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE { - observed_vaa_seqs.pop_first(); + { + let mut observed_vaa_seqs = self.observed_vaa_seqs.write().await; + observed_vaa_seqs.insert(vaa.sequence); + while observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE { + observed_vaa_seqs.pop_first(); + } } match WormholeMessage::try_from_bytes(vaa.payload)?.payload { diff --git a/hermes/src/store/storage/local_storage.rs b/hermes/src/store/storage/local_storage.rs index dfe1ffa935..518168b70e 100644 --- a/hermes/src/store/storage/local_storage.rs +++ b/hermes/src/store/storage/local_storage.rs @@ -291,7 +291,7 @@ mod test { ) .await .unwrap(), - vec![new_message_state.clone()] + vec![new_message_state] ); } @@ -318,7 +318,7 @@ mod test { ) .await .unwrap(), - vec![new_message_state.clone()] + vec![new_message_state] ); } @@ -528,19 +528,26 @@ mod test { } } + pub async fn create_and_store_empty_accumulator_state_at_slot( + storage: &StorageInstance, + slot: Slot, + ) -> AccumulatorState { + let accumulator_state = create_empty_accumulator_state_at_slot(slot); + storage + .store_accumulator_state(accumulator_state.clone()) + .await + .unwrap(); + accumulator_state + } + #[tokio::test] pub async fn test_store_and_receive_accumulator_state_works() { // Initialize a storage with a cache size of 2 per key and the accumulator state. let storage = LocalStorage::new_instance(2); // Create and store an accumulator state with slot 10. - let accumulator_state = create_empty_accumulator_state_at_slot(10); - - // Store the accumulator state. - storage - .store_accumulator_state(accumulator_state.clone()) - .await - .unwrap(); + let accumulator_state = + create_and_store_empty_accumulator_state_at_slot(&storage, 10).await; // Make sure the retrieved accumulator state is what we stored. assert_eq!( @@ -555,13 +562,8 @@ mod test { let storage = LocalStorage::new_instance(2); // Create and store an accumulator state with slot 10. - let mut accumulator_state = create_empty_accumulator_state_at_slot(10); - - // Store the accumulator state. - storage - .store_accumulator_state(accumulator_state.clone()) - .await - .unwrap(); + let mut accumulator_state = + create_and_store_empty_accumulator_state_at_slot(&storage, 10).await; // Retrieve the accumulator state and make sure it is what we stored. assert_eq!( @@ -596,19 +598,10 @@ mod test { // Initialize a storage with a cache size of 2 per key and the accumulator state. let storage = LocalStorage::new_instance(2); - let accumulator_state_at_slot_10 = create_empty_accumulator_state_at_slot(10); - let accumulator_state_at_slot_20 = create_empty_accumulator_state_at_slot(20); - - // Store the accumulator states. - storage - .store_accumulator_state(accumulator_state_at_slot_10.clone()) - .await - .unwrap(); - - storage - .store_accumulator_state(accumulator_state_at_slot_20.clone()) - .await - .unwrap(); + let accumulator_state_at_slot_10 = + create_and_store_empty_accumulator_state_at_slot(&storage, 10).await; + let accumulator_state_at_slot_20 = + create_and_store_empty_accumulator_state_at_slot(&storage, 20).await; // Retrieve the accumulator states and make sure it is what we stored. assert_eq!( @@ -627,28 +620,16 @@ mod test { // Initialize a storage with a cache size of 2 per key and the accumulator state. let storage = LocalStorage::new_instance(2); - let accumulator_state_at_slot_10 = create_empty_accumulator_state_at_slot(10); - storage - .store_accumulator_state(accumulator_state_at_slot_10.clone()) - .await - .unwrap(); - - let accumulator_state_at_slot_20 = create_empty_accumulator_state_at_slot(20); - storage - .store_accumulator_state(accumulator_state_at_slot_20.clone()) - .await - .unwrap(); - - let accumulator_state_at_slot_30 = create_empty_accumulator_state_at_slot(30); - storage - .store_accumulator_state(accumulator_state_at_slot_30.clone()) - .await - .unwrap(); + let _accumulator_state_at_slot_10 = + create_and_store_empty_accumulator_state_at_slot(&storage, 10).await; + let accumulator_state_at_slot_20 = + create_and_store_empty_accumulator_state_at_slot(&storage, 20).await; + let accumulator_state_at_slot_30 = + create_and_store_empty_accumulator_state_at_slot(&storage, 30).await; // The accumulator state at slot 10 should be evicted from the cache. assert_eq!(storage.fetch_accumulator_state(10).await.unwrap(), None); - // Retrieve the rest of accumulator states and make sure it is what we stored. assert_eq!( storage.fetch_accumulator_state(20).await.unwrap().unwrap(),