Skip to content

Commit 7f11476

Browse files
author
Daniel Rees
committed
Updated socket to track abnormal close
1 parent 0302a20 commit 7f11476

File tree

3 files changed

+234
-97
lines changed

3 files changed

+234
-97
lines changed

src/main/kotlin/org/phoenixframework/Defaults.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@ object Defaults {
4040
}
4141

4242
/** Default reconnect algorithm for the socket */
43-
val reconnectAfterMs: (Int) -> Long = { tries ->
43+
val reconnectSteppedBackOff: (Int) -> Long = { tries ->
4444
if (tries > 9) 5_000 else listOf(10L, 50L, 100L, 150L, 200L, 250L, 500L, 1_000L, 2_000L)[tries - 1]
4545
}
4646

4747
/** Default rejoin algorithm for individual channels */
48-
val rejoinAfterMs: (Int) -> Long = { tries ->
48+
val rejoinSteppedBackOff: (Int) -> Long = { tries ->
4949
if (tries > 3) 10_000 else listOf(1_000L, 2_000L, 5_000L)[tries - 1]
5050
}
5151

src/main/kotlin/org/phoenixframework/Socket.kt

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,16 @@ internal data class StateChangeCallbacks(
4848
}
4949
}
5050

51-
/** The code used when the socket was closed without error */
51+
/** RFC 6455: indicates a normal closure */
5252
const val WS_CLOSE_NORMAL = 1000
5353

54-
54+
/** RFC 6455: indicates that the connection was closed abnormally */
55+
const val WS_CLOSE_ABNORMAL = 1006
5556

5657
/** The socket was closed due to a SocketException. Likely the client lost connectivity */
58+
// DEPRECATED
5759
const val WS_CLOSE_SOCKET_EXCEPTION = 4000
5860

59-
6061
/**
6162
* Connects to a Phoenix Server
6263
*/
@@ -90,11 +91,14 @@ class Socket(
9091
/** Timeout to use when opening a connection */
9192
var timeout: Long = Defaults.TIMEOUT
9293

93-
/** Interval between sending a heartbeat */
94-
var heartbeatInterval: Long = Defaults.HEARTBEAT
94+
/** Interval between sending a heartbeat, in ms */
95+
var heartbeatIntervalMs: Long = Defaults.HEARTBEAT
9596

96-
/** Internval between socket reconnect attempts */
97-
var reconnectAfterMs: ((Int) -> Long) = Defaults.steppedBackOff
97+
/** Interval between socket reconnect attempts, in ms */
98+
var reconnectAfterMs: ((Int) -> Long) = Defaults.reconnectSteppedBackOff
99+
100+
/** Interval between channel rejoin attempts, in ms */
101+
var rejoinAfterMs: ((Int) -> Long) = Defaults.rejoinSteppedBackOff
98102

99103
/** The optional function to receive logs */
100104
var logger: ((String) -> Unit)? = null
@@ -141,8 +145,8 @@ class Socket(
141145
/** Timer to use when attempting to reconnect */
142146
internal var reconnectTimer: TimeoutTimer
143147

144-
/** True if the socket closed cleanly. False if it was closed due to an error or timeout */
145-
internal var closeWasClean: Boolean = false
148+
/** True if the Socket closed cleaned. False if not (connection timeout, heartbeat, etc) */
149+
internal var closeWasClean = false
146150

147151
//------------------------------------------------------------------------------
148152
// Connection Attributes
@@ -223,6 +227,9 @@ class Socket(
223227
// Do not attempt to connect if already connected
224228
if (isConnected) return
225229

230+
// Reset the clean close flag when attempting to connect
231+
this.closeWasClean = false
232+
226233
this.connection = this.transport(endpointUrl)
227234
this.connection?.onOpen = { onConnectionOpened() }
228235
this.connection?.onClose = { code -> onConnectionClosed(code) }
@@ -236,6 +243,10 @@ class Socket(
236243
reason: String? = null,
237244
callback: (() -> Unit)? = null
238245
) {
246+
// The socket was closed cleanly by the User
247+
this.closeWasClean = true
248+
249+
// Reset any reconnects and teardown the socket connection
239250
this.reconnectTimer.reset()
240251
this.teardown(code, reason, callback)
241252

@@ -344,7 +355,12 @@ class Socket(
344355

345356
/** Triggers an error event to all connected Channels */
346357
private fun triggerChannelError() {
347-
this.channels.forEach { it.trigger(Channel.Event.ERROR.value) }
358+
this.channels.forEach { channel ->
359+
// Only trigger a channel error if it is in an "opened" state
360+
if (!(channel.isErrored || channel.isLeaving || channel.isClosed)) {
361+
channel.trigger(Channel.Event.ERROR.value)
362+
}
363+
}
348364
}
349365

350366
/** Send all messages that were buffered before the socket opened */
@@ -366,8 +382,8 @@ class Socket(
366382

367383
// Do not start up the heartbeat timer if skipHeartbeat is true
368384
if (skipHeartbeat) return
369-
val delay = heartbeatInterval
370-
val period = heartbeatInterval
385+
val delay = heartbeatIntervalMs
386+
val period = heartbeatIntervalMs
371387

372388
heartbeatTask =
373389
dispatchQueue.queueAtFixedRate(delay, period, TimeUnit.MILLISECONDS) { sendHeartbeat() }
@@ -384,9 +400,8 @@ class Socket(
384400
pendingHeartbeatRef = null
385401
logItems("Transport: Heartbeat timeout. Attempt to re-establish connection")
386402

387-
// Disconnect the socket manually. Do not use `teardown` or
388-
// `disconnect` as they will nil out the websocket delegate
389-
this.connection?.disconnect(WS_CLOSE_NORMAL, "Heartbeat timed out")
403+
// Close the socket, flagging the closure as abnormal
404+
this.abnormalClose("heartbeat timeout")
390405
return
391406
}
392407

@@ -399,12 +414,20 @@ class Socket(
399414
ref = pendingHeartbeatRef)
400415
}
401416

417+
internal fun abnormalClose(reason: String) {
418+
this.closeWasClean = false
419+
this.connection?.disconnect(WS_CLOSE_NORMAL, reason)
420+
}
421+
402422
//------------------------------------------------------------------------------
403423
// Connection Transport Hooks
404424
//------------------------------------------------------------------------------
405425
internal fun onConnectionOpened() {
406426
this.logItems("Transport: Connected to $endpoint")
407427

428+
// Reset the closeWasClean flag now that the socket has been connected
429+
this.closeWasClean = false
430+
408431
// Send any messages that were waiting for a connection
409432
this.flushSendBuffer()
410433

@@ -426,14 +449,13 @@ class Socket(
426449
this.heartbeatTask?.cancel()
427450
this.heartbeatTask = null
428451

452+
// Only attempt to reconnect if the socket did not close normally
453+
if (!this.closeWasClean) {
454+
this.reconnectTimer.scheduleTimeout()
455+
}
456+
429457
// Inform callbacks the socket closed
430458
this.stateChangeCallbacks.close.forEach { it.invoke() }
431-
432-
// If there was a non-normal event when the connection closed, attempt
433-
// to schedule a reconnect attempt
434-
if (code != WS_CLOSE_NORMAL) {
435-
reconnectTimer.scheduleTimeout()
436-
}
437459
}
438460

439461
internal fun onConnectionMessage(rawMessage: String) {

0 commit comments

Comments
 (0)