Skip to content

Commit 99f22ae

Browse files
committed
Add method to drop TimerLoop
1 parent ee7a107 commit 99f22ae

File tree

1 file changed

+98
-12
lines changed

1 file changed

+98
-12
lines changed

util/timer/src/timer.rs

Lines changed: 98 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::collections::VecDeque;
2121
use std::string::ToString;
2222
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2323
use std::sync::{Arc, Weak};
24-
use std::thread;
24+
use std::thread::{self, JoinHandle};
2525
use std::time::{Duration, Instant};
2626

2727
use parking_lot::{Condvar, Mutex, RwLock};
@@ -38,28 +38,35 @@ pub struct TimerLoop {
3838
timer_id_nonce: AtomicUsize,
3939

4040
scheduler: Arc<Scheduler>,
41+
scheduler_join_handle: Option<JoinHandle<()>>,
42+
43+
worker_queue: Arc<WorkerQueue>,
44+
worker_join_handles: Vec<JoinHandle<()>>,
4145
}
4246

4347
impl TimerLoop {
4448
pub fn new(worker_size: usize) -> TimerLoop {
4549
let scheduler = Arc::new(Scheduler::new());
4650

4751
let worker_queue = Arc::new(WorkerQueue::new());
48-
49-
spawn_workers(worker_size, &worker_queue);
50-
{
52+
let worker_join_handles = spawn_workers(worker_size, &worker_queue);
53+
let scheduler_join_handle = {
5154
let worker_queue = Arc::clone(&worker_queue);
5255
let scheduler = Arc::clone(&scheduler);
5356
thread::Builder::new()
5457
.name("timer.scheduler".to_string())
5558
.spawn(move || scheduler.run(&worker_queue))
56-
.unwrap();
57-
}
59+
.unwrap()
60+
};
5861

5962
TimerLoop {
6063
timer_id_nonce: AtomicUsize::new(0),
6164

6265
scheduler,
66+
scheduler_join_handle: Some(scheduler_join_handle),
67+
68+
worker_queue,
69+
worker_join_handles,
6370
}
6471
}
6572

@@ -80,6 +87,19 @@ impl TimerLoop {
8087
}
8188
}
8289

90+
impl Drop for TimerLoop {
91+
// drop() will clear all remaining schedules.
92+
fn drop(&mut self) {
93+
self.scheduler.clear_and_stop();
94+
self.worker_queue.clear_and_stop();
95+
96+
self.scheduler_join_handle.take().unwrap().join().unwrap();
97+
for join_handle in self.worker_join_handles.drain(..) {
98+
join_handle.join().unwrap();
99+
}
100+
}
101+
}
102+
83103
type TimerId = usize;
84104

85105
#[derive(Clone)]
@@ -161,6 +181,12 @@ impl Scheduler {
161181
result
162182
}
163183

184+
fn clear_and_stop(&self) {
185+
let mut scheduler = self.inner.lock();
186+
scheduler.clear_and_stop();
187+
self.condvar.notify_all();
188+
}
189+
164190
fn run(&self, worker_queue: &WorkerQueue) {
165191
let mut scheduler = self.inner.lock();
166192
while !scheduler.stop {
@@ -172,6 +198,8 @@ impl Scheduler {
172198
None => self.condvar.wait(&mut scheduler),
173199
}
174200
}
201+
worker_queue.enqueue_finished();
202+
ctrace!(TIMER, "Scheduler loop has been stopped");
175203
}
176204
}
177205

