@@ -24,6 +24,7 @@ use crate::chain::chainmonitor::{Persist, MonitorUpdateId};
2424use crate :: sign:: { EntropySource , NodeSigner , ecdsa:: WriteableEcdsaChannelSigner , SignerProvider } ;
2525use crate :: chain:: transaction:: OutPoint ;
2626use crate :: chain:: channelmonitor:: { ChannelMonitor , ChannelMonitorUpdate , CLOSED_CHANNEL_UPDATE_ID } ;
27+ use crate :: chain:: ChannelMonitorUpdateStatus ;
2728use crate :: ln:: channelmanager:: ChannelManager ;
2829use crate :: routing:: router:: Router ;
2930use crate :: routing:: gossip:: NetworkGraph ;
@@ -610,24 +611,6 @@ where
610611 ) -> chain:: ChannelMonitorUpdateStatus {
611612 // Determine the proper key for this monitor
612613 let monitor_name = MonitorName :: from ( funding_txo) ;
613- let maybe_old_monitor = self . read_monitor ( & monitor_name) ;
614- match maybe_old_monitor {
615- Ok ( ( _, ref old_monitor) ) => {
616- // Check that this key isn't already storing a monitor with a higher update_id
617- // (collision)
618- if old_monitor. get_latest_update_id ( ) > monitor. get_latest_update_id ( ) {
619- log_error ! (
620- self . logger,
621- "Tried to write a monitor at the same outpoint {} with a higher update_id!" ,
622- monitor_name. as_str( )
623- ) ;
624- return chain:: ChannelMonitorUpdateStatus :: UnrecoverableError ;
625- }
626- }
627- // This means the channel monitor is new.
628- Err ( ref e) if e. kind ( ) == io:: ErrorKind :: NotFound => { }
629- _ => return chain:: ChannelMonitorUpdateStatus :: UnrecoverableError ,
630- }
631614 // Serialize and write the new monitor
632615 let mut monitor_bytes = Vec :: with_capacity (
633616 MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL . len ( ) + monitor. serialized_length ( ) ,
@@ -641,59 +624,6 @@ where
641624 & monitor_bytes,
642625 ) {
643626 Ok ( _) => {
644- // Assess cleanup. Typically, we'll clean up only between the last two known full
645- // monitors.
646- if let Ok ( ( _, old_monitor) ) = maybe_old_monitor {
647- let start = old_monitor. get_latest_update_id ( ) ;
648- let end = if monitor. get_latest_update_id ( ) == CLOSED_CHANNEL_UPDATE_ID {
649- // We don't want to clean the rest of u64, so just do possible pending
650- // updates. Note that we never write updates at
651- // `CLOSED_CHANNEL_UPDATE_ID`.
652- cmp:: min (
653- start. saturating_add ( self . maximum_pending_updates ) ,
654- CLOSED_CHANNEL_UPDATE_ID - 1 ,
655- )
656- } else {
657- monitor. get_latest_update_id ( ) . saturating_sub ( 1 )
658- } ;
659- // We should bother cleaning up only if there's at least one update
660- // expected.
661- for update_id in start..=end {
662- let update_name = UpdateName :: from ( update_id) ;
663- #[ cfg( debug_assertions) ]
664- {
665- if let Ok ( update) =
666- self . read_monitor_update ( & monitor_name, & update_name)
667- {
668- // Assert that we are reading what we think we are.
669- debug_assert_eq ! ( update. update_id, update_name. 0 ) ;
670- } else if update_id != start && monitor. get_latest_update_id ( ) != CLOSED_CHANNEL_UPDATE_ID
671- {
672- // We're deleting something we should know doesn't exist.
673- panic ! (
674- "failed to read monitor update {}" ,
675- update_name. as_str( )
676- ) ;
677- }
678- // On closed channels, we will unavoidably try to read
679- // non-existent updates since we have to guess at the range of
680- // stale updates, so do nothing.
681- }
682- if let Err ( e) = self . kv_store . remove (
683- CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
684- monitor_name. as_str ( ) ,
685- update_name. as_str ( ) ,
686- true ,
687- ) {
688- log_error ! (
689- self . logger,
690- "error cleaning up channel monitor updates for monitor {}, reason: {}" ,
691- monitor_name. as_str( ) ,
692- e
693- ) ;
694- } ;
695- }
696- } ;
697627 chain:: ChannelMonitorUpdateStatus :: Completed
698628 }
699629 Err ( e) => {
@@ -751,8 +681,43 @@ where
751681 }
752682 }
753683 } else {
754- // We could write this update, but it meets criteria of our design that call for a full monitor write.
755- self . persist_new_channel ( funding_txo, monitor, monitor_update_call_id)
684+ let monitor_name = MonitorName :: from ( funding_txo) ;
685+ // In case of channel-close monitor update, we need to read old monitor before persisting
686+ // the new one in order to determine the cleanup range.
687+ let maybe_old_monitor = match monitor. get_latest_update_id ( ) {
688+ CLOSED_CHANNEL_UPDATE_ID => Some ( self . read_monitor ( & monitor_name) ) ,
689+ _ => None
690+ } ;
691+
692+ // We could write this update, but it meets criteria of our design that calls for a full monitor write.
693+ let monitor_update_status = self . persist_new_channel ( funding_txo, monitor, monitor_update_call_id) ;
694+
695+ if let ChannelMonitorUpdateStatus :: Completed = monitor_update_status {
696+ let cleanup_range = if monitor. get_latest_update_id ( ) == CLOSED_CHANNEL_UPDATE_ID {
697+ match maybe_old_monitor {
698+ Some ( Ok ( ( _, ref old_monitor) ) ) => {
699+ let start = old_monitor. get_latest_update_id ( ) ;
700+ // We never persist an update with update_id = CLOSED_CHANNEL_UPDATE_ID
701+ let end = cmp:: min (
702+ start. saturating_add ( self . maximum_pending_updates ) ,
703+ CLOSED_CHANNEL_UPDATE_ID - 1 ,
704+ ) ;
705+ Some ( ( start, end) )
706+ }
707+ _ => None
708+ }
709+ } else {
710+ let end = monitor. get_latest_update_id ( ) ;
711+ let start = end. saturating_sub ( self . maximum_pending_updates ) ;
712+ Some ( ( start, end) )
713+ } ;
714+
715+ if let Some ( ( start, end) ) = cleanup_range {
716+ self . cleanup_in_range ( monitor_name, start, end) ;
717+ }
718+ }
719+
720+ monitor_update_status
756721 }
757722 } else {
758723 // There is no update given, so we must persist a new monitor.
@@ -761,6 +726,34 @@ where
761726 }
762727}
763728
729+ impl < K : Deref , L : Deref , ES : Deref , SP : Deref > MonitorUpdatingPersister < K , L , ES , SP >
730+ where
731+ ES :: Target : EntropySource + Sized ,
732+ K :: Target : KVStore ,
733+ L :: Target : Logger ,
734+ SP :: Target : SignerProvider + Sized
735+ {
736+ // Cleans up monitor updates for given monitor in range `start..=end`.
737+ fn cleanup_in_range ( & self , monitor_name : MonitorName , start : u64 , end : u64 ) {
738+ for update_id in start..=end {
739+ let update_name = UpdateName :: from ( update_id) ;
740+ if let Err ( e) = self . kv_store . remove (
741+ CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
742+ monitor_name. as_str ( ) ,
743+ update_name. as_str ( ) ,
744+ true ,
745+ ) {
746+ log_error ! (
747+ self . logger,
748+ "error cleaning up channel monitor updates for monitor {}, reason: {}" ,
749+ monitor_name. as_str( ) ,
750+ e
751+ ) ;
752+ } ;
753+ }
754+ }
755+ }
756+
764757/// A struct representing a name for a monitor.
765758#[ derive( Debug ) ]
766759struct MonitorName ( String ) ;
@@ -896,20 +889,21 @@ mod tests {
896889 #[ test]
897890 fn persister_with_real_monitors ( ) {
898891 // This value is used later to limit how many iterations we perform.
899- let test_max_pending_updates = 7 ;
892+ let persister_0_max_pending_updates = 7 ;
893+ // Intentionally set this to a smaller value to test a different alignment.
894+ let persister_1_max_pending_updates = 3 ;
900895 let chanmon_cfgs = create_chanmon_cfgs ( 4 ) ;
901896 let persister_0 = MonitorUpdatingPersister {
902897 kv_store : & TestStore :: new ( false ) ,
903898 logger : & TestLogger :: new ( ) ,
904- maximum_pending_updates : test_max_pending_updates ,
899+ maximum_pending_updates : persister_0_max_pending_updates ,
905900 entropy_source : & chanmon_cfgs[ 0 ] . keys_manager ,
906901 signer_provider : & chanmon_cfgs[ 0 ] . keys_manager ,
907902 } ;
908903 let persister_1 = MonitorUpdatingPersister {
909904 kv_store : & TestStore :: new ( false ) ,
910905 logger : & TestLogger :: new ( ) ,
911- // Intentionally set this to a smaller value to test a different alignment.
912- maximum_pending_updates : 3 ,
906+ maximum_pending_updates : persister_1_max_pending_updates,
913907 entropy_source : & chanmon_cfgs[ 1 ] . keys_manager ,
914908 signer_provider : & chanmon_cfgs[ 1 ] . keys_manager ,
915909 } ;
@@ -934,7 +928,6 @@ mod tests {
934928 node_cfgs[ 1 ] . chain_monitor = chain_mon_1;
935929 let node_chanmgrs = create_node_chanmgrs ( 2 , & node_cfgs, & [ None , None ] ) ;
936930 let nodes = create_network ( 2 , & node_cfgs, & node_chanmgrs) ;
937-
938931 let broadcaster_0 = & chanmon_cfgs[ 2 ] . tx_broadcaster ;
939932 let broadcaster_1 = & chanmon_cfgs[ 3 ] . tx_broadcaster ;
940933
@@ -957,10 +950,11 @@ mod tests {
957950 for ( _, mon) in persisted_chan_data_0. iter( ) {
958951 // check that when we read it, we got the right update id
959952 assert_eq!( mon. get_latest_update_id( ) , $expected_update_id) ;
960- // if the CM is at the correct update id without updates, ensure no updates are stored
953+
954+ // if the CM is at consolidation threshold, ensure no updates are stored.
961955 let monitor_name = MonitorName :: from( mon. get_funding_txo( ) . 0 ) ;
962- let ( _ , cm_0 ) = persister_0 . read_monitor ( & monitor_name ) . unwrap ( ) ;
963- if cm_0 . get_latest_update_id( ) == $expected_update_id {
956+ if mon . get_latest_update_id ( ) % persister_0_max_pending_updates == 0
957+ || mon . get_latest_update_id( ) == CLOSED_CHANNEL_UPDATE_ID {
964958 assert_eq!(
965959 persister_0. kv_store. list( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
966960 monitor_name. as_str( ) ) . unwrap( ) . len( ) ,
@@ -975,8 +969,9 @@ mod tests {
975969 for ( _, mon) in persisted_chan_data_1. iter( ) {
976970 assert_eq!( mon. get_latest_update_id( ) , $expected_update_id) ;
977971 let monitor_name = MonitorName :: from( mon. get_funding_txo( ) . 0 ) ;
978- let ( _, cm_1) = persister_1. read_monitor( & monitor_name) . unwrap( ) ;
979- if cm_1. get_latest_update_id( ) == $expected_update_id {
972+ // if the CM is at consolidation threshold, ensure no updates are stored.
973+ if mon. get_latest_update_id( ) % persister_1_max_pending_updates == 0
974+ || mon. get_latest_update_id( ) == CLOSED_CHANNEL_UPDATE_ID {
980975 assert_eq!(
981976 persister_1. kv_store. list( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
982977 monitor_name. as_str( ) ) . unwrap( ) . len( ) ,
@@ -1001,7 +996,7 @@ mod tests {
1001996 // Send a few more payments to try all the alignments of max pending updates with
1002997 // updates for a payment sent and received.
1003998 let mut sender = 0 ;
1004- for i in 3 ..=test_max_pending_updates * 2 {
999+ for i in 3 ..=persister_0_max_pending_updates * 2 {
10051000 let receiver;
10061001 if sender == 0 {
10071002 sender = 1 ;
0 commit comments