@@ -44,6 +44,7 @@ use crate::prelude::*;
 use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
 use core::ops::Deref;
 use core::sync::atomic::{AtomicUsize, Ordering};
+use bitcoin::hashes::Hash;
 use bitcoin::secp256k1::PublicKey;
 
 /// `Persist` defines behavior for persisting channel monitors: this could mean
@@ -260,10 +261,11 @@ where C::Target: chain::Filter,
 	{
 		let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
 		let funding_outpoints = hash_set_from_iter(self.monitors.read().unwrap().keys().cloned());
+		let channel_count = funding_outpoints.len();
 		for funding_outpoint in funding_outpoints.iter() {
 			let monitor_lock = self.monitors.read().unwrap();
 			if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
-				if self.update_monitor_with_chain_data(header, txdata, &process, funding_outpoint, &monitor_state).is_err() {
+				if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() {
 					// Take the monitors lock for writing so that we poison it and any future
 					// operations going forward fail immediately.
 					core::mem::drop(monitor_lock);
@@ -278,7 +280,7 @@ where C::Target: chain::Filter,
 		let monitor_states = self.monitors.write().unwrap();
 		for (funding_outpoint, monitor_state) in monitor_states.iter() {
 			if !funding_outpoints.contains(funding_outpoint) {
-				if self.update_monitor_with_chain_data(header, txdata, &process, funding_outpoint, &monitor_state).is_err() {
+				if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() {
 					log_error!(self.logger, "{}", err_str);
 					panic!("{}", err_str);
 				}
@@ -297,14 +299,29 @@ where C::Target: chain::Filter,
 	}
 
 	fn update_monitor_with_chain_data<FN>(
-		&self, header: &Header, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint,
-		monitor_state: &MonitorHolder<ChannelSigner>
+		&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint,
+		monitor_state: &MonitorHolder<ChannelSigner>, channel_count: usize,
 	) -> Result<(), ()> where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
 		let monitor = &monitor_state.monitor;
 		let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
-		let mut txn_outputs;
-		{
-			txn_outputs = process(monitor, txdata);
+
+		let mut txn_outputs = process(monitor, txdata);
+
+		let get_partition_key = |funding_outpoint: &OutPoint| {
+			let funding_txid_hash = funding_outpoint.txid.to_raw_hash();
+			let funding_txid_hash_bytes = funding_txid_hash.as_byte_array();
+			let funding_txid_u32 = u32::from_be_bytes([funding_txid_hash_bytes[0], funding_txid_hash_bytes[1], funding_txid_hash_bytes[2], funding_txid_hash_bytes[3]]);
+			funding_txid_u32.wrapping_add(best_height.unwrap_or_default())
+		};
+
+		let partition_factor = if channel_count < 15 {
+			5
+		} else {
+			50 // ~8 hours
+		};
+
+		let has_pending_claims = monitor_state.monitor.has_pending_claims();
+		if has_pending_claims || get_partition_key(funding_outpoint) % partition_factor == 0 {
 			log_trace!(logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
 			match self.persister.update_persisted_channel(*funding_outpoint, None, monitor) {
 				ChannelMonitorUpdateStatus::Completed =>
@@ -313,10 +330,10 @@ where C::Target: chain::Filter,
 				),
 				ChannelMonitorUpdateStatus::InProgress => {
 					log_trace!(logger, "Channel Monitor sync for channel {} in progress.", log_funding_info!(monitor));
-				},
+				}
 				ChannelMonitorUpdateStatus::UnrecoverableError => {
 					return Err(());
-				},
+				}
 			}
 		}
 
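The scheduling effect of `get_partition_key` is easy to miss when reading the hunk above: the key is a fixed per-channel offset (the first four big-endian bytes of the funding txid hash) plus the current best height, so it advances by one on every connected block. Each monitor therefore satisfies `key % partition_factor == 0` exactly once per `partition_factor` blocks, and monitors with different funding txids fire at staggered heights rather than all at once. A minimal standalone sketch of that behavior follows; `partition_key`, the 4-byte prefixes, and the channel names are illustrative stand-ins, not LDK code:

```rust
// Stand-in for the diff's `get_partition_key`: a fixed txid-derived
// prefix plus the current best height (illustrative only).
fn partition_key(txid_prefix_be: [u8; 4], height: u32) -> u32 {
	u32::from_be_bytes(txid_prefix_be).wrapping_add(height)
}

fn main() {
	let factor = 5u32; // the `partition_factor` for nodes with < 15 channels
	// Two hypothetical monitors whose funding txid hashes start with different bytes.
	let monitors = [([0, 0, 0, 1u8], "chan A"), ([0, 0, 0, 3u8], "chan B")];
	for height in 100u32..110 {
		for (prefix, name) in monitors {
			if partition_key(prefix, height) % factor == 0 {
				// Each channel matches exactly once per `factor` blocks,
				// at heights staggered by its txid prefix.
				println!("height {height}: persist {name}");
			}
		}
	}
}
```

At roughly ten minutes per block, the `partition_factor` of 50 used once a node has 15 or more channels spaces these chain-sync persists about eight hours apart per channel, which is what the `// ~8 hours` comment refers to.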
@@ -870,14 +887,17 @@ impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref,
 
 #[cfg(test)]
 mod tests {
-	use crate::check_added_monitors;
+	use crate::{check_added_monitors, check_closed_event};
 	use crate::{expect_payment_path_successful, get_event_msg};
 	use crate::{get_htlc_update_msgs, get_revoke_commit_msgs};
 	use crate::chain::{ChannelMonitorUpdateStatus, Watch};
-	use crate::events::{Event, MessageSendEvent, MessageSendEventsProvider};
+	use crate::chain::channelmonitor::ANTI_REORG_DELAY;
+	use crate::events::{ClosureReason, Event, MessageSendEvent, MessageSendEventsProvider};
 	use crate::ln::functional_test_utils::*;
 	use crate::ln::msgs::ChannelMessageHandler;
 
+	const CHAINSYNC_MONITOR_PARTITION_FACTOR: u32 = 5;
+
 	#[test]
 	fn test_async_ooo_offchain_updates() {
 		// Test that if we have multiple offchain updates being persisted and they complete
@@ -983,6 +1003,79 @@ mod tests {
 		check_added_monitors!(nodes[0], 1);
 	}
 
+	#[test]
+	fn test_chainsync_triggers_distributed_monitor_persistence() {
+		let chanmon_cfgs = create_chanmon_cfgs(3);
+		let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+		let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+		let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+		// Use FullBlockViaListen to avoid the duplicate calls to process_chain_data and
+		// skips_blocks() that other connect_styles can cause.
+		*nodes[0].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
+		*nodes[1].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
+		*nodes[2].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
+
+		let _channel_1 = create_announced_chan_between_nodes(&nodes, 0, 1).2;
+		let channel_2 = create_announced_chan_between_nodes_with_value(&nodes, 0, 2, 1_000_000, 0).2;
+
+		chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
+		chanmon_cfgs[1].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
+		chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
+
+		connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
+		connect_blocks(&nodes[1], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
+		connect_blocks(&nodes[2], CHAINSYNC_MONITOR_PARTITION_FACTOR * 2);
+
+		// Connecting [`CHAINSYNC_MONITOR_PARTITION_FACTOR`] * 2 blocks should trigger only 2 writes
+		// per monitor/channel.
+		assert_eq!(2 * 2, chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len());
+		assert_eq!(2, chanmon_cfgs[1].persister.chain_sync_monitor_persistences.lock().unwrap().len());
+		assert_eq!(2, chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().len());
+
+		// Test that monitors with pending claims are persisted on every block.
+		// Now, close channel_2, i.e. the channel between node 0 and node 2, to create a pending
+		// claim in node[0].
+		nodes[0].node.force_close_broadcasting_latest_txn(&channel_2, &nodes[2].node.get_our_node_id(), "Channel force-closed".to_string()).unwrap();
+		check_closed_event!(&nodes[0], 1, ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }, false,
+			[nodes[2].node.get_our_node_id()], 1000000);
+		check_closed_broadcast(&nodes[0], 1, true);
+		let close_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
+		assert_eq!(close_tx.len(), 1);
+
+		mine_transaction(&nodes[2], &close_tx[0]);
+		check_added_monitors(&nodes[2], 1);
+		check_closed_broadcast(&nodes[2], 1, true);
+		check_closed_event!(&nodes[2], 1, ClosureReason::CommitmentTxConfirmed, false,
+			[nodes[0].node.get_our_node_id()], 1000000);
+
+		chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
+		chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
+
+		// For channel_2, there should be a monitor write for every block connection.
+		// We connect [`CHAINSYNC_MONITOR_PARTITION_FACTOR`] blocks since we don't know when
+		// channel_1's monitor persistence will occur; over [`CHAINSYNC_MONITOR_PARTITION_FACTOR`]
+		// blocks it will be persisted exactly once.
+		connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
+		connect_blocks(&nodes[2], CHAINSYNC_MONITOR_PARTITION_FACTOR);
+
+		// CHAINSYNC_MONITOR_PARTITION_FACTOR writes for channel_2 due to its pending claim, 1 for
+		// channel_1.
+		assert_eq!((CHAINSYNC_MONITOR_PARTITION_FACTOR + 1) as usize, chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len());
+		// For node[2], there is no pending claim.
+		assert_eq!(1, chanmon_cfgs[2].persister.chain_sync_monitor_persistences.lock().unwrap().len());
+
+		// Confirm the claim for node[0] with ANTI_REORG_DELAY and reset the monitor write counter.
+		mine_transaction(&nodes[0], &close_tx[0]);
+		connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1);
+		check_added_monitors(&nodes[0], 1);
+		chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
+
+		// Again connect one full cycle of CHAINSYNC_MONITOR_PARTITION_FACTOR blocks; it should
+		// only result in 1 write per monitor/channel.
+		connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
+		assert_eq!(2, chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().len());
+	}
+
9861079 #[ test]
9871080 #[ cfg( feature = "std" ) ]
9881081 fn update_during_chainsync_poisons_channel ( ) {
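The write counts asserted in `test_chainsync_triggers_distributed_monitor_persistence` above follow directly from the partitioning rule: over `2 * CHAINSYNC_MONITOR_PARTITION_FACTOR` consecutive heights, each monitor's key is divisible by the factor exactly twice regardless of its txid-derived offset, so node 0 with two channels expects `2 * 2` writes while nodes 1 and 2 expect two each. A hedged standalone check of that arithmetic (`writes_in_window` and the sample offsets are hypothetical, not part of the test suite):

```rust
// Hedged counting check: a key of the form `offset + height` is divisible by
// `factor` exactly `blocks / factor` times over `blocks` consecutive heights
// (when factor divides blocks and the window doesn't wrap past u32::MAX).
fn writes_in_window(offset: u32, start_height: u32, blocks: u32, factor: u32) -> usize {
	(start_height..start_height + blocks)
		.filter(|h| offset.wrapping_add(*h) % factor == 0)
		.count()
}

fn main() {
	let factor = 5u32; // CHAINSYNC_MONITOR_PARTITION_FACTOR in the tests
	for offset in [0u32, 7, 123, u32::MAX] {
		// 2 * factor blocks => exactly 2 writes per monitor, whatever the offset.
		assert_eq!(writes_in_window(offset, 1_000, 2 * factor, factor), 2);
		// A single factor-sized window => exactly 1 write.
		assert_eq!(writes_in_window(offset, 1_000, factor, factor), 1);
	}
	println!("write counts match the tests' expectations");
}
```

The single-window case is also why the poisoning test below now connects `CHAINSYNC_MONITOR_PARTITION_FACTOR` blocks instead of one: a single block is no longer guaranteed to hit any monitor's persistence slot.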
@@ -991,12 +1084,15 @@ mod tests {
 		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
 		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
 		create_announced_chan_between_nodes(&nodes, 0, 1);
+		*nodes[0].connect_style.borrow_mut() = ConnectStyle::FullBlockViaListen;
 
 		chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::UnrecoverableError);
 
 		assert!(std::panic::catch_unwind(|| {
 			// Returning an UnrecoverableError should always panic immediately
-			connect_blocks(&nodes[0], 1);
+			// Connect [`CHAINSYNC_MONITOR_PARTITION_FACTOR`] blocks so that we trigger some persistence
+			// after accounting for block-height-based partitioning/distribution.
+			connect_blocks(&nodes[0], CHAINSYNC_MONITOR_PARTITION_FACTOR);
 		}).is_err());
 		assert!(std::panic::catch_unwind(|| {
 			// ...and also poison our locks causing later use to panic as well