Commit 0305d51

Use channel ID over funding outpoint to track monitors in ChannelManager
As motivated by the previous commit, we do some of the same work here at the `ChannelManager` level instead. Unfortunately, we still need to track the funding outpoint to support downgrades: the in-flight monitor updates are written as two separate TLVs, one keyed by channel ID and the other keyed by funding outpoint. Once we are willing to stop supporting downgrades past this version, we can drop the outpoint-keyed TLV entirely.
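To make the downgrade scheme concrete, here is a minimal sketch (not LDK's actual serialization code; the types are illustrative stand-ins) of how a single map keyed by (funding outpoint, channel ID) can be projected into the two views that get written out, one keyed by outpoint for the legacy TLV and one keyed by channel ID for the new TLV:

use std::collections::BTreeMap;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct OutPoint([u8; 36]);
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct ChannelId([u8; 32]);
#[derive(Clone, PartialEq)]
struct ChannelMonitorUpdate { update_id: u64 }

// The composite key this commit introduces in `PeerState`.
type InFlight = BTreeMap<(OutPoint, ChannelId), Vec<ChannelMonitorUpdate>>;

// On write, project the one map into the two views that are serialized as
// TLV type 10 (legacy, by outpoint) and TLV type 17 (new, by channel ID).
fn split_for_ser(in_flight: &InFlight) -> (
	BTreeMap<OutPoint, &Vec<ChannelMonitorUpdate>>,
	BTreeMap<ChannelId, &Vec<ChannelMonitorUpdate>>,
) {
	let mut legacy = BTreeMap::new();
	let mut current = BTreeMap::new();
	for ((funding_txo, channel_id), updates) in in_flight {
		if !updates.is_empty() {
			legacy.insert(*funding_txo, updates);
			current.insert(*channel_id, updates);
		}
	}
	(legacy, current)
}

fn main() {
	let mut in_flight = InFlight::new();
	in_flight.insert(
		(OutPoint([0; 36]), ChannelId([1; 32])),
		vec![ChannelMonitorUpdate { update_id: 7 }],
	);
	let (legacy, current) = split_for_ser(&in_flight);
	// Both views carry the same update lists, just under different keys.
	assert_eq!(legacy.len(), 1);
	assert_eq!(current.len(), 1);
}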
1 parent c36081e commit 0305d51

File tree

3 files changed: +64 -53 lines

lightning/src/ln/channelmanager.rs

Lines changed: 60 additions & 49 deletions
@@ -1306,7 +1306,8 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
 	/// for broadcast messages, where ordering isn't as strict).
 	pub(super) pending_msg_events: Vec<MessageSendEvent>,
 	/// Map from Channel IDs to pending [`ChannelMonitorUpdate`]s which have been passed to the
-	/// user but which have not yet completed.
+	/// user but which have not yet completed. We still keep the funding outpoint around to backfill
+	/// the legacy TLV field to support downgrading.
 	///
 	/// Note that the channel may no longer exist. For example if the channel was closed but we
 	/// later needed to claim an HTLC which is pending on-chain, we may generate a monitor update
@@ -1318,7 +1319,7 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
 	/// where we complete one [`ChannelMonitorUpdate`] (but there are more pending as background
 	/// events) but we conclude all pending [`ChannelMonitorUpdate`]s have completed and its safe
 	/// to run post-completion actions.
-	in_flight_monitor_updates: BTreeMap<OutPoint, Vec<ChannelMonitorUpdate>>,
+	in_flight_monitor_updates: BTreeMap<(OutPoint, ChannelId), Vec<ChannelMonitorUpdate>>,
 	/// Map from a specific channel to some action(s) that should be taken when all pending
 	/// [`ChannelMonitorUpdate`]s for the channel complete updating.
 	///
@@ -3281,7 +3282,7 @@ macro_rules! handle_new_monitor_update {
 		$chan_id: expr, $counterparty_node_id: expr, $in_flight_updates: ident, $update_idx: ident,
 		_internal_outer, $completed: expr
 	) => { {
-		$in_flight_updates = $peer_state.in_flight_monitor_updates.entry($funding_txo)
+		$in_flight_updates = $peer_state.in_flight_monitor_updates.entry(($funding_txo, $chan_id))
 			.or_insert_with(Vec::new);
 		// During startup, we push monitor updates as background events through to here in
 		// order to replay updates that were in-flight when we shut down. Thus, we have to
@@ -4007,7 +4008,7 @@ where
 		let per_peer_state = self.per_peer_state.read().unwrap();
 		if let Some(peer_state_mtx) = per_peer_state.get(&shutdown_res.counterparty_node_id) {
 			let mut peer_state = peer_state_mtx.lock().unwrap();
-			if peer_state.in_flight_monitor_updates.get(&funding_txo).map(|l| l.is_empty()).unwrap_or(true) {
+			if peer_state.in_flight_monitor_updates.get(&(funding_txo, shutdown_res.channel_id)).map(|l| l.is_empty()).unwrap_or(true) {
 				let update_actions = peer_state.monitor_update_blocked_actions
 					.remove(&shutdown_res.channel_id).unwrap_or(Vec::new());

@@ -7566,7 +7567,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 		let peer_state = &mut *peer_state_lock;

 		let remaining_in_flight =
-			if let Some(pending) = peer_state.in_flight_monitor_updates.get_mut(funding_txo) {
+			if let Some(pending) = peer_state.in_flight_monitor_updates.get_mut(&(*funding_txo, *channel_id)) {
 				pending.retain(|upd| upd.update_id > highest_applied_update_id);
 				pending.len()
 			} else { 0 };
@@ -12885,12 +12886,22 @@ where
 			pending_claiming_payments = None;
 		}

-		let mut in_flight_monitor_updates: Option<HashMap<(&PublicKey, &OutPoint), &Vec<ChannelMonitorUpdate>>> = None;
+		let mut legacy_in_flight_monitor_updates: Option<HashMap<(&PublicKey, &OutPoint), &Vec<ChannelMonitorUpdate>>> = None;
+		let mut in_flight_monitor_updates: Option<HashMap<(&PublicKey, &ChannelId), &Vec<ChannelMonitorUpdate>>> = None;
 		for ((counterparty_id, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
-			for (funding_outpoint, updates) in peer_state.in_flight_monitor_updates.iter() {
+			for ((funding_txo, channel_id), updates) in peer_state.in_flight_monitor_updates.iter() {
 				if !updates.is_empty() {
-					if in_flight_monitor_updates.is_none() { in_flight_monitor_updates = Some(new_hash_map()); }
-					in_flight_monitor_updates.as_mut().unwrap().insert((counterparty_id, funding_outpoint), updates);
+					if legacy_in_flight_monitor_updates.is_none() {
+						legacy_in_flight_monitor_updates = Some(new_hash_map());
+					}
+					legacy_in_flight_monitor_updates.as_mut().unwrap()
+						.insert((counterparty_id, funding_txo), updates);
+
+					if in_flight_monitor_updates.is_none() {
+						in_flight_monitor_updates = Some(new_hash_map());
+					}
+					in_flight_monitor_updates.as_mut().unwrap()
+						.insert((counterparty_id, channel_id), updates);
 				}
 			}
 		}
@@ -12905,11 +12916,12 @@ where
 			(7, self.fake_scid_rand_bytes, required),
 			(8, if events_not_backwards_compatible { Some(&*events) } else { None }, option),
 			(9, htlc_purposes, required_vec),
-			(10, in_flight_monitor_updates, option),
+			(10, legacy_in_flight_monitor_updates, option),
 			(11, self.probing_cookie_secret, required),
 			(13, htlc_onion_fields, optional_vec),
 			(14, decode_update_add_htlcs_opt, option),
 			(15, self.inbound_payment_id_secret, required),
+			(17, in_flight_monitor_updates, required),
 		});

 		Ok(())
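The TLV type numbers above matter for downgrades. LDK's TLV streams follow the BOLT convention that a reader hitting an unknown even type must fail, while unknown odd types may be skipped. The legacy map therefore keeps its even type 10 (old versions require it, and new versions must still parse it), while the new channel-ID-keyed map gets odd type 17 so that a downgraded node simply ignores it. A minimal sketch of that parity rule, with DecodeError standing in for lightning::ln::msgs::DecodeError:

#[derive(Debug)]
enum DecodeError { UnknownRequiredFeature }

// What a TLV reader does with a type it does not recognize.
fn on_unknown_tlv(tlv_type: u64) -> Result<(), DecodeError> {
	if tlv_type % 2 == 0 {
		// Even: "required to understand", so deserialization must fail.
		Err(DecodeError::UnknownRequiredFeature)
	} else {
		// Odd: optional, skip it and keep reading.
		Ok(())
	}
}

fn main() {
	assert!(on_unknown_tlv(17).is_ok());  // a downgraded reader skips the new map
	assert!(on_unknown_tlv(10).is_err()); // which is why type 10 must keep being written
}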
@@ -13045,8 +13057,7 @@ where
 	/// runtime settings which were stored when the ChannelManager was serialized.
 	pub default_config: UserConfig,

-	/// A map from channel funding outpoints to ChannelMonitors for those channels (ie
-	/// value.context.get_funding_txo() should be the key).
+	/// A map from channel IDs to ChannelMonitors for those channels.
 	///
 	/// If a monitor is inconsistent with the channel state during deserialization the channel will
 	/// be force-closed using the data in the ChannelMonitor and the channel will be dropped. This
@@ -13057,7 +13068,7 @@ where
 	/// this struct.
 	///
 	/// This is not exported to bindings users because we have no HashMap bindings
-	pub channel_monitors: HashMap<OutPoint, &'a ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>>,
+	pub channel_monitors: HashMap<ChannelId, &'a ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>>,
 }

 impl<'a, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, MR: Deref, L: Deref>
@@ -13086,7 +13097,7 @@ where
 			entropy_source, node_signer, signer_provider, fee_estimator, chain_monitor,
 			tx_broadcaster, router, message_router, logger, default_config,
 			channel_monitors: hash_map_from_iter(
-				channel_monitors.drain(..).map(|monitor| { (monitor.get_funding_txo().0, monitor) })
+				channel_monitors.drain(..).map(|monitor| { (monitor.channel_id(), monitor) })
 			),
 		}
 	}
@@ -13149,22 +13160,21 @@ where

 		let mut failed_htlcs = Vec::new();
 		let channel_count: u64 = Readable::read(reader)?;
-		let mut funding_txo_set = hash_set_with_capacity(cmp::min(channel_count as usize, 128));
+		let mut channel_id_set = hash_set_with_capacity(cmp::min(channel_count as usize, 128));
 		let mut per_peer_state = hash_map_with_capacity(cmp::min(channel_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(PublicKey, Mutex<PeerState<SP>>)>()));
 		let mut outpoint_to_peer = hash_map_with_capacity(cmp::min(channel_count as usize, 128));
 		let mut short_to_chan_info = hash_map_with_capacity(cmp::min(channel_count as usize, 128));
 		let mut channel_closures = VecDeque::new();
 		let mut close_background_events = Vec::new();
-		let mut funding_txo_to_channel_id = hash_map_with_capacity(channel_count as usize);
 		for _ in 0..channel_count {
 			let mut channel: FundedChannel<SP> = FundedChannel::read(reader, (
 				&args.entropy_source, &args.signer_provider, best_block_height, &provided_channel_type_features(&args.default_config)
 			))?;
 			let logger = WithChannelContext::from(&args.logger, &channel.context, None);
+			let channel_id = channel.context.channel_id();
+			channel_id_set.insert(channel_id.clone());
 			let funding_txo = channel.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
-			funding_txo_to_channel_id.insert(funding_txo, channel.context.channel_id());
-			funding_txo_set.insert(funding_txo.clone());
-			if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) {
+			if let Some(ref mut monitor) = args.channel_monitors.get_mut(&channel_id) {
 				if channel.get_cur_holder_commitment_transaction_number() > monitor.get_cur_holder_commitment_number() ||
 						channel.get_revoked_counterparty_commitment_transaction_number() > monitor.get_min_seen_secret() ||
 						channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() ||
@@ -13247,9 +13257,7 @@ where
 			if let Some(short_channel_id) = channel.context.get_short_channel_id() {
 				short_to_chan_info.insert(short_channel_id, (channel.context.get_counterparty_node_id(), channel.context.channel_id()));
 			}
-			if let Some(funding_txo) = channel.context.get_funding_txo() {
-				outpoint_to_peer.insert(funding_txo, channel.context.get_counterparty_node_id());
-			}
+			outpoint_to_peer.insert(funding_txo, channel.context.get_counterparty_node_id());
 			per_peer_state.entry(channel.context.get_counterparty_node_id())
 				.or_insert_with(|| Mutex::new(empty_peer_state()))
 				.get_mut().unwrap()
@@ -13279,8 +13287,8 @@ where
 			}
 		}

-		for (funding_txo, monitor) in args.channel_monitors.iter() {
-			if !funding_txo_set.contains(funding_txo) {
+		for (channel_id, monitor) in args.channel_monitors.iter() {
+			if !channel_id_set.contains(channel_id) {
 				let mut should_queue_fc_update = false;
 				if let Some(counterparty_node_id) = monitor.get_counterparty_node_id() {
 					// If the ChannelMonitor had any updates, we may need to update it further and
@@ -13318,10 +13326,11 @@ where
 					updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }],
 					channel_id: Some(monitor.channel_id()),
 				};
+				let funding_txo = monitor.get_funding_txo().0;
 				if let Some(counterparty_node_id) = monitor.get_counterparty_node_id() {
 					let update = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
 						counterparty_node_id,
-						funding_txo: *funding_txo,
+						funding_txo,
 						channel_id,
 						update: monitor_update,
 					};
@@ -13334,7 +13343,7 @@ where
 					// generate a `ChannelMonitorUpdate` for it aside from this
 					// `ChannelForceClosed` one.
 					monitor_update.update_id = u64::MAX;
-					close_background_events.push(BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((*funding_txo, channel_id, monitor_update)));
+					close_background_events.push(BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((funding_txo, channel_id, monitor_update)));
 				}
 			}
 		}
@@ -13434,7 +13443,10 @@ where
 		let mut pending_claiming_payments = Some(new_hash_map());
 		let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> = Some(Vec::new());
 		let mut events_override = None;
-		let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, OutPoint), Vec<ChannelMonitorUpdate>>> = None;
+		let mut _legacy_in_flight_monitor_updates: Option<HashMap<(PublicKey, OutPoint), Vec<ChannelMonitorUpdate>>> = None;
+		// We use this one over the legacy since they represent the same data, just with a different
+		// key. We still need to read the legacy one as it's an even TLV.
+		let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, ChannelId), Vec<ChannelMonitorUpdate>>> = None;
 		let mut decode_update_add_htlcs: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> = None;
 		let mut inbound_payment_id_secret = None;
 		read_tlv_fields!(reader, {
@@ -13447,11 +13459,12 @@ where
 			(7, fake_scid_rand_bytes, option),
 			(8, events_override, option),
 			(9, claimable_htlc_purposes, optional_vec),
-			(10, in_flight_monitor_updates, option),
+			(10, _legacy_in_flight_monitor_updates, option),
 			(11, probing_cookie_secret, option),
 			(13, claimable_htlc_onion_fields, optional_vec),
 			(14, decode_update_add_htlcs, option),
 			(15, inbound_payment_id_secret, option),
+			(17, in_flight_monitor_updates, required),
 		});
 		let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
 		if fake_scid_rand_bytes.is_none() {
@@ -13498,19 +13511,20 @@ where
 		// Because the actual handling of the in-flight updates is the same, it's macro'ized here:
 		let mut pending_background_events = Vec::new();
 		macro_rules! handle_in_flight_updates {
-			($counterparty_node_id: expr, $chan_in_flight_upds: expr, $funding_txo: expr,
-				$monitor: expr, $peer_state: expr, $logger: expr, $channel_info_log: expr
+			($counterparty_node_id: expr, $chan_in_flight_upds: expr, $monitor: expr,
+				$peer_state: expr, $logger: expr, $channel_info_log: expr
 			) => { {
 				let mut max_in_flight_update_id = 0;
 				$chan_in_flight_upds.retain(|upd| upd.update_id > $monitor.get_latest_update_id());
+				let funding_txo = $monitor.get_funding_txo().0;
 				for update in $chan_in_flight_upds.iter() {
 					log_trace!($logger, "Replaying ChannelMonitorUpdate {} for {}channel {}",
 						update.update_id, $channel_info_log, &$monitor.channel_id());
 					max_in_flight_update_id = cmp::max(max_in_flight_update_id, update.update_id);
 					pending_background_events.push(
 						BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
 							counterparty_node_id: $counterparty_node_id,
-							funding_txo: $funding_txo,
+							funding_txo: funding_txo.clone(),
 							channel_id: $monitor.channel_id(),
 							update: update.clone(),
 						});
@@ -13529,7 +13543,7 @@ where
 					.and_modify(|v| *v = cmp::max(max_in_flight_update_id, *v))
 					.or_insert(max_in_flight_update_id);
 			}
-			if $peer_state.in_flight_monitor_updates.insert($funding_txo, $chan_in_flight_upds).is_some() {
+			if $peer_state.in_flight_monitor_updates.insert((funding_txo, $monitor.channel_id()), $chan_in_flight_upds).is_some() {
 				log_error!($logger, "Duplicate in-flight monitor update set for the same channel!");
 				return Err(DecodeError::InvalidValue);
 			}
@@ -13540,28 +13554,27 @@ where
 		for (counterparty_id, peer_state_mtx) in per_peer_state.iter_mut() {
 			let mut peer_state_lock = peer_state_mtx.lock().unwrap();
 			let peer_state = &mut *peer_state_lock;
-			for phase in peer_state.channel_by_id.values() {
+			for (channel_id, phase) in &peer_state.channel_by_id {
 				if let Some(chan) = phase.as_funded() {
 					let logger = WithChannelContext::from(&args.logger, &chan.context, None);

 					// Channels that were persisted have to be funded, otherwise they should have been
 					// discarded.
-					let funding_txo = chan.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
-					let monitor = args.channel_monitors.get(&funding_txo)
+					let monitor = args.channel_monitors.get(channel_id)
 						.expect("We already checked for monitor presence when loading channels");
 					let mut max_in_flight_update_id = monitor.get_latest_update_id();
 					if let Some(in_flight_upds) = &mut in_flight_monitor_updates {
-						if let Some(mut chan_in_flight_upds) = in_flight_upds.remove(&(*counterparty_id, funding_txo)) {
+						if let Some(mut chan_in_flight_upds) = in_flight_upds.remove(&(*counterparty_id, *channel_id)) {
 							max_in_flight_update_id = cmp::max(max_in_flight_update_id,
 								handle_in_flight_updates!(*counterparty_id, chan_in_flight_upds,
-									funding_txo, monitor, peer_state, logger, ""));
+									monitor, peer_state, logger, ""));
 						}
 					}
 					if chan.get_latest_unblocked_monitor_update_id() > max_in_flight_update_id {
 						// If the channel is ahead of the monitor, return DangerousValue:
 						log_error!(logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!");
 						log_error!(logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight",
-							chan.context.channel_id(), monitor.get_latest_update_id(), max_in_flight_update_id);
+							channel_id, monitor.get_latest_update_id(), max_in_flight_update_id);
 						log_error!(logger, " but the ChannelManager is at update_id {}.", chan.get_latest_unblocked_monitor_update_id());
 						log_error!(logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
 						log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
@@ -13579,23 +13592,21 @@ where
 		}

 		if let Some(in_flight_upds) = in_flight_monitor_updates {
-			for ((counterparty_id, funding_txo), mut chan_in_flight_updates) in in_flight_upds {
-				let channel_id = funding_txo_to_channel_id.get(&funding_txo).copied();
-				let logger = WithContext::from(&args.logger, Some(counterparty_id), channel_id, None);
-				if let Some(monitor) = args.channel_monitors.get(&funding_txo) {
+			for ((counterparty_id, channel_id), mut chan_in_flight_updates) in in_flight_upds {
+				let logger = WithContext::from(&args.logger, Some(counterparty_id), Some(channel_id), None);
+				if let Some(monitor) = args.channel_monitors.get(&channel_id) {
 					// Now that we've removed all the in-flight monitor updates for channels that are
 					// still open, we need to replay any monitor updates that are for closed channels,
 					// creating the neccessary peer_state entries as we go.
 					let peer_state_mutex = per_peer_state.entry(counterparty_id).or_insert_with(|| {
 						Mutex::new(empty_peer_state())
 					});
 					let mut peer_state = peer_state_mutex.lock().unwrap();
-					handle_in_flight_updates!(counterparty_id, chan_in_flight_updates,
-						funding_txo, monitor, peer_state, logger, "closed ");
+					handle_in_flight_updates!(counterparty_id, chan_in_flight_updates, monitor,
+						peer_state, logger, "closed ");
 				} else {
 					log_error!(logger, "A ChannelMonitor is missing even though we have in-flight updates for it! This indicates a potentially-critical violation of the chain::Watch API!");
-					log_error!(logger, " The ChannelMonitor for channel {} is missing.", if let Some(channel_id) =
-						channel_id { channel_id.to_string() } else { format!("with outpoint {}", funding_txo) } );
+					log_error!(logger, " The ChannelMonitor for channel {} is missing.", channel_id.to_string());
 					log_error!(logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
 					log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
 					log_error!(logger, " Without the latest ChannelMonitor we cannot continue without risking funds.");
@@ -13647,7 +13658,7 @@ where
 						.or_insert(update.update_id);
 				}
 				let in_flight_updates = per_peer_state.in_flight_monitor_updates
-					.entry(*funding_txo)
+					.entry((*funding_txo, *channel_id))
 					.or_insert_with(Vec::new);
 				debug_assert!(!in_flight_updates.iter().any(|upd| upd == update));
 				in_flight_updates.push(update.clone());
@@ -13772,7 +13783,7 @@ where
 			.filter_map(|(htlc_source, (htlc, preimage_opt))| {
 				if let HTLCSource::PreviousHopData(prev_hop) = &htlc_source {
 					if let Some(payment_preimage) = preimage_opt {
-						let inbound_edge_monitor = args.channel_monitors.get(&prev_hop.outpoint);
+						let inbound_edge_monitor = args.channel_monitors.get(&prev_hop.channel_id);
 						// Note that for channels which have gone to chain,
 						// `get_all_current_outbound_htlcs` is never pruned and always returns
 						// a constant set until the monitor is removed/archived. Thus, we
@@ -14260,7 +14271,7 @@ where
 					);
 				}
 			}
-			if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint) {
+			if let Some(previous_hop_monitor) = args.channel_monitors.get(&claimable_htlc.prev_hop.channel_id) {
 				// Note that this is unsafe as we no longer require the
 				// `ChannelMonitor`s to be re-persisted prior to this
 				// `ChannelManager` being persisted after we get started running.
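For callers that assemble ChannelManagerReadArgs::channel_monitors by hand rather than via ::new, the re-keying looks roughly like this (a sketch only; the import paths follow recent LDK releases and may differ by version):

use std::collections::HashMap;

use lightning::chain::channelmonitor::ChannelMonitor;
use lightning::ln::types::ChannelId;
use lightning::sign::ecdsa::EcdsaChannelSigner;

// Key freshly deserialized monitors by channel ID, as `channel_monitors`
// now expects, instead of by `monitor.get_funding_txo().0`.
fn monitor_map<S: EcdsaChannelSigner>(
	monitors: &[ChannelMonitor<S>],
) -> HashMap<ChannelId, &ChannelMonitor<S>> {
	monitors.iter().map(|monitor| (monitor.channel_id(), monitor)).collect()
}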
