Skip to content

Commit 604e27b

Browse files
author
Daniel Rees
committed
Refactored channel rejoins
1 parent 93064af commit 604e27b

File tree

4 files changed

+46
-30
lines changed

4 files changed

+46
-30
lines changed

src/main/kotlin/org/phoenixframework/Channel.kt

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,17 @@ class Channel(
127127
this.pushBuffer = mutableListOf()
128128
this.rejoinTimer = TimeoutTimer(
129129
dispatchQueue = socket.dispatchQueue,
130-
callback = { rejoinUntilConnected() },
131-
timerCalculation = socket.reconnectAfterMs)
130+
timerCalculation = socket.rejoinAfterMs,
131+
callback = { if (socket.isConnected) rejoin() }
132+
)
133+
134+
// Respond to socket events
135+
this.socket.onError { _, _-> this.rejoinTimer.reset() }
136+
this.socket.onOpen {
137+
this.rejoinTimer.reset()
138+
if (this.isErrored) { this.rejoin() }
139+
}
140+
132141

133142
// Setup Push to be sent when joining
134143
this.joinPush = Push(
@@ -150,12 +159,15 @@ class Channel(
150159
this.pushBuffer.clear()
151160
}
152161

162+
// Perform if Channel errors while attempting to join
163+
this.joinPush.receive("error") {
164+
this.state = State.ERRORED
165+
if (this.socket.isConnected) { this.rejoinTimer.scheduleTimeout() }
166+
}
167+
153168
// Perform if Channel timed out while attempting to join
154169
this.joinPush.receive("timeout") {
155-
156-
// Only handle a timeout if the Channel is in the 'joining' state
157-
if (!this.isJoining) return@receive
158-
170+
// Log the timeout
159171
this.socket.logItems("Channel: timeouts $topic, $joinRef after $timeout ms")
160172

161173
// Send a Push to the server to leave the Channel
@@ -165,10 +177,11 @@ class Channel(
165177
timeout = this.timeout)
166178
leavePush.send()
167179

168-
// Mark the Channel as in an error and attempt to rejoin
180+
// Mark the Channel as in an error and attempt to rejoin if socket is connected
169181
this.state = State.ERRORED
170182
this.joinPush.reset()
171-
this.rejoinTimer.scheduleTimeout()
183+
184+
if (this.socket.isConnected) { this.rejoinTimer.scheduleTimeout() }
172185
}
173186

174187
// Clean up when the channel closes
@@ -177,7 +190,7 @@ class Channel(
177190
this.rejoinTimer.reset()
178191

179192
// Log that the channel was left
180-
this.socket.logItems("Channel: close $topic")
193+
this.socket.logItems("Channel: close $topic $joinRef")
181194

182195
// Mark the channel as closed and remove it from the socket
183196
this.state = State.CLOSED
@@ -186,16 +199,15 @@ class Channel(
186199

187200
// Handles an error, attempts to rejoin
188201
this.onError {
189-
// Do not emit error if the channel is in the process of leaving
190-
// or if it has already closed
191-
if (this.isLeaving || this.isClosed) return@onError
192-
193202
// Log that the channel received an error
194-
this.socket.logItems("Channel: error $topic")
203+
this.socket.logItems("Channel: error $topic ${it.payload}")
204+
205+
// If error was received while joining, then reset the Push
206+
if (isJoining) { this.joinPush.reset() }
195207

196-
// Mark the channel as errored and attempt to rejoin
208+
// Mark the channel as errored and attempt to rejoin if socket is currently connected
197209
this.state = State.ERRORED
198-
this.rejoinTimer.scheduleTimeout()
210+
if (socket.isConnected) { this.rejoinTimer.scheduleTimeout() }
199211
}
200212

201213
// Perform when the join reply is received
@@ -245,8 +257,9 @@ class Channel(
245257
}
246258

247259
// Join the channel
260+
this.timeout = timeout
248261
this.joinedOnce = true
249-
this.rejoin(timeout)
262+
this.rejoin()
250263
return joinPush
251264
}
252265

@@ -304,6 +317,9 @@ class Channel(
304317
// will return false, so instead store it _before_ starting the leave
305318
val canPush = this.canPush
306319

320+
// If attempting a rejoin during a leave, then reset, cancelling the rejoin
321+
this.rejoinTimer.reset()
322+
307323
// Now set the state to leaving
308324
this.state = State.LEAVING
309325

@@ -384,12 +400,6 @@ class Channel(
384400
//------------------------------------------------------------------------------
385401
// Private
386402
//------------------------------------------------------------------------------
387-
/** Will continually attempt to rejoin the Channel on a timer. */
388-
private fun rejoinUntilConnected() {
389-
this.rejoinTimer.scheduleTimeout()
390-
if (this.socket.isConnected) this.rejoin()
391-
}
392-
393403
/** Sends the Channel's joinPush to the Server */
394404
private fun sendJoin(timeout: Long) {
395405
this.state = State.JOINING
@@ -398,6 +408,10 @@ class Channel(
398408

399409
/** Rejoins the Channel e.g. after a disconnect */
400410
private fun rejoin(timeout: Long = this.timeout) {
411+
// Do not attempt to rejoin if the channel is in the process of leaving
412+
if (isLeaving) return
413+
414+
// Send the joinPush
401415
this.sendJoin(timeout)
402416
}
403417
}

src/main/kotlin/org/phoenixframework/Defaults.kt

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,6 @@ object Defaults {
3434
/** Default heartbeat interval of 30s */
3535
const val HEARTBEAT: Long = 30_000
3636

37-
/** Default reconnect algorithm. Reconnects after 1s, 2s, 5s and then 10s thereafter */
38-
val steppedBackOff: (Int) -> Long = { tries ->
39-
if (tries > 3) 10_000 else listOf(1_000L, 2_000L, 5_000L)[tries - 1]
40-
}
41-
4237
/** Default reconnect algorithm for the socket */
4338
val reconnectSteppedBackOff: (Int) -> Long = { tries ->
4439
if (tries > 9) 5_000 else listOf(10L, 50L, 100L, 150L, 200L, 250L, 500L, 1_000L, 2_000L)[tries - 1]

src/test/kotlin/org/phoenixframework/ChannelTest.kt

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,19 @@ import org.phoenixframework.utilities.getBindings
2222

2323
class ChannelTest {
2424

25+
companion object {
26+
const val DEFAULT_REF = "1"
27+
const val DEFAULT_TIMEOUT = 10_000L
28+
}
29+
30+
2531
@Mock lateinit var socket: Socket
2632
@Mock lateinit var mockCallback: ((Message) -> Unit)
2733

2834
private val kDefaultRef = "1"
2935
private val kDefaultTimeout = 10_000L
3036
private val kDefaultPayload: Payload = mapOf("one" to "two")
3137
private val kEmptyPayload: Payload = mapOf()
32-
private val reconnectAfterMs: (Int) -> Long = Defaults.steppedBackOff
3338

3439
lateinit var fakeClock: ManualDispatchQueue
3540
lateinit var channel: Channel
@@ -50,7 +55,8 @@ class ChannelTest {
5055
whenever(socket.dispatchQueue).thenReturn(fakeClock)
5156
whenever(socket.makeRef()).thenReturn(kDefaultRef)
5257
whenever(socket.timeout).thenReturn(kDefaultTimeout)
53-
whenever(socket.reconnectAfterMs).thenReturn(reconnectAfterMs)
58+
whenever(socket.reconnectAfterMs).thenReturn(Defaults.reconnectSteppedBackOff)
59+
whenever(socket.rejoinAfterMs).thenReturn(Defaults.rejoinSteppedBackOff)
5460

5561
channel = Channel("topic", kDefaultPayload, socket)
5662
}

src/test/kotlin/org/phoenixframework/PresenceTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class PresenceTest {
3838
whenever(socket.timeout).thenReturn(Defaults.TIMEOUT)
3939
whenever(socket.makeRef()).thenReturn("1")
4040
whenever(socket.reconnectAfterMs).thenReturn { 1_000 }
41+
whenever(socket.rejoinAfterMs).thenReturn(Defaults.rejoinSteppedBackOff)
4142
whenever(socket.dispatchQueue).thenReturn(mock())
4243

4344
channel = Channel("topic", mapOf(), socket)

0 commit comments

Comments
 (0)