@@ -210,6 +238,7 @@ struct SchedulerInner {
210238
heap: BinaryHeap<Reverse<TimeOrdered<Schedule>>>,
211239
stop: bool,
212240
}
241+
213242
impl SchedulerInner {
214243
fn new() -> SchedulerInner {
215244
SchedulerInner {
@@ -226,6 +255,11 @@ impl SchedulerInner {
226255
after: Duration,
227256
repeat: Option<Duration>,
228257
) -> Result<(), ScheduleError> {
258+
if self.stop {
259+
cdebug!(TIMER, "schedule: TimerLoop has been finished");
260+
return Ok(())
261+
}
262+
229263
let schedule_id = ScheduleId(requested_timer.timer_id, timer_token);
230264
let handler = {
231265
let guard = requested_timer.handler.read();
@@ -320,6 +354,14 @@ impl SchedulerInner {
320354
}
321355
}
322356

357+
fn clear_and_stop(&mut self) {
358+
self.states.clear();
359+
for Reverse(TimeOrdered(schedule)) in self.heap.drain() {
360+
schedule.state_control.cancel();
361+
}
362+
self.stop = true;
363+
}
364+
323365
fn handle_timeout(&mut self, worker_queue: &WorkerQueue) -> Option<Duration> {
324366
loop {
325367
let now = Instant::now();
@@ -547,11 +589,15 @@ impl Callback {
547589
}
548590
}
549591

550-
fn spawn_workers(size: usize, queue: &Arc<WorkerQueue>) {
592+
fn spawn_workers(size: usize, queue: &Arc<WorkerQueue>) -> Vec<JoinHandle<()>> {
593+
let mut handles = Vec::with_capacity(size);
551594
for i in 0..size {
552595
let queue = Arc::clone(queue);
553-
thread::Builder::new().name(format!("timer.worker.{}", i)).spawn(move || worker_loop(&queue)).unwrap();
596+
let handle =
597+
thread::Builder::new().name(format!("timer.worker.{}", i)).spawn(move || worker_loop(&queue)).unwrap();
598+
handles.push(handle);
554599
}
600+
handles
555601
}
556602

557603
fn worker_loop(queue: &Arc<WorkerQueue>) {
@@ -581,26 +627,27 @@ fn worker_loop(queue: &Arc<WorkerQueue>) {
581627
}
582628
});
583629
}
630+
ctrace!(TIMER, "Worker loop has been stopped");
584631
}
585632

586633
struct WorkerQueue {
587634
queue: Mutex<VecDeque<Callback>>,
588635
condvar: Condvar,
589-
stop: AtomicBool,
636+
finished: AtomicBool,
590637
}
591638

592639
impl WorkerQueue {
593640
fn new() -> WorkerQueue {
594641
WorkerQueue {
595642
queue: Mutex::new(VecDeque::new()),
596643
condvar: Condvar::new(),
597-
stop: AtomicBool::new(false),
644+
finished: AtomicBool::new(false),
598645
}
599646
}
600647

601648
fn enqueue(&self, callback: Callback) {
602649
let mut queue = self.queue.lock();
603-
if self.stop.load(Ordering::SeqCst) {
650+
if self.finished.load(Ordering::SeqCst) {
604651
return
605652
}
606653
queue.push_back(callback);
@@ -610,13 +657,25 @@ impl WorkerQueue {
610657
fn dequeue(&self) -> Option<Callback> {
611658
let mut queue = self.queue.lock();
612659
while queue.is_empty() {
613-
if self.stop.load(Ordering::SeqCst) {
660+
if self.finished.load(Ordering::SeqCst) {
614661
return None
615662
}
616663
self.condvar.wait(&mut queue);
617664
}
618665
queue.pop_front()
619666
}
667+
668+
fn clear_and_stop(&self) {
669+
let mut queue = self.queue.lock();
670+
queue.clear();
671+
self.finished.store(true, Ordering::SeqCst);
672+
self.condvar.notify_all();
673+
}
674+
675+
fn enqueue_finished(&self) {
676+
self.finished.store(true, Ordering::SeqCst);
677+
self.condvar.notify_all();
678+
}
620679
}
621680

622681
#[cfg(test)]
@@ -883,4 +942,31 @@ mod tests {
883942
);
884943
}
885944
}
945+
946+
#[test]
947+
fn test_timerloop_drop() {
948+
let timer_token = 100;
949+
let timer_loop = Box::new(TimerLoop::new(4));
950+
let pair = Arc::new((Condvar::new(), Mutex::new(None)));
951+
let handler = {
952+
let pair = Arc::clone(&pair);
953+
Arc::new(CallbackHandler(move |_| {
954+
let (ref condvar, ref mutex) = *pair;
955+
let mut value = mutex.lock();
956+
*value = Some(());
957+
condvar.notify_all();
958+
}))
959+
};
960+
let timer = new_timer(&timer_loop, "test", Arc::downgrade(&handler));
961+
assert_eq!(timer.schedule_once(tick(), timer_token), Ok(()));
962+
963+
drop(timer_loop);
964+
965+
let (ref condvar, ref mutex) = *pair;
966+
let mut value = mutex.lock();
967+
condvar.wait_for(&mut value, long_tick());
968+
assert_eq!(*value, None);
969+
970+
assert_eq!(timer.schedule_once(tick(), timer_token), Err(ScheduleError::TimerLoopDropped));
971+
}
886972
}

0 commit comments

Comments
 (0)