@@ -757,6 +757,42 @@ enum BackgroundEvent {
757757 },
758758}
759759
760+ /// A pointer to a channel which is unblocked when an event is surfaced
761+ #[derive(Debug)]
762+ pub(crate) struct EventUnblockedChannel {
763+ counterparty_node_id: PublicKey,
764+ funding_txo: OutPoint,
765+ channel_id: ChannelId,
766+ blocking_action: RAAMonitorUpdateBlockingAction,
767+ }
768+
769+ impl Writeable for EventUnblockedChannel {
770+ fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
771+ self.counterparty_node_id.write(writer)?;
772+ self.funding_txo.write(writer)?;
773+ self.channel_id.write(writer)?;
774+ self.blocking_action.write(writer)
775+ }
776+ }
777+
778+ impl MaybeReadable for EventUnblockedChannel {
779+ fn read<R: Read>(reader: &mut R) -> Result<Option<Self>, DecodeError> {
780+ let counterparty_node_id = Readable::read(reader)?;
781+ let funding_txo = Readable::read(reader)?;
782+ let channel_id = Readable::read(reader)?;
783+ let blocking_action = match RAAMonitorUpdateBlockingAction::read(reader)? {
784+ Some(blocking_action) => blocking_action,
785+ None => return Ok(None),
786+ };
787+ Ok(Some(EventUnblockedChannel {
788+ counterparty_node_id,
789+ funding_txo,
790+ channel_id,
791+ blocking_action,
792+ }))
793+ }
794+ }
795+
760796#[derive(Debug)]
761797pub(crate) enum MonitorUpdateCompletionAction {
762798 /// Indicates that a payment ultimately destined for us was claimed and we should emit an
@@ -774,7 +810,7 @@ pub(crate) enum MonitorUpdateCompletionAction {
774810 /// outbound edge.
775811 EmitEventAndFreeOtherChannel {
776812 event: events::Event,
777- downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, ChannelId, RAAMonitorUpdateBlockingAction) >,
813+ downstream_counterparty_and_funding_outpoint: Option<EventUnblockedChannel >,
778814 },
779815 /// Indicates we should immediately resume the operation of another channel, unless there is
780816 /// some other reason why the channel is blocked. In practice this simply means immediately
@@ -803,7 +839,7 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
803839 (1, FreeOtherChannelImmediately) => {
804840 (0, downstream_counterparty_node_id, required),
805841 (2, downstream_funding_outpoint, required),
806- (4, blocking_action, required ),
842+ (4, blocking_action, upgradable_required ),
807843 // Note that by the time we get past the required read above, downstream_funding_outpoint will be
808844 // filled in, so we can safely unwrap it here.
809845 (5, downstream_channel_id, (default_value, ChannelId::v1_from_funding_outpoint(downstream_funding_outpoint.0.unwrap()))),
@@ -815,7 +851,7 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
815851 // monitor updates which aren't properly blocked or resumed, however that's fine - we don't
816852 // support async monitor updates even in LDK 0.0.116 and once we do we'll require no
817853 // downgrades to prior versions.
818- (1, downstream_counterparty_and_funding_outpoint, option ),
854+ (1, downstream_counterparty_and_funding_outpoint, upgradable_option ),
819855 },
820856);
821857
@@ -837,6 +873,30 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
837873 };
838874);
839875
876+ #[derive(Debug)]
877+ pub(crate) struct PendingMPPClaim {
878+ channels_without_preimage: Vec<(PublicKey, OutPoint, ChannelId, u64)>,
879+ channels_with_preimage: Vec<(PublicKey, OutPoint, ChannelId)>,
880+ }
881+
882+ #[derive(Clone)]
883+ pub(crate) struct PendingMPPClaimPointer(Arc<Mutex<PendingMPPClaim>>);
884+
885+ impl PartialEq for PendingMPPClaimPointer {
886+ fn eq(&self, o: &Self) -> bool { Arc::ptr_eq(&self.0, &o.0) }
887+ }
888+ impl Eq for PendingMPPClaimPointer {}
889+
890+ impl core::fmt::Debug for PendingMPPClaimPointer {
891+ fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
892+ let state = self.0.lock().unwrap();
893+ f.debug_struct("PendingMPPClaimPointer")
894+ .field("channels_without_preimage", &state.channels_without_preimage)
895+ .field("channels_with_preimage", &state.channels_with_preimage)
896+ .finish()
897+ }
898+ }
899+
840900#[derive(Clone, PartialEq, Eq, Debug)]
841901/// If something is blocked on the completion of an RAA-generated [`ChannelMonitorUpdate`] we track
842902/// the blocked action here. See enum variants for more info.
@@ -850,6 +910,16 @@ pub(crate) enum RAAMonitorUpdateBlockingAction {
850910 /// The HTLC ID on the inbound edge.
851911 htlc_id: u64,
852912 },
913+ /// We claimed an MPP payment across multiple channels. We have to block removing the payment
914+ /// preimage from any monitor until the last monitor is updated to contain the payment
915+ /// preimage. Otherwise we may not be able to replay the preimage on the monitor(s) which
916+ /// weren't updated on startup.
917+ ///
918+ /// This variant is *not* written to disk, instead being inferred from [`ChannelMonitor`]
919+ /// state.
920+ ClaimedMPPPayment {
921+ pending_claim: PendingMPPClaimPointer,
922+ }
853923}
854924
855925impl RAAMonitorUpdateBlockingAction {
@@ -861,10 +931,57 @@ impl RAAMonitorUpdateBlockingAction {
861931 }
862932}
863933
864- impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction,
865- (0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) }
866- ;);
934+ impl Writeable for RAAMonitorUpdateBlockingAction {
935+ fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
936+ match self {
937+ RAAMonitorUpdateBlockingAction::ForwardedPaymentInboundClaim { channel_id, htlc_id } => {
938+ 0u8.write(writer)?;
939+ write_tlv_fields!(writer, {
940+ (0, channel_id, required),
941+ (2, htlc_id, required),
942+ });
943+ },
944+ RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { .. } => {
945+ 1u8.write(writer)?;
946+ write_tlv_fields!(writer, {});
947+ // This is rebuilt on restart, so we don't bother writing it.
948+ },
949+ }
950+ Ok(())
951+ }
952+ }
867953
954+ impl Readable for Option<RAAMonitorUpdateBlockingAction> {
955+ fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
956+ Ok(RAAMonitorUpdateBlockingAction::read(reader)?)
957+ }
958+ }
959+
960+ impl MaybeReadable for RAAMonitorUpdateBlockingAction {
961+ fn read<R: Read>(reader: &mut R) -> Result<Option<Self>, DecodeError> {
962+ match <u8 as Readable>::read(reader)? {
963+ 0 => {
964+ _init_and_read_len_prefixed_tlv_fields!(reader, {
965+ (0, channel_id, required),
966+ (2, htlc_id, required),
967+ });
968+ Ok(Some(RAAMonitorUpdateBlockingAction::ForwardedPaymentInboundClaim {
969+ channel_id: channel_id.0.unwrap(),
970+ htlc_id: htlc_id.0.unwrap(),
971+ }))
972+ },
973+ // 1 is ClaimedMPPPayment and is handled in the general odd handling below
974+ x if x % 2 == 1 => {
975+ // Discard the contents
976+ let tlv_len: BigSize = Readable::read(reader)?;
977+ FixedLengthReader::new(reader, tlv_len.0)
978+ .eat_remaining().map_err(|_| DecodeError::ShortRead)?;
979+ Ok(None)
980+ },
981+ _ => Err(DecodeError::InvalidValue),
982+ }
983+ }
984+ }
868985
869986/// State we hold per-peer.
870987pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
@@ -6369,7 +6486,12 @@ where
63696486 |htlc_claim_value_msat, definitely_duplicate| {
63706487 let chan_to_release =
63716488 if let Some(node_id) = next_channel_counterparty_node_id {
6372- Some((node_id, next_channel_outpoint, next_channel_id, completed_blocker))
6489+ Some(EventUnblockedChannel {
6490+ counterparty_node_id: node_id,
6491+ funding_txo: next_channel_outpoint,
6492+ channel_id: next_channel_id,
6493+ blocking_action: completed_blocker
6494+ })
63736495 } else {
63746496 // We can only get `None` here if we are processing a
63756497 // `ChannelMonitor`-originated event, in which case we
@@ -6430,10 +6552,10 @@ where
64306552 } else if definitely_duplicate {
64316553 if let Some(other_chan) = chan_to_release {
64326554 Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
6433- downstream_counterparty_node_id: other_chan.0 ,
6434- downstream_funding_outpoint: other_chan.1 ,
6435- downstream_channel_id: other_chan.2 ,
6436- blocking_action: other_chan.3 ,
6555+ downstream_counterparty_node_id: other_chan.counterparty_node_id ,
6556+ downstream_funding_outpoint: other_chan.funding_txo ,
6557+ downstream_channel_id: other_chan.channel_id ,
6558+ blocking_action: other_chan.blocking_action ,
64376559 })
64386560 } else { None }
64396561 } else {
@@ -6504,8 +6626,11 @@ where
65046626 event, downstream_counterparty_and_funding_outpoint
65056627 } => {
65066628 self.pending_events.lock().unwrap().push_back((event, None));
6507- if let Some((node_id, funding_outpoint, channel_id, blocker)) = downstream_counterparty_and_funding_outpoint {
6508- self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
6629+ if let Some(unblocked) = downstream_counterparty_and_funding_outpoint {
6630+ self.handle_monitor_update_release(
6631+ unblocked.counterparty_node_id, unblocked.funding_txo,
6632+ unblocked.channel_id, Some(unblocked.blocking_action),
6633+ );
65096634 }
65106635 },
65116636 MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
@@ -11992,7 +12117,12 @@ where
1199212117 for action in actions.iter() {
1199312118 if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
1199412119 downstream_counterparty_and_funding_outpoint:
11995- Some((blocked_node_id, _blocked_channel_outpoint, blocked_channel_id, blocking_action)), ..
12120+ Some(EventUnblockedChannel {
12121+ counterparty_node_id: blocked_node_id,
12122+ funding_txo: _,
12123+ channel_id: blocked_channel_id,
12124+ blocking_action,
12125+ }), ..
1199612126 } = action {
1199712127 if let Some(blocked_peer_state) = per_peer_state.get(blocked_node_id) {
1199812128 log_trace!(logger,
0 commit comments