@@ -69,6 +69,7 @@ impl Notifier {
6969 } else {
7070 let state = Arc :: new ( Mutex :: new ( FutureState {
7171 callbacks : Vec :: new ( ) ,
72+ callbacks_with_state : Vec :: new ( ) ,
7273 complete : lock. 0 ,
7374 callbacks_made : false ,
7475 } ) ) ;
@@ -112,6 +113,7 @@ pub(crate) struct FutureState {
112113 // first bool - set to false if we're just calling a Waker, and true if we're calling an actual
113114 // user-provided function.
114115 callbacks : Vec < ( bool , Box < dyn FutureCallback > ) > ,
116+ callbacks_with_state : Vec < ( bool , Box < dyn Fn ( & Arc < Mutex < FutureState > > ) -> ( ) + Send > ) > ,
115117 complete : bool ,
116118 callbacks_made : bool ,
117119}
@@ -123,6 +125,10 @@ fn complete_future(this: &Arc<Mutex<FutureState>>) -> bool {
123125 callback. call ( ) ;
124126 state. callbacks_made |= counts_as_call;
125127 }
128+ for ( counts_as_call, callback) in state. callbacks_with_state . drain ( ..) {
129+ ( callback) ( this) ;
130+ state. callbacks_made |= counts_as_call;
131+ }
126132 state. complete = true ;
127133 state. callbacks_made
128134}
@@ -240,14 +246,13 @@ impl Sleeper {
240246 for notifier_mtx in self . notifiers . iter ( ) {
241247 let cv_ref = Arc :: clone ( & cv) ;
242248 let notified_fut_ref = Arc :: clone ( & notified_fut_mtx) ;
243- let notifier_ref = Arc :: clone ( & notifier_mtx) ;
244249 let mut notifier = notifier_mtx. lock ( ) . unwrap ( ) ;
245250 if notifier. complete {
246- * notified_fut_mtx. lock ( ) . unwrap ( ) = Some ( notifier_ref ) ;
251+ * notified_fut_mtx. lock ( ) . unwrap ( ) = Some ( Arc :: clone ( & notifier_mtx ) ) ;
247252 break ;
248253 }
249- notifier. callbacks . push ( ( false , Box :: new ( move || {
250- * notified_fut_ref. lock ( ) . unwrap ( ) = Some ( Arc :: clone ( & notifier_ref) ) ;
254+ notifier. callbacks_with_state . push ( ( false , Box :: new ( move |notifier_ref | {
255+ * notified_fut_ref. lock ( ) . unwrap ( ) = Some ( Arc :: clone ( notifier_ref) ) ;
251256 cv_ref. notify_all ( ) ;
252257 } ) ) ) ;
253258 }
@@ -407,11 +412,50 @@ mod tests {
407412 }
408413 }
409414
415+ #[ cfg( feature = "std" ) ]
416+ #[ test]
417+ fn test_state_drops ( ) {
418+ // Previously, there was a leak if a `Notifier` was `drop`ed without ever being notified
419+ // but after having been slept-on. This tests for that leak.
420+ use crate :: sync:: Arc ;
421+ use std:: thread;
422+
423+ let notifier_a = Arc :: new ( Notifier :: new ( ) ) ;
424+ let notifier_b = Arc :: new ( Notifier :: new ( ) ) ;
425+
426+ let thread_notifier_a = Arc :: clone ( & notifier_a) ;
427+
428+ let future_a = notifier_a. get_future ( ) ;
429+ let future_state_a = Arc :: downgrade ( & future_a. state ) ;
430+
431+ let future_b = notifier_b. get_future ( ) ;
432+ let future_state_b = Arc :: downgrade ( & future_b. state ) ;
433+
434+ let join_handle = thread:: spawn ( move || {
435+ // Let the other thread get to the wait point, then notify it.
436+ std:: thread:: sleep ( Duration :: from_millis ( 50 ) ) ;
437+ thread_notifier_a. notify ( ) ;
438+ } ) ;
439+
440+ // Wait on the other thread to finish its sleep, note that the leak only happened if we
441+ // actually have to sleep here, not if we immediately return.
442+ Sleeper :: from_two_futures ( future_a, future_b) . wait ( ) ;
443+
444+ join_handle. join ( ) . unwrap ( ) ;
445+
446+ // then drop the notifiers and make sure the future states are gone.
447+ mem:: drop ( notifier_a) ;
448+ mem:: drop ( notifier_b) ;
449+
450+ assert ! ( future_state_a. upgrade( ) . is_none( ) && future_state_b. upgrade( ) . is_none( ) ) ;
451+ }
452+
410453 #[ test]
411454 fn test_future_callbacks ( ) {
412455 let future = Future {
413456 state : Arc :: new ( Mutex :: new ( FutureState {
414457 callbacks : Vec :: new ( ) ,
458+ callbacks_with_state : Vec :: new ( ) ,
415459 complete : false ,
416460 callbacks_made : false ,
417461 } ) )
@@ -431,6 +475,7 @@ mod tests {
431475 let future = Future {
432476 state : Arc :: new ( Mutex :: new ( FutureState {
433477 callbacks : Vec :: new ( ) ,
478+ callbacks_with_state : Vec :: new ( ) ,
434479 complete : false ,
435480 callbacks_made : false ,
436481 } ) )
@@ -469,6 +514,7 @@ mod tests {
469514 let mut future = Future {
470515 state : Arc :: new ( Mutex :: new ( FutureState {
471516 callbacks : Vec :: new ( ) ,
517+ callbacks_with_state : Vec :: new ( ) ,
472518 complete : false ,
473519 callbacks_made : false ,
474520 } ) )
0 commit comments