diff --git a/util/timer/src/timer.rs b/util/timer/src/timer.rs index ff5bd49350..c0e4990a81 100644 --- a/util/timer/src/timer.rs +++ b/util/timer/src/timer.rs @@ -21,7 +21,7 @@ use std::collections::VecDeque; use std::string::ToString; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Weak}; -use std::thread; +use std::thread::{self, JoinHandle}; use std::time::{Duration, Instant}; use parking_lot::{Condvar, Mutex, RwLock}; @@ -34,10 +34,14 @@ pub trait TimeoutHandler: Send + Sync { fn on_timeout(&self, _token: TimerToken); } -#[derive(Clone)] pub struct TimerLoop { - timer_id_nonce: Arc, + timer_id_nonce: AtomicUsize, + scheduler: Arc, + scheduler_join_handle: Option>, + + worker_queue: Arc, + worker_join_handles: Vec>, } impl TimerLoop { @@ -45,20 +49,24 @@ impl TimerLoop { let scheduler = Arc::new(Scheduler::new()); let worker_queue = Arc::new(WorkerQueue::new()); - - spawn_workers(worker_size, &worker_queue); - { + let worker_join_handles = spawn_workers(worker_size, &worker_queue); + let scheduler_join_handle = { let worker_queue = Arc::clone(&worker_queue); let scheduler = Arc::clone(&scheduler); thread::Builder::new() .name("timer.scheduler".to_string()) .spawn(move || scheduler.run(&worker_queue)) - .unwrap(); - } + .unwrap() + }; TimerLoop { - timer_id_nonce: Arc::new(AtomicUsize::new(0)), + timer_id_nonce: AtomicUsize::new(0), + scheduler, + scheduler_join_handle: Some(scheduler_join_handle), + + worker_queue, + worker_join_handles, } } @@ -79,6 +87,19 @@ impl TimerLoop { } } +impl Drop for TimerLoop { + // drop() will clear all remaining schedules. + fn drop(&mut self) { + self.scheduler.clear_and_stop(); + self.worker_queue.clear_and_stop(); + + self.scheduler_join_handle.take().unwrap().join().unwrap(); + for join_handle in self.worker_join_handles.drain(..) { + join_handle.join().unwrap(); + } + } +} + type TimerId = usize; #[derive(Clone)] @@ -160,6 +181,12 @@ impl Scheduler { result } + fn clear_and_stop(&self) { + let mut scheduler = self.inner.lock(); + scheduler.clear_and_stop(); + self.condvar.notify_all(); + } + fn run(&self, worker_queue: &WorkerQueue) { let mut scheduler = self.inner.lock(); while !scheduler.stop { @@ -171,6 +198,8 @@ impl Scheduler { None => self.condvar.wait(&mut scheduler), } } + worker_queue.enqueue_finished(); + ctrace!(TIMER, "Scheduler loop has been stopped"); } } @@ -209,6 +238,7 @@ struct SchedulerInner { heap: BinaryHeap>>, stop: bool, } + impl SchedulerInner { fn new() -> SchedulerInner { SchedulerInner { @@ -225,6 +255,11 @@ impl SchedulerInner { after: Duration, repeat: Option, ) -> Result<(), ScheduleError> { + if self.stop { + cdebug!(TIMER, "schedule: TimerLoop has been finished"); + return Ok(()) + } + let schedule_id = ScheduleId(requested_timer.timer_id, timer_token); let handler = { let guard = requested_timer.handler.read(); @@ -319,6 +354,14 @@ impl SchedulerInner { } } + fn clear_and_stop(&mut self) { + self.states.clear(); + for Reverse(TimeOrdered(schedule)) in self.heap.drain() { + schedule.state_control.cancel(); + } + self.stop = true; + } + fn handle_timeout(&mut self, worker_queue: &WorkerQueue) -> Option { loop { let now = Instant::now(); @@ -546,11 +589,15 @@ impl Callback { } } -fn spawn_workers(size: usize, queue: &Arc) { +fn spawn_workers(size: usize, queue: &Arc) -> Vec> { + let mut handles = Vec::with_capacity(size); for i in 0..size { let queue = Arc::clone(queue); - thread::Builder::new().name(format!("timer.worker.{}", i)).spawn(move || worker_loop(&queue)).unwrap(); + let handle = + thread::Builder::new().name(format!("timer.worker.{}", i)).spawn(move || worker_loop(&queue)).unwrap(); + handles.push(handle); } + handles } fn worker_loop(queue: &Arc) { @@ -580,12 +627,13 @@ fn worker_loop(queue: &Arc) { } }); } + ctrace!(TIMER, "Worker loop has been stopped"); } struct WorkerQueue { queue: Mutex>, condvar: Condvar, - stop: AtomicBool, + finished: AtomicBool, } impl WorkerQueue { @@ -593,13 +641,13 @@ impl WorkerQueue { WorkerQueue { queue: Mutex::new(VecDeque::new()), condvar: Condvar::new(), - stop: AtomicBool::new(false), + finished: AtomicBool::new(false), } } fn enqueue(&self, callback: Callback) { let mut queue = self.queue.lock(); - if self.stop.load(Ordering::SeqCst) { + if self.finished.load(Ordering::SeqCst) { return } queue.push_back(callback); @@ -609,13 +657,25 @@ impl WorkerQueue { fn dequeue(&self) -> Option { let mut queue = self.queue.lock(); while queue.is_empty() { - if self.stop.load(Ordering::SeqCst) { + if self.finished.load(Ordering::SeqCst) { return None } self.condvar.wait(&mut queue); } queue.pop_front() } + + fn clear_and_stop(&self) { + let mut queue = self.queue.lock(); + queue.clear(); + self.finished.store(true, Ordering::SeqCst); + self.condvar.notify_all(); + } + + fn enqueue_finished(&self) { + self.finished.store(true, Ordering::SeqCst); + self.condvar.notify_all(); + } } #[cfg(test)] @@ -882,4 +942,31 @@ mod tests { ); } } + + #[test] + fn test_timerloop_drop() { + let timer_token = 100; + let timer_loop = Box::new(TimerLoop::new(4)); + let pair = Arc::new((Condvar::new(), Mutex::new(None))); + let handler = { + let pair = Arc::clone(&pair); + Arc::new(CallbackHandler(move |_| { + let (ref condvar, ref mutex) = *pair; + let mut value = mutex.lock(); + *value = Some(()); + condvar.notify_all(); + })) + }; + let timer = new_timer(&timer_loop, "test", Arc::downgrade(&handler)); + assert_eq!(timer.schedule_once(tick(), timer_token), Ok(())); + + drop(timer_loop); + + let (ref condvar, ref mutex) = *pair; + let mut value = mutex.lock(); + condvar.wait_for(&mut value, long_tick()); + assert_eq!(*value, None); + + assert_eq!(timer.schedule_once(tick(), timer_token), Err(ScheduleError::TimerLoopDropped)); + } }