Skip to content

Commit caf94bc

Browse files
author
Daniel Rees
authored
Refactor Channel Rejoins (#63)
* Refactored channel rejoins * Finished up channel test changes
1 parent 93064af commit caf94bc

File tree

8 files changed

+380
-236
lines changed

8 files changed

+380
-236
lines changed

src/main/kotlin/org/phoenixframework/Channel.kt

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,17 @@ class Channel(
127127
this.pushBuffer = mutableListOf()
128128
this.rejoinTimer = TimeoutTimer(
129129
dispatchQueue = socket.dispatchQueue,
130-
callback = { rejoinUntilConnected() },
131-
timerCalculation = socket.reconnectAfterMs)
130+
timerCalculation = socket.rejoinAfterMs,
131+
callback = { if (socket.isConnected) rejoin() }
132+
)
133+
134+
// Respond to socket events
135+
this.socket.onError { _, _-> this.rejoinTimer.reset() }
136+
this.socket.onOpen {
137+
this.rejoinTimer.reset()
138+
if (this.isErrored) { this.rejoin() }
139+
}
140+
132141

133142
// Setup Push to be sent when joining
134143
this.joinPush = Push(
@@ -150,12 +159,15 @@ class Channel(
150159
this.pushBuffer.clear()
151160
}
152161

162+
// Perform if Channel errors while attempting to join
163+
this.joinPush.receive("error") {
164+
this.state = State.ERRORED
165+
if (this.socket.isConnected) { this.rejoinTimer.scheduleTimeout() }
166+
}
167+
153168
// Perform if Channel timed out while attempting to join
154169
this.joinPush.receive("timeout") {
155-
156-
// Only handle a timeout if the Channel is in the 'joining' state
157-
if (!this.isJoining) return@receive
158-
170+
// Log the timeout
159171
this.socket.logItems("Channel: timeouts $topic, $joinRef after $timeout ms")
160172

161173
// Send a Push to the server to leave the Channel
@@ -165,10 +177,11 @@ class Channel(
165177
timeout = this.timeout)
166178
leavePush.send()
167179

168-
// Mark the Channel as in an error and attempt to rejoin
180+
// Mark the Channel as in an error and attempt to rejoin if socket is connected
169181
this.state = State.ERRORED
170182
this.joinPush.reset()
171-
this.rejoinTimer.scheduleTimeout()
183+
184+
if (this.socket.isConnected) { this.rejoinTimer.scheduleTimeout() }
172185
}
173186

174187
// Clean up when the channel closes
@@ -177,7 +190,7 @@ class Channel(
177190
this.rejoinTimer.reset()
178191

179192
// Log that the channel was left
180-
this.socket.logItems("Channel: close $topic")
193+
this.socket.logItems("Channel: close $topic $joinRef")
181194

182195
// Mark the channel as closed and remove it from the socket
183196
this.state = State.CLOSED
@@ -186,16 +199,15 @@ class Channel(
186199

187200
// Handles an error, attempts to rejoin
188201
this.onError {
189-
// Do not emit error if the channel is in the process of leaving
190-
// or if it has already closed
191-
if (this.isLeaving || this.isClosed) return@onError
192-
193202
// Log that the channel received an error
194-
this.socket.logItems("Channel: error $topic")
203+
this.socket.logItems("Channel: error $topic ${it.payload}")
204+
205+
// If error was received while joining, then reset the Push
206+
if (isJoining) { this.joinPush.reset() }
195207

196-
// Mark the channel as errored and attempt to rejoin
208+
// Mark the channel as errored and attempt to rejoin if socket is currently connected
197209
this.state = State.ERRORED
198-
this.rejoinTimer.scheduleTimeout()
210+
if (socket.isConnected) { this.rejoinTimer.scheduleTimeout() }
199211
}
200212

201213
// Perform when the join reply is received
@@ -245,8 +257,9 @@ class Channel(
245257
}
246258

247259
// Join the channel
260+
this.timeout = timeout
248261
this.joinedOnce = true
249-
this.rejoin(timeout)
262+
this.rejoin()
250263
return joinPush
251264
}
252265

@@ -304,6 +317,9 @@ class Channel(
304317
// will return false, so instead store it _before_ starting the leave
305318
val canPush = this.canPush
306319

320+
// If attempting a rejoin during a leave, then reset, cancelling the rejoin
321+
this.rejoinTimer.reset()
322+
307323
// Now set the state to leaving
308324
this.state = State.LEAVING
309325

@@ -384,12 +400,6 @@ class Channel(
384400
//------------------------------------------------------------------------------
385401
// Private
386402
//------------------------------------------------------------------------------
387-
/** Will continually attempt to rejoin the Channel on a timer. */
388-
private fun rejoinUntilConnected() {
389-
this.rejoinTimer.scheduleTimeout()
390-
if (this.socket.isConnected) this.rejoin()
391-
}
392-
393403
/** Sends the Channel's joinPush to the Server */
394404
private fun sendJoin(timeout: Long) {
395405
this.state = State.JOINING
@@ -398,6 +408,10 @@ class Channel(
398408

399409
/** Rejoins the Channel e.g. after a disconnect */
400410
private fun rejoin(timeout: Long = this.timeout) {
411+
// Do not attempt to rejoin if the channel is in the process of leaving
412+
if (isLeaving) return
413+
414+
// Send the joinPush
401415
this.sendJoin(timeout)
402416
}
403417
}

src/main/kotlin/org/phoenixframework/Defaults.kt

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,6 @@ object Defaults {
3434
/** Default heartbeat interval of 30s */
3535
const val HEARTBEAT: Long = 30_000
3636

37-
/** Default reconnect algorithm. Reconnects after 1s, 2s, 5s and then 10s thereafter */
38-
val steppedBackOff: (Int) -> Long = { tries ->
39-
if (tries > 3) 10_000 else listOf(1_000L, 2_000L, 5_000L)[tries - 1]
40-
}
41-
4237
/** Default reconnect algorithm for the socket */
4338
val reconnectSteppedBackOff: (Int) -> Long = { tries ->
4439
if (tries > 9) 5_000 else listOf(10L, 50L, 100L, 150L, 200L, 250L, 500L, 1_000L, 2_000L)[tries - 1]

0 commit comments

Comments
 (0)