Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 37 additions & 23 deletions src/main/kotlin/org/phoenixframework/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,17 @@ class Channel(
this.pushBuffer = mutableListOf()
this.rejoinTimer = TimeoutTimer(
dispatchQueue = socket.dispatchQueue,
callback = { rejoinUntilConnected() },
timerCalculation = socket.reconnectAfterMs)
timerCalculation = socket.rejoinAfterMs,
callback = { if (socket.isConnected) rejoin() }
)

// Respond to socket events
this.socket.onError { _, _-> this.rejoinTimer.reset() }
this.socket.onOpen {
this.rejoinTimer.reset()
if (this.isErrored) { this.rejoin() }
}


// Setup Push to be sent when joining
this.joinPush = Push(
Expand All @@ -150,12 +159,15 @@ class Channel(
this.pushBuffer.clear()
}

// Perform if Channel errors while attempting to join
this.joinPush.receive("error") {
this.state = State.ERRORED
if (this.socket.isConnected) { this.rejoinTimer.scheduleTimeout() }
}

// Perform if Channel timed out while attempting to join
this.joinPush.receive("timeout") {

// Only handle a timeout if the Channel is in the 'joining' state
if (!this.isJoining) return@receive

// Log the timeout
this.socket.logItems("Channel: timeouts $topic, $joinRef after $timeout ms")

// Send a Push to the server to leave the Channel
Expand All @@ -165,10 +177,11 @@ class Channel(
timeout = this.timeout)
leavePush.send()

// Mark the Channel as in an error and attempt to rejoin
// Mark the Channel as in an error and attempt to rejoin if socket is connected
this.state = State.ERRORED
this.joinPush.reset()
this.rejoinTimer.scheduleTimeout()

if (this.socket.isConnected) { this.rejoinTimer.scheduleTimeout() }
}

// Clean up when the channel closes
Expand All @@ -177,7 +190,7 @@ class Channel(
this.rejoinTimer.reset()

// Log that the channel was left
this.socket.logItems("Channel: close $topic")
this.socket.logItems("Channel: close $topic $joinRef")

// Mark the channel as closed and remove it from the socket
this.state = State.CLOSED
Expand All @@ -186,16 +199,15 @@ class Channel(

// Handles an error, attempts to rejoin
this.onError {
// Do not emit error if the channel is in the process of leaving
// or if it has already closed
if (this.isLeaving || this.isClosed) return@onError

// Log that the channel received an error
this.socket.logItems("Channel: error $topic")
this.socket.logItems("Channel: error $topic ${it.payload}")

// If error was received while joining, then reset the Push
if (isJoining) { this.joinPush.reset() }

// Mark the channel as errored and attempt to rejoin
// Mark the channel as errored and attempt to rejoin if socket is currently connected
this.state = State.ERRORED
this.rejoinTimer.scheduleTimeout()
if (socket.isConnected) { this.rejoinTimer.scheduleTimeout() }
}

// Perform when the join reply is received
Expand Down Expand Up @@ -245,8 +257,9 @@ class Channel(
}

// Join the channel
this.timeout = timeout
this.joinedOnce = true
this.rejoin(timeout)
this.rejoin()
return joinPush
}

Expand Down Expand Up @@ -304,6 +317,9 @@ class Channel(
// will return false, so instead store it _before_ starting the leave
val canPush = this.canPush

// If attempting a rejoin during a leave, then reset, cancelling the rejoin
this.rejoinTimer.reset()

// Now set the state to leaving
this.state = State.LEAVING

Expand Down Expand Up @@ -384,12 +400,6 @@ class Channel(
//------------------------------------------------------------------------------
// Private
//------------------------------------------------------------------------------
/** Will continually attempt to rejoin the Channel on a timer. */
private fun rejoinUntilConnected() {
this.rejoinTimer.scheduleTimeout()
if (this.socket.isConnected) this.rejoin()
}

/** Sends the Channel's joinPush to the Server */
private fun sendJoin(timeout: Long) {
this.state = State.JOINING
Expand All @@ -398,6 +408,10 @@ class Channel(

/** Rejoins the Channel e.g. after a disconnect */
private fun rejoin(timeout: Long = this.timeout) {
// Do not attempt to rejoin if the channel is in the process of leaving
if (isLeaving) return

// Send the joinPush
this.sendJoin(timeout)
}
}
5 changes: 0 additions & 5 deletions src/main/kotlin/org/phoenixframework/Defaults.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ object Defaults {
/** Default heartbeat interval of 30s */
const val HEARTBEAT: Long = 30_000

/** Default reconnect algorithm. Reconnects after 1s, 2s, 5s and then 10s thereafter */
val steppedBackOff: (Int) -> Long = { tries ->
if (tries > 3) 10_000 else listOf(1_000L, 2_000L, 5_000L)[tries - 1]
}

/** Default reconnect algorithm for the socket */
val reconnectSteppedBackOff: (Int) -> Long = { tries ->
if (tries > 9) 5_000 else listOf(10L, 50L, 100L, 150L, 200L, 250L, 500L, 1_000L, 2_000L)[tries - 1]
Expand Down
Loading