Skip to content

Commit 32314e3

Browse files
authored
Merge pull request #4063 from TheBlueMatt/2025-09-async-chainmonitor
Add support for native async `KVStore` persist to `ChainMonitor`
2 parents 6f536ec + 462a647 commit 32314e3

File tree

7 files changed

+1187
-337
lines changed

7 files changed

+1187
-337
lines changed

lightning-block-sync/src/gossip.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,10 @@ use bitcoin::hash_types::BlockHash;
1010
use bitcoin::transaction::{OutPoint, TxOut};
1111

1212
use lightning::ln::peer_handler::APeerManager;
13-
1413
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
1514
use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult};
16-
1715
use lightning::util::logger::Logger;
16+
use lightning::util::native_async::FutureSpawner;
1817

1918
use std::collections::VecDeque;
2019
use std::future::Future;
@@ -43,17 +42,6 @@ pub trait UtxoSource: BlockSource + 'static {
4342
fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool>;
4443
}
4544

46-
/// A generic trait which is able to spawn futures in the background.
47-
///
48-
/// If the `tokio` feature is enabled, this is implemented on `TokioSpawner` struct which
49-
/// delegates to `tokio::spawn()`.
50-
pub trait FutureSpawner: Send + Sync + 'static {
51-
/// Spawns the given future as a background task.
52-
///
53-
/// This method MUST NOT block on the given future immediately.
54-
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T);
55-
}
56-
5745
#[cfg(feature = "tokio")]
5846
/// A trivial [`FutureSpawner`] which delegates to `tokio::spawn`.
5947
pub struct TokioSpawner;

lightning/src/chain/chainmonitor.rs

Lines changed: 162 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,14 @@ use crate::ln::our_peer_storage::{DecryptedOurPeerStorage, PeerStorageMonitorHol
4646
use crate::ln::types::ChannelId;
4747
use crate::prelude::*;
4848
use crate::sign::ecdsa::EcdsaChannelSigner;
49-
use crate::sign::{EntropySource, PeerStorageKey};
49+
use crate::sign::{EntropySource, PeerStorageKey, SignerProvider};
5050
use crate::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard};
5151
use crate::types::features::{InitFeatures, NodeFeatures};
52+
use crate::util::async_poll::{MaybeSend, MaybeSync};
5253
use crate::util::errors::APIError;
5354
use crate::util::logger::{Logger, WithContext};
54-
use crate::util::persist::MonitorName;
55+
use crate::util::native_async::FutureSpawner;
56+
use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync};
5557
#[cfg(peer_storage)]
5658
use crate::util::ser::{VecWriter, Writeable};
5759
use crate::util::wakers::{Future, Notifier};
@@ -192,6 +194,17 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
192194
/// restart, this method must in that case be idempotent, ensuring it can handle scenarios where
193195
/// the monitor already exists in the archive.
194196
fn archive_persisted_channel(&self, monitor_name: MonitorName);
197+
198+
/// Fetches the set of [`ChannelMonitorUpdate`]s, previously persisted with
199+
/// [`Self::update_persisted_channel`], which have completed.
200+
///
201+
/// Returning an update here is equivalent to calling
202+
/// [`ChainMonitor::channel_monitor_updated`]. Because of this, this method is defaulted and
203+
/// hidden in the docs.
204+
#[doc(hidden)]
205+
fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
206+
Vec::new()
207+
}
195208
}
196209

197210
struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
@@ -235,6 +248,93 @@ impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, Chann
235248
}
236249
}
237250

