Skip to content

Commit 84bdcd3

Browse files
author
Daniel Rees
authored
Optimize Socket Reconnect (#61)
* Added new rejoin and reconnect defaults * Updated socket to track abnormal close * Updated transport to call onClose after onError
1 parent a11852b commit 84bdcd3

File tree

5 files changed

+279
-103
lines changed

5 files changed

+279
-103
lines changed

src/main/kotlin/org/phoenixframework/Defaults.kt

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,20 @@ object Defaults {
3636

3737
/** Default reconnect algorithm. Reconnects after 1s, 2s, 5s and then 10s thereafter */
3838
val steppedBackOff: (Int) -> Long = { tries ->
39-
if (tries > 3) 10000 else listOf(1000L, 2000L, 5000L)[tries - 1]
39+
if (tries > 3) 10_000 else listOf(1_000L, 2_000L, 5_000L)[tries - 1]
4040
}
4141

42+
/** Default reconnect algorithm for the socket */
43+
val reconnectSteppedBackOff: (Int) -> Long = { tries ->
44+
if (tries > 9) 5_000 else listOf(10L, 50L, 100L, 150L, 200L, 250L, 500L, 1_000L, 2_000L)[tries - 1]
45+
}
46+
47+
/** Default rejoin algorithm for individual channels */
48+
val rejoinSteppedBackOff: (Int) -> Long = { tries ->
49+
if (tries > 3) 10_000 else listOf(1_000L, 2_000L, 5_000L)[tries - 1]
50+
}
51+
52+
4253
/** The default Gson configuration to use when parsing messages */
4354
val gson: Gson
4455
get() = GsonBuilder()

src/main/kotlin/org/phoenixframework/Socket.kt

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,11 @@ internal data class StateChangeCallbacks(
4848
}
4949
}
5050

51-
/** The code used when the socket was closed without error */
51+
/** RFC 6455: indicates a normal closure */
5252
const val WS_CLOSE_NORMAL = 1000
5353

54-
/** The socket was closed due to a SocketException. Likely the client lost connectivity */
55-
const val WS_CLOSE_SOCKET_EXCEPTION = 4000
54+
/** RFC 6455: indicates that the connection was closed abnormally */
55+
const val WS_CLOSE_ABNORMAL = 1006
5656

5757

5858
/**
@@ -88,11 +88,14 @@ class Socket(
8888
/** Timeout to use when opening a connection */
8989
var timeout: Long = Defaults.TIMEOUT
9090

91-
/** Interval between sending a heartbeat */
92-
var heartbeatInterval: Long = Defaults.HEARTBEAT
91+
/** Interval between sending a heartbeat, in ms */
92+
var heartbeatIntervalMs: Long = Defaults.HEARTBEAT
9393

94-
/** Internval between socket reconnect attempts */
95-
var reconnectAfterMs: ((Int) -> Long) = Defaults.steppedBackOff
94+
/** Interval between socket reconnect attempts, in ms */
95+
var reconnectAfterMs: ((Int) -> Long) = Defaults.reconnectSteppedBackOff
96+
97+
/** Interval between channel rejoin attempts, in ms */
98+
var rejoinAfterMs: ((Int) -> Long) = Defaults.rejoinSteppedBackOff
9699

97100
/** The optional function to receive logs */
98101
var logger: ((String) -> Unit)? = null
@@ -139,6 +142,9 @@ class Socket(
139142
/** Timer to use when attempting to reconnect */
140143
internal var reconnectTimer: TimeoutTimer
141144

145+
/** True if the Socket closed cleaned. False if not (connection timeout, heartbeat, etc) */
146+
internal var closeWasClean = false
147+
142148
//------------------------------------------------------------------------------
143149
// Connection Attributes
144150
//------------------------------------------------------------------------------
@@ -218,6 +224,9 @@ class Socket(
218224
// Do not attempt to connect if already connected
219225
if (isConnected) return
220226

227+
// Reset the clean close flag when attempting to connect
228+
this.closeWasClean = false
229+
221230
this.connection = this.transport(endpointUrl)
222231
this.connection?.onOpen = { onConnectionOpened() }
223232
this.connection?.onClose = { code -> onConnectionClosed(code) }
@@ -231,6 +240,10 @@ class Socket(
231240
reason: String? = null,
232241
callback: (() -> Unit)? = null
233242
) {
243+
// The socket was closed cleanly by the User
244+
this.closeWasClean = true
245+
246+
// Reset any reconnects and teardown the socket connection
234247
this.reconnectTimer.reset()
235248
this.teardown(code, reason, callback)
236249

@@ -339,7 +352,12 @@ class Socket(
339352

340353
/** Triggers an error event to all connected Channels */
341354
private fun triggerChannelError() {
342-
this.channels.forEach { it.trigger(Channel.Event.ERROR.value) }
355+
this.channels.forEach { channel ->
356+
// Only trigger a channel error if it is in an "opened" state
357+
if (!(channel.isErrored || channel.isLeaving || channel.isClosed)) {
358+
channel.trigger(Channel.Event.ERROR.value)
359+
}
360+
}
343361
}
344362

345363
/** Send all messages that were buffered before the socket opened */
@@ -361,8 +379,8 @@ class Socket(
361379

362380
// Do not start up the heartbeat timer if skipHeartbeat is true
363381
if (skipHeartbeat) return
364-
val delay = heartbeatInterval
365-
val period = heartbeatInterval
382+
val delay = heartbeatIntervalMs
383+
val period = heartbeatIntervalMs
366384

367385
heartbeatTask =
368386
dispatchQueue.queueAtFixedRate(delay, period, TimeUnit.MILLISECONDS) { sendHeartbeat() }
@@ -379,9 +397,8 @@ class Socket(
379397
pendingHeartbeatRef = null
380398
logItems("Transport: Heartbeat timeout. Attempt to re-establish connection")
381399

382-
// Disconnect the socket manually. Do not use `teardown` or
383-
// `disconnect` as they will nil out the websocket delegate
384-
this.connection?.disconnect(WS_CLOSE_NORMAL, "Heartbeat timed out")
400+
// Close the socket, flagging the closure as abnormal
401+
this.abnormalClose("heartbeat timeout")
385402
return
386403
}
387404

@@ -394,12 +411,26 @@ class Socket(
394411
ref = pendingHeartbeatRef)
395412
}
396413

414+
private fun abnormalClose(reason: String) {
415+
this.closeWasClean = false
416+
417+
/*
418+
We use NORMAL here since the client is the one determining to close the connection. However,
419+
we keep a flag `closeWasClean` set to false so that the client knows that it should attempt
420+
to reconnect.
421+
*/
422+
this.connection?.disconnect(WS_CLOSE_NORMAL, reason)
423+
}
424+
397425
//------------------------------------------------------------------------------
398426
// Connection Transport Hooks
399427
//------------------------------------------------------------------------------
400428
internal fun onConnectionOpened() {
401429
this.logItems("Transport: Connected to $endpoint")
402430

431+
// Reset the closeWasClean flag now that the socket has been connected
432+
this.closeWasClean = false
433+
403434
// Send any messages that were waiting for a connection
404435
this.flushSendBuffer()
405436

@@ -421,14 +452,13 @@ class Socket(
421452
this.heartbeatTask?.cancel()
422453
this.heartbeatTask = null
423454

455+
// Only attempt to reconnect if the socket did not close normally
456+
if (!this.closeWasClean) {
457+
this.reconnectTimer.scheduleTimeout()
458+
}
459+
424460
// Inform callbacks the socket closed
425461
this.stateChangeCallbacks.close.forEach { it.invoke() }
426-
427-
// If there was a non-normal event when the connection closed, attempt
428-
// to schedule a reconnect attempt
429-
if (code != WS_CLOSE_NORMAL) {
430-
reconnectTimer.scheduleTimeout()
431-
}
432462
}
433463

434464
internal fun onConnectionMessage(rawMessage: String) {

src/main/kotlin/org/phoenixframework/Transport.kt

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import okhttp3.Request
2727
import okhttp3.Response
2828
import okhttp3.WebSocket
2929
import okhttp3.WebSocketListener
30-
import java.io.IOException
3130
import java.net.URL
3231

3332
/**
@@ -130,13 +129,28 @@ class WebSocketTransport(
130129
}
131130

132131
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
132+
// Set the state of the Transport as CLOSED since no more data will be received
133133
this.readyState = Transport.ReadyState.CLOSED
134+
135+
// Invoke the onError callback, to inform of the error
134136
this.onError?.invoke(t, response)
135-
136-
// Check if the socket was closed for some recoverable reason
137-
when (t) {
138-
is IOException -> this.onClosed(webSocket, WS_CLOSE_SOCKET_EXCEPTION, "IOException")
139-
}
137+
138+
/*
139+
According to the OkHttp documentation, `onFailure` will be
140+
141+
"Invoked when a web socket has been closed due to an error reading from or writing to the
142+
network. Both outgoing and incoming messages may have been lost. No further calls to this
143+
listener will be made."
144+
145+
This means `onClose` will never be called which will never kick off the socket reconnect
146+
attempts.
147+
148+
The JS WebSocket class calls `onError` and then `onClose` which will then trigger
149+
the reconnect logic inside of the PhoenixClient. In order to mimic this behavior and abstract
150+
this detail of OkHttp away from the PhoenixClient, the `WebSocketTransport` class should
151+
convert `onFailure` calls to an `onError` and `onClose` sequence.
152+
*/
153+
this.onClose?.invoke(WS_CLOSE_ABNORMAL)
140154
}
141155

142156
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {

0 commit comments

Comments
 (0)