Skip to content

Commit ddd5249

Browse files
committed
Parallelize persistence in the async bg processor
1 parent b356433 commit ddd5249

File tree

3 files changed

+103
-53
lines changed

3 files changed

+103
-53
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 95 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,13 @@ use lightning_rapid_gossip_sync::RapidGossipSync;
7070

7171
use lightning_liquidity::ALiquidityManager;
7272

73+
use core::future::Future;
7374
use core::ops::Deref;
75+
use core::pin::Pin;
7476
use core::time::Duration;
7577

78+
use lightning::util::async_poll::{MultiResultFuturePoller, ResultFuture};
79+
7680
#[cfg(feature = "std")]
7781
use core::sync::atomic::{AtomicBool, Ordering};
7882
#[cfg(feature = "std")]
@@ -627,11 +631,11 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
627631
pub async fn process_events_async<
628632
'a,
629633
UL: 'static + Deref,
630-
CF: 'static + Deref,
631-
T: 'static + Deref,
632-
F: 'static + Deref,
634+
CF: 'static + Deref + Sync,
635+
T: 'static + Deref + Sync,
636+
F: 'static + Deref + Sync,
633637
G: 'static + Deref<Target = NetworkGraph<L>>,
634-
L: 'static + Deref,
638+
L: 'static + Deref + Sync,
635639
P: 'static + Deref,
636640
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
637641
EventHandler: Fn(Event) -> EventHandlerFuture,
@@ -646,10 +650,10 @@ pub async fn process_events_async<
646650
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
647651
PM: 'static + Deref,
648652
LM: 'static + Deref,
649-
D: 'static + Deref,
650-
O: 'static + Deref,
651-
K: 'static + Deref,
652-
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
653+
D: 'static + Deref + Sync,
654+
O: 'static + Deref + Sync,
655+
K: 'static + Deref + Sync,
656+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>> + Clone + Send,
653657
S: 'static + Deref<Target = SC> + Send + Sync,
654658
SC: for<'b> WriteableScore<'b>,
655659
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -826,17 +830,27 @@ where
826830
None => {},
827831
}
828832

833+
let mut futures = Vec::new();
834+
829835
// Persist channel manager.
830836
if channel_manager.get_cm().get_and_clear_needs_persistence() {
831837
log_trace!(logger, "Persisting ChannelManager...");
832-
kv_store
833-
.write(
834-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
835-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
836-
CHANNEL_MANAGER_PERSISTENCE_KEY,
837-
&channel_manager.get_cm().encode(),
838-
)
839-
.await?;
838+
let res = kv_store.write(
839+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
840+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
841+
CHANNEL_MANAGER_PERSISTENCE_KEY,
842+
&channel_manager.get_cm().encode(),
843+
);
844+
845+
let fut: Pin<Box<dyn Future<Output = Result<(), (String, bool)>> + Send + 'static>> =
846+
Box::pin(async move {
847+
res.await.map_err(|e| {
848+
(format!("failed to persist channel manager, check your disk and permissions {}", e), true)
849+
})
850+
});
851+
852+
futures.push(ResultFuture::Pending(fut));
853+
840854
log_trace!(logger, "Done persisting ChannelManager.");
841855
}
842856

@@ -864,17 +878,22 @@ where
864878
log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually.");
865879
log_trace!(logger, "Persisting network graph.");
866880
}
867-
if let Err(e) = kv_store
868-
.write(
869-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
870-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
871-
NETWORK_GRAPH_PERSISTENCE_KEY,
872-
&network_graph.encode(),
873-
)
874-
.await
875-
{
876-
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
877-
}
881+
let res = kv_store.write(
882+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
883+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
884+
NETWORK_GRAPH_PERSISTENCE_KEY,
885+
&network_graph.encode(),
886+
);
887+
let fut: Pin<
888+
Box<dyn Future<Output = Result<(), (String, bool)>> + Send + 'static>,
889+
> = Box::pin(async move {
890+
res.await.map_err(|e| {
891+
(format!("failed to persist network graph, check your disk and permissions {}", e), false)
892+
})
893+
});
894+
895+
futures.push(ResultFuture::Pending(fut));
896+
878897
have_pruned = true;
879898
}
880899
let prune_timer =
@@ -901,21 +920,22 @@ where
901920
} else {
902921
log_trace!(logger, "Persisting scorer");
903922
}
904-
if let Err(e) = kv_store
905-
.write(
906-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
907-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
908-
SCORER_PERSISTENCE_KEY,
909-
&scorer.encode(),
910-
)
911-
.await
912-
{
913-
log_error!(
914-
logger,
915-
"Error: Failed to persist scorer, check your disk and permissions {}",
916-
e
917-
);
918-
}
923+
let res = kv_store.write(
924+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
925+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
926+
SCORER_PERSISTENCE_KEY,
927+
&scorer.encode(),
928+
);
929+
let fut: Pin<
930+
Box<dyn Future<Output = Result<(), (String, bool)>> + Send + 'static>,
931+
> =
932+
Box::pin(async move {
933+
res.await.map_err(|e| {
934+
(format!("failed to persist scorer, check your disk and permissions {}", e), false)
935+
})
936+
});
937+
938+
futures.push(ResultFuture::Pending(fut));
919939
}
920940
last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
921941
},
@@ -928,14 +948,36 @@ where
928948
Some(false) => {
929949
log_trace!(logger, "Regenerating sweeper spends if necessary");
930950
if let Some(ref sweeper) = sweeper {
931-
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
951+
let sweeper = sweeper.clone();
952+
let fut: Pin<
953+
Box<dyn Future<Output = Result<(), (String, bool)>> + Send + 'static>,
954+
> = Box::pin(async move {
955+
sweeper.regenerate_and_broadcast_spend_if_necessary().await.map_err(|_| {
956+
(format!("sweeper failed to regenerate and broadcast spends"), false)
957+
})
958+
});
959+
960+
futures.push(ResultFuture::Pending(fut));
932961
}
933962
last_sweeper_call = sleeper(SWEEPER_TIMER);
934963
},
935964
Some(true) => break,
936965
None => {},
937966
}
938967

968+
// Run persistence tasks in parallel.
969+
let multi_res = MultiResultFuturePoller::new(futures).await;
970+
for res in multi_res {
971+
if let Err((msg, exit)) = res {
972+
log_error!(logger, "Error: {}", msg);
973+
974+
if exit {
975+
log_error!(logger, "Exiting background processor");
976+
return Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, msg));
977+
}
978+
}
979+
}
980+
939981
// Onion messenger timer tick.
940982
match check_sleeper(&mut last_onion_message_handler_call) {
941983
Some(false) => {
@@ -1025,9 +1067,9 @@ fn check_sleeper<SleepFuture: core::future::Future<Output = bool> + core::marker
10251067
/// synchronous background persistence.
10261068
pub async fn process_events_async_with_kv_store_sync<
10271069
UL: 'static + Deref,
1028-
CF: 'static + Deref,
1029-
T: 'static + Deref,
1030-
F: 'static + Deref,
1070+
CF: 'static + Deref + Sync,
1071+
T: 'static + Deref + Sync,
1072+
F: 'static + Deref + Sync,
10311073
G: 'static + Deref<Target = NetworkGraph<L>>,
10321074
L: 'static + Deref + Send + Sync,
10331075
P: 'static + Deref,
@@ -1044,10 +1086,13 @@ pub async fn process_events_async_with_kv_store_sync<
10441086
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
10451087
PM: 'static + Deref,
10461088
LM: 'static + Deref,
1047-
D: 'static + Deref,
1048-
O: 'static + Deref,
1049-
K: 'static + Deref,
1050-
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, KVStoreSyncWrapper<K>, L, O>>,
1089+
D: 'static + Deref + Sync,
1090+
O: 'static + Deref + Sync,
1091+
K: 'static + Deref + Sync,
1092+
OS: 'static
1093+
+ Deref<Target = OutputSweeper<T, D, F, CF, KVStoreSyncWrapper<K>, L, O>>
1094+
+ Clone
1095+
+ Send,
10511096
S: 'static + Deref<Target = SC> + Send + Sync,
10521097
SC: for<'b> WriteableScore<'b>,
10531098
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,

lightning/src/util/async_poll.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,21 @@ use core::marker::Unpin;
1515
use core::pin::Pin;
1616
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
1717

18-
pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Unpin> {
18+
/// A future that can be in a pending or ready state, where the ready state contains a `Result`.
19+
pub enum ResultFuture<F: Future<Output = Result<(), E>>, E: Unpin> {
20+
/// The future is still pending and needs to be polled again.
1921
Pending(F),
22+
/// The future has completed and contains a result.
2023
Ready(Result<(), E>),
2124
}
2225

23-
pub(crate) struct MultiResultFuturePoller<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> {
26+
/// A utility to poll multiple futures that return results, collecting their results into a vector.
27+
pub struct MultiResultFuturePoller<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> {
2428
futures_state: Vec<ResultFuture<F, E>>,
2529
}
2630

2731
impl<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> MultiResultFuturePoller<F, E> {
32+
/// Creates a new `MultiResultFuturePoller` with the given futures.
2833
pub fn new(futures_state: Vec<ResultFuture<F, E>>) -> Self {
2934
Self { futures_state }
3035
}

lightning/src/util/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub mod ser;
3232
pub mod sweep;
3333
pub mod wakers;
3434

35-
pub(crate) mod async_poll;
35+
pub mod async_poll;
3636
pub(crate) mod atomic_counter;
3737
pub(crate) mod byte_utils;
3838
pub mod hash_tables;

0 commit comments

Comments
 (0)