diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index f53023a0635..423e0242ef2 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -829,8 +829,7 @@ where
 	let mut have_pruned = false;
 	let mut have_decayed_scorer = false;
 
-	let mut cur_batch_delay = batch_delay.get();
-	let mut last_forwards_processing_call = sleeper(cur_batch_delay);
+	let mut last_forwards_processing_call = sleeper(batch_delay.get());
 
 	loop {
 		channel_manager.get_cm().process_pending_events_async(async_event_handler).await;
@@ -851,11 +850,11 @@ where
 		// generally, and as a fallback place such blocking only immediately before
 		// persistence.
 		peer_manager.as_ref().process_events();
-		match check_sleeper(&mut last_forwards_processing_call) {
+		match check_and_reset_sleeper(&mut last_forwards_processing_call, || {
+			sleeper(batch_delay.next())
+		}) {
 			Some(false) => {
 				channel_manager.get_cm().process_pending_htlc_forwards();
-				cur_batch_delay = batch_delay.next();
-				last_forwards_processing_call = sleeper(cur_batch_delay);
 			},
 			Some(true) => break,
 			None => {},
@@ -903,7 +902,9 @@ where
 		}
 
 		let await_slow = if mobile_interruptable_platform {
-			match check_sleeper(&mut await_start.unwrap()) {
+			// Specify a zero new sleeper timeout because we won't use the new sleeper. It is
+			// re-initialized in the next loop iteration.
+			match check_and_reset_sleeper(&mut await_start.unwrap(), || sleeper(Duration::ZERO)) {
 				Some(true) => break,
 				Some(false) => true,
 				None => false,
@@ -911,11 +912,10 @@
 		} else {
 			false
 		};
-		match check_sleeper(&mut last_freshness_call) {
+		match check_and_reset_sleeper(&mut last_freshness_call, || sleeper(FRESHNESS_TIMER)) {
 			Some(false) => {
 				log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
 				channel_manager.get_cm().timer_tick_occurred();
-				last_freshness_call = sleeper(FRESHNESS_TIMER);
 			},
 			Some(true) => break,
 			None => {},
@@ -947,8 +947,13 @@ where
 		// pruning their network graph. We run once 60 seconds after startup before
 		// continuing our normal cadence. For RGS, since 60 seconds is likely too long,
 		// we prune after an initial sync completes.
+		let prune_timer = if gossip_sync.prunable_network_graph().is_some() {
+			NETWORK_PRUNE_TIMER
+		} else {
+			FIRST_NETWORK_PRUNE_TIMER
+		};
 		let prune_timer_elapsed = {
-			match check_sleeper(&mut last_prune_call) {
+			match check_and_reset_sleeper(&mut last_prune_call, || sleeper(prune_timer)) {
 				Some(false) => true,
 				Some(true) => break,
 				None => false,
@@ -992,9 +997,6 @@ where
 				have_pruned = true;
 			}
-			let prune_timer =
-				if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
-			last_prune_call = sleeper(prune_timer);
 		}
 
 		if !have_decayed_scorer {
 			if let Some(ref scorer) = scorer {
@@ -1005,7 +1007,9 @@
 			}
 			have_decayed_scorer = true;
 		}
-		match check_sleeper(&mut last_scorer_persist_call) {
+		match check_and_reset_sleeper(&mut last_scorer_persist_call, || {
+			sleeper(SCORER_PERSIST_TIMER)
+		}) {
 			Some(false) => {
 				if let Some(ref scorer) = scorer {
 					if let Some(duration_since_epoch) = fetch_time() {
@@ -1037,12 +1041,11 @@ where
 					// TODO: Once our MSRV is 1.68 we should be able to drop the Box
 					futures.set_c(Box::pin(fut));
 				}
-				last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
 			},
 			Some(true) => break,
 			None => {},
 		}
-		match check_sleeper(&mut last_sweeper_call) {
+		match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) {
 			Some(false) => {
 				log_trace!(logger, "Regenerating sweeper spends if necessary");
 				if let Some(ref sweeper) = sweeper {
@@ -1055,7 +1058,6 @@ where
 					// TODO: Once our MSRV is 1.68 we should be able to drop the Box
 					futures.set_d(Box::pin(fut));
 				}
-				last_sweeper_call = sleeper(SWEEPER_TIMER);
 			},
 			Some(true) => break,
 			None => {},
@@ -1066,13 +1068,14 @@ where
 			res?;
 		}
 
-		match check_sleeper(&mut last_onion_message_handler_call) {
+		match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
+			sleeper(ONION_MESSAGE_HANDLER_TIMER)
+		}) {
 			Some(false) => {
 				if let Some(om) = &onion_messenger {
 					log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
 					om.get_om().timer_tick_occurred();
 				}
-				last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
 			},
 			Some(true) => break,
 			None => {},
@@ -1096,11 +1099,10 @@ where
 			peer_manager.as_ref().disconnect_all_peers();
 			last_ping_call = sleeper(PING_TIMER);
 		} else {
-			match check_sleeper(&mut last_ping_call) {
+			match check_and_reset_sleeper(&mut last_ping_call, || sleeper(PING_TIMER)) {
 				Some(false) => {
 					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
 					peer_manager.as_ref().timer_tick_occurred();
-					last_ping_call = sleeper(PING_TIMER);
 				},
 				Some(true) => break,
 				_ => {},
@@ -1108,11 +1110,10 @@ where
 		}
 
 		// Rebroadcast pending claims.
-		match check_sleeper(&mut last_rebroadcast_call) {
+		match check_and_reset_sleeper(&mut last_rebroadcast_call, || sleeper(REBROADCAST_TIMER)) {
 			Some(false) => {
 				log_trace!(logger, "Rebroadcasting monitor's pending claims");
 				chain_monitor.rebroadcast_pending_claims();
-				last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
 			},
 			Some(true) => break,
 			None => {},
@@ -1154,13 +1155,18 @@ where
 	Ok(())
 }
 
-fn check_sleeper<SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin>(
-	fut: &mut SleepFuture,
+fn check_and_reset_sleeper<
+	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
+>(
+	fut: &mut SleepFuture, mut new_sleeper: impl FnMut() -> SleepFuture,
 ) -> Option<bool> {
 	let mut waker = dummy_waker();
 	let mut ctx = task::Context::from_waker(&mut waker);
-	match core::pin::Pin::new(fut).poll(&mut ctx) {
-		task::Poll::Ready(exit) => Some(exit),
+	match core::pin::Pin::new(&mut *fut).poll(&mut ctx) {
+		task::Poll::Ready(exit) => {
+			*fut = new_sleeper();
+			Some(exit)
+		},
 		task::Poll::Pending => None,
 	}
 }
@@ -1486,7 +1492,7 @@ impl BackgroundProcessor {
 						NETWORK_GRAPH_PERSISTENCE_KEY,
 						&network_graph.encode(),
 					) {
-						log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
+						log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e);
 					}
 					have_pruned = true;
 				}
@@ -1515,10 +1521,7 @@ impl BackgroundProcessor {
 						SCORER_PERSISTENCE_KEY,
 						&scorer.encode(),
 					) {
-						log_error!(logger,
-							"Error: Failed to persist scorer, check your disk and permissions {}",
-							e,
-						);
+						log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
 					}
 				}
 				last_scorer_persist_call = Instant::now();
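
Note (illustration, not part of the patch): the core of this change is `check_and_reset_sleeper`, which replaces `check_sleeper`. Instead of leaving each call site to rebuild its sleep future inside the `Some(false)` arm, the helper polls the future once with a no-op waker and, the moment it reports `Ready`, installs a replacement produced by the `new_sleeper` closure, so a timer can never be left un-armed between iterations. The sketch below is a minimal standalone reconstruction of that pattern; `dummy_waker()` here is a hypothetical stand-in for the crate-internal helper of the same name, and the `Timer` future is invented purely so the example runs without an async runtime.

use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A no-op waker: polling with it never schedules a wakeup, so a single `poll`
// acts as a non-blocking "has this timer fired yet?" check.
fn dummy_waker() -> Waker {
	fn clone(_: *const ()) -> RawWaker {
		RawWaker::new(core::ptr::null(), &VTABLE)
	}
	fn no_op(_: *const ()) {}
	static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, no_op, no_op, no_op);
	unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &VTABLE)) }
}

fn check_and_reset_sleeper<F: Future<Output = bool> + Unpin>(
	fut: &mut F, mut new_sleeper: impl FnMut() -> F,
) -> Option<bool> {
	let waker = dummy_waker();
	let mut ctx = Context::from_waker(&waker);
	// Re-borrow so `fut` stays usable after the poll: we may need to overwrite it.
	match Pin::new(&mut *fut).poll(&mut ctx) {
		// The timer fired: arm the next one in the same call, rather than
		// relying on the caller to remember to re-create it afterwards.
		Poll::Ready(exit) => {
			*fut = new_sleeper();
			Some(exit)
		},
		Poll::Pending => None,
	}
}

// A toy `Unpin` future that completes after a fixed number of polls, standing
// in for a real sleep future.
struct Timer {
	polls_left: u32,
}
impl Future for Timer {
	type Output = bool;
	fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<bool> {
		if self.polls_left == 0 {
			Poll::Ready(false) // `false` means "timer elapsed, don't exit the loop"
		} else {
			self.polls_left -= 1;
			Poll::Pending
		}
	}
}

fn main() {
	let mut sleeper = Timer { polls_left: 2 };
	// While the timer is pending, nothing happens and nothing is rebuilt.
	assert_eq!(check_and_reset_sleeper(&mut sleeper, || Timer { polls_left: 2 }), None);
	assert_eq!(check_and_reset_sleeper(&mut sleeper, || Timer { polls_left: 2 }), None);
	// The timer elapses and a fresh sleeper is installed in the same call.
	assert_eq!(check_and_reset_sleeper(&mut sleeper, || Timer { polls_left: 2 }), Some(false));
	assert_eq!(check_and_reset_sleeper(&mut sleeper, || Timer { polls_left: 2 }), None);
}

Taking `new_sleeper` as a closure rather than a ready-made future also keeps the common path cheap: while the sleeper is still pending, no replacement future is constructed at all. The one awkward call site is the mobile `await_start` check, which passes `sleeper(Duration::ZERO)` purely to satisfy the signature, since that sleeper is rebuilt at the top of the next loop iteration anyway, as the comment added in the hunk at line 903 explains.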