@@ -146,9 +146,7 @@ pub fn spawn_oracle(
146146 let ( updates_tx, updates_rx) = mpsc:: channel ( config. updates_channel_capacity ) ;
147147 if config. subscriber_enabled {
148148 let subscriber = Subscriber :: new (
149- rpc_url. to_string ( ) ,
150149 wss_url. to_string ( ) ,
151- rpc_timeout,
152150 config. commitment ,
153151 key_store. program_key ,
154152 updates_tx,
@@ -658,14 +656,9 @@ mod subscriber {
658656 /// Subscriber subscribes to all changes on the given account, and sends those changes
659657 /// on updates_tx. This is a convenience wrapper around the Blockchain Shadow crate.
660658 pub struct Subscriber {
661- /// HTTP RPC endpoint
662- rpc_url : String ,
663659 /// WSS RPC endpoint
664660 wss_url : String ,
665661
666- /// Timeout for RPC requests
667- rpc_timeout : Duration ,
668-
669662 /// Commitment level used to read account data
670663 commitment : CommitmentLevel ,
671664
@@ -681,18 +674,14 @@ mod subscriber {
681674
682675 impl Subscriber {
683676 pub fn new (
684- rpc_url : String ,
685677 wss_url : String ,
686- rpc_timeout : Duration ,
687678 commitment : CommitmentLevel ,
688679 program_key : Pubkey ,
689680 updates_tx : mpsc:: Sender < ( Pubkey , solana_sdk:: account:: Account ) > ,
690681 logger : Logger ,
691682 ) -> Self {
692683 Subscriber {
693- rpc_url,
694684 wss_url,
695- rpc_timeout,
696685 commitment,
697686 program_key,
698687 updates_tx,
@@ -707,7 +696,8 @@ mod subscriber {
707696 error ! ( self . logger, "{}" , err) ;
708697 debug ! ( self . logger, "error context" ; "context" => format!( "{:?}" , err) ) ;
709698 if current_time. elapsed ( ) < Duration :: from_secs ( 30 ) {
710- tracing:: error!(
699+ warn ! (
700+ self . logger,
711701 "Subscriber restarting too quickly. Sleeping for 1 second."
712702 ) ;
713703 tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
@@ -717,15 +707,15 @@ mod subscriber {
717707 }
718708
719709 pub async fn start ( & self ) -> Result < ( ) > {
720- debug ! ( self . logger, "subscribed to program account updates" ; "program_key" => self . program_key. to_string( ) ) ;
721-
722710 let client = PubsubClient :: new ( self . wss_url . as_str ( ) )
723711 . await
724712 . expect ( "failed to create pubsub client" ) ;
725713
726714 let config = RpcProgramAccountsConfig {
727715 account_config : RpcAccountInfoConfig {
728- commitment : Some ( CommitmentConfig :: confirmed ( ) ) ,
716+ commitment : Some ( CommitmentConfig {
717+ commitment : self . commitment ,
718+ } ) ,
729719 encoding : Some ( UiAccountEncoding :: Base64Zstd ) ,
730720 ..Default :: default ( )
731721 } ,
@@ -737,6 +727,8 @@ mod subscriber {
737727 . program_subscribe ( & self . program_key , Some ( config) )
738728 . await ?;
739729
730+ debug ! ( self . logger, "subscribed to program account updates" ; "program_key" => self . program_key. to_string( ) ) ;
731+
740732 loop {
741733 match tokio_stream:: StreamExt :: next ( & mut notif) . await {
742734 Some ( update) => {
@@ -754,7 +746,8 @@ mod subscriber {
754746 . map_err ( |_| anyhow ! ( "failed to send update to oracle" ) ) ?;
755747 }
756748 None => {
757- return Err ( anyhow ! ( "Subscriber closed connection" ) ) ;
749+ debug ! ( self . logger, "subscriber closed connection" ) ;
750+ return Ok ( ( ) ) ;
758751 }
759752 }
760753 }
0 commit comments