@@ -13,19 +13,19 @@ use {
1313 wormhole:: Wormhole ,
1414 } ,
1515 } ,
16- anyhow:: { anyhow, Result } ,
16+ anyhow:: { anyhow, bail , Result } ,
1717 borsh:: BorshDeserialize ,
1818 futures:: stream:: StreamExt ,
1919 pyth_sdk:: PriceIdentifier ,
20- pyth_sdk_solana:: state:: { load_mapping_account , load_product_account} ,
20+ pyth_sdk_solana:: state:: load_product_account,
2121 solana_account_decoder:: UiAccountEncoding ,
2222 solana_client:: {
2323 nonblocking:: { pubsub_client:: PubsubClient , rpc_client:: RpcClient } ,
2424 rpc_config:: { RpcAccountInfoConfig , RpcProgramAccountsConfig } ,
2525 rpc_filter:: { Memcmp , MemcmpEncodedBytes , RpcFilterType } ,
2626 } ,
2727 solana_sdk:: {
28- account:: Account , bs58 , commitment_config:: CommitmentConfig , pubkey:: Pubkey , system_program,
28+ account:: Account , commitment_config:: CommitmentConfig , pubkey:: Pubkey , system_program,
2929 } ,
3030 std:: { collections:: BTreeMap , sync:: Arc , time:: Duration } ,
3131 tokio:: time:: Instant ,
@@ -230,6 +230,104 @@ where
230230 Ok ( ( ) )
231231}
232232
233+ pub async fn fetch_and_store_price_feeds_metadata < S > (
234+ state : & S ,
235+ oracle_program_address : & Pubkey ,
236+ rpc_client : & RpcClient ,
237+ ) -> Result < Vec < PriceFeedMetadata > >
238+ where
239+ S : PriceFeedMeta + Aggregates ,
240+ {
241+ let price_feeds_metadata =
242+ fetch_price_feeds_metadata ( oracle_program_address, rpc_client) . await ?;
243+
244+ // Wait for the crosschain price feed ids to be available in the state
245+ // This is to prune the price feeds that are not available crosschain yet (i.e. they are coming soon)
246+ let mut all_ids;
247+ let mut retry_count = 0 ;
248+ loop {
249+ all_ids = Aggregates :: get_price_feed_ids ( state) . await ;
250+ if !all_ids. is_empty ( ) {
251+ break ;
252+ }
253+ tracing:: info!( "Waiting for price feed ids..." ) ;
254+ tokio:: time:: sleep ( Duration :: from_secs ( retry_count + 1 ) ) . await ;
255+ retry_count += 1 ;
256+ if retry_count > 10 {
257+ bail ! ( "Failed to fetch price feed ids after 10 retries" ) ;
258+ }
259+ }
260+
261+ // Filter price_feeds_metadata to only include entries with IDs in all_ids
262+ let filtered_metadata: Vec < PriceFeedMetadata > = price_feeds_metadata
263+ . into_iter ( )
264+ . filter ( |metadata| all_ids. contains ( & PriceIdentifier :: from ( metadata. id ) ) )
265+ . collect ( ) ;
266+
267+ state. store_price_feeds_metadata ( & filtered_metadata) . await ?;
268+ Ok ( filtered_metadata)
269+ }
270+
271+ async fn fetch_price_feeds_metadata (
272+ oracle_program_address : & Pubkey ,
273+ rpc_client : & RpcClient ,
274+ ) -> Result < Vec < PriceFeedMetadata > > {
275+ let product_accounts = rpc_client
276+ . get_program_accounts_with_config (
277+ oracle_program_address,
278+ RpcProgramAccountsConfig {
279+ filters : Some ( vec ! [ RpcFilterType :: Memcmp ( Memcmp :: new(
280+ 0 , // offset
281+ // Product account header: <magic:u32le:0xa1b2c3d4> <version:u32le:0x02> <account_type:u32le:0x02>
282+ // The string literal in hex::decode is represented as be (big endian).
283+ MemcmpEncodedBytes :: Bytes ( hex:: decode( "d4c3b2a10200000002000000" ) . unwrap( ) ) ,
284+ ) ) ] ) ,
285+ account_config : RpcAccountInfoConfig {
286+ encoding : Some ( UiAccountEncoding :: Base64Zstd ) ,
287+ commitment : Some ( CommitmentConfig :: confirmed ( ) ) ,
288+ ..Default :: default ( )
289+ } ,
290+ ..Default :: default ( )
291+ } ,
292+ )
293+ . await ?;
294+
295+ let price_feeds_metadata: Vec < PriceFeedMetadata > = product_accounts
296+ . into_iter ( )
297+ . filter_map (
298+ |( pubkey, account) | match load_product_account ( & account. data ) {
299+ Ok ( product_account) => {
300+ if product_account. px_acc == Pubkey :: default ( ) {
301+ return None ;
302+ }
303+
304+ let attributes = product_account
305+ . iter ( )
306+ . filter ( |( key, _) | !key. is_empty ( ) )
307+ . map ( |( key, val) | ( key. to_string ( ) , val. to_string ( ) ) )
308+ . collect :: < BTreeMap < String , String > > ( ) ;
309+
310+ Some ( PriceFeedMetadata {
311+ id : RpcPriceIdentifier :: new ( product_account. px_acc . to_bytes ( ) ) ,
312+ attributes,
313+ } )
314+ }
315+ Err ( e) => {
316+ tracing:: warn!( error = ?e, pubkey = ?pubkey, "Error loading product account" ) ;
317+ None
318+ }
319+ } ,
320+ )
321+ . collect ( ) ;
322+
323+ tracing:: info!(
324+ len = price_feeds_metadata. len( ) ,
325+ "Fetched price feeds metadata"
326+ ) ;
327+
328+ Ok ( price_feeds_metadata)
329+ }
330+
233331#[ tracing:: instrument( skip( opts, state) ) ]
234332pub async fn spawn < S > ( opts : RunOptions , state : Arc < S > ) -> Result < ( ) >
235333where
@@ -300,9 +398,10 @@ where
300398 let mut exit = crate :: EXIT . subscribe ( ) ;
301399 tokio:: spawn ( async move {
302400 // Run fetch and store once before the loop
401+ tracing:: info!( "Fetching and storing price feeds metadata..." ) ;
303402 if let Err ( e) = fetch_and_store_price_feeds_metadata (
304403 price_feeds_state. as_ref ( ) ,
305- & opts. pythnet . mapping_addr ,
404+ & opts. pythnet . oracle_program_addr ,
306405 & rpc_client,
307406 )
308407 . await
@@ -316,9 +415,10 @@ where
316415 tokio:: select! {
317416 _ = exit. changed( ) => break ,
318417 _ = tokio:: time:: sleep( Duration :: from_secs( DEFAULT_PRICE_FEEDS_CACHE_UPDATE_INTERVAL ) ) => {
418+ tracing:: info!( "Fetching and storing price feeds metadata..." ) ;
319419 if let Err ( e) = fetch_and_store_price_feeds_metadata(
320420 price_feeds_state. as_ref( ) ,
321- & opts. pythnet. mapping_addr ,
421+ & opts. pythnet. oracle_program_addr ,
322422 & rpc_client,
323423 )
324424 . await
@@ -338,92 +438,3 @@ where
338438 ) ;
339439 Ok ( ( ) )
340440}
341-
342- pub async fn fetch_and_store_price_feeds_metadata < S > (
343- state : & S ,
344- mapping_address : & Pubkey ,
345- rpc_client : & RpcClient ,
346- ) -> Result < Vec < PriceFeedMetadata > >
347- where
348- S : PriceFeedMeta + Aggregates ,
349- {
350- let price_feeds_metadata = fetch_price_feeds_metadata ( mapping_address, rpc_client) . await ?;
351- let all_ids = Aggregates :: get_price_feed_ids ( state) . await ;
352-
353- // Filter price_feeds_metadata to only include entries with IDs in all_ids
354- let filtered_metadata: Vec < PriceFeedMetadata > = price_feeds_metadata
355- . into_iter ( )
356- . filter ( |metadata| all_ids. contains ( & PriceIdentifier :: from ( metadata. id ) ) )
357- . collect ( ) ;
358-
359- state. store_price_feeds_metadata ( & filtered_metadata) . await ?;
360- Ok ( filtered_metadata)
361- }
362-
363- async fn fetch_price_feeds_metadata (
364- mapping_address : & Pubkey ,
365- rpc_client : & RpcClient ,
366- ) -> Result < Vec < PriceFeedMetadata > > {
367- let mut price_feeds_metadata = Vec :: < PriceFeedMetadata > :: new ( ) ;
368- let mapping_data = rpc_client. get_account_data ( mapping_address) . await ?;
369- let mapping_acct = load_mapping_account ( & mapping_data) ?;
370-
371- // Split product keys into chunks of 150 to avoid too many open files error (error trying to connect: tcp open error: Too many open files (os error 24))
372- for product_keys_chunk in mapping_acct
373- . products
374- . iter ( )
375- . filter ( |& prod_pkey| * prod_pkey != Pubkey :: default ( ) )
376- . collect :: < Vec < _ > > ( )
377- . chunks ( 150 )
378- {
379- // Prepare a list of futures for fetching product account data for each chunk
380- let fetch_product_data_futures = product_keys_chunk
381- . iter ( )
382- . map ( |prod_pkey| rpc_client. get_account_data ( prod_pkey) )
383- . collect :: < Vec < _ > > ( ) ;
384-
385- // Await all futures concurrently within the chunk
386- let products_data_results = futures:: future:: join_all ( fetch_product_data_futures) . await ;
387-
388- for prod_data_result in products_data_results {
389- match prod_data_result {
390- Ok ( prod_data) => {
391- let prod_acct = match load_product_account ( & prod_data) {
392- Ok ( prod_acct) => prod_acct,
393- Err ( e) => {
394- println ! ( "Error loading product account: {}" , e) ;
395- continue ;
396- }
397- } ;
398-
399- // TODO: Add stricter type checking for attributes
400- let attributes = prod_acct
401- . iter ( )
402- . filter ( |( key, _) | !key. is_empty ( ) )
403- . map ( |( key, val) | ( key. to_string ( ) , val. to_string ( ) ) )
404- . collect :: < BTreeMap < String , String > > ( ) ;
405-
406- if prod_acct. px_acc != Pubkey :: default ( ) {
407- let px_pkey = prod_acct. px_acc ;
408- let px_pkey_bytes = bs58:: decode ( & px_pkey. to_string ( ) ) . into_vec ( ) ?;
409- let px_pkey_array: [ u8 ; 32 ] = px_pkey_bytes
410- . try_into ( )
411- . expect ( "Invalid length for PriceIdentifier" ) ;
412-
413- let price_feed_metadata = PriceFeedMetadata {
414- id : RpcPriceIdentifier :: new ( px_pkey_array) ,
415- attributes,
416- } ;
417-
418- price_feeds_metadata. push ( price_feed_metadata) ;
419- }
420- }
421- Err ( e) => {
422- println ! ( "Error loading product account: {}" , e) ;
423- continue ;
424- }
425- }
426- }
427- }
428- Ok ( price_feeds_metadata)
429- }
0 commit comments