1+ /*
2+ * Copyright (c) 2019 Daniel Rees <[email protected] > 3+ *
4+ * Permission is hereby granted, free of charge, to any person obtaining a copy
5+ * of this software and associated documentation files (the "Software"), to deal
6+ * in the Software without restriction, including without limitation the rights
7+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+ * copies of the Software, and to permit persons to whom the Software is
9+ * furnished to do so, subject to the following conditions:
10+ *
11+ * The above copyright notice and this permission notice shall be included in
12+ * all copies or substantial portions of the Software.
13+ *
14+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20+ * THE SOFTWARE.
21+ */
22+
123package org.phoenixframework
224
325import java.util.concurrent.ConcurrentLinkedQueue
@@ -16,7 +38,7 @@ data class Binding(
1638 */
1739class Channel (
1840 val topic : String ,
19- var params : Payload ,
41+ params : Payload ,
2042 internal val socket : Socket
2143) {
2244
@@ -60,7 +82,7 @@ class Channel(
6082 // Channel Attributes
6183 // ------------------------------------------------------------------------------
6284 /* * Current state of the Channel */
63- internal var state: Channel . State
85+ internal var state: State
6486
6587 /* * Collection of event bindings. */
6688 internal val bindings: ConcurrentLinkedQueue <Binding >
@@ -71,23 +93,30 @@ class Channel(
7193 /* * Timeout when attempting to join a Channel */
7294 internal var timeout: Long
7395
96+ /* * Params passed in through constructions and provided to the JoinPush */
97+ var params: Payload = params
98+ set(value) {
99+ joinPush.payload = value
100+ field = value
101+ }
102+
74103 /* * Set to true once the channel has attempted to join */
75- var joinedOnce: Boolean
104+ internal var joinedOnce: Boolean
76105
77106 /* * Push to send then attempting to join */
78- var joinPush: Push
107+ internal var joinPush: Push
79108
80109 /* * Buffer of Pushes that will be sent once the Channel's socket connects */
81- var pushBuffer: MutableList <Push >
110+ internal var pushBuffer: MutableList <Push >
82111
83112 /* * Timer to attempt rejoins */
84- var rejoinTimer: TimeoutTimer
113+ internal var rejoinTimer: TimeoutTimer
85114
86115 /* *
87116 * Optional onMessage hook that can be provided. Receives all event messages for specialized
88117 * handling before dispatching to the Channel event callbacks.
89118 */
90- var onMessage: (Message ) -> Message = { it }
119+ internal var onMessage: (Message ) -> Message = { it }
91120
92121 init {
93122 this .state = State .CLOSED
@@ -97,14 +126,14 @@ class Channel(
97126 this .joinedOnce = false
98127 this .pushBuffer = mutableListOf ()
99128 this .rejoinTimer = TimeoutTimer (
100- scheduledExecutorService = socket.timerPool ,
129+ dispatchQueue = socket.dispatchQueue ,
101130 callback = { rejoinUntilConnected() },
102- timerCalculation = Defaults .steppedBackOff )
131+ timerCalculation = socket.reconnectAfterMs )
103132
104133 // Setup Push to be sent when joining
105134 this .joinPush = Push (
106135 channel = this ,
107- event = Channel . Event .JOIN .value,
136+ event = Event .JOIN .value,
108137 payload = params,
109138 timeout = timeout)
110139
@@ -122,7 +151,7 @@ class Channel(
122151 }
123152
124153 // Perform if Channel timed out while attempting to join
125- this .joinPush.receive(" timeout" ) { message ->
154+ this .joinPush.receive(" timeout" ) {
126155
127156 // Only handle a timeout if the Channel is in the 'joining' state
128157 if (! this .isJoining) return @receive
@@ -132,7 +161,8 @@ class Channel(
132161 // Send a Push to the server to leave the Channel
133162 val leavePush = Push (
134163 channel = this ,
135- event = Channel .Event .LEAVE .value)
164+ event = Event .LEAVE .value,
165+ timeout = this .timeout)
136166 leavePush.send()
137167
138168 // Mark the Channel as in an error and attempt to rejoin
@@ -207,7 +237,7 @@ class Channel(
207237 // ------------------------------------------------------------------------------
208238 // Public
209239 // ------------------------------------------------------------------------------
210- fun join (timeout : Long = Defaults . TIMEOUT ): Push {
240+ fun join (timeout : Long = this.timeout ): Push {
211241 // Ensure that `.join()` is called only once per Channel instance
212242 if (joinedOnce) {
213243 throw IllegalStateException (
@@ -232,7 +262,7 @@ class Channel(
232262 this .onMessage = callback
233263 }
234264
235- fun on (event : Channel . Event , callback : (Message ) -> Unit ): Int {
265+ fun on (event : Event , callback : (Message ) -> Unit ): Int {
236266 return this .on(event.value, callback)
237267 }
238268
@@ -250,7 +280,7 @@ class Channel(
250280 }
251281 }
252282
253- fun push (event : String , payload : Payload , timeout : Long = Defaults . TIMEOUT ): Push {
283+ fun push (event : String , payload : Payload , timeout : Long = this.timeout ): Push {
254284 if (! joinedOnce) {
255285 // If the Channel has not been joined, throw an exception
256286 throw RuntimeException (
@@ -269,13 +299,18 @@ class Channel(
269299 return pushEvent
270300 }
271301
272- fun leave (timeout : Long = Defaults .TIMEOUT ): Push {
302+ fun leave (timeout : Long = this.timeout): Push {
303+ // Can push is dependent upon state == JOINED. Once we set it to LEAVING, then canPush
304+ // will return false, so instead store it _before_ starting the leave
305+ val canPush = this .canPush
306+
307+ // Now set the state to leaving
273308 this .state = State .LEAVING
274309
275310 // Perform the same behavior if the channel leaves successfully or not
276311 val onClose: ((Message ) -> Unit ) = {
277312 this .socket.logItems(" Channel: leave $topic " )
278- this .trigger(it )
313+ this .trigger(Event . CLOSE , mapOf ( " reason " to " leave " ) )
279314 }
280315
281316 // Push event to send to the server
@@ -313,6 +348,15 @@ class Channel(
313348 return true
314349 }
315350
351+ internal fun trigger (
352+ event : Event ,
353+ payload : Payload = hashMapOf(),
354+ ref : String = "",
355+ joinRef : String? = null
356+ ) {
357+ this .trigger(event.value, payload, ref, joinRef)
358+ }
359+
316360 internal fun trigger (
317361 event : String ,
318362 payload : Payload = hashMapOf(),
@@ -353,7 +397,7 @@ class Channel(
353397 }
354398
355399 /* * Rejoins the Channel e.g. after a disconnect */
356- private fun rejoin (timeout : Long = Defaults . TIMEOUT ) {
400+ private fun rejoin (timeout : Long = this.timeout ) {
357401 this .sendJoin(timeout)
358402 }
359403}
0 commit comments