@@ -799,7 +799,13 @@ pub(crate) enum MonitorUpdateCompletionAction {
799799 /// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for
800800 /// this payment. Note that this is only best-effort. On restart it's possible such a duplicate
801801 /// event can be generated.
802- PaymentClaimed { payment_hash: PaymentHash },
802+ PaymentClaimed {
803+ payment_hash: PaymentHash,
804+ /// A pending MPP claim which hasn't yet completed.
805+ ///
806+		/// Not written to disk; it is serialized as a constant `None` via the `static_value` entry in the TLV macro, and rebuilt at runtime.
807+ pending_mpp_claim: Option<(PublicKey, ChannelId, u64, PendingMPPClaimPointer)>,
808+ },
803809 /// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the
804810 /// operation of another channel.
805811 ///
@@ -833,7 +839,10 @@ pub(crate) enum MonitorUpdateCompletionAction {
833839}
834840
835841impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
836- (0, PaymentClaimed) => { (0, payment_hash, required) },
842+ (0, PaymentClaimed) => {
843+ (0, payment_hash, required),
844+ (9999999999, pending_mpp_claim, (static_value, None)),
845+ },
837846 // Note that FreeOtherChannelImmediately should never be written - we were supposed to free
838847 // *immediately*. However, for simplicity we implement read/write here.
839848 (1, FreeOtherChannelImmediately) => {
@@ -6259,13 +6268,44 @@ where
62596268 return;
62606269 }
62616270 if valid_mpp {
6262- for htlc in sources.drain(..) {
6271+ let mut pending_claim_ptr_opt = None;
6272+ let mut source_claim_pairs = Vec::with_capacity(sources.len());
6273+ if sources.len() > 1 {
6274+ let mut pending_claims = PendingMPPClaim {
6275+ channels_without_preimage: Vec::new(),
6276+ channels_with_preimage: Vec::new(),
6277+ };
6278+ for htlc in sources.drain(..) {
6279+ if let Some(cp_id) = htlc.prev_hop.counterparty_node_id {
6280+ let htlc_id = htlc.prev_hop.htlc_id;
6281+ let chan_id = htlc.prev_hop.channel_id;
6282+ let chan_outpoint = htlc.prev_hop.outpoint;
6283+ pending_claims.channels_without_preimage.push((cp_id, chan_outpoint, chan_id, htlc_id));
6284+ source_claim_pairs.push((htlc, Some((cp_id, chan_id, htlc_id))));
6285+ }
6286+ }
6287+ pending_claim_ptr_opt = Some(Arc::new(Mutex::new(pending_claims)));
6288+ } else {
6289+ for htlc in sources.drain(..) {
6290+ source_claim_pairs.push((htlc, None));
6291+ }
6292+ }
6293+ for (htlc, mpp_claim) in source_claim_pairs.drain(..) {
6294+ let mut pending_mpp_claim = None;
6295+ let pending_claim_ptr = pending_claim_ptr_opt.as_ref().map(|pending_claim| {
6296+ pending_mpp_claim = mpp_claim.map(|(cp_id, chan_id, htlc_id)|
6297+ (cp_id, chan_id, htlc_id, PendingMPPClaimPointer(Arc::clone(pending_claim)))
6298+ );
6299+ RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
6300+ pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)),
6301+ }
6302+ });
62636303 let prev_hop_chan_id = htlc.prev_hop.channel_id;
62646304 if let Err((pk, err)) = self.claim_funds_from_hop(
62656305 htlc.prev_hop, payment_preimage,
62666306 |_, definitely_duplicate| {
62676307 debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
6268- Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } )
6308+ ( Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim }), pending_claim_ptr )
62696309 }
62706310 ) {
62716311 if let msgs::ErrorAction::IgnoreError = err.err.action {
@@ -6296,7 +6336,7 @@ where
62966336 }
62976337 }
62986338
6299- fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> Option<MonitorUpdateCompletionAction>>(&self,
6339+ fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> ( Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>) >(&self,
63006340 prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc)
63016341 -> Result<(), (PublicKey, MsgHandleErrInternal)> {
63026342 //TODO: Delay the claimed_funds relaying just like we do outbound relay!
@@ -6335,11 +6375,15 @@ where
63356375
63366376 match fulfill_res {
63376377 UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
6338- if let Some(action) = completion_action(Some(htlc_value_msat), false) {
6378+ let (action_opt, raa_blocker_opt) = completion_action(Some(htlc_value_msat), false);
6379+ if let Some(action) = action_opt {
63396380 log_trace!(logger, "Tracking monitor update completion action for channel {}: {:?}",
63406381 chan_id, action);
63416382 peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
63426383 }
6384+ if let Some(raa_blocker) = raa_blocker_opt {
6385+ peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
6386+ }
63436387 if !during_init {
63446388 handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
63456389 peer_state, per_peer_state, chan);
@@ -6357,11 +6401,17 @@ where
63576401 }
63586402 }
63596403 UpdateFulfillCommitFetch::DuplicateClaim {} => {
6360- let action = if let Some(action) = completion_action(None, true) {
6404+ let (action_opt, raa_blocker_opt) = completion_action(None, true);
6405+ let action = if let Some(action) = action_opt {
63616406 action
63626407 } else {
63636408 return Ok(());
63646409 };
6410+ if let Some(raa_blocker) = raa_blocker_opt {
6411+ debug_assert!(peer_state.actions_blocking_raa_monitor_updates.get(&chan_id).unwrap().contains(&raa_blocker));
6412+						// NOTE(review): the blocker appears to have been pushed already on the initial (NewClaim) path — the debug_assert above checks for its presence — so it is intentionally not re-pushed here; confirm no code path can reach DuplicateClaim without having taken NewClaim first.
6413+ }
6414+
63656415 mem::drop(peer_state_lock);
63666416
63676417 log_trace!(logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
@@ -6448,7 +6498,47 @@ where
64486498 // `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
64496499 // generally always allowed to be duplicative (and it's specifically noted in
64506500 // `PaymentForwarded`).
6451- self.handle_monitor_update_completion_actions(completion_action(None, false));
6501+ let (action_opt, raa_blocker_opt) = completion_action(None, false);
6502+
6503+ if let Some(raa_blocker) = raa_blocker_opt {
6504+ let counterparty_node_id = prev_hop.counterparty_node_id.or_else(||
6505+ // prev_hop.counterparty_node_id is always available for payments received after
6506+ // LDK 0.0.123, but for those received on 0.0.123 and claimed later, we need to
6507+ // look up the counterparty in the `action_opt`, if possible.
6508+ if let Some(action) = &action_opt {
6509+ if let MonitorUpdateCompletionAction::PaymentClaimed { pending_mpp_claim, .. } = action {
6510+ if let Some((node_id, _, _, _)) = pending_mpp_claim {
6511+ Some(*node_id)
6512+ } else { None }
6513+ } else { None }
6514+ } else { None });
6515+ if let Some(counterparty_node_id) = counterparty_node_id {
6516+ // TODO: Avoid always blocking the world for the write lock here.
6517+ let mut per_peer_state = self.per_peer_state.write().unwrap();
6518+ let peer_state_mutex = per_peer_state.entry(counterparty_node_id).or_insert_with(||
6519+ Mutex::new(PeerState {
6520+ channel_by_id: new_hash_map(),
6521+ inbound_channel_request_by_id: new_hash_map(),
6522+ latest_features: InitFeatures::empty(),
6523+ pending_msg_events: Vec::new(),
6524+ in_flight_monitor_updates: BTreeMap::new(),
6525+ monitor_update_blocked_actions: BTreeMap::new(),
6526+ actions_blocking_raa_monitor_updates: BTreeMap::new(),
6527+ is_connected: false,
6528+ }));
6529+ let mut peer_state = peer_state_mutex.lock().unwrap();
6530+
6531+ peer_state.actions_blocking_raa_monitor_updates
6532+ .entry(prev_hop.channel_id)
6533+ .or_insert_with(Vec::new)
6534+ .push(raa_blocker);
6535+ } else {
6536+ debug_assert!(false,
6537+ "RAA ChannelMonitorUpdate blockers are only set with PaymentClaimed completion actions, so we should always have a counterparty node id");
6538+ }
6539+ }
6540+
6541+ self.handle_monitor_update_completion_actions(action_opt);
64526542 Ok(())
64536543 }
64546544
@@ -6548,16 +6638,16 @@ where
65486638 }
65496639 }), "{:?}", *background_events);
65506640 }
6551- None
6641+ ( None, None)
65526642 } else if definitely_duplicate {
65536643 if let Some(other_chan) = chan_to_release {
6554- Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
6644+ ( Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
65556645 downstream_counterparty_node_id: other_chan.counterparty_node_id,
65566646 downstream_funding_outpoint: other_chan.funding_txo,
65576647 downstream_channel_id: other_chan.channel_id,
65586648 blocking_action: other_chan.blocking_action,
6559- })
6560- } else { None }
6649+ }), None)
6650+ } else { ( None, None) }
65616651 } else {
65626652 let total_fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
65636653 if let Some(claimed_htlc_value) = htlc_claim_value_msat {
@@ -6566,7 +6656,7 @@ where
65666656 } else { None };
65676657 debug_assert!(skimmed_fee_msat <= total_fee_earned_msat,
65686658 "skimmed_fee_msat must always be included in total_fee_earned_msat");
6569- Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
6659+ ( Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
65706660 event: events::Event::PaymentForwarded {
65716661 prev_channel_id: Some(prev_channel_id),
65726662 next_channel_id: Some(next_channel_id),
@@ -6578,7 +6668,7 @@ where
65786668 outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
65796669 },
65806670 downstream_counterparty_and_funding_outpoint: chan_to_release,
6581- })
6671+ }), None)
65826672 }
65836673 });
65846674 if let Err((pk, err)) = res {
@@ -6599,9 +6689,44 @@ where
65996689 debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
66006690 debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
66016691
6692+ let mut freed_channels = Vec::new();
6693+
66026694 for action in actions.into_iter() {
66036695 match action {
6604- MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
6696+ MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim } => {
6697+ if let Some((counterparty_node_id, chan_id, htlc_id, claim_ptr)) = pending_mpp_claim {
6698+ let per_peer_state = self.per_peer_state.read().unwrap();
6699+ per_peer_state.get(&counterparty_node_id).map(|peer_state_mutex| {
6700+ let mut peer_state = peer_state_mutex.lock().unwrap();
6701+ let blockers_entry = peer_state.actions_blocking_raa_monitor_updates.entry(chan_id);
6702+ if let btree_map::Entry::Occupied(mut blockers) = blockers_entry {
6703+ blockers.get_mut().retain(|blocker|
6704+ if let &RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { pending_claim } = &blocker {
6705+ if *pending_claim == claim_ptr {
6706+ let mut pending_claim_state_lock = pending_claim.0.lock().unwrap();
6707+ let pending_claim_state = &mut *pending_claim_state_lock;
6708+ pending_claim_state.channels_without_preimage.retain(|(cp, outp, cid, hid)| {
6709+ if *cp == counterparty_node_id && *cid == chan_id && *hid == htlc_id {
6710+ pending_claim_state.channels_with_preimage.push((*cp, *outp, *cid));
6711+ false
6712+ } else { true }
6713+ });
6714+ if pending_claim_state.channels_without_preimage.is_empty() {
6715+ for (cp, outp, cid) in pending_claim_state.channels_with_preimage.iter() {
6716+ freed_channels.push((*cp, *outp, *cid, blocker.clone()));
6717+ }
6718+ }
6719+ !pending_claim_state.channels_without_preimage.is_empty()
6720+ } else { true }
6721+ } else { true }
6722+ );
6723+ if blockers.get().is_empty() {
6724+ blockers.remove();
6725+ }
6726+ }
6727+ });
6728+ }
6729+
66056730 let payment = self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
66066731 if let Some(ClaimingPayment {
66076732 amount_msat,
@@ -6645,6 +6770,10 @@ where
66456770 },
66466771 }
66476772 }
6773+
6774+ for (node_id, funding_outpoint, channel_id, blocker) in freed_channels {
6775+ self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
6776+ }
66486777 }
66496778
66506779 /// Handles a channel reentering a functional state, either due to reconnect or a monitor
0 commit comments