88 exporter:: Exporter ,
99 transactions:: Transactions ,
1010 } ,
11+ utils:: rpc_multi_client:: RpcMultiClient ,
1112 } ,
1213 anyhow:: Result ,
1314 futures_util:: future,
1415 serde:: {
1516 Deserialize ,
1617 Serialize ,
1718 } ,
18- solana_client:: nonblocking:: rpc_client:: RpcClient ,
1919 solana_sdk:: commitment_config:: CommitmentConfig ,
2020 std:: {
2121 sync:: Arc ,
2727 time:: Interval ,
2828 } ,
2929 tracing:: instrument,
30+ url:: Url ,
3031} ;
3132
3233#[ derive( Clone , Serialize , Deserialize , Debug ) ]
@@ -111,7 +112,7 @@ pub struct NetworkState {
111112/// fetching the blockhash and slot number.
112113struct NetworkStateQuerier {
113114 /// The RPC client
114- rpc_client : RpcClient ,
115+ rpc_multi_client : RpcMultiClient ,
115116
116117 /// The interval with which to query the network state
117118 query_interval : Interval ,
@@ -122,20 +123,21 @@ struct NetworkStateQuerier {
122123
123124impl NetworkStateQuerier {
124125 #[ instrument(
125- skip( rpc_endpoint , rpc_timeout, query_interval) ,
126+ skip( rpc_urls , rpc_timeout, query_interval) ,
126127 fields(
127128 rpc_timeout = rpc_timeout. as_millis( ) ,
128129 query_interval = query_interval. period( ) . as_millis( ) ,
129130 )
130131 ) ]
131132 pub fn new (
132- rpc_endpoint : & str ,
133+ rpc_urls : & [ Url ] ,
133134 rpc_timeout : Duration ,
134135 query_interval : Interval ,
135136 network_state_tx : watch:: Sender < NetworkState > ,
136137 ) -> Self {
138+ let rpc_multi_client = RpcMultiClient :: new_with_timeout ( rpc_urls. to_vec ( ) , rpc_timeout) ;
137139 NetworkStateQuerier {
138- rpc_client : RpcClient :: new_with_timeout ( rpc_endpoint . to_string ( ) , rpc_timeout ) ,
140+ rpc_multi_client ,
139141 query_interval,
140142 network_state_tx,
141143 }
@@ -154,9 +156,9 @@ impl NetworkStateQuerier {
154156 async fn query_network_state ( & mut self ) -> Result < ( ) > {
155157 // Fetch the blockhash and current slot in parallel
156158 let current_slot_future = self
157- . rpc_client
159+ . rpc_multi_client
158160 . get_slot_with_commitment ( CommitmentConfig :: confirmed ( ) ) ;
159- let latest_blockhash_future = self . rpc_client . get_latest_blockhash ( ) ;
161+ let latest_blockhash_future = self . rpc_multi_client . get_latest_blockhash ( ) ;
160162
161163 let ( current_slot_result, latest_blockhash_result) =
162164 future:: join ( current_slot_future, latest_blockhash_future) . await ;
@@ -183,7 +185,7 @@ where
183185 // Create and spawn the network state querier
184186 let ( network_state_tx, network_state_rx) = watch:: channel ( Default :: default ( ) ) ;
185187 let mut network_state_querier = NetworkStateQuerier :: new (
186- & config. rpc_url ,
188+ & config. rpc_urls ,
187189 config. rpc_timeout ,
188190 tokio:: time:: interval ( config. exporter . refresh_network_state_interval_duration ) ,
189191 network_state_tx,
@@ -224,8 +226,9 @@ mod exporter {
224226 publish_batches,
225227 Exporter ,
226228 } ,
229+ utils:: rpc_multi_client:: RpcMultiClient ,
227230 } ,
228- solana_client :: nonblocking :: rpc_client :: RpcClient ,
231+ solana_sdk :: commitment_config :: CommitmentConfig ,
229232 std:: sync:: Arc ,
230233 tokio:: sync:: watch,
231234 } ;
@@ -243,10 +246,14 @@ mod exporter {
243246 let mut dynamic_compute_unit_price_update_interval =
244247 tokio:: time:: interval ( config. exporter . publish_interval_duration ) ;
245248
246- let client = Arc :: new ( RpcClient :: new_with_timeout (
247- config. rpc_url . to_string ( ) ,
248- config. rpc_timeout ,
249- ) ) ;
249+ let rpc_multi_client: Arc < RpcMultiClient > =
250+ Arc :: new ( RpcMultiClient :: new_with_timeout_and_commitment (
251+ config. rpc_urls . clone ( ) ,
252+ config. rpc_timeout ,
253+ CommitmentConfig {
254+ commitment : config. oracle . commitment ,
255+ } ,
256+ ) ) ;
250257 let Ok ( key_store) = KeyStore :: new ( config. key_store . clone ( ) ) else {
251258 tracing:: warn!( "Key store not available, Exporter won't start." ) ;
252259 return ;
@@ -265,7 +272,7 @@ mod exporter {
265272 let publisher_buffer_key = Exporter :: get_publisher_buffer_key( & * state) . await ;
266273 if let Err ( err) = publish_batches(
267274 state. clone( ) ,
268- client . clone( ) ,
275+ rpc_multi_client . clone( ) ,
269276 network,
270277 & network_state_rx,
271278 key_store. accumulator_key,
@@ -293,7 +300,7 @@ mod exporter {
293300 if let Err ( err) = Exporter :: update_recent_compute_unit_price(
294301 & * state,
295302 & publish_keypair,
296- & client ,
303+ & rpc_multi_client ,
297304 config. exporter. staleness_threshold,
298305 config. exporter. unchanged_publish_threshold,
299306 ) . await {
@@ -312,12 +319,12 @@ mod transaction_monitor {
312319 crate :: agent:: {
313320 solana:: network,
314321 state:: transactions:: Transactions ,
322+ utils:: rpc_multi_client:: RpcMultiClient ,
315323 } ,
316324 serde:: {
317325 Deserialize ,
318326 Serialize ,
319327 } ,
320- solana_client:: nonblocking:: rpc_client:: RpcClient ,
321328 std:: {
322329 sync:: Arc ,
323330 time:: Duration ,
@@ -352,13 +359,16 @@ mod transaction_monitor {
352359 where
353360 S : Transactions ,
354361 {
355- let client = RpcClient :: new_with_timeout ( config. rpc_url . to_string ( ) , config. rpc_timeout ) ;
362+ let rpc_multi_client =
363+ RpcMultiClient :: new_with_timeout ( config. rpc_urls . clone ( ) , config. rpc_timeout ) ;
356364 let mut poll_interval =
357365 tokio:: time:: interval ( config. exporter . transaction_monitor . poll_interval_duration ) ;
358366
359367 loop {
360368 poll_interval. tick ( ) . await ;
361- if let Err ( err) = Transactions :: poll_transactions_status ( & * state, & client) . await {
369+ if let Err ( err) =
370+ Transactions :: poll_transactions_status ( & * state, & rpc_multi_client) . await
371+ {
362372 tracing:: error!( err = ?err, "Transaction monitor failed." ) ;
363373 }
364374 }
0 commit comments