Skip to content

Commit 43b6f36

Browse files
committed
Add method to drop TimerLoop
1 parent 6691be7 commit 43b6f36

File tree

1 file changed

+159
-16
lines changed

1 file changed

+159
-16
lines changed

util/timer/src/timer.rs

Lines changed: 159 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@
1414
// You should have received a copy of the GNU Affero General Public License
1515
// along with this program. If not, see <https://www.gnu.org/licenses/>.
1616

17+
use std::cell::Cell;
1718
use std::cmp::Reverse;
1819
use std::collections::binary_heap::BinaryHeap;
1920
use std::collections::hash_map::{Entry, HashMap};
2021
use std::collections::VecDeque;
2122
use std::string::ToString;
2223
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2324
use std::sync::{Arc, Weak};
24-
use std::thread;
25+
use std::thread::{self, JoinHandle};
2526
use std::time::{Duration, Instant};
2627

2728
use parking_lot::{Condvar, Mutex, RwLock};
@@ -38,28 +39,35 @@ pub struct TimerLoop {
3839
timer_id_nonce: AtomicUsize,
3940

4041
scheduler: Arc<Scheduler>,
42+
scheduler_join_handle: Mutex<Cell<Option<JoinHandle<()>>>>,
43+
44+
worker_queue: Arc<WorkerQueue>,
45+
worker_join_handles: Mutex<Vec<JoinHandle<()>>>,
4146
}
4247

4348
impl TimerLoop {
4449
pub fn new(worker_size: usize) -> TimerLoop {
4550
let scheduler = Arc::new(Scheduler::new());
4651

4752
let worker_queue = Arc::new(WorkerQueue::new());
48-
49-
spawn_workers(worker_size, &worker_queue);
50-
{
53+
let worker_join_handles = spawn_workers(worker_size, &worker_queue);
54+
let scheduler_join_handle = {
5155
let worker_queue = Arc::clone(&worker_queue);
5256
let scheduler = Arc::clone(&scheduler);
5357
thread::Builder::new()
5458
.name("timer.scheduler".to_string())
5559
.spawn(move || scheduler.run(&worker_queue))
56-
.unwrap();
57-
}
60+
.unwrap()
61+
};
5862

5963
TimerLoop {
6064
timer_id_nonce: AtomicUsize::new(0),
6165

6266
scheduler,
67+
scheduler_join_handle: Mutex::new(Cell::new(Some(scheduler_join_handle))),
68+
69+
worker_queue,
70+
worker_join_handles: Mutex::new(worker_join_handles),
6371
}
6472
}
6573

@@ -78,6 +86,30 @@ impl TimerLoop {
7886
timer.set_name(name);
7987
timer
8088
}
89+
90+
/* stop() will clear all remaining schedules,
91+
* while drop() tries to deliver remaining schedules */
92+
pub fn stop(self) {
93+
self.scheduler.clear_and_stop();
94+
self.worker_queue.clear_and_stop();
95+
96+
if let Some(join_handle) = self.scheduler_join_handle.lock().take() {
97+
let _ = join_handle.join();
98+
}
99+
let mut worker_join_handles = self.worker_join_handles.lock();
100+
for join_handle in worker_join_handles.drain(..) {
101+
let _ = join_handle.join();
102+
}
103+
}
104+
}
105+
106+
impl Drop for TimerLoop {
107+
fn drop(&mut self) {
108+
self.scheduler.start_drain();
109+
// worker will be handled after the scheduler finishes its job.
110+
// The scheduler thread will be the only strong ref holder after this.
111+
// (all TimerApi will holds weak refs only)
112+
}
81113
}
82114

83115
type TimerId = usize;
@@ -161,17 +193,37 @@ impl Scheduler {
161193
result
162194
}
163195

196+
fn clear_and_stop(&self) {
197+
let mut scheduler = self.inner.lock();
198+
scheduler.clear_and_stop();
199+
self.condvar.notify_all();
200+
}
201+
202+
fn start_drain(&self) {
203+
let mut scheduler = self.inner.lock();
204+
scheduler.finished = true;
205+
self.condvar.notify_all();
206+
}
207+
164208
fn run(&self, worker_queue: &WorkerQueue) {
165209
let mut scheduler = self.inner.lock();
166-
while !scheduler.stop {
210+
loop {
167211
let wait_for = scheduler.handle_timeout(worker_queue);
168212
match wait_for {
169213
Some(timeout) => {
170214
self.condvar.wait_for(&mut scheduler, timeout);
171215
}
172-
None => self.condvar.wait(&mut scheduler),
216+
None => {
217+
if scheduler.finished {
218+
break
219+
} else {
220+
self.condvar.wait(&mut scheduler)
221+
}
222+
}
173223
}
174224
}
225+
worker_queue.enqueue_finished();
226+
ctrace!(TIMER, "Scheduler loop has been stopped");
175227
}
176228
}
177229

@@ -208,14 +260,15 @@ impl Scheduler {
208260
struct SchedulerInner {
209261
states: HashMap<ScheduleId, Arc<ScheduleStateControl>>,
210262
heap: BinaryHeap<Reverse<TimeOrdered<Schedule>>>,
211-
stop: bool,
263+
finished: bool,
212264
}
265+
213266
impl SchedulerInner {
214267
fn new() -> SchedulerInner {
215268
SchedulerInner {
216269
states: HashMap::new(),
217270
heap: BinaryHeap::new(),
218-
stop: false,
271+
finished: false,
219272
}
220273
}
221274

@@ -226,6 +279,11 @@ impl SchedulerInner {
226279
after: Duration,
227280
repeat: Option<Duration>,
228281
) -> Result<(), ScheduleError> {
282+
if self.finished {
283+
cdebug!(TIMER, "schedule: TimerLoop has been finished");
284+
return Ok(())
285+
}
286+
229287
let schedule_id = ScheduleId(requested_timer.timer_id, timer_token);
230288
let handler = {
231289
let guard = requested_timer.handler.read();
@@ -320,6 +378,14 @@ impl SchedulerInner {
320378
}
321379
}
322380

381+
fn clear_and_stop(&mut self) {
382+
self.finished = true;
383+
self.states.clear();
384+
for Reverse(TimeOrdered(schedule)) in self.heap.drain() {
385+
schedule.state_control.cancel();
386+
}
387+
}
388+
323389
fn handle_timeout(&mut self, worker_queue: &WorkerQueue) -> Option<Duration> {
324390
loop {
325391
let now = Instant::now();
@@ -547,11 +613,15 @@ impl Callback {
547613
}
548614
}
549615

550-
fn spawn_workers(size: usize, queue: &Arc<WorkerQueue>) {
616+
fn spawn_workers(size: usize, queue: &Arc<WorkerQueue>) -> Vec<JoinHandle<()>> {
617+
let mut handles = Vec::with_capacity(size);
551618
for i in 0..size {
552619
let queue = Arc::clone(queue);
553-
thread::Builder::new().name(format!("timer.worker.{}", i)).spawn(move || worker_loop(&queue)).unwrap();
620+
let handle =
621+
thread::Builder::new().name(format!("timer.worker.{}", i)).spawn(move || worker_loop(&queue)).unwrap();
622+
handles.push(handle);
554623
}
624+
handles
555625
}
556626

557627
fn worker_loop(queue: &Arc<WorkerQueue>) {
@@ -581,26 +651,27 @@ fn worker_loop(queue: &Arc<WorkerQueue>) {
581651
}
582652
});
583653
}
654+
ctrace!(TIMER, "Worker loop has been stopped");
584655
}
585656

586657
struct WorkerQueue {
587658
queue: Mutex<VecDeque<Callback>>,
588659
condvar: Condvar,
589-
stop: AtomicBool,
660+
finished: AtomicBool,
590661
}
591662

592663
impl WorkerQueue {
593664
fn new() -> WorkerQueue {
594665
WorkerQueue {
595666
queue: Mutex::new(VecDeque::new()),
596667
condvar: Condvar::new(),
597-
stop: AtomicBool::new(false),
668+
finished: AtomicBool::new(false),
598669
}
599670
}
600671

601672
fn enqueue(&self, callback: Callback) {
602673
let mut queue = self.queue.lock();
603-
if self.stop.load(Ordering::SeqCst) {
674+
if self.finished.load(Ordering::SeqCst) {
604675
return
605676
}
606677
queue.push_back(callback);
@@ -610,13 +681,25 @@ impl WorkerQueue {
610681
fn dequeue(&self) -> Option<Callback> {
611682
let mut queue = self.queue.lock();
612683
while queue.is_empty() {
613-
if self.stop.load(Ordering::SeqCst) {
684+
if self.finished.load(Ordering::SeqCst) {
614685
return None
615686
}
616687
self.condvar.wait(&mut queue);
617688
}
618689
queue.pop_front()
619690
}
691+
692+
fn clear_and_stop(&self) {
693+
let mut queue = self.queue.lock();
694+
queue.clear();
695+
self.finished.store(true, Ordering::SeqCst);
696+
self.condvar.notify_all();
697+
}
698+
699+
fn enqueue_finished(&self) {
700+
self.finished.store(true, Ordering::SeqCst);
701+
self.condvar.notify_all();
702+
}
620703
}
621704

622705
#[cfg(test)]
@@ -693,6 +776,7 @@ mod tests {
693776
let (called_at, token) = value.unwrap();
694777
assert_eq!(token, timer_token);
695778
assert!(similar(called_at, begin + tick())); // called_at = now + ticksufficiently small
779+
timer_loop.stop();
696780
}
697781

698782
#[test]
@@ -718,6 +802,7 @@ mod tests {
718802
let mut value = mutex.lock();
719803
condvar.wait_for(&mut value, long_tick());
720804
assert!(value.is_none());
805+
timer_loop.stop();
721806
}
722807

723808
#[test]
@@ -773,6 +858,7 @@ mod tests {
773858

774859
assert_eq!(timer.schedule_once(tick(), timer_token), Ok(()));
775860
assert_eq!(timer.schedule_once(tick(), timer_token), Err(ScheduleError::TokenAlreadyScheduled));
861+
timer_loop.stop();
776862
}
777863

778864
#[test]
@@ -785,6 +871,7 @@ mod tests {
785871

786872
assert_eq!(timer.schedule_once(tick(), timer_token_1), Ok(()));
787873
assert_eq!(timer.schedule_once(tick(), timer_token_2), Ok(()));
874+
timer_loop.stop();
788875
}
789876

790877
#[test]
@@ -816,6 +903,7 @@ mod tests {
816903
let (called_at, token) = value.unwrap();
817904
assert_eq!(token, timer_token);
818905
assert!(similar(called_at, begin + tick())); // called_at = now + ticksufficiently small
906+
timer_loop.stop();
819907
}
820908

821909
#[test]
@@ -847,6 +935,7 @@ mod tests {
847935
let (called_at, token) = value.unwrap();
848936
assert_eq!(token, timer_token);
849937
assert!(similar(called_at, begin + tick())); // called_at = now + ticksufficiently small
938+
timer_loop.stop();
850939
}
851940
#[test]
852941
fn test_repeat() {
@@ -886,5 +975,59 @@ mod tests {
886975
for i in 1..TEST_COUNT {
887976
assert!(similar(value[i - 1] + tick(), value[i]));
888977
}
978+
timer_loop.stop();
979+
}
980+
981+
#[test]
982+
fn test_timerloop_stop() {
983+
let timer_token = 100;
984+
let timer_loop = Box::new(TimerLoop::new(4));
985+
let pair = Arc::new((Condvar::new(), Mutex::new(None)));
986+
let handler = {
987+
let pair = Arc::clone(&pair);
988+
Arc::new(CallbackHandler(move |_| {
989+
let (ref condvar, ref mutex) = *pair;
990+
let mut value = mutex.lock();
991+
*value = Some(());
992+
condvar.notify_all();
993+
}))
994+
};
995+
let timer = new_timer(&timer_loop, "test", &handler);
996+
997+
assert_eq!(timer.schedule_once(tick(), timer_token), Ok(()));
998+
timer_loop.stop();
999+
1000+
let (ref condvar, ref mutex) = *pair;
1001+
let mut value = mutex.lock();
1002+
condvar.wait_for(&mut value, long_tick());
1003+
assert!(value.is_none());
1004+
}
1005+
1006+
1007+
#[test]
1008+
fn test_timerloop_drop() {
1009+
let timer_token = 100;
1010+
let timer_loop = Box::new(TimerLoop::new(4));
1011+
let pair = Arc::new((Condvar::new(), Mutex::new(None)));
1012+
let handler = {
1013+
let pair = Arc::clone(&pair);
1014+
Arc::new(CallbackHandler(move |_| {
1015+
let (ref condvar, ref mutex) = *pair;
1016+
let mut value = mutex.lock();
1017+
*value = Some(());
1018+
condvar.notify_all();
1019+
}))
1020+
};
1021+
let timer = new_timer(&timer_loop, "test", &handler);
1022+
1023+
assert_eq!(timer.schedule_once(tick(), timer_token), Ok(()));
1024+
drop(timer_loop);
1025+
1026+
let (ref condvar, ref mutex) = *pair;
1027+
let mut value = mutex.lock();
1028+
condvar.wait_for(&mut value, long_tick());
1029+
assert_eq!(*value, Some(()));
1030+
1031+
assert_eq!(timer.schedule_once(tick(), timer_token), Err(ScheduleError::TimerLoopDropped));
8891032
}
8901033
}

0 commit comments

Comments
 (0)