@@ -45,7 +45,7 @@ impl Notifier {
4545 pub ( crate ) fn notify ( & self ) {
4646 let mut lock = self . notify_pending . lock ( ) . unwrap ( ) ;
4747 if let Some ( future_state) = & lock. 1 {
48- if future_state . lock ( ) . unwrap ( ) . complete ( ) {
48+ if complete_future ( future_state ) {
4949 lock. 1 = None ;
5050 return ;
5151 }
@@ -69,6 +69,7 @@ impl Notifier {
6969 } else {
7070 let state = Arc :: new ( Mutex :: new ( FutureState {
7171 callbacks : Vec :: new ( ) ,
72+ callbacks_with_state : Vec :: new ( ) ,
7273 complete : lock. 0 ,
7374 callbacks_made : false ,
7475 } ) ) ;
@@ -112,19 +113,24 @@ pub(crate) struct FutureState {
112113 // first bool - set to false if we're just calling a Waker, and true if we're calling an actual
113114 // user-provided function.
114115 callbacks : Vec < ( bool , Box < dyn FutureCallback > ) > ,
116+ callbacks_with_state : Vec < ( bool , Box < dyn Fn ( & Arc < Mutex < FutureState > > ) -> ( ) + Send > ) > ,
115117 complete : bool ,
116118 callbacks_made : bool ,
117119}
118120
119- impl FutureState {
120- fn complete ( & mut self ) -> bool {
121- for ( counts_as_call, callback) in self . callbacks . drain ( ..) {
122- callback. call ( ) ;
123- self . callbacks_made |= counts_as_call;
124- }
125- self . complete = true ;
126- self . callbacks_made
121+ fn complete_future ( this : & Arc < Mutex < FutureState > > ) -> bool {
122+ let mut state_lock = this. lock ( ) . unwrap ( ) ;
123+ let state = & mut * state_lock;
124+ for ( counts_as_call, callback) in state. callbacks . drain ( ..) {
125+ callback. call ( ) ;
126+ state. callbacks_made |= counts_as_call;
127+ }
128+ for ( counts_as_call, callback) in state. callbacks_with_state . drain ( ..) {
129+ ( callback) ( this) ;
130+ state. callbacks_made |= counts_as_call;
127131 }
132+ state. complete = true ;
133+ state. callbacks_made
128134}
129135
130136/// A simple future which can complete once, and calls some callback(s) when it does so.
@@ -240,14 +246,13 @@ impl Sleeper {
240246 for notifier_mtx in self . notifiers . iter ( ) {
241247 let cv_ref = Arc :: clone ( & cv) ;
242248 let notified_fut_ref = Arc :: clone ( & notified_fut_mtx) ;
243- let notifier_ref = Arc :: clone ( & notifier_mtx) ;
244249 let mut notifier = notifier_mtx. lock ( ) . unwrap ( ) ;
245250 if notifier. complete {
246- * notified_fut_mtx. lock ( ) . unwrap ( ) = Some ( notifier_ref ) ;
251+ * notified_fut_mtx. lock ( ) . unwrap ( ) = Some ( Arc :: clone ( & notifier_mtx ) ) ;
247252 break ;
248253 }
249- notifier. callbacks . push ( ( false , Box :: new ( move || {
250- * notified_fut_ref. lock ( ) . unwrap ( ) = Some ( Arc :: clone ( & notifier_ref) ) ;
254+ notifier. callbacks_with_state . push ( ( false , Box :: new ( move |notifier_ref | {
255+ * notified_fut_ref. lock ( ) . unwrap ( ) = Some ( Arc :: clone ( notifier_ref) ) ;
251256 cv_ref. notify_all ( ) ;
252257 } ) ) ) ;
253258 }
@@ -407,11 +412,50 @@ mod tests {
407412 }
408413 }
409414
415+ #[ cfg( feature = "std" ) ]
416+ #[ test]
417+ fn test_state_drops ( ) {
418+ // Previously, there was a leak if a `Notifier` was `drop`ed without ever being notified
419+ // but after having been slept-on. This tests for that leak.
420+ use crate :: sync:: Arc ;
421+ use std:: thread;
422+
423+ let notifier_a = Arc :: new ( Notifier :: new ( ) ) ;
424+ let notifier_b = Arc :: new ( Notifier :: new ( ) ) ;
425+
426+ let thread_notifier_a = Arc :: clone ( & notifier_a) ;
427+
428+ let future_a = notifier_a. get_future ( ) ;
429+ let future_state_a = Arc :: downgrade ( & future_a. state ) ;
430+
431+ let future_b = notifier_b. get_future ( ) ;
432+ let future_state_b = Arc :: downgrade ( & future_b. state ) ;
433+
434+ let join_handle = thread:: spawn ( move || {
435+ // Let the other thread get to the wait point, then notify it.
436+ std:: thread:: sleep ( Duration :: from_millis ( 50 ) ) ;
437+ thread_notifier_a. notify ( ) ;
438+ } ) ;
439+
440+ // Wait on the other thread to finish its sleep, note that the leak only happened if we
441+ // actually have to sleep here, not if we immediately return.
442+ Sleeper :: from_two_futures ( future_a, future_b) . wait ( ) ;
443+
444+ join_handle. join ( ) . unwrap ( ) ;
445+
446+ // then drop the notifiers and make sure the future states are gone.
447+ mem:: drop ( notifier_a) ;
448+ mem:: drop ( notifier_b) ;
449+
450+ assert ! ( future_state_a. upgrade( ) . is_none( ) && future_state_b. upgrade( ) . is_none( ) ) ;
451+ }
452+
410453 #[ test]
411454 fn test_future_callbacks ( ) {
412455 let future = Future {
413456 state : Arc :: new ( Mutex :: new ( FutureState {
414457 callbacks : Vec :: new ( ) ,
458+ callbacks_with_state : Vec :: new ( ) ,
415459 complete : false ,
416460 callbacks_made : false ,
417461 } ) )
@@ -421,21 +465,22 @@ mod tests {
421465 future. register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
422466
423467 assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
424- future. state . lock ( ) . unwrap ( ) . complete ( ) ;
468+ complete_future ( & future. state ) ;
425469 assert ! ( callback. load( Ordering :: SeqCst ) ) ;
426- future. state . lock ( ) . unwrap ( ) . complete ( ) ;
470+ complete_future ( & future. state ) ;
427471 }
428472
429473 #[ test]
430474 fn test_pre_completed_future_callbacks ( ) {
431475 let future = Future {
432476 state : Arc :: new ( Mutex :: new ( FutureState {
433477 callbacks : Vec :: new ( ) ,
478+ callbacks_with_state : Vec :: new ( ) ,
434479 complete : false ,
435480 callbacks_made : false ,
436481 } ) )
437482 } ;
438- future. state . lock ( ) . unwrap ( ) . complete ( ) ;
483+ complete_future ( & future. state ) ;
439484
440485 let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
441486 let callback_ref = Arc :: clone ( & callback) ;
@@ -469,6 +514,7 @@ mod tests {
469514 let mut future = Future {
470515 state : Arc :: new ( Mutex :: new ( FutureState {
471516 callbacks : Vec :: new ( ) ,
517+ callbacks_with_state : Vec :: new ( ) ,
472518 complete : false ,
473519 callbacks_made : false ,
474520 } ) )
@@ -483,7 +529,7 @@ mod tests {
483529 assert_eq ! ( Pin :: new( & mut second_future) . poll( & mut Context :: from_waker( & second_waker) ) , Poll :: Pending ) ;
484530 assert ! ( !second_woken. load( Ordering :: SeqCst ) ) ;
485531
486- future. state . lock ( ) . unwrap ( ) . complete ( ) ;
532+ complete_future ( & future. state ) ;
487533 assert ! ( woken. load( Ordering :: SeqCst ) ) ;
488534 assert ! ( second_woken. load( Ordering :: SeqCst ) ) ;
489535 assert_eq ! ( Pin :: new( & mut future) . poll( & mut Context :: from_waker( & waker) ) , Poll :: Ready ( ( ) ) ) ;
0 commit comments