251+
/// An unconstructable [`Persist`]er which is used under the hood when you call
252+
/// [`ChainMonitor::new_async_beta`].
253+
pub struct AsyncPersister<
254+
K: Deref + MaybeSend + MaybeSync + 'static,
255+
S: FutureSpawner,
256+
L: Deref + MaybeSend + MaybeSync + 'static,
257+
ES: Deref + MaybeSend + MaybeSync + 'static,
258+
SP: Deref + MaybeSend + MaybeSync + 'static,
259+
BI: Deref + MaybeSend + MaybeSync + 'static,
260+
FE: Deref + MaybeSend + MaybeSync + 'static,
261+
> where
262+
K::Target: KVStore + MaybeSync,
263+
L::Target: Logger,
264+
ES::Target: EntropySource + Sized,
265+
SP::Target: SignerProvider + Sized,
266+
BI::Target: BroadcasterInterface,
267+
FE::Target: FeeEstimator,
268+
{
269+
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>,
270+
}
271+
272+
impl<
273+
K: Deref + MaybeSend + MaybeSync + 'static,
274+
S: FutureSpawner,
275+
L: Deref + MaybeSend + MaybeSync + 'static,
276+
ES: Deref + MaybeSend + MaybeSync + 'static,
277+
SP: Deref + MaybeSend + MaybeSync + 'static,
278+
BI: Deref + MaybeSend + MaybeSync + 'static,
279+
FE: Deref + MaybeSend + MaybeSync + 'static,
280+
> Deref for AsyncPersister<K, S, L, ES, SP, BI, FE>
281+
where
282+
K::Target: KVStore + MaybeSync,
283+
L::Target: Logger,
284+
ES::Target: EntropySource + Sized,
285+
SP::Target: SignerProvider + Sized,
286+
BI::Target: BroadcasterInterface,
287+
FE::Target: FeeEstimator,
288+
{
289+
type Target = Self;
290+
fn deref(&self) -> &Self {
291+
self
292+
}
293+
}
294+
295+
impl<
296+
K: Deref + MaybeSend + MaybeSync + 'static,
297+
S: FutureSpawner,
298+
L: Deref + MaybeSend + MaybeSync + 'static,
299+
ES: Deref + MaybeSend + MaybeSync + 'static,
300+
SP: Deref + MaybeSend + MaybeSync + 'static,
301+
BI: Deref + MaybeSend + MaybeSync + 'static,
302+
FE: Deref + MaybeSend + MaybeSync + 'static,
303+
> Persist<<SP::Target as SignerProvider>::EcdsaSigner> for AsyncPersister<K, S, L, ES, SP, BI, FE>
304+
where
305+
K::Target: KVStore + MaybeSync,
306+
L::Target: Logger,
307+
ES::Target: EntropySource + Sized,
308+
SP::Target: SignerProvider + Sized,
309+
BI::Target: BroadcasterInterface,
310+
FE::Target: FeeEstimator,
311+
<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
312+
{
313+
fn persist_new_channel(
314+
&self, monitor_name: MonitorName,
315+
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
316+
) -> ChannelMonitorUpdateStatus {
317+
self.persister.spawn_async_persist_new_channel(monitor_name, monitor);
318+
ChannelMonitorUpdateStatus::InProgress
319+
}
320+
321+
fn update_persisted_channel(
322+
&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
323+
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
324+
) -> ChannelMonitorUpdateStatus {
325+
self.persister.spawn_async_update_persisted_channel(monitor_name, monitor_update, monitor);
326+
ChannelMonitorUpdateStatus::InProgress
327+
}
328+
329+
fn archive_persisted_channel(&self, monitor_name: MonitorName) {
330+
self.persister.spawn_async_archive_persisted_channel(monitor_name);
331+
}
332+
333+
fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
334+
self.persister.get_and_clear_completed_updates()
335+
}
336+
}
337+
238338
/// An implementation of [`chain::Watch`] for monitoring channels.
239339
///
240340
/// Connected and disconnected blocks must be provided to `ChainMonitor` as documented by
@@ -291,6 +391,63 @@ pub struct ChainMonitor<
291391
our_peerstorage_encryption_key: PeerStorageKey,
292392
}
293393

394+
impl<
395+
K: Deref + MaybeSend + MaybeSync + 'static,
396+
S: FutureSpawner,
397+
SP: Deref + MaybeSend + MaybeSync + 'static,
398+
C: Deref,
399+
T: Deref + MaybeSend + MaybeSync + 'static,
400+
F: Deref + MaybeSend + MaybeSync + 'static,
401+
L: Deref + MaybeSend + MaybeSync + 'static,
402+
ES: Deref + MaybeSend + MaybeSync + 'static,
403+
>
404+
ChainMonitor<
405+
<SP::Target as SignerProvider>::EcdsaSigner,
406+
C,
407+
T,
408+
F,
409+
L,
410+
AsyncPersister<K, S, L, ES, SP, T, F>,
411+
ES,
412+
> where
413+
K::Target: KVStore + MaybeSync,
414+
SP::Target: SignerProvider + Sized,
415+
C::Target: chain::Filter,
416+
T::Target: BroadcasterInterface,
417+
F::Target: FeeEstimator,
418+
L::Target: Logger,
419+
ES::Target: EntropySource + Sized,
420+
<SP::Target as SignerProvider>::EcdsaSigner: MaybeSend + 'static,
421+
{
422+
/// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
423+
///
424+
/// This behaves the same as [`ChainMonitor::new`] except that it relies on
425+
/// [`MonitorUpdatingPersisterAsync`] and thus allows persistence to be completed async.
426+
///
427+
/// Note that async monitor updating is considered beta, and bugs may be triggered by its use.
428+
pub fn new_async_beta(
429+
chain_source: Option<C>, broadcaster: T, logger: L, feeest: F,
430+
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, T, F>, _entropy_source: ES,
431+
_our_peerstorage_encryption_key: PeerStorageKey,
432+
) -> Self {
433+
Self {
434+
monitors: RwLock::new(new_hash_map()),
435+
chain_source,
436+
broadcaster,
437+
logger,
438+
fee_estimator: feeest,
439+
persister: AsyncPersister { persister },
440+
_entropy_source,
441+
pending_monitor_events: Mutex::new(Vec::new()),
442+
highest_chain_height: AtomicUsize::new(0),
443+
event_notifier: Notifier::new(),
444+
pending_send_only_events: Mutex::new(Vec::new()),
445+
#[cfg(peer_storage)]
446+
our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
447+
}
448+
}
449+
}
450+
294451
impl<
295452
ChannelSigner: EcdsaChannelSigner,
296453
C: Deref,
@@ -1357,6 +1514,9 @@ where
13571514
fn release_pending_monitor_events(
13581515
&self,
13591516
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
1517+
for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() {
1518+
let _ = self.channel_monitor_updated(channel_id, update_id);
1519+
}
13601520
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
13611521
for monitor_state in self.monitors.read().unwrap().values() {
13621522
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();

0 commit comments

Comments
 (0)