1+ package org.phoenixframework
2+
3+ import java.util.concurrent.ConcurrentLinkedQueue
4+
5+ /* *
6+ * Represents a binding to a Channel event
7+ */
8+ data class Binding (
9+ val event : String ,
10+ val ref : Int ,
11+ val callback : (Message ) -> Unit
12+ )
13+
14+ /* *
15+ * Represents a Channel bound to a given topic
16+ */
17+ class Channel (
18+ val topic : String ,
19+ var params : Payload ,
20+ internal val socket : Socket
21+ ) {
22+
23+ // ------------------------------------------------------------------------------
24+ // Channel Nested Enums
25+ // ------------------------------------------------------------------------------
26+ /* * States of a Channel */
27+ enum class State () {
28+ CLOSED ,
29+ ERRORED ,
30+ JOINED ,
31+ JOINING ,
32+ LEAVING
33+ }
34+
35+ /* * Channel specific events */
36+ enum class Event (val value : String ) {
37+ HEARTBEAT (" heartbeat" ),
38+ JOIN (" phx_join" ),
39+ LEAVE (" phx_leave" ),
40+ REPLY (" phx_reply" ),
41+ ERROR (" phx_error" ),
42+ CLOSE (" phx_close" );
43+
44+ companion object {
45+ /* * True if the event is one of Phoenix's channel lifecycle events */
46+ fun isLifecycleEvent (event : String ): Boolean {
47+ return when (event) {
48+ JOIN .value,
49+ LEAVE .value,
50+ REPLY .value,
51+ ERROR .value,
52+ CLOSE .value -> true
53+ else -> false
54+ }
55+ }
56+ }
57+ }
58+
59+ // ------------------------------------------------------------------------------
60+ // Channel Attributes
61+ // ------------------------------------------------------------------------------
62+ /* * Current state of the Channel */
63+ internal var state: Channel .State
64+
65+ /* * Collection of event bindings. */
66+ internal val bindings: ConcurrentLinkedQueue <Binding >
67+
68+ /* * Tracks event binding ref counters */
69+ internal var bindingRef: Int
70+
71+ /* * Timeout when attempting to join a Channel */
72+ internal var timeout: Long
73+
74+ /* * Set to true once the channel has attempted to join */
75+ var joinedOnce: Boolean
76+
77+ /* * Push to send then attempting to join */
78+ var joinPush: Push
79+
80+ /* * Buffer of Pushes that will be sent once the Channel's socket connects */
81+ var pushBuffer: MutableList <Push >
82+
83+ /* * Timer to attempt rejoins */
84+ var rejoinTimer: TimeoutTimer
85+
86+ /* *
87+ * Optional onMessage hook that can be provided. Receives all event messages for specialized
88+ * handling before dispatching to the Channel event callbacks.
89+ */
90+ var onMessage: (Message ) -> Message = { it }
91+
92+ init {
93+ this .state = State .CLOSED
94+ this .bindings = ConcurrentLinkedQueue ()
95+ this .bindingRef = 0
96+ this .timeout = socket.timeout
97+ this .joinedOnce = false
98+ this .pushBuffer = mutableListOf ()
99+ this .rejoinTimer = TimeoutTimer (
100+ scheduledExecutorService = socket.timerPool,
101+ callback = { rejoinUntilConnected() },
102+ timerCalculation = Defaults .steppedBackOff)
103+
104+ // Setup Push to be sent when joining
105+ this .joinPush = Push (
106+ channel = this ,
107+ event = Channel .Event .JOIN .value,
108+ payload = params,
109+ timeout = timeout)
110+
111+ // Perform once the Channel has joined
112+ this .joinPush.receive(" ok" ) {
113+ // Mark the Channel as joined
114+ this .state = State .JOINED
115+
116+ // Reset the timer, preventing it from attempting to join again
117+ this .rejoinTimer.reset()
118+
119+ // Send any buffered messages and clear the buffer
120+ this .pushBuffer.forEach { it.send() }
121+ this .pushBuffer.clear()
122+ }
123+
124+ // Perform if Channel timed out while attempting to join
125+ this .joinPush.receive(" timeout" ) { message ->
126+
127+ // Only handle a timeout if the Channel is in the 'joining' state
128+ if (! this .isJoining) return @receive
129+
130+ this .socket.logItems(" Channel: timeouts $topic , $joinRef after $timeout ms" )
131+
132+ // Send a Push to the server to leave the Channel
133+ val leavePush = Push (
134+ channel = this ,
135+ event = Channel .Event .LEAVE .value)
136+ leavePush.send()
137+
138+ // Mark the Channel as in an error and attempt to rejoin
139+ this .state = State .ERRORED
140+ this .joinPush.reset()
141+ this .rejoinTimer.scheduleTimeout()
142+ }
143+
144+ // Clean up when the channel closes
145+ this .onClose {
146+ // Reset any timer that may be on-going
147+ this .rejoinTimer.reset()
148+
149+ // Log that the channel was left
150+ this .socket.logItems(" Channel: close $topic " )
151+
152+ // Mark the channel as closed and remove it from the socket
153+ this .state = State .CLOSED
154+ this .socket.remove(this )
155+ }
156+
157+ // Handles an error, attempts to rejoin
158+ this .onError {
159+ // Do not emit error if the channel is in the process of leaving
160+ // or if it has already closed
161+ if (this .isLeaving || this .isClosed) return @onError
162+
163+ // Log that the channel received an error
164+ this .socket.logItems(" Channel: error $topic " )
165+
166+ // Mark the channel as errored and attempt to rejoin
167+ this .state = State .ERRORED
168+ this .rejoinTimer.scheduleTimeout()
169+ }
170+
171+ // Perform when the join reply is received
172+ this .on(Event .REPLY ) { message ->
173+ this .trigger(replyEventName(message.ref), message.payload, message.ref, message.joinRef)
174+ }
175+ }
176+
177+ // ------------------------------------------------------------------------------
178+ // Public Properties
179+ // ------------------------------------------------------------------------------
180+ /* * The ref sent during the join message. */
181+ val joinRef: String? get() = joinPush.ref
182+
183+ /* * @return True if the Channel can push messages */
184+ val canPush: Boolean
185+ get() = this .socket.isConnected && this .isJoined
186+
187+ /* * @return: True if the Channel has been closed */
188+ val isClosed: Boolean
189+ get() = state == State .CLOSED
190+
191+ /* * @return: True if the Channel experienced an error */
192+ val isErrored: Boolean
193+ get() = state == State .ERRORED
194+
195+ /* * @return: True if the channel has joined */
196+ val isJoined: Boolean
197+ get() = state == State .JOINED
198+
199+ /* * @return: True if the channel has requested to join */
200+ val isJoining: Boolean
201+ get() = state == State .JOINING
202+
203+ /* * @return: True if the channel has requested to leave */
204+ val isLeaving: Boolean
205+ get() = state == State .LEAVING
206+
207+ // ------------------------------------------------------------------------------
208+ // Public
209+ // ------------------------------------------------------------------------------
210+ fun join (timeout : Long = Defaults .TIMEOUT ): Push {
211+ // Ensure that `.join()` is called only once per Channel instance
212+ if (joinedOnce) {
213+ throw IllegalStateException (
214+ " Tried to join channel multiple times. `join()` can only be called once per channel" )
215+ }
216+
217+ // Join the channel
218+ this .joinedOnce = true
219+ this .rejoin(timeout)
220+ return joinPush
221+ }
222+
223+ fun onClose (callback : (Message ) -> Unit ): Int {
224+ return this .on(Event .CLOSE , callback)
225+ }
226+
227+ fun onError (callback : (Message ) -> Unit ): Int {
228+ return this .on(Event .ERROR , callback)
229+ }
230+
231+ fun onMessage (callback : (Message ) -> Message ) {
232+ this .onMessage = callback
233+ }
234+
235+ fun on (event : Channel .Event , callback : (Message ) -> Unit ): Int {
236+ return this .on(event.value, callback)
237+ }
238+
239+ fun on (event : String , callback : (Message ) -> Unit ): Int {
240+ val ref = bindingRef
241+ this .bindingRef = ref + 1
242+
243+ this .bindings.add(Binding (event, ref, callback))
244+ return ref
245+ }
246+
247+ fun off (event : String , ref : Int? = null) {
248+ this .bindings.removeAll { bind ->
249+ bind.event == event && (ref == null || ref == bind.ref)
250+ }
251+ }
252+
253+ fun push (event : String , payload : Payload , timeout : Long = Defaults .TIMEOUT ): Push {
254+ if (! joinedOnce) {
255+ // If the Channel has not been joined, throw an exception
256+ throw RuntimeException (
257+ " Tried to push $event to $topic before joining. Use channel.join() before pushing events" )
258+ }
259+
260+ val pushEvent = Push (this , event, payload, timeout)
261+
262+ if (canPush) {
263+ pushEvent.send()
264+ } else {
265+ pushEvent.startTimeout()
266+ pushBuffer.add(pushEvent)
267+ }
268+
269+ return pushEvent
270+ }
271+
272+ fun leave (timeout : Long = Defaults .TIMEOUT ): Push {
273+ this .state = State .LEAVING
274+
275+ // Perform the same behavior if the channel leaves successfully or not
276+ val onClose: ((Message ) -> Unit ) = {
277+ this .socket.logItems(" Channel: leave $topic " )
278+ this .trigger(it)
279+ }
280+
281+ // Push event to send to the server
282+ val leavePush = Push (
283+ channel = this ,
284+ event = Event .LEAVE .value,
285+ timeout = timeout)
286+
287+ leavePush
288+ .receive(" ok" , onClose)
289+ .receive(" timeout" , onClose)
290+ leavePush.send()
291+
292+ // If the Channel cannot send push events, trigger a success locally
293+ if (! canPush) leavePush.trigger(" ok" , hashMapOf())
294+
295+ return leavePush
296+ }
297+
298+ // ------------------------------------------------------------------------------
299+ // Internal
300+ // ------------------------------------------------------------------------------
301+ /* * Checks if a Message's event belongs to this Channel instance */
302+ internal fun isMember (message : Message ): Boolean {
303+ if (message.topic != this .topic) return false
304+
305+ val isLifecycleEvent = Event .isLifecycleEvent(message.event)
306+
307+ // If the message is a lifecycle event and it is not a join for this channel, drop the outdated message
308+ if (message.joinRef != null && isLifecycleEvent && message.joinRef != this .joinRef) {
309+ this .socket.logItems(" Channel: Dropping outdated message. ${message.topic} " )
310+ return false
311+ }
312+
313+ return true
314+ }
315+
316+ internal fun trigger (
317+ event : String ,
318+ payload : Payload = hashMapOf(),
319+ ref : String = "",
320+ joinRef : String? = null
321+ ) {
322+ this .trigger(Message (ref, topic, event, payload, joinRef))
323+ }
324+
325+ internal fun trigger (message : Message ) {
326+ // Inform the onMessage hook of the message
327+ val handledMessage = this .onMessage(message)
328+
329+ // Inform all matching event bindings of the message
330+ this .bindings
331+ .filter { it.event == message.event }
332+ .forEach { it.callback(handledMessage) }
333+ }
334+
335+ /* * Create an event with a given ref */
336+ internal fun replyEventName (ref : String ): String {
337+ return " chan_reply_$ref "
338+ }
339+
340+ // ------------------------------------------------------------------------------
341+ // Private
342+ // ------------------------------------------------------------------------------
343+ /* * Will continually attempt to rejoin the Channel on a timer. */
344+ private fun rejoinUntilConnected () {
345+ this .rejoinTimer.scheduleTimeout()
346+ if (this .socket.isConnected) this .rejoin()
347+ }
348+
349+ /* * Sends the Channel's joinPush to the Server */
350+ private fun sendJoin (timeout : Long ) {
351+ this .state = State .JOINING
352+ this .joinPush.resend(timeout)
353+ }
354+
355+ /* * Rejoins the Channel e.g. after a disconnect */
356+ private fun rejoin (timeout : Long = Defaults .TIMEOUT ) {
357+ this .sendJoin(timeout)
358+ }
359+ }
0 commit comments