
Commit b20e481

Add monitor update handling to update_maps_on_chan_removal
Closing channels requires a two-step process - first `update_maps_on_chan_removal` is called while holding the same per-peer lock under which the channel reached the terminal state, then, after dropping the same lock(s), `finish_close_channel` is called.

Because the channel is closed and thus no further `ChannelMonitorUpdate`s are generated for the off-chain state, we'd previously applied the `ChannelMonitorUpdate` in `finish_close_channel`. This was tweaked somewhat in c99d3d7 when we stopped using `u64::MAX` for any updates after closure. However, we worked around the races this implied by setting the `update_id` only when we go to apply the `ChannelMonitorUpdate`, rather than when we create it.

In a coming commit, we'll need to have an `update_id` immediately upon creation (to track in-flight updates that haven't reached application yet). This implies that we can no longer apply closure `ChannelMonitorUpdate`s after dropping the per-peer lock(s), as the updates must be well-ordered with any later updates to the same channel, even after it has been closed.

Thus, here, we add `ChannelMonitorUpdate` handling to `update_maps_on_chan_removal`, renaming it `locked_close_channel` to better capture its new purpose.
1 parent: ca7bd32
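The locking constraint described in the commit message is easier to see in isolation. The following is a minimal, self-contained sketch of the two-step close flow; the types and method bodies are hypothetical simplifications rather than LDK's actual API. The closing monitor update is queued while the per-peer lock is still held, and only the lock-free cleanup runs afterwards in `finish_close_channel`.

use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical, simplified stand-ins for the real LDK types.
type ChannelId = u64;

#[derive(Clone)]
struct MonitorUpdate { update_id: u64 }

struct Channel { latest_update_id: u64 }

#[derive(Default)]
struct PeerState {
    channels: HashMap<ChannelId, Channel>,
    // Updates generated but not yet persisted/applied, in order.
    in_flight_updates: Vec<MonitorUpdate>,
}

struct Manager { peer: Mutex<PeerState> }

impl Manager {
    // Phase 1: runs under the same per-peer lock in which the channel reached a
    // terminal state. The closing monitor update is queued here so it stays
    // ordered with respect to any later updates for the same channel.
    fn locked_close_channel(peer: &mut PeerState, id: ChannelId) -> MonitorUpdate {
        let chan = peer.channels.remove(&id).expect("channel must exist");
        let update = MonitorUpdate { update_id: chan.latest_update_id + 1 };
        peer.in_flight_updates.push(update.clone());
        update
    }

    // Phase 2: runs with no locks held (it may broadcast, fail HTLCs, etc.).
    fn finish_close_channel(&self, update: MonitorUpdate) {
        println!("finishing close, monitor update {} already queued", update.update_id);
    }

    fn force_close(&self, id: ChannelId) {
        let update = {
            let mut peer = self.peer.lock().unwrap();
            Self::locked_close_channel(&mut peer, id)
        }; // per-peer lock dropped here
        self.finish_close_channel(update);
    }
}

fn main() {
    let mgr = Manager { peer: Mutex::new(PeerState::default()) };
    mgr.peer.lock().unwrap().channels.insert(7, Channel { latest_update_id: 41 });
    mgr.force_close(7);
}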


lightning/src/ln/channelmanager.rs

Lines changed: 121 additions & 90 deletions
@@ -2942,8 +2942,29 @@ macro_rules! handle_error {
 ///
 /// Note that this step can be skipped if the channel was never opened (through the creation of a
 /// [`ChannelMonitor`]/channel funding transaction) to begin with.
-macro_rules! update_maps_on_chan_removal {
-	($self: expr, $peer_state: expr, $channel_context: expr) => {{
+macro_rules! locked_close_channel {
+	($self: ident, $peer_state: expr, $channel_context: expr, $shutdown_res_mut: expr) => {{
+		if let Some((counterparty_node_id, funding_txo, channel_id, update)) = $shutdown_res_mut.monitor_update.take() {
+			if $self.background_events_processed_since_startup.load(Ordering::Acquire) {
+				handle_new_monitor_update!($self, funding_txo, update, $peer_state,
+					$channel_context, REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER);
+			} else {
+				let in_flight_updates = $peer_state.in_flight_monitor_updates.entry(funding_txo)
+					.or_insert_with(Vec::new);
+				in_flight_updates.iter().position(|upd| upd == &update)
+					.unwrap_or_else(|| {
+						in_flight_updates.push(update.clone());
+						0
+					});
+				let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+					counterparty_node_id,
+					funding_txo,
+					channel_id,
+					update,
+				};
+				$self.pending_background_events.lock().unwrap().push(event);
+			}
+		}
 		// If there's a possibility that we need to generate further monitor updates for this
 		// channel, we need to store the last update_id of it. However, we don't want to insert
 		// into the map (which prevents the `PeerState` from being cleaned up) for channels that
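The new macro body above branches on whether startup background events have already been processed: during normal operation the update is handed off for immediate application, while during startup it is recorded as in-flight (deduplicated) and re-applied later via a background event. A standalone sketch of that decision follows; the names and types are illustrative assumptions, not LDK's API.

use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

#[derive(Clone, PartialEq, Debug)]
struct MonitorUpdate { update_id: u64 }

#[derive(Debug)]
struct BackgroundEvent { funding_txo: u64, update: MonitorUpdate }

struct Node {
    startup_replay_done: AtomicBool,
    in_flight: Mutex<HashMap<u64, Vec<MonitorUpdate>>>,
    pending_background_events: Mutex<Vec<BackgroundEvent>>,
}

impl Node {
    fn handle_close_update(&self, funding_txo: u64, update: MonitorUpdate) {
        if self.startup_replay_done.load(Ordering::Acquire) {
            // Normal operation: hand the update off for immediate application
            // (stubbed out here).
            println!("applying update {} now", update.update_id);
        } else {
            // Startup: remember the update as in-flight (deduplicated), and let a
            // background event re-apply it once we're running normally.
            let mut in_flight = self.in_flight.lock().unwrap();
            let list = in_flight.entry(funding_txo).or_insert_with(Vec::new);
            if !list.contains(&update) {
                list.push(update.clone());
            }
            self.pending_background_events.lock().unwrap()
                .push(BackgroundEvent { funding_txo, update });
        }
    }
}

fn main() {
    let node = Node {
        startup_replay_done: AtomicBool::new(false),
        in_flight: Mutex::new(HashMap::new()),
        pending_background_events: Mutex::new(Vec::new()),
    };
    node.handle_close_update(1, MonitorUpdate { update_id: 42 });
    node.startup_replay_done.store(true, Ordering::Release);
    node.handle_close_update(1, MonitorUpdate { update_id: 43 });
    println!("{:?}", node.pending_background_events.lock().unwrap());
}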
@@ -3003,8 +3024,8 @@ macro_rules! convert_chan_phase_err {
 			ChannelError::Close((msg, reason)) => {
 				let logger = WithChannelContext::from(&$self.logger, &$channel.context, None);
 				log_error!(logger, "Closing channel {} due to close-required error: {}", $channel_id, msg);
-				update_maps_on_chan_removal!($self, $peer_state, $channel.context);
-				let shutdown_res = $channel.context.force_shutdown(true, reason);
+				let mut shutdown_res = $channel.context.force_shutdown(true, reason);
+				locked_close_channel!($self, $peer_state, &$channel.context, &mut shutdown_res);
 				let err =
 					MsgHandleErrInternal::from_finish_shutdown(msg, *$channel_id, shutdown_res, $channel_update);
 				(true, err)
@@ -3071,10 +3092,10 @@ macro_rules! try_chan_phase_entry {
 }
 
 macro_rules! remove_channel_phase {
-	($self: expr, $peer_state: expr, $entry: expr) => {
+	($self: ident, $peer_state: expr, $entry: expr, $shutdown_res_mut: expr) => {
 		{
 			let channel = $entry.remove_entry().1;
-			update_maps_on_chan_removal!($self, $peer_state, &channel.context());
+			locked_close_channel!($self, $peer_state, &channel.context(), $shutdown_res_mut);
 			channel
 		}
 	}
@@ -3793,8 +3814,10 @@ where
 						peer_state_lock, peer_state, per_peer_state, chan);
 				}
 			} else {
-				let mut chan_phase = remove_channel_phase!(self, peer_state, chan_phase_entry);
-				shutdown_result = Some(chan_phase.context_mut().force_shutdown(false, ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(false) }));
+				let mut close = chan_phase_entry.get_mut().context_mut()
+					.force_shutdown(false, ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(false) });
+				remove_channel_phase!(self, peer_state, chan_phase_entry, close);
+				shutdown_result = Some(close);
 			}
 		},
 		hash_map::Entry::Vacant(_) => {
@@ -3940,7 +3963,7 @@ where
 	}
 
 	/// When a channel is removed, two things need to happen:
-	/// (a) [`update_maps_on_chan_removal`] must be called in the same `per_peer_state` lock as
+	/// (a) [`locked_close_channel`] must be called in the same `per_peer_state` lock as
 	///     the channel-closing action,
 	/// (b) this needs to be called without holding any locks (except
 	///     [`ChannelManager::total_consistency_lock`].
@@ -3964,12 +3987,29 @@ where
 			self.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
 		}
 		if let Some((_, funding_txo, _channel_id, monitor_update)) = shutdown_res.monitor_update {
-			// There isn't anything we can do if we get an update failure - we're already
-			// force-closing. The monitor update on the required in-memory copy should broadcast
-			// the latest local state, which is the best we can do anyway. Thus, it is safe to
-			// ignore the result here.
+			debug_assert!(false, "This should have been handled in `locked_close_channel`");
 			let _ = self.apply_post_close_monitor_update(shutdown_res.counterparty_node_id, shutdown_res.channel_id, funding_txo, monitor_update);
 		}
+		if self.background_events_processed_since_startup.load(Ordering::Acquire) {
+			// If a `ChannelMonitorUpdate` was applied (i.e. any time we have a funding txo and are
+			// not in the startup sequence, check if we need to handle any
+			// `ChannelUpdateCompletionAction`s.
+			if let Some(funding_txo) = shutdown_res.channel_funding_txo {
+				let per_peer_state = self.per_peer_state.read().unwrap();
+				if let Some(peer_state_mtx) = per_peer_state.get(&shutdown_res.counterparty_node_id) {
+					let mut peer_state = peer_state_mtx.lock().unwrap();
+					if peer_state.in_flight_monitor_updates.get(&funding_txo).map(|l| l.is_empty()).unwrap_or(true) {
+						let update_actions = peer_state.monitor_update_blocked_actions
+							.remove(&shutdown_res.channel_id).unwrap_or(Vec::new());
+
+						mem::drop(peer_state);
+						mem::drop(per_peer_state);
+
+						self.handle_monitor_update_completion_actions(update_actions);
+					}
+				}
+			}
+		}
 		let mut shutdown_results = Vec::new();
 		if let Some(txid) = shutdown_res.unbroadcasted_batch_funding_txid {
 			let mut funding_batch_states = self.funding_batch_states.lock().unwrap();
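The new block in `finish_close_channel` above only releases the channel's `monitor_update_blocked_actions` once no monitor updates remain in flight for its funding outpoint. A standalone sketch of that gating check follows, with hypothetical simplified types (the real LDK maps hold richer key and action types).

use std::collections::HashMap;

struct PeerState {
    in_flight_monitor_updates: HashMap<u64, Vec<u64>>,          // funding_txo -> pending update ids
    monitor_update_blocked_actions: HashMap<u64, Vec<String>>,  // channel_id -> queued actions
}

fn take_completion_actions(peer: &mut PeerState, funding_txo: u64, channel_id: u64) -> Vec<String> {
    let nothing_in_flight = peer
        .in_flight_monitor_updates
        .get(&funding_txo)
        .map(|l| l.is_empty())
        .unwrap_or(true);
    if nothing_in_flight {
        // Safe to run the actions that were blocked on monitor persistence.
        peer.monitor_update_blocked_actions.remove(&channel_id).unwrap_or(Vec::new())
    } else {
        // Still waiting on an in-flight update; leave the actions queued.
        Vec::new()
    }
}

fn main() {
    let mut peer = PeerState {
        in_flight_monitor_updates: HashMap::new(),
        monitor_update_blocked_actions: HashMap::from([(7u64, vec!["emit ChannelClosed event".to_string()])]),
    };
    assert_eq!(take_completion_actions(&mut peer, 1, 7), vec!["emit ChannelClosed event".to_string()]);
}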
@@ -3980,8 +4020,9 @@ where
 				if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
 					let mut peer_state = peer_state_mutex.lock().unwrap();
 					if let Some(mut chan) = peer_state.channel_by_id.remove(&channel_id) {
-						update_maps_on_chan_removal!(self, peer_state, &chan.context());
-						shutdown_results.push(chan.context_mut().force_shutdown(false, ClosureReason::FundingBatchClosure));
+						let mut close_res = chan.context_mut().force_shutdown(false, ClosureReason::FundingBatchClosure);
+						locked_close_channel!(self, &mut *peer_state, chan.context(), close_res);
+						shutdown_results.push(close_res);
 					}
 				}
 				has_uncompleted_channel = Some(has_uncompleted_channel.map_or(!state, |v| v || !state));
@@ -4038,23 +4079,26 @@ where
 					ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(broadcast) }
 				};
 				let logger = WithContext::from(&self.logger, Some(*peer_node_id), Some(*channel_id), None);
-				if let hash_map::Entry::Occupied(chan_phase_entry) = peer_state.channel_by_id.entry(channel_id.clone()) {
+				if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(channel_id.clone()) {
 					log_error!(logger, "Force-closing channel {}", channel_id);
-					let mut chan_phase = remove_channel_phase!(self, peer_state, chan_phase_entry);
-					mem::drop(peer_state);
-					mem::drop(per_peer_state);
-					match chan_phase {
-						ChannelPhase::Funded(mut chan) => {
-							self.finish_close_channel(chan.context.force_shutdown(broadcast, closure_reason));
-							(self.get_channel_update_for_broadcast(&chan).ok(), chan.context.get_counterparty_node_id())
+					let (mut shutdown_res, update_opt) = match chan_phase_entry.get_mut() {
+						ChannelPhase::Funded(ref mut chan) => {
+							(
+								chan.context.force_shutdown(broadcast, closure_reason),
+								self.get_channel_update_for_broadcast(&chan).ok(),
+							)
 						},
 						ChannelPhase::UnfundedOutboundV1(_) | ChannelPhase::UnfundedInboundV1(_) |
 						ChannelPhase::UnfundedOutboundV2(_) | ChannelPhase::UnfundedInboundV2(_) => {
-							self.finish_close_channel(chan_phase.context_mut().force_shutdown(false, closure_reason));
 							// Unfunded channel has no update
-							(None, chan_phase.context().get_counterparty_node_id())
+							(chan_phase_entry.get_mut().context_mut().force_shutdown(false, closure_reason), None)
 						},
-					}
+					};
+					let chan_phase = remove_channel_phase!(self, peer_state, chan_phase_entry, shutdown_res);
+					mem::drop(peer_state);
+					mem::drop(per_peer_state);
+					self.finish_close_channel(shutdown_res);
+					(update_opt, chan_phase.context().get_counterparty_node_id())
 				} else if peer_state.inbound_channel_request_by_id.remove(channel_id).is_some() {
 					log_error!(logger, "Force-closing channel {}", &channel_id);
 					// N.B. that we don't send any channel close event here: we
@@ -5318,9 +5362,10 @@ where
 			.map(|peer_state_mutex| peer_state_mutex.lock().unwrap())
 			.and_then(|mut peer_state| peer_state.channel_by_id.remove(&channel_id).map(|chan| (chan, peer_state)))
 			.map(|(mut chan, mut peer_state)| {
-				update_maps_on_chan_removal!(self, peer_state, &chan.context());
 				let closure_reason = ClosureReason::ProcessingError { err: e.clone() };
-				shutdown_results.push(chan.context_mut().force_shutdown(false, closure_reason));
+				let mut close_res = chan.context_mut().force_shutdown(false, closure_reason);
+				locked_close_channel!(self, peer_state, chan.context(), close_res);
+				shutdown_results.push(close_res);
 				peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
 					node_id: counterparty_node_id,
 					action: msgs::ErrorAction::SendErrorMessage {
@@ -6420,8 +6465,9 @@ where
 					log_error!(logger,
 						"Force-closing pending channel with ID {} for not establishing in a timely manner",
 						context.channel_id());
-					update_maps_on_chan_removal!(self, $peer_state, context);
-					shutdown_channels.push(context.force_shutdown(false, ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(false) }));
+					let mut close_res = context.force_shutdown(false, ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(false) });
+					locked_close_channel!(self, $peer_state, context, close_res);
+					shutdown_channels.push(close_res);
 					$pending_msg_events.push(MessageSendEvent::HandleError {
 						node_id: context.get_counterparty_node_id(),
 						action: msgs::ErrorAction::SendErrorMessage {
@@ -8597,9 +8643,12 @@ where
 				},
 				ChannelPhase::UnfundedInboundV1(_) | ChannelPhase::UnfundedOutboundV1(_) |
 				ChannelPhase::UnfundedInboundV2(_) | ChannelPhase::UnfundedOutboundV2(_) => {
-					log_error!(self.logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", &msg.channel_id);
-					let mut chan = remove_channel_phase!(self, peer_state, chan_phase_entry);
-					finish_shutdown = Some(chan.context_mut().force_shutdown(false, ClosureReason::CounterpartyCoopClosedUnfundedChannel));
+					let context = phase.context_mut();
+					let logger = WithChannelContext::from(&self.logger, context, None);
+					log_error!(logger, "Immediately closing unfunded channel {} as peer asked to cooperatively shut it down (which is unnecessary)", &msg.channel_id);
+					let mut close_res = phase.context_mut().force_shutdown(false, ClosureReason::CounterpartyCoopClosedUnfundedChannel);
+					remove_channel_phase!(self, peer_state, chan_phase_entry, close_res);
+					finish_shutdown = Some(close_res);
 				},
 			}
 		} else {
@@ -8640,14 +8689,19 @@ where
 						msg,
 					});
 				}
-				if tx.is_some() {
+				if let Some(mut close_res) = shutdown_result {
 					// We're done with this channel, we've got a signed closing transaction and
 					// will send the closing_signed back to the remote peer upon return. This
 					// also implies there are no pending HTLCs left on the channel, so we can
 					// fully delete it from tracking (the channel monitor is still around to
 					// watch for old state broadcasts)!
-					(tx, Some(remove_channel_phase!(self, peer_state, chan_phase_entry)), shutdown_result)
-				} else { (tx, None, shutdown_result) }
+					debug_assert!(tx.is_some());
+					let channel_phase = remove_channel_phase!(self, peer_state, chan_phase_entry, close_res);
+					(tx, Some(channel_phase), Some(close_res))
+				} else {
+					debug_assert!(tx.is_none());
+					(tx, None, None)
+				}
 			} else {
 				return try_chan_phase_entry!(self, peer_state, Err(ChannelError::close(
 					"Got a closing_signed message for an unfunded channel!".into())), chan_phase_entry);
@@ -9370,26 +9424,31 @@ where
 				let mut peer_state_lock = peer_state_mutex.lock().unwrap();
 				let peer_state = &mut *peer_state_lock;
 				let pending_msg_events = &mut peer_state.pending_msg_events;
-				if let hash_map::Entry::Occupied(chan_phase_entry) = peer_state.channel_by_id.entry(channel_id) {
-					if let ChannelPhase::Funded(mut chan) = remove_channel_phase!(self, peer_state, chan_phase_entry) {
-						let reason = if let MonitorEvent::HolderForceClosedWithInfo { reason, .. } = monitor_event {
-							reason
-						} else {
-							ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }
-						};
-						failed_channels.push(chan.context.force_shutdown(false, reason.clone()));
+				if let hash_map::Entry::Occupied(mut chan_phase_entry) = peer_state.channel_by_id.entry(channel_id) {
+					let reason = if let MonitorEvent::HolderForceClosedWithInfo { reason, .. } = monitor_event {
+						reason
+					} else {
+						ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) }
+					};
+					let mut shutdown_res = chan_phase_entry.get_mut().context_mut().force_shutdown(false, reason.clone());
+					let chan_phase = remove_channel_phase!(self, peer_state, chan_phase_entry, shutdown_res);
+					failed_channels.push(shutdown_res);
+					pending_msg_events.push(events::MessageSendEvent::HandleError {
+						node_id: chan_phase.context().get_counterparty_node_id(),
+						action: msgs::ErrorAction::DisconnectPeer {
+							msg: Some(msgs::ErrorMessage {
+								channel_id: chan_phase.context().channel_id(),
+								data: reason.to_string()
+							})
+						},
+					});
+					if let ChannelPhase::Funded(chan) = chan_phase {
 						if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
 							let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
 							pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
 								msg: update
 							});
 						}
-						pending_msg_events.push(events::MessageSendEvent::HandleError {
-							node_id: chan.context.get_counterparty_node_id(),
-							action: msgs::ErrorAction::DisconnectPeer {
-								msg: Some(msgs::ErrorMessage { channel_id: chan.context.channel_id(), data: reason.to_string() })
-							},
-						});
 					}
 				}
 			}
@@ -9567,7 +9626,7 @@ where
 						let context = &chan.context();
 						let logger = WithChannelContext::from(&self.logger, context, None);
 						log_trace!(logger, "Removing channel {} now that the signer is unblocked", context.channel_id());
-						update_maps_on_chan_removal!(self, peer_state, context);
+						locked_close_channel!(self, peer_state, context, shutdown_result);
 						shutdown_results.push(shutdown_result);
 						false
 					} else {
@@ -9608,7 +9667,8 @@ where
 						});
 					}
 					debug_assert_eq!(shutdown_result_opt.is_some(), chan.is_shutdown());
-					if let Some(shutdown_result) = shutdown_result_opt {
+					if let Some(mut shutdown_result) = shutdown_result_opt {
+						locked_close_channel!(self, peer_state, &chan.context, shutdown_result);
 						shutdown_results.push(shutdown_result);
 					}
 					if let Some(tx) = tx_opt {
@@ -9623,7 +9683,6 @@ where
 
 						log_info!(logger, "Broadcasting {}", log_tx!(tx));
 						self.tx_broadcaster.broadcast_transactions(&[&tx]);
-						update_maps_on_chan_removal!(self, peer_state, &chan.context);
 						false
 					} else { true }
 				},
@@ -9652,38 +9711,6 @@ where
 		has_update
 	}
 
-	/// Handle a list of channel failures during a block_connected or block_disconnected call,
-	/// pushing the channel monitor update (if any) to the background events queue and removing the
-	/// Channel object.
-	fn handle_init_event_channel_failures(&self, mut failed_channels: Vec<ShutdownResult>) {
-		for mut failure in failed_channels.drain(..) {
-			// Either a commitment transactions has been confirmed on-chain or
-			// Channel::block_disconnected detected that the funding transaction has been
-			// reorganized out of the main chain.
-			// We cannot broadcast our latest local state via monitor update (as
-			// Channel::force_shutdown tries to make us do) as we may still be in initialization,
-			// so we track the update internally and handle it when the user next calls
-			// timer_tick_occurred, guaranteeing we're running normally.
-			if let Some((counterparty_node_id, funding_txo, channel_id, update)) = failure.monitor_update.take() {
-				assert_eq!(update.updates.len(), 1);
-				if let ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast } = update.updates[0] {
-					assert!(should_broadcast);
-				} else { unreachable!(); }
-				if self.background_events_processed_since_startup.load(Ordering::Acquire) {
-					let res = self.chain_monitor.update_channel(funding_txo, &update);
-					debug_assert_eq!(res, ChannelMonitorUpdateStatus::Completed,
-						"TODO: We don't currently handle failures here, this logic is removed in the next commit");
-				} else {
-					self.pending_background_events.lock().unwrap().push(
-						BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
-							counterparty_node_id, funding_txo, update, channel_id,
-						});
-				}
-			}
-			self.finish_close_channel(failure);
-		}
-	}
-
 	/// Utility for creating a BOLT11 invoice that can be verified by [`ChannelManager`] without
 	/// storing any additional state. It achieves this by including a [`PaymentSecret`] in the
 	/// invoice which it uses to verify that the invoice has not expired and the payment amount is
@@ -11095,11 +11122,12 @@ where
 						}
 					}
 				} else if let Err(reason) = res {
-					update_maps_on_chan_removal!(self, peer_state, &channel.context);
 					// It looks like our counterparty went on-chain or funding transaction was
 					// reorged out of the main chain. Close the channel.
 					let reason_message = format!("{}", reason);
-					failed_channels.push(channel.context.force_shutdown(true, reason));
+					let mut close_res = channel.context.force_shutdown(true, reason);
+					locked_close_channel!(self, peer_state, &channel.context, close_res);
+					failed_channels.push(close_res);
 					if let Ok(update) = self.get_channel_update_for_broadcast(&channel) {
 						let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
 						pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
@@ -11175,7 +11203,9 @@ where
 			});
 		}
 
-		self.handle_init_event_channel_failures(failed_channels);
+		for failure in failed_channels {
+			self.finish_close_channel(failure);
+		}
 
 		for (source, payment_hash, reason, destination) in timed_out_htlcs.drain(..) {
 			self.fail_htlc_backwards_internal(&source, &payment_hash, &reason, destination);
@@ -11534,8 +11564,9 @@ where
 					},
 				};
 				// Clean up for removal.
-				update_maps_on_chan_removal!(self, peer_state, &context);
-				failed_channels.push(context.force_shutdown(false, ClosureReason::DisconnectedPeer));
+				let mut close_res = context.force_shutdown(false, ClosureReason::DisconnectedPeer);
+				locked_close_channel!(self, peer_state, &context, close_res);
+				failed_channels.push(close_res);
 				false
 			});
 			// Note that we don't bother generating any events for pre-accept channels -
