Skip to content

Commit 24dcac0

Browse files
committed
Move ChannelManager::monitor_updated to a MonitorEvent
In the next commit we'll need ChainMonitor to "see" when a monitor persistence completes, which means `monitor_updated` needs to move to `ChainMonitor`. The simplest way to then communicate that information to `ChannelManager` is via `MonitorEvent`s, which seems to line up ok, even if they're now constructed in multiple different places.
1 parent df8bde9 commit 24dcac0

File tree

6 files changed

+121
-66
lines changed

6 files changed

+121
-66
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -855,22 +855,26 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
855855

856856
0x08 => {
857857
if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
858-
nodes[0].channel_monitor_updated(&chan_1_funding, *id);
858+
monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id);
859+
nodes[0].process_monitor_events();
859860
}
860861
},
861862
0x09 => {
862863
if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
863-
nodes[1].channel_monitor_updated(&chan_1_funding, *id);
864+
monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id);
865+
nodes[1].process_monitor_events();
864866
}
865867
},
866868
0x0a => {
867869
if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
868-
nodes[1].channel_monitor_updated(&chan_2_funding, *id);
870+
monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id);
871+
nodes[1].process_monitor_events();
869872
}
870873
},
871874
0x0b => {
872875
if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
873-
nodes[2].channel_monitor_updated(&chan_2_funding, *id);
876+
monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id);
877+
nodes[2].process_monitor_events();
874878
}
875879
},
876880

@@ -1077,16 +1081,20 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
10771081
*monitor_c.persister.update_ret.lock().unwrap() = Ok(());
10781082

10791083
if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
1080-
nodes[0].channel_monitor_updated(&chan_1_funding, *id);
1084+
monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id);
1085+
nodes[0].process_monitor_events();
10811086
}
10821087
if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
1083-
nodes[1].channel_monitor_updated(&chan_1_funding, *id);
1088+
monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id);
1089+
nodes[1].process_monitor_events();
10841090
}
10851091
if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
1086-
nodes[1].channel_monitor_updated(&chan_2_funding, *id);
1092+
monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id);
1093+
nodes[1].process_monitor_events();
10871094
}
10881095
if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
1089-
nodes[2].channel_monitor_updated(&chan_2_funding, *id);
1096+
monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id);
1097+
nodes[2].process_monitor_events();
10901098
}
10911099

10921100
// Next, make sure peers are all connected to each other

lightning/src/chain/chainmonitor.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use util::events::EventHandler;
3838
use ln::channelmanager::ChannelDetails;
3939

4040
use prelude::*;
41-
use sync::{RwLock, RwLockReadGuard};
41+
use sync::{RwLock, RwLockReadGuard, Mutex};
4242
use core::ops::Deref;
4343

4444
/// `Persist` defines behavior for persisting channel monitors: this could mean
@@ -134,6 +134,7 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
134134
logger: L,
135135
fee_estimator: F,
136136
persister: P,
137+
pending_monitor_events: Mutex<Vec<MonitorEvent>>,
137138
}
138139

139140
impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
@@ -207,6 +208,7 @@ where C::Target: chain::Filter,
207208
logger,
208209
fee_estimator: feeest,
209210
persister,
211+
pending_monitor_events: Mutex::new(Vec::new()),
210212
}
211213
}
212214

@@ -262,6 +264,29 @@ where C::Target: chain::Filter,
262264
self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor
263265
}
264266

267+
/// Indicates the persistence of a [`ChannelMonitor`] has completed after
268+
/// [`ChannelMonitorUpdateErr::TemporaryFailure`] was returned from an update operation.
269+
///
270+
/// All ChannelMonitor updates up to and including highest_applied_update_id must have been
271+
/// fully committed in every copy of the given channels' ChannelMonitors.
272+
///
273+
/// Note that there is no effect to calling with a highest_applied_update_id other than the
274+
/// current latest ChannelMonitorUpdate and one call to this function after multiple
275+
/// ChannelMonitorUpdateErr::TemporaryFailures is fine. The highest_applied_update_id field
276+
/// exists largely only to prevent races between this and concurrent update_monitor calls.
277+
///
278+
/// Thus, the anticipated use is, at a high level:
279+
/// 1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the
280+
/// update to disk and begins updating any remote (e.g. watchtower/backup) copies,
281+
/// returning [`ChannelMonitorUpdateErr::TemporaryFailure`],
282+
/// 2) once all remote copies are updated, you call this function with the update_id that
283+
/// completed, and once it is the latest the Channel will be re-enabled.
284+
pub fn channel_monitor_updated(&self, funding_txo: OutPoint, highest_applied_update_id: u64) {
285+
self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
286+
funding_txo, monitor_update_id: highest_applied_update_id
287+
});
288+
}
289+
265290
#[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
266291
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
267292
use util::events::EventsProvider;
@@ -431,7 +456,7 @@ where C::Target: chain::Filter,
431456
}
432457

433458
fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
434-
let mut pending_monitor_events = Vec::new();
459+
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
435460
for monitor_state in self.monitors.read().unwrap().values() {
436461
pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
437462
}

