@@ -34,6 +34,7 @@ use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload
3434use crate :: util:: async_poll:: { MultiResultFuturePoller , ResultFuture } ;
3535use crate :: util:: logger:: { Logger , WithContext } ;
3636use crate :: util:: ser:: Writeable ;
37+ use crate :: util:: wakers:: { Future , Notifier } ;
3738
3839use core:: fmt;
3940use core:: ops:: Deref ;
@@ -266,6 +267,9 @@ pub struct OnionMessenger<
266267 pending_intercepted_msgs_events : Mutex < Vec < Event > > ,
267268 pending_peer_connected_events : Mutex < Vec < Event > > ,
268269 pending_events_processor : AtomicBool ,
270+ /// A [`Notifier`] used to wake up the background processor in case we have any [`Event`]s for
271+ /// it to give to users.
272+ event_notifier : Notifier ,
269273}
270274
271275/// [`OnionMessage`]s buffered to be sent.
@@ -290,13 +294,19 @@ impl OnionMessageRecipient {
290294 }
291295 }
292296
293- fn enqueue_message ( & mut self , message : OnionMessage ) {
297+ // Returns whether changes were made that are pending event processing
298+ fn enqueue_message ( & mut self , message : OnionMessage ) -> bool {
299+ let mut pending_event_processing = false ;
294300 let pending_messages = match self {
295301 OnionMessageRecipient :: ConnectedPeer ( pending_messages) => pending_messages,
296- OnionMessageRecipient :: PendingConnection ( pending_messages, _, _) => pending_messages,
302+ OnionMessageRecipient :: PendingConnection ( pending_messages, _, _) => {
303+ pending_event_processing = true ;
304+ pending_messages
305+ }
297306 } ;
298307
299308 pending_messages. push_back ( message) ;
309+ pending_event_processing
300310 }
301311
302312 fn dequeue_message ( & mut self ) -> Option < OnionMessage > {
@@ -1037,6 +1047,7 @@ macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset:
10371047 if $res. iter( ) . any( |r| r. is_err( ) ) {
10381048 // We failed handling some events. Return to have them eventually replayed.
10391049 $self. pending_events_processor. store( false , Ordering :: Release ) ;
1050+ $self. event_notifier. notify( ) ;
10401051 return ;
10411052 }
10421053 }
@@ -1119,6 +1130,7 @@ where
11191130 pending_intercepted_msgs_events : Mutex :: new ( Vec :: new ( ) ) ,
11201131 pending_peer_connected_events : Mutex :: new ( Vec :: new ( ) ) ,
11211132 pending_events_processor : AtomicBool :: new ( false ) ,
1133+ event_notifier : Notifier :: new ( ) ,
11221134 }
11231135 }
11241136
@@ -1228,13 +1240,19 @@ where
12281240 hash_map:: Entry :: Vacant ( e) => match addresses {
12291241 None => Err ( SendError :: InvalidFirstHop ( first_node_id) ) ,
12301242 Some ( addresses) => {
1231- e. insert ( OnionMessageRecipient :: pending_connection ( addresses) )
1243+ let notify = e. insert ( OnionMessageRecipient :: pending_connection ( addresses) )
12321244 . enqueue_message ( onion_message) ;
1245+ if notify {
1246+ self . event_notifier . notify ( ) ;
1247+ }
12331248 Ok ( SendSuccess :: BufferedAwaitingConnection ( first_node_id) )
12341249 } ,
12351250 } ,
12361251 hash_map:: Entry :: Occupied ( mut e) => {
1237- e. get_mut ( ) . enqueue_message ( onion_message) ;
1252+ let notify = e. get_mut ( ) . enqueue_message ( onion_message) ;
1253+ if notify {
1254+ self . event_notifier . notify ( ) ;
1255+ }
12381256 if e. get ( ) . is_connected ( ) {
12391257 Ok ( SendSuccess :: Buffered )
12401258 } else {
@@ -1345,6 +1363,18 @@ where
13451363 return
13461364 }
13471365 pending_intercepted_msgs_events. push ( event) ;
1366+ self . event_notifier . notify ( ) ;
1367+ }
1368+
1369+ /// Gets a [`Future`] that completes when an event is available via
1370+ /// [`EventsProvider::process_pending_events`] or [`Self::process_pending_events_async`].
1371+ ///
1372+ /// Note that callbacks registered on the [`Future`] MUST NOT call back into this
1373+ /// [`OnionMessenger`] and should instead register actions to be taken later.
1374+ ///
1375+ /// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events
1376+ pub fn get_update_future ( & self ) -> Future {
1377+ self . event_notifier . get_future ( )
13481378 }
13491379
13501380 /// Processes any events asynchronously using the given handler.
@@ -1616,6 +1646,7 @@ where
16161646 pending_peer_connected_events. push (
16171647 Event :: OnionMessagePeerConnected { peer_node_id : * their_node_id }
16181648 ) ;
1649+ self . event_notifier . notify ( ) ;
16191650 }
16201651 } else {
16211652 self . message_recipients . lock ( ) . unwrap ( ) . remove ( their_node_id) ;
0 commit comments