From 0302a2086a76c8f5716265ad9c8ec2690e87b7d7 Mon Sep 17 00:00:00 2001 From: Daniel Rees Date: Wed, 19 Jun 2019 14:16:18 -0400 Subject: [PATCH 1/3] Added new rejoin and reconnect defaults --- src/main/kotlin/org/phoenixframework/Defaults.kt | 13 ++++++++++++- src/main/kotlin/org/phoenixframework/Socket.kt | 5 +++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/org/phoenixframework/Defaults.kt b/src/main/kotlin/org/phoenixframework/Defaults.kt index 0a1a6f3..2fd2249 100644 --- a/src/main/kotlin/org/phoenixframework/Defaults.kt +++ b/src/main/kotlin/org/phoenixframework/Defaults.kt @@ -36,9 +36,20 @@ object Defaults { /** Default reconnect algorithm. Reconnects after 1s, 2s, 5s and then 10s thereafter */ val steppedBackOff: (Int) -> Long = { tries -> - if (tries > 3) 10000 else listOf(1000L, 2000L, 5000L)[tries - 1] + if (tries > 3) 10_000 else listOf(1_000L, 2_000L, 5_000L)[tries - 1] } + /** Default reconnect algorithm for the socket */ + val reconnectAfterMs: (Int) -> Long = { tries -> + if (tries > 9) 5_000 else listOf(10L, 50L, 100L, 150L, 200L, 250L, 500L, 1_000L, 2_000L)[tries - 1] + } + + /** Default rejoin algorithm for individual channels */ + val rejoinAfterMs: (Int) -> Long = { tries -> + if (tries > 3) 10_000 else listOf(1_000L, 2_000L, 5_000L)[tries - 1] + } + + /** The default Gson configuration to use when parsing messages */ val gson: Gson get() = GsonBuilder() diff --git a/src/main/kotlin/org/phoenixframework/Socket.kt b/src/main/kotlin/org/phoenixframework/Socket.kt index 15f920b..7826888 100644 --- a/src/main/kotlin/org/phoenixframework/Socket.kt +++ b/src/main/kotlin/org/phoenixframework/Socket.kt @@ -51,6 +51,8 @@ internal data class StateChangeCallbacks( /** The code used when the socket was closed without error */ const val WS_CLOSE_NORMAL = 1000 + + /** The socket was closed due to a SocketException. Likely the client lost connectivity */ const val WS_CLOSE_SOCKET_EXCEPTION = 4000 @@ -139,6 +141,9 @@ class Socket( /** Timer to use when attempting to reconnect */ internal var reconnectTimer: TimeoutTimer + /** True if the socket closed cleanly. False if it was closed due to an error or timeout */ + internal var closeWasClean: Boolean = false + //------------------------------------------------------------------------------ // Connection Attributes //------------------------------------------------------------------------------ From 7f114761dbf9ad40434350eb25c578b6fd671175 Mon Sep 17 00:00:00 2001 From: Daniel Rees Date: Wed, 19 Jun 2019 19:53:23 -0400 Subject: [PATCH 2/3] Updated socket to track abnormal close --- .../kotlin/org/phoenixframework/Defaults.kt | 4 +- .../kotlin/org/phoenixframework/Socket.kt | 64 +++-- .../kotlin/org/phoenixframework/SocketTest.kt | 263 +++++++++++++----- 3 files changed, 234 insertions(+), 97 deletions(-) diff --git a/src/main/kotlin/org/phoenixframework/Defaults.kt b/src/main/kotlin/org/phoenixframework/Defaults.kt index 2fd2249..b57de50 100644 --- a/src/main/kotlin/org/phoenixframework/Defaults.kt +++ b/src/main/kotlin/org/phoenixframework/Defaults.kt @@ -40,12 +40,12 @@ object Defaults { } /** Default reconnect algorithm for the socket */ - val reconnectAfterMs: (Int) -> Long = { tries -> + val reconnectSteppedBackOff: (Int) -> Long = { tries -> if (tries > 9) 5_000 else listOf(10L, 50L, 100L, 150L, 200L, 250L, 500L, 1_000L, 2_000L)[tries - 1] } /** Default rejoin algorithm for individual channels */ - val rejoinAfterMs: (Int) -> Long = { tries -> + val rejoinSteppedBackOff: (Int) -> Long = { tries -> if (tries > 3) 10_000 else listOf(1_000L, 2_000L, 5_000L)[tries - 1] } diff --git a/src/main/kotlin/org/phoenixframework/Socket.kt b/src/main/kotlin/org/phoenixframework/Socket.kt index 7826888..202b502 100644 --- a/src/main/kotlin/org/phoenixframework/Socket.kt +++ b/src/main/kotlin/org/phoenixframework/Socket.kt @@ -48,15 +48,16 @@ internal data class StateChangeCallbacks( } } -/** The code used when the socket was closed without error */ +/** RFC 6455: indicates a normal closure */ const val WS_CLOSE_NORMAL = 1000 - +/** RFC 6455: indicates that the connection was closed abnormally */ +const val WS_CLOSE_ABNORMAL = 1006 /** The socket was closed due to a SocketException. Likely the client lost connectivity */ +// DEPRECATED const val WS_CLOSE_SOCKET_EXCEPTION = 4000 - /** * Connects to a Phoenix Server */ @@ -90,11 +91,14 @@ class Socket( /** Timeout to use when opening a connection */ var timeout: Long = Defaults.TIMEOUT - /** Interval between sending a heartbeat */ - var heartbeatInterval: Long = Defaults.HEARTBEAT + /** Interval between sending a heartbeat, in ms */ + var heartbeatIntervalMs: Long = Defaults.HEARTBEAT - /** Internval between socket reconnect attempts */ - var reconnectAfterMs: ((Int) -> Long) = Defaults.steppedBackOff + /** Interval between socket reconnect attempts, in ms */ + var reconnectAfterMs: ((Int) -> Long) = Defaults.reconnectSteppedBackOff + + /** Interval between channel rejoin attempts, in ms */ + var rejoinAfterMs: ((Int) -> Long) = Defaults.rejoinSteppedBackOff /** The optional function to receive logs */ var logger: ((String) -> Unit)? = null @@ -141,8 +145,8 @@ class Socket( /** Timer to use when attempting to reconnect */ internal var reconnectTimer: TimeoutTimer - /** True if the socket closed cleanly. False if it was closed due to an error or timeout */ - internal var closeWasClean: Boolean = false + /** True if the Socket closed cleaned. False if not (connection timeout, heartbeat, etc) */ + internal var closeWasClean = false //------------------------------------------------------------------------------ // Connection Attributes @@ -223,6 +227,9 @@ class Socket( // Do not attempt to connect if already connected if (isConnected) return + // Reset the clean close flag when attempting to connect + this.closeWasClean = false + this.connection = this.transport(endpointUrl) this.connection?.onOpen = { onConnectionOpened() } this.connection?.onClose = { code -> onConnectionClosed(code) } @@ -236,6 +243,10 @@ class Socket( reason: String? = null, callback: (() -> Unit)? = null ) { + // The socket was closed cleanly by the User + this.closeWasClean = true + + // Reset any reconnects and teardown the socket connection this.reconnectTimer.reset() this.teardown(code, reason, callback) @@ -344,7 +355,12 @@ class Socket( /** Triggers an error event to all connected Channels */ private fun triggerChannelError() { - this.channels.forEach { it.trigger(Channel.Event.ERROR.value) } + this.channels.forEach { channel -> + // Only trigger a channel error if it is in an "opened" state + if (!(channel.isErrored || channel.isLeaving || channel.isClosed)) { + channel.trigger(Channel.Event.ERROR.value) + } + } } /** Send all messages that were buffered before the socket opened */ @@ -366,8 +382,8 @@ class Socket( // Do not start up the heartbeat timer if skipHeartbeat is true if (skipHeartbeat) return - val delay = heartbeatInterval - val period = heartbeatInterval + val delay = heartbeatIntervalMs + val period = heartbeatIntervalMs heartbeatTask = dispatchQueue.queueAtFixedRate(delay, period, TimeUnit.MILLISECONDS) { sendHeartbeat() } @@ -384,9 +400,8 @@ class Socket( pendingHeartbeatRef = null logItems("Transport: Heartbeat timeout. Attempt to re-establish connection") - // Disconnect the socket manually. Do not use `teardown` or - // `disconnect` as they will nil out the websocket delegate - this.connection?.disconnect(WS_CLOSE_NORMAL, "Heartbeat timed out") + // Close the socket, flagging the closure as abnormal + this.abnormalClose("heartbeat timeout") return } @@ -399,12 +414,20 @@ class Socket( ref = pendingHeartbeatRef) } + internal fun abnormalClose(reason: String) { + this.closeWasClean = false + this.connection?.disconnect(WS_CLOSE_NORMAL, reason) + } + //------------------------------------------------------------------------------ // Connection Transport Hooks //------------------------------------------------------------------------------ internal fun onConnectionOpened() { this.logItems("Transport: Connected to $endpoint") + // Reset the closeWasClean flag now that the socket has been connected + this.closeWasClean = false + // Send any messages that were waiting for a connection this.flushSendBuffer() @@ -426,14 +449,13 @@ class Socket( this.heartbeatTask?.cancel() this.heartbeatTask = null + // Only attempt to reconnect if the socket did not close normally + if (!this.closeWasClean) { + this.reconnectTimer.scheduleTimeout() + } + // Inform callbacks the socket closed this.stateChangeCallbacks.close.forEach { it.invoke() } - - // If there was a non-normal event when the connection closed, attempt - // to schedule a reconnect attempt - if (code != WS_CLOSE_NORMAL) { - reconnectTimer.scheduleTimeout() - } } internal fun onConnectionMessage(rawMessage: String) { diff --git a/src/test/kotlin/org/phoenixframework/SocketTest.kt b/src/test/kotlin/org/phoenixframework/SocketTest.kt index 50385ab..784bb0a 100644 --- a/src/test/kotlin/org/phoenixframework/SocketTest.kt +++ b/src/test/kotlin/org/phoenixframework/SocketTest.kt @@ -58,27 +58,33 @@ class SocketTest { assertThat(socket.stateChangeCallbacks.error).isEmpty() assertThat(socket.stateChangeCallbacks.message).isEmpty() assertThat(socket.timeout).isEqualTo(Defaults.TIMEOUT) - assertThat(socket.heartbeatInterval).isEqualTo(Defaults.HEARTBEAT) + assertThat(socket.heartbeatIntervalMs).isEqualTo(Defaults.HEARTBEAT) assertThat(socket.logger).isNull() - assertThat(socket.reconnectAfterMs(1)).isEqualTo(1000) - assertThat(socket.reconnectAfterMs(2)).isEqualTo(2000) - assertThat(socket.reconnectAfterMs(3)).isEqualTo(5000) - assertThat(socket.reconnectAfterMs(4)).isEqualTo(10000) - assertThat(socket.reconnectAfterMs(5)).isEqualTo(10000) + assertThat(socket.reconnectAfterMs(1)).isEqualTo(10) + assertThat(socket.reconnectAfterMs(2)).isEqualTo(50) + assertThat(socket.reconnectAfterMs(3)).isEqualTo(100) + assertThat(socket.reconnectAfterMs(4)).isEqualTo(150) + assertThat(socket.reconnectAfterMs(5)).isEqualTo(200) + assertThat(socket.reconnectAfterMs(6)).isEqualTo(250) + assertThat(socket.reconnectAfterMs(7)).isEqualTo(500) + assertThat(socket.reconnectAfterMs(8)).isEqualTo(1_000) + assertThat(socket.reconnectAfterMs(9)).isEqualTo(2_000) + assertThat(socket.reconnectAfterMs(10)).isEqualTo(5_000) + assertThat(socket.reconnectAfterMs(11)).isEqualTo(5_000) } @Test internal fun `overrides some defaults`() { val socket = Socket("wss://localhost:4000/socket/", mapOf("one" to 2)) socket.timeout = 40_000 - socket.heartbeatInterval = 60_000 + socket.heartbeatIntervalMs = 60_000 socket.logger = { } socket.reconnectAfterMs = { 10 } assertThat(socket.params).isEqualTo(mapOf("one" to 2)) assertThat(socket.endpoint).isEqualTo("wss://localhost:4000/socket/websocket") assertThat(socket.timeout).isEqualTo(40_000) - assertThat(socket.heartbeatInterval).isEqualTo(60_000) + assertThat(socket.heartbeatIntervalMs).isEqualTo(60_000) assertThat(socket.logger).isNotNull() assertThat(socket.reconnectAfterMs(1)).isEqualTo(10) assertThat(socket.reconnectAfterMs(2)).isEqualTo(10) @@ -447,25 +453,26 @@ class SocketTest { @Nested @DisplayName("sendHeartbeat") inner class SendHeartbeat { - @Test - internal fun `closes socket when heartbeat is not ack'd within heartbeat window`() { + + @BeforeEach + internal fun setUp() { whenever(connection.readyState).thenReturn(Transport.ReadyState.OPEN) socket.connect() + } + @Test + internal fun `closes socket when heartbeat is not ack'd within heartbeat window`() { socket.sendHeartbeat() verify(connection, never()).disconnect(any(), any()) assertThat(socket.pendingHeartbeatRef).isNotNull() socket.sendHeartbeat() - verify(connection).disconnect(WS_CLOSE_NORMAL, "Heartbeat timed out") + verify(connection).disconnect(WS_CLOSE_NORMAL, "heartbeat timeout") assertThat(socket.pendingHeartbeatRef).isNull() } @Test internal fun `pushes heartbeat data when connected`() { - whenever(connection.readyState).thenReturn(Transport.ReadyState.OPEN) - socket.connect() - socket.sendHeartbeat() val expected = "{\"topic\":\"phoenix\",\"event\":\"heartbeat\",\"payload\":{},\"ref\":\"1\"}" @@ -476,7 +483,6 @@ class SocketTest { @Test internal fun `does nothing when not connected`() { whenever(connection.readyState).thenReturn(Transport.ReadyState.CLOSED) - socket.connect() socket.sendHeartbeat() verify(connection, never()).disconnect(any(), any()) @@ -529,14 +535,66 @@ class SocketTest { /* End FlushSendBuffer */ } + @Nested + @DisplayName("resetHeartbeat") + inner class ResetHeartbeat { + @Test + internal fun `clears any pending heartbeat`() { + socket.pendingHeartbeatRef = "1" + socket.resetHeartbeat() + + assertThat(socket.pendingHeartbeatRef).isNull() + } + + @Test + fun `does not schedule heartbeat if skipHeartbeat == true`() { + socket.skipHeartbeat = true + socket.resetHeartbeat() + + verifyZeroInteractions(mockDispatchQueue) + } + + @Test + internal fun `creates a future heartbeat task`() { + val mockTask = mock() + whenever(mockDispatchQueue.queueAtFixedRate(any(), any(), any(), any())).thenReturn(mockTask) + + whenever(connection.readyState).thenReturn(Transport.ReadyState.OPEN) + socket.connect() + socket.heartbeatIntervalMs = 5_000 + + assertThat(socket.heartbeatTask).isNull() + socket.resetHeartbeat() + + assertThat(socket.heartbeatTask).isNotNull() + argumentCaptor<() -> Unit> { + verify(mockDispatchQueue).queueAtFixedRate(eq(5_000L), eq(5_000L), + eq(TimeUnit.MILLISECONDS), capture()) + + // fire the task + allValues.first().invoke() + + val expected = + "{\"topic\":\"phoenix\",\"event\":\"heartbeat\",\"payload\":{},\"ref\":\"1\"}" + verify(connection).send(expected) + } + } + + /* End ResetHeartbeat */ + } + @Nested @DisplayName("onConnectionOpened") inner class OnConnectionOpened { - @Test - internal fun `flushes the send buffer`() { + + @BeforeEach + internal fun setUp() { whenever(connection.readyState).thenReturn(Transport.ReadyState.OPEN) socket.connect() + } + @Test + internal fun `flushes the send buffer`() { var oneCalled = 0 socket.sendBuffer.add { oneCalled += 1 } @@ -583,69 +641,42 @@ class SocketTest { } @Nested - @DisplayName("resetHeartbeat") - inner class ResetHeartbeat { - @Test - internal fun `clears any pending heartbeat`() { - socket.pendingHeartbeatRef = "1" - socket.resetHeartbeat() - - assertThat(socket.pendingHeartbeatRef).isNull() - } - - @Test - fun `does not schedule heartbeat if skipHeartbeat == true`() { - socket.skipHeartbeat = true - socket.resetHeartbeat() + @DisplayName("onConnectionClosed") + inner class OnConnectionClosed { - verifyZeroInteractions(mockDispatchQueue) - } + private lateinit var mockTimer: TimeoutTimer - @Test - internal fun `creates a future heartbeat task`() { - val mockTask = mock() - whenever(mockDispatchQueue.queueAtFixedRate(any(), any(), any(), any())).thenReturn(mockTask) + @BeforeEach + internal fun setUp() { + mockTimer = mock() + socket.reconnectTimer = mockTimer whenever(connection.readyState).thenReturn(Transport.ReadyState.OPEN) socket.connect() - socket.heartbeatInterval = 5_000 - - assertThat(socket.heartbeatTask).isNull() - socket.resetHeartbeat() - - assertThat(socket.heartbeatTask).isNotNull() - argumentCaptor<() -> Unit> { - verify(mockDispatchQueue).queueAtFixedRate(eq(5_000L), eq(5_000L), - eq(TimeUnit.MILLISECONDS), capture()) - - // fire the task - allValues.first().invoke() - - val expected = - "{\"topic\":\"phoenix\",\"event\":\"heartbeat\",\"payload\":{},\"ref\":\"1\"}" - verify(connection).send(expected) - } } - /* End ResetHeartbeat */ - } - - @Nested - @DisplayName("onConnectionClosed") - inner class OnConnectionClosed { @Test - internal fun `it does not schedule reconnectTimer timeout if normal close`() { - val mockTimer = mock() - socket.reconnectTimer = mockTimer - + internal fun `schedules reconnectTimer timeout if normal close`() { socket.onConnectionClosed(WS_CLOSE_NORMAL) + verify(mockTimer).scheduleTimeout() + } + + @Test + internal fun `does not schedule reconnectTimer timeout if normal close after explicit disconnect`() { + socket.disconnect() verify(mockTimer, never()).scheduleTimeout() } @Test internal fun `schedules reconnectTimer if not normal close`() { - val mockTimer = mock() - socket.reconnectTimer = mockTimer + socket.onConnectionClosed(1001) + verify(mockTimer).scheduleTimeout() + } + + @Test + internal fun `schedules reconnectTimer timeout if connection cannot be made after a previous clean disconnect`() { + socket.disconnect() + socket.connect() socket.onConnectionClosed(1001) verify(mockTimer).scheduleTimeout() @@ -677,12 +708,54 @@ class SocketTest { } @Test - internal fun `triggers channel error`() { - val channel = mock() - socket.channels.add(channel) + internal fun `triggers channel error if joining`() { + val channel = socket.channel("topic") + val spy = spy(channel) + + // Use the spy instance instead of the Channel instance + socket.channels.remove(channel) + socket.channels.add(spy) + + spy.join() + assertThat(spy.state).isEqualTo(Channel.State.JOINING) + + socket.onConnectionClosed(1001) + verify(spy).trigger("phx_error") + } + + @Test + internal fun `triggers channel error if joined`() { + val channel = socket.channel("topic") + val spy = spy(channel) + + // Use the spy instance instead of the Channel instance + socket.channels.remove(channel) + socket.channels.add(spy) + + spy.join().trigger("ok", emptyMap()) + + assertThat(channel.state).isEqualTo(Channel.State.JOINED) + + socket.onConnectionClosed(1001) + verify(spy).trigger("phx_error") + } + + @Test + internal fun `does not trigger channel error after leave`() { + val channel = socket.channel("topic") + val spy = spy(channel) + + // Use the spy instance instead of the Channel instance + socket.channels.remove(channel) + socket.channels.add(spy) + + spy.join().trigger("ok", emptyMap()) + spy.leave() + + assertThat(channel.state).isEqualTo(Channel.State.CLOSED) socket.onConnectionClosed(1001) - verify(channel).trigger("phx_error") + verify(spy, never()).trigger("phx_error") } /* End OnConnectionClosed */ @@ -707,12 +780,54 @@ class SocketTest { } @Test - internal fun `triggers channel error`() { - val channel = mock() - socket.channels.add(channel) + internal fun `triggers channel error if joining`() { + val channel = socket.channel("topic") + val spy = spy(channel) + + // Use the spy instance instead of the Channel instance + socket.channels.remove(channel) + socket.channels.add(spy) + + spy.join() + assertThat(spy.state).isEqualTo(Channel.State.JOINING) + + socket.onConnectionError(Throwable(), null) + verify(spy).trigger("phx_error") + } + + @Test + internal fun `triggers channel error if joined`() { + val channel = socket.channel("topic") + val spy = spy(channel) + + // Use the spy instance instead of the Channel instance + socket.channels.remove(channel) + socket.channels.add(spy) + + spy.join().trigger("ok", emptyMap()) + + assertThat(channel.state).isEqualTo(Channel.State.JOINED) + + socket.onConnectionError(Throwable(), null) + verify(spy).trigger("phx_error") + } + + @Test + internal fun `does not trigger channel error after leave`() { + val channel = socket.channel("topic") + val spy = spy(channel) + + // Use the spy instance instead of the Channel instance + socket.channels.remove(channel) + socket.channels.add(spy) + + spy.join().trigger("ok", emptyMap()) + spy.leave() + + assertThat(channel.state).isEqualTo(Channel.State.CLOSED) socket.onConnectionError(Throwable(), null) - verify(channel).trigger("phx_error") + verify(spy, never()).trigger("phx_error") } /* End OnConnectionError */ From 04f1e7d3b7913ebb1b639518546705aa78a8e4ac Mon Sep 17 00:00:00 2001 From: Daniel Rees Date: Wed, 19 Jun 2019 22:17:38 -0400 Subject: [PATCH 3/3] Updated transport to call onClose after onError --- .../kotlin/org/phoenixframework/Socket.kt | 11 +++++--- .../kotlin/org/phoenixframework/Transport.kt | 26 ++++++++++++++----- .../kotlin/org/phoenixframework/SocketTest.kt | 8 ++++++ .../WebSocketTransportTest.kt | 4 +-- 4 files changed, 36 insertions(+), 13 deletions(-) diff --git a/src/main/kotlin/org/phoenixframework/Socket.kt b/src/main/kotlin/org/phoenixframework/Socket.kt index 202b502..d10d5ee 100644 --- a/src/main/kotlin/org/phoenixframework/Socket.kt +++ b/src/main/kotlin/org/phoenixframework/Socket.kt @@ -54,9 +54,6 @@ const val WS_CLOSE_NORMAL = 1000 /** RFC 6455: indicates that the connection was closed abnormally */ const val WS_CLOSE_ABNORMAL = 1006 -/** The socket was closed due to a SocketException. Likely the client lost connectivity */ -// DEPRECATED -const val WS_CLOSE_SOCKET_EXCEPTION = 4000 /** * Connects to a Phoenix Server @@ -414,8 +411,14 @@ class Socket( ref = pendingHeartbeatRef) } - internal fun abnormalClose(reason: String) { + private fun abnormalClose(reason: String) { this.closeWasClean = false + + /* + We use NORMAL here since the client is the one determining to close the connection. However, + we keep a flag `closeWasClean` set to false so that the client knows that it should attempt + to reconnect. + */ this.connection?.disconnect(WS_CLOSE_NORMAL, reason) } diff --git a/src/main/kotlin/org/phoenixframework/Transport.kt b/src/main/kotlin/org/phoenixframework/Transport.kt index c262f01..25c8818 100644 --- a/src/main/kotlin/org/phoenixframework/Transport.kt +++ b/src/main/kotlin/org/phoenixframework/Transport.kt @@ -27,7 +27,6 @@ import okhttp3.Request import okhttp3.Response import okhttp3.WebSocket import okhttp3.WebSocketListener -import java.io.IOException import java.net.URL /** @@ -130,13 +129,28 @@ class WebSocketTransport( } override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { + // Set the state of the Transport as CLOSED since no more data will be received this.readyState = Transport.ReadyState.CLOSED + + // Invoke the onError callback, to inform of the error this.onError?.invoke(t, response) - - // Check if the socket was closed for some recoverable reason - when (t) { - is IOException -> this.onClosed(webSocket, WS_CLOSE_SOCKET_EXCEPTION, "IOException") - } + + /* + According to the OkHttp documentation, `onFailure` will be + + "Invoked when a web socket has been closed due to an error reading from or writing to the + network. Both outgoing and incoming messages may have been lost. No further calls to this + listener will be made." + + This means `onClose` will never be called which will never kick off the socket reconnect + attempts. + + The JS WebSocket class calls `onError` and then `onClose` which will then trigger + the reconnect logic inside of the PhoenixClient. In order to mimic this behavior and abstract + this detail of OkHttp away from the PhoenixClient, the `WebSocketTransport` class should + convert `onFailure` calls to an `onError` and `onClose` sequence. + */ + this.onClose?.invoke(WS_CLOSE_ABNORMAL) } override fun onClosing(webSocket: WebSocket, code: Int, reason: String) { diff --git a/src/test/kotlin/org/phoenixframework/SocketTest.kt b/src/test/kotlin/org/phoenixframework/SocketTest.kt index 784bb0a..3b74612 100644 --- a/src/test/kotlin/org/phoenixframework/SocketTest.kt +++ b/src/test/kotlin/org/phoenixframework/SocketTest.kt @@ -294,6 +294,14 @@ class SocketTest { verify(connection).disconnect(WS_CLOSE_NORMAL) } + @Test + internal fun `flags the socket as closed cleanly`() { + assertThat(socket.closeWasClean).isFalse() + + socket.disconnect() + assertThat(socket.closeWasClean).isTrue() + } + @Test internal fun `calls callback`() { val mockCallback = mock<() -> Unit> {} diff --git a/src/test/kotlin/org/phoenixframework/WebSocketTransportTest.kt b/src/test/kotlin/org/phoenixframework/WebSocketTransportTest.kt index ebcd9a3..37ed0dc 100644 --- a/src/test/kotlin/org/phoenixframework/WebSocketTransportTest.kt +++ b/src/test/kotlin/org/phoenixframework/WebSocketTransportTest.kt @@ -14,8 +14,6 @@ import org.junit.jupiter.api.Nested import org.junit.jupiter.api.Test import org.mockito.Mock import org.mockito.MockitoAnnotations -import org.phoenixframework.Transport -import org.phoenixframework.WebSocketTransport import java.net.SocketException import java.net.URL @@ -124,7 +122,7 @@ class WebSocketTransportTest { val throwable = SocketException() transport.onFailure(mockWebSocket, throwable, mockResponse) verify(mockOnError).invoke(throwable, mockResponse) - verify(mockOnClose).invoke(4000) + verify(mockOnClose).invoke(1006) } /* End OnFailure */