Skip to content

Commit d7acb33

Browse files
dustinconraddsrees
authored andcommitted
Fix reply timeouts (#37)
* Ensure that timeouts remove reply bindings * Remove unused imports in test * requested changes for timeout fix
1 parent 22c396c commit d7acb33

File tree

2 files changed

+53
-3
lines changed

2 files changed

+53
-3
lines changed

src/main/kotlin/org/phoenixframework/PhxPush.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,8 @@ class PhxPush(
184184
val mutPayload = payload.toMutableMap()
185185
mutPayload["status"] = status
186186

187-
refEvent?.let {
188-
val message = PhxMessage(it, "", "", mutPayload)
187+
refEvent?.let { safeRefEvent ->
188+
val message = PhxMessage(event = safeRefEvent, payload = mutPayload)
189189
this.channel.trigger(message)
190190
}
191191
}

src/test/kotlin/org/phoenixframework/PhxChannelTest.kt

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,25 @@
11
package org.phoenixframework
22

33
import com.google.common.truth.Truth.assertThat
4+
import okhttp3.OkHttpClient
5+
import okhttp3.Request
6+
import okhttp3.WebSocket
7+
import okhttp3.WebSocketListener
48
import org.junit.Before
59
import org.junit.Test
10+
import org.mockito.ArgumentMatchers
611
import org.mockito.Mockito
12+
import org.mockito.Mockito.`when`
713
import org.mockito.MockitoAnnotations
814
import org.mockito.Spy
915
import java.util.concurrent.CompletableFuture
1016
import java.util.concurrent.TimeUnit
17+
import java.util.concurrent.atomic.AtomicInteger
1118

1219
class PhxChannelTest {
1320

1421
private val defaultRef = "1"
22+
private val topic = "topic"
1523

1624
@Spy
1725
var socket: PhxSocket = PhxSocket("http://localhost:4000/socket/websocket")
@@ -23,7 +31,7 @@ class PhxChannelTest {
2331
Mockito.doReturn(defaultRef).`when`(socket).makeRef()
2432

2533
socket.timeout = 1234
26-
channel = PhxChannel("topic", hashMapOf("one" to "two"), socket)
34+
channel = PhxChannel(topic, hashMapOf("one" to "two"), socket)
2735
}
2836

2937

@@ -149,4 +157,46 @@ class PhxChannelTest {
149157

150158
CompletableFuture.allOf(f1, f3).get(10, TimeUnit.SECONDS)
151159
}
160+
161+
@Test
162+
fun `issue 36 - verify timeouts remove bindings`() {
163+
// mock okhttp to get isConnected to return true for the socket
164+
val mockOkHttp = Mockito.mock(OkHttpClient::class.java)
165+
val mockSocket = Mockito.mock(WebSocket::class.java)
166+
`when`(mockOkHttp.newWebSocket(ArgumentMatchers.any(Request::class.java), ArgumentMatchers.any(WebSocketListener::class.java))).thenReturn(mockSocket)
167+
168+
// local mocks for this test
169+
val localSocket = Mockito.spy(PhxSocket(url = "http://localhost:4000/socket/websocket", client = mockOkHttp))
170+
val localChannel = PhxChannel(topic, hashMapOf("one" to "two"), localSocket)
171+
172+
// setup makeRef so it increments
173+
val refCounter = AtomicInteger(1)
174+
Mockito.doAnswer {
175+
refCounter.getAndIncrement().toString()
176+
}.`when`(localSocket).makeRef()
177+
178+
//connect the socket
179+
localSocket.connect()
180+
181+
//join the channel
182+
val joinPush = localChannel.join()
183+
localChannel.trigger(PhxMessage(
184+
ref = joinPush.ref!!,
185+
joinRef = joinPush.ref!!,
186+
event = PhxChannel.PhxEvent.REPLY.value,
187+
topic = topic,
188+
payload = mutableMapOf("status" to "ok")))
189+
190+
//get bindings
191+
val originalBindingsSize = localChannel.bindings.size
192+
val pushCount = 100
193+
repeat(pushCount) {
194+
localChannel.push("some-event", mutableMapOf(), timeout = 500)
195+
}
196+
//verify binding count before timeouts
197+
assertThat(localChannel.bindings.size).isEqualTo(originalBindingsSize + pushCount)
198+
Thread.sleep(1000)
199+
//verify binding count after timeouts
200+
assertThat(localChannel.bindings.size).isEqualTo(originalBindingsSize)
201+
}
152202
}

0 commit comments

Comments
 (0)