@@ -263,82 +263,67 @@ where C::Target: chain::Filter,
 	where
 		FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
 	{
-		let mut dependent_txdata = Vec::new();
-		{
-			let monitor_states = self.monitors.write().unwrap();
-			if let Some(height) = best_height {
-				// If the best block height is being updated, update highest_chain_height under the
-				// monitors write lock.
-				let old_height = self.highest_chain_height.load(Ordering::Acquire);
-				let new_height = height as usize;
-				if new_height > old_height {
-					self.highest_chain_height.store(new_height, Ordering::Release);
-				}
+		let monitor_states = self.monitors.write().unwrap();
+		if let Some(height) = best_height {
+			// If the best block height is being updated, update highest_chain_height under the
+			// monitors write lock.
+			let old_height = self.highest_chain_height.load(Ordering::Acquire);
+			let new_height = height as usize;
+			if new_height > old_height {
+				self.highest_chain_height.store(new_height, Ordering::Release);
 			}
+		}
 
-			for (funding_outpoint, monitor_state) in monitor_states.iter() {
-				let monitor = &monitor_state.monitor;
-				let mut txn_outputs;
-				{
-					txn_outputs = process(monitor, txdata);
-					let update_id = MonitorUpdateId {
-						contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
-					};
-					let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
-					if let Some(height) = best_height {
-						if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
-							// If there are not ChainSync persists awaiting completion, go ahead and
-							// set last_chain_persist_height here - we wouldn't want the first
-							// TemporaryFailure to always immediately be considered "overly delayed".
-							monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
-						}
+		for (funding_outpoint, monitor_state) in monitor_states.iter() {
+			let monitor = &monitor_state.monitor;
+			let mut txn_outputs;
+			{
+				txn_outputs = process(monitor, txdata);
+				let update_id = MonitorUpdateId {
+					contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
+				};
+				let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+				if let Some(height) = best_height {
+					if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
+						// If there are not ChainSync persists awaiting completion, go ahead and
+						// set last_chain_persist_height here - we wouldn't want the first
+						// TemporaryFailure to always immediately be considered "overly delayed".
+						monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
 					}
+				}
 
-					log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
-					match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
-						Ok(()) =>
-							log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
-						Err(ChannelMonitorUpdateErr::PermanentFailure) => {
-							monitor_state.channel_perm_failed.store(true, Ordering::Release);
-							self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
-						},
-						Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
-							log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
-							pending_monitor_updates.push(update_id);
-						},
-					}
+				log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
+				match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
+					Ok(()) =>
+						log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
+					Err(ChannelMonitorUpdateErr::PermanentFailure) => {
+						monitor_state.channel_perm_failed.store(true, Ordering::Release);
+						self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
+					},
+					Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
+						log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
+						pending_monitor_updates.push(update_id);
+					},
 				}
+			}
 
-				// Register any new outputs with the chain source for filtering, storing any dependent
-				// transactions from within the block that previously had not been included in txdata.
-				if let Some(ref chain_source) = self.chain_source {
-					let block_hash = header.block_hash();
-					for (txid, mut outputs) in txn_outputs.drain(..) {
-						for (idx, output) in outputs.drain(..) {
-							// Register any new outputs with the chain source for filtering and recurse
-							// if it indicates that there are dependent transactions within the block
-							// that had not been previously included in txdata.
-							let output = WatchedOutput {
-								block_hash: Some(block_hash),
-								outpoint: OutPoint { txid, index: idx as u16 },
-								script_pubkey: output.script_pubkey,
-							};
-							if let Some(tx) = chain_source.register_output(output) {
-								dependent_txdata.push(tx);
-							}
-						}
+			// Register any new outputs with the chain source for filtering. Dependent
+			// transactions within the block must now arrive via txdata to be processed.
+			if let Some(ref chain_source) = self.chain_source {
+				let block_hash = header.block_hash();
+				for (txid, mut outputs) in txn_outputs.drain(..) {
+					for (idx, output) in outputs.drain(..) {
+						// Register any new outputs with the chain source for filtering
+						let output = WatchedOutput {
+							block_hash: Some(block_hash),
+							outpoint: OutPoint { txid, index: idx as u16 },
+							script_pubkey: output.script_pubkey,
+						};
+						chain_source.register_output(output);
 					}
 				}
 			}
 		}
-
-		// Recursively call for any dependent transactions that were identified by the chain source.
-		if !dependent_txdata.is_empty() {
-			dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index);
-			dependent_txdata.dedup_by_key(|(index, _tx)| *index);
-			let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect();
-			self.process_chain_data(header, None, &txdata, process); // We skip the best height the second go-around
-		}
 	}
 
 	/// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
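
After this hunk, `ChainMonitor` treats `chain::Filter::register_output` as fire-and-forget: nothing is returned, so no dependent transactions are collected and no recursive `process_chain_data` pass is made. Below is a minimal sketch of what a `Filter` implementation looks like under that assumption; the `ElectrumFilter` type and its fields are hypothetical, while `register_tx`, `register_output`, and `WatchedOutput` come from `lightning::chain`:

use std::sync::Mutex;

use bitcoin::blockdata::script::Script;
use bitcoin::hash_types::Txid;

use lightning::chain::{self, WatchedOutput};

/// Hypothetical Electrum-backed filter. It only records what to watch and
/// returns nothing: an in-block spend of a newly watched output must reach
/// LDK through a later `transactions_confirmed` call, not through this trait.
struct ElectrumFilter {
	watched_txs: Mutex<Vec<(Txid, Script)>>,
	watched_outputs: Mutex<Vec<WatchedOutput>>,
}

impl chain::Filter for ElectrumFilter {
	fn register_tx(&self, txid: &Txid, script_pubkey: &Script) {
		self.watched_txs.lock().unwrap().push((*txid, script_pubkey.clone()));
	}

	fn register_output(&self, output: WatchedOutput) {
		self.watched_outputs.lock().unwrap().push(output);
	}
}
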
@@ -746,50 +731,6 @@ mod tests {
 	use ln::msgs::ChannelMessageHandler;
 	use util::errors::APIError;
 	use util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider};
-	use util::test_utils::{OnRegisterOutput, TxOutReference};
-
-	/// Tests that in-block dependent transactions are processed by `block_connected` when not
-	/// included in `txdata` but returned by [`chain::Filter::register_output`]. For instance,
-	/// a (non-anchor) commitment transaction's HTLC output may be spent in the same block as the
-	/// commitment transaction itself. An Electrum client may filter the commitment transaction but
-	/// needs to return the HTLC transaction so it can be processed.
-	#[test]
-	fn connect_block_checks_dependent_transactions() {
-		let chanmon_cfgs = create_chanmon_cfgs(2);
-		let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
-		let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
-		let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
-		let channel = create_announced_chan_between_nodes(
-			&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
-
-		// Send a payment, saving nodes[0]'s revoked commitment and HTLC-Timeout transactions.
-		let (commitment_tx, htlc_tx) = {
-			let payment_preimage = route_payment(&nodes[0], &vec!(&nodes[1])[..], 5_000_000).0;
-			let mut txn = get_local_commitment_txn!(nodes[0], channel.2);
-			claim_payment(&nodes[0], &vec!(&nodes[1])[..], payment_preimage);
-
-			assert_eq!(txn.len(), 2);
-			(txn.remove(0), txn.remove(0))
-		};
-
-		// Set expectations on nodes[1]'s chain source to return dependent transactions.
-		let htlc_output = TxOutReference(commitment_tx.clone(), 0);
-		let to_local_output = TxOutReference(commitment_tx.clone(), 1);
-		let htlc_timeout_output = TxOutReference(htlc_tx.clone(), 0);
-		nodes[1].chain_source
-			.expect(OnRegisterOutput { with: htlc_output, returns: Some((1, htlc_tx)) })
-			.expect(OnRegisterOutput { with: to_local_output, returns: None })
-			.expect(OnRegisterOutput { with: htlc_timeout_output, returns: None });
-
-		// Notify nodes[1] that nodes[0]'s revoked commitment transaction was mined. The chain
-		// source should return the dependent HTLC transaction when the HTLC output is registered.
-		mine_transaction(&nodes[1], &commitment_tx);
-
-		// Clean up so uninteresting assertions don't fail.
-		check_added_monitors!(nodes[1], 1);
-		nodes[1].node.get_and_clear_pending_msg_events();
-		nodes[1].node.get_and_clear_pending_events();
-	}
 
 	#[test]
 	fn test_async_ooo_offchain_updates() {
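
With the recursion and its test gone, the scenario the deleted test exercised (an HTLC output spent in the same block as the commitment transaction that created it) becomes the client's responsibility. Below is a hedged sketch of one way an Electrum-style client could handle it through `chain::Confirm`, assuming `transactions_confirmed` tolerates repeated calls for the same block as new matches are found; `find_spends_in_block` and `connect_filtered_block` are hypothetical helpers, not LDK APIs:

use bitcoin::blockdata::block::{Block, BlockHeader};
use bitcoin::blockdata::transaction::Transaction;
use lightning::chain::Confirm;

/// Hypothetical helper: scan `block` for transactions matching the watch set
/// accumulated via chain::Filter, returning (index_in_block, tx) pairs.
fn find_spends_in_block(block: &Block) -> Vec<(usize, Transaction)> {
	unimplemented!()
}

/// Re-scan until a pass turns up no new matches, so that a transaction
/// spending an output registered during the previous pass is also confirmed.
fn connect_filtered_block<C: Confirm>(confirm: &C, header: &BlockHeader, height: u32, block: &Block) {
	let mut matched = find_spends_in_block(block);
	loop {
		let txdata: Vec<(usize, &Transaction)> = matched.iter().map(|(i, tx)| (*i, tx)).collect();
		confirm.transactions_confirmed(header, &txdata, height);
		let rescanned = find_spends_in_block(block);
		if rescanned.len() == matched.len() { break; }
		matched = rescanned;
	}
}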