@@ -33,10 +33,16 @@ use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature};
3333use bitcoin:: secp256k1:: { PublicKey , Scalar , Secp256k1 , SecretKey , Signing } ;
3434use bitcoin:: { ScriptBuf , Transaction , TxOut , Txid } ;
3535
36- use std:: ops:: Deref ;
37- use std:: sync:: { Arc , Condvar , Mutex , RwLock } ;
36+ use std:: mem;
37+ use std:: ops:: { Deref , DerefMut } ;
38+ use std:: sync:: { Arc , Mutex , RwLock } ;
3839use std:: time:: Duration ;
3940
41+ enum WalletSyncStatus {
42+ Completed ,
43+ InProgress { subscribers : Vec < tokio:: sync:: oneshot:: Sender < Result < ( ) , Error > > > } ,
44+ }
45+
4046pub struct Wallet < D , B : Deref , E : Deref , L : Deref >
4147where
4248 D : BatchDatabase ,
5157 // A cache storing the most recently retrieved fee rate estimations.
5258 broadcaster : B ,
5359 fee_estimator : E ,
54- sync_lock : ( Mutex < ( ) > , Condvar ) ,
60+ // A Mutex holding the current sync status.
61+ sync_status : Mutex < WalletSyncStatus > ,
5562 // TODO: Drop this workaround after BDK 1.0 upgrade.
5663 balance_cache : RwLock < Balance > ,
5764 logger : L ,
@@ -76,69 +83,67 @@ where
7683 } ) ;
7784
7885 let inner = Mutex :: new ( wallet) ;
79- let sync_lock = ( Mutex :: new ( ( ) ) , Condvar :: new ( ) ) ;
86+ let sync_status = Mutex :: new ( WalletSyncStatus :: Completed ) ;
8087 let balance_cache = RwLock :: new ( start_balance) ;
81- Self { blockchain, inner, broadcaster, fee_estimator, sync_lock , balance_cache, logger }
88+ Self { blockchain, inner, broadcaster, fee_estimator, sync_status , balance_cache, logger }
8289 }
8390
8491 pub ( crate ) async fn sync ( & self ) -> Result < ( ) , Error > {
85- let ( lock, cvar) = & self . sync_lock ;
86-
87- let guard = match lock. try_lock ( ) {
88- Ok ( guard) => guard,
89- Err ( _) => {
90- log_info ! ( self . logger, "Sync in progress, skipping." ) ;
91- let guard = cvar. wait ( lock. lock ( ) . unwrap ( ) ) ;
92- drop ( guard) ;
93- cvar. notify_all ( ) ;
94- return Ok ( ( ) ) ;
95- } ,
96- } ;
92+ if let Some ( sync_receiver) = self . register_or_subscribe_pending_sync ( ) {
93+ log_info ! ( self . logger, "Sync in progress, skipping." ) ;
94+ return sync_receiver. await . map_err ( |e| {
95+ debug_assert ! ( false , "Failed to receive wallet sync result: {:?}" , e) ;
96+ log_error ! ( self . logger, "Failed to receive wallet sync result: {:?}" , e) ;
97+ Error :: WalletOperationFailed
98+ } ) ?;
99+ }
97100
98- let sync_options = SyncOptions { progress : None } ;
99- let wallet_lock = self . inner . lock ( ) . unwrap ( ) ;
100- let res = match wallet_lock. sync ( & self . blockchain , sync_options) . await {
101- Ok ( ( ) ) => {
102- // TODO: Drop this workaround after BDK 1.0 upgrade.
103- // Update balance cache after syncing.
104- if let Ok ( balance) = wallet_lock. get_balance ( ) {
105- * self . balance_cache . write ( ) . unwrap ( ) = balance;
106- }
107- Ok ( ( ) )
108- } ,
109- Err ( e) => match e {
110- bdk:: Error :: Esplora ( ref be) => match * * be {
111- bdk:: blockchain:: esplora:: EsploraError :: Reqwest ( _) => {
112- // Drop lock, sleep for a second, retry.
113- drop ( wallet_lock) ;
114- tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
115- log_error ! (
116- self . logger,
117- "Sync failed due to HTTP connection error, retrying: {}" ,
118- e
119- ) ;
120- let sync_options = SyncOptions { progress : None } ;
121- self . inner
122- . lock ( )
123- . unwrap ( )
124- . sync ( & self . blockchain , sync_options)
125- . await
126- . map_err ( |e| From :: from ( e) )
101+ let res = {
102+ let sync_options = SyncOptions { progress : None } ;
103+ let wallet_lock = self . inner . lock ( ) . unwrap ( ) ;
104+ match wallet_lock. sync ( & self . blockchain , sync_options) . await {
105+ Ok ( ( ) ) => {
106+ // TODO: Drop this workaround after BDK 1.0 upgrade.
107+ // Update balance cache after syncing.
108+ if let Ok ( balance) = wallet_lock. get_balance ( ) {
109+ * self . balance_cache . write ( ) . unwrap ( ) = balance;
110+ }
111+ Ok ( ( ) )
112+ } ,
113+ Err ( e) => match e {
114+ bdk:: Error :: Esplora ( ref be) => match * * be {
115+ bdk:: blockchain:: esplora:: EsploraError :: Reqwest ( _) => {
116+ // Drop lock, sleep for a second, retry.
117+ drop ( wallet_lock) ;
118+ tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
119+ log_error ! (
120+ self . logger,
121+ "Sync failed due to HTTP connection error, retrying: {}" ,
122+ e
123+ ) ;
124+ let sync_options = SyncOptions { progress : None } ;
125+ self . inner
126+ . lock ( )
127+ . unwrap ( )
128+ . sync ( & self . blockchain , sync_options)
129+ . await
130+ . map_err ( |e| From :: from ( e) )
131+ } ,
132+ _ => {
133+ log_error ! ( self . logger, "Sync failed due to Esplora error: {}" , e) ;
134+ Err ( From :: from ( e) )
135+ } ,
127136 } ,
128137 _ => {
129- log_error ! ( self . logger, "Sync failed due to Esplora error: {}" , e) ;
138+ log_error ! ( self . logger, "Wallet sync error: {}" , e) ;
130139 Err ( From :: from ( e) )
131140 } ,
132141 } ,
133- _ => {
134- log_error ! ( self . logger, "Wallet sync error: {}" , e) ;
135- Err ( From :: from ( e) )
136- } ,
137- } ,
142+ }
138143 } ;
139144
140- drop ( guard ) ;
141- cvar . notify_all ( ) ;
145+ self . propagate_result_to_subscribers ( res ) ;
146+
142147 res
143148 }
144149
@@ -291,6 +296,55 @@ where
291296
292297 Ok ( txid)
293298 }
299+
300+ fn register_or_subscribe_pending_sync (
301+ & self ,
302+ ) -> Option < tokio:: sync:: oneshot:: Receiver < Result < ( ) , Error > > > {
303+ let mut sync_status_lock = self . sync_status . lock ( ) . unwrap ( ) ;
304+ match sync_status_lock. deref_mut ( ) {
305+ WalletSyncStatus :: Completed => {
306+ // We're first to register for a sync.
307+ * sync_status_lock = WalletSyncStatus :: InProgress { subscribers : Vec :: new ( ) } ;
308+ None
309+ } ,
310+ WalletSyncStatus :: InProgress { subscribers } => {
311+ // A sync is in-progress, we subscribe.
312+ let ( tx, rx) = tokio:: sync:: oneshot:: channel ( ) ;
313+ subscribers. push ( tx) ;
314+ Some ( rx)
315+ } ,
316+ }
317+ }
318+
319+ fn propagate_result_to_subscribers ( & self , res : Result < ( ) , Error > ) {
320+ // Send the notification to any other tasks that might be waiting on it by now.
321+ let mut waiting_subscribers = Vec :: new ( ) ;
322+ {
323+ let mut sync_status_lock = self . sync_status . lock ( ) . unwrap ( ) ;
324+ match sync_status_lock. deref_mut ( ) {
325+ WalletSyncStatus :: Completed => {
326+ // No sync in-progress, do nothing.
327+ return ;
328+ } ,
329+ WalletSyncStatus :: InProgress { subscribers } => {
330+ // A sync is in-progress, we notify subscribers.
331+ mem:: swap ( & mut waiting_subscribers, subscribers) ;
332+ * sync_status_lock = WalletSyncStatus :: Completed ;
333+ } ,
334+ }
335+ }
336+
337+ for sender in waiting_subscribers {
338+ sender. send ( res) . unwrap_or_else ( |e| {
339+ debug_assert ! ( false , "Failed to send wallet sync result to subscribers: {:?}" , e) ;
340+ log_error ! (
341+ self . logger,
342+ "Failed to send wallet sync result to subscribers: {:?}" ,
343+ e
344+ ) ;
345+ } ) ;
346+ }
347+ }
294348}
295349
296350impl < D , B : Deref , E : Deref , L : Deref > WalletSource for Wallet < D , B , E , L >
0 commit comments