@@ -14,6 +14,7 @@ use lightning::chain::keysinterface::{Sign, KeysInterface};
1414use lightning:: ln:: channelmanager:: ChannelManager ;
1515use lightning:: ln:: msgs:: { ChannelMessageHandler , RoutingMessageHandler } ;
1616use lightning:: ln:: peer_handler:: { PeerManager , SocketDescriptor } ;
17+ use lightning:: util:: events:: { EventHandler , EventsProvider } ;
1718use lightning:: util:: logger:: Logger ;
1819use std:: sync:: Arc ;
1920use std:: sync:: atomic:: { AtomicBool , Ordering } ;
@@ -109,11 +110,12 @@ impl BackgroundProcessor {
109110 Descriptor : ' static + SocketDescriptor + Send + Sync ,
110111 CMH : ' static + Deref + Send + Sync ,
111112 RMH : ' static + Deref + Send + Sync ,
113+ EH : ' static + EventHandler + Send + Sync ,
112114 CMP : ' static + Send + ChannelManagerPersister < Signer , M , T , K , F , L > ,
113115 CM : ' static + Deref < Target = ChannelManager < Signer , M , T , K , F , L > > + Send + Sync ,
114116 PM : ' static + Deref < Target = PeerManager < Descriptor , CMH , RMH , L > > + Send + Sync ,
115117 >
116- ( handler : CMP , channel_manager : CM , peer_manager : PM , logger : L ) -> Self
118+ ( persister : CMP , event_handler : EH , channel_manager : CM , peer_manager : PM , logger : L ) -> Self
117119 where
118120 M :: Target : ' static + chain:: Watch < Signer > ,
119121 T :: Target : ' static + BroadcasterInterface ,
@@ -129,10 +131,11 @@ impl BackgroundProcessor {
129131 let mut current_time = Instant :: now ( ) ;
130132 loop {
131133 peer_manager. process_events ( ) ;
134+ channel_manager. process_pending_events ( & event_handler) ;
132135 let updates_available =
133136 channel_manager. await_persistable_update_timeout ( Duration :: from_millis ( 100 ) ) ;
134137 if updates_available {
135- handler . persist_manager ( & * channel_manager) ?;
138+ persister . persist_manager ( & * channel_manager) ?;
136139 }
137140 // Exit the loop if the background processor was requested to stop.
138141 if stop_thread. load ( Ordering :: Acquire ) == true {
@@ -162,10 +165,8 @@ mod tests {
162165 use bitcoin:: blockdata:: constants:: genesis_block;
163166 use bitcoin:: blockdata:: transaction:: { Transaction , TxOut } ;
164167 use bitcoin:: network:: constants:: Network ;
165- use lightning:: chain;
166- use lightning:: chain:: chaininterface:: { BroadcasterInterface , FeeEstimator } ;
167168 use lightning:: chain:: chainmonitor;
168- use lightning:: chain:: keysinterface:: { Sign , InMemorySigner , KeysInterface , KeysManager } ;
169+ use lightning:: chain:: keysinterface:: { InMemorySigner , KeysInterface , KeysManager } ;
169170 use lightning:: chain:: transaction:: OutPoint ;
170171 use lightning:: get_event_msg;
171172 use lightning:: ln:: channelmanager:: { BestBlock , ChainParameters , ChannelManager , SimpleArcChannelManager } ;
@@ -174,15 +175,14 @@ mod tests {
174175 use lightning:: ln:: peer_handler:: { PeerManager , MessageHandler , SocketDescriptor } ;
175176 use lightning:: util:: config:: UserConfig ;
176177 use lightning:: util:: events:: { Event , MessageSendEventsProvider , MessageSendEvent } ;
177- use lightning:: util:: logger:: Logger ;
178178 use lightning:: util:: ser:: Writeable ;
179179 use lightning:: util:: test_utils;
180180 use lightning_persister:: FilesystemPersister ;
181181 use std:: fs;
182182 use std:: path:: PathBuf ;
183183 use std:: sync:: { Arc , Mutex } ;
184184 use std:: time:: Duration ;
185- use super :: BackgroundProcessor ;
185+ use super :: { BackgroundProcessor , FRESHNESS_TIMER } ;
186186
187187 #[ derive( Clone , Eq , Hash , PartialEq ) ]
188188 struct TestDescriptor { }
@@ -246,13 +246,27 @@ mod tests {
246246 }
247247
248248 macro_rules! open_channel {
249+ ( $node_a: expr, $node_b: expr, $channel_value: expr) => { {
250+ begin_open_channel!( $node_a, $node_b, $channel_value) ;
251+ let events = $node_a. node. get_and_clear_pending_events( ) ;
252+ assert_eq!( events. len( ) , 1 ) ;
253+ let ( temporary_channel_id, tx) = handle_funding_generation_ready!( events[ 0 ] , $channel_value) ;
254+ end_open_channel!( $node_a, $node_b, temporary_channel_id, tx) ;
255+ tx
256+ } }
257+ }
258+
259+ macro_rules! begin_open_channel {
249260 ( $node_a: expr, $node_b: expr, $channel_value: expr) => { {
250261 $node_a. node. create_channel( $node_b. node. get_our_node_id( ) , $channel_value, 100 , 42 , None ) . unwrap( ) ;
251262 $node_b. node. handle_open_channel( & $node_a. node. get_our_node_id( ) , InitFeatures :: known( ) , & get_event_msg!( $node_a, MessageSendEvent :: SendOpenChannel , $node_b. node. get_our_node_id( ) ) ) ;
252263 $node_a. node. handle_accept_channel( & $node_b. node. get_our_node_id( ) , InitFeatures :: known( ) , & get_event_msg!( $node_b, MessageSendEvent :: SendAcceptChannel , $node_a. node. get_our_node_id( ) ) ) ;
253- let events = $node_a. node. get_and_clear_pending_events( ) ;
254- assert_eq!( events. len( ) , 1 ) ;
255- let ( temporary_channel_id, tx) = match events[ 0 ] {
264+ } }
265+ }
266+
267+ macro_rules! handle_funding_generation_ready {
268+ ( $event: expr, $channel_value: expr) => { {
269+ match $event {
256270 Event :: FundingGenerationReady { ref temporary_channel_id, ref channel_value_satoshis, ref output_script, user_channel_id } => {
257271 assert_eq!( * channel_value_satoshis, $channel_value) ;
258272 assert_eq!( user_channel_id, 42 ) ;
@@ -263,12 +277,15 @@ mod tests {
263277 ( * temporary_channel_id, tx)
264278 } ,
265279 _ => panic!( "Unexpected event" ) ,
266- } ;
280+ }
281+ } }
282+ }
267283
268- $node_a. node. funding_transaction_generated( & temporary_channel_id, tx. clone( ) ) . unwrap( ) ;
284+ macro_rules! end_open_channel {
285+ ( $node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => { {
286+ $node_a. node. funding_transaction_generated( & $temporary_channel_id, $tx. clone( ) ) . unwrap( ) ;
269287 $node_b. node. handle_funding_created( & $node_a. node. get_our_node_id( ) , & get_event_msg!( $node_a, MessageSendEvent :: SendFundingCreated , $node_b. node. get_our_node_id( ) ) ) ;
270288 $node_a. node. handle_funding_signed( & $node_b. node. get_our_node_id( ) , & get_event_msg!( $node_b, MessageSendEvent :: SendFundingSigned , $node_a. node. get_our_node_id( ) ) ) ;
271- tx
272289 } }
273290 }
274291
@@ -279,13 +296,16 @@ mod tests {
279296 // re-persistence and is successfully re-persisted.
280297 let nodes = create_nodes ( 2 , "test_background_processor" . to_string ( ) ) ;
281298
299+ // Go through the channel creation process so that each node has something to persist. Since
300+ // open_channel consumes events, it must complete before starting BackgroundProcessor to
301+ // avoid a race with processing events.
302+ let tx = open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , 100000 ) ;
303+
282304 // Initiate the background processors to watch each node.
283305 let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
284- let callback = move |node : & ChannelManager < InMemorySigner , Arc < ChainMonitor > , Arc < test_utils:: TestBroadcaster > , Arc < KeysManager > , Arc < test_utils:: TestFeeEstimator > , Arc < test_utils:: TestLogger > > | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
285- let bg_processor = BackgroundProcessor :: start ( callback, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
286-
287- // Go through the channel creation process until each node should have something persisted.
288- let tx = open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , 100000 ) ;
306+ let persister = move |node : & ChannelManager < InMemorySigner , Arc < ChainMonitor > , Arc < test_utils:: TestBroadcaster > , Arc < KeysManager > , Arc < test_utils:: TestFeeEstimator > , Arc < test_utils:: TestLogger > > | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
307+ let event_handler = |_| { } ;
308+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
289309
290310 macro_rules! check_persisted_data {
291311 ( $node: expr, $filepath: expr, $expected_bytes: expr) => {
@@ -336,8 +356,9 @@ mod tests {
336356 // `FRESHNESS_TIMER`.
337357 let nodes = create_nodes ( 1 , "test_timer_tick_called" . to_string ( ) ) ;
338358 let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
339- let callback = move |node : & ChannelManager < InMemorySigner , Arc < ChainMonitor > , Arc < test_utils:: TestBroadcaster > , Arc < KeysManager > , Arc < test_utils:: TestFeeEstimator > , Arc < test_utils:: TestLogger > > | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
340- let bg_processor = BackgroundProcessor :: start ( callback, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
359+ let persister = move |node : & ChannelManager < InMemorySigner , Arc < ChainMonitor > , Arc < test_utils:: TestBroadcaster > , Arc < KeysManager > , Arc < test_utils:: TestFeeEstimator > , Arc < test_utils:: TestLogger > > | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
360+ let event_handler = |_| { } ;
361+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
341362 loop {
342363 let log_entries = nodes[ 0 ] . logger . lines . lock ( ) . unwrap ( ) ;
343364 let desired_log = "Calling ChannelManager's and PeerManager's timer_tick_occurred" . to_string ( ) ;
@@ -352,21 +373,37 @@ mod tests {
352373 #[ test]
353374 fn test_persist_error ( ) {
354375 // Test that if we encounter an error during manager persistence, the thread panics.
355- fn persist_manager < Signer , M , T , K , F , L > ( _data : & ChannelManager < Signer , Arc < M > , Arc < T > , Arc < K > , Arc < F > , Arc < L > > ) -> Result < ( ) , std:: io:: Error >
356- where Signer : ' static + Sign ,
357- M : ' static + chain:: Watch < Signer > ,
358- T : ' static + BroadcasterInterface ,
359- K : ' static + KeysInterface < Signer =Signer > ,
360- F : ' static + FeeEstimator ,
361- L : ' static + Logger ,
362- {
363- Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , "test" ) )
364- }
365-
366376 let nodes = create_nodes ( 2 , "test_persist_error" . to_string ( ) ) ;
367- let bg_processor = BackgroundProcessor :: start ( persist_manager, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
368377 open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , 100000 ) ;
369378
379+ let persister = |_: & _ | Err ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , "test" ) ) ;
380+ let event_handler = |_| { } ;
381+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
370382 let _ = bg_processor. thread_handle . join ( ) . unwrap ( ) . expect_err ( "Errored persisting manager: test" ) ;
371383 }
384+
385+ #[ test]
386+ fn test_background_event_handling ( ) {
387+ let nodes = create_nodes ( 2 , "test_background_event_handling" . to_string ( ) ) ;
388+ let channel_value = 100000 ;
389+ let data_dir = nodes[ 0 ] . persister . get_data_dir ( ) ;
390+ let persister = move |node : & _ | FilesystemPersister :: persist_manager ( data_dir. clone ( ) , node) ;
391+
392+ // Set up a background event handler for FundingGenerationReady events.
393+ let ( sender, receiver) = std:: sync:: mpsc:: sync_channel ( 1 ) ;
394+ let event_handler = move |event| {
395+ sender. send ( handle_funding_generation_ready ! ( event, channel_value) ) . unwrap ( ) ;
396+ } ;
397+ let bg_processor = BackgroundProcessor :: start ( persister, event_handler, nodes[ 0 ] . node . clone ( ) , nodes[ 0 ] . peer_manager . clone ( ) , nodes[ 0 ] . logger . clone ( ) ) ;
398+
399+ // Open a channel and check that the FundingGenerationReady event was handled.
400+ begin_open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , channel_value) ;
401+ let timeout = Duration :: from_secs ( 5 * FRESHNESS_TIMER ) ;
402+ let ( temporary_channel_id, tx) = receiver
403+ . recv_timeout ( timeout)
404+ . expect ( "FundingGenerationReady not handled within deadline" ) ;
405+ end_open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , temporary_channel_id, tx) ;
406+
407+ assert ! ( bg_processor. stop( ) . is_ok( ) ) ;
408+ }
372409}
0 commit comments