Skip to content

Commit e1a2a5c

Browse files
committed
Add method to drop TimerLoop
1 parent a17366b commit e1a2a5c

File tree

1 file changed

+90
-8
lines changed

1 file changed

+90
-8
lines changed

util/timer/src/timer.rs

Lines changed: 90 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@
1414
// You should have received a copy of the GNU Affero General Public License
1515
// along with this program. If not, see <https://www.gnu.org/licenses/>.
1616

17+
use std::cell::Cell;
1718
use std::cmp::Reverse;
1819
use std::collections::binary_heap::BinaryHeap;
1920
use std::collections::hash_map::{Entry, HashMap};
2021
use std::collections::VecDeque;
2122
use std::string::ToString;
2223
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2324
use std::sync::{Arc, Weak};
24-
use std::thread;
25+
use std::thread::{self, JoinHandle};
2526
use std::time::{Duration, Instant};
2627

2728
use parking_lot::{Condvar, Mutex, RwLock};
@@ -36,28 +37,37 @@ pub trait TimeoutHandler: Send + Sync {
3637
#[derive(Clone)]
3738
pub struct TimerLoop {
3839
timer_id_nonce: Arc<AtomicUsize>,
40+
3941
scheduler: Arc<Scheduler>,
42+
scheduler_join_handle: Arc<Mutex<Cell<Option<JoinHandle<()>>>>>,
43+
44+
worker_queue: Arc<WorkerQueue>,
45+
worker_join_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
4046
}
4147

4248
impl TimerLoop {
4349
pub fn new(worker_size: usize) -> TimerLoop {
4450
let scheduler = Arc::new(Scheduler::new());
4551

4652
let worker_queue = Arc::new(WorkerQueue::new());
47-
48-
spawn_workers(worker_size, &worker_queue);
49-
{
53+
let worker_join_handles = spawn_workers(worker_size, &worker_queue);
54+
let scheduler_join_handle = {
5055
let worker_queue = Arc::clone(&worker_queue);
5156
let scheduler = Arc::clone(&scheduler);
5257
thread::Builder::new()
5358
.name("timer.scheduler".to_string())
5459
.spawn(move || scheduler.run(&worker_queue))
55-
.unwrap();
56-
}
60+
.unwrap()
61+
};
5762

5863
TimerLoop {
5964
timer_id_nonce: Arc::new(AtomicUsize::new(0)),
65+
6066
scheduler,
67+
scheduler_join_handle: Arc::new(Mutex::new(Cell::new(Some(scheduler_join_handle)))),
68+
69+
worker_queue,
70+
worker_join_handles: Arc::new(Mutex::new(worker_join_handles)),
6171
}
6272
}
6373

@@ -70,6 +80,19 @@ impl TimerLoop {
7080
scheduler: Arc::downgrade(&self.scheduler),
7181
}
7282
}
83+
84+
pub fn stop(self) {
85+
self.scheduler.notify_stop();
86+
self.worker_queue.notify_stop();
87+
88+
if let Some(join_handle) = self.scheduler_join_handle.lock().take() {
89+
let _ = join_handle.join();
90+
}
91+
let mut worker_join_handles = self.worker_join_handles.lock();
92+
for join_handle in worker_join_handles.drain(..) {
93+
let _ = join_handle.join();
94+
}
95+
}
7396
}
7497

7598
type TimerId = usize;
@@ -148,6 +171,12 @@ impl Scheduler {
148171
result
149172
}
150173

174+
fn notify_stop(&self) {
175+
let mut scheduler = self.inner.lock();
176+
scheduler.stop();
177+
self.condvar.notify_all();
178+
}
179+
151180
fn run(&self, worker_queue: &WorkerQueue) {
152181
let mut scheduler = self.inner.lock();
153182
while !scheduler.stop {
@@ -159,6 +188,7 @@ impl Scheduler {
159188
None => self.condvar.wait(&mut scheduler),
160189
}
161190
}
191+
ctrace!(TIMER, "Scheduler loop has been stopped");
162192
}
163193
}
164194

@@ -309,6 +339,14 @@ impl SchedulerInner {
309339
}
310340
}
311341

342+
fn stop(&mut self) {
343+
self.stop = true;
344+
self.states.clear();
345+
for Reverse(TimeOrdered(schedule)) in self.heap.drain() {
346+
schedule.state_control.cancel();
347+
}
348+
}
349+
312350
fn handle_timeout(&mut self, worker_queue: &WorkerQueue) -> Option<Duration> {
313351
loop {
314352
let now = Instant::now();
@@ -536,11 +574,15 @@ impl Callback {
536574
}
537575
}
538576

539-
fn spawn_workers(size: usize, queue: &Arc<WorkerQueue>) {
577+
fn spawn_workers(size: usize, queue: &Arc<WorkerQueue>) -> Vec<JoinHandle<()>> {
578+
let mut handles = Vec::new();
540579
for i in 0..size {
541580
let queue = Arc::clone(queue);
542-
thread::Builder::new().name(format!("timer.worker.{}", i)).spawn(move || worker_loop(&queue)).unwrap();
581+
let handle =
582+
thread::Builder::new().name(format!("timer.worker.{}", i)).spawn(move || worker_loop(&queue)).unwrap();
583+
handles.push(handle);
543584
}
585+
handles
544586
}
545587

546588
fn worker_loop(queue: &Arc<WorkerQueue>) {
@@ -570,6 +612,7 @@ fn worker_loop(queue: &Arc<WorkerQueue>) {
570612
}
571613
});
572614
}
615+
ctrace!(TIMER, "Worker loop has been stopped");
573616
}
574617

575618
struct WorkerQueue {
@@ -606,6 +649,13 @@ impl WorkerQueue {
606649
}
607650
queue.pop_front()
608651
}
652+
653+
fn notify_stop(&self) {
654+
let mut queue = self.queue.lock();
655+
queue.clear();
656+
self.stop.store(true, Ordering::SeqCst);
657+
self.condvar.notify_all();
658+
}
609659
}
610660

611661
#[cfg(test)]
@@ -679,6 +729,7 @@ mod tests {
679729
let (called_at, token) = value.unwrap();
680730
assert_eq!(token, timer_token);
681731
assert!(similar(called_at, begin + tick())); // called_at = now + ticksufficiently small
732+
timer_loop.stop();
682733
}
683734

684735
#[test]
@@ -704,6 +755,7 @@ mod tests {
704755
let mut value = mutex.lock();
705756
condvar.wait_for(&mut value, long_tick());
706757
assert!(value.is_none());
758+
timer_loop.stop();
707759
}
708760

709761
#[test]
@@ -740,6 +792,7 @@ mod tests {
740792

741793
assert_eq!(timer.schedule_once(tick(), timer_token), Ok(()));
742794
assert_eq!(timer.schedule_once(tick(), timer_token), Err(ScheduleError::TokenAlreadyScheduled));
795+
timer_loop.stop();
743796
}
744797

745798
#[test]
@@ -752,6 +805,7 @@ mod tests {
752805

753806
assert_eq!(timer.schedule_once(tick(), timer_token_1), Ok(()));
754807
assert_eq!(timer.schedule_once(tick(), timer_token_2), Ok(()));
808+
timer_loop.stop();
755809
}
756810

757811
#[test]
@@ -783,6 +837,7 @@ mod tests {
783837
let (called_at, token) = value.unwrap();
784838
assert_eq!(token, timer_token);
785839
assert!(similar(called_at, begin + tick())); // called_at = now + ticksufficiently small
840+
timer_loop.stop();
786841
}
787842

788843
#[test]
@@ -814,6 +869,7 @@ mod tests {
814869
let (called_at, token) = value.unwrap();
815870
assert_eq!(token, timer_token);
816871
assert!(similar(called_at, begin + tick())); // called_at = now + ticksufficiently small
872+
timer_loop.stop();
817873
}
818874
#[test]
819875
fn test_repeat() {
@@ -853,5 +909,31 @@ mod tests {
853909
for i in 1..TEST_COUNT {
854910
assert!(similar(value[i - 1] + tick(), value[i]));
855911
}
912+
timer_loop.stop();
913+
}
914+
915+
#[test]
916+
fn test_timerloop_drop() {
917+
let timer_token = 100;
918+
let timer_loop = Box::new(TimerLoop::new(4));
919+
let pair = Arc::new((Condvar::new(), Mutex::new(None)));
920+
let handler = {
921+
let pair = Arc::clone(&pair);
922+
Arc::new(CallbackHandler(move |_| {
923+
let (ref condvar, ref mutex) = *pair;
924+
let mut value = mutex.lock();
925+
*value = Some(());
926+
condvar.notify_all();
927+
}))
928+
};
929+
let timer = new_timer(&timer_loop, "test", &handler);
930+
931+
assert_eq!(timer.schedule_once(tick(), timer_token), Ok(()));
932+
timer_loop.stop();
933+
934+
let (ref condvar, ref mutex) = *pair;
935+
let mut value = mutex.lock();
936+
condvar.wait_for(&mut value, long_tick());
937+
assert!(value.is_none());
856938
}
857939
}

0 commit comments

Comments
 (0)