@@ -34,6 +34,9 @@ use std::thread::JoinHandle;
 use std::time::{Duration, Instant};
 use std::ops::Deref;
 
+#[cfg(feature = "futures")]
+use futures::{select, future::FutureExt};
+
 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
 /// responsibilities are:
@@ -222,6 +225,203 @@ where A::Target: chain::Access, L::Target: Logger {
 	}
 }
 
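+// `define_run_body!` expands to the main processing loop shared by the blocking and async
+// variants below. `$loop_exit_check` is an expression re-evaluated on each iteration which
+// exits the loop when true; `$await` is an expression which waits (up to 100ms) for a
+// ChannelManager persistence notification and evaluates to true when the ChannelManager
+// should be re-persisted.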
+macro_rules! define_run_body {
+	($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
+	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
+	 $loop_exit_check: expr, $await: expr)
+	=> { {
+		let event_handler = DecoratingEventHandler {
+			event_handler: $event_handler,
+			gossip_sync: &$gossip_sync,
+		};
+
+		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
+		$channel_manager.timer_tick_occurred();
+
+		let mut last_freshness_call = Instant::now();
+		let mut last_ping_call = Instant::now();
+		let mut last_prune_call = Instant::now();
+		let mut last_scorer_persist_call = Instant::now();
+		let mut have_pruned = false;
+
+		loop {
+			$channel_manager.process_pending_events(&event_handler);
+			$chain_monitor.process_pending_events(&event_handler);
+
+			// Note that the PeerManager::process_events may block on ChannelManager's locks,
+			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
+			// we want to ensure we get into `persist_manager` as quickly as we can, especially
+			// without running the normal event processing above and handing events to users.
+			//
+			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
+			// processing a message effectively at any point during this loop. In order to
+			// minimize the time between such processing completing and persisting the updated
+			// ChannelManager, we want to minimize methods blocking on a ChannelManager
+			// generally, and as a fallback place such blocking only immediately before
+			// persistence.
+			$peer_manager.process_events();
+
+			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
+			// see `await_start`'s use below.
+			let await_start = Instant::now();
+			let updates_available = $await;
+			let await_time = await_start.elapsed();
+
+			if updates_available {
+				log_trace!($logger, "Persisting ChannelManager...");
+				$persister.persist_manager(&*$channel_manager)?;
+				log_trace!($logger, "Done persisting ChannelManager.");
+			}
+			// Exit the loop if the background processor was requested to stop.
+			if $loop_exit_check {
+				log_trace!($logger, "Terminating background processor.");
+				break;
+			}
+			if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
+				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
+				$channel_manager.timer_tick_occurred();
+				last_freshness_call = Instant::now();
+			}
+			if await_time > Duration::from_secs(1) {
+				// On various platforms, we may be starved of CPU cycles for several reasons.
+				// E.g. on iOS, if we've been in the background, we will be entirely paused.
+				// Similarly, if we're on a desktop platform and the device has been asleep, we
+				// may not get any cycles.
+				// We detect this by checking if our max-100ms-sleep, above, ran longer than a
+				// full second, at which point we assume sockets may have been killed (they
+				// appear to be at least on some platforms, even if it has only been a second).
+				// Note that we have to take care to not get here just because user event
+				// processing was slow at the top of the loop. For example, the sample client
+				// may call Bitcoin Core RPCs during event handling, which very often takes
+				// more than a handful of seconds to complete, and shouldn't disconnect all our
+				// peers.
+				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
+				$peer_manager.disconnect_all_peers();
+				last_ping_call = Instant::now();
+			} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
+				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
+				$peer_manager.timer_tick_occurred();
+				last_ping_call = Instant::now();
+			}
+
+			// Note that we want to run a graph prune once not long after startup before
+			// falling back to our usual hourly prunes. This avoids short-lived clients never
+			// pruning their network graph. We run once 60 seconds after startup before
+			// continuing our normal cadence.
+			if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
+				// The network graph must not be pruned while rapid sync completion is pending
+				log_trace!($logger, "Assessing prunability of network graph");
+				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
+					network_graph.remove_stale_channels();
+
+					if let Err(e) = $persister.persist_graph(network_graph) {
+						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
+					}
+
+					last_prune_call = Instant::now();
+					have_pruned = true;
+				} else {
+					log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
+				}
+			}
+
+			if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
+				if let Some(ref scorer) = $scorer {
+					log_trace!($logger, "Persisting scorer");
+					if let Err(e) = $persister.persist_scorer(&scorer) {
+						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+					}
+				}
+				last_scorer_persist_call = Instant::now();
+			}
+		}
+
+		// After we exit, ensure we persist the ChannelManager one final time - this avoids
+		// some races where users quit while channel updates were in-flight, with
+		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
+		$persister.persist_manager(&*$channel_manager)?;
+
+		// Persist Scorer on exit
+		if let Some(ref scorer) = $scorer {
+			$persister.persist_scorer(&scorer)?;
+		}
+
+		// Persist NetworkGraph on exit
+		if let Some(network_graph) = $gossip_sync.network_graph() {
+			$persister.persist_graph(network_graph)?;
+		}
+
+		Ok(())
+	} }
+}
+
+/// Processes background events in a future.
+///
+/// `sleeper` should return a future which completes in the given amount of time and returns a
+/// boolean indicating whether the background processing should continue. Once `sleeper` returns
+/// a future which outputs `false`, the loop will exit and this function's future will complete.
+///
+/// See [`BackgroundProcessor::start`] for information on which actions this handles.
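+///
+/// # Example
+///
+/// A minimal sketch, not taken from this crate, of one way to drive this function from a
+/// Tokio runtime. The `stop` flag and its wiring are hypothetical; any `sleeper` returning
+/// a `Future<Output = bool>` works:
+///
+/// ```ignore
+/// let stop = Arc::new(AtomicBool::new(false));
+/// let stop_bp = Arc::clone(&stop);
+/// let bp_future = process_events_async(
+/// 	persister, event_handler, chain_monitor, channel_manager,
+/// 	gossip_sync, peer_manager, logger, scorer,
+/// 	move |duration| {
+/// 		let stop = Arc::clone(&stop_bp);
+/// 		async move {
+/// 			tokio::time::sleep(duration).await;
+/// 			// Returning `false` makes the processing loop exit.
+/// 			!stop.load(Ordering::Acquire)
+/// 		}
+/// 	},
+/// );
+/// ```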
+#[cfg(feature = "futures")]
+pub async fn process_events_async<
+	'a,
+	Signer: 'static + Sign,
+	CA: 'static + Deref + Send + Sync,
+	CF: 'static + Deref + Send + Sync,
+	CW: 'static + Deref + Send + Sync,
+	T: 'static + Deref + Send + Sync,
+	K: 'static + Deref + Send + Sync,
+	F: 'static + Deref + Send + Sync,
+	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
+	L: 'static + Deref + Send + Sync,
+	P: 'static + Deref + Send + Sync,
+	Descriptor: 'static + SocketDescriptor + Send + Sync,
+	CMH: 'static + Deref + Send + Sync,
+	RMH: 'static + Deref + Send + Sync,
+	EH: 'static + EventHandler + Send,
+	PS: 'static + Deref + Send,
+	M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
+	CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
+	PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
+	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
+	UMH: 'static + Deref + Send + Sync,
+	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
+	S: 'static + Deref<Target = SC> + Send + Sync,
+	SC: WriteableScore<'a>,
+	SleepFuture: core::future::Future<Output = bool>,
+	Sleeper: Fn(Duration) -> SleepFuture
+>(
+	persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
+	gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
+	sleeper: Sleeper,
+) -> Result<(), std::io::Error>
+where
+	CA::Target: 'static + chain::Access,
+	CF::Target: 'static + chain::Filter,
+	CW::Target: 'static + chain::Watch<Signer>,
+	T::Target: 'static + BroadcasterInterface,
+	K::Target: 'static + KeysInterface<Signer = Signer>,
+	F::Target: 'static + FeeEstimator,
+	L::Target: 'static + Logger,
+	P::Target: 'static + Persist<Signer>,
+	CMH::Target: 'static + ChannelMessageHandler,
+	RMH::Target: 'static + RoutingMessageHandler,
+	UMH::Target: 'static + CustomMessageHandler,
+	PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
+{
+	let mut should_continue = true;
+	define_run_body!(persister, event_handler, chain_monitor, channel_manager,
+		gossip_sync, peer_manager, logger, scorer, should_continue, {
+			select! {
+				_ = channel_manager.get_persistable_update_future().fuse() => true,
+				cont = sleeper(Duration::from_millis(100)).fuse() => {
+					should_continue = cont;
+					false
+				}
+			}
+		})
+}
+
 impl BackgroundProcessor {
 	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
 	/// documentation].
@@ -315,129 +515,9 @@ impl BackgroundProcessor {
 		let stop_thread = Arc::new(AtomicBool::new(false));
 		let stop_thread_clone = stop_thread.clone();
 		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
-			let event_handler = DecoratingEventHandler {
-				event_handler,
-				gossip_sync: &gossip_sync,
-			};
-
-			log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
-			channel_manager.timer_tick_occurred();
-
-			let mut last_freshness_call = Instant::now();
-			let mut last_ping_call = Instant::now();
-			let mut last_prune_call = Instant::now();
-			let mut last_scorer_persist_call = Instant::now();
-			let mut have_pruned = false;
-
-			loop {
-				channel_manager.process_pending_events(&event_handler);
-				chain_monitor.process_pending_events(&event_handler);
-
-				// Note that the PeerManager::process_events may block on ChannelManager's locks,
-				// hence it comes last here. When the ChannelManager finishes whatever it's doing,
-				// we want to ensure we get into `persist_manager` as quickly as we can, especially
-				// without running the normal event processing above and handing events to users.
-				//
-				// Specifically, on an *extremely* slow machine, we may see ChannelManager start
-				// processing a message effectively at any point during this loop. In order to
-				// minimize the time between such processing completing and persisting the updated
-				// ChannelManager, we want to minimize methods blocking on a ChannelManager
-				// generally, and as a fallback place such blocking only immediately before
-				// persistence.
-				peer_manager.process_events();
-
-				// We wait up to 100ms, but track how long it takes to detect being put to sleep,
-				// see `await_start`'s use below.
-				let await_start = Instant::now();
-				let updates_available =
-					channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
-				let await_time = await_start.elapsed();
-
-				if updates_available {
-					log_trace!(logger, "Persisting ChannelManager...");
-					persister.persist_manager(&*channel_manager)?;
-					log_trace!(logger, "Done persisting ChannelManager.");
-				}
-				// Exit the loop if the background processor was requested to stop.
-				if stop_thread.load(Ordering::Acquire) == true {
-					log_trace!(logger, "Terminating background processor.");
-					break;
-				}
-				if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
-					log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
-					channel_manager.timer_tick_occurred();
-					last_freshness_call = Instant::now();
-				}
-				if await_time > Duration::from_secs(1) {
-					// On various platforms, we may be starved of CPU cycles for several reasons.
-					// E.g. on iOS, if we've been in the background, we will be entirely paused.
-					// Similarly, if we're on a desktop platform and the device has been asleep, we
-					// may not get any cycles.
-					// We detect this by checking if our max-100ms-sleep, above, ran longer than a
-					// full second, at which point we assume sockets may have been killed (they
-					// appear to be at least on some platforms, even if it has only been a second).
-					// Note that we have to take care to not get here just because user event
-					// processing was slow at the top of the loop. For example, the sample client
-					// may call Bitcoin Core RPCs during event handling, which very often takes
-					// more than a handful of seconds to complete, and shouldn't disconnect all our
-					// peers.
-					log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
-					peer_manager.disconnect_all_peers();
-					last_ping_call = Instant::now();
-				} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
-					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
-					peer_manager.timer_tick_occurred();
-					last_ping_call = Instant::now();
-				}
-
-				// Note that we want to run a graph prune once not long after startup before
-				// falling back to our usual hourly prunes. This avoids short-lived clients never
-				// pruning their network graph. We run once 60 seconds after startup before
-				// continuing our normal cadence.
-				if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
-					// The network graph must not be pruned while rapid sync completion is pending
-					log_trace!(logger, "Assessing prunability of network graph");
-					if let Some(network_graph) = gossip_sync.prunable_network_graph() {
-						network_graph.remove_stale_channels();
-
-						if let Err(e) = persister.persist_graph(network_graph) {
-							log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
-						}
-
-						last_prune_call = Instant::now();
-						have_pruned = true;
-					} else {
-						log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
-					}
-				}
-
-				if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
-					if let Some(ref scorer) = scorer {
-						log_trace!(logger, "Persisting scorer");
-						if let Err(e) = persister.persist_scorer(&scorer) {
-							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
-						}
-					}
-					last_scorer_persist_call = Instant::now();
-				}
-			}
-
-			// After we exit, ensure we persist the ChannelManager one final time - this avoids
-			// some races where users quit while channel updates were in-flight, with
-			// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
-			persister.persist_manager(&*channel_manager)?;
-
-			// Persist Scorer on exit
-			if let Some(ref scorer) = scorer {
-				persister.persist_scorer(&scorer)?;
-			}
-
-			// Persist NetworkGraph on exit
-			if let Some(network_graph) = gossip_sync.network_graph() {
-				persister.persist_graph(network_graph)?;
-			}
-
-			Ok(())
+			define_run_body!(persister, event_handler, chain_monitor, channel_manager,
+				gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
+				channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
 		});
 		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
 	}