Skip to content

Commit e8fa34e

Browse files
committed
Track payments after they resolve until all HTLCs are finalized
In the next commit, we will reload lost pending payments from ChannelMonitors during restart. However, in order to avoid re-adding pending payments which have already been fulfilled, we must ensure that we do not fully remove pending payments until all HTLCs for the payment have been fully removed from their ChannelMonitors. We do so here, introducing a new PendingOutboundPayment variant called `Completed` which only tracks the set of pending HTLCs.
1 parent 30e69fc commit e8fa34e

File tree

1 file changed

+106
-13
lines changed

1 file changed

+106
-13
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 106 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -416,13 +416,44 @@ pub(crate) enum PendingOutboundPayment {
416416
/// Our best known block height at the time this payment was initiated.
417417
starting_block_height: u32,
418418
},
419+
/// When a payment completes, we continue tracking it until all pendings HTLCs have been
420+
/// resolved. This ensures we don't look up pending payments in ChannelMonitors on restart and
421+
/// add a pending payment that was already completed.
422+
Completed {
423+
session_privs: HashSet<[u8; 32]>,
424+
},
419425
}
420426

421427
impl PendingOutboundPayment {
428+
fn is_retryable(&self) -> bool {
429+
match self {
430+
PendingOutboundPayment::Retryable { .. } => true,
431+
_ => false,
432+
}
433+
}
434+
fn is_completed(&self) -> bool {
435+
match self {
436+
PendingOutboundPayment::Completed { .. } => true,
437+
_ => false,
438+
}
439+
}
440+
441+
fn mark_completed(&mut self) {
442+
let mut session_privs = HashSet::new();
443+
core::mem::swap(&mut session_privs, match self {
444+
PendingOutboundPayment::Legacy { session_privs } |
445+
PendingOutboundPayment::Retryable { session_privs, .. } |
446+
PendingOutboundPayment::Completed { session_privs }
447+
=> session_privs
448+
});
449+
*self = PendingOutboundPayment::Completed { session_privs };
450+
}
451+
422452
fn remove(&mut self, session_priv: &[u8; 32], part_amt_msat: u64) -> bool {
423453
let remove_res = match self {
424454
PendingOutboundPayment::Legacy { session_privs } |
425-
PendingOutboundPayment::Retryable { session_privs, .. } => {
455+
PendingOutboundPayment::Retryable { session_privs, .. } |
456+
PendingOutboundPayment::Completed { session_privs } => {
426457
session_privs.remove(session_priv)
427458
}
428459
};
@@ -440,6 +471,7 @@ impl PendingOutboundPayment {
440471
PendingOutboundPayment::Retryable { session_privs, .. } => {
441472
session_privs.insert(session_priv)
442473
}
474+
PendingOutboundPayment::Completed { .. } => false
443475
};
444476
if insert_res {
445477
if let PendingOutboundPayment::Retryable { ref mut pending_amt_msat, .. } = self {
@@ -452,7 +484,8 @@ impl PendingOutboundPayment {
452484
fn remaining_parts(&self) -> usize {
453485
match self {
454486
PendingOutboundPayment::Legacy { session_privs } |
455-
PendingOutboundPayment::Retryable { session_privs, .. } => {
487+
PendingOutboundPayment::Retryable { session_privs, .. } |
488+
PendingOutboundPayment::Completed { session_privs } => {
456489
session_privs.len()
457490
}
458491
}
@@ -1958,6 +1991,15 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
19581991
let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash);
19591992

19601993
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
1994+
let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap();
1995+
let payment_entry = pending_outbounds.entry(payment_id);
1996+
if let hash_map::Entry::Occupied(payment) = &payment_entry {
1997+
if !payment.get().is_retryable() {
1998+
return Err(APIError::RouteError {
1999+
err: "Payment already completed"
2000+
});
2001+
}
2002+
}
19612003

19622004
let err: Result<(), _> = loop {
19632005
let mut channel_lock = self.channel_state.lock().unwrap();
@@ -1985,8 +2027,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
19852027
}, onion_packet, &self.logger),
19862028
channel_state, chan);
19872029

1988-
let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap();
1989-
let payment = pending_outbounds.entry(payment_id).or_insert_with(|| PendingOutboundPayment::Retryable {
2030+
let payment = payment_entry.or_insert_with(|| PendingOutboundPayment::Retryable {
19902031
session_privs: HashSet::new(),
19912032
pending_amt_msat: 0,
19922033
payment_hash: *payment_hash,
@@ -2182,7 +2223,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
21822223
return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError {
21832224
err: "Unable to retry payments that were initially sent on LDK versions prior to 0.0.102".to_string()
21842225
}))
2185-
}
2226+
},
2227+
PendingOutboundPayment::Completed { .. } => {
2228+
return Err(PaymentSendFailure::ParameterError(APIError::RouteError {
2229+
err: "Payment already completed"
2230+
}));
2231+
},
21862232
}
21872233
} else {
21882234
return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError {
@@ -3010,7 +3056,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
30103056
session_priv_bytes.copy_from_slice(&session_priv[..]);
30113057
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
30123058
if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
3013-
if payment.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat) {
3059+
if payment.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat) &&
3060+
!payment.get().is_completed()
3061+
{
30143062
self.pending_events.lock().unwrap().push(
30153063
events::Event::PaymentPathFailed {
30163064
payment_hash,
@@ -3059,6 +3107,10 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
30593107
log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
30603108
return;
30613109
}
3110+
if sessions.get().is_completed() {
3111+
log_trace!(self.logger, "Received failure of HTLC with payment_hash {} after payment completion", log_bytes!(payment_hash.0));
3112+
return;
3113+
}
30623114
if sessions.get().remaining_parts() == 0 {
30633115
all_paths_failed = true;
30643116
}
@@ -3305,15 +3357,45 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
33053357
} else { unreachable!(); }
33063358
}
33073359

3360+
fn finalize_claims(&self, mut sources: Vec<HTLCSource>) {
3361+
for source in sources.drain(..) {
3362+
if let HTLCSource::OutboundRoute { session_priv, payment_id, .. } = source {
3363+
let mut session_priv_bytes = [0; 32];
3364+
session_priv_bytes.copy_from_slice(&session_priv[..]);
3365+
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
3366+
if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(payment_id) {
3367+
assert!(sessions.get().is_completed());
3368+
sessions.get_mut().remove(&session_priv_bytes, 0); // Note that the amount is no longer tracked
3369+
if sessions.get().remaining_parts() == 0 {
3370+
sessions.remove();
3371+
}
3372+
}
3373+
}
3374+
}
3375+
}
33083376
fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool) {
33093377
match source {
33103378
HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
33113379
mem::drop(channel_state_lock);
33123380
let mut session_priv_bytes = [0; 32];
33133381
session_priv_bytes.copy_from_slice(&session_priv[..]);
33143382
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
3315-
let found_payment = if let Some(mut sessions) = outbounds.remove(&payment_id) {
3316-
sessions.remove(&session_priv_bytes, path.last().unwrap().fee_msat)
3383+
let found_payment = if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(payment_id) {
3384+
let found_payment = !sessions.get().is_completed();
3385+
sessions.get_mut().mark_completed();
3386+
if from_onchain {
3387+
// We currently immediately remove HTLCs which were fulfilled on-chain.
3388+
// This could potentially lead to removing a pending payment too early,
3389+
// with a reorg of one block causing us to re-add the completed payment on
3390+
// restart.
3391+
// TODO: We should have a second monitor event that informs us of payments
3392+
// irrevocably completing.
3393+
sessions.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat);
3394+
if sessions.get().remaining_parts() == 0 {
3395+
sessions.remove();
3396+
}
3397+
}
3398+
found_payment
33173399
} else { false };
33183400
if found_payment {
33193401
self.pending_events.lock().unwrap().push(
@@ -3954,6 +4036,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
39544036
});
39554037
}
39564038
break Ok((raa_updates.to_forward_htlcs, raa_updates.failed_htlcs,
4039+
raa_updates.finalized_claim_htlcs,
39574040
chan.get().get_short_channel_id()
39584041
.expect("RAA should only work on a short-id-available channel"),
39594042
chan.get().get_funding_txo().unwrap()))
@@ -3963,11 +4046,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
39634046
};
39644047
self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id);
39654048
match res {
3966-
Ok((pending_forwards, mut pending_failures, short_channel_id, channel_outpoint)) => {
4049+
Ok((pending_forwards, mut pending_failures, finalized_claim_htlcs,
4050+
short_channel_id, channel_outpoint)) =>
4051+
{
39674052
for failure in pending_failures.drain(..) {
39684053
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
39694054
}
39704055
self.forward_htlcs(&mut [(short_channel_id, channel_outpoint, pending_forwards)]);
4056+
self.finalize_claims(finalized_claim_htlcs);
39714057
Ok(())
39724058
},
39734059
Err(e) => Err(e)
@@ -5313,10 +5399,13 @@ impl_writeable_tlv_based!(PendingInboundPayment, {
53135399
(8, min_value_msat, required),
53145400
});
53155401

5316-
impl_writeable_tlv_based_enum!(PendingOutboundPayment,
5402+
impl_writeable_tlv_based_enum_upgradable!(PendingOutboundPayment,
53175403
(0, Legacy) => {
53185404
(0, session_privs, required),
53195405
},
5406+
(1, Completed) => {
5407+
(0, session_privs, required),
5408+
},
53205409
(2, Retryable) => {
53215410
(0, session_privs, required),
53225411
(2, payment_hash, required),
@@ -5325,7 +5414,7 @@ impl_writeable_tlv_based_enum!(PendingOutboundPayment,
53255414
(8, pending_amt_msat, required),
53265415
(10, starting_block_height, required),
53275416
},
5328-
;);
5417+
);
53295418

53305419
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable for ChannelManager<Signer, M, T, K, F, L>
53315420
where M::Target: chain::Watch<Signer>,
@@ -5418,7 +5507,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
54185507
// For backwards compat, write the session privs and their total length.
54195508
let mut num_pending_outbounds_compat: u64 = 0;
54205509
for (_, outbound) in pending_outbound_payments.iter() {
5421-
num_pending_outbounds_compat += outbound.remaining_parts() as u64;
5510+
if !outbound.is_completed() {
5511+
num_pending_outbounds_compat += outbound.remaining_parts() as u64;
5512+
}
54225513
}
54235514
num_pending_outbounds_compat.write(writer)?;
54245515
for (_, outbound) in pending_outbound_payments.iter() {
@@ -5429,6 +5520,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
54295520
session_priv.write(writer)?;
54305521
}
54315522
}
5523+
PendingOutboundPayment::Completed { .. }=> {},
54325524
}
54335525
}
54345526

@@ -5439,7 +5531,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
54395531
PendingOutboundPayment::Legacy { session_privs } |
54405532
PendingOutboundPayment::Retryable { session_privs, .. } => {
54415533
pending_outbound_payments_no_retry.insert(*id, session_privs.clone());
5442-
}
5534+
},
5535+
_ => {},
54435536
}
54445537
}
54455538
write_tlv_fields!(writer, {

0 commit comments

Comments
 (0)