1010 Context ,
1111 Result ,
1212 } ,
13- futures_util:: future:: join_all,
1413 pyth_sdk_solana:: state:: {
1514 load_mapping_account,
1615 load_price_account,
3534 HashMap ,
3635 HashSet ,
3736 } ,
38- iter:: zip,
3937 time:: Duration ,
4038 } ,
4139 tokio:: {
@@ -48,15 +46,15 @@ use {
4846#[ derive( Default , Debug , Clone ) ]
4947pub struct Data {
5048 pub mapping_accounts : HashMap < Pubkey , MappingAccount > ,
51- pub product_accounts : HashMap < Pubkey , ProductAccount > ,
52- pub price_accounts : HashMap < Pubkey , PriceAccount > ,
49+ pub product_accounts : HashMap < Pubkey , ProductEntry > ,
50+ pub price_accounts : HashMap < Pubkey , PriceEntry > ,
5351}
5452
5553impl Data {
5654 fn new (
5755 mapping_accounts : HashMap < Pubkey , MappingAccount > ,
58- product_accounts : HashMap < Pubkey , ProductAccount > ,
59- price_accounts : HashMap < Pubkey , PriceAccount > ,
56+ product_accounts : HashMap < Pubkey , ProductEntry > ,
57+ price_accounts : HashMap < Pubkey , PriceEntry > ,
6058 ) -> Self {
6159 Data {
6260 mapping_accounts,
@@ -68,11 +66,11 @@ impl Data {
6866
6967pub type MappingAccount = pyth_sdk_solana:: state:: MappingAccount ;
7068#[ derive( Debug , Clone ) ]
71- pub struct ProductAccount {
69+ pub struct ProductEntry {
7270 pub account_data : pyth_sdk_solana:: state:: ProductAccount ,
7371 pub price_accounts : Vec < Pubkey > ,
7472}
75- pub type PriceAccount = pyth_sdk_solana:: state:: PriceAccount ;
73+ pub type PriceEntry = pyth_sdk_solana:: state:: PriceAccount ;
7674
7775// Oracle is responsible for fetching Solana account data stored in the Pyth on-chain Oracle.
7876pub struct Oracle {
@@ -105,6 +103,13 @@ pub struct Config {
105103 pub updates_channel_capacity : usize ,
106104 /// Capacity of the channel over which the Poller sends data to the Oracle
107105 pub data_channel_capacity : usize ,
106+
107+ /// Ask the RPC for up to this many product/price accounts in a
108+ /// single request. Tune this setting if you're experiencing
109+ /// timeouts on data fetching. In order to keep concurrent open
110+ /// socket count at bay, the batches are looked up sequentially,
111+ /// trading off overall time it takes to fetch all symbols.
112+ pub max_lookup_batch_size : usize ,
108113}
109114
110115impl Default for Config {
@@ -115,6 +120,7 @@ impl Default for Config {
115120 subscriber_enabled : true ,
116121 updates_channel_capacity : 10000 ,
117122 data_channel_capacity : 10000 ,
123+ max_lookup_batch_size : 200 ,
118124 }
119125 }
120126}
@@ -154,6 +160,7 @@ pub fn spawn_oracle(
154160 rpc_timeout,
155161 config. commitment ,
156162 config. poll_interval_duration ,
163+ config. max_lookup_batch_size ,
157164 logger. clone ( ) ,
158165 ) ;
159166 jhs. push ( tokio:: spawn ( async move { poller. run ( ) . await } ) ) ;
@@ -292,7 +299,7 @@ impl Oracle {
292299 async fn notify_product_account_update (
293300 & self ,
294301 account_key : & Pubkey ,
295- account : & ProductAccount ,
302+ account : & ProductEntry ,
296303 ) -> Result < ( ) > {
297304 self . global_store_tx
298305 . send ( global:: Update :: ProductAccountUpdate {
@@ -306,7 +313,7 @@ impl Oracle {
306313 async fn notify_price_account_update (
307314 & self ,
308315 account_key : & Pubkey ,
309- account : & PriceAccount ,
316+ account : & PriceEntry ,
310317 ) -> Result < ( ) > {
311318 self . global_store_tx
312319 . send ( global:: Update :: PriceAccountUpdate {
@@ -331,6 +338,9 @@ struct Poller {
331338 /// The interval with which to poll for data
332339 poll_interval : Interval ,
333340
341+ /// Passed from Oracle config
342+ max_lookup_batch_size : usize ,
343+
334344 /// Logger
335345 logger : Logger ,
336346}
@@ -343,6 +353,7 @@ impl Poller {
343353 rpc_timeout : Duration ,
344354 commitment : CommitmentLevel ,
345355 poll_interval_duration : Duration ,
356+ max_lookup_batch_size : usize ,
346357 logger : Logger ,
347358 ) -> Self {
348359 let rpc_client = RpcClient :: new_with_timeout_and_commitment (
@@ -357,6 +368,7 @@ impl Poller {
357368 mapping_account_key,
358369 rpc_client,
359370 poll_interval,
371+ max_lookup_batch_size,
360372 logger,
361373 }
362374 }
@@ -419,88 +431,120 @@ impl Poller {
419431 async fn fetch_product_and_price_accounts < ' a , A > (
420432 & self ,
421433 mapping_accounts : A ,
422- ) -> Result < (
423- HashMap < Pubkey , ProductAccount > ,
424- HashMap < Pubkey , PriceAccount > ,
425- ) >
434+ ) -> Result < ( HashMap < Pubkey , ProductEntry > , HashMap < Pubkey , PriceEntry > ) >
426435 where
427436 A : IntoIterator < Item = & ' a MappingAccount > ,
428437 {
429- let mut pubkeys = vec ! [ ] ;
430- let mut futures = vec ! [ ] ;
438+ let mut product_keys = vec ! [ ] ;
431439
432- // Fetch all product accounts in parallel
440+ // Get all product keys
433441 for mapping_account in mapping_accounts {
434442 for account_key in mapping_account
435443 . products
436444 . iter ( )
437445 . filter ( |pubkey| * * pubkey != Pubkey :: default ( ) )
438446 {
439- pubkeys. push ( account_key. clone ( ) ) ;
440- futures. push ( self . fetch_product_account ( account_key) ) ;
447+ product_keys. push ( account_key. clone ( ) ) ;
441448 }
442449 }
443450
444- let future_results = join_all ( futures)
445- . await
446- . into_iter ( )
447- . collect :: < Result < Vec < _ > > > ( ) ?;
448-
449- let product_accounts = zip (
450- pubkeys. into_iter ( ) ,
451- future_results
452- . clone ( )
453- . into_iter ( )
454- . map ( |( product_account, _) | product_account) ,
455- )
456- . collect ( ) ;
457-
458- let price_accounts = future_results
459- . into_iter ( )
460- . flat_map ( |( _, price_accounts) | price_accounts. into_iter ( ) )
461- . collect ( ) ;
462-
463- Ok ( ( product_accounts, price_accounts) )
451+ let mut product_entries = HashMap :: new ( ) ;
452+ let mut price_entries = HashMap :: new ( ) ;
453+
454+ // Lookup products and their prices using the configured batch size
455+ for product_key_batch in product_keys. as_slice ( ) . chunks ( self . max_lookup_batch_size ) {
456+ let ( mut batch_products, mut batch_prices) = self
457+ . fetch_batch_of_product_and_price_accounts ( product_key_batch)
458+ . await ?;
459+
460+ product_entries. extend ( batch_products. drain ( ) ) ;
461+ price_entries. extend ( batch_prices. drain ( ) ) ;
462+ }
463+
464+ Ok ( ( product_entries, price_entries) )
464465 }
465466
466- async fn fetch_product_account (
467+ async fn fetch_batch_of_product_and_price_accounts (
467468 & self ,
468- product_account_key : & Pubkey ,
469- ) -> Result < ( ProductAccount , HashMap < Pubkey , PriceAccount > ) > {
470- // Fetch the product account
471- let product_account = * load_product_account (
472- & self
473- . rpc_client
474- . get_account_data ( product_account_key)
475- . await ?,
476- )
477- . with_context ( || format ! ( "load product account {}" , product_account_key) ) ?;
478-
479- // Fetch the price accounts associated with this product account
480- let mut price_accounts = HashMap :: new ( ) ;
481- let mut price_account_key = product_account. px_acc ;
482- while price_account_key != Pubkey :: default ( ) {
483- let price_account = self . fetch_price_account ( & price_account_key) . await ?;
484- price_accounts. insert ( price_account_key, price_account) ;
485-
486- price_account_key = price_account. next ;
469+ product_key_batch : & [ Pubkey ] ,
470+ ) -> Result < ( HashMap < Pubkey , ProductEntry > , HashMap < Pubkey , PriceEntry > ) > {
471+ let mut product_entries = HashMap :: new ( ) ;
472+
473+ let product_keys = product_key_batch;
474+
475+ // Look up the batch with a single request
476+ let product_accounts = self . rpc_client . get_multiple_accounts ( product_keys) . await ?;
477+
478+ // Log missing products, fill the product entries with initial values
479+ for ( product_key, product_account) in product_keys. iter ( ) . zip ( product_accounts) {
480+ if let Some ( prod_acc) = product_account {
481+ let product = load_product_account ( prod_acc. data . as_slice ( ) )
482+ . context ( format ! ( "Could not parse product account {}" , product_key) ) ?;
483+
484+ product_entries. insert (
485+ * product_key,
486+ ProductEntry {
487+ account_data : * product,
488+ price_accounts : vec ! [ ] ,
489+ } ,
490+ ) ;
491+ } else {
492+ warn ! ( self . logger, "Oracle: Could not find product on chain, skipping" ;
493+ "product_key" => product_key. to_string( ) , ) ;
494+ }
487495 }
488496
489- // Create the product account object
490- let product_account = ProductAccount {
491- account_data : product_account,
492- price_accounts : price_accounts. keys ( ) . cloned ( ) . collect ( ) ,
493- } ;
497+ let mut price_entries = HashMap :: new ( ) ;
494498
495- Ok ( ( product_account, price_accounts) )
496- }
499+ // Starting with top-level prices, look up price accounts in
500+ // batches, filling price entries and adding found prices to
501+ // the product entries
502+ let mut todo = product_entries
503+ . values ( )
504+ . map ( |p| p. account_data . px_acc )
505+ . collect :: < Vec < _ > > ( ) ;
506+
507+ while !todo. is_empty ( ) {
508+ let price_accounts = self
509+ . rpc_client
510+ . get_multiple_accounts ( todo. as_slice ( ) )
511+ . await ?;
497512
498- async fn fetch_price_account ( & self , price_account_key : & Pubkey ) -> Result < PriceAccount > {
499- let data = self . rpc_client . get_account_data ( price_account_key) . await ?;
500- let price_account = * load_price_account ( & data)
501- . with_context ( || format ! ( "load price account {}" , price_account_key) ) ?;
513+ // Any non-zero price.next pubkey will be gathered here and looked up on next iteration
514+ let mut next_todo = vec ! [ ] ;
515+
516+ // Process the response of each lookup request. If there's
517+ // a next price, it will be looked up on next iteration,
518+ // as todo gets replaced with next_todo.
519+ for ( price_key, price_account) in todo. iter ( ) . zip ( price_accounts) {
520+ if let Some ( price_acc) = price_account {
521+ let price = load_price_account ( & price_acc. data )
522+ . context ( format ! ( "Could not parse price account at {}" , price_key) ) ?;
523+
524+ if let Some ( prod) = product_entries. get_mut ( & price. prod ) {
525+ prod. price_accounts . push ( * price_key) ;
526+ price_entries. insert ( * price_key, * price) ;
527+ } else {
528+ warn ! ( self . logger, "Could not find product entry for price, listed in its prod field, skipping" ;
529+ "missing_product" => price. prod. to_string( ) ,
530+ "price_key" => price_key. to_string( ) ,
531+ ) ;
532+
533+ continue ;
534+ }
535+
536+ if price. next != Pubkey :: default ( ) {
537+ next_todo. push ( price. next . clone ( ) ) ;
538+ }
539+ } else {
540+ warn ! ( self . logger, "Could not look up price account on chain, skipping" ; "price_key" => price_key. to_string( ) , ) ;
541+ continue ;
542+ }
543+ }
502544
503- Ok ( price_account)
545+ todo = next_todo;
546+ }
547+ Ok ( ( product_entries, price_entries) )
504548 }
505549}
506550
0 commit comments