@@ -400,28 +400,34 @@ where
400400/// If you have many stale updates stored (such as after a crash with pending lazy deletes), and
401401/// would like to get rid of them, consider using the
402402/// [`MonitorUpdatingPersister::cleanup_stale_updates`] function.
403- pub struct MonitorUpdatingPersister < K : Deref , L : Deref , ES : Deref , SP : Deref >
403+ pub struct MonitorUpdatingPersister < K : Deref , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
404404where
405405 K :: Target : KVStore ,
406406 L :: Target : Logger ,
407407 ES :: Target : EntropySource + Sized ,
408408 SP :: Target : SignerProvider + Sized ,
409+ BI :: Target : BroadcasterInterface ,
410+ FE :: Target : FeeEstimator
409411{
410412 kv_store : K ,
411413 logger : L ,
412414 maximum_pending_updates : u64 ,
413415 entropy_source : ES ,
414416 signer_provider : SP ,
417+ broadcaster : BI ,
418+ fee_estimator : FE
415419}
416420
417421#[ allow( dead_code) ]
418- impl < K : Deref , L : Deref , ES : Deref , SP : Deref >
419- MonitorUpdatingPersister < K , L , ES , SP >
422+ impl < K : Deref , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
423+ MonitorUpdatingPersister < K , L , ES , SP , BI , FE >
420424where
421425 K :: Target : KVStore ,
422426 L :: Target : Logger ,
423427 ES :: Target : EntropySource + Sized ,
424428 SP :: Target : SignerProvider + Sized ,
429+ BI :: Target : BroadcasterInterface ,
430+ FE :: Target : FeeEstimator
425431{
426432 /// Constructs a new [`MonitorUpdatingPersister`].
427433 ///
@@ -441,14 +447,16 @@ where
441447 /// [`MonitorUpdatingPersister::cleanup_stale_updates`].
442448 pub fn new (
443449 kv_store : K , logger : L , maximum_pending_updates : u64 , entropy_source : ES ,
444- signer_provider : SP ,
450+ signer_provider : SP , broadcaster : BI , fee_estimator : FE
445451 ) -> Self {
446452 MonitorUpdatingPersister {
447453 kv_store,
448454 logger,
449455 maximum_pending_updates,
450456 entropy_source,
451457 signer_provider,
458+ broadcaster,
459+ fee_estimator
452460 }
453461 }
454462
@@ -639,13 +647,15 @@ where
639647 }
640648}
641649
642- impl < ChannelSigner : EcdsaChannelSigner , K : Deref , L : Deref , ES : Deref , SP : Deref >
643- Persist < ChannelSigner > for MonitorUpdatingPersister < K , L , ES , SP >
650+ impl < ChannelSigner : EcdsaChannelSigner , K : Deref , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref >
651+ Persist < ChannelSigner > for MonitorUpdatingPersister < K , L , ES , SP , BI , FE >
644652where
645653 K :: Target : KVStore ,
646654 L :: Target : Logger ,
647655 ES :: Target : EntropySource + Sized ,
648656 SP :: Target : SignerProvider + Sized ,
657+ BI :: Target : BroadcasterInterface ,
658+ FE :: Target : FeeEstimator
649659{
650660 /// Persists a new channel. This means writing the entire monitor to the
651661 /// parametrized [`KVStore`].
@@ -770,13 +780,53 @@ where
770780 Ok ( ( _block_hash, monitor) ) => monitor,
771781 Err ( _) => return
772782 } ;
783+
784+ let mut current_update_id = monitor. get_latest_update_id ( ) ;
785+ loop {
786+ current_update_id = match current_update_id. checked_add ( 1 ) {
787+ Some ( next_update_id) => next_update_id,
788+ None => break ,
789+ } ;
790+ let update_name = UpdateName :: from ( current_update_id) ;
791+ let update = match self . read_monitor_update ( & monitor_name, & update_name) {
792+ Ok ( update) => update,
793+ Err ( err) if err. kind ( ) == io:: ErrorKind :: NotFound => {
794+ // We can't find any more updates, so we are done.
795+ break ;
796+ }
797+ Err ( err) => {
798+ log_error ! (
799+ self . logger,
800+ "Monitor update read failed. monitor: {} update: {} reason: {:?}" ,
801+ monitor_name. as_str( ) ,
802+ update_name. as_str( ) ,
803+ err
804+ ) ;
805+ panic ! ( )
806+ } ,
807+ } ;
808+
809+ monitor. update_monitor ( & update, & self . broadcaster , & self . fee_estimator , & self . logger )
810+ . map_err ( |e| {
811+ log_error ! (
812+ self . logger,
813+ "Monitor update failed. monitor: {} update: {} reason: {:?}" ,
814+ monitor_name. as_str( ) ,
815+ update_name. as_str( ) ,
816+ e
817+ ) ;
818+ io:: Error :: new ( io:: ErrorKind :: Other , "Monitor update failed" )
819+ } )
820+ . expect ( "Could not apply monitor update during archiving" ) ;
821+ }
822+
773823 match self . kv_store . write (
774824 ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
775825 ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
776826 monitor_name. as_str ( ) ,
777- & monitor. encode ( )
827+ & monitor. encode ( ) ,
778828 ) {
779- Ok ( ( ) ) => { } ,
829+ Ok ( ( ) ) => { }
780830 Err ( _e) => return ,
781831 } ;
782832 let _ = self . kv_store . remove (
@@ -788,12 +838,14 @@ where
788838 }
789839}
790840
791- impl < K : Deref , L : Deref , ES : Deref , SP : Deref > MonitorUpdatingPersister < K , L , ES , SP >
841+ impl < K : Deref , L : Deref , ES : Deref , SP : Deref , BI : Deref , FE : Deref > MonitorUpdatingPersister < K , L , ES , SP , BI , FE >
792842where
793843 ES :: Target : EntropySource + Sized ,
794844 K :: Target : KVStore ,
795845 L :: Target : Logger ,
796- SP :: Target : SignerProvider + Sized
846+ SP :: Target : SignerProvider + Sized ,
847+ BI :: Target : BroadcasterInterface ,
848+ FE :: Target : FeeEstimator
797849{
798850 // Cleans up monitor updates for given monitor in range `start..=end`.
799851 fn cleanup_in_range ( & self , monitor_name : MonitorName , start : u64 , end : u64 ) {
@@ -962,13 +1014,17 @@ mod tests {
9621014 maximum_pending_updates : persister_0_max_pending_updates,
9631015 entropy_source : & chanmon_cfgs[ 0 ] . keys_manager ,
9641016 signer_provider : & chanmon_cfgs[ 0 ] . keys_manager ,
1017+ broadcaster : & chanmon_cfgs[ 0 ] . tx_broadcaster ,
1018+ fee_estimator : & chanmon_cfgs[ 0 ] . fee_estimator ,
9651019 } ;
9661020 let persister_1 = MonitorUpdatingPersister {
9671021 kv_store : & TestStore :: new ( false ) ,
9681022 logger : & TestLogger :: new ( ) ,
9691023 maximum_pending_updates : persister_1_max_pending_updates,
9701024 entropy_source : & chanmon_cfgs[ 1 ] . keys_manager ,
9711025 signer_provider : & chanmon_cfgs[ 1 ] . keys_manager ,
1026+ broadcaster : & chanmon_cfgs[ 0 ] . tx_broadcaster ,
1027+ fee_estimator : & chanmon_cfgs[ 0 ] . fee_estimator ,
9721028 } ;
9731029 let mut node_cfgs = create_node_cfgs ( 2 , & chanmon_cfgs) ;
9741030 let chain_mon_0 = test_utils:: TestChainMonitor :: new (
@@ -1129,6 +1185,8 @@ mod tests {
11291185 maximum_pending_updates : 11 ,
11301186 entropy_source : node_cfgs[ 0 ] . keys_manager ,
11311187 signer_provider : node_cfgs[ 0 ] . keys_manager ,
1188+ broadcaster : node_cfgs[ 0 ] . tx_broadcaster ,
1189+ fee_estimator : node_cfgs[ 0 ] . fee_estimator ,
11321190 } ;
11331191 match ro_persister. persist_new_channel ( test_txo, & added_monitors[ 0 ] . 1 ) {
11341192 ChannelMonitorUpdateStatus :: UnrecoverableError => {
@@ -1168,13 +1226,17 @@ mod tests {
11681226 maximum_pending_updates : test_max_pending_updates,
11691227 entropy_source : & chanmon_cfgs[ 0 ] . keys_manager ,
11701228 signer_provider : & chanmon_cfgs[ 0 ] . keys_manager ,
1229+ broadcaster : & chanmon_cfgs[ 0 ] . tx_broadcaster ,
1230+ fee_estimator : & chanmon_cfgs[ 0 ] . fee_estimator ,
11711231 } ;
11721232 let persister_1 = MonitorUpdatingPersister {
11731233 kv_store : & TestStore :: new ( false ) ,
11741234 logger : & TestLogger :: new ( ) ,
11751235 maximum_pending_updates : test_max_pending_updates,
11761236 entropy_source : & chanmon_cfgs[ 1 ] . keys_manager ,
11771237 signer_provider : & chanmon_cfgs[ 1 ] . keys_manager ,
1238+ broadcaster : & chanmon_cfgs[ 0 ] . tx_broadcaster ,
1239+ fee_estimator : & chanmon_cfgs[ 0 ] . fee_estimator ,
11781240 } ;
11791241 let mut node_cfgs = create_node_cfgs ( 2 , & chanmon_cfgs) ;
11801242 let chain_mon_0 = test_utils:: TestChainMonitor :: new (
0 commit comments