Skip to content

Commit 0cdbca5

Browse files
committed
Move timeout to network level
1 parent 7d07b7e commit 0cdbca5

File tree

4 files changed

+47
-23
lines changed

4 files changed

+47
-23
lines changed

config/config.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ key_store.root_path = "/path/to/keystore"
6767
# This can be omitted when oracle.subscriber_enabled is set to false.
6868
# wss_url = "ws://api.devnet.solana.com"
6969

70+
# Timeout for the requests to the RPC
71+
# rpc_timeout = "10s"
72+
7073
# Path to the key store.
7174
# key_store.root_path = "/path/to/keystore"
7275

@@ -89,9 +92,6 @@ key_store.root_path = "/path/to/keystore"
8992
# Age after which a price update is considered stale and not published
9093
# exporter.staleness_threshold = "5s"
9194

92-
# RPC timeout for the requests to the RPC for sending price updates and monitoring them
93-
# exporter.rpc_timeout = "10s"
94-
9595
# Maximum size of a batch
9696
# exporter.max_batch_size = 12
9797

src/agent/solana.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub mod network {
2424
Serialize,
2525
},
2626
slog::Logger,
27+
std::time::Duration,
2728
tokio::{
2829
sync::{
2930
mpsc,
@@ -38,25 +39,29 @@ pub mod network {
3839
#[serde(default)]
3940
pub struct Config {
4041
/// HTTP RPC endpoint
41-
pub rpc_url: String,
42+
pub rpc_url: String,
4243
/// WSS RPC endpoint
43-
pub wss_url: String,
44+
pub wss_url: String,
45+
/// Timeout for the requests to the RPC
46+
#[serde(with = "humantime_serde")]
47+
pub rpc_timeout: Duration,
4448
/// Keystore
45-
pub key_store: key_store::Config,
49+
pub key_store: key_store::Config,
4650
/// Configuration for the Oracle reading data from this network
47-
pub oracle: oracle::Config,
51+
pub oracle: oracle::Config,
4852
/// Configuration for the Exporter publishing data to this network
49-
pub exporter: exporter::Config,
53+
pub exporter: exporter::Config,
5054
}
5155

5256
impl Default for Config {
5357
fn default() -> Self {
5458
Self {
55-
rpc_url: "http://localhost:8899".to_string(),
56-
wss_url: "ws://localhost:8900".to_string(),
57-
key_store: Default::default(),
58-
oracle: Default::default(),
59-
exporter: Default::default(),
59+
rpc_url: "http://localhost:8899".to_string(),
60+
wss_url: "ws://localhost:8900".to_string(),
61+
rpc_timeout: Duration::from_secs(10),
62+
key_store: Default::default(),
63+
oracle: Default::default(),
64+
exporter: Default::default(),
6065
}
6166
}
6267
}
@@ -72,6 +77,7 @@ pub mod network {
7277
config.oracle.clone(),
7378
&config.rpc_url,
7479
&config.wss_url,
80+
config.rpc_timeout,
7581
KeyStore::new(config.key_store.clone())?,
7682
global_store_update_tx,
7783
logger.clone(),
@@ -81,6 +87,7 @@ pub mod network {
8187
let exporter_jhs = exporter::spawn_exporter(
8288
config.exporter,
8389
&config.rpc_url,
90+
config.rpc_timeout,
8491
KeyStore::new(config.key_store.clone())?,
8592
local_store_tx,
8693
logger,

src/agent/solana/exporter.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,6 @@ pub struct Config {
9797
/// Age after which a price update is considered stale and not published
9898
#[serde(with = "humantime_serde")]
9999
pub staleness_threshold: Duration,
100-
/// RPC timeout for the requests to the RPC for sending price updates and monitoring them
101-
#[serde(with = "humantime_serde")]
102-
pub rpc_timeout: Duration,
103100
/// Maximum size of a batch
104101
pub max_batch_size: usize,
105102
/// Capacity of the channel between the Exporter and the Transaction Monitor
@@ -120,7 +117,6 @@ impl Default for Config {
120117
refresh_network_state_interval_duration: Duration::from_millis(200),
121118
publish_interval_duration: Duration::from_secs(1),
122119
staleness_threshold: Duration::from_secs(5),
123-
rpc_timeout: Duration::from_secs(10),
124120
max_batch_size: 12,
125121
inflight_transactions_channel_capacity: 10000,
126122
transaction_monitor: Default::default(),
@@ -134,14 +130,16 @@ impl Default for Config {
134130
pub fn spawn_exporter(
135131
config: Config,
136132
rpc_url: &str,
133+
rpc_timeout: Duration,
137134
key_store: KeyStore,
138135
local_store_tx: Sender<store::local::Message>,
139136
logger: Logger,
140137
) -> Result<Vec<JoinHandle<()>>> {
141138
// Create and spawn the network state querier
142139
let (network_state_tx, network_state_rx) = watch::channel(Default::default());
143140
let mut network_state_querier = NetworkStateQuerier::new(
144-
&rpc_url,
141+
rpc_url,
142+
rpc_timeout,
145143
time::interval(config.refresh_network_state_interval_duration),
146144
network_state_tx,
147145
logger.clone(),
@@ -154,6 +152,7 @@ pub fn spawn_exporter(
154152
let mut transaction_monitor = TransactionMonitor::new(
155153
config.transaction_monitor.clone(),
156154
rpc_url,
155+
rpc_timeout,
157156
transactions_rx,
158157
logger.clone(),
159158
);
@@ -163,6 +162,7 @@ pub fn spawn_exporter(
163162
let mut exporter = Exporter::new(
164163
config,
165164
rpc_url,
165+
rpc_timeout,
166166
key_store,
167167
local_store_tx,
168168
network_state_rx,
@@ -210,6 +210,7 @@ impl Exporter {
210210
pub fn new(
211211
config: Config,
212212
rpc_url: &str,
213+
rpc_timeout: Duration,
213214
key_store: KeyStore,
214215
local_store_tx: Sender<store::local::Message>,
215216
network_state_rx: watch::Receiver<NetworkState>,
@@ -218,7 +219,7 @@ impl Exporter {
218219
) -> Self {
219220
let publish_interval = time::interval(config.publish_interval_duration);
220221
Exporter {
221-
rpc_client: RpcClient::new_with_timeout(rpc_url.to_string(), config.rpc_timeout),
222+
rpc_client: RpcClient::new_with_timeout(rpc_url.to_string(), rpc_timeout),
222223
config,
223224
publish_interval,
224225
key_store,
@@ -449,12 +450,13 @@ struct NetworkStateQuerier {
449450
impl NetworkStateQuerier {
450451
pub fn new(
451452
rpc_endpoint: &str,
453+
rpc_timeout: Duration,
452454
query_interval: Interval,
453455
network_state_tx: watch::Sender<NetworkState>,
454456
logger: Logger,
455457
) -> Self {
456458
NetworkStateQuerier {
457-
rpc_client: RpcClient::new(rpc_endpoint.to_string()),
459+
rpc_client: RpcClient::new_with_timeout(rpc_endpoint.to_string(), rpc_timeout),
458460
query_interval,
459461
network_state_tx,
460462
logger,
@@ -562,11 +564,12 @@ mod transaction_monitor {
562564
pub fn new(
563565
config: Config,
564566
rpc_url: &str,
567+
rpc_timeout: Duration,
565568
transactions_rx: mpsc::Receiver<Signature>,
566569
logger: Logger,
567570
) -> Self {
568571
let poll_interval = time::interval(config.poll_interval_duration);
569-
let rpc_client = RpcClient::new(rpc_url.to_string());
572+
let rpc_client = RpcClient::new_with_timeout(rpc_url.to_string(), rpc_timeout);
570573
TransactionMonitor {
571574
config,
572575
rpc_client,

src/agent/solana/oracle.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ pub fn spawn_oracle(
123123
config: Config,
124124
rpc_url: &str,
125125
wss_url: &str,
126+
rpc_timeout: Duration,
126127
key_store: KeyStore,
127128
global_store_update_tx: mpsc::Sender<global::Update>,
128129
logger: Logger,
@@ -135,6 +136,7 @@ pub fn spawn_oracle(
135136
let subscriber = Subscriber::new(
136137
rpc_url.to_string(),
137138
wss_url.to_string(),
139+
rpc_timeout,
138140
config.commitment,
139141
key_store.program_key.clone(),
140142
updates_tx,
@@ -149,6 +151,7 @@ pub fn spawn_oracle(
149151
data_tx,
150152
key_store.mapping_key,
151153
rpc_url,
154+
rpc_timeout,
152155
config.commitment,
153156
config.poll_interval_duration,
154157
logger.clone(),
@@ -337,12 +340,16 @@ impl Poller {
337340
data_tx: mpsc::Sender<Data>,
338341
mapping_account_key: Pubkey,
339342
rpc_url: &str,
343+
rpc_timeout: Duration,
340344
commitment: CommitmentLevel,
341345
poll_interval_duration: Duration,
342346
logger: Logger,
343347
) -> Self {
344-
let rpc_client =
345-
RpcClient::new_with_commitment(rpc_url.to_string(), CommitmentConfig { commitment });
348+
let rpc_client = RpcClient::new_with_timeout_and_commitment(
349+
rpc_url.to_string(),
350+
rpc_timeout,
351+
CommitmentConfig { commitment },
352+
);
346353
let poll_interval = tokio::time::interval(poll_interval_duration);
347354

348355
Poller {
@@ -513,6 +520,7 @@ mod subscriber {
513520
BlockchainShadow,
514521
SyncOptions,
515522
},
523+
std::time::Duration,
516524
tokio::sync::{
517525
broadcast,
518526
mpsc,
@@ -527,6 +535,9 @@ mod subscriber {
527535
/// WSS RPC endpoint
528536
wss_url: String,
529537

538+
/// Timeout for RPC requests
539+
rpc_timeout: Duration,
540+
530541
/// Commitment level used to read account data
531542
commitment: CommitmentLevel,
532543

@@ -544,6 +555,7 @@ mod subscriber {
544555
pub fn new(
545556
rpc_url: String,
546557
wss_url: String,
558+
rpc_timeout: Duration,
547559
commitment: CommitmentLevel,
548560
account_key: Pubkey,
549561
updates_tx: mpsc::Sender<(Pubkey, solana_sdk::account::Account)>,
@@ -552,6 +564,7 @@ mod subscriber {
552564
Subscriber {
553565
rpc_url,
554566
wss_url,
567+
rpc_timeout,
555568
commitment,
556569
account_key,
557570
updates_tx,
@@ -597,6 +610,7 @@ mod subscriber {
597610
self.wss_url.clone(),
598611
),
599612
commitment: self.commitment,
613+
rpc_timeout: self.rpc_timeout,
600614
max_lag: Some(10000),
601615
..SyncOptions::default()
602616
},

0 commit comments

Comments
 (0)