65 changes: 34 additions & 31 deletions lightning-background-processor/src/lib.rs
@@ -829,8 +829,7 @@ where
 let mut have_pruned = false;
 let mut have_decayed_scorer = false;
 
-let mut cur_batch_delay = batch_delay.get();
-let mut last_forwards_processing_call = sleeper(cur_batch_delay);
+let mut last_forwards_processing_call = sleeper(batch_delay.get());
 
 loop {
 	channel_manager.get_cm().process_pending_events_async(async_event_handler).await;
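
Note: `sleeper` in this loop is the caller-supplied timer factory passed to `process_events_async`; the future it returns resolves to `true` when the background processor should exit and `false` when the requested interval has simply elapsed. As an illustration only (the channel and helper names below are assumptions, not part of this patch), a caller on tokio might build such a sleeper roughly like this:

```rust
use core::{future::Future, pin::Pin, time::Duration};

// Boxed sleep future of the shape the processor expects: resolves to `true` to
// request shutdown, `false` when the timer merely fired.
type SleepFut = Pin<Box<dyn Future<Output = bool> + Send>>;

// Assumes tokio with the `time`, `sync` and `macros` features enabled.
#[tokio::main]
async fn main() {
	// Illustrative shutdown signal; any mechanism that can interrupt the sleep works.
	let (_stop_sender, stop_receiver) = tokio::sync::watch::channel(());

	let sleeper = move |d: Duration| -> SleepFut {
		let mut stop = stop_receiver.clone();
		Box::pin(async move {
			tokio::select! {
				_ = tokio::time::sleep(d) => false,
				_ = stop.changed() => true,
			}
		})
	};

	// The background processor calls e.g. `sleeper(batch_delay.get())` and later
	// polls the returned future between loop iterations.
	let _ = sleeper(Duration::from_millis(10)).await;
}
```

The exit-on-`true` convention is what the `Some(true) => break` arms throughout this loop rely on.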
@@ -851,11 +850,11 @@
 // generally, and as a fallback place such blocking only immediately before
 // persistence.
 peer_manager.as_ref().process_events();
-match check_sleeper(&mut last_forwards_processing_call) {
+match check_and_reset_sleeper(&mut last_forwards_processing_call, || {
+	sleeper(batch_delay.next())
+}) {
 	Some(false) => {
 		channel_manager.get_cm().process_pending_htlc_forwards();
-		cur_batch_delay = batch_delay.next();
-		last_forwards_processing_call = sleeper(cur_batch_delay);
 	},
 	Some(true) => break,
 	None => {},
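
For readers following the `batch_delay` usage above: `get()` seeds the first forwards-processing timer and `next()` chooses the delay each time the timer is re-armed. A rough, hypothetical sketch of such a delay source (not the actual type this crate uses) could look like:

```rust
use core::time::Duration;

// Hypothetical illustration only, not the crate's real batch-delay type: a delay
// value that starts small and backs off toward a cap on each `next()` call.
struct BatchDelay {
	cur: Duration,
	max: Duration,
}

impl BatchDelay {
	fn new() -> Self {
		Self { cur: Duration::from_millis(50), max: Duration::from_secs(1) }
	}
	// Delay used to arm the very first timer.
	fn get(&self) -> Duration {
		self.cur
	}
	// Delay used when the timer is re-armed after firing.
	fn next(&mut self) -> Duration {
		self.cur = (self.cur * 2).min(self.max);
		self.cur
	}
}

fn main() {
	let mut batch_delay = BatchDelay::new();
	assert_eq!(batch_delay.get(), Duration::from_millis(50));
	assert_eq!(batch_delay.next(), Duration::from_millis(100));
	assert_eq!(batch_delay.next(), Duration::from_millis(200));
}
```

Because the reset closure `|| sleeper(batch_delay.next())` only runs when the previous timer actually fired, the next delay is picked exactly where the old `cur_batch_delay` bookkeeping used to happen, which is why that variable could be dropped.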
@@ -903,19 +902,20 @@ where
 }
 
 let await_slow = if mobile_interruptable_platform {
-	match check_sleeper(&mut await_start.unwrap()) {
+	// Specify a zero new sleeper timeout because we won't use the new sleeper. It is re-initialized in the next
+	// loop iteration.
+	match check_and_reset_sleeper(&mut await_start.unwrap(), || sleeper(Duration::ZERO)) {
 		Some(true) => break,
 		Some(false) => true,
 		None => false,
 	}
 } else {
 	false
 };
-match check_sleeper(&mut last_freshness_call) {
+match check_and_reset_sleeper(&mut last_freshness_call, || sleeper(FRESHNESS_TIMER)) {
 	Some(false) => {
 		log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
 		channel_manager.get_cm().timer_tick_occurred();
-		last_freshness_call = sleeper(FRESHNESS_TIMER);
 	},
 	Some(true) => break,
 	None => {},
@@ -947,8 +947,13 @@ where
 // pruning their network graph. We run once 60 seconds after startup before
 // continuing our normal cadence. For RGS, since 60 seconds is likely too long,
 // we prune after an initial sync completes.
+let prune_timer = if gossip_sync.prunable_network_graph().is_some() {
+	NETWORK_PRUNE_TIMER
+} else {
+	FIRST_NETWORK_PRUNE_TIMER
+};
 let prune_timer_elapsed = {
-	match check_sleeper(&mut last_prune_call) {
+	match check_and_reset_sleeper(&mut last_prune_call, || sleeper(prune_timer)) {
Review comment on lines +950 to +956 from @elnosh (Contributor), Jul 31, 2025:
wouldn't this cause the first prune after 60s to not run in the GossipSync::P2P case? since it will override last_prune_call with sleeper after NETWORK_PRUNE_TIMER.
Edit: ignore, it won't.

 		Some(false) => true,
 		Some(true) => break,
 		None => false,
Expand Down Expand Up @@ -992,9 +997,6 @@ where

have_pruned = true;
}
let prune_timer =
if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
last_prune_call = sleeper(prune_timer);
}
if !have_decayed_scorer {
if let Some(ref scorer) = scorer {
@@ -1005,7 +1007,9 @@
 	}
 	have_decayed_scorer = true;
 }
-match check_sleeper(&mut last_scorer_persist_call) {
+match check_and_reset_sleeper(&mut last_scorer_persist_call, || {
+	sleeper(SCORER_PERSIST_TIMER)
+}) {
 	Some(false) => {
 		if let Some(ref scorer) = scorer {
 			if let Some(duration_since_epoch) = fetch_time() {
@@ -1037,12 +1041,11 @@ where
 			// TODO: Once our MSRV is 1.68 we should be able to drop the Box
 			futures.set_c(Box::pin(fut));
 		}
-		last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
 	},
 	Some(true) => break,
 	None => {},
 }
-match check_sleeper(&mut last_sweeper_call) {
+match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) {
 	Some(false) => {
 		log_trace!(logger, "Regenerating sweeper spends if necessary");
 		if let Some(ref sweeper) = sweeper {
@@ -1055,7 +1058,6 @@ where
 			// TODO: Once our MSRV is 1.68 we should be able to drop the Box
 			futures.set_d(Box::pin(fut));
 		}
-		last_sweeper_call = sleeper(SWEEPER_TIMER);
 	},
 	Some(true) => break,
 	None => {},
@@ -1066,13 +1068,14 @@ where
 	res?;
 }
 
-match check_sleeper(&mut last_onion_message_handler_call) {
+match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
+	sleeper(ONION_MESSAGE_HANDLER_TIMER)
+}) {
 	Some(false) => {
 		if let Some(om) = &onion_messenger {
 			log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
 			om.get_om().timer_tick_occurred();
 		}
-		last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
 	},
 	Some(true) => break,
 	None => {},
@@ -1096,23 +1099,21 @@ where
 	peer_manager.as_ref().disconnect_all_peers();
 	last_ping_call = sleeper(PING_TIMER);
 } else {
-	match check_sleeper(&mut last_ping_call) {
+	match check_and_reset_sleeper(&mut last_ping_call, || sleeper(PING_TIMER)) {
 		Some(false) => {
 			log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
 			peer_manager.as_ref().timer_tick_occurred();
-			last_ping_call = sleeper(PING_TIMER);
 		},
 		Some(true) => break,
 		_ => {},
 	}
 }
 
 // Rebroadcast pending claims.
-match check_sleeper(&mut last_rebroadcast_call) {
+match check_and_reset_sleeper(&mut last_rebroadcast_call, || sleeper(REBROADCAST_TIMER)) {
 	Some(false) => {
 		log_trace!(logger, "Rebroadcasting monitor's pending claims");
 		chain_monitor.rebroadcast_pending_claims();
-		last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
 	},
 	Some(true) => break,
 	None => {},
Expand Down Expand Up @@ -1154,13 +1155,18 @@ where
Ok(())
}

fn check_sleeper<SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin>(
fut: &mut SleepFuture,
fn check_and_reset_sleeper<
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
>(
fut: &mut SleepFuture, mut new_sleeper: impl FnMut() -> SleepFuture,
) -> Option<bool> {
let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
match core::pin::Pin::new(fut).poll(&mut ctx) {
task::Poll::Ready(exit) => Some(exit),
match core::pin::Pin::new(&mut *fut).poll(&mut ctx) {
task::Poll::Ready(exit) => {
*fut = new_sleeper();
Some(exit)
},
task::Poll::Pending => None,
}
}
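
The behavioral change in this helper is that it now re-arms the timer itself whenever the polled sleeper completes, instead of every call site having to rebuild its own sleeper afterwards. Below is a self-contained sketch of the same poll-and-reset pattern, using `futures::task::noop_waker` in place of the crate-internal `dummy_waker` and a toy future standing in for the sleeper (both are stand-ins, not this crate's types):

```rust
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::task::noop_waker;

// Toy "sleeper": reports Pending for `ticks` polls, then resolves to `false`
// (meaning "timer fired, no shutdown requested").
struct FakeSleep {
	ticks: u8,
}

impl Future for FakeSleep {
	type Output = bool;
	fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<bool> {
		if self.ticks == 0 {
			Poll::Ready(false)
		} else {
			self.ticks -= 1;
			Poll::Pending
		}
	}
}

// Same shape as the helper in the diff: poll the stored future once with a no-op
// waker; if it completed, immediately replace it with a fresh sleeper so the
// caller never has to remember to re-arm the timer.
fn check_and_reset_sleeper<F: Future<Output = bool> + Unpin>(
	fut: &mut F, mut new_sleeper: impl FnMut() -> F,
) -> Option<bool> {
	let waker = noop_waker();
	let mut cx = Context::from_waker(&waker);
	match Pin::new(&mut *fut).poll(&mut cx) {
		Poll::Ready(exit) => {
			*fut = new_sleeper();
			Some(exit)
		},
		Poll::Pending => None,
	}
}

fn main() {
	let mut timer = FakeSleep { ticks: 1 };
	// Not elapsed yet.
	assert_eq!(check_and_reset_sleeper(&mut timer, || FakeSleep { ticks: 1 }), None);
	// Fires (Some(false)) and is re-armed in place.
	assert_eq!(check_and_reset_sleeper(&mut timer, || FakeSleep { ticks: 1 }), Some(false));
	// The re-armed timer is pending again.
	assert_eq!(check_and_reset_sleeper(&mut timer, || FakeSleep { ticks: 1 }), None);
}
```

In the main loop above, `Some(false)` means the interval elapsed and the periodic work should run, `Some(true)` means the sleeper signalled shutdown, and `None` means the interval has not elapsed yet; in all three cases the call site no longer reassigns the sleeper variable itself.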
@@ -1486,7 +1492,7 @@ impl BackgroundProcessor {
 		NETWORK_GRAPH_PERSISTENCE_KEY,
 		&network_graph.encode(),
 	) {
-		log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
+		log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e);
 	}
 	have_pruned = true;
 }
@@ -1515,10 +1521,7 @@ impl BackgroundProcessor {
 		SCORER_PERSISTENCE_KEY,
 		&scorer.encode(),
 	) {
-		log_error!(logger,
-			"Error: Failed to persist scorer, check your disk and permissions {}",
-			e,
-		);
+		log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
Review comment (Contributor):
this similar log msg from the async version can be fixed as well
 	}
 }
 last_scorer_persist_call = Instant::now();