@@ -40,13 +40,18 @@ impl SplitTickedAsyncExecutor {
4040 #[ cfg( feature = "tick_event" ) ]
4141 let ( tick_event_tx, tick_event_rx) = tokio:: sync:: watch:: channel ( 1.0 ) ;
4242
43+ #[ cfg( feature = "timer_registration" ) ]
44+ let ( timer_registration_tx, timer_registration_rx) = mpsc:: channel ( ) ;
45+
4346 let spawner = TickedAsyncExecutorSpawner {
4447 task_tx,
4548 num_woken_tasks : num_woken_tasks. clone ( ) ,
4649 num_spawned_tasks : num_spawned_tasks. clone ( ) ,
4750 observer : observer. clone ( ) ,
4851 #[ cfg( feature = "tick_event" ) ]
4952 tick_event_rx,
53+ #[ cfg( feature = "timer_registration" ) ]
54+ timer_registration_tx,
5055 } ;
5156 let ticker = TickedAsyncExecutorTicker {
5257 task_rx,
@@ -55,6 +60,10 @@ impl SplitTickedAsyncExecutor {
5560 observer,
5661 #[ cfg( feature = "tick_event" ) ]
5762 tick_event_tx,
63+ #[ cfg( feature = "timer_registration" ) ]
64+ timer_registration_rx,
65+ #[ cfg( feature = "timer_registration" ) ]
66+ timers : Vec :: new ( ) ,
5867 } ;
5968 ( spawner, ticker)
6069 }
@@ -72,6 +81,8 @@ pub struct TickedAsyncExecutorSpawner<O> {
7281
7382 #[ cfg( feature = "tick_event" ) ]
7483 tick_event_rx : tokio:: sync:: watch:: Receiver < f64 > ,
84+ #[ cfg( feature = "timer_registration" ) ]
85+ timer_registration_tx : mpsc:: Sender < ( f64 , tokio:: sync:: oneshot:: Sender < ( ) > ) > ,
7586}
7687
7788impl < O > TickedAsyncExecutorSpawner < O >
@@ -96,15 +107,19 @@ where
96107
97108 #[ cfg( feature = "tick_event" ) ]
98109 pub fn create_timer_from_tick_event ( & self ) -> crate :: TickedTimerFromTickEvent {
99- let tick_recv = self . tick_event_rx . clone ( ) ;
100- crate :: TickedTimerFromTickEvent :: new ( tick_recv)
110+ crate :: TickedTimerFromTickEvent :: new ( self . tick_event_rx . clone ( ) )
101111 }
102112
103113 #[ cfg( feature = "tick_event" ) ]
104114 pub fn tick_channel ( & self ) -> tokio:: sync:: watch:: Receiver < f64 > {
105115 self . tick_event_rx . clone ( )
106116 }
107117
118+ #[ cfg( feature = "timer_registration" ) ]
119+ pub fn create_timer_from_timer_registration ( & self ) -> crate :: TickedTimerFromTimerRegistration {
120+ crate :: TickedTimerFromTimerRegistration :: new ( self . timer_registration_tx . clone ( ) )
121+ }
122+
108123 pub fn num_tasks ( & self ) -> usize {
109124 self . num_spawned_tasks . load ( Ordering :: Relaxed )
110125 }
@@ -151,6 +166,11 @@ pub struct TickedAsyncExecutorTicker<O> {
151166
152167 #[ cfg( feature = "tick_event" ) ]
153168 tick_event_tx : tokio:: sync:: watch:: Sender < f64 > ,
169+
170+ #[ cfg( feature = "timer_registration" ) ]
171+ timer_registration_rx : mpsc:: Receiver < ( f64 , tokio:: sync:: oneshot:: Sender < ( ) > ) > ,
172+ #[ cfg( feature = "timer_registration" ) ]
173+ timers : Vec < ( f64 , tokio:: sync:: oneshot:: Sender < ( ) > ) > ,
154174}
155175
156176impl < O > TickedAsyncExecutorTicker < O >
@@ -161,6 +181,9 @@ where
161181 #[ cfg( feature = "tick_event" ) ]
162182 let _r = self . tick_event_tx . send ( delta) ;
163183
184+ #[ cfg( feature = "timer_registration" ) ]
185+ self . timer_registration_tick ( delta) ;
186+
164187 let mut num_woken_tasks = self . num_woken_tasks . load ( Ordering :: Relaxed ) ;
165188 if let Some ( limit) = limit {
166189 // Woken tasks should not exceed the allowed limit
@@ -183,4 +206,24 @@ where
183206 self . tick ( constant_delta, None ) ;
184207 }
185208 }
209+
210+ #[ cfg( feature = "timer_registration" ) ]
211+ fn timer_registration_tick ( & mut self , delta : f64 ) {
212+ // Get new timers
213+ let mut new_timers = self . timer_registration_rx . try_iter ( ) . collect :: < Vec < _ > > ( ) ;
214+ self . timers . append ( & mut new_timers) ;
215+
216+ // Countdown timers
217+ self . timers . iter_mut ( ) . for_each ( |( elapsed, _) | {
218+ * elapsed -= delta;
219+ } ) ;
220+
221+ // Extract timers that have elapsed
222+ // Notify corresponding channels
223+ self . timers
224+ . extract_if ( .., |( elapsed, _) | * elapsed <= 0.0 )
225+ . for_each ( |( _, rx) | {
226+ let _ignore = rx. send ( ( ) ) ;
227+ } ) ;
228+ }
186229}
0 commit comments