From 6e3f3873875b9fde72a9942e73cfdfa81405acef Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Wed, 23 Nov 2022 09:38:02 -0800 Subject: [PATCH 01/25] Refactor to make this change easier --- solana/pyth2wormhole/Cargo.lock | 2 +- .../client/src/attestation_cfg.rs | 17 +++- .../pyth2wormhole/client/src/batch_state.rs | 14 +-- solana/pyth2wormhole/client/src/main.rs | 97 ++++++++----------- 4 files changed, 61 insertions(+), 69 deletions(-) diff --git a/solana/pyth2wormhole/Cargo.lock b/solana/pyth2wormhole/Cargo.lock index 165dff9a59..37bb0154de 100644 --- a/solana/pyth2wormhole/Cargo.lock +++ b/solana/pyth2wormhole/Cargo.lock @@ -2642,7 +2642,7 @@ dependencies = [ [[package]] name = "pyth2wormhole-client" -version = "1.0.0-rc3" +version = "1.0.0" dependencies = [ "borsh", "clap 3.1.18", diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index c81e9a4374..57d3dcfdeb 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -102,7 +102,7 @@ impl AttestationConfig { } } - pub fn as_batches(&self, max_batch_size: usize) -> Vec { + pub fn as_batches(&self, max_batch_size: usize) -> Vec { self.symbol_groups .iter() .map(move |g| { @@ -116,7 +116,11 @@ impl AttestationConfig { .as_slice() .chunks(max_batch_size.clone()) .map(move |symbols| { - BatchState::new(name4closure.clone(), symbols, conditions4closure.clone()) + BatchConfig { + group_name: name4closure.clone(), + symbols: symbols.to_vec(), + conditions: conditions4closure.clone() + } }) }) .flatten() @@ -124,6 +128,15 @@ impl AttestationConfig { } } +/// Configuration for a single batch to send. +/// A valid batch config requires that `symbols.len() < max_batch_size`. +#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] +pub struct BatchConfig { + pub group_name: String, + pub symbols: Vec, + pub conditions: AttestationConditions, +} + #[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] pub struct SymbolGroup { pub group_name: String, diff --git a/solana/pyth2wormhole/client/src/batch_state.rs b/solana/pyth2wormhole/client/src/batch_state.rs index 62f914afba..307b2a9e9e 100644 --- a/solana/pyth2wormhole/client/src/batch_state.rs +++ b/solana/pyth2wormhole/client/src/batch_state.rs @@ -15,11 +15,13 @@ use crate::{ AttestationConditions, P2WSymbol, }; +use crate::attestation_cfg::BatchConfig; /// Runtime representation of a batch. It refers to the original group /// from the config. #[derive(Debug)] pub struct BatchState { + // TODO: replace with BatchConfig pub group_name: String, pub symbols: Vec, pub last_known_symbol_states: Vec>, @@ -29,15 +31,13 @@ pub struct BatchState { impl<'a> BatchState { pub fn new( - group_name: String, - symbols: &[P2WSymbol], - conditions: AttestationConditions, + config: &BatchConfig ) -> Self { Self { - group_name, - symbols: symbols.to_vec(), - conditions, - last_known_symbol_states: vec![None; symbols.len()], + group_name: config.group_name.clone(), + symbols: config.symbols.clone(), + conditions: config.conditions.clone(), + last_known_symbol_states: vec![None; config.symbols.len()], last_job_finished_at: Instant::now(), } } diff --git a/solana/pyth2wormhole/client/src/main.rs b/solana/pyth2wormhole/client/src/main.rs index 2112a63f36..c1f8572e03 100644 --- a/solana/pyth2wormhole/client/src/main.rs +++ b/solana/pyth2wormhole/client/src/main.rs @@ -57,11 +57,9 @@ use cli::{ use p2w_sdk::P2WEmitter; -use pyth2wormhole::{ - attest::P2W_MAX_BATCH_SIZE, - Pyth2WormholeConfig, -}; +use pyth2wormhole::{attest, attest::P2W_MAX_BATCH_SIZE, Pyth2WormholeConfig}; use pyth2wormhole_client::*; +use pyth2wormhole_client::attestation_cfg::BatchConfig; pub const SEQNO_PREFIX: &'static str = "Program log: Sequence: "; @@ -283,31 +281,10 @@ async fn handle_attest_daemon_mode( let start_time = Instant::now(); // Helps timekeep mapping lookups accurately let config = get_config_account(&lock_and_make_rpc(&rpc_cfg).await, &p2w_addr).await?; - - // Use the mapping if specified - if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() { - match crawl_pyth_mapping(&lock_and_make_rpc(&rpc_cfg).await, mapping_addr).await { - Ok(additional_accounts) => { - debug!( - "Crawled mapping {} data:\n{:#?}", - mapping_addr, additional_accounts - ); - attestation_cfg.add_symbols(additional_accounts, "mapping".to_owned()); - } - // De-escalate crawling errors; A temporary failure to - // look up the mapping should not crash the attester - Err(e) => { - error!("Could not crawl mapping {}: {:?}", mapping_addr, e); - } - } - } - debug!( - "Attestation config (includes mapping accounts):\n{:#?}", - attestation_cfg - ); + let batch_cfg = attestation_config_to_batches(&rpc_cfg, &attestation_cfg, config.max_batch_size as usize).await; // Hash currently known config - hasher.update(serde_yaml::to_vec(&attestation_cfg)?); + hasher.update(serde_yaml::to_vec(&batch_cfg)?); hasher.update(borsh::to_vec(&config)?); let new_cfg_hash = hasher.finalize_reset(); @@ -321,7 +298,7 @@ async fn handle_attest_daemon_mode( info!("Spinning up attestation sched jobs"); // Start the new sched futures let new_sched_futs_handle = tokio::spawn(prepare_attestation_sched_jobs( - &attestation_cfg, + &batch_cfg, &config, &rpc_cfg, &p2w_addr, @@ -339,7 +316,7 @@ async fn handle_attest_daemon_mode( // Base case for first attestation attempt old_sched_futs_state = Some(( tokio::spawn(prepare_attestation_sched_jobs( - &attestation_cfg, + &batch_cfg, &config, &rpc_cfg, &p2w_addr, @@ -407,29 +384,8 @@ async fn handle_attest_non_daemon_mode( ) -> Result<(), ErrBox> { let p2w_cfg = get_config_account(&lock_and_make_rpc(&rpc_cfg).await, &p2w_addr).await?; - // Use the mapping if specified - if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() { - match crawl_pyth_mapping(&lock_and_make_rpc(&rpc_cfg).await, mapping_addr).await { - Ok(additional_accounts) => { - debug!( - "Crawled mapping {} data:\n{:#?}", - mapping_addr, additional_accounts - ); - attestation_cfg.add_symbols(additional_accounts, "mapping".to_owned()); - } - // De-escalate crawling errors; A temporary failure to - // look up the mapping should not crash the attester - Err(e) => { - error!("Could not crawl mapping {}: {:?}", mapping_addr, e); - } - } - } - debug!( - "Attestation config (includes mapping accounts):\n{:#?}", - attestation_cfg - ); - - let batches = attestation_cfg.as_batches(p2w_cfg.max_batch_size as usize); + let batch_config = attestation_config_to_batches(&rpc_cfg, &attestation_cfg, p2w_cfg.max_batch_size as usize).await; + let batches: Vec<_> = batch_config.into_iter().map(|x| { BatchState::new(&x) }).collect(); let batch_count = batches.len(); // For enforcing min_msg_reuse_interval_ms, we keep a piece of @@ -479,22 +435,45 @@ async fn handle_attest_non_daemon_mode( Ok(()) } +async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestation_cfg: &AttestationConfig, max_batch_size: usize) -> Vec { + let mut attestation_cfg_copy = attestation_cfg.clone(); + + // Use the mapping if specified + if let Some(mapping_addr) = attestation_cfg_copy.mapping_addr.as_ref() { + match crawl_pyth_mapping(&lock_and_make_rpc(&rpc_cfg).await, mapping_addr).await { + Ok(additional_accounts) => { + debug!( + "Crawled mapping {} data:\n{:#?}", + mapping_addr, additional_accounts + ); + attestation_cfg_copy.add_symbols(additional_accounts, "mapping".to_owned()); + } + // De-escalate crawling errors; A temporary failure to + // look up the mapping should not crash the attester + Err(e) => { + error!("Could not crawl mapping {}: {:?}", mapping_addr, e); + } + } + } + debug!( + "Attestation config (includes mapping accounts):\n{:#?}", + attestation_cfg_copy + ); + + attestation_cfg_copy.as_batches(max_batch_size) +} + /// Constructs attestation scheduling jobs from attestation config. fn prepare_attestation_sched_jobs( - attestation_cfg: &AttestationConfig, + batch_cfg: &Vec, p2w_cfg: &Pyth2WormholeConfig, rpc_cfg: &Arc>, p2w_addr: &Pubkey, payer: &Keypair, message_q_mtx: Arc>, ) -> futures::future::JoinAll>> { - info!( - "{} symbol groups read, dividing into batches", - attestation_cfg.symbol_groups.len(), - ); - // Flatten attestation config into a plain list of batches - let batches: Vec<_> = attestation_cfg.as_batches(p2w_cfg.max_batch_size as usize); + let batches: Vec<_> = batch_cfg.into_iter().map(|x| { BatchState::new(&x)}).collect(); let batch_count = batches.len(); From 93c45d4444d94635049ae5f1c03ed04fabbfec08 Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Wed, 23 Nov 2022 10:19:07 -0800 Subject: [PATCH 02/25] stop mutating AttestationConfig --- .../client/src/attestation_cfg.rs | 95 ------------------- solana/pyth2wormhole/client/src/main.rs | 85 +++++++++++------ 2 files changed, 57 insertions(+), 123 deletions(-) diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index 57d3dcfdeb..72c666c148 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -50,58 +50,6 @@ pub struct AttestationConfig { } impl AttestationConfig { - /// Merges new symbols into the attestation config. Pre-existing - /// new symbols are ignored. The new_group_name group can already - /// exist - symbols will be appended to `symbols` field. - pub fn add_symbols( - &mut self, - mut new_symbols: HashMap>, - group_name: String, // Which group is extended by the new symbols - ) { - // Remove pre-existing symbols from the new symbols collection - for existing_group in &self.symbol_groups { - for existing_sym in &existing_group.symbols { - // Check if new symbols mention this product - if let Some(prices) = new_symbols.get_mut(&existing_sym.product_addr) { - // Prune the price if exists - prices.remove(&existing_sym.price_addr); - } - } - } - - // Turn the pruned symbols into P2WSymbol structs - let mut new_symbols_vec = new_symbols - .drain() // Makes us own the elements and lets us move them - .map(|(prod, prices)| iter::zip(iter::repeat(prod), prices)) // Convert to iterator over flat (prod, price) tuples - .flatten() // Flatten the tuple iterators - .map(|(prod, price)| P2WSymbol { - name: None, - product_addr: prod, - price_addr: price, - }) - .collect::>(); - - // Find and extend OR create the group of specified name - match self - .symbol_groups - .iter_mut() - .find(|g| g.group_name == group_name) // Advances the iterator and returns Some(item) on first hit - { - Some(existing_group) => existing_group.symbols.append(&mut new_symbols_vec), - None if new_symbols_vec.len() != 0 => { - // Group does not exist, assume defaults - let new_group = SymbolGroup { - group_name, - conditions: Default::default(), - symbols: new_symbols_vec, - }; - - self.symbol_groups.push(new_group); - } - None => {} - } - } - pub fn as_batches(&self, max_batch_size: usize) -> Vec { self.symbol_groups .iter() @@ -347,47 +295,4 @@ mod tests { Ok(()) } - - #[test] - fn test_add_symbols_works() -> Result<(), ErrBox> { - let empty_config = AttestationConfig { - min_msg_reuse_interval_ms: 1000, - max_msg_accounts: 100, - min_rpc_interval_ms: 42422, - mapping_addr: None, - mapping_reload_interval_mins: 42, - symbol_groups: vec![], - }; - - let mock_new_symbols = (0..255) - .map(|sym_idx| { - let mut mock_prod_bytes = [0u8; 32]; - mock_prod_bytes[31] = sym_idx; - - let mut mock_prices = HashSet::new(); - for px_idx in 1..=5 { - let mut mock_price_bytes = [0u8; 32]; - mock_price_bytes[31] = sym_idx; - mock_prices.insert(Pubkey::new_from_array(mock_price_bytes)); - } - - (Pubkey::new_from_array(mock_prod_bytes), mock_prices) - }) - .collect::>>(); - - let mut config1 = empty_config.clone(); - - config1.add_symbols(mock_new_symbols.clone(), "default".to_owned()); - - let mut config2 = config1.clone(); - - // Should not be created because there's no new symbols to add - // (we're adding identical mock_new_symbols again) - config2.add_symbols(mock_new_symbols.clone(), "default2".to_owned()); - - assert_ne!(config1, empty_config); // Check that config grows from empty - assert_eq!(config1, config2); // Check that no changes are made if all symbols are already in there - - Ok(()) - } } diff --git a/solana/pyth2wormhole/client/src/main.rs b/solana/pyth2wormhole/client/src/main.rs index c1f8572e03..f6a92b1135 100644 --- a/solana/pyth2wormhole/client/src/main.rs +++ b/solana/pyth2wormhole/client/src/main.rs @@ -3,19 +3,19 @@ pub mod cli; use std::{ fs::File, sync::Arc, + iter, time::{ Duration, Instant, }, }; +use std::collections::HashSet; use clap::Parser; -use futures::{ - future::{ - Future, - TryFutureExt, - }, -}; +use futures::{future::{ + Future, + TryFutureExt, +}, SinkExt, StreamExt}; use generic_array::GenericArray; use log::{ debug, @@ -251,7 +251,7 @@ async fn handle_attest_daemon_mode( rpc_cfg: Arc>, payer: Keypair, p2w_addr: Pubkey, - mut attestation_cfg: AttestationConfig, + attestation_cfg: AttestationConfig, ) -> Result<(), ErrBox> { info!( "Crawling mapping {:?} every {} minutes", @@ -272,6 +272,7 @@ async fn handle_attest_daemon_mode( attestation_cfg.max_msg_accounts as usize, ))); + let mut batch_cfg = vec![]; // This loop cranks attestations without interruption. This is // achieved by spinning up a new up-to-date symbol set before // letting go of the previous one. Additionally, hash of on-chain @@ -281,7 +282,7 @@ async fn handle_attest_daemon_mode( let start_time = Instant::now(); // Helps timekeep mapping lookups accurately let config = get_config_account(&lock_and_make_rpc(&rpc_cfg).await, &p2w_addr).await?; - let batch_cfg = attestation_config_to_batches(&rpc_cfg, &attestation_cfg, config.max_batch_size as usize).await; + batch_cfg = attestation_config_to_batches(&rpc_cfg, &attestation_cfg, config.max_batch_size as usize).await.unwrap_or(batch_cfg); // Hash currently known config hasher.update(serde_yaml::to_vec(&batch_cfg)?); @@ -375,7 +376,7 @@ async fn lock_and_make_rpc(rlmtx: &RLMutex) -> RpcClient { /// Non-daemon attestation scheduling async fn handle_attest_non_daemon_mode( - mut attestation_cfg: AttestationConfig, + attestation_cfg: AttestationConfig, rpc_cfg: Arc>, p2w_addr: Pubkey, payer: Keypair, @@ -384,7 +385,7 @@ async fn handle_attest_non_daemon_mode( ) -> Result<(), ErrBox> { let p2w_cfg = get_config_account(&lock_and_make_rpc(&rpc_cfg).await, &p2w_addr).await?; - let batch_config = attestation_config_to_batches(&rpc_cfg, &attestation_cfg, p2w_cfg.max_batch_size as usize).await; + let batch_config = attestation_config_to_batches(&rpc_cfg, &attestation_cfg, p2w_cfg.max_batch_size as usize).await?; let batches: Vec<_> = batch_config.into_iter().map(|x| { BatchState::new(&x) }).collect(); let batch_count = batches.len(); @@ -435,32 +436,60 @@ async fn handle_attest_non_daemon_mode( Ok(()) } -async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestation_cfg: &AttestationConfig, max_batch_size: usize) -> Vec { - let mut attestation_cfg_copy = attestation_cfg.clone(); - +// TODO: log failures here +// // De-escalate crawling errors; A temporary failure to +// // look up the mapping should not crash the attester +// Err(e) => { +// error!("Could not crawl mapping {}: {:?}", mapping_addr, e); +// } +async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestation_cfg: &AttestationConfig, max_batch_size: usize) -> Result, ErrBox> { + let existing_price_accounts: HashSet = attestation_cfg.symbol_groups.iter().flat_map(|x| x.symbols.iter().map(|y| y.price_addr.clone())).collect(); // Use the mapping if specified - if let Some(mapping_addr) = attestation_cfg_copy.mapping_addr.as_ref() { - match crawl_pyth_mapping(&lock_and_make_rpc(&rpc_cfg).await, mapping_addr).await { - Ok(additional_accounts) => { - debug!( - "Crawled mapping {} data:\n{:#?}", - mapping_addr, additional_accounts - ); - attestation_cfg_copy.add_symbols(additional_accounts, "mapping".to_owned()); - } - // De-escalate crawling errors; A temporary failure to - // look up the mapping should not crash the attester - Err(e) => { - error!("Could not crawl mapping {}: {:?}", mapping_addr, e); - } - } + if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() { + crawl_pyth_mapping(&lock_and_make_rpc(&rpc_cfg).await, mapping_addr).await.map(|mut additional_accounts| { + debug!( + "Crawled mapping {} data:\n{:#?}", + mapping_addr, additional_accounts + ); + + // Turn the pruned symbols into P2WSymbol structs + let new_symbols_vec = additional_accounts + .drain() // Makes us own the elements and lets us move them + .map(|(prod, prices)| iter::zip(iter::repeat(prod), prices)) // Convert to iterator over flat (prod, price) tuples + .flatten() // Flatten the tuple iterators + .filter(|(_, price)| !existing_price_accounts.contains(price)) + .map(|(prod, price)| P2WSymbol { + name: None, + product_addr: prod, + price_addr: price, + }) + .collect::>(); + + let attestation_cfg_batches: Vec = attestation_cfg.as_batches(max_batch_size); + let new_batches: Vec = new_symbols_vec + .as_slice() + .chunks(max_batch_size) + .map(move |symbols| { + BatchConfig { + group_name: "mapping".to_owned(), + symbols: symbols.to_vec(), + conditions: Default::default(), + } + }).collect(); + + attestation_cfg_batches.into_iter().chain(new_batches.into_iter()).collect::>() + }) + } else { + Ok(attestation_cfg.as_batches(max_batch_size)) } + /* debug!( "Attestation config (includes mapping accounts):\n{:#?}", attestation_cfg_copy ); attestation_cfg_copy.as_batches(max_batch_size) + */ } /// Constructs attestation scheduling jobs from attestation config. From a049df6a6a49bcd29673054cf66ed38bc9d8ea00 Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Wed, 23 Nov 2022 10:47:47 -0800 Subject: [PATCH 03/25] get the product names also --- .../client/src/attestation_cfg.rs | 13 ++++++++- solana/pyth2wormhole/client/src/lib.rs | 29 +++++++++++++++---- solana/pyth2wormhole/client/src/main.rs | 26 ++++++++++------- 3 files changed, 50 insertions(+), 18 deletions(-) diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index 72c666c148..6679518f41 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -34,6 +34,7 @@ pub struct AttestationConfig { default // Uses Option::default() which is None )] pub mapping_addr: Option, + pub mapping_groups: Vec, /// The known symbol list will be reloaded based off this /// interval, to account for mapping changes. Note: This interval /// will only work if the mapping address is defined. Whenever @@ -76,8 +77,8 @@ impl AttestationConfig { } } +// TODO: cleanup with below /// Configuration for a single batch to send. -/// A valid batch config requires that `symbols.len() < max_batch_size`. #[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] pub struct BatchConfig { pub group_name: String, @@ -85,6 +86,16 @@ pub struct BatchConfig { pub conditions: AttestationConditions, } +#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] +pub struct NameGroup { + pub group_name: String, + /// Attestation conditions applied to all symbols in this group + /// TODO: make optional? + pub conditions: AttestationConditions, + /// The names of the symbols to include in this group + pub symbols: Vec, +} + #[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] pub struct SymbolGroup { pub group_name: String, diff --git a/solana/pyth2wormhole/client/src/lib.rs b/solana/pyth2wormhole/client/src/lib.rs index ed84301122..05bad06f60 100644 --- a/solana/pyth2wormhole/client/src/lib.rs +++ b/solana/pyth2wormhole/client/src/lib.rs @@ -405,8 +405,8 @@ pub fn gen_attest_tx( pub async fn crawl_pyth_mapping( rpc_client: &RpcClient, first_mapping_addr: &Pubkey, -) -> Result>, ErrBox> { - let mut ret = HashMap::new(); +) -> Result, ErrBox> { + let mut ret: Vec = vec![]; let mut n_mappings = 1; // We assume the first one must be valid let mut n_products_total = 0; // Grand total products in all mapping accounts @@ -442,6 +442,13 @@ pub async fn crawl_pyth_mapping( } }; + let mut prod_name = ""; + for (key, val) in prod.iter() { + if key.eq_ignore_ascii_case("symbol") { + prod_name = val; + } + } + let mut price_addr = prod.px_acc.clone(); let mut n_prod_prices = 0; @@ -457,6 +464,7 @@ pub async fn crawl_pyth_mapping( } // loop until the last non-zero PriceAccount.next account + let mut price_accounts: HashSet = HashSet::new(); loop { let price_bytes = rpc_client.get_account_data(&price_addr).await?; let price = match load_price_account(&price_bytes) { @@ -468,10 +476,7 @@ pub async fn crawl_pyth_mapping( }; // Append to existing set or create a new map entry - ret.entry(prod_addr.clone()) - .or_insert(HashSet::new()) - .insert(price_addr); - + price_accounts.insert(price_addr); n_prod_prices += 1; if price.next == Pubkey::default() { @@ -485,6 +490,11 @@ pub async fn crawl_pyth_mapping( price_addr = price.next.clone(); } + ret.push(P2WProductAccount { + key: prod_addr.clone(), + name: prod_name.to_owned(), // FIXME + price_account_keys: price_accounts, + }); n_prices_total += n_prod_prices; } @@ -511,3 +521,10 @@ pub async fn crawl_pyth_mapping( Ok(ret) } + +#[derive(Clone, Debug)] +pub struct P2WProductAccount { + pub key: Pubkey, + pub name: String, + pub price_account_keys: HashSet, +} \ No newline at end of file diff --git a/solana/pyth2wormhole/client/src/main.rs b/solana/pyth2wormhole/client/src/main.rs index f6a92b1135..b106dd7505 100644 --- a/solana/pyth2wormhole/client/src/main.rs +++ b/solana/pyth2wormhole/client/src/main.rs @@ -453,17 +453,21 @@ async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestati ); // Turn the pruned symbols into P2WSymbol structs - let new_symbols_vec = additional_accounts - .drain() // Makes us own the elements and lets us move them - .map(|(prod, prices)| iter::zip(iter::repeat(prod), prices)) // Convert to iterator over flat (prod, price) tuples - .flatten() // Flatten the tuple iterators - .filter(|(_, price)| !existing_price_accounts.contains(price)) - .map(|(prod, price)| P2WSymbol { - name: None, - product_addr: prod, - price_addr: price, - }) - .collect::>(); + let mut new_symbols_vec = vec![]; + for product_account in additional_accounts { + for price_account_key in product_account.price_account_keys { + if !existing_price_accounts.contains(&price_account_key) { + let symbol = P2WSymbol { + name: Some(product_account.name.clone()), + product_addr: product_account.key, + price_addr: price_account_key, + }; + new_symbols_vec.push(symbol); + } + } + } + + // TODO: group into batches using attestation_cfg.mapping_groups let attestation_cfg_batches: Vec = attestation_cfg.as_batches(max_batch_size); let new_batches: Vec = new_symbols_vec From 0b28a52b012a8001c3ed0ecd0a6b771b230e1c56 Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Wed, 23 Nov 2022 11:26:18 -0800 Subject: [PATCH 04/25] ok --- solana/pyth2wormhole/client/src/main.rs | 74 ++++++++++++++++--------- 1 file changed, 47 insertions(+), 27 deletions(-) diff --git a/solana/pyth2wormhole/client/src/main.rs b/solana/pyth2wormhole/client/src/main.rs index b106dd7505..b18b4f1efc 100644 --- a/solana/pyth2wormhole/client/src/main.rs +++ b/solana/pyth2wormhole/client/src/main.rs @@ -9,7 +9,7 @@ use std::{ Instant, }, }; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use clap::Parser; use futures::{future::{ @@ -443,17 +443,42 @@ async fn handle_attest_non_daemon_mode( // error!("Could not crawl mapping {}: {:?}", mapping_addr, e); // } async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestation_cfg: &AttestationConfig, max_batch_size: usize) -> Result, ErrBox> { - let existing_price_accounts: HashSet = attestation_cfg.symbol_groups.iter().flat_map(|x| x.symbols.iter().map(|y| y.price_addr.clone())).collect(); // Use the mapping if specified if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() { - crawl_pyth_mapping(&lock_and_make_rpc(&rpc_cfg).await, mapping_addr).await.map(|mut additional_accounts| { + crawl_pyth_mapping(&lock_and_make_rpc(&rpc_cfg).await, mapping_addr).await.map(|additional_accounts| { debug!( "Crawled mapping {} data:\n{:#?}", mapping_addr, additional_accounts ); // Turn the pruned symbols into P2WSymbol structs - let mut new_symbols_vec = vec![]; + let mut name_to_symbols: HashMap> = HashMap::new(); + for product_account in &additional_accounts { + for price_account_key in &product_account.price_account_keys { + let symbol = P2WSymbol { + name: Some(product_account.name.clone()), + product_addr: product_account.key, + price_addr: price_account_key.clone(), + }; + + name_to_symbols.entry(product_account.name.clone()).or_insert(vec![]).push(symbol); + } + } + + let mut mapping_batches: Vec = vec![]; + for group in &attestation_cfg.mapping_groups { + let batch_items: Vec = group.symbols.iter().flat_map(|symbol| name_to_symbols.get(symbol).into_iter().flat_map(|x| x.into_iter())).map(|x| x.clone()).collect(); + mapping_batches.extend(partition_into_batches(&group.group_name, max_batch_size, &group.conditions, batch_items)) + } + + let attestation_cfg_batches: Vec = attestation_cfg.as_batches(max_batch_size); + + // Find any accounts not included in existing batches and group them into a remainder batch + let existing_price_accounts: HashSet = mapping_batches.iter().flat_map(|batch| batch.symbols.iter().map(|symbol| symbol.price_addr.clone())).chain( + attestation_cfg_batches.iter().flat_map(|batch| batch.symbols.iter().map(|symbol| symbol.price_addr.clone())) + ).collect(); + + let mut default_symbols: Vec = vec![]; for product_account in additional_accounts { for price_account_key in product_account.price_account_keys { if !existing_price_accounts.contains(&price_account_key) { @@ -462,38 +487,33 @@ async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestati product_addr: product_account.key, price_addr: price_account_key, }; - new_symbols_vec.push(symbol); + default_symbols.push(symbol); } } } + let default_batches = partition_into_batches(&"mapping".to_owned(), max_batch_size, &AttestationConditions::default(), default_symbols); - // TODO: group into batches using attestation_cfg.mapping_groups - - let attestation_cfg_batches: Vec = attestation_cfg.as_batches(max_batch_size); - let new_batches: Vec = new_symbols_vec - .as_slice() - .chunks(max_batch_size) - .map(move |symbols| { - BatchConfig { - group_name: "mapping".to_owned(), - symbols: symbols.to_vec(), - conditions: Default::default(), - } - }).collect(); - - attestation_cfg_batches.into_iter().chain(new_batches.into_iter()).collect::>() + attestation_cfg_batches.into_iter() + .chain(mapping_batches.into_iter()) + .chain(default_batches.into_iter()) + .collect::>() }) } else { Ok(attestation_cfg.as_batches(max_batch_size)) } - /* - debug!( - "Attestation config (includes mapping accounts):\n{:#?}", - attestation_cfg_copy - ); +} - attestation_cfg_copy.as_batches(max_batch_size) - */ +fn partition_into_batches(batch_name: &String, max_batch_size: usize, conditions: &AttestationConditions, symbols: Vec) -> Vec { + symbols + .as_slice() + .chunks(max_batch_size) + .map(move |batch_symbols| { + BatchConfig { + group_name: batch_name.to_owned(), + symbols: batch_symbols.to_vec(), + conditions: conditions.clone(), + } + }).collect() } /// Constructs attestation scheduling jobs from attestation config. From 9627bce5555c96e6f58209295c1c15318d83416e Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Wed, 23 Nov 2022 14:08:44 -0800 Subject: [PATCH 05/25] refactor --- .../pyth2wormhole/client/src/attestation_cfg.rs | 13 ++----------- solana/pyth2wormhole/client/src/batch_state.rs | 13 ++++++------- solana/pyth2wormhole/client/src/main.rs | 16 ++++++++-------- 3 files changed, 16 insertions(+), 26 deletions(-) diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index 6679518f41..9af6ae1322 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -51,7 +51,7 @@ pub struct AttestationConfig { } impl AttestationConfig { - pub fn as_batches(&self, max_batch_size: usize) -> Vec { + pub fn as_batches(&self, max_batch_size: usize) -> Vec { self.symbol_groups .iter() .map(move |g| { @@ -65,7 +65,7 @@ impl AttestationConfig { .as_slice() .chunks(max_batch_size.clone()) .map(move |symbols| { - BatchConfig { + SymbolGroup { group_name: name4closure.clone(), symbols: symbols.to_vec(), conditions: conditions4closure.clone() @@ -77,15 +77,6 @@ impl AttestationConfig { } } -// TODO: cleanup with below -/// Configuration for a single batch to send. -#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] -pub struct BatchConfig { - pub group_name: String, - pub symbols: Vec, - pub conditions: AttestationConditions, -} - #[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] pub struct NameGroup { pub group_name: String, diff --git a/solana/pyth2wormhole/client/src/batch_state.rs b/solana/pyth2wormhole/client/src/batch_state.rs index 307b2a9e9e..95f5ba1c8d 100644 --- a/solana/pyth2wormhole/client/src/batch_state.rs +++ b/solana/pyth2wormhole/client/src/batch_state.rs @@ -15,13 +15,12 @@ use crate::{ AttestationConditions, P2WSymbol, }; -use crate::attestation_cfg::BatchConfig; +use crate::attestation_cfg::SymbolGroup; /// Runtime representation of a batch. It refers to the original group /// from the config. #[derive(Debug)] pub struct BatchState { - // TODO: replace with BatchConfig pub group_name: String, pub symbols: Vec, pub last_known_symbol_states: Vec>, @@ -31,13 +30,13 @@ pub struct BatchState { impl<'a> BatchState { pub fn new( - config: &BatchConfig + group: &SymbolGroup ) -> Self { Self { - group_name: config.group_name.clone(), - symbols: config.symbols.clone(), - conditions: config.conditions.clone(), - last_known_symbol_states: vec![None; config.symbols.len()], + group_name: group.group_name.clone(), + symbols: group.symbols.clone(), + conditions: group.conditions.clone(), + last_known_symbol_states: vec![None; group.symbols.len()], last_job_finished_at: Instant::now(), } } diff --git a/solana/pyth2wormhole/client/src/main.rs b/solana/pyth2wormhole/client/src/main.rs index b18b4f1efc..46919768a7 100644 --- a/solana/pyth2wormhole/client/src/main.rs +++ b/solana/pyth2wormhole/client/src/main.rs @@ -59,7 +59,7 @@ use p2w_sdk::P2WEmitter; use pyth2wormhole::{attest, attest::P2W_MAX_BATCH_SIZE, Pyth2WormholeConfig}; use pyth2wormhole_client::*; -use pyth2wormhole_client::attestation_cfg::BatchConfig; +use pyth2wormhole_client::attestation_cfg::SymbolGroup; pub const SEQNO_PREFIX: &'static str = "Program log: Sequence: "; @@ -442,7 +442,7 @@ async fn handle_attest_non_daemon_mode( // Err(e) => { // error!("Could not crawl mapping {}: {:?}", mapping_addr, e); // } -async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestation_cfg: &AttestationConfig, max_batch_size: usize) -> Result, ErrBox> { +async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestation_cfg: &AttestationConfig, max_batch_size: usize) -> Result, ErrBox> { // Use the mapping if specified if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() { crawl_pyth_mapping(&lock_and_make_rpc(&rpc_cfg).await, mapping_addr).await.map(|additional_accounts| { @@ -465,13 +465,13 @@ async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestati } } - let mut mapping_batches: Vec = vec![]; + let mut mapping_batches: Vec = vec![]; for group in &attestation_cfg.mapping_groups { let batch_items: Vec = group.symbols.iter().flat_map(|symbol| name_to_symbols.get(symbol).into_iter().flat_map(|x| x.into_iter())).map(|x| x.clone()).collect(); mapping_batches.extend(partition_into_batches(&group.group_name, max_batch_size, &group.conditions, batch_items)) } - let attestation_cfg_batches: Vec = attestation_cfg.as_batches(max_batch_size); + let attestation_cfg_batches: Vec = attestation_cfg.as_batches(max_batch_size); // Find any accounts not included in existing batches and group them into a remainder batch let existing_price_accounts: HashSet = mapping_batches.iter().flat_map(|batch| batch.symbols.iter().map(|symbol| symbol.price_addr.clone())).chain( @@ -496,19 +496,19 @@ async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestati attestation_cfg_batches.into_iter() .chain(mapping_batches.into_iter()) .chain(default_batches.into_iter()) - .collect::>() + .collect::>() }) } else { Ok(attestation_cfg.as_batches(max_batch_size)) } } -fn partition_into_batches(batch_name: &String, max_batch_size: usize, conditions: &AttestationConditions, symbols: Vec) -> Vec { +fn partition_into_batches(batch_name: &String, max_batch_size: usize, conditions: &AttestationConditions, symbols: Vec) -> Vec { symbols .as_slice() .chunks(max_batch_size) .map(move |batch_symbols| { - BatchConfig { + SymbolGroup { group_name: batch_name.to_owned(), symbols: batch_symbols.to_vec(), conditions: conditions.clone(), @@ -518,7 +518,7 @@ fn partition_into_batches(batch_name: &String, max_batch_size: usize, conditions /// Constructs attestation scheduling jobs from attestation config. fn prepare_attestation_sched_jobs( - batch_cfg: &Vec, + batch_cfg: &Vec, p2w_cfg: &Pyth2WormholeConfig, rpc_cfg: &Arc>, p2w_addr: &Pubkey, From daca4dc949d94dbb7358492da0f1bdb4c45f53ae Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Wed, 23 Nov 2022 14:15:30 -0800 Subject: [PATCH 06/25] cleanup --- .../pyth2wormhole/client/src/attestation_cfg.rs | 4 +++- solana/pyth2wormhole/client/src/lib.rs | 2 +- solana/pyth2wormhole/client/src/main.rs | 17 +++++++++-------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index 9af6ae1322..08b32f401d 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -34,6 +34,8 @@ pub struct AttestationConfig { default // Uses Option::default() which is None )] pub mapping_addr: Option, + /// Collection of symbols identified by symbol name (e.g., "Crypto.BTC/USD") + /// These symbols are only active if `mapping_addr` is set. pub mapping_groups: Vec, /// The known symbol list will be reloaded based off this /// interval, to account for mapping changes. Note: This interval @@ -84,7 +86,7 @@ pub struct NameGroup { /// TODO: make optional? pub conditions: AttestationConditions, /// The names of the symbols to include in this group - pub symbols: Vec, + pub symbol_names: Vec, } #[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] diff --git a/solana/pyth2wormhole/client/src/lib.rs b/solana/pyth2wormhole/client/src/lib.rs index 05bad06f60..98f85aaf74 100644 --- a/solana/pyth2wormhole/client/src/lib.rs +++ b/solana/pyth2wormhole/client/src/lib.rs @@ -527,4 +527,4 @@ pub struct P2WProductAccount { pub key: Pubkey, pub name: String, pub price_account_keys: HashSet, -} \ No newline at end of file +} diff --git a/solana/pyth2wormhole/client/src/main.rs b/solana/pyth2wormhole/client/src/main.rs index 46919768a7..f039b0038c 100644 --- a/solana/pyth2wormhole/client/src/main.rs +++ b/solana/pyth2wormhole/client/src/main.rs @@ -282,7 +282,14 @@ async fn handle_attest_daemon_mode( let start_time = Instant::now(); // Helps timekeep mapping lookups accurately let config = get_config_account(&lock_and_make_rpc(&rpc_cfg).await, &p2w_addr).await?; - batch_cfg = attestation_config_to_batches(&rpc_cfg, &attestation_cfg, config.max_batch_size as usize).await.unwrap_or(batch_cfg); + batch_cfg = match attestation_config_to_batches(&rpc_cfg, &attestation_cfg, config.max_batch_size as usize).await { + Ok(config) => config, + Err(err) => { + // If we cannot query the mapping account, retain the existing batch configuration. + error!("Could not crawl mapping {}: {:?}", attestation_cfg.mapping_addr.unwrap_or_default(), e); + batch_cfg + } + }; // Hash currently known config hasher.update(serde_yaml::to_vec(&batch_cfg)?); @@ -436,12 +443,6 @@ async fn handle_attest_non_daemon_mode( Ok(()) } -// TODO: log failures here -// // De-escalate crawling errors; A temporary failure to -// // look up the mapping should not crash the attester -// Err(e) => { -// error!("Could not crawl mapping {}: {:?}", mapping_addr, e); -// } async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestation_cfg: &AttestationConfig, max_batch_size: usize) -> Result, ErrBox> { // Use the mapping if specified if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() { @@ -467,7 +468,7 @@ async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestati let mut mapping_batches: Vec = vec![]; for group in &attestation_cfg.mapping_groups { - let batch_items: Vec = group.symbols.iter().flat_map(|symbol| name_to_symbols.get(symbol).into_iter().flat_map(|x| x.into_iter())).map(|x| x.clone()).collect(); + let batch_items: Vec = group.symbol_names.iter().flat_map(|symbol| name_to_symbols.get(symbol).into_iter().flat_map(|x| x.into_iter())).map(|x| x.clone()).collect(); mapping_batches.extend(partition_into_batches(&group.group_name, max_batch_size, &group.conditions, batch_items)) } From c88a7e0471260ff609b754cb6d7bf1e17afb445e Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Wed, 23 Nov 2022 14:20:34 -0800 Subject: [PATCH 07/25] more cleanup --- solana/pyth2wormhole/client/src/lib.rs | 2 +- solana/pyth2wormhole/client/src/main.rs | 39 +++++++++++++------------ 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/solana/pyth2wormhole/client/src/lib.rs b/solana/pyth2wormhole/client/src/lib.rs index 98f85aaf74..a54b1f36ff 100644 --- a/solana/pyth2wormhole/client/src/lib.rs +++ b/solana/pyth2wormhole/client/src/lib.rs @@ -492,7 +492,7 @@ pub async fn crawl_pyth_mapping( } ret.push(P2WProductAccount { key: prod_addr.clone(), - name: prod_name.to_owned(), // FIXME + name: prod_name.to_owned(), price_account_keys: price_accounts, }); diff --git a/solana/pyth2wormhole/client/src/main.rs b/solana/pyth2wormhole/client/src/main.rs index f039b0038c..5fc2cdacae 100644 --- a/solana/pyth2wormhole/client/src/main.rs +++ b/solana/pyth2wormhole/client/src/main.rs @@ -1,9 +1,7 @@ -pub mod cli; - use std::{ fs::File, - sync::Arc, iter, + sync::Arc, time::{ Duration, Instant, @@ -21,9 +19,10 @@ use log::{ debug, error, info, - warn, LevelFilter, + warn, }; +use p2w_sdk::P2WEmitter; use sha3::{Digest, Sha3_256}; use solana_client::{ nonblocking::rpc_client::RpcClient, @@ -32,15 +31,13 @@ use solana_client::{ use solana_program::pubkey::Pubkey; use solana_sdk::{ commitment_config::CommitmentConfig, - signature::{ - read_keypair_file, - }, + signature::read_keypair_file, signer::keypair::Keypair, }; use solana_transaction_status::UiTransactionEncoding; use solitaire::{ - processors::seeded::Seeded, ErrBox, + processors::seeded::Seeded, }; use tokio::{ sync::{ @@ -54,13 +51,12 @@ use cli::{ Action, Cli, }; - -use p2w_sdk::P2WEmitter; - use pyth2wormhole::{attest, attest::P2W_MAX_BATCH_SIZE, Pyth2WormholeConfig}; use pyth2wormhole_client::*; use pyth2wormhole_client::attestation_cfg::SymbolGroup; +pub mod cli; + pub const SEQNO_PREFIX: &'static str = "Program log: Sequence: "; #[tokio::main(flavor = "multi_thread")] @@ -286,7 +282,7 @@ async fn handle_attest_daemon_mode( Ok(config) => config, Err(err) => { // If we cannot query the mapping account, retain the existing batch configuration. - error!("Could not crawl mapping {}: {:?}", attestation_cfg.mapping_addr.unwrap_or_default(), e); + error!("Could not crawl mapping {}: {:?}", attestation_cfg.mapping_addr.unwrap_or_default(), err); batch_cfg } }; @@ -452,7 +448,7 @@ async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestati mapping_addr, additional_accounts ); - // Turn the pruned symbols into P2WSymbol structs + // Construct batches from the named groups in the attestation config let mut name_to_symbols: HashMap> = HashMap::new(); for product_account in &additional_accounts { for price_account_key in &product_account.price_account_keys { @@ -468,10 +464,15 @@ async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestati let mut mapping_batches: Vec = vec![]; for group in &attestation_cfg.mapping_groups { - let batch_items: Vec = group.symbol_names.iter().flat_map(|symbol| name_to_symbols.get(symbol).into_iter().flat_map(|x| x.into_iter())).map(|x| x.clone()).collect(); + let batch_items: Vec = group.symbol_names.iter().flat_map( + |symbol| name_to_symbols.get(symbol).into_iter().flat_map( + |x| x.into_iter() + ) + ).map(|x| x.clone()).collect(); mapping_batches.extend(partition_into_batches(&group.group_name, max_batch_size, &group.conditions, batch_items)) } + // Construct batches that are explicitly provided in the AttestationConfig let attestation_cfg_batches: Vec = attestation_cfg.as_batches(max_batch_size); // Find any accounts not included in existing batches and group them into a remainder batch @@ -479,7 +480,7 @@ async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestati attestation_cfg_batches.iter().flat_map(|batch| batch.symbols.iter().map(|symbol| symbol.price_addr.clone())) ).collect(); - let mut default_symbols: Vec = vec![]; + let mut remaining_symbols: Vec = vec![]; for product_account in additional_accounts { for price_account_key in product_account.price_account_keys { if !existing_price_accounts.contains(&price_account_key) { @@ -488,15 +489,15 @@ async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestati product_addr: product_account.key, price_addr: price_account_key, }; - default_symbols.push(symbol); + remaining_symbols.push(symbol); } } } - let default_batches = partition_into_batches(&"mapping".to_owned(), max_batch_size, &AttestationConditions::default(), default_symbols); + let remaining_batches = partition_into_batches(&"mapping".to_owned(), max_batch_size, &AttestationConditions::default(), remaining_symbols); attestation_cfg_batches.into_iter() .chain(mapping_batches.into_iter()) - .chain(default_batches.into_iter()) + .chain(remaining_batches.into_iter()) .collect::>() }) } else { @@ -504,6 +505,8 @@ async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestati } } +/// Partition symbols into a collection of batches, each of which contains no more than +/// `max_batch_size` symbols. fn partition_into_batches(batch_name: &String, max_batch_size: usize, conditions: &AttestationConditions, symbols: Vec) -> Vec { symbols .as_slice() From f036c09ffd657cad3c313dac9e901f199444c5ea Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Wed, 23 Nov 2022 14:21:30 -0800 Subject: [PATCH 08/25] more cleanup --- solana/pyth2wormhole/client/src/attestation_cfg.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index 08b32f401d..62a4780f7a 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -287,6 +287,7 @@ mod tests { max_msg_accounts: 100_000, min_rpc_interval_ms: 2123, mapping_addr: None, + mapping_groups: vec![], mapping_reload_interval_mins: 42, symbol_groups: vec![fastbois, slowbois], }; From 20fa60f1324194eaaf2a2324a68df92bcc997a04 Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Mon, 28 Nov 2022 10:49:26 -0800 Subject: [PATCH 09/25] comment --- solana/pyth2wormhole/client/src/attestation_cfg.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index 62a4780f7a..565b064080 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -49,6 +49,8 @@ pub struct AttestationConfig { #[serde(default = "default_min_rpc_interval_ms")] /// Rate-limiting minimum delay between RPC requests in milliseconds" pub min_rpc_interval_ms: u64, + /// Collection of symbols identified by their full account addresses. + /// These symbols will be published regardless of whether or not `mapping_addr` is provided. pub symbol_groups: Vec, } From 3aa4d3f5cbdb64429b19990f5f97d0012dafca7f Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Mon, 28 Nov 2022 11:08:25 -0800 Subject: [PATCH 10/25] i think this works --- .../client/src/attestation_cfg.rs | 17 ++++++---- solana/pyth2wormhole/client/src/main.rs | 7 +++-- third_party/pyth/p2w_autoattest.py | 31 +++++++------------ 3 files changed, 27 insertions(+), 28 deletions(-) diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index 565b064080..98c6c7773b 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -34,9 +34,6 @@ pub struct AttestationConfig { default // Uses Option::default() which is None )] pub mapping_addr: Option, - /// Collection of symbols identified by symbol name (e.g., "Crypto.BTC/USD") - /// These symbols are only active if `mapping_addr` is set. - pub mapping_groups: Vec, /// The known symbol list will be reloaded based off this /// interval, to account for mapping changes. Note: This interval /// will only work if the mapping address is defined. Whenever @@ -49,6 +46,14 @@ pub struct AttestationConfig { #[serde(default = "default_min_rpc_interval_ms")] /// Rate-limiting minimum delay between RPC requests in milliseconds" pub min_rpc_interval_ms: u64, + /// Attestation conditions that will be used for any symbols included in the mapping + /// that aren't explicitly in one of the groups below. + pub default_attestation_conditions: AttestationConditions, + + /// Collection of symbols identified by symbol name (e.g., "Crypto.BTC/USD") + /// These symbols are only active if `mapping_addr` is set. + pub name_groups: Vec, + /// Collection of symbols identified by their full account addresses. /// These symbols will be published regardless of whether or not `mapping_addr` is provided. pub symbol_groups: Vec, @@ -84,9 +89,9 @@ impl AttestationConfig { #[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] pub struct NameGroup { pub group_name: String, - /// Attestation conditions applied to all symbols in this group - /// TODO: make optional? - pub conditions: AttestationConditions, + /// Attestation conditions applied to all symbols in this group. + /// If not provided, use the default attestation conditions from `AttestationConfig`. + pub conditions: Option, /// The names of the symbols to include in this group pub symbol_names: Vec, } diff --git a/solana/pyth2wormhole/client/src/main.rs b/solana/pyth2wormhole/client/src/main.rs index 5fc2cdacae..d05deb645d 100644 --- a/solana/pyth2wormhole/client/src/main.rs +++ b/solana/pyth2wormhole/client/src/main.rs @@ -463,13 +463,14 @@ async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestati } let mut mapping_batches: Vec = vec![]; - for group in &attestation_cfg.mapping_groups { + for group in &attestation_cfg.name_groups { let batch_items: Vec = group.symbol_names.iter().flat_map( |symbol| name_to_symbols.get(symbol).into_iter().flat_map( |x| x.into_iter() ) ).map(|x| x.clone()).collect(); - mapping_batches.extend(partition_into_batches(&group.group_name, max_batch_size, &group.conditions, batch_items)) + let group_conditions = group.conditions.as_ref().unwrap_or(&attestation_cfg.default_attestation_conditions); + mapping_batches.extend(partition_into_batches(&group.group_name, max_batch_size, group_conditions, batch_items)) } // Construct batches that are explicitly provided in the AttestationConfig @@ -493,7 +494,7 @@ async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestati } } } - let remaining_batches = partition_into_batches(&"mapping".to_owned(), max_batch_size, &AttestationConditions::default(), remaining_symbols); + let remaining_batches = partition_into_batches(&"mapping".to_owned(), max_batch_size, &attestation_cfg.default_attestation_conditions, remaining_symbols); attestation_cfg_batches.into_iter() .chain(mapping_batches.into_iter()) diff --git a/third_party/pyth/p2w_autoattest.py b/third_party/pyth/p2w_autoattest.py index eda98169c2..8ca4c17564 100755 --- a/third_party/pyth/p2w_autoattest.py +++ b/third_party/pyth/p2w_autoattest.py @@ -190,31 +190,32 @@ def find_and_log_seqnos(s): mapping_reload_interval_mins: 1 # Very fast for testing purposes min_rpc_interval_ms: 0 # RIP RPC max_batch_jobs: 1000 # Where we're going there's no oomkiller -symbol_groups: +default_attestation_conditions: + min_interval_secs: 30 + price_changed_bps: 500 +name_groups: - group_name: fast_interval_only conditions: min_interval_secs: 1 - symbols: + symbol_names: """ - # integer-divide the symbols in ~half for two test - # groups. Assumes arr[:idx] is exclusive, and arr[idx:] is - # inclusive - third_len = len(pyth_accounts) // 3; + # integer-divide the symbols in ~thirds for three test groups: + # a fast group that is specified by symbol name only, + # a slower group that is specified with full account details, + # and a default group that is read from the mapping account without being explicitly configured. + third_len = len(pyth_accounts) // 3 for thing in pyth_accounts[:third_len]: name = thing["name"] - price = thing["price"] - product = thing["product"] cfg_yaml += f""" - - name: {name} - price_addr: {price} - product_addr: {product}""" + - {name}""" # End of fast_interval_only cfg_yaml += f""" +symbol_groups: - group_name: longer_interval_sensitive_changes conditions: min_interval_secs: 10 @@ -232,14 +233,6 @@ def find_and_log_seqnos(s): price_addr: {price} product_addr: {product}""" - cfg_yaml += f""" - - group_name: mapping - conditions: - min_interval_secs: 30 - price_changed_bps: 500 - symbols: [] -""" - with open(P2W_ATTESTATION_CFG, "w") as f: f.write(cfg_yaml) f.flush() From d707f56b45be1eb9c139e096b28369abfdeb2572 Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Mon, 28 Nov 2022 11:24:10 -0800 Subject: [PATCH 11/25] fix stuff --- solana/pyth2wormhole/client/src/lib.rs | 1 - solana/pyth2wormhole/client/src/main.rs | 6 ++++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/solana/pyth2wormhole/client/src/lib.rs b/solana/pyth2wormhole/client/src/lib.rs index df3ed35bf4..540f89ad47 100644 --- a/solana/pyth2wormhole/client/src/lib.rs +++ b/solana/pyth2wormhole/client/src/lib.rs @@ -81,7 +81,6 @@ use { ErrBox, }, std::collections::{ - HashMap, HashSet, }, }; diff --git a/solana/pyth2wormhole/client/src/main.rs b/solana/pyth2wormhole/client/src/main.rs index cd94527bab..679a0ba52e 100644 --- a/solana/pyth2wormhole/client/src/main.rs +++ b/solana/pyth2wormhole/client/src/main.rs @@ -56,8 +56,10 @@ use { Duration, Instant, }, - HashMap, - HashSet, + collections::{ + HashMap, + HashSet, + } }, tokio::{ sync::{ From 2017115d10eec5a678c92e83e6e3556c94d6e096 Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Mon, 28 Nov 2022 11:35:59 -0800 Subject: [PATCH 12/25] clippy --- .../programs/remote-executor/src/lib.rs | 2 +- .../client/src/attestation_cfg.rs | 56 ++--- .../pyth2wormhole/client/src/batch_state.rs | 14 +- solana/pyth2wormhole/client/src/lib.rs | 14 +- solana/pyth2wormhole/client/src/main.rs | 215 +++++++++++------- solana/pyth2wormhole/program/src/config.rs | 2 +- third_party/pyth/p2w_autoattest.py | 2 +- 7 files changed, 174 insertions(+), 131 deletions(-) diff --git a/pythnet/remote-executor/programs/remote-executor/src/lib.rs b/pythnet/remote-executor/programs/remote-executor/src/lib.rs index bacb4251c6..74afed7dc1 100644 --- a/pythnet/remote-executor/programs/remote-executor/src/lib.rs +++ b/pythnet/remote-executor/programs/remote-executor/src/lib.rs @@ -1,5 +1,5 @@ #![deny(warnings)] -#![allow(clippy::result_large_err)] +#![allow(clippy::result_unit_err)] use { anchor_lang::{ diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index 7d863062f0..45504852e2 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -1,5 +1,4 @@ use { - crate::BatchState, log::info, serde::{ de::Error, @@ -9,30 +8,23 @@ use { Serializer, }, solana_program::pubkey::Pubkey, - std::{ - collections::{ - HashMap, - HashSet, - }, - iter, - str::FromStr, - }, + std::str::FromStr, }; /// Pyth2wormhole config specific to attestation requests -#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] +#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] pub struct AttestationConfig { #[serde(default = "default_min_msg_reuse_interval_ms")] - pub min_msg_reuse_interval_ms: u64, + pub min_msg_reuse_interval_ms: u64, #[serde(default = "default_max_msg_accounts")] - pub max_msg_accounts: u64, + pub max_msg_accounts: u64, /// Optionally, we take a mapping account to add remaining symbols from a Pyth deployments. These symbols are processed under attestation conditions for the `default` symbol group. #[serde( deserialize_with = "opt_pubkey_string_de", serialize_with = "opt_pubkey_string_ser", default // Uses Option::default() which is None )] - pub mapping_addr: Option, + pub mapping_addr: Option, /// The known symbol list will be reloaded based off this /// interval, to account for mapping changes. Note: This interval /// will only work if the mapping address is defined. Whenever @@ -41,10 +33,10 @@ pub struct AttestationConfig { /// symbol list, and before stopping the pre-existing obsolete /// jobs to maintain uninterrupted cranking. #[serde(default = "default_mapping_reload_interval_mins")] - pub mapping_reload_interval_mins: u64, + pub mapping_reload_interval_mins: u64, #[serde(default = "default_min_rpc_interval_ms")] /// Rate-limiting minimum delay between RPC requests in milliseconds - pub min_rpc_interval_ms: u64, + pub min_rpc_interval_ms: u64, /// Attestation conditions that will be used for any symbols included in the mapping /// that aren't explicitly in one of the groups below. pub default_attestation_conditions: AttestationConditions, @@ -72,29 +64,27 @@ impl AttestationConfig { g.symbols .as_slice() .chunks(max_batch_size) - .map(move |symbols| { - SymbolGroup { - group_name: name4closure.clone(), - symbols: symbols.to_vec(), - conditions: conditions4closure.clone() - } + .map(move |symbols| SymbolGroup { + group_name: name4closure.clone(), + symbols: symbols.to_vec(), + conditions: conditions4closure.clone(), }) }) .collect() } } -#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] +#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] pub struct NameGroup { - pub group_name: String, + pub group_name: String, /// Attestation conditions applied to all symbols in this group. /// If not provided, use the default attestation conditions from `AttestationConfig`. - pub conditions: Option, + pub conditions: Option, /// The names of the symbols to include in this group pub symbol_names: Vec, } -#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] +#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] pub struct SymbolGroup { pub group_name: String, /// Attestation conditions applied to all symbols in this group @@ -130,7 +120,7 @@ pub const fn default_max_batch_jobs() -> usize { /// of the active conditions is met. Option<> fields can be /// de-activated with None. All conditions are inactive by default, /// except for the non-Option ones. -#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)] +#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] pub struct AttestationConditions { /// Baseline, unconditional attestation interval. Attestation is triggered if the specified interval elapsed since last attestation. #[serde(default = "default_min_interval_secs")] @@ -289,14 +279,14 @@ mod tests { }; let cfg = AttestationConfig { - min_msg_reuse_interval_ms: 1000, - max_msg_accounts: 100_000, - min_rpc_interval_ms: 2123, - mapping_addr: None, - mapping_reload_interval_mins: 42, + min_msg_reuse_interval_ms: 1000, + max_msg_accounts: 100_000, + min_rpc_interval_ms: 2123, + mapping_addr: None, + mapping_reload_interval_mins: 42, default_attestation_conditions: AttestationConditions::default(), - name_groups: vec![], - symbol_groups: vec![fastbois, slowbois], + name_groups: vec![], + symbol_groups: vec![fastbois, slowbois], }; let serialized = serde_yaml::to_string(&cfg)?; diff --git a/solana/pyth2wormhole/client/src/batch_state.rs b/solana/pyth2wormhole/client/src/batch_state.rs index 2a0c061c05..a0ee4d3b91 100644 --- a/solana/pyth2wormhole/client/src/batch_state.rs +++ b/solana/pyth2wormhole/client/src/batch_state.rs @@ -1,5 +1,6 @@ use { crate::{ + attestation_cfg::SymbolGroup, AttestationConditions, P2WSymbol, }, @@ -14,7 +15,6 @@ use { Instant, }, }; -use crate::attestation_cfg::SymbolGroup; /// Runtime representation of a batch. It refers to the original group /// from the config. @@ -28,15 +28,13 @@ pub struct BatchState { } impl<'a> BatchState { - pub fn new( - group: &SymbolGroup - ) -> Self { + pub fn new(group: &SymbolGroup) -> Self { Self { - group_name: group.group_name.clone(), - symbols: group.symbols.clone(), - conditions: group.conditions.clone(), + group_name: group.group_name.clone(), + symbols: group.symbols.clone(), + conditions: group.conditions.clone(), last_known_symbol_states: vec![None; group.symbols.len()], - last_job_finished_at: Instant::now(), + last_job_finished_at: Instant::now(), } } diff --git a/solana/pyth2wormhole/client/src/lib.rs b/solana/pyth2wormhole/client/src/lib.rs index 540f89ad47..5d99d94b5a 100644 --- a/solana/pyth2wormhole/client/src/lib.rs +++ b/solana/pyth2wormhole/client/src/lib.rs @@ -80,9 +80,7 @@ use { AccountState, ErrBox, }, - std::collections::{ - HashSet, - }, + std::collections::HashSet, }; /// Future-friendly version of solitaire::ErrBox @@ -445,7 +443,7 @@ pub async fn crawl_pyth_mapping( } } - let mut price_addr = prod.px_acc.clone(); + let mut price_addr = prod.px_acc; let mut n_prod_prices = 0; // the product might have no price, can happen in tilt due to race-condition, failed tx to add price, ... @@ -486,8 +484,8 @@ pub async fn crawl_pyth_mapping( price_addr = price.next; } ret.push(P2WProductAccount { - key: prod_addr.clone(), - name: prod_name.to_owned(), + key: *prod_addr, + name: prod_name.to_owned(), price_account_keys: price_accounts, }); @@ -519,7 +517,7 @@ pub async fn crawl_pyth_mapping( #[derive(Clone, Debug)] pub struct P2WProductAccount { - pub key: Pubkey, - pub name: String, + pub key: Pubkey, + pub name: String, pub price_account_keys: HashSet, } diff --git a/solana/pyth2wormhole/client/src/main.rs b/solana/pyth2wormhole/client/src/main.rs index 679a0ba52e..c7a415dd90 100644 --- a/solana/pyth2wormhole/client/src/main.rs +++ b/solana/pyth2wormhole/client/src/main.rs @@ -14,21 +14,24 @@ use { debug, error, info, - LevelFilter, warn, + LevelFilter, }, p2w_sdk::P2WEmitter, prometheus::{ - IntCounter, - IntGauge, register_int_counter, register_int_gauge, + IntCounter, + IntGauge, }, pyth2wormhole::{ attest::P2W_MAX_BATCH_SIZE, Pyth2WormholeConfig, }, - pyth2wormhole_client::*, + pyth2wormhole_client::{ + attestation_cfg::SymbolGroup, + *, + }, sha3::{ Digest, Sha3_256, @@ -45,10 +48,14 @@ use { }, solana_transaction_status::UiTransactionEncoding, solitaire::{ - ErrBox, processors::seeded::Seeded, + ErrBox, }, std::{ + collections::{ + HashMap, + HashSet, + }, fs::File, net::SocketAddr, sync::Arc, @@ -56,10 +63,6 @@ use { Duration, Instant, }, - collections::{ - HashMap, - HashSet, - } }, tokio::{ sync::{ @@ -69,7 +72,6 @@ use { task::JoinHandle, }, }; -use pyth2wormhole_client::attestation_cfg::SymbolGroup; pub mod cli; @@ -324,11 +326,21 @@ async fn handle_attest_daemon_mode( }; // Use the mapping if specified - batch_cfg = match attestation_config_to_batches(&rpc_cfg, &attestation_cfg, config.max_batch_size as usize).await { + batch_cfg = match attestation_config_to_batches( + &rpc_cfg, + &attestation_cfg, + config.max_batch_size as usize, + ) + .await + { Ok(config) => config, Err(err) => { // If we cannot query the mapping account, retain the existing batch configuration. - error!("Could not crawl mapping {}: {:?}", attestation_cfg.mapping_addr.unwrap_or_default(), err); + error!( + "Could not crawl mapping {}: {:?}", + attestation_cfg.mapping_addr.unwrap_or_default(), + err + ); batch_cfg } }; @@ -432,8 +444,13 @@ async fn handle_attest_non_daemon_mode( ) -> Result<(), ErrBox> { let p2w_cfg = get_config_account(&lock_and_make_rpc(&rpc_cfg).await, &p2w_addr).await?; - let batch_config = attestation_config_to_batches(&rpc_cfg, &attestation_cfg, p2w_cfg.max_batch_size as usize).await?; - let batches: Vec<_> = batch_config.into_iter().map(|x| { BatchState::new(&x) }).collect(); + let batch_config = + attestation_config_to_batches(&rpc_cfg, &attestation_cfg, p2w_cfg.max_batch_size as usize) + .await?; + let batches: Vec<_> = batch_config + .into_iter() + .map(|x| BatchState::new(&x)) + .collect(); let batch_count = batches.len(); // For enforcing min_msg_reuse_interval_ms, we keep a piece of @@ -483,68 +500,104 @@ async fn handle_attest_non_daemon_mode( Ok(()) } -async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestation_cfg: &AttestationConfig, max_batch_size: usize) -> Result, ErrBox> { +async fn attestation_config_to_batches( + rpc_cfg: &Arc>, + attestation_cfg: &AttestationConfig, + max_batch_size: usize, +) -> Result, ErrBox> { // Use the mapping if specified if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() { - crawl_pyth_mapping(&lock_and_make_rpc(&rpc_cfg).await, mapping_addr).await.map(|additional_accounts| { - debug!( - "Crawled mapping {} data:\n{:#?}", - mapping_addr, additional_accounts - ); - - // Construct batches from the named groups in the attestation config - let mut name_to_symbols: HashMap> = HashMap::new(); - for product_account in &additional_accounts { - for price_account_key in &product_account.price_account_keys { - let symbol = P2WSymbol { - name: Some(product_account.name.clone()), - product_addr: product_account.key, - price_addr: price_account_key.clone(), - }; - - name_to_symbols.entry(product_account.name.clone()).or_insert(vec![]).push(symbol); - } - } - - let mut mapping_batches: Vec = vec![]; - for group in &attestation_cfg.name_groups { - let batch_items: Vec = group.symbol_names.iter().flat_map( - |symbol| name_to_symbols.get(symbol).into_iter().flat_map( - |x| x.into_iter() - ) - ).map(|x| x.clone()).collect(); - let group_conditions = group.conditions.as_ref().unwrap_or(&attestation_cfg.default_attestation_conditions); - mapping_batches.extend(partition_into_batches(&group.group_name, max_batch_size, group_conditions, batch_items)) - } - - // Construct batches that are explicitly provided in the AttestationConfig - let attestation_cfg_batches: Vec = attestation_cfg.as_batches(max_batch_size); - - // Find any accounts not included in existing batches and group them into a remainder batch - let existing_price_accounts: HashSet = mapping_batches.iter().flat_map(|batch| batch.symbols.iter().map(|symbol| symbol.price_addr.clone())).chain( - attestation_cfg_batches.iter().flat_map(|batch| batch.symbols.iter().map(|symbol| symbol.price_addr.clone())) - ).collect(); + crawl_pyth_mapping(&lock_and_make_rpc(rpc_cfg).await, mapping_addr) + .await + .map(|additional_accounts| { + debug!( + "Crawled mapping {} data:\n{:#?}", + mapping_addr, additional_accounts + ); - let mut remaining_symbols: Vec = vec![]; - for product_account in additional_accounts { - for price_account_key in product_account.price_account_keys { - if !existing_price_accounts.contains(&price_account_key) { + // Construct batches from the named groups in the attestation config + let mut name_to_symbols: HashMap> = HashMap::new(); + for product_account in &additional_accounts { + for price_account_key in &product_account.price_account_keys { let symbol = P2WSymbol { - name: Some(product_account.name.clone()), + name: Some(product_account.name.clone()), product_addr: product_account.key, - price_addr: price_account_key, + price_addr: *price_account_key, }; - remaining_symbols.push(symbol); + + name_to_symbols + .entry(product_account.name.clone()) + .or_insert(vec![]) + .push(symbol); } } - } - let remaining_batches = partition_into_batches(&"mapping".to_owned(), max_batch_size, &attestation_cfg.default_attestation_conditions, remaining_symbols); - attestation_cfg_batches.into_iter() - .chain(mapping_batches.into_iter()) - .chain(remaining_batches.into_iter()) - .collect::>() - }) + let mut mapping_batches: Vec = vec![]; + for group in &attestation_cfg.name_groups { + let batch_items: Vec = group + .symbol_names + .iter() + .flat_map(|symbol| { + name_to_symbols + .get(symbol) + .into_iter() + .flat_map(|x| x.iter()) + }) + .cloned() + .collect(); + let group_conditions = group + .conditions + .as_ref() + .unwrap_or(&attestation_cfg.default_attestation_conditions); + mapping_batches.extend(partition_into_batches( + &group.group_name, + max_batch_size, + group_conditions, + batch_items, + )) + } + + // Construct batches that are explicitly provided in the AttestationConfig + let attestation_cfg_batches: Vec = + attestation_cfg.as_batches(max_batch_size); + + // Find any accounts not included in existing batches and group them into a remainder batch + let existing_price_accounts: HashSet = mapping_batches + .iter() + .flat_map(|batch| batch.symbols.iter().map(|symbol| symbol.price_addr)) + .chain( + attestation_cfg_batches + .iter() + .flat_map(|batch| batch.symbols.iter().map(|symbol| symbol.price_addr)), + ) + .collect(); + + let mut remaining_symbols: Vec = vec![]; + for product_account in additional_accounts { + for price_account_key in product_account.price_account_keys { + if !existing_price_accounts.contains(&price_account_key) { + let symbol = P2WSymbol { + name: Some(product_account.name.clone()), + product_addr: product_account.key, + price_addr: price_account_key, + }; + remaining_symbols.push(symbol); + } + } + } + let remaining_batches = partition_into_batches( + &"mapping".to_owned(), + max_batch_size, + &attestation_cfg.default_attestation_conditions, + remaining_symbols, + ); + + attestation_cfg_batches + .into_iter() + .chain(mapping_batches.into_iter()) + .chain(remaining_batches.into_iter()) + .collect::>() + }) } else { Ok(attestation_cfg.as_batches(max_batch_size)) } @@ -552,22 +605,26 @@ async fn attestation_config_to_batches(rpc_cfg: &Arc>, attestati /// Partition symbols into a collection of batches, each of which contains no more than /// `max_batch_size` symbols. -fn partition_into_batches(batch_name: &String, max_batch_size: usize, conditions: &AttestationConditions, symbols: Vec) -> Vec { +fn partition_into_batches( + batch_name: &String, + max_batch_size: usize, + conditions: &AttestationConditions, + symbols: Vec, +) -> Vec { symbols - .as_slice() - .chunks(max_batch_size) - .map(move |batch_symbols| { - SymbolGroup { - group_name: batch_name.to_owned(), - symbols: batch_symbols.to_vec(), - conditions: conditions.clone(), - } - }).collect() + .as_slice() + .chunks(max_batch_size) + .map(move |batch_symbols| SymbolGroup { + group_name: batch_name.to_owned(), + symbols: batch_symbols.to_vec(), + conditions: conditions.clone(), + }) + .collect() } /// Constructs attestation scheduling jobs from attestation config. fn prepare_attestation_sched_jobs( - batch_cfg: &Vec, + batch_cfg: &[SymbolGroup], p2w_cfg: &Pyth2WormholeConfig, rpc_cfg: &Arc>, p2w_addr: &Pubkey, @@ -575,7 +632,7 @@ fn prepare_attestation_sched_jobs( message_q_mtx: Arc>, ) -> futures::future::JoinAll>> { // Flatten attestation config into a plain list of batches - let batches: Vec<_> = batch_cfg.into_iter().map(|x| { BatchState::new(&x)}).collect(); + let batches: Vec<_> = batch_cfg.iter().map(BatchState::new).collect(); let batch_count = batches.len(); diff --git a/solana/pyth2wormhole/program/src/config.rs b/solana/pyth2wormhole/program/src/config.rs index c295082155..e8e0919dab 100644 --- a/solana/pyth2wormhole/program/src/config.rs +++ b/solana/pyth2wormhole/program/src/config.rs @@ -121,7 +121,7 @@ impl From for Pyth2WormholeConfigV2 { } // Added ops_owner which can toggle the is_active field -#[derive(Clone, Default, Hash, BorshDeserialize, BorshSerialize, PartialEq)] +#[derive(Clone, Default, Hash, BorshDeserialize, BorshSerialize, PartialEq, Eq)] #[cfg_attr(feature = "client", derive(Debug))] pub struct Pyth2WormholeConfigV3 { /// Authority owning this contract diff --git a/third_party/pyth/p2w_autoattest.py b/third_party/pyth/p2w_autoattest.py index 64e0a90c7a..2c752389a3 100755 --- a/third_party/pyth/p2w_autoattest.py +++ b/third_party/pyth/p2w_autoattest.py @@ -215,7 +215,7 @@ def find_and_log_seqnos(s): # End of fast_interval_only cfg_yaml += f""" -symbol_groups: +symbol_groups: - group_name: longer_interval_sensitive_changes conditions: min_interval_secs: 10 From a8f60bf6c0579963f2f44e81fda29a9c0828780e Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Tue, 29 Nov 2022 13:48:33 -0800 Subject: [PATCH 13/25] more cleanup --- .../client/src/attestation_cfg.rs | 216 +++++++++++++++--- .../pyth2wormhole/client/src/batch_state.rs | 4 +- solana/pyth2wormhole/client/src/main.rs | 172 ++++---------- 3 files changed, 228 insertions(+), 164 deletions(-) diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index 45504852e2..8a96619afd 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -1,5 +1,9 @@ use { - log::info, + crate::P2WProductAccount, + log::{ + info, + warn, + }, serde::{ de::Error, Deserialize, @@ -8,7 +12,13 @@ use { Serializer, }, solana_program::pubkey::Pubkey, - std::str::FromStr, + std::{ + collections::{ + HashMap, + HashSet, + }, + str::FromStr, + }, }; /// Pyth2wormhole config specific to attestation requests @@ -38,54 +48,169 @@ pub struct AttestationConfig { /// Rate-limiting minimum delay between RPC requests in milliseconds pub min_rpc_interval_ms: u64, /// Attestation conditions that will be used for any symbols included in the mapping - /// that aren't explicitly in one of the groups below. + /// that aren't explicitly in one of the groups below, and any groups without explicitly + /// configured attestation conditions. pub default_attestation_conditions: AttestationConditions, - /// Collection of symbols identified by symbol name (e.g., "Crypto.BTC/USD") - /// These symbols are only active if `mapping_addr` is set. - pub name_groups: Vec, - /// Collection of symbols identified by their full account addresses. /// These symbols will be published regardless of whether or not `mapping_addr` is provided. pub symbol_groups: Vec, } impl AttestationConfig { - pub fn as_batches(&self, max_batch_size: usize) -> Vec { - self.symbol_groups + pub fn instantiate_batches( + &self, + product_accounts: &[P2WProductAccount], + max_batch_size: usize, + ) -> Vec { + // Construct mapping from the name of each product account to its corresponding symbols + let mut name_to_symbols: HashMap> = HashMap::new(); + for product_account in product_accounts { + for price_account_key in &product_account.price_account_keys { + let symbol = P2WSymbol { + name: Some(product_account.name.clone()), + product_addr: product_account.key, + price_addr: *price_account_key, + }; + + name_to_symbols + .entry(product_account.name.clone()) + .or_insert(vec![]) + .push(symbol); + } + } + + // Instantiate batches from the configured symbol groups. + let mut configured_batches: Vec = vec![]; + for group in &self.symbol_groups { + let group_symbols: Vec = group + .symbols + .iter() + .flat_map(|symbol| { + match (&symbol.name, &symbol.price_addr, &symbol.product_addr) { + (maybe_name, Some(price_addr), Some(product_addr)) => { + vec![P2WSymbol { + name: maybe_name.clone(), + product_addr: *product_addr, + price_addr: *price_addr, + }] + } + (Some(name), None, None) => { + let maybe_matched_symbols: Option<&Vec> = + name_to_symbols.get(name); + if let Some(matched_symbols) = maybe_matched_symbols { + matched_symbols.clone() + } else { + warn!( + "Could not find product account for configured symbol {}", + name + ); + vec![] + } + } + _ => { + // FIXME + panic!("Bad config"); + } + } + }) + .collect(); + + let group_conditions = group + .conditions + .as_ref() + .unwrap_or(&self.default_attestation_conditions); + configured_batches.extend(AttestationConfig::partition_into_batches( + &group.group_name, + max_batch_size, + group_conditions, + group_symbols, + )) + } + + // Find any accounts not included in existing batches and group them into a remainder batch + let existing_price_accounts: HashSet = configured_batches .iter() - .flat_map(move |g| { - let conditions4closure = g.conditions.clone(); - let name4closure = g.group_name.clone(); - - info!("Group {:?}, {} symbols", g.group_name, g.symbols.len(),); - - // Divide group into batches - g.symbols - .as_slice() - .chunks(max_batch_size) - .map(move |symbols| SymbolGroup { - group_name: name4closure.clone(), - symbols: symbols.to_vec(), - conditions: conditions4closure.clone(), - }) + .flat_map(|batch| batch.symbols.iter().map(|symbol| symbol.price_addr)) + .chain( + configured_batches + .iter() + .flat_map(|batch| batch.symbols.iter().map(|symbol| symbol.price_addr)), + ) + .collect(); + + let mut remaining_symbols: Vec = vec![]; + for product_account in product_accounts { + for price_account_key in &product_account.price_account_keys { + if !existing_price_accounts.contains(price_account_key) { + let symbol = P2WSymbol { + name: Some(product_account.name.clone()), + product_addr: product_account.key, + price_addr: *price_account_key, + }; + remaining_symbols.push(symbol); + } + } + } + let remaining_batches = AttestationConfig::partition_into_batches( + &"mapping".to_owned(), + max_batch_size, + &self.default_attestation_conditions, + remaining_symbols, + ); + + let all_batches = configured_batches + .into_iter() + .chain(remaining_batches.into_iter()) + .collect::>(); + + for batch in &all_batches { + info!( + "Batch {:?}, {} symbols", + batch.group_name, + batch.symbols.len(), + ); + } + + all_batches + } + + /// Partition symbols into a collection of batches, each of which contains no more than + /// `max_batch_size` symbols. + fn partition_into_batches( + batch_name: &String, + max_batch_size: usize, + conditions: &AttestationConditions, + symbols: Vec, + ) -> Vec { + symbols + .as_slice() + .chunks(max_batch_size) + .map(move |batch_symbols| SymbolBatch { + group_name: batch_name.to_owned(), + symbols: batch_symbols.to_vec(), + conditions: conditions.clone(), }) .collect() } } #[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] -pub struct NameGroup { - pub group_name: String, - /// Attestation conditions applied to all symbols in this group. +pub struct SymbolGroup { + pub group_name: String, + /// Attestation conditions applied to all symbols in this group /// If not provided, use the default attestation conditions from `AttestationConfig`. - pub conditions: Option, - /// The names of the symbols to include in this group - pub symbol_names: Vec, + pub conditions: Option, + + /// FIXME + /// Collection of symbols identified by their full account addresses. + /// These symbols will be published regardless of whether or not `mapping_addr` is provided in the + /// `AttestationConfig`. + pub symbols: Vec, } #[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] -pub struct SymbolGroup { +pub struct SymbolBatch { pub group_name: String, /// Attestation conditions applied to all symbols in this group pub conditions: AttestationConditions, @@ -171,6 +296,33 @@ impl Default for AttestationConditions { } /// Config entry for a Pyth product + price pair +#[derive(Clone, Default, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] +pub struct SymbolConfig { + /// On-chain symbol name + pub name: Option, + + #[serde( + deserialize_with = "opt_pubkey_string_de", + serialize_with = "opt_pubkey_string_ser" + )] + pub product_addr: Option, + #[serde( + deserialize_with = "opt_pubkey_string_de", + serialize_with = "opt_pubkey_string_ser" + )] + pub price_addr: Option, +} + +impl ToString for SymbolConfig { + // FIXME the default is bad + fn to_string(&self) -> String { + self.name.clone().unwrap_or(format!( + "Unnamed product {}", + self.product_addr.unwrap_or_default() + )) + } +} + #[derive(Clone, Default, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] pub struct P2WSymbol { /// User-defined human-readable name @@ -297,4 +449,6 @@ mod tests { Ok(()) } + + // FIXME: add merging unit test } diff --git a/solana/pyth2wormhole/client/src/batch_state.rs b/solana/pyth2wormhole/client/src/batch_state.rs index a0ee4d3b91..7574e2f2ae 100644 --- a/solana/pyth2wormhole/client/src/batch_state.rs +++ b/solana/pyth2wormhole/client/src/batch_state.rs @@ -1,6 +1,6 @@ use { crate::{ - attestation_cfg::SymbolGroup, + attestation_cfg::SymbolBatch, AttestationConditions, P2WSymbol, }, @@ -28,7 +28,7 @@ pub struct BatchState { } impl<'a> BatchState { - pub fn new(group: &SymbolGroup) -> Self { + pub fn new(group: &SymbolBatch) -> Self { Self { group_name: group.group_name.clone(), symbols: group.symbols.clone(), diff --git a/solana/pyth2wormhole/client/src/main.rs b/solana/pyth2wormhole/client/src/main.rs index c7a415dd90..c75771d0b7 100644 --- a/solana/pyth2wormhole/client/src/main.rs +++ b/solana/pyth2wormhole/client/src/main.rs @@ -29,8 +29,20 @@ use { Pyth2WormholeConfig, }, pyth2wormhole_client::{ - attestation_cfg::SymbolGroup, - *, + attestation_cfg::SymbolBatch, + crawl_pyth_mapping, + gen_init_tx, + gen_migrate_tx, + gen_set_config_tx, + gen_set_is_active_tx, + get_config_account, + start_metrics_server, + AttestationConfig, + BatchState, + ErrBoxSend, + P2WMessageQueue, + P2WSymbol, + RLMutex, }, sha3::{ Digest, @@ -52,10 +64,6 @@ use { ErrBox, }, std::{ - collections::{ - HashMap, - HashSet, - }, fs::File, net::SocketAddr, sync::Arc, @@ -326,24 +334,15 @@ async fn handle_attest_daemon_mode( }; // Use the mapping if specified - batch_cfg = match attestation_config_to_batches( + // If we cannot query the mapping account, retain the existing batch configuration. + batch_cfg = attestation_config_to_batches( &rpc_cfg, &attestation_cfg, config.max_batch_size as usize, ) .await - { - Ok(config) => config, - Err(err) => { - // If we cannot query the mapping account, retain the existing batch configuration. - error!( - "Could not crawl mapping {}: {:?}", - attestation_cfg.mapping_addr.unwrap_or_default(), - err - ); - batch_cfg - } - }; + .unwrap_or(batch_cfg); + // Hash currently known config hasher.update(serde_yaml::to_vec(&batch_cfg)?); @@ -446,7 +445,11 @@ async fn handle_attest_non_daemon_mode( let batch_config = attestation_config_to_batches(&rpc_cfg, &attestation_cfg, p2w_cfg.max_batch_size as usize) - .await?; + .await + .unwrap_or( + attestation_cfg.instantiate_batches(&vec![], p2w_cfg.max_batch_size as usize), + ); + let batches: Vec<_> = batch_config .into_iter() .map(|x| BatchState::new(&x)) @@ -504,127 +507,34 @@ async fn attestation_config_to_batches( rpc_cfg: &Arc>, attestation_cfg: &AttestationConfig, max_batch_size: usize, -) -> Result, ErrBox> { +) -> Result, ErrBox> { // Use the mapping if specified - if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() { - crawl_pyth_mapping(&lock_and_make_rpc(rpc_cfg).await, mapping_addr) - .await - .map(|additional_accounts| { - debug!( - "Crawled mapping {} data:\n{:#?}", - mapping_addr, additional_accounts - ); + let products = if let Some(mapping_addr) = attestation_cfg.mapping_addr.as_ref() { + let product_accounts_res = + crawl_pyth_mapping(&lock_and_make_rpc(rpc_cfg).await, mapping_addr).await; - // Construct batches from the named groups in the attestation config - let mut name_to_symbols: HashMap> = HashMap::new(); - for product_account in &additional_accounts { - for price_account_key in &product_account.price_account_keys { - let symbol = P2WSymbol { - name: Some(product_account.name.clone()), - product_addr: product_account.key, - price_addr: *price_account_key, - }; - - name_to_symbols - .entry(product_account.name.clone()) - .or_insert(vec![]) - .push(symbol); - } - } - - let mut mapping_batches: Vec = vec![]; - for group in &attestation_cfg.name_groups { - let batch_items: Vec = group - .symbol_names - .iter() - .flat_map(|symbol| { - name_to_symbols - .get(symbol) - .into_iter() - .flat_map(|x| x.iter()) - }) - .cloned() - .collect(); - let group_conditions = group - .conditions - .as_ref() - .unwrap_or(&attestation_cfg.default_attestation_conditions); - mapping_batches.extend(partition_into_batches( - &group.group_name, - max_batch_size, - group_conditions, - batch_items, - )) - } - - // Construct batches that are explicitly provided in the AttestationConfig - let attestation_cfg_batches: Vec = - attestation_cfg.as_batches(max_batch_size); - - // Find any accounts not included in existing batches and group them into a remainder batch - let existing_price_accounts: HashSet = mapping_batches - .iter() - .flat_map(|batch| batch.symbols.iter().map(|symbol| symbol.price_addr)) - .chain( - attestation_cfg_batches - .iter() - .flat_map(|batch| batch.symbols.iter().map(|symbol| symbol.price_addr)), - ) - .collect(); - - let mut remaining_symbols: Vec = vec![]; - for product_account in additional_accounts { - for price_account_key in product_account.price_account_keys { - if !existing_price_accounts.contains(&price_account_key) { - let symbol = P2WSymbol { - name: Some(product_account.name.clone()), - product_addr: product_account.key, - price_addr: price_account_key, - }; - remaining_symbols.push(symbol); - } - } - } - let remaining_batches = partition_into_batches( - &"mapping".to_owned(), - max_batch_size, - &attestation_cfg.default_attestation_conditions, - remaining_symbols, + match product_accounts_res { + Err(err) => { + error!( + "Could not crawl mapping {}: {:?}", + attestation_cfg.mapping_addr.unwrap_or_default(), + err ); + } + _ => {} + } - attestation_cfg_batches - .into_iter() - .chain(mapping_batches.into_iter()) - .chain(remaining_batches.into_iter()) - .collect::>() - }) + product_accounts_res? } else { - Ok(attestation_cfg.as_batches(max_batch_size)) - } -} + vec![] + }; -/// Partition symbols into a collection of batches, each of which contains no more than -/// `max_batch_size` symbols. -fn partition_into_batches( - batch_name: &String, - max_batch_size: usize, - conditions: &AttestationConditions, - symbols: Vec, -) -> Vec { - symbols - .as_slice() - .chunks(max_batch_size) - .map(move |batch_symbols| SymbolGroup { - group_name: batch_name.to_owned(), - symbols: batch_symbols.to_vec(), - conditions: conditions.clone(), - }) - .collect() + Ok(attestation_cfg.instantiate_batches(&products, max_batch_size)) } /// Constructs attestation scheduling jobs from attestation config. fn prepare_attestation_sched_jobs( - batch_cfg: &[SymbolGroup], + batch_cfg: &[SymbolBatch], p2w_cfg: &Pyth2WormholeConfig, rpc_cfg: &Arc>, p2w_addr: &Pubkey, From efade987653fa0719e8a6b6599acbea0ba9be93f Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Tue, 29 Nov 2022 13:49:32 -0800 Subject: [PATCH 14/25] main --- third_party/pyth/p2w_autoattest.py | 31 ++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/third_party/pyth/p2w_autoattest.py b/third_party/pyth/p2w_autoattest.py index 2c752389a3..c0e3f6b8bd 100755 --- a/third_party/pyth/p2w_autoattest.py +++ b/third_party/pyth/p2w_autoattest.py @@ -190,32 +190,31 @@ def find_and_log_seqnos(s): mapping_reload_interval_mins: 1 # Very fast for testing purposes min_rpc_interval_ms: 0 # RIP RPC max_batch_jobs: 1000 # Where we're going there's no oomkiller -default_attestation_conditions: - min_interval_secs: 30 - price_changed_bps: 500 -name_groups: +symbol_groups: - group_name: fast_interval_only conditions: min_interval_secs: 1 - symbol_names: + symbols: """ - # integer-divide the symbols in ~thirds for three test groups: - # a fast group that is specified by symbol name only, - # a slower group that is specified with full account details, - # and a default group that is read from the mapping account without being explicitly configured. - third_len = len(pyth_accounts) // 3 + # integer-divide the symbols in ~half for two test + # groups. Assumes arr[:idx] is exclusive, and arr[idx:] is + # inclusive + third_len = len(pyth_accounts) // 3; for thing in pyth_accounts[:third_len]: name = thing["name"] + price = thing["price"] + product = thing["product"] cfg_yaml += f""" - - {name}""" + - name: {name} + price_addr: {price} + product_addr: {product}""" # End of fast_interval_only cfg_yaml += f""" -symbol_groups: - group_name: longer_interval_sensitive_changes conditions: min_interval_secs: 10 @@ -233,6 +232,14 @@ def find_and_log_seqnos(s): price_addr: {price} product_addr: {product}""" + cfg_yaml += f""" + - group_name: mapping + conditions: + min_interval_secs: 30 + price_changed_bps: 500 + symbols: [] +""" + with open(P2W_ATTESTATION_CFG, "w") as f: f.write(cfg_yaml) f.flush() From f8d2653840d1eb3f3f9bcf6035c97cabb9c050cf Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Tue, 29 Nov 2022 13:50:43 -0800 Subject: [PATCH 15/25] main --- .../pyth2wormhole/client/src/attestation_cfg.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index 8a96619afd..935b9feff9 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -396,16 +396,16 @@ mod tests { fn test_sanity() -> Result<(), ErrBox> { let fastbois = SymbolGroup { group_name: "fast bois".to_owned(), - conditions: AttestationConditions { + conditions: Some(AttestationConditions { min_interval_secs: 5, ..Default::default() - }, + }), symbols: vec![ - P2WSymbol { + SymbolConfig { name: Some("ETHUSD".to_owned()), ..Default::default() }, - P2WSymbol { + SymbolConfig { name: Some("BTCUSD".to_owned()), ..Default::default() }, @@ -414,16 +414,16 @@ mod tests { let slowbois = SymbolGroup { group_name: "slow bois".to_owned(), - conditions: AttestationConditions { + conditions: Some(AttestationConditions { min_interval_secs: 200, ..Default::default() - }, + }), symbols: vec![ - P2WSymbol { + SymbolConfig { name: Some("CNYAUD".to_owned()), ..Default::default() }, - P2WSymbol { + SymbolConfig { name: Some("INRPLN".to_owned()), ..Default::default() }, @@ -437,7 +437,6 @@ mod tests { mapping_addr: None, mapping_reload_interval_mins: 42, default_attestation_conditions: AttestationConditions::default(), - name_groups: vec![], symbol_groups: vec![fastbois, slowbois], }; From c5af84aef4e841fc3a10e18875e251ca976e672d Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Tue, 29 Nov 2022 13:53:00 -0800 Subject: [PATCH 16/25] fix formatting --- .../client/src/attestation_cfg.rs | 2 -- solana/pyth2wormhole/client/src/main.rs | 22 +++++++++---------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index 935b9feff9..e4b3a4a88d 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -448,6 +448,4 @@ mod tests { Ok(()) } - - // FIXME: add merging unit test } diff --git a/solana/pyth2wormhole/client/src/main.rs b/solana/pyth2wormhole/client/src/main.rs index c75771d0b7..9ce45b24a2 100644 --- a/solana/pyth2wormhole/client/src/main.rs +++ b/solana/pyth2wormhole/client/src/main.rs @@ -31,6 +31,7 @@ use { pyth2wormhole_client::{ attestation_cfg::SymbolBatch, crawl_pyth_mapping, + gen_attest_tx, gen_init_tx, gen_migrate_tx, gen_set_config_tx, @@ -446,9 +447,9 @@ async fn handle_attest_non_daemon_mode( let batch_config = attestation_config_to_batches(&rpc_cfg, &attestation_cfg, p2w_cfg.max_batch_size as usize) .await - .unwrap_or( - attestation_cfg.instantiate_batches(&vec![], p2w_cfg.max_batch_size as usize), - ); + .unwrap_or_else(|_| { + attestation_cfg.instantiate_batches(&[], p2w_cfg.max_batch_size as usize) + }); let batches: Vec<_> = batch_config .into_iter() @@ -513,15 +514,12 @@ async fn attestation_config_to_batches( let product_accounts_res = crawl_pyth_mapping(&lock_and_make_rpc(rpc_cfg).await, mapping_addr).await; - match product_accounts_res { - Err(err) => { - error!( - "Could not crawl mapping {}: {:?}", - attestation_cfg.mapping_addr.unwrap_or_default(), - err - ); - } - _ => {} + if let Err(err) = &product_accounts_res { + error!( + "Could not crawl mapping {}: {:?}", + attestation_cfg.mapping_addr.unwrap_or_default(), + err + ); } product_accounts_res? From 927cf594eba3c634538fe0dde01ab728d26a49ea Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Thu, 1 Dec 2022 07:11:58 -0800 Subject: [PATCH 17/25] blah --- .../pyth2wormhole/client/src/attestation_cfg.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index e4b3a4a88d..b2d4dca15f 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -52,8 +52,7 @@ pub struct AttestationConfig { /// configured attestation conditions. pub default_attestation_conditions: AttestationConditions, - /// Collection of symbols identified by their full account addresses. - /// These symbols will be published regardless of whether or not `mapping_addr` is provided. + /// Groups of symbols to publish. pub symbol_groups: Vec, } @@ -202,10 +201,7 @@ pub struct SymbolGroup { /// If not provided, use the default attestation conditions from `AttestationConfig`. pub conditions: Option, - /// FIXME - /// Collection of symbols identified by their full account addresses. - /// These symbols will be published regardless of whether or not `mapping_addr` is provided in the - /// `AttestationConfig`. + /// The symbols to publish in this group. pub symbols: Vec, } @@ -295,7 +291,13 @@ impl Default for AttestationConditions { } } -/// Config entry for a Pyth product + price pair +/// Config entry for a symbol to publish. +/// Symbols can be configured in two ways: +/// 1. Provide the address of both the product and price account. In this case, a name may be optionally +/// specified to improve human-readability. +/// 2. Provide the name of the feed in the product account. This will be matched against a list of +/// all symbol names generated from the mapping account (assuming `mapping_addr` is set in the +/// parent `AttestationConfig`). #[derive(Clone, Default, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] pub struct SymbolConfig { /// On-chain symbol name From 1879610b95f50c735f0f07016cc5a06649d5106d Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Thu, 1 Dec 2022 07:59:34 -0800 Subject: [PATCH 18/25] test --- .../client/src/attestation_cfg.rs | 231 ++++++++++++++---- 1 file changed, 178 insertions(+), 53 deletions(-) diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index b2d4dca15f..c23bb96ba8 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -1,5 +1,11 @@ use { - crate::P2WProductAccount, + crate::{ + attestation_cfg::SymbolConfig::{ + Address, + Name, + }, + P2WProductAccount, + }, log::{ info, warn, @@ -85,31 +91,29 @@ impl AttestationConfig { let group_symbols: Vec = group .symbols .iter() - .flat_map(|symbol| { - match (&symbol.name, &symbol.price_addr, &symbol.product_addr) { - (maybe_name, Some(price_addr), Some(product_addr)) => { - vec![P2WSymbol { - name: maybe_name.clone(), - product_addr: *product_addr, - price_addr: *price_addr, - }] - } - (Some(name), None, None) => { - let maybe_matched_symbols: Option<&Vec> = - name_to_symbols.get(name); - if let Some(matched_symbols) = maybe_matched_symbols { - matched_symbols.clone() - } else { - warn!( - "Could not find product account for configured symbol {}", - name - ); - vec![] - } - } - _ => { - // FIXME - panic!("Bad config"); + .flat_map(|symbol| match &symbol { + Address { + name, + product, + price, + } => { + vec![P2WSymbol { + name: name.clone(), + product_addr: *product, + price_addr: *price, + }] + } + Name { name } => { + let maybe_matched_symbols: Option<&Vec> = + name_to_symbols.get(name); + if let Some(matched_symbols) = maybe_matched_symbols { + matched_symbols.clone() + } else { + warn!( + "Could not find product account for configured symbol {}", + name + ); + vec![] } } }) @@ -298,30 +302,39 @@ impl Default for AttestationConditions { /// 2. Provide the name of the feed in the product account. This will be matched against a list of /// all symbol names generated from the mapping account (assuming `mapping_addr` is set in the /// parent `AttestationConfig`). -#[derive(Clone, Default, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] -pub struct SymbolConfig { - /// On-chain symbol name - pub name: Option, - - #[serde( - deserialize_with = "opt_pubkey_string_de", - serialize_with = "opt_pubkey_string_ser" - )] - pub product_addr: Option, - #[serde( - deserialize_with = "opt_pubkey_string_de", - serialize_with = "opt_pubkey_string_ser" - )] - pub price_addr: Option, +#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum SymbolConfig { + Name { + name: String, + }, + Address { + name: Option, + + #[serde( + deserialize_with = "pubkey_string_de", + serialize_with = "pubkey_string_ser" + )] + product: Pubkey, + #[serde( + deserialize_with = "pubkey_string_de", + serialize_with = "pubkey_string_ser" + )] + price: Pubkey, + }, } impl ToString for SymbolConfig { // FIXME the default is bad fn to_string(&self) -> String { + "".to_owned() + + /* self.name.clone().unwrap_or(format!( "Unnamed product {}", self.product_addr.unwrap_or_default() )) + */ } } @@ -391,6 +404,10 @@ where mod tests { use { super::*, + crate::attestation_cfg::SymbolConfig::{ + Address, + Name, + }, solitaire::ErrBox, }; @@ -403,13 +420,13 @@ mod tests { ..Default::default() }), symbols: vec![ - SymbolConfig { - name: Some("ETHUSD".to_owned()), - ..Default::default() + Name { + name: "ETHUSD".to_owned(), }, - SymbolConfig { - name: Some("BTCUSD".to_owned()), - ..Default::default() + Address { + name: Some("BTCUSD".to_owned()), + product: Pubkey::new_unique(), + price: Pubkey::new_unique(), }, ], }; @@ -421,13 +438,13 @@ mod tests { ..Default::default() }), symbols: vec![ - SymbolConfig { - name: Some("CNYAUD".to_owned()), - ..Default::default() + Name { + name: "CNYAUD".to_owned(), }, - SymbolConfig { - name: Some("INRPLN".to_owned()), - ..Default::default() + Address { + name: None, + product: Pubkey::new_unique(), + price: Pubkey::new_unique(), }, ], }; @@ -450,4 +467,112 @@ mod tests { Ok(()) } + + #[test] + fn test_instantiate_batches() -> Result<(), ErrBox> { + let btc_product_key = Pubkey::new_unique(); + let btc_price_key = Pubkey::new_unique(); + + let eth_product_key = Pubkey::new_unique(); + let eth_price_key_1 = Pubkey::new_unique(); + let eth_price_key_2 = Pubkey::new_unique(); + + let eth_dup_product_key = Pubkey::new_unique(); + let eth_dup_price_key = Pubkey::new_unique(); + + let attestation_conditions_1 = AttestationConditions { + min_interval_secs: 5, + ..Default::default() + }; + + let products = vec![P2WProductAccount { + name: "ETHUSD".to_owned(), + key: eth_product_key, + price_account_keys: HashSet::from([eth_price_key_1, eth_price_key_2]), + }]; + + let group1 = SymbolGroup { + group_name: "group 1".to_owned(), + conditions: Some(attestation_conditions_1.clone()), + symbols: vec![ + Address { + name: Some("BTCUSD".to_owned()), + price: btc_price_key, + product: btc_product_key, + }, + Name { + name: "ETHUSD".to_owned(), + }, + ], + }; + + let group2 = SymbolGroup { + group_name: "group 2".to_owned(), + conditions: None, + symbols: vec![Address { + name: Some("ETHUSD".to_owned()), + price: eth_dup_price_key, + product: eth_dup_product_key, + }], + }; + + let default_attestation_conditions = AttestationConditions { + min_interval_secs: 1, + ..Default::default() + }; + + let cfg = AttestationConfig { + min_msg_reuse_interval_ms: 1000, + max_msg_accounts: 100_000, + min_rpc_interval_ms: 2123, + mapping_addr: None, + mapping_reload_interval_mins: 42, + default_attestation_conditions: default_attestation_conditions.clone(), + symbol_groups: vec![group1, group2], + }; + + let batches = cfg.instantiate_batches(&products, 2); + + assert_eq!( + batches, + vec![ + SymbolBatch { + group_name: "group 1".to_owned(), + conditions: attestation_conditions_1.clone(), + symbols: vec![ + P2WSymbol { + name: Some("BTCUSD".to_owned()), + product_addr: btc_product_key, + price_addr: btc_price_key, + }, + P2WSymbol { + name: Some("ETHUSD".to_owned()), + product_addr: eth_product_key, + price_addr: eth_price_key_1, + } + ], + }, + SymbolBatch { + group_name: "group 1".to_owned(), + conditions: attestation_conditions_1, + symbols: vec![P2WSymbol { + name: Some("ETHUSD".to_owned()), + product_addr: eth_product_key, + price_addr: eth_price_key_2, + }], + }, + SymbolBatch { + group_name: "group 2".to_owned(), + conditions: default_attestation_conditions, + symbols: vec![P2WSymbol { + name: Some("ETHUSD".to_owned()), + product_addr: eth_dup_product_key, + price_addr: eth_dup_price_key, + }], + }, + ] + ); + + Ok(()) + } } From ac08f2e34e385d96c47f641dd917885fae487a2d Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Thu, 1 Dec 2022 08:46:54 -0800 Subject: [PATCH 19/25] cleanup --- .../client/src/attestation_cfg.rs | 132 ++++++++++-------- 1 file changed, 73 insertions(+), 59 deletions(-) diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index c23bb96ba8..4cdb3f00c9 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -1,7 +1,7 @@ use { crate::{ attestation_cfg::SymbolConfig::{ - Address, + Key, Name, }, P2WProductAccount, @@ -31,10 +31,12 @@ use { #[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] pub struct AttestationConfig { #[serde(default = "default_min_msg_reuse_interval_ms")] - pub min_msg_reuse_interval_ms: u64, + pub min_msg_reuse_interval_ms: u64, #[serde(default = "default_max_msg_accounts")] - pub max_msg_accounts: u64, - /// Optionally, we take a mapping account to add remaining symbols from a Pyth deployments. These symbols are processed under attestation conditions for the `default` symbol group. + pub max_msg_accounts: u64, + + /// Optionally, we take a mapping account to add remaining symbols from a Pyth deployments. + /// These symbols are processed under `default_attestation_conditions`. #[serde( deserialize_with = "opt_pubkey_string_de", serialize_with = "opt_pubkey_string_ser", @@ -59,10 +61,12 @@ pub struct AttestationConfig { pub default_attestation_conditions: AttestationConditions, /// Groups of symbols to publish. - pub symbol_groups: Vec, + pub symbol_groups: Vec, } impl AttestationConfig { + /// Instantiate the batches of symbols to attest by matching the config against the collection + /// of on-chain product accounts. pub fn instantiate_batches( &self, product_accounts: &[P2WProductAccount], @@ -92,7 +96,7 @@ impl AttestationConfig { .symbols .iter() .flat_map(|symbol| match &symbol { - Address { + Key { name, product, price, @@ -109,6 +113,11 @@ impl AttestationConfig { if let Some(matched_symbols) = maybe_matched_symbols { matched_symbols.clone() } else { + // It's slightly unfortunate that this is a warning, but it seems better than crashing. + // The data in the mapping account can change while the attester is running and trigger this case, + // which means that it is not necessarily a configuration problem. + // Note that any named symbols in the config which fail to match will still be included + // in the remaining_symbols group below. warn!( "Could not find product account for configured symbol {}", name @@ -199,7 +208,7 @@ impl AttestationConfig { } #[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] -pub struct SymbolGroup { +pub struct SymbolGroupConfig { pub group_name: String, /// Attestation conditions applied to all symbols in this group /// If not provided, use the default attestation conditions from `AttestationConfig`. @@ -209,6 +218,54 @@ pub struct SymbolGroup { pub symbols: Vec, } +/// Config entry for a symbol to attest. +#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum SymbolConfig { + /// A symbol specified by its product name. + Name { + /// The name of the symbol. This name is matched against the product account metadata. + name: String, + }, + /// A symbol specified by its product and price account keys. + Key { + /// Optional human-readable name for the symbol (for logging purposes). + /// This field does not need to match the on-chain data for the product. + name: Option, + + #[serde( + deserialize_with = "pubkey_string_de", + serialize_with = "pubkey_string_ser" + )] + product: Pubkey, + #[serde( + deserialize_with = "pubkey_string_de", + serialize_with = "pubkey_string_ser" + )] + price: Pubkey, + }, +} + +impl ToString for SymbolConfig { + fn to_string(&self) -> String { + match &self { + Name { name } => name.clone(), + Key { + name: Some(name), + product: _, + price: _, + } => name.clone(), + Key { + name: None, + product, + price: _, + } => { + format!("Unnamed product {}", product) + } + } + } +} + #[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] pub struct SymbolBatch { pub group_name: String, @@ -295,49 +352,6 @@ impl Default for AttestationConditions { } } -/// Config entry for a symbol to publish. -/// Symbols can be configured in two ways: -/// 1. Provide the address of both the product and price account. In this case, a name may be optionally -/// specified to improve human-readability. -/// 2. Provide the name of the feed in the product account. This will be matched against a list of -/// all symbol names generated from the mapping account (assuming `mapping_addr` is set in the -/// parent `AttestationConfig`). -#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum SymbolConfig { - Name { - name: String, - }, - Address { - name: Option, - - #[serde( - deserialize_with = "pubkey_string_de", - serialize_with = "pubkey_string_ser" - )] - product: Pubkey, - #[serde( - deserialize_with = "pubkey_string_de", - serialize_with = "pubkey_string_ser" - )] - price: Pubkey, - }, -} - -impl ToString for SymbolConfig { - // FIXME the default is bad - fn to_string(&self) -> String { - "".to_owned() - - /* - self.name.clone().unwrap_or(format!( - "Unnamed product {}", - self.product_addr.unwrap_or_default() - )) - */ - } -} - #[derive(Clone, Default, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] pub struct P2WSymbol { /// User-defined human-readable name @@ -405,7 +419,7 @@ mod tests { use { super::*, crate::attestation_cfg::SymbolConfig::{ - Address, + Key, Name, }, solitaire::ErrBox, @@ -413,7 +427,7 @@ mod tests { #[test] fn test_sanity() -> Result<(), ErrBox> { - let fastbois = SymbolGroup { + let fastbois = SymbolGroupConfig { group_name: "fast bois".to_owned(), conditions: Some(AttestationConditions { min_interval_secs: 5, @@ -423,7 +437,7 @@ mod tests { Name { name: "ETHUSD".to_owned(), }, - Address { + Key { name: Some("BTCUSD".to_owned()), product: Pubkey::new_unique(), price: Pubkey::new_unique(), @@ -431,7 +445,7 @@ mod tests { ], }; - let slowbois = SymbolGroup { + let slowbois = SymbolGroupConfig { group_name: "slow bois".to_owned(), conditions: Some(AttestationConditions { min_interval_secs: 200, @@ -441,7 +455,7 @@ mod tests { Name { name: "CNYAUD".to_owned(), }, - Address { + Key { name: None, product: Pubkey::new_unique(), price: Pubkey::new_unique(), @@ -491,11 +505,11 @@ mod tests { price_account_keys: HashSet::from([eth_price_key_1, eth_price_key_2]), }]; - let group1 = SymbolGroup { + let group1 = SymbolGroupConfig { group_name: "group 1".to_owned(), conditions: Some(attestation_conditions_1.clone()), symbols: vec![ - Address { + Key { name: Some("BTCUSD".to_owned()), price: btc_price_key, product: btc_product_key, @@ -506,10 +520,10 @@ mod tests { ], }; - let group2 = SymbolGroup { + let group2 = SymbolGroupConfig { group_name: "group 2".to_owned(), conditions: None, - symbols: vec![Address { + symbols: vec![Key { name: Some("ETHUSD".to_owned()), price: eth_dup_price_key, product: eth_dup_product_key, From 1d4a611d4092392b60028b74b1c0b7b918e6abd8 Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Thu, 1 Dec 2022 08:47:38 -0800 Subject: [PATCH 20/25] fix python --- third_party/pyth/p2w_autoattest.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/third_party/pyth/p2w_autoattest.py b/third_party/pyth/p2w_autoattest.py index c0e3f6b8bd..a31ad5af9f 100755 --- a/third_party/pyth/p2w_autoattest.py +++ b/third_party/pyth/p2w_autoattest.py @@ -208,7 +208,8 @@ def find_and_log_seqnos(s): product = thing["product"] cfg_yaml += f""" - - name: {name} + - type: name + name: {name} price_addr: {price} product_addr: {product}""" @@ -228,7 +229,8 @@ def find_and_log_seqnos(s): product = stuff["product"] cfg_yaml += f""" - - name: {name} + - type: name + name: {name} price_addr: {price} product_addr: {product}""" From b025711e73897a79ab264876b2f4e68f65082a5c Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Thu, 1 Dec 2022 10:16:27 -0800 Subject: [PATCH 21/25] config --- third_party/pyth/p2w_autoattest.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/third_party/pyth/p2w_autoattest.py b/third_party/pyth/p2w_autoattest.py index 7198ce3e24..d660780030 100755 --- a/third_party/pyth/p2w_autoattest.py +++ b/third_party/pyth/p2w_autoattest.py @@ -155,7 +155,7 @@ product = thing["product"] cfg_yaml += f""" - - type: name + - type: key name: {name} price_addr: {price} product_addr: {product}""" @@ -176,7 +176,7 @@ product = stuff["product"] cfg_yaml += f""" - - type: name + - type: key name: {name} price_addr: {price} product_addr: {product}""" From 4ce957586b133a62e2d6526bc0b9d1d2769e9ced Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Thu, 1 Dec 2022 10:29:32 -0800 Subject: [PATCH 22/25] fix test --- solana/pyth2wormhole/client/src/attestation_cfg.rs | 2 +- solana/pyth2wormhole/client/src/lib.rs | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index 4cdb3f00c9..3673b2b534 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -502,7 +502,7 @@ mod tests { let products = vec![P2WProductAccount { name: "ETHUSD".to_owned(), key: eth_product_key, - price_account_keys: HashSet::from([eth_price_key_1, eth_price_key_2]), + price_account_keys: vec![eth_price_key_1, eth_price_key_2], }]; let group1 = SymbolGroupConfig { diff --git a/solana/pyth2wormhole/client/src/lib.rs b/solana/pyth2wormhole/client/src/lib.rs index 5d99d94b5a..06b5f5cd8b 100644 --- a/solana/pyth2wormhole/client/src/lib.rs +++ b/solana/pyth2wormhole/client/src/lib.rs @@ -80,7 +80,6 @@ use { AccountState, ErrBox, }, - std::collections::HashSet, }; /// Future-friendly version of solitaire::ErrBox @@ -458,7 +457,7 @@ pub async fn crawl_pyth_mapping( } // loop until the last non-zero PriceAccount.next account - let mut price_accounts: HashSet = HashSet::new(); + let mut price_accounts: Vec = vec![]; loop { let price_bytes = rpc_client.get_account_data(&price_addr).await?; let price = match load_price_account(&price_bytes) { @@ -469,7 +468,7 @@ pub async fn crawl_pyth_mapping( } }; - price_accounts.insert(price_addr); + price_accounts.push(price_addr); n_prod_prices += 1; if price.next == Pubkey::default() { @@ -519,5 +518,5 @@ pub async fn crawl_pyth_mapping( pub struct P2WProductAccount { pub key: Pubkey, pub name: String, - pub price_account_keys: HashSet, + pub price_account_keys: Vec, } From 90889e9a6871d22073cd4491b9f60d5b1fa508a3 Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Thu, 1 Dec 2022 11:01:57 -0800 Subject: [PATCH 23/25] grr --- third_party/pyth/p2w_autoattest.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/third_party/pyth/p2w_autoattest.py b/third_party/pyth/p2w_autoattest.py index d660780030..adc73ed4e5 100755 --- a/third_party/pyth/p2w_autoattest.py +++ b/third_party/pyth/p2w_autoattest.py @@ -157,8 +157,8 @@ cfg_yaml += f""" - type: key name: {name} - price_addr: {price} - product_addr: {product}""" + price: {price} + product: {product}""" # End of fast_interval_only @@ -178,8 +178,8 @@ cfg_yaml += f""" - type: key name: {name} - price_addr: {price} - product_addr: {product}""" + price: {price} + product: {product}""" cfg_yaml += f""" - group_name: mapping From caceedba6e12c8f017d2999f53e59f5c0240268a Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Thu, 1 Dec 2022 11:17:47 -0800 Subject: [PATCH 24/25] grr --- third_party/pyth/p2w_autoattest.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/third_party/pyth/p2w_autoattest.py b/third_party/pyth/p2w_autoattest.py index adc73ed4e5..122fdc4c52 100755 --- a/third_party/pyth/p2w_autoattest.py +++ b/third_party/pyth/p2w_autoattest.py @@ -137,6 +137,8 @@ mapping_reload_interval_mins: 1 # Very fast for testing purposes min_rpc_interval_ms: 0 # RIP RPC max_batch_jobs: 1000 # Where we're going there's no oomkiller +default_attestation_conditions: + min_interval_secs: 60 symbol_groups: - group_name: fast_interval_only conditions: From 83f8502f79c4f7968dbcb8e4de29abb471fc61f6 Mon Sep 17 00:00:00 2001 From: Jayant Krishnamurthy Date: Fri, 2 Dec 2022 08:06:12 -0800 Subject: [PATCH 25/25] comments --- .../client/src/attestation_cfg.rs | 62 +++++++++++++------ solana/pyth2wormhole/client/src/lib.rs | 8 +-- solana/pyth2wormhole/client/src/main.rs | 2 + 3 files changed, 50 insertions(+), 22 deletions(-) diff --git a/solana/pyth2wormhole/client/src/attestation_cfg.rs b/solana/pyth2wormhole/client/src/attestation_cfg.rs index 3673b2b534..5c0ce34dbf 100644 --- a/solana/pyth2wormhole/client/src/attestation_cfg.rs +++ b/solana/pyth2wormhole/client/src/attestation_cfg.rs @@ -76,16 +76,18 @@ impl AttestationConfig { let mut name_to_symbols: HashMap> = HashMap::new(); for product_account in product_accounts { for price_account_key in &product_account.price_account_keys { - let symbol = P2WSymbol { - name: Some(product_account.name.clone()), - product_addr: product_account.key, - price_addr: *price_account_key, - }; - - name_to_symbols - .entry(product_account.name.clone()) - .or_insert(vec![]) - .push(symbol); + if let Some(name) = &product_account.name { + let symbol = P2WSymbol { + name: Some(name.clone()), + product_addr: product_account.key, + price_addr: *price_account_key, + }; + + name_to_symbols + .entry(name.clone()) + .or_insert(vec![]) + .push(symbol); + } } } @@ -156,7 +158,7 @@ impl AttestationConfig { for price_account_key in &product_account.price_account_keys { if !existing_price_accounts.contains(price_account_key) { let symbol = P2WSymbol { - name: Some(product_account.name.clone()), + name: product_account.name.clone(), product_addr: product_account.key, price_addr: *price_account_key, }; @@ -224,7 +226,10 @@ pub struct SymbolGroupConfig { pub enum SymbolConfig { /// A symbol specified by its product name. Name { - /// The name of the symbol. This name is matched against the product account metadata. + /// The name of the symbol. This name is matched against the "symbol" field in the product + /// account metadata. If multiple price accounts have this name (either because 2 product + /// accounts have the same symbol or a single product account has multiple price accounts), + /// it matches *all* of them and puts them into this group. name: String, }, /// A symbol specified by its product and price account keys. @@ -266,6 +271,8 @@ impl ToString for SymbolConfig { } } +/// A batch of symbols that's ready to be attested. Includes all necessary information +/// (such as price/product account keys). #[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)] pub struct SymbolBatch { pub group_name: String, @@ -491,6 +498,9 @@ mod tests { let eth_price_key_1 = Pubkey::new_unique(); let eth_price_key_2 = Pubkey::new_unique(); + let unk_product_key = Pubkey::new_unique(); + let unk_price_key = Pubkey::new_unique(); + let eth_dup_product_key = Pubkey::new_unique(); let eth_dup_price_key = Pubkey::new_unique(); @@ -499,11 +509,18 @@ mod tests { ..Default::default() }; - let products = vec![P2WProductAccount { - name: "ETHUSD".to_owned(), - key: eth_product_key, - price_account_keys: vec![eth_price_key_1, eth_price_key_2], - }]; + let products = vec![ + P2WProductAccount { + name: Some("ETHUSD".to_owned()), + key: eth_product_key, + price_account_keys: vec![eth_price_key_1, eth_price_key_2], + }, + P2WProductAccount { + name: None, + key: unk_product_key, + price_account_keys: vec![unk_price_key], + }, + ]; let group1 = SymbolGroupConfig { group_name: "group 1".to_owned(), @@ -577,13 +594,22 @@ mod tests { }, SymbolBatch { group_name: "group 2".to_owned(), - conditions: default_attestation_conditions, + conditions: default_attestation_conditions.clone(), symbols: vec![P2WSymbol { name: Some("ETHUSD".to_owned()), product_addr: eth_dup_product_key, price_addr: eth_dup_price_key, }], }, + SymbolBatch { + group_name: "mapping".to_owned(), + conditions: default_attestation_conditions, + symbols: vec![P2WSymbol { + name: None, + product_addr: unk_product_key, + price_addr: unk_price_key, + }], + } ] ); diff --git a/solana/pyth2wormhole/client/src/lib.rs b/solana/pyth2wormhole/client/src/lib.rs index 06b5f5cd8b..1cc29029c1 100644 --- a/solana/pyth2wormhole/client/src/lib.rs +++ b/solana/pyth2wormhole/client/src/lib.rs @@ -435,10 +435,10 @@ pub async fn crawl_pyth_mapping( } }; - let mut prod_name = ""; + let mut prod_name = None; for (key, val) in prod.iter() { if key.eq_ignore_ascii_case("symbol") { - prod_name = val; + prod_name = Some(val.to_owned()); } } @@ -484,7 +484,7 @@ pub async fn crawl_pyth_mapping( } ret.push(P2WProductAccount { key: *prod_addr, - name: prod_name.to_owned(), + name: prod_name.clone(), price_account_keys: price_accounts, }); @@ -517,6 +517,6 @@ pub async fn crawl_pyth_mapping( #[derive(Clone, Debug)] pub struct P2WProductAccount { pub key: Pubkey, - pub name: String, + pub name: Option, pub price_account_keys: Vec, } diff --git a/solana/pyth2wormhole/client/src/main.rs b/solana/pyth2wormhole/client/src/main.rs index e5b9205598..235ce55f32 100644 --- a/solana/pyth2wormhole/client/src/main.rs +++ b/solana/pyth2wormhole/client/src/main.rs @@ -504,6 +504,8 @@ async fn handle_attest_non_daemon_mode( Ok(()) } +/// Generate batches to attest by retrieving the on-chain product account data and grouping it +/// according to the configuration in `attestation_cfg`. async fn attestation_config_to_batches( rpc_cfg: &Arc>, attestation_cfg: &AttestationConfig,