lightning/src/chain/channelmonitor.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,29 @@ pub enum MonitorEvent {
131131

132132
/// A monitor event that the Channel's commitment transaction was confirmed.
133133
CommitmentTxConfirmed(OutPoint),
134+
135+
/// Indicates a [`ChannelMonitor`] update has completed. See
136+
/// [`ChannelMonitorUpdateErr::TemporaryFailure`] for more information on how this is used.
137+
///
138+
/// [`ChannelMonitorUpdateErr::TemporaryFailure`]: super::ChannelMonitorUpdateErr::TemporaryFailure
139+
UpdateCompleted {
140+
/// The funding outpoint of the [`ChannelMonitor`] that was updated
141+
funding_txo: OutPoint,
142+
/// The Update ID from [`ChannelMonitorUpdate::update_id`] which was applied or
143+
/// [`ChannelMonitor::get_latest_update_id`].
144+
///
145+
/// Note that this should only be set to a given update's ID if all previous updates for the
146+
/// same [`ChannelMonitor`] have been applied and persisted.
147+
monitor_update_id: u64,
148+
},
134149
}
135-
impl_writeable_tlv_based_enum_upgradable!(MonitorEvent, ;
150+
impl_writeable_tlv_based_enum_upgradable!(MonitorEvent,
151+
// Note that UpdateCompleted is currently never serialized to disk as it is generated only in ChainMonitor
152+
(0, UpdateCompleted) => {
153+
(0, funding_txo, required),
154+
(2, monitor_update_id, required),
155+
},
156+
;
136157
(2, HTLCEvent),
137158
(4, CommitmentTxConfirmed),
138159
);
@@ -854,14 +875,19 @@ impl<Signer: Sign> Writeable for ChannelMonitorImpl<Signer> {
854875
writer.write_all(&payment_preimage.0[..])?;
855876
}
856877

857-
writer.write_all(&byte_utils::be64_to_array(self.pending_monitor_events.len() as u64))?;
878+
writer.write_all(&(self.pending_monitor_events.iter().filter(|ev| match ev {
879+
MonitorEvent::HTLCEvent(_) => true,
880+
MonitorEvent::CommitmentTxConfirmed(_) => true,
881+
_ => false,
882+
}).count() as u64).to_be_bytes())?;
858883
for event in self.pending_monitor_events.iter() {
859884
match event {
860885
MonitorEvent::HTLCEvent(upd) => {
861886
0u8.write(writer)?;
862887
upd.write(writer)?;
863888
},
864-
MonitorEvent::CommitmentTxConfirmed(_) => 1u8.write(writer)?
889+
MonitorEvent::CommitmentTxConfirmed(_) => 1u8.write(writer)?,
890+
_ => {}, // Covered in the TLV writes below
865891
}
866892
}
867893

lightning/src/chain/mod.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,10 @@ pub enum ChannelMonitorUpdateErr {
182182
/// our state failed, but is expected to succeed at some point in the future).
183183
///
184184
/// Such a failure will "freeze" a channel, preventing us from revoking old states or
185-
/// submitting new commitment transactions to the counterparty. Once the update(s) which failed
186-
/// have been successfully applied, ChannelManager::channel_monitor_updated can be used to
187-
/// restore the channel to an operational state.
185+
/// submitting new commitment transactions to the counterparty. Once the update(s) that failed
186+
/// have been successfully applied, a [`MonitorEvent::UpdateCompleted`] event should be returned
187+
/// via [`Watch::release_pending_monitor_events`] which will then restore the channel to an
188+
/// operational state.
188189
///
189190
/// Note that a given ChannelManager will *never* re-generate a given ChannelMonitorUpdate. If
190191
/// you return a TemporaryFailure you must ensure that it is written to disk safely before
@@ -198,13 +199,14 @@ pub enum ChannelMonitorUpdateErr {
198199
/// the channel which would invalidate previous ChannelMonitors are not made when a channel has
199200
/// been "frozen".
200201
///
201-
/// Note that even if updates made after TemporaryFailure succeed you must still call
202-
/// channel_monitor_updated to ensure you have the latest monitor and re-enable normal channel
203-
/// operation.
202+
/// Note that even if updates made after TemporaryFailure succeed you must still provide a
203+
/// [`MonitorEvent::UpdateCompleted`] to ensure you have the latest monitor and re-enable
204+
/// normal channel operation. Note that this is normally generated through a call to
205+
/// [`ChainMonitor::channel_monitor_updated`].
204206
///
205-
/// Note that the update being processed here will not be replayed for you when you call
206-
/// ChannelManager::channel_monitor_updated, so you must store the update itself along
207-
/// with the persisted ChannelMonitor on your own local disk prior to returning a
207+
/// Note that the update being processed here will not be replayed for you when you return a
208+
/// [`MonitorEvent::UpdateCompleted`] event via [`Watch::release_pending_monitor_events`], so
209+
/// you must store the update itself on your own local disk prior to returning a
208210
/// TemporaryFailure. You may, of course, employ a journaling approach, storing only the
209211
/// ChannelMonitorUpdate on disk without updating the monitor itself, replaying the journal at
210212
/// reload-time.
@@ -280,6 +282,9 @@ pub trait Watch<ChannelSigner: Sign> {
280282

281283
/// Returns any monitor events since the last call. Subsequent calls must only return new
282284
/// events.
285+
///
286+
/// For details on asynchronous [`ChannelMonitor`] updating and returning
287+
/// [`MonitorEvent::UpdateCompleted`] here, see [`ChannelMonitorUpdateErr::TemporaryFailure`].
283288
fn release_pending_monitor_events(&self) -> Vec<MonitorEvent>;
284289
}
285290

0 commit comments

Comments
 (0)