Skip to content

Commit dc0d761

Browse files
committed
Add method to drop TimerLoop
1 parent 39f21a3 commit dc0d761

File tree

1 file changed

+90
-8
lines changed

1 file changed

+90
-8
lines changed

util/timer/src/timer.rs

Lines changed: 90 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@
1414
// You should have received a copy of the GNU Affero General Public License
1515
// along with this program. If not, see <https://www.gnu.org/licenses/>.
1616

17+
use std::cell::Cell;
1718
use std::cmp::Reverse;
1819
use std::collections::binary_heap::BinaryHeap;
1920
use std::collections::hash_map::{Entry, HashMap};
2021
use std::collections::VecDeque;
2122
use std::string::ToString;
2223
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2324
use std::sync::{Arc, Weak};
24-
use std::thread;
25+
use std::thread::{self, JoinHandle};
2526
use std::time::{Duration, Instant};
2627

2728
use parking_lot::{Condvar, Mutex, RwLock};
@@ -36,28 +37,37 @@ pub trait TimeoutHandler: Send + Sync {
3637
#[derive(Clone)]
3738
pub struct TimerLoop {
3839
timer_id_nonce: Arc<AtomicUsize>,
40+
3941
scheduler: Arc<Scheduler>,
42+
scheduler_join_handle: Arc<Mutex<Cell<Option<JoinHandle<()>>>>>,
43+
44+
worker_queue: Arc<WorkerQueue>,
45+
worker_join_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
4046
}
4147

4248
impl TimerLoop {
4349
pub fn new(worker_size: usize) -> TimerLoop {
4450
let scheduler = Arc::new(Scheduler::new());
4551

4652
let worker_queue = Arc::new(WorkerQueue::new());
47-
48-
spawn_workers(worker_size, &worker_queue);
49-
{
53+
let worker_join_handles = spawn_workers(worker_size, &worker_queue);
54+
let scheduler_join_handle = {
5055
let worker_queue = Arc::clone(&worker_queue);
5156
let scheduler = Arc::clone(&scheduler);
5257
thread::Builder::new()
5358
.name("timer.scheduler".to_string())
5459
.spawn(move || scheduler.run(&worker_queue))
55-
.unwrap();
56-
}
60+
.unwrap()
61+
};
5762

5863
TimerLoop {
5964
timer_id_nonce: Arc::new(AtomicUsize::new(0)),
65+
6066
scheduler,
67+
scheduler_join_handle: Arc::new(Mutex::new(Cell::new(Some(scheduler_join_handle)))),
68+
69+
worker_queue,
70+
worker_join_handles: Arc::new(Mutex::new(worker_join_handles)),
6171
}
6272
}
6373

@@ -70,6 +80,19 @@ impl TimerLoop {
7080
scheduler: Arc::downgrade(&self.scheduler),
7181
}
7282
}
83+
84+
pub fn stop(self) {
85+
self.scheduler.notify_stop();
86+
self.worker_queue.notify_stop();
87+
88+
if let Some(join_handle) = self.scheduler_join_handle.lock().take() {
89+
let _ = join_handle.join();
90+
}
91+
let mut worker_join_handles = self.worker_join_handles.lock();
92+
for join_handle in worker_join_handles.drain(..) {
93+
let _ = join_handle.join();
94+
}
95+
}
7396
}
7497

7598
type TimerId = usize;
@@ -147,6 +170,12 @@ impl Scheduler {
147170
result
148171
}
149172

173+
fn notify_stop(&self) {
174+
let mut scheduler = self.inner.lock();
175+
scheduler.stop();
176+
self.condvar.notify_all();
177+
}
178+
150179
fn run(&self, worker_queue: &WorkerQueue) {
151180
let mut scheduler = self.inner.lock();
152181
while !scheduler.stop {
@@ -158,6 +187,7 @@ impl Scheduler {
158187
None => self.condvar.wait(&mut scheduler),
159188
}
160189
}
190+
ctrace!(TIMER, "Scheduler loop has been stopped");
161191
}
162192
}
163193

@@ -304,6 +334,14 @@ impl SchedulerInner {
304334
}
305335
}
306336

337+
fn stop(&mut self) {
338+
self.stop = true;
339+
self.states.clear();
340+
for Reverse(TimeOrdered(schedule)) in self.heap.drain() {
341+
schedule.state_control.cancel();
342+
}
343+
}
344+
307345
fn handle_timeout(&mut self, worker_queue: &WorkerQueue) -> Option<Duration> {
308346
loop {
309347
let now = Instant::now();
@@ -531,11 +569,15 @@ impl Callback {
531569
}
532570
}
533571

534-
fn spawn_workers(size: usize, queue: &Arc<WorkerQueue>) {
572+
fn spawn_workers(size: usize, queue: &Arc<WorkerQueue>) -> Vec<JoinHandle<()>> {
573+
let mut handles = Vec::new();
535574
for i in 0..size {
536575
let queue = Arc::clone(queue);
537-
thread::Builder::new().name(format!("timer.worker.{}", i)).spawn(move || worker_loop(&queue)).unwrap();
576+
let handle =
577+
thread::Builder::new().name(format!("timer.worker.{}", i)).spawn(move || worker_loop(&queue)).unwrap();
578+
handles.push(handle);
538579
}
580+
handles
539581
}
540582

541583
fn worker_loop(queue: &Arc<WorkerQueue>) {
@@ -565,6 +607,7 @@ fn worker_loop(queue: &Arc<WorkerQueue>) {
565607
}
566608
});
567609
}
610+
ctrace!(TIMER, "Worker loop has been stopped");
568611
}
569612

570613
struct WorkerQueue {
@@ -601,6 +644,13 @@ impl WorkerQueue {
601644
}
602645
queue.pop_front()
603646
}
647+
648+
fn notify_stop(&self) {
649+
let mut queue = self.queue.lock();
650+
queue.clear();
651+
self.stop.store(true, Ordering::SeqCst);
652+
self.condvar.notify_all();
653+
}
604654
}
605655

606656
#[cfg(test)]
@@ -676,6 +726,7 @@ mod tests {
676726
let (called_at, token) = value.unwrap();
677727
assert_eq!(token, timer_token);
678728
assert!(similar(called_at, begin + tick())); // called_at = now + ticksufficiently small
729+
timer_loop.stop();
679730
}
680731

681732
#[test]
@@ -701,6 +752,7 @@ mod tests {
701752
let mut value = mutex.lock();
702753
condvar.wait_for(&mut value, long_tick());
703754
assert!(value.is_none());
755+
timer_loop.stop();
704756
}
705757

706758
#[test]
@@ -747,6 +799,7 @@ mod tests {
747799

748800
assert_eq!(timer.schedule_once(tick(), timer_token), Ok(()));
749801
assert_eq!(timer.schedule_once(tick(), timer_token), Err(ScheduleError::TokenAlreadyScheduled));
802+
timer_loop.stop();
750803
}
751804

752805
#[test]
@@ -759,6 +812,7 @@ mod tests {
759812

760813
assert_eq!(timer.schedule_once(tick(), timer_token_1), Ok(()));
761814
assert_eq!(timer.schedule_once(tick(), timer_token_2), Ok(()));
815+
timer_loop.stop();
762816
}
763817

764818
#[test]
@@ -790,6 +844,7 @@ mod tests {
790844
let (called_at, token) = value.unwrap();
791845
assert_eq!(token, timer_token);
792846
assert!(similar(called_at, begin + tick())); // called_at = now + ticksufficiently small
847+
timer_loop.stop();
793848
}
794849

795850
#[test]
@@ -821,6 +876,7 @@ mod tests {
821876
let (called_at, token) = value.unwrap();
822877
assert_eq!(token, timer_token);
823878
assert!(similar(called_at, begin + tick())); // called_at = now + ticksufficiently small
879+
timer_loop.stop();
824880
}
825881
#[test]
826882
fn test_repeat() {
@@ -860,5 +916,31 @@ mod tests {
860916
for i in 1..TEST_COUNT {
861917
assert!(similar(value[i - 1] + tick(), value[i]));
862918
}
919+
timer_loop.stop();
920+
}
921+
922+
#[test]
923+
fn test_timerloop_drop() {
924+
let timer_token = 100;
925+
let timer_loop = Box::new(TimerLoop::new(4));
926+
let pair = Arc::new((Condvar::new(), Mutex::new(None)));
927+
let handler = {
928+
let pair = Arc::clone(&pair);
929+
Arc::new(CallbackHandler(move |_| {
930+
let (ref condvar, ref mutex) = *pair;
931+
let mut value = mutex.lock();
932+
*value = Some(());
933+
condvar.notify_all();
934+
}))
935+
};
936+
let timer = new_timer(&timer_loop, "test", &handler);
937+
938+
assert_eq!(timer.schedule_once(tick(), timer_token), Ok(()));
939+
timer_loop.stop();
940+
941+
let (ref condvar, ref mutex) = *pair;
942+
let mut value = mutex.lock();
943+
condvar.wait_for(&mut value, long_tick());
944+
assert!(value.is_none());
863945
}
864946
}

0 commit comments

Comments
 (0)