Skip to content

Commit 93064af

Browse files
author
Daniel Rees
authored
Updated ManualDispatchQueue to run items scheduled during a tick (#62)
* Updated ManualDispatchQueue to run items scheduled during a tick * Added reset test * Updated channel tests to account for the timer behavior properly
1 parent 84bdcd3 commit 93064af

File tree

5 files changed

+272
-107
lines changed

5 files changed

+272
-107
lines changed

src/test/kotlin/org/phoenixframework/ChannelTest.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import org.junit.jupiter.api.Test
1717
import org.mockito.Mock
1818
import org.mockito.MockitoAnnotations
1919
import org.mockito.stubbing.Answer
20-
import org.phoenixframework.utilities.ManualDispatchQueue
20+
import org.phoenixframework.queue.ManualDispatchQueue
2121
import org.phoenixframework.utilities.getBindings
2222

2323
class ChannelTest {
@@ -302,7 +302,7 @@ class ChannelTest {
302302
fakeClock.tick(6_000)
303303
whenever(socket.isConnected).thenReturn(true)
304304

305-
fakeClock.tick(5_000)
305+
fakeClock.tick(4_000)
306306
joinPush.trigger("ok", kEmptyPayload)
307307

308308
fakeClock.tick(2_000)
@@ -334,7 +334,7 @@ class ChannelTest {
334334
}
335335

336336
private fun receivesTimeout(joinPush: Push) {
337-
fakeClock.tick(joinPush.timeout * 2)
337+
fakeClock.tick(joinPush.timeout)
338338
}
339339

340340
private fun receivesError(joinPush: Push) {
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package org.phoenixframework.queue
2+
3+
import org.phoenixframework.DispatchQueue
4+
import org.phoenixframework.DispatchWorkItem
5+
import java.util.concurrent.TimeUnit
6+
7+
class ManualDispatchQueue : DispatchQueue {
8+
9+
var tickTime: Long = 0
10+
private val tickTimeUnit: TimeUnit = TimeUnit.MILLISECONDS
11+
var workItems: MutableList<ManualDispatchWorkItem> = mutableListOf()
12+
13+
fun reset() {
14+
this.tickTime = 0
15+
this.workItems = mutableListOf()
16+
}
17+
18+
fun tick(duration: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) {
19+
val durationInMs = tickTimeUnit.convert(duration, unit)
20+
21+
// calculate what time to advance to
22+
val advanceTo = tickTime + durationInMs
23+
24+
// Filter all work items that are due to be fired and have not been
25+
// cancelled. Return early if there are no items to fire
26+
var pastDueWorkItems = workItems.filter { it.isPastDue(advanceTo) && !it.isCancelled }
27+
28+
// Keep looping until there are no more work items that are passed the advance to time
29+
while (pastDueWorkItems.isNotEmpty()) {
30+
31+
// Perform all work items that are due
32+
pastDueWorkItems.forEach {
33+
tickTime = it.deadline
34+
it.perform()
35+
}
36+
37+
// Remove all work items that are past due or canceled
38+
workItems.removeAll { it.isPastDue(tickTime) || it.isCancelled }
39+
pastDueWorkItems = workItems.filter { it.isPastDue(advanceTo) && !it.isCancelled }
40+
}
41+
42+
// Now that all work has been performed, advance the clock
43+
this.tickTime = advanceTo
44+
45+
}
46+
47+
override fun queue(delay: Long, unit: TimeUnit, runnable: () -> Unit): DispatchWorkItem {
48+
// Converts the given unit and delay to the unit used by this class
49+
val delayInMs = tickTimeUnit.convert(delay, unit)
50+
val deadline = tickTime + delayInMs
51+
52+
val workItem = ManualDispatchWorkItem(runnable, deadline)
53+
workItems.add(workItem)
54+
55+
return workItem
56+
}
57+
58+
override fun queueAtFixedRate(
59+
delay: Long,
60+
period: Long,
61+
unit: TimeUnit,
62+
runnable: () -> Unit
63+
): DispatchWorkItem {
64+
65+
val delayInMs = tickTimeUnit.convert(delay, unit)
66+
val periodInMs = tickTimeUnit.convert(period, unit)
67+
val deadline = tickTime + delayInMs
68+
69+
val workItem =
70+
ManualDispatchWorkItem(runnable, deadline, periodInMs)
71+
workItems.add(workItem)
72+
73+
return workItem
74+
}
75+
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package org.phoenixframework.queue
2+
3+
import com.google.common.truth.Truth.assertThat
4+
import org.junit.jupiter.api.AfterEach
5+
import org.junit.jupiter.api.BeforeEach
6+
import org.junit.jupiter.api.Test
7+
import java.util.concurrent.TimeUnit
8+
9+
internal class ManualDispatchQueueTest {
10+
11+
12+
private lateinit var queue: ManualDispatchQueue
13+
14+
@BeforeEach
15+
internal fun setUp() {
16+
queue = ManualDispatchQueue()
17+
}
18+
19+
@AfterEach
20+
internal fun tearDown() {
21+
queue.reset()
22+
}
23+
24+
@Test
25+
internal fun `reset the queue`() {
26+
var task100Called = false
27+
var task200Called = false
28+
var task300Called = false
29+
30+
queue.queue(100, TimeUnit.MILLISECONDS) {
31+
task100Called = true
32+
}
33+
queue.queue(200, TimeUnit.MILLISECONDS) {
34+
task200Called = true
35+
}
36+
queue.queue(300, TimeUnit.MILLISECONDS) {
37+
task300Called = true
38+
}
39+
40+
queue.tick(250)
41+
42+
assertThat(queue.tickTime).isEqualTo(250)
43+
assertThat(queue.workItems).hasSize(1)
44+
45+
queue.reset()
46+
assertThat(queue.tickTime).isEqualTo(0)
47+
assertThat(queue.workItems).isEmpty()
48+
}
49+
50+
@Test
51+
internal fun `triggers work that is passed due`() {
52+
var task100Called = false
53+
var task200Called = false
54+
var task300Called = false
55+
56+
queue.queue(100, TimeUnit.MILLISECONDS) {
57+
task100Called = true
58+
}
59+
queue.queue(200, TimeUnit.MILLISECONDS) {
60+
task200Called = true
61+
}
62+
queue.queue(300, TimeUnit.MILLISECONDS) {
63+
task300Called = true
64+
}
65+
66+
queue.tick(100)
67+
assertThat(task100Called).isTrue()
68+
69+
queue.tick(100)
70+
assertThat(task200Called).isTrue()
71+
72+
queue.tick(50)
73+
assertThat(task300Called).isFalse()
74+
}
75+
76+
@Test
77+
internal fun `triggers all work that is passed due`() {
78+
var task100Called = false
79+
var task200Called = false
80+
var task300Called = false
81+
82+
queue.queue(100, TimeUnit.MILLISECONDS) {
83+
task100Called = true
84+
}
85+
queue.queue(200, TimeUnit.MILLISECONDS) {
86+
task200Called = true
87+
}
88+
queue.queue(300, TimeUnit.MILLISECONDS) {
89+
task300Called = true
90+
}
91+
92+
queue.tick(250)
93+
assertThat(task100Called).isTrue()
94+
assertThat(task200Called).isTrue()
95+
assertThat(task300Called).isFalse()
96+
}
97+
98+
@Test
99+
internal fun `triggers work that is scheduled for a time that is after tick`() {
100+
var task100Called = false
101+
var task200Called = false
102+
var task300Called = false
103+
104+
queue.queue(100, TimeUnit.MILLISECONDS) {
105+
task100Called = true
106+
107+
queue.queue(100, TimeUnit.MILLISECONDS) {
108+
task200Called = true
109+
}
110+
111+
}
112+
113+
queue.queue(300, TimeUnit.MILLISECONDS) {
114+
task300Called = true
115+
}
116+
117+
queue.tick(250)
118+
assertThat(task100Called).isTrue()
119+
assertThat(task200Called).isTrue()
120+
assertThat(task300Called).isFalse()
121+
122+
assertThat(queue.tickTime).isEqualTo(250)
123+
}
124+
125+
126+
@Test
127+
internal fun `does not triggers nested work that is scheduled outside of the tick`() {
128+
var task100Called = false
129+
var task200Called = false
130+
var task300Called = false
131+
132+
queue.queue(100, TimeUnit.MILLISECONDS) {
133+
task100Called = true
134+
135+
queue.queue(100, TimeUnit.MILLISECONDS) {
136+
task200Called = true
137+
138+
queue.queue(100, TimeUnit.MILLISECONDS) {
139+
task300Called = true
140+
}
141+
}
142+
}
143+
144+
queue.tick(250)
145+
assertThat(task100Called).isTrue()
146+
assertThat(task200Called).isTrue()
147+
assertThat(task300Called).isFalse()
148+
}
149+
150+
@Test
151+
internal fun `queueAtFixedRate repeats work`() {
152+
var repeatTaskCallCount = 0
153+
154+
queue.queueAtFixedRate(100, 100, TimeUnit.MILLISECONDS) {
155+
repeatTaskCallCount += 1
156+
}
157+
158+
queue.tick(500)
159+
assertThat(repeatTaskCallCount).isEqualTo(5)
160+
}
161+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package org.phoenixframework.queue
2+
3+
import org.phoenixframework.DispatchWorkItem
4+
5+
class ManualDispatchWorkItem(
6+
private val runnable: () -> Unit,
7+
var deadline: Long,
8+
private val period: Long = 0
9+
) : DispatchWorkItem {
10+
11+
private var performCount = 0
12+
13+
// Test
14+
fun isPastDue(tickTime: Long): Boolean {
15+
return this.deadline <= tickTime
16+
}
17+
18+
fun perform() {
19+
if (isCancelled) return
20+
runnable.invoke()
21+
performCount += 1
22+
23+
// If the task is repeatable, then schedule the next deadline after the given period
24+
deadline += period
25+
}
26+
27+
// DispatchWorkItem
28+
override var isCancelled: Boolean = false
29+
30+
override fun cancel() {
31+
this.isCancelled = true
32+
}
33+
}

0 commit comments

Comments
 (0)