Skip to content

Commit b68a228

Browse files
committed
Parallelize persistence in the async bg processor
1 parent b356433 commit b68a228

File tree

3 files changed

+134
-57
lines changed

3 files changed

+134
-57
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 121 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ extern crate core;
2121

2222
#[cfg(not(feature = "std"))]
2323
extern crate alloc;
24+
#[cfg(not(feature = "std"))]
25+
use alloc::format;
26+
#[cfg(not(feature = "std"))]
27+
use alloc::vec::Vec;
2428

2529
#[macro_use]
2630
extern crate lightning;
@@ -70,9 +74,13 @@ use lightning_rapid_gossip_sync::RapidGossipSync;
7074

7175
use lightning_liquidity::ALiquidityManager;
7276

77+
use core::future::Future;
7378
use core::ops::Deref;
79+
use core::pin::Pin;
7480
use core::time::Duration;
7581

82+
use lightning::util::async_poll::{MaybeSync, MultiResultFuturePoller, ResultFuture};
83+
7684
#[cfg(feature = "std")]
7785
use core::sync::atomic::{AtomicBool, Ordering};
7886
#[cfg(feature = "std")]
@@ -627,11 +635,11 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
627635
pub async fn process_events_async<
628636
'a,
629637
UL: 'static + Deref,
630-
CF: 'static + Deref,
631-
T: 'static + Deref,
632-
F: 'static + Deref,
638+
CF: 'static + Deref + Send + Sync,
639+
T: 'static + Deref + Send + Sync,
640+
F: 'static + Deref + Send + Sync,
633641
G: 'static + Deref<Target = NetworkGraph<L>>,
634-
L: 'static + Deref,
642+
L: 'static + Deref + Send + Sync,
635643
P: 'static + Deref,
636644
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
637645
EventHandler: Fn(Event) -> EventHandlerFuture,
@@ -646,10 +654,10 @@ pub async fn process_events_async<
646654
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
647655
PM: 'static + Deref,
648656
LM: 'static + Deref,
649-
D: 'static + Deref,
650-
O: 'static + Deref,
651-
K: 'static + Deref,
652-
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
657+
D: 'static + Deref + Send + Sync,
658+
O: 'static + Deref + Send + Sync,
659+
K: 'static + Deref + Send + Sync,
660+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>> + Clone + Send,
653661
S: 'static + Deref<Target = SC> + Send + Sync,
654662
SC: for<'b> WriteableScore<'b>,
655663
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -674,7 +682,7 @@ where
674682
PM::Target: APeerManager,
675683
LM::Target: ALiquidityManager,
676684
O::Target: 'static + OutputSpender,
677-
D::Target: 'static + ChangeDestinationSource,
685+
D::Target: 'static + ChangeDestinationSource + MaybeSync,
678686
K::Target: 'static + KVStore,
679687
{
680688
let async_event_handler = |event| {
@@ -826,17 +834,24 @@ where
826834
None => {},
827835
}
828836

837+
let mut futures = Vec::new();
838+
829839
// Persist channel manager.
830840
if channel_manager.get_cm().get_and_clear_needs_persistence() {
831841
log_trace!(logger, "Persisting ChannelManager...");
832-
kv_store
833-
.write(
834-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
835-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
836-
CHANNEL_MANAGER_PERSISTENCE_KEY,
837-
&channel_manager.get_cm().encode(),
838-
)
839-
.await?;
842+
let res = kv_store.write(
843+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
844+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
845+
CHANNEL_MANAGER_PERSISTENCE_KEY,
846+
&channel_manager.get_cm().encode(),
847+
);
848+
849+
let fut: Pin<
850+
Box<dyn Future<Output = Result<(), (lightning::io::Error, bool)>> + Send + 'static>,
851+
> = Box::pin(async move { res.await.map_err(|e| (e, true)) });
852+
853+
futures.push(ResultFuture::Pending(fut));
854+
840855
log_trace!(logger, "Done persisting ChannelManager.");
841856
}
842857

@@ -864,17 +879,29 @@ where
864879
log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually.");
865880
log_trace!(logger, "Persisting network graph.");
866881
}
867-
if let Err(e) = kv_store
868-
.write(
869-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
870-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
871-
NETWORK_GRAPH_PERSISTENCE_KEY,
872-
&network_graph.encode(),
873-
)
874-
.await
875-
{
876-
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
877-
}
882+
let res = kv_store.write(
883+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
884+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
885+
NETWORK_GRAPH_PERSISTENCE_KEY,
886+
&network_graph.encode(),
887+
);
888+
let fut: Pin<
889+
Box<
890+
dyn Future<Output = Result<(), (lightning::io::Error, bool)>>
891+
+ Send
892+
+ 'static,
893+
>,
894+
> = Box::pin(async move {
895+
res.await.map_err(|e| {
896+
(lightning::io::Error::new(
897+
lightning::io::ErrorKind::Other,
898+
format!("failed to persist network graph, check your disk and permissions {}", e)),
899+
false)
900+
})
901+
});
902+
903+
futures.push(ResultFuture::Pending(fut));
904+
878905
have_pruned = true;
879906
}
880907
let prune_timer =
@@ -901,21 +928,28 @@ where
901928
} else {
902929
log_trace!(logger, "Persisting scorer");
903930
}
904-
if let Err(e) = kv_store
905-
.write(
906-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
907-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
908-
SCORER_PERSISTENCE_KEY,
909-
&scorer.encode(),
910-
)
911-
.await
912-
{
913-
log_error!(
914-
logger,
915-
"Error: Failed to persist scorer, check your disk and permissions {}",
916-
e
917-
);
918-
}
931+
let res = kv_store.write(
932+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
933+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
934+
SCORER_PERSISTENCE_KEY,
935+
&scorer.encode(),
936+
);
937+
let fut: Pin<
938+
Box<
939+
dyn Future<Output = Result<(), (lightning::io::Error, bool)>>
940+
+ Send
941+
+ 'static,
942+
>,
943+
> = Box::pin(async move {
944+
res.await.map_err(|e| {
945+
(lightning::io::Error::new(
946+
lightning::io::ErrorKind::Other,
947+
format!("failed to persist scorer, check your disk and permissions {}", e)),
948+
false)
949+
})
950+
});
951+
952+
futures.push(ResultFuture::Pending(fut));
919953
}
920954
last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
921955
},
@@ -928,14 +962,46 @@ where
928962
Some(false) => {
929963
log_trace!(logger, "Regenerating sweeper spends if necessary");
930964
if let Some(ref sweeper) = sweeper {
931-
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
965+
let sweeper = sweeper.clone();
966+
let fut: Pin<
967+
Box<
968+
dyn Future<Output = Result<(), (lightning::io::Error, bool)>>
969+
+ Send
970+
+ 'static,
971+
>,
972+
> = Box::pin(async move {
973+
sweeper.regenerate_and_broadcast_spend_if_necessary().await.map_err(|_| {
974+
(
975+
lightning::io::Error::new(
976+
lightning::io::ErrorKind::Other,
977+
"failed to persist sweeper, check your disk and permissions",
978+
),
979+
false,
980+
)
981+
})
982+
});
983+
984+
futures.push(ResultFuture::Pending(fut));
932985
}
933986
last_sweeper_call = sleeper(SWEEPER_TIMER);
934987
},
935988
Some(true) => break,
936989
None => {},
937990
}
938991

992+
// Run persistence tasks in parallel.
993+
let multi_res = MultiResultFuturePoller::new(futures).await;
994+
for res in multi_res {
995+
if let Err((e, exit)) = res {
996+
log_error!(logger, "Error: {}", e);
997+
998+
if exit {
999+
log_error!(logger, "Exiting background processor");
1000+
return Err(e);
1001+
}
1002+
}
1003+
}
1004+
9391005
// Onion messenger timer tick.
9401006
match check_sleeper(&mut last_onion_message_handler_call) {
9411007
Some(false) => {
@@ -1025,9 +1091,9 @@ fn check_sleeper<SleepFuture: core::future::Future<Output = bool> + core::marker
10251091
/// synchronous background persistence.
10261092
pub async fn process_events_async_with_kv_store_sync<
10271093
UL: 'static + Deref,
1028-
CF: 'static + Deref,
1029-
T: 'static + Deref,
1030-
F: 'static + Deref,
1094+
CF: 'static + Deref + Send + Sync,
1095+
T: 'static + Deref + Send + Sync,
1096+
F: 'static + Deref + Send + Sync,
10311097
G: 'static + Deref<Target = NetworkGraph<L>>,
10321098
L: 'static + Deref + Send + Sync,
10331099
P: 'static + Deref,
@@ -1044,10 +1110,13 @@ pub async fn process_events_async_with_kv_store_sync<
10441110
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
10451111
PM: 'static + Deref,
10461112
LM: 'static + Deref,
1047-
D: 'static + Deref,
1048-
O: 'static + Deref,
1049-
K: 'static + Deref,
1050-
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, KVStoreSyncWrapper<K>, L, O>>,
1113+
D: 'static + Deref + Send + Sync,
1114+
O: 'static + Deref + Send + Sync,
1115+
K: 'static + Deref + Send + Sync,
1116+
OS: 'static
1117+
+ Deref<Target = OutputSweeper<T, D, F, CF, KVStoreSyncWrapper<K>, L, O>>
1118+
+ Clone
1119+
+ Send,
10511120
S: 'static + Deref<Target = SC> + Send + Sync,
10521121
SC: for<'b> WriteableScore<'b>,
10531122
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -1072,7 +1141,7 @@ where
10721141
PM::Target: APeerManager,
10731142
LM::Target: ALiquidityManager,
10741143
O::Target: 'static + OutputSpender,
1075-
D::Target: 'static + ChangeDestinationSource,
1144+
D::Target: 'static + ChangeDestinationSource + MaybeSync,
10761145
K::Target: 'static + KVStoreSync,
10771146
{
10781147
let kv_store = KVStoreSyncWrapper(kv_store);

lightning/src/util/async_poll.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,21 @@ use core::marker::Unpin;
1515
use core::pin::Pin;
1616
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
1717

18-
pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Unpin> {
18+
/// A future that can be in a pending or ready state, where the ready state contains a `Result`.
19+
pub enum ResultFuture<F: Future<Output = Result<(), E>>, E: Unpin> {
20+
/// The future is still pending and needs to be polled again.
1921
Pending(F),
22+
/// The future has completed and contains a result.
2023
Ready(Result<(), E>),
2124
}
2225

23-
pub(crate) struct MultiResultFuturePoller<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> {
26+
/// A utility to poll multiple futures that return results, collecting their results into a vector.
27+
pub struct MultiResultFuturePoller<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> {
2428
futures_state: Vec<ResultFuture<F, E>>,
2529
}
2630

2731
impl<F: Future<Output = Result<(), E>> + Unpin, E: Unpin> MultiResultFuturePoller<F, E> {
32+
/// Creates a new `MultiResultFuturePoller` with the given futures.
2833
pub fn new(futures_state: Vec<ResultFuture<F, E>>) -> Self {
2934
Self { futures_state }
3035
}
@@ -95,21 +100,24 @@ pub(crate) fn dummy_waker() -> Waker {
95100
#[cfg(feature = "std")]
96101
pub type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = Result<T, ()>> + 'a + Send>>;
97102
#[cfg(not(feature = "std"))]
103+
/// A type alias for a future that returns a result of type T.
98104
pub type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = Result<T, ()>> + 'a>>;
99105

100-
// Marker trait to optionally implement `Sync` under std.
106+
/// Marker trait to optionally implement `Sync` under std.
101107
#[cfg(feature = "std")]
102108
pub use core::marker::Sync as MaybeSync;
103109

104110
#[cfg(not(feature = "std"))]
111+
/// Marker trait to optionally implement `Sync` under std.
105112
pub trait MaybeSync {}
106113
#[cfg(not(feature = "std"))]
107114
impl<T> MaybeSync for T where T: ?Sized {}
108115

109-
// Marker trait to optionally implement `Send` under std.
116+
/// Marker trait to optionally implement `Send` under std.
110117
#[cfg(feature = "std")]
111118
pub use core::marker::Send as MaybeSend;
112119
#[cfg(not(feature = "std"))]
120+
/// Marker trait to optionally implement `Send` under std.
113121
pub trait MaybeSend {}
114122
#[cfg(not(feature = "std"))]
115123
impl<T> MaybeSend for T where T: ?Sized {}

lightning/src/util/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub mod ser;
3232
pub mod sweep;
3333
pub mod wakers;
3434

35-
pub(crate) mod async_poll;
35+
pub mod async_poll;
3636
pub(crate) mod atomic_counter;
3737
pub(crate) mod byte_utils;
3838
pub mod hash_tables;

0 commit comments

Comments
 (0)