Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions solana/pyth2wormhole/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions solana/pyth2wormhole/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ solana-transaction-status = "=1.10.31"
solitaire = {git = "https://github.com/wormhole-foundation/wormhole", tag = "v2.8.9"}
tokio = {version = "1", features = ["sync", "rt-multi-thread", "time"]}
futures = "0.3.21"
sha3 = "0.10.6"
generic-array = "0.14.6"

[dev-dependencies]
pyth-client = "0.5.0"
Expand Down
63 changes: 52 additions & 11 deletions solana/pyth2wormhole/client/src/attestation_cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::{
str::FromStr,
};

use log::info;

use serde::{
de::Error,
Deserialize,
Expand All @@ -16,8 +18,10 @@ use serde::{
};
use solana_program::pubkey::Pubkey;

use crate::BatchState;

/// Pyth2wormhole config specific to attestation requests
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)]
pub struct AttestationConfig {
#[serde(default = "default_min_msg_reuse_interval_ms")]
pub min_msg_reuse_interval_ms: u64,
Expand All @@ -30,6 +34,15 @@ pub struct AttestationConfig {
default // Uses Option::default() which is None
)]
pub mapping_addr: Option<Pubkey>,
/// The known symbol list will be reloaded based off this
/// interval, to account for mapping changes. Note: This interval
/// will only work if the mapping address is defined. Whenever
/// it's time to look up the mapping, new attestation jobs are
/// started lazily, only if mapping contents affected the known
/// symbol list, and before stopping the pre-existing obsolete
/// jobs to maintain uninterrupted cranking.
#[serde(default = "default_mapping_reload_interval_mins")]
pub mapping_reload_interval_mins: u64,
#[serde(default = "default_min_rpc_interval_ms")]
/// Rate-limiting minimum delay between RPC requests in milliseconds"
pub min_rpc_interval_ms: u64,
Expand All @@ -49,7 +62,7 @@ impl AttestationConfig {
for existing_group in &self.symbol_groups {
for existing_sym in &existing_group.symbols {
// Check if new symbols mention this product
if let Some(mut prices) = new_symbols.get_mut(&existing_sym.product_addr) {
if let Some(prices) = new_symbols.get_mut(&existing_sym.product_addr) {
// Prune the price if exists
prices.remove(&existing_sym.price_addr);
}
Expand All @@ -74,7 +87,7 @@ impl AttestationConfig {
.iter_mut()
.find(|g| g.group_name == group_name) // Advances the iterator and returns Some(item) on first hit
{
Some(mut existing_group) => existing_group.symbols.append(&mut new_symbols_vec),
Some(existing_group) => existing_group.symbols.append(&mut new_symbols_vec),
None if new_symbols_vec.len() != 0 => {
// Group does not exist, assume defaults
let new_group = SymbolGroup {
Expand All @@ -88,9 +101,30 @@ impl AttestationConfig {
None => {}
}
}

pub fn as_batches(&self, max_batch_size: usize) -> Vec<BatchState> {
self.symbol_groups
.iter()
.map(move |g| {
let conditions4closure = g.conditions.clone();
let name4closure = g.group_name.clone();

info!("Group {:?}, {} symbols", g.group_name, g.symbols.len(),);

// Divide group into batches
g.symbols
.as_slice()
.chunks(max_batch_size.clone())
.map(move |symbols| {
BatchState::new(name4closure.clone(), symbols, conditions4closure.clone())
})
})
.flatten()
.collect()
}
}

#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)]
pub struct SymbolGroup {
pub group_name: String,
/// Attestation conditions applied to all symbols in this group
Expand All @@ -106,6 +140,10 @@ pub const fn default_min_msg_reuse_interval_ms() -> u64 {
10_000 // 10s
}

pub const fn default_mapping_reload_interval_mins() -> u64 {
15
}

pub const fn default_min_rpc_interval_ms() -> u64 {
150
}
Expand All @@ -122,7 +160,7 @@ pub const fn default_max_batch_jobs() -> usize {
/// of the active conditions is met. Option<> fields can be
/// de-activated with None. All conditions are inactive by default,
/// except for the non-Option ones.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[derive(Clone, Debug, Hash, Deserialize, Serialize, PartialEq)]
pub struct AttestationConditions {
/// Baseline, unconditional attestation interval. Attestation is triggered if the specified interval elapsed since last attestation.
#[serde(default = "default_min_interval_secs")]
Expand All @@ -134,9 +172,10 @@ pub struct AttestationConditions {
#[serde(default = "default_max_batch_jobs")]
pub max_batch_jobs: usize,

/// Trigger attestation if price changes by the specified percentage.
/// Trigger attestation if price changes by the specified
/// percentage, expressed in integer basis points (1bps = 0.01%)
#[serde(default)]
pub price_changed_pct: Option<f64>,
pub price_changed_bps: Option<u64>,

/// Trigger attestation if publish_time advances at least the
/// specified amount.
Expand All @@ -152,11 +191,11 @@ impl AttestationConditions {
let AttestationConditions {
min_interval_secs: _min_interval_secs,
max_batch_jobs: _max_batch_jobs,
price_changed_pct,
price_changed_bps,
publish_time_min_delta_secs,
} = self;

price_changed_pct.is_some() || publish_time_min_delta_secs.is_some()
price_changed_bps.is_some() || publish_time_min_delta_secs.is_some()
}
}

Expand All @@ -165,14 +204,14 @@ impl Default for AttestationConditions {
Self {
min_interval_secs: default_min_interval_secs(),
max_batch_jobs: default_max_batch_jobs(),
price_changed_pct: None,
price_changed_bps: None,
publish_time_min_delta_secs: None,
}
}
}

/// Config entry for a Pyth product + price pair
#[derive(Clone, Default, Debug, Deserialize, Serialize, PartialEq, Eq)]
#[derive(Clone, Default, Debug, Hash, Deserialize, Serialize, PartialEq, Eq)]
pub struct P2WSymbol {
/// User-defined human-readable name
pub name: Option<String>,
Expand Down Expand Up @@ -283,6 +322,7 @@ mod tests {
max_msg_accounts: 100_000,
min_rpc_interval_ms: 2123,
mapping_addr: None,
mapping_reload_interval_mins: 42,
symbol_groups: vec![fastbois, slowbois],
};

Expand All @@ -302,6 +342,7 @@ mod tests {
max_msg_accounts: 100,
min_rpc_interval_ms: 42422,
mapping_addr: None,
mapping_reload_interval_mins: 42,
symbol_groups: vec![],
};

Expand Down
23 changes: 9 additions & 14 deletions solana/pyth2wormhole/client/src/batch_state.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use futures::future::TryFutureExt;
use log::{
debug,
trace,
warn,
};
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::Signature;

use pyth_sdk_solana::state::PriceAccount;

Expand All @@ -16,31 +13,29 @@ use std::time::{

use crate::{
AttestationConditions,
ErrBox,
P2WSymbol,
RLMutex,
};

/// Runtime representation of a batch. It refers to the original group
/// from the config.
#[derive(Debug)]
pub struct BatchState<'a> {
pub struct BatchState {
pub group_name: String,
pub symbols: &'a [P2WSymbol],
pub symbols: Vec<P2WSymbol>,
pub last_known_symbol_states: Vec<Option<PriceAccount>>,
pub conditions: AttestationConditions,
pub last_job_finished_at: Instant,
}

impl<'a> BatchState<'a> {
impl<'a> BatchState {
pub fn new(
group_name: String,
symbols: &'a [P2WSymbol],
symbols: &[P2WSymbol],
conditions: AttestationConditions,
) -> Self {
Self {
group_name,
symbols,
symbols: symbols.to_vec(),
conditions,
last_known_symbol_states: vec![None; symbols.len()],
last_job_finished_at: Instant::now(),
Expand Down Expand Up @@ -69,7 +64,7 @@ impl<'a> BatchState<'a> {

// Only lookup and compare symbols if the conditions require
if self.conditions.need_onchain_lookup() {
let mut new_symbol_states: Vec<Option<PriceAccount>> =
let new_symbol_states: Vec<Option<PriceAccount>> =
match c.get_multiple_accounts(&pubkeys).await {
Ok(acc_opts) => {
acc_opts
Expand Down Expand Up @@ -120,9 +115,9 @@ impl<'a> BatchState<'a> {
))
}

// price_changed_pct
} else if let Some(pct) = self.conditions.price_changed_pct {
let pct = pct.abs();
// price_changed_bps
} else if let Some(bps) = self.conditions.price_changed_bps {
let pct = bps as f64 / 100.0;
let price_pct_diff = ((old.agg.price as f64 - new.agg.price as f64)
/ old.agg.price as f64
* 100.0)
Expand Down
6 changes: 2 additions & 4 deletions solana/pyth2wormhole/client/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,7 @@ pub enum Action {
},
#[clap(about = "Print out emitter address for the specified pyth2wormhole contract")]
GetEmitter,
#[clap(
about = "Set the value of is_active config as ops_owner"
)]
#[clap(about = "Set the value of is_active config as ops_owner")]
SetIsActive {
/// Current ops owner keypair path
#[clap(
Expand All @@ -139,5 +137,5 @@ pub enum Action {
possible_values = ["true", "false"],
)]
new_is_active: String,
}
},
}
Loading