@@ -29,7 +29,7 @@ use bitcoin::hash_types::Txid;
 use chain;
 use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput};
 use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs};
+use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
 use chain::transaction::{OutPoint, TransactionData};
 use chain::keysinterface::Sign;
 use util::atomic_counter::AtomicCounter;
@@ -42,7 +42,7 @@ use ln::channelmanager::ChannelDetails;
 use prelude::*;
 use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
 use core::ops::Deref;
-use core::sync::atomic::{AtomicBool, Ordering};
+use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

 #[derive(Clone, Copy, Hash, PartialEq, Eq)]
 /// A specific update stored in a `MonitorUpdateId`, separated out to make the contents entirely
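Note: both new height fields are plain atomics written with `Release` and read with `Acquire`, rather than being guarded by the existing locks. A minimal sketch of that store/load pairing, with illustrative names not taken from the patch:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// A Release store publishes the new height; an Acquire load that observes it
// also observes all writes the storing thread made beforehand.
static HEIGHT: AtomicUsize = AtomicUsize::new(0);

fn on_new_block(height: u32) {
	HEIGHT.store(height as usize, Ordering::Release);
}

fn current_height() -> usize {
	HEIGHT.load(Ordering::Acquire)
}

fn main() {
	on_new_block(100);
	assert_eq!(current_height(), 100);
}
```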
@@ -165,6 +165,13 @@ struct MonitorHolder<ChannelSigner: Sign> {
 	/// processed the closure event, we set this to true and return PermanentFailure for any other
 	/// chain::Watch events.
 	channel_perm_failed: AtomicBool,
+	/// The last block height at which no [`UpdateOrigin::ChainSync`] monitor updates were
+	/// present in `pending_monitor_updates`.
+	/// If it's been more than [`LATENCY_GRACE_PERIOD_BLOCKS`] since we started waiting on a chain
+	/// sync event, we let monitor events return to `ChannelManager` because we cannot hold them up
+	/// forever or we'll end up with HTLC preimages waiting to feed back into an upstream channel
+	/// forever, risking funds loss.
+	last_chain_persist_height: AtomicUsize,
 }

 impl<ChannelSigner: Sign> MonitorHolder<ChannelSigner> {
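The gating rule this field enables (applied in `release_pending_monitor_events` below) can be shown standalone. A minimal sketch, assuming `LATENCY_GRACE_PERIOD_BLOCKS == 3` (its value in `channelmonitor.rs` at the time of writing); the struct and method names here are illustrative, not from the patch:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const LATENCY_GRACE_PERIOD_BLOCKS: u32 = 3; // assumed value, see channelmonitor.rs

struct HeightGate {
	last_chain_persist_height: AtomicUsize,
	highest_chain_height: AtomicUsize,
}

impl HeightGate {
	// Events are held back only while a chain-sync persist is pending AND it
	// started fewer than LATENCY_GRACE_PERIOD_BLOCKS blocks ago.
	fn should_hold_events(&self, has_pending_chainsync_updates: bool) -> bool {
		has_pending_chainsync_updates &&
			self.last_chain_persist_height.load(Ordering::Acquire)
				+ LATENCY_GRACE_PERIOD_BLOCKS as usize
				> self.highest_chain_height.load(Ordering::Acquire)
	}
}

fn main() {
	let gate = HeightGate {
		last_chain_persist_height: AtomicUsize::new(100),
		highest_chain_height: AtomicUsize::new(102),
	};
	assert!(gate.should_hold_events(true)); // 100 + 3 > 102: still held
	gate.highest_chain_height.store(103, Ordering::Release);
	assert!(!gate.should_hold_events(true)); // 100 + 3 > 103 is false: released
}
```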
@@ -223,6 +230,8 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
 	/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
 	/// from the user and not from a [`ChannelMonitor`].
 	pending_monitor_events: Mutex<Vec<MonitorEvent>>,
+	/// The best block height seen, used as a proxy for the passage of time.
+	highest_chain_height: AtomicUsize,
 }

 impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
@@ -241,11 +250,16 @@ where C::Target: chain::Filter,
 	/// calls must not exclude any transactions matching the new outputs nor any in-block
 	/// descendants of such transactions. It is not necessary to re-fetch the block to obtain
 	/// updated `txdata`.
-	fn process_chain_data<FN>(&self, header: &BlockHeader, txdata: &TransactionData, process: FN)
+	///
+	/// Calls which represent a new blockchain tip height should set `best_height`.
+	fn process_chain_data<FN>(&self, header: &BlockHeader, best_height: Option<u32>, txdata: &TransactionData, process: FN)
 	where
 		FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
 	{
 		let mut dependent_txdata = Vec::new();
+		if let Some(height) = best_height {
+			self.highest_chain_height.store(height as usize, Ordering::Release);
+		}
 		{
 			let monitor_states = self.monitors.write().unwrap();
 			for (funding_outpoint, monitor_state) in monitor_states.iter() {
@@ -257,6 +271,16 @@ where C::Target: chain::Filter,
 					contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
 				};
 				let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+				if let Some(height) = best_height {
+					if !pending_monitor_updates.iter().any(|update_id|
+						if let UpdateOrigin::ChainSync(_) = update_id.contents { true } else { false })
+					{
+						// If there are no ChainSync persists awaiting completion, go ahead and
+						// set last_chain_persist_height here - we wouldn't want the first
+						// TemporaryFailure to always immediately be considered "overly delayed".
+						monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
+					}
+				}

 				log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
 				match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
@@ -301,7 +325,7 @@ where C::Target: chain::Filter,
 			dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index);
 			dependent_txdata.dedup_by_key(|(index, _tx)| *index);
 			let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect();
-			self.process_chain_data(header, &txdata, process);
+			self.process_chain_data(header, None, &txdata, process); // We skip the best height the second go-around
 		}
 	}

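The recursive call passes `None` because dependent transactions live in the block already being processed, so the tip height was recorded on the first pass. A standalone illustration of the sort/dedup step above, using dummy data in place of real transactions:

```rust
fn main() {
	// (in-block index, transaction) pairs gathered from several monitors,
	// possibly containing duplicates.
	let mut dependent_txdata = vec![(5, "txB"), (2, "txA"), (5, "txB")];
	dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index);
	dependent_txdata.dedup_by_key(|(index, _tx)| *index);
	assert_eq!(dependent_txdata, vec![(2, "txA"), (5, "txB")]);
}
```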
@@ -322,6 +346,7 @@ where C::Target: chain::Filter,
 			fee_estimator: feeest,
 			persister,
 			pending_monitor_events: Mutex::new(Vec::new()),
+			highest_chain_height: AtomicUsize::new(0),
 		}
 	}

@@ -425,9 +450,13 @@ where C::Target: chain::Filter,
 				});
 			},
 			MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
-				// We've already done everything we need to, the next time
-				// release_pending_monitor_events is called, any events for this ChannelMonitor
-				// will be returned if there's no more SyncPersistId events left.
+				if !pending_monitor_updates.iter().any(|update_id|
+					if let UpdateOrigin::ChainSync(_) = update_id.contents { true } else { false })
+				{
+					monitor_data.last_chain_persist_height.store(self.highest_chain_height.load(Ordering::Acquire), Ordering::Release);
+					// The next time release_pending_monitor_events is called, any events for this
+					// ChannelMonitor will be returned.
+				}
 			},
 		}
 		Ok(())
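This "any `ChainSync` persists still pending?" scan now appears twice (here and in `process_chain_data`). A hypothetical standalone equivalent, with the types reduced to the relevant shape; the `if let ... { true } else { false }` form in the patch is equivalent to `matches!`, which may have been avoided for an older compiler baseline:

```rust
// Simplified stand-ins for the real MonitorUpdateId/UpdateOrigin types.
#[allow(dead_code)]
enum UpdateOrigin { OffChain(u64), ChainSync(u64) }
struct MonitorUpdateId { contents: UpdateOrigin }

fn has_pending_chainsync(pending: &[MonitorUpdateId]) -> bool {
	pending.iter().any(|update_id| matches!(update_id.contents, UpdateOrigin::ChainSync(_)))
}

fn main() {
	let pending = vec![MonitorUpdateId { contents: UpdateOrigin::OffChain(1) }];
	assert!(!has_pending_chainsync(&pending));
}
```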
@@ -467,7 +496,7 @@ where
 		let header = &block.header;
 		let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
 		log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
-		self.process_chain_data(header, &txdata, |monitor, txdata| {
+		self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
 			monitor.block_connected(
 				header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
 		});
@@ -494,7 +523,7 @@ where
 {
 	fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
 		log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash());
-		self.process_chain_data(header, txdata, |monitor, txdata| {
+		self.process_chain_data(header, None, txdata, |monitor, txdata| {
 			monitor.transactions_confirmed(
 				header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
 		});
@@ -510,7 +539,7 @@ where

 	fn best_block_updated(&self, header: &BlockHeader, height: u32) {
 		log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height);
-		self.process_chain_data(header, &[], |monitor, txdata| {
+		self.process_chain_data(header, Some(height), &[], |monitor, txdata| {
 			// While in practice there shouldn't be any recursive calls when given empty txdata,
 			// it's still possible if a chain::Filter implementation returns a transaction.
 			debug_assert!(txdata.is_empty());
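Between them, the three call sites give `process_chain_data` its time proxy: `block_connected` and `best_block_updated` represent a new tip and pass `Some(height)`, while `transactions_confirmed` may be replayed for old blocks and passes `None`. A hedged sketch of a `Confirm`-driven sync exercising both paths; `apply_block` is hypothetical, the trait methods are LDK's:

```rust
use bitcoin::blockdata::block::BlockHeader;
use lightning::chain::Confirm;
use lightning::chain::transaction::TransactionData;

// Confirm relevant transactions first (best_height ends up None internally),
// then announce the tip if this block extends it (best_height = Some(height)).
fn apply_block<C: Confirm>(source: &C, header: &BlockHeader, txdata: &TransactionData, height: u32, is_new_tip: bool) {
	source.transactions_confirmed(header, txdata, height);
	if is_new_tip {
		source.best_block_updated(header, height);
	}
}
```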
@@ -577,6 +606,7 @@ where C::Target: chain::Filter,
 			monitor,
 			pending_monitor_updates: Mutex::new(pending_monitor_updates),
 			channel_perm_failed: AtomicBool::new(false),
+			last_chain_persist_height: AtomicUsize::new(0),
 		});
 		persist_res
 	}
@@ -633,7 +663,10 @@ where C::Target: chain::Filter,
 		let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
 		for monitor_state in self.monitors.read().unwrap().values() {
 			let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
-			if is_pending_monitor_update {
+			if is_pending_monitor_update &&
+					monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize
+						> self.highest_chain_height.load(Ordering::Acquire)
+			{
 				log_info!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!");
 			} else {
 				if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
@@ -647,6 +680,11 @@ where C::Target: chain::Filter,
 					// updated.
 					log_info!(self.logger, "A Channel Monitor sync returned PermanentFailure. Returning monitor events but duplicate events may appear after reload!");
 				}
+				if is_pending_monitor_update {
+					log_error!(self.logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
+					log_error!(self.logger, " To avoid funds-loss, we are allowing monitor updates to be released.");
+					log_error!(self.logger, " This may cause duplicate payment events to be generated.");
+				}
 				pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
 			}
 		}