@@ -491,8 +491,11 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
491491 /// which may generate a claim event, we may receive similar duplicate claim/fail MonitorEvents
492492 /// after reloading from disk while replaying blocks against ChannelMonitors.
493493 ///
494+ /// Each payment has each of its MPP part's session_priv bytes in the HashSet of the map (even
495+ /// payments over a single path).
496+ ///
494497 /// Locked *after* channel_state.
495- pending_outbound_payments : Mutex < HashSet < [ u8 ; 32 ] > > ,
498+ pending_outbound_payments : Mutex < HashMap < MppId , HashSet < [ u8 ; 32 ] > > > ,
496499
497500 our_network_key : SecretKey ,
498501 our_network_pubkey : PublicKey ,
@@ -1156,7 +1159,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
11561159 pending_msg_events : Vec :: new ( ) ,
11571160 } ) ,
11581161 pending_inbound_payments : Mutex :: new ( HashMap :: new ( ) ) ,
1159- pending_outbound_payments : Mutex :: new ( HashSet :: new ( ) ) ,
1162+ pending_outbound_payments : Mutex :: new ( HashMap :: new ( ) ) ,
11601163
11611164 our_network_key : keys_manager. get_node_secret ( ) ,
11621165 our_network_pubkey : PublicKey :: from_secret_key ( & secp_ctx, & keys_manager. get_node_secret ( ) ) ,
@@ -1853,7 +1856,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
18531856 let onion_packet = onion_utils:: construct_onion_packet ( onion_payloads, onion_keys, prng_seed, payment_hash) ;
18541857
18551858 let _persistence_guard = PersistenceNotifierGuard :: notify_on_drop ( & self . total_consistency_lock , & self . persistence_notifier ) ;
1856- assert ! ( self . pending_outbound_payments. lock( ) . unwrap( ) . insert( session_priv_bytes) ) ;
1859+ let mut pending_outbounds = self . pending_outbound_payments . lock ( ) . unwrap ( ) ;
1860+ let sessions = pending_outbounds. entry ( mpp_id) . or_insert ( HashSet :: new ( ) ) ;
1861+ assert ! ( sessions. insert( session_priv_bytes) ) ;
18571862
18581863 let err: Result < ( ) , _ > = loop {
18591864 let mut channel_lock = self . channel_state . lock ( ) . unwrap ( ) ;
@@ -2832,23 +2837,27 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
28322837 self . fail_htlc_backwards_internal ( channel_state,
28332838 htlc_src, & payment_hash, HTLCFailReason :: Reason { failure_code, data : onion_failure_data} ) ;
28342839 } ,
2835- HTLCSource :: OutboundRoute { session_priv, .. } => {
2836- if {
2837- let mut session_priv_bytes = [ 0 ; 32 ] ;
2838- session_priv_bytes. copy_from_slice ( & session_priv[ ..] ) ;
2839- self . pending_outbound_payments . lock ( ) . unwrap ( ) . remove ( & session_priv_bytes)
2840- } {
2841- self . pending_events . lock ( ) . unwrap ( ) . push (
2842- events:: Event :: PaymentFailed {
2843- payment_hash,
2844- rejected_by_dest : false ,
2845- network_update : None ,
2846- #[ cfg( test) ]
2847- error_code : None ,
2848- #[ cfg( test) ]
2849- error_data : None ,
2840+ HTLCSource :: OutboundRoute { session_priv, mpp_id, .. } => {
2841+ let mut session_priv_bytes = [ 0 ; 32 ] ;
2842+ session_priv_bytes. copy_from_slice ( & session_priv[ ..] ) ;
2843+ let mut outbounds = self . pending_outbound_payments . lock ( ) . unwrap ( ) ;
2844+ if let hash_map:: Entry :: Occupied ( mut sessions) = outbounds. entry ( mpp_id) {
2845+ if sessions. get_mut ( ) . remove ( & session_priv_bytes) {
2846+ self . pending_events . lock ( ) . unwrap ( ) . push (
2847+ events:: Event :: PaymentFailed {
2848+ payment_hash,
2849+ rejected_by_dest : false ,
2850+ network_update : None ,
2851+ #[ cfg( test) ]
2852+ error_code : None ,
2853+ #[ cfg( test) ]
2854+ error_data : None ,
2855+ }
2856+ ) ;
2857+ if sessions. get ( ) . len ( ) == 0 {
2858+ sessions. remove ( ) ;
28502859 }
2851- )
2860+ }
28522861 } else {
28532862 log_trace ! ( self . logger, "Received duplicative fail for HTLC with payment_hash {}" , log_bytes!( payment_hash. 0 ) ) ;
28542863 }
@@ -2873,12 +2882,19 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
28732882 // from block_connected which may run during initialization prior to the chain_monitor
28742883 // being fully configured. See the docs for `ChannelManagerReadArgs` for more.
28752884 match source {
2876- HTLCSource :: OutboundRoute { ref path, session_priv, .. } => {
2877- if {
2878- let mut session_priv_bytes = [ 0 ; 32 ] ;
2879- session_priv_bytes. copy_from_slice ( & session_priv[ ..] ) ;
2880- !self . pending_outbound_payments . lock ( ) . unwrap ( ) . remove ( & session_priv_bytes)
2881- } {
2885+ HTLCSource :: OutboundRoute { ref path, session_priv, mpp_id, .. } => {
2886+ let mut session_priv_bytes = [ 0 ; 32 ] ;
2887+ session_priv_bytes. copy_from_slice ( & session_priv[ ..] ) ;
2888+ let mut outbounds = self . pending_outbound_payments . lock ( ) . unwrap ( ) ;
2889+ if let hash_map:: Entry :: Occupied ( mut sessions) = outbounds. entry ( mpp_id) {
2890+ if !sessions. get_mut ( ) . remove ( & session_priv_bytes) {
2891+ log_trace ! ( self . logger, "Received duplicative fail for HTLC with payment_hash {}" , log_bytes!( payment_hash. 0 ) ) ;
2892+ return ;
2893+ }
2894+ if sessions. get ( ) . len ( ) == 0 {
2895+ sessions. remove ( ) ;
2896+ }
2897+ } else {
28822898 log_trace ! ( self . logger, "Received duplicative fail for HTLC with payment_hash {}" , log_bytes!( payment_hash. 0 ) ) ;
28832899 return ;
28842900 }
@@ -3119,17 +3135,22 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
31193135
31203136 fn claim_funds_internal ( & self , mut channel_state_lock : MutexGuard < ChannelHolder < Signer > > , source : HTLCSource , payment_preimage : PaymentPreimage , forwarded_htlc_value_msat : Option < u64 > , from_onchain : bool ) {
31213137 match source {
3122- HTLCSource :: OutboundRoute { session_priv, .. } => {
3138+ HTLCSource :: OutboundRoute { session_priv, mpp_id , .. } => {
31233139 mem:: drop ( channel_state_lock) ;
3124- if {
3125- let mut session_priv_bytes = [ 0 ; 32 ] ;
3126- session_priv_bytes. copy_from_slice ( & session_priv[ ..] ) ;
3127- self . pending_outbound_payments . lock ( ) . unwrap ( ) . remove ( & session_priv_bytes)
3128- } {
3129- let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) ;
3130- pending_events. push ( events:: Event :: PaymentSent {
3131- payment_preimage
3132- } ) ;
3140+ let mut session_priv_bytes = [ 0 ; 32 ] ;
3141+ session_priv_bytes. copy_from_slice ( & session_priv[ ..] ) ;
3142+ let mut outbounds = self . pending_outbound_payments . lock ( ) . unwrap ( ) ;
3143+ if let Some ( sessions) = outbounds. get_mut ( & mpp_id) {
3144+ if sessions. remove ( & session_priv_bytes) {
3145+ self . pending_events . lock ( ) . unwrap ( ) . push (
3146+ events:: Event :: PaymentSent { payment_preimage }
3147+ ) ;
3148+ if sessions. len ( ) == 0 {
3149+ outbounds. remove ( & mpp_id) ;
3150+ }
3151+ } else {
3152+ log_trace ! ( self . logger, "Received duplicative fulfill for HTLC with payment_preimage {}" , log_bytes!( payment_preimage. 0 ) ) ;
3153+ }
31333154 } else {
31343155 log_trace ! ( self . logger, "Received duplicative fulfill for HTLC with payment_preimage {}" , log_bytes!( payment_preimage. 0 ) ) ;
31353156 }
@@ -5105,12 +5126,21 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
51055126 }
51065127
51075128 let pending_outbound_payments = self . pending_outbound_payments . lock ( ) . unwrap ( ) ;
5108- ( pending_outbound_payments. len ( ) as u64 ) . write ( writer) ?;
5109- for session_priv in pending_outbound_payments. iter ( ) {
5110- session_priv. write ( writer) ?;
5129+ // For backwards compat, write the session privs and their total length.
5130+ let mut num_pending_outbounds_compat: u64 = 0 ;
5131+ for ( _, outbounds) in pending_outbound_payments. iter ( ) {
5132+ num_pending_outbounds_compat += outbounds. len ( ) as u64 ;
5133+ }
5134+ num_pending_outbounds_compat. write ( writer) ?;
5135+ for ( _, outbounds) in pending_outbound_payments. iter ( ) {
5136+ for outbound in outbounds. iter ( ) {
5137+ outbound. write ( writer) ?;
5138+ }
51115139 }
51125140
5113- write_tlv_fields ! ( writer, { } ) ;
5141+ write_tlv_fields ! ( writer, {
5142+ ( 1 , pending_outbound_payments, required) ,
5143+ } ) ;
51145144
51155145 Ok ( ( ) )
51165146 }
@@ -5363,15 +5393,23 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
53635393 }
53645394 }
53655395
5366- let pending_outbound_payments_count: u64 = Readable :: read ( reader) ?;
5367- let mut pending_outbound_payments: HashSet < [ u8 ; 32 ] > = HashSet :: with_capacity ( cmp:: min ( pending_outbound_payments_count as usize , MAX_ALLOC_SIZE /32 ) ) ;
5368- for _ in 0 ..pending_outbound_payments_count {
5369- if !pending_outbound_payments. insert ( Readable :: read ( reader) ?) {
5370- return Err ( DecodeError :: InvalidValue ) ;
5371- }
5396+ let pending_outbound_payments_count_compat: u64 = Readable :: read ( reader) ?;
5397+ let mut pending_outbound_payments_compat: HashMap < MppId , HashSet < [ u8 ; 32 ] > > =
5398+ HashMap :: with_capacity ( cmp:: min ( pending_outbound_payments_count_compat as usize , MAX_ALLOC_SIZE /32 ) ) ;
5399+ for _ in 0 ..pending_outbound_payments_count_compat {
5400+ let session_priv = Readable :: read ( reader) ?;
5401+ if pending_outbound_payments_compat. insert ( MppId ( session_priv) , [ session_priv] . iter ( ) . cloned ( ) . collect ( ) ) . is_some ( ) {
5402+ return Err ( DecodeError :: InvalidValue )
5403+ } ;
53725404 }
53735405
5374- read_tlv_fields ! ( reader, { } ) ;
5406+ let mut pending_outbound_payments = None ;
5407+ read_tlv_fields ! ( reader, {
5408+ ( 1 , pending_outbound_payments, option) ,
5409+ } ) ;
5410+ if pending_outbound_payments. is_none ( ) {
5411+ pending_outbound_payments = Some ( pending_outbound_payments_compat) ;
5412+ }
53755413
53765414 let mut secp_ctx = Secp256k1 :: new ( ) ;
53775415 secp_ctx. seeded_randomize ( & args. keys_manager . get_secure_random_bytes ( ) ) ;
@@ -5392,7 +5430,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
53925430 pending_msg_events : Vec :: new ( ) ,
53935431 } ) ,
53945432 pending_inbound_payments : Mutex :: new ( pending_inbound_payments) ,
5395- pending_outbound_payments : Mutex :: new ( pending_outbound_payments) ,
5433+ pending_outbound_payments : Mutex :: new ( pending_outbound_payments. unwrap ( ) ) ,
53965434
53975435 our_network_key : args. keys_manager . get_node_secret ( ) ,
53985436 our_network_pubkey : PublicKey :: from_secret_key ( & secp_ctx, & args. keys_manager . get_node_secret ( ) ) ,
0 commit comments