Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/main/kotlin/org/phoenixframework/Defaults.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,20 @@ object Defaults {

/** Default reconnect algorithm. Reconnects after 1s, 2s, 5s and then 10s thereafter */
val steppedBackOff: (Int) -> Long = { tries ->
if (tries > 3) 10000 else listOf(1000L, 2000L, 5000L)[tries - 1]
if (tries > 3) 10_000 else listOf(1_000L, 2_000L, 5_000L)[tries - 1]
}

/** Default reconnect algorithm for the socket */
val reconnectSteppedBackOff: (Int) -> Long = { tries ->
if (tries > 9) 5_000 else listOf(10L, 50L, 100L, 150L, 200L, 250L, 500L, 1_000L, 2_000L)[tries - 1]
}

/** Default rejoin algorithm for individual channels */
val rejoinSteppedBackOff: (Int) -> Long = { tries ->
if (tries > 3) 10_000 else listOf(1_000L, 2_000L, 5_000L)[tries - 1]
}


/** The default Gson configuration to use when parsing messages */
val gson: Gson
get() = GsonBuilder()
Expand Down
68 changes: 49 additions & 19 deletions src/main/kotlin/org/phoenixframework/Socket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ internal data class StateChangeCallbacks(
}
}

/** The code used when the socket was closed without error */
/** RFC 6455: indicates a normal closure */
const val WS_CLOSE_NORMAL = 1000

/** The socket was closed due to a SocketException. Likely the client lost connectivity */
const val WS_CLOSE_SOCKET_EXCEPTION = 4000
/** RFC 6455: indicates that the connection was closed abnormally */
const val WS_CLOSE_ABNORMAL = 1006


/**
Expand Down Expand Up @@ -88,11 +88,14 @@ class Socket(
/** Timeout to use when opening a connection */
var timeout: Long = Defaults.TIMEOUT

/** Interval between sending a heartbeat */
var heartbeatInterval: Long = Defaults.HEARTBEAT
/** Interval between sending a heartbeat, in ms */
var heartbeatIntervalMs: Long = Defaults.HEARTBEAT

/** Internval between socket reconnect attempts */
var reconnectAfterMs: ((Int) -> Long) = Defaults.steppedBackOff
/** Interval between socket reconnect attempts, in ms */
var reconnectAfterMs: ((Int) -> Long) = Defaults.reconnectSteppedBackOff

/** Interval between channel rejoin attempts, in ms */
var rejoinAfterMs: ((Int) -> Long) = Defaults.rejoinSteppedBackOff

/** The optional function to receive logs */
var logger: ((String) -> Unit)? = null
Expand Down Expand Up @@ -139,6 +142,9 @@ class Socket(
/** Timer to use when attempting to reconnect */
internal var reconnectTimer: TimeoutTimer

/** True if the Socket closed cleaned. False if not (connection timeout, heartbeat, etc) */
internal var closeWasClean = false

//------------------------------------------------------------------------------
// Connection Attributes
//------------------------------------------------------------------------------
Expand Down Expand Up @@ -218,6 +224,9 @@ class Socket(
// Do not attempt to connect if already connected
if (isConnected) return

// Reset the clean close flag when attempting to connect
this.closeWasClean = false

this.connection = this.transport(endpointUrl)
this.connection?.onOpen = { onConnectionOpened() }
this.connection?.onClose = { code -> onConnectionClosed(code) }
Expand All @@ -231,6 +240,10 @@ class Socket(
reason: String? = null,
callback: (() -> Unit)? = null
) {
// The socket was closed cleanly by the User
this.closeWasClean = true

// Reset any reconnects and teardown the socket connection
this.reconnectTimer.reset()
this.teardown(code, reason, callback)

Expand Down Expand Up @@ -339,7 +352,12 @@ class Socket(

/** Triggers an error event to all connected Channels */
private fun triggerChannelError() {
this.channels.forEach { it.trigger(Channel.Event.ERROR.value) }
this.channels.forEach { channel ->
// Only trigger a channel error if it is in an "opened" state
if (!(channel.isErrored || channel.isLeaving || channel.isClosed)) {
channel.trigger(Channel.Event.ERROR.value)
}
}
}

/** Send all messages that were buffered before the socket opened */
Expand All @@ -361,8 +379,8 @@ class Socket(

// Do not start up the heartbeat timer if skipHeartbeat is true
if (skipHeartbeat) return
val delay = heartbeatInterval
val period = heartbeatInterval
val delay = heartbeatIntervalMs
val period = heartbeatIntervalMs

heartbeatTask =
dispatchQueue.queueAtFixedRate(delay, period, TimeUnit.MILLISECONDS) { sendHeartbeat() }
Expand All @@ -379,9 +397,8 @@ class Socket(
pendingHeartbeatRef = null
logItems("Transport: Heartbeat timeout. Attempt to re-establish connection")

// Disconnect the socket manually. Do not use `teardown` or
// `disconnect` as they will nil out the websocket delegate
this.connection?.disconnect(WS_CLOSE_NORMAL, "Heartbeat timed out")
// Close the socket, flagging the closure as abnormal
this.abnormalClose("heartbeat timeout")
return
}

Expand All @@ -394,12 +411,26 @@ class Socket(
ref = pendingHeartbeatRef)
}

private fun abnormalClose(reason: String) {
this.closeWasClean = false

/*
We use NORMAL here since the client is the one determining to close the connection. However,
we keep a flag `closeWasClean` set to false so that the client knows that it should attempt
to reconnect.
*/
this.connection?.disconnect(WS_CLOSE_NORMAL, reason)
}

//------------------------------------------------------------------------------
// Connection Transport Hooks
//------------------------------------------------------------------------------
internal fun onConnectionOpened() {
this.logItems("Transport: Connected to $endpoint")

// Reset the closeWasClean flag now that the socket has been connected
this.closeWasClean = false

// Send any messages that were waiting for a connection
this.flushSendBuffer()

Expand All @@ -421,14 +452,13 @@ class Socket(
this.heartbeatTask?.cancel()
this.heartbeatTask = null

// Only attempt to reconnect if the socket did not close normally
if (!this.closeWasClean) {
this.reconnectTimer.scheduleTimeout()
}

// Inform callbacks the socket closed
this.stateChangeCallbacks.close.forEach { it.invoke() }

// If there was a non-normal event when the connection closed, attempt
// to schedule a reconnect attempt
if (code != WS_CLOSE_NORMAL) {
reconnectTimer.scheduleTimeout()
}
}

internal fun onConnectionMessage(rawMessage: String) {
Expand Down
26 changes: 20 additions & 6 deletions src/main/kotlin/org/phoenixframework/Transport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import okhttp3.Request
import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import java.io.IOException
import java.net.URL

/**
Expand Down Expand Up @@ -130,13 +129,28 @@ class WebSocketTransport(
}

override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
// Set the state of the Transport as CLOSED since no more data will be received
this.readyState = Transport.ReadyState.CLOSED

// Invoke the onError callback, to inform of the error
this.onError?.invoke(t, response)

// Check if the socket was closed for some recoverable reason
when (t) {
is IOException -> this.onClosed(webSocket, WS_CLOSE_SOCKET_EXCEPTION, "IOException")
}

/*
According to the OkHttp documentation, `onFailure` will be

"Invoked when a web socket has been closed due to an error reading from or writing to the
network. Both outgoing and incoming messages may have been lost. No further calls to this
listener will be made."

This means `onClose` will never be called which will never kick off the socket reconnect
attempts.

The JS WebSocket class calls `onError` and then `onClose` which will then trigger
the reconnect logic inside of the PhoenixClient. In order to mimic this behavior and abstract
this detail of OkHttp away from the PhoenixClient, the `WebSocketTransport` class should
convert `onFailure` calls to an `onError` and `onClose` sequence.
*/
this.onClose?.invoke(WS_CLOSE_ABNORMAL)
}

override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
Expand Down
Loading