From 82108e4064ddabc3de3d441843a5b58e1ae41ace Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Thu, 16 Feb 2023 20:26:55 +0000 Subject: [PATCH 1/5] Add a key for primary vs secondary in logs --- src/agent.rs | 4 ++-- src/bin/agent.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/agent.rs b/src/agent.rs index 128035e7..bfff806c 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -103,7 +103,7 @@ impl Agent { self.config.primary_network.clone(), local_store_tx.clone(), primary_oracle_updates_tx, - logger.clone(), + logger.new(o!("primary" => true)), )?); // Spawn the secondary network, if needed @@ -112,7 +112,7 @@ impl Agent { config.clone(), local_store_tx.clone(), secondary_oracle_updates_tx, - logger.clone(), + logger.new(o!("primary" => false)), )?); } diff --git a/src/bin/agent.rs b/src/bin/agent.rs index dbf52e05..1a3b7614 100644 --- a/src/bin/agent.rs +++ b/src/bin/agent.rs @@ -32,7 +32,7 @@ async fn main() { let logger = slog::Logger::root( slog_async::Async::default( LogBuilder::new( - slog_term::CompactFormat::new(slog_term::TermDecorator::new().stdout().build()) + slog_term::FullFormat::new(slog_term::TermDecorator::new().stdout().build()) .build() .fuse(), ) From 5f554d71091331f83119d9185a5b52dfc60c8823 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Fri, 17 Feb 2023 10:06:16 +0000 Subject: [PATCH 2/5] Add timeout for exporter --- config/config.toml | 3 +++ src/agent/solana/exporter.rs | 6 +++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/config/config.toml b/config/config.toml index ca4dfbb4..8a4ea7f6 100644 --- a/config/config.toml +++ b/config/config.toml @@ -89,6 +89,9 @@ key_store.root_path = "/path/to/keystore" # Age after which a price update is considered stale and not published # exporter.staleness_threshold = "5s" +# RPC timeout for the requests to the RPC for sending price updates and monitoring them +# exporter.rpc_timeout = "10s" + # Maximum size of a batch # exporter.max_batch_size = 12 diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index 7ce715a7..c781b2e5 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -97,6 +97,9 @@ pub struct Config { /// Age after which a price update is considered stale and not published #[serde(with = "humantime_serde")] pub staleness_threshold: Duration, + /// RPC timeout for the requests to the RPC for sending price updates and monitoring them + #[serde(with = "humantime_serde")] + pub rpc_timeout: Duration, /// Maximum size of a batch pub max_batch_size: usize, /// Capacity of the channel between the Exporter and the Transaction Monitor @@ -117,6 +120,7 @@ impl Default for Config { refresh_network_state_interval_duration: Duration::from_millis(200), publish_interval_duration: Duration::from_secs(1), staleness_threshold: Duration::from_secs(5), + rpc_timeout: Duration::from_secs(10), max_batch_size: 12, inflight_transactions_channel_capacity: 10000, transaction_monitor: Default::default(), @@ -214,7 +218,7 @@ impl Exporter { ) -> Self { let publish_interval = time::interval(config.publish_interval_duration); Exporter { - rpc_client: RpcClient::new(rpc_url.to_string()), + rpc_client: RpcClient::new_with_timeout(rpc_url.to_string(), config.rpc_timeout), config, publish_interval, key_store, From 7d07b7eabf0c0addb302196aa2d02a5c753cda02 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Fri, 17 Feb 2023 10:07:07 +0000 Subject: [PATCH 3/5] Bump version --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 745a67e4..a73cddaa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2384,7 +2384,7 @@ dependencies = [ [[package]] name = "pyth-agent" -version = "0.1.2" +version = "0.1.3" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index b2648476..02383f2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pyth-agent" -version = "0.1.2" +version = "0.1.3" edition = "2021" [[bin]] From 0cdbca5e87239b2062f83b2e44424f2425a1a4bc Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Fri, 17 Feb 2023 10:24:28 +0000 Subject: [PATCH 4/5] Move timeout to network level --- config/config.toml | 6 +++--- src/agent/solana.rs | 27 +++++++++++++++++---------- src/agent/solana/exporter.rs | 19 +++++++++++-------- src/agent/solana/oracle.rs | 18 ++++++++++++++++-- 4 files changed, 47 insertions(+), 23 deletions(-) diff --git a/config/config.toml b/config/config.toml index 8a4ea7f6..cf928bc7 100644 --- a/config/config.toml +++ b/config/config.toml @@ -67,6 +67,9 @@ key_store.root_path = "/path/to/keystore" # This can be omitted when oracle.subscriber_enabled is set to false. # wss_url = "ws://api.devnet.solana.com" +# Timeout for the requests to the RPC +# rpc_timeout = "10s" + # Path to the key store. # key_store.root_path = "/path/to/keystore" @@ -89,9 +92,6 @@ key_store.root_path = "/path/to/keystore" # Age after which a price update is considered stale and not published # exporter.staleness_threshold = "5s" -# RPC timeout for the requests to the RPC for sending price updates and monitoring them -# exporter.rpc_timeout = "10s" - # Maximum size of a batch # exporter.max_batch_size = 12 diff --git a/src/agent/solana.rs b/src/agent/solana.rs index 6b56d8e8..a45bdeff 100644 --- a/src/agent/solana.rs +++ b/src/agent/solana.rs @@ -24,6 +24,7 @@ pub mod network { Serialize, }, slog::Logger, + std::time::Duration, tokio::{ sync::{ mpsc, @@ -38,25 +39,29 @@ pub mod network { #[serde(default)] pub struct Config { /// HTTP RPC endpoint - pub rpc_url: String, + pub rpc_url: String, /// WSS RPC endpoint - pub wss_url: String, + pub wss_url: String, + /// Timeout for the requests to the RPC + #[serde(with = "humantime_serde")] + pub rpc_timeout: Duration, /// Keystore - pub key_store: key_store::Config, + pub key_store: key_store::Config, /// Configuration for the Oracle reading data from this network - pub oracle: oracle::Config, + pub oracle: oracle::Config, /// Configuration for the Exporter publishing data to this network - pub exporter: exporter::Config, + pub exporter: exporter::Config, } impl Default for Config { fn default() -> Self { Self { - rpc_url: "http://localhost:8899".to_string(), - wss_url: "ws://localhost:8900".to_string(), - key_store: Default::default(), - oracle: Default::default(), - exporter: Default::default(), + rpc_url: "http://localhost:8899".to_string(), + wss_url: "ws://localhost:8900".to_string(), + rpc_timeout: Duration::from_secs(10), + key_store: Default::default(), + oracle: Default::default(), + exporter: Default::default(), } } } @@ -72,6 +77,7 @@ pub mod network { config.oracle.clone(), &config.rpc_url, &config.wss_url, + config.rpc_timeout, KeyStore::new(config.key_store.clone())?, global_store_update_tx, logger.clone(), @@ -81,6 +87,7 @@ pub mod network { let exporter_jhs = exporter::spawn_exporter( config.exporter, &config.rpc_url, + config.rpc_timeout, KeyStore::new(config.key_store.clone())?, local_store_tx, logger, diff --git a/src/agent/solana/exporter.rs b/src/agent/solana/exporter.rs index c781b2e5..ed10c5cc 100644 --- a/src/agent/solana/exporter.rs +++ b/src/agent/solana/exporter.rs @@ -97,9 +97,6 @@ pub struct Config { /// Age after which a price update is considered stale and not published #[serde(with = "humantime_serde")] pub staleness_threshold: Duration, - /// RPC timeout for the requests to the RPC for sending price updates and monitoring them - #[serde(with = "humantime_serde")] - pub rpc_timeout: Duration, /// Maximum size of a batch pub max_batch_size: usize, /// Capacity of the channel between the Exporter and the Transaction Monitor @@ -120,7 +117,6 @@ impl Default for Config { refresh_network_state_interval_duration: Duration::from_millis(200), publish_interval_duration: Duration::from_secs(1), staleness_threshold: Duration::from_secs(5), - rpc_timeout: Duration::from_secs(10), max_batch_size: 12, inflight_transactions_channel_capacity: 10000, transaction_monitor: Default::default(), @@ -134,6 +130,7 @@ impl Default for Config { pub fn spawn_exporter( config: Config, rpc_url: &str, + rpc_timeout: Duration, key_store: KeyStore, local_store_tx: Sender, logger: Logger, @@ -141,7 +138,8 @@ pub fn spawn_exporter( // Create and spawn the network state querier let (network_state_tx, network_state_rx) = watch::channel(Default::default()); let mut network_state_querier = NetworkStateQuerier::new( - &rpc_url, + rpc_url, + rpc_timeout, time::interval(config.refresh_network_state_interval_duration), network_state_tx, logger.clone(), @@ -154,6 +152,7 @@ pub fn spawn_exporter( let mut transaction_monitor = TransactionMonitor::new( config.transaction_monitor.clone(), rpc_url, + rpc_timeout, transactions_rx, logger.clone(), ); @@ -163,6 +162,7 @@ pub fn spawn_exporter( let mut exporter = Exporter::new( config, rpc_url, + rpc_timeout, key_store, local_store_tx, network_state_rx, @@ -210,6 +210,7 @@ impl Exporter { pub fn new( config: Config, rpc_url: &str, + rpc_timeout: Duration, key_store: KeyStore, local_store_tx: Sender, network_state_rx: watch::Receiver, @@ -218,7 +219,7 @@ impl Exporter { ) -> Self { let publish_interval = time::interval(config.publish_interval_duration); Exporter { - rpc_client: RpcClient::new_with_timeout(rpc_url.to_string(), config.rpc_timeout), + rpc_client: RpcClient::new_with_timeout(rpc_url.to_string(), rpc_timeout), config, publish_interval, key_store, @@ -449,12 +450,13 @@ struct NetworkStateQuerier { impl NetworkStateQuerier { pub fn new( rpc_endpoint: &str, + rpc_timeout: Duration, query_interval: Interval, network_state_tx: watch::Sender, logger: Logger, ) -> Self { NetworkStateQuerier { - rpc_client: RpcClient::new(rpc_endpoint.to_string()), + rpc_client: RpcClient::new_with_timeout(rpc_endpoint.to_string(), rpc_timeout), query_interval, network_state_tx, logger, @@ -562,11 +564,12 @@ mod transaction_monitor { pub fn new( config: Config, rpc_url: &str, + rpc_timeout: Duration, transactions_rx: mpsc::Receiver, logger: Logger, ) -> Self { let poll_interval = time::interval(config.poll_interval_duration); - let rpc_client = RpcClient::new(rpc_url.to_string()); + let rpc_client = RpcClient::new_with_timeout(rpc_url.to_string(), rpc_timeout); TransactionMonitor { config, rpc_client, diff --git a/src/agent/solana/oracle.rs b/src/agent/solana/oracle.rs index 80748f7d..12c8e431 100644 --- a/src/agent/solana/oracle.rs +++ b/src/agent/solana/oracle.rs @@ -123,6 +123,7 @@ pub fn spawn_oracle( config: Config, rpc_url: &str, wss_url: &str, + rpc_timeout: Duration, key_store: KeyStore, global_store_update_tx: mpsc::Sender, logger: Logger, @@ -135,6 +136,7 @@ pub fn spawn_oracle( let subscriber = Subscriber::new( rpc_url.to_string(), wss_url.to_string(), + rpc_timeout, config.commitment, key_store.program_key.clone(), updates_tx, @@ -149,6 +151,7 @@ pub fn spawn_oracle( data_tx, key_store.mapping_key, rpc_url, + rpc_timeout, config.commitment, config.poll_interval_duration, logger.clone(), @@ -337,12 +340,16 @@ impl Poller { data_tx: mpsc::Sender, mapping_account_key: Pubkey, rpc_url: &str, + rpc_timeout: Duration, commitment: CommitmentLevel, poll_interval_duration: Duration, logger: Logger, ) -> Self { - let rpc_client = - RpcClient::new_with_commitment(rpc_url.to_string(), CommitmentConfig { commitment }); + let rpc_client = RpcClient::new_with_timeout_and_commitment( + rpc_url.to_string(), + rpc_timeout, + CommitmentConfig { commitment }, + ); let poll_interval = tokio::time::interval(poll_interval_duration); Poller { @@ -513,6 +520,7 @@ mod subscriber { BlockchainShadow, SyncOptions, }, + std::time::Duration, tokio::sync::{ broadcast, mpsc, @@ -527,6 +535,9 @@ mod subscriber { /// WSS RPC endpoint wss_url: String, + /// Timeout for RPC requests + rpc_timeout: Duration, + /// Commitment level used to read account data commitment: CommitmentLevel, @@ -544,6 +555,7 @@ mod subscriber { pub fn new( rpc_url: String, wss_url: String, + rpc_timeout: Duration, commitment: CommitmentLevel, account_key: Pubkey, updates_tx: mpsc::Sender<(Pubkey, solana_sdk::account::Account)>, @@ -552,6 +564,7 @@ mod subscriber { Subscriber { rpc_url, wss_url, + rpc_timeout, commitment, account_key, updates_tx, @@ -597,6 +610,7 @@ mod subscriber { self.wss_url.clone(), ), commitment: self.commitment, + rpc_timeout: self.rpc_timeout, max_lag: Some(10000), ..SyncOptions::default() }, From d7d500f0f01ca351d4928e975c6a90875dc5b7d3 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Fri, 17 Feb 2023 10:39:06 +0000 Subject: [PATCH 5/5] Bump warp dependency Suggested by dependabot --- Cargo.lock | 61 +++++++----------------------------------------------- Cargo.toml | 2 +- 2 files changed, 9 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a73cddaa..abbda64b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1240,7 +1240,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util 0.7.3", + "tokio-util", "tracing", ] @@ -2417,7 +2417,7 @@ dependencies = [ "tokio", "tokio-retry", "tokio-stream", - "tokio-util 0.7.3", + "tokio-util", "tracing", "warp", ] @@ -2731,7 +2731,7 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-rustls", - "tokio-util 0.7.3", + "tokio-util", "tower-service", "url", "wasm-bindgen", @@ -4233,19 +4233,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-tungstenite" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "511de3f85caf1c98983545490c3d09685fa8eb634e57eec22bb4db271f46cbd8" -dependencies = [ - "futures-util", - "log", - "pin-project", - "tokio", - "tungstenite 0.14.0", -] - [[package]] name = "tokio-tungstenite" version = "0.16.1" @@ -4276,20 +4263,6 @@ dependencies = [ "webpki-roots", ] -[[package]] -name = "tokio-util" -version = "0.6.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" -dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "log", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.7.3" @@ -4372,25 +4345,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" -[[package]] -name = "tungstenite" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0b2d8558abd2e276b0a8df5c05a2ec762609344191e5fd23e292c910e9165b5" -dependencies = [ - "base64 0.13.0", - "byteorder", - "bytes", - "http", - "httparse", - "log", - "rand 0.8.5", - "sha-1 0.9.8", - "thiserror", - "url", - "utf-8", -] - [[package]] name = "tungstenite" version = "0.16.0" @@ -4576,9 +4530,9 @@ dependencies = [ [[package]] name = "warp" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cef4e1e9114a4b7f1ac799f16ce71c14de5778500c5450ec6b7b920c55b587e" +checksum = "ed7b8be92646fc3d18b06147664ebc5f48d222686cb11a8755e561a735aacc6d" dependencies = [ "bytes", "futures-channel", @@ -4592,14 +4546,15 @@ dependencies = [ "multipart", "percent-encoding", "pin-project", + "rustls-pemfile 0.2.1", "scoped-tls", "serde", "serde_json", "serde_urlencoded", "tokio", "tokio-stream", - "tokio-tungstenite 0.15.0", - "tokio-util 0.6.10", + "tokio-tungstenite 0.17.2", + "tokio-util", "tower-service", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index 02383f2b..6ce9413a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ path = "src/bin/agent.rs" anyhow = "1.0.55" serde = { version = "1.0.136", features = ["derive"] } async-trait = "0.1.52" -warp = { version = "0.3.1", features = ["websocket"] } +warp = { version = "0.3.3", features = ["websocket"] } tokio = { version = "1.0", features = ["full"] } tokio-stream = "0.1.1" futures-util = { version = "0.3", default-features = false, features = [