@@ -44,6 +44,7 @@ use crate::prelude::*;
4444use crate :: sync:: { RwLock , RwLockReadGuard , Mutex , MutexGuard } ;
4545use core:: ops:: Deref ;
4646use core:: sync:: atomic:: { AtomicUsize , Ordering } ;
47+ use bitcoin:: hashes:: Hash ;
4748use bitcoin:: secp256k1:: PublicKey ;
4849
4950/// `Persist` defines behavior for persisting channel monitors: this could mean
@@ -263,7 +264,7 @@ where C::Target: chain::Filter,
263264 for funding_outpoint in funding_outpoints. iter ( ) {
264265 let monitor_lock = self . monitors . read ( ) . unwrap ( ) ;
265266 if let Some ( monitor_state) = monitor_lock. get ( funding_outpoint) {
266- if self . update_monitor_with_chain_data ( header, txdata, & process, funding_outpoint, & monitor_state) . is_err ( ) {
267+ if self . update_monitor_with_chain_data ( header, best_height , txdata, & process, funding_outpoint, & monitor_state) . is_err ( ) {
267268 // Take the monitors lock for writing so that we poison it and any future
268269 // operations going forward fail immediately.
269270 core:: mem:: drop ( monitor_lock) ;
@@ -278,7 +279,7 @@ where C::Target: chain::Filter,
278279 let monitor_states = self . monitors . write ( ) . unwrap ( ) ;
279280 for ( funding_outpoint, monitor_state) in monitor_states. iter ( ) {
280281 if !funding_outpoints. contains ( funding_outpoint) {
281- if self . update_monitor_with_chain_data ( header, txdata, & process, funding_outpoint, & monitor_state) . is_err ( ) {
282+ if self . update_monitor_with_chain_data ( header, best_height , txdata, & process, funding_outpoint, & monitor_state) . is_err ( ) {
282283 log_error ! ( self . logger, "{}" , err_str) ;
283284 panic ! ( "{}" , err_str) ;
284285 }
@@ -297,14 +298,23 @@ where C::Target: chain::Filter,
297298 }
298299
299300 fn update_monitor_with_chain_data < FN > (
300- & self , header : & Header , txdata : & TransactionData , process : FN , funding_outpoint : & OutPoint ,
301- monitor_state : & MonitorHolder < ChannelSigner >
301+ & self , header : & Header , best_height : Option < u32 > , txdata : & TransactionData , process : FN , funding_outpoint : & OutPoint ,
302+ monitor_state : & MonitorHolder < ChannelSigner > ,
302303 ) -> Result < ( ) , ( ) > where FN : Fn ( & ChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs > {
303304 let monitor = & monitor_state. monitor ;
304305 let logger = WithChannelMonitor :: from ( & self . logger , & monitor, None ) ;
305- let mut txn_outputs;
306- {
307- txn_outputs = process ( monitor, txdata) ;
306+
307+ let mut txn_outputs = process ( monitor, txdata) ;
308+
309+ let get_partition_key = |funding_outpoint : & OutPoint | {
310+ let funding_txid_hash = funding_outpoint. txid . to_raw_hash ( ) ;
311+ let funding_txid_hash_bytes = funding_txid_hash. as_byte_array ( ) ;
312+ let funding_txid_u32 = u32:: from_be_bytes ( [ funding_txid_hash_bytes[ 0 ] , funding_txid_hash_bytes[ 1 ] , funding_txid_hash_bytes[ 2 ] , funding_txid_hash_bytes[ 3 ] ] ) ;
313+ funding_txid_u32. wrapping_add ( best_height. unwrap_or_default ( ) )
314+ } ;
315+ const CHAINSYNC_MONITOR_PARTITION_FACTOR : u32 = 50 ; // ~ 8hours
316+ let has_pending_claims = monitor_state. monitor . has_pending_claims ( ) ;
317+ if has_pending_claims || get_partition_key ( funding_outpoint) % CHAINSYNC_MONITOR_PARTITION_FACTOR == 0 {
308318 log_trace ! ( logger, "Syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ;
309319 match self . persister . update_persisted_channel ( * funding_outpoint, None , monitor) {
310320 ChannelMonitorUpdateStatus :: Completed =>
@@ -313,10 +323,10 @@ where C::Target: chain::Filter,
313323 ) ,
314324 ChannelMonitorUpdateStatus :: InProgress => {
315325 log_trace ! ( logger, "Channel Monitor sync for channel {} in progress." , log_funding_info!( monitor) ) ;
316- } ,
326+ }
317327 ChannelMonitorUpdateStatus :: UnrecoverableError => {
318328 return Err ( ( ) ) ;
319- } ,
329+ }
320330 }
321331 }
322332
@@ -870,14 +880,17 @@ impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref,
870880
871881#[ cfg( test) ]
872882mod tests {
873- use crate :: check_added_monitors;
883+ use crate :: { check_added_monitors, check_closed_event } ;
874884 use crate :: { expect_payment_path_successful, get_event_msg} ;
875885 use crate :: { get_htlc_update_msgs, get_revoke_commit_msgs} ;
876886 use crate :: chain:: { ChannelMonitorUpdateStatus , Watch } ;
877- use crate :: events:: { Event , MessageSendEvent , MessageSendEventsProvider } ;
887+ use crate :: chain:: channelmonitor:: ANTI_REORG_DELAY ;
888+ use crate :: events:: { ClosureReason , Event , MessageSendEvent , MessageSendEventsProvider } ;
878889 use crate :: ln:: functional_test_utils:: * ;
879890 use crate :: ln:: msgs:: ChannelMessageHandler ;
880891
892+ const CHAINSYNC_MONITOR_PARTITION_FACTOR : u32 = 50 ;
893+
881894 #[ test]
882895 fn test_async_ooo_offchain_updates ( ) {
883896 // Test that if we have multiple offchain updates being persisted and they complete
@@ -983,6 +996,79 @@ mod tests {
983996 check_added_monitors ! ( nodes[ 0 ] , 1 ) ;
984997 }
985998
#[test]
fn test_chainsync_triggers_distributed_monitor_persistence() {
	// Verifies that chain-sync monitor persistence is spread out across blocks
	// (one write per monitor every CHAINSYNC_MONITOR_PARTITION_FACTOR blocks),
	// except that monitors with pending claims are persisted on every block.
	let chanmon_cfgs = create_chanmon_cfgs(3);
	let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
	let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
	let nodes = create_network(3, &node_cfgs, &node_chanmgrs);

	// Use FullBlockViaListen so process_chain_data runs exactly once per block;
	// other connect styles may call it twice or skip blocks.
	*nodes[0].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
	*nodes[1].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
	*nodes[2].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;

	let _channel_1 = create_announced_chan_between_nodes(&nodes, 0, 1).2;
	let channel_2 = create_announced_chan_between_nodes_with_value(&nodes, 0, 2, 1_000_000, 0).2;

	// Reset the per-persister chain-sync write counters before measuring.
	chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
	chanmon_cfgs[1].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
	chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().clear();

	connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
	connect_blocks(&nodes[1], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
	connect_blocks(&nodes[2], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);

	// Connecting CHAINSYNC_MONITOR_PARTITION_FACTOR * 2 blocks should trigger exactly 2
	// writes per monitor/channel. Node 0 has two channels, nodes 1 and 2 have one each.
	assert_eq!(2 * 2, chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len());
	assert_eq!(2, chanmon_cfgs[1].persister.chain_sync_monitor_persistences.lock().unwrap().len());
	assert_eq!(2, chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().len());

	// Now force-close channel_2 (node 0 <-> node 2) so that node 0's monitor acquires a
	// pending claim; such monitors must be persisted on every block connection.
	nodes[0].node.force_close_broadcasting_latest_txn(&channel_2, &nodes[2].node.get_our_node_id(), "Channel force-closed".to_string()).unwrap();
	check_closed_event!(&nodes[0], 1, ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }, false,
		[nodes[2].node.get_our_node_id()], 1000000);
	check_closed_broadcast(&nodes[0], 1, true);
	let close_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
	assert_eq!(close_tx.len(), 1);

	mine_transaction(&nodes[2], &close_tx[0]);
	check_added_monitors(&nodes[2], 1);
	check_closed_broadcast(&nodes[2], 1, true);
	check_closed_event!(&nodes[2], 1, ClosureReason::CommitmentTxConfirmed, false,
		[nodes[0].node.get_our_node_id()], 1000000);

	chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
	chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().clear();

	// channel_2's monitor should now be written on every block. We connect a full
	// CHAINSYNC_MONITOR_PARTITION_FACTOR-block cycle because we cannot predict at which
	// height channel_1's monitor is persisted; over one full cycle it happens exactly once.
	connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
	connect_blocks(&nodes[2], CHAINSYNC_MONITOR_PARTITION_FACTOR);

	// Node 0: CHAINSYNC_MONITOR_PARTITION_FACTOR writes for channel_2 (pending claim),
	// plus 1 for channel_1.
	assert_eq!((CHAINSYNC_MONITOR_PARTITION_FACTOR + 1) as usize, chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len());
	// Node 2 holds no pending claim, so only the round-robin write occurs.
	assert_eq!(1, chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().len());

	// Confirm node 0's claim past ANTI_REORG_DELAY and reset the write counter.
	mine_transaction(&nodes[0], &close_tx[0]);
	connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1);
	check_added_monitors(&nodes[0], 1);
	chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();

	// With the claim resolved, another full CHAINSYNC_MONITOR_PARTITION_FACTOR-block cycle
	// should again produce exactly 1 write per monitor/channel.
	connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
	assert_eq!(2, chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len());
}
1071+
9861072 #[ test]
9871073 #[ cfg( feature = "std" ) ]
9881074 fn update_during_chainsync_poisons_channel ( ) {
@@ -991,12 +1077,15 @@ mod tests {
9911077 let node_chanmgrs = create_node_chanmgrs ( 2 , & node_cfgs, & [ None , None ] ) ;
9921078 let nodes = create_network ( 2 , & node_cfgs, & node_chanmgrs) ;
9931079 create_announced_chan_between_nodes ( & nodes, 0 , 1 ) ;
1080+ * nodes[ 0 ] . connect_style . borrow_mut ( ) = ConnectStyle :: FullBlockViaListen ;
9941081
9951082 chanmon_cfgs[ 0 ] . persister . set_update_ret ( ChannelMonitorUpdateStatus :: UnrecoverableError ) ;
9961083
9971084 assert ! ( std:: panic:: catch_unwind( || {
9981085 // Returning an UnrecoverableError should always panic immediately
999- connect_blocks( & nodes[ 0 ] , 1 ) ;
1086+ // Connecting [`DEFAULT_CHAINSYNC_PARTITION_FACTOR`] blocks so that we trigger some persistence
1087+ // after accounting for block-height based partitioning/distribution.
1088+ connect_blocks( & nodes[ 0 ] , CHAINSYNC_MONITOR_PARTITION_FACTOR ) ;
10001089 } ) . is_err( ) ) ;
10011090 assert ! ( std:: panic:: catch_unwind( || {
10021091 // ...and also poison our locks causing later use to panic as well
0 commit comments