@@ -35,33 +35,42 @@ typealias Payload = Map<String, Any?>
3535/* * Data class that holds callbacks assigned to the socket */
3636internal class StateChangeCallbacks {
3737
38- var open: List < () -> Unit > = ArrayList ()
38+ var open: List <Pair < String , () - > Unit > > = ArrayList ()
3939 private set
40- var close: List < () -> Unit > = ArrayList ()
40+ var close: List <Pair < String , () - > Unit > > = ArrayList ()
4141 private set
42- var error: List < (Throwable , Response ? ) -> Unit > = ArrayList ()
42+ var error: List <Pair < String , (Throwable , Response ?) - > Unit > > = ArrayList ()
4343 private set
44- var message: List < (Message ) -> Unit > = ArrayList ()
44+ var message: List <Pair < String , (Message ) - > Unit > > = ArrayList ()
4545 private set
4646
4747 /* * Safely adds an onOpen callback */
48- fun onOpen (callback : () -> Unit ) {
49- this .open = this .open + callback
48+ fun onOpen (ref : String , callback : () -> Unit ) {
49+ this .open = this .open + Pair (ref, callback)
5050 }
5151
5252 /* * Safely adds an onClose callback */
53- fun onClose (callback : () -> Unit ) {
54- this .close = this .close + callback
53+ fun onClose (ref : String , callback : () -> Unit ) {
54+ this .close = this .close + Pair (ref, callback)
5555 }
5656
5757 /* * Safely adds an onError callback */
58- fun onError (callback : (Throwable , Response ? ) -> Unit ) {
59- this .error = this .error + callback
58+ fun onError (ref : String , callback : (Throwable , Response ? ) -> Unit ) {
59+ this .error = this .error + Pair (ref, callback)
6060 }
6161
6262 /* * Safely adds an onMessage callback */
63- fun onMessage (callback : (Message ) -> Unit ) {
64- this .message = this .message + callback
63+ fun onMessage (ref : String , callback : (Message ) -> Unit ) {
64+ this .message = this .message + Pair (ref, callback)
65+ }
66+
67+ /* * Clears any callbacks with the matching refs */
68+ fun release (refs : List <String >) {
69+ open = open.filter { refs.contains(it.first) }
70+ close = close.filter { refs.contains(it.first) }
71+ error = error.filter { refs.contains(it.first) }
72+ message = message.filter { refs.contains(it.first) }
73+
6574 }
6675
6776 /* * Clears all stored callbacks */
@@ -151,8 +160,11 @@ class Socket(
151160 /* * Collection of unclosed channels created by the Socket */
152161 internal var channels: List <Channel > = ArrayList ()
153162
154- /* * Buffers messages that need to be sent once the socket has connected */
155- internal var sendBuffer: MutableList < () -> Unit > = ArrayList ()
163+ /* *
164+ * Buffers messages that need to be sent once the socket has connected. It is an array of Pairs
165+ * that contain the ref of the message to send and the callback that will send the message.
166+ */
167+ internal var sendBuffer: MutableList <Pair <String ?, () - > Unit >> = ArrayList ()
156168
157169 /* * Ref counter for messages */
158170 internal var ref: Int = 0
@@ -273,20 +285,20 @@ class Socket(
273285
274286 }
275287
276- fun onOpen (callback : (() -> Unit )) {
277- this . stateChangeCallbacks.onOpen(callback)
288+ fun onOpen (callback : (() -> Unit )): String {
289+ return makeRef(). apply { stateChangeCallbacks.onOpen(this , callback) }
278290 }
279291
280- fun onClose (callback : () -> Unit ) {
281- this . stateChangeCallbacks.onClose(callback)
292+ fun onClose (callback : () -> Unit ): String {
293+ return makeRef(). apply { stateChangeCallbacks.onClose(this , callback) }
282294 }
283295
284- fun onError (callback : (Throwable , Response ? ) -> Unit ) {
285- this . stateChangeCallbacks.onError(callback)
296+ fun onError (callback : (Throwable , Response ? ) -> Unit ): String {
297+ return makeRef(). apply { stateChangeCallbacks.onError(this , callback) }
286298 }
287299
288- fun onMessage (callback : (Message ) -> Unit ) {
289- this . stateChangeCallbacks.onMessage(callback)
300+ fun onMessage (callback : (Message ) -> Unit ): String {
301+ return makeRef(). apply { stateChangeCallbacks.onMessage(this , callback) }
290302 }
291303
292304 fun removeAllCallbacks () {
@@ -301,13 +313,24 @@ class Socket(
301313 }
302314
303315 fun remove (channel : Channel ) {
316+ this .off(channel.stateChangeRefs)
317+
304318 // To avoid a ConcurrentModificationException, filter out the channels to be
305319 // removed instead of calling .remove() on the list, thus returning a new list
306320 // that does not contain the channel that was removed.
307321 this .channels = channels
308322 .filter { it.joinRef != channel.joinRef }
309323 }
310324
325+ /* *
326+ * Removes [onOpen], [onClose], [onError], and [onMessage] registrations by their [ref] value.
327+ *
328+ * @param refs List of refs to remove
329+ */
330+ fun off (refs : List <String >) {
331+ this .stateChangeCallbacks.release(refs)
332+ }
333+
311334 // ------------------------------------------------------------------------------
312335 // Internal
313336 // ------------------------------------------------------------------------------
@@ -341,7 +364,7 @@ class Socket(
341364 } else {
342365 // If the socket is not connected, add the push to a buffer which will
343366 // be sent immediately upon connection.
344- sendBuffer.add(callback)
367+ sendBuffer.add(Pair (ref, callback) )
345368 }
346369 }
347370
@@ -374,7 +397,7 @@ class Socket(
374397
375398 // Since the connections onClose was null'd out, inform all state callbacks
376399 // that the Socket has closed
377- this .stateChangeCallbacks.close.forEach { it.invoke() }
400+ this .stateChangeCallbacks.close.forEach { it.second. invoke() }
378401 callback?.invoke()
379402 }
380403
@@ -391,11 +414,27 @@ class Socket(
391414 /* * Send all messages that were buffered before the socket opened */
392415 internal fun flushSendBuffer () {
393416 if (isConnected && sendBuffer.isNotEmpty()) {
394- this .sendBuffer.forEach { it.invoke() }
417+ this .sendBuffer.forEach { it.second. invoke() }
395418 this .sendBuffer.clear()
396419 }
397420 }
398421
422+ /* * Removes an item from the send buffer with the matching ref */
423+ internal fun removeFromSendBuffer (ref : String ) {
424+ this .sendBuffer = this .sendBuffer
425+ .filter { it.first != ref }
426+ .toMutableList()
427+ }
428+
429+ internal fun leaveOpenTopic (topic : String ) {
430+ this .channels
431+ .firstOrNull { it.topic == topic && (it.isJoined || it.isJoining) }
432+ ?.let {
433+ logItems(" Transport: Leaving duplicate topic: [$topic ]" )
434+ it.leave()
435+ }
436+ }
437+
399438 // ------------------------------------------------------------------------------
400439 // Heartbeat
401440 // ------------------------------------------------------------------------------
@@ -469,7 +508,7 @@ class Socket(
469508 this .resetHeartbeat()
470509
471510 // Inform all onOpen callbacks that the Socket has opened
472- this .stateChangeCallbacks.open.forEach { it.invoke() }
511+ this .stateChangeCallbacks.open.forEach { it.second. invoke() }
473512 }
474513
475514 internal fun onConnectionClosed (code : Int ) {
@@ -486,7 +525,7 @@ class Socket(
486525 }
487526
488527 // Inform callbacks the socket closed
489- this .stateChangeCallbacks.close.forEach { it.invoke() }
528+ this .stateChangeCallbacks.close.forEach { it.second. invoke() }
490529 }
491530
492531 internal fun onConnectionMessage (rawMessage : String ) {
@@ -504,7 +543,7 @@ class Socket(
504543 .forEach { it.trigger(message) }
505544
506545 // Inform all onMessage callbacks of the message
507- this .stateChangeCallbacks.message.forEach { it.invoke(message) }
546+ this .stateChangeCallbacks.message.forEach { it.second. invoke(message) }
508547 }
509548
510549 internal fun onConnectionError (t : Throwable , response : Response ? ) {
@@ -514,7 +553,7 @@ class Socket(
514553 this .triggerChannelError()
515554
516555 // Inform any state callbacks of the error
517- this .stateChangeCallbacks.error.forEach { it.invoke(t, response) }
556+ this .stateChangeCallbacks.error.forEach { it.second. invoke(t, response) }
518557 }
519558
520559}
0 commit comments