From 58f2ba1a665d99f17e537c5e197b974c2b59f03e Mon Sep 17 00:00:00 2001 From: Daniel Rees Date: Tue, 6 Aug 2019 21:57:44 -0400 Subject: [PATCH 1/4] Made StateChangeCallbacks read-only and handling adding elements to them --- .../kotlin/org/phoenixframework/Socket.kt | 62 +++++++++--- .../kotlin/org/phoenixframework/SocketTest.kt | 94 ++++++++++++++++++- 2 files changed, 140 insertions(+), 16 deletions(-) diff --git a/src/main/kotlin/org/phoenixframework/Socket.kt b/src/main/kotlin/org/phoenixframework/Socket.kt index 92ece3a..e5116a8 100644 --- a/src/main/kotlin/org/phoenixframework/Socket.kt +++ b/src/main/kotlin/org/phoenixframework/Socket.kt @@ -33,18 +33,51 @@ import java.util.concurrent.TimeUnit typealias Payload = Map /** Data class that holds callbacks assigned to the socket */ -internal data class StateChangeCallbacks( - val open: MutableList<() -> Unit> = ArrayList(), - val close: MutableList<() -> Unit> = ArrayList(), - val error: MutableList<(Throwable, Response?) -> Unit> = ArrayList(), - val message: MutableList<(Message) -> Unit> = ArrayList() -) { +internal class StateChangeCallbacks { + + var open: List<() -> Unit> = ArrayList() + private set + var close: List<() -> Unit> = ArrayList() + private set + var error: List<(Throwable, Response?) -> Unit> = ArrayList() + private set + var message: List<(Message) -> Unit> = ArrayList() + private set + + /** Safely adds an onOpen callback */ + fun onOpen(callback: () -> Unit) { + this.open = toMutateAdd(open, callback) + } + + /** Safely adds an onClose callback */ + fun onClose(callback: () -> Unit) { + this.close = toMutateAdd(close, callback) + } + + /** Safely adds an onError callback */ + fun onError(callback: (Throwable, Response?) -> Unit) { + this.error = toMutateAdd(error, callback) + } + + /** Safely adds an onMessage callback */ + fun onMessage(callback: (Message) -> Unit) { + this.message = toMutateAdd(message, callback) + } + /** Clears all stored callbacks */ fun release() { - open.clear() - close.clear() - error.clear() - message.clear() + open = emptyList() + close = emptyList() + error = emptyList() + message = emptyList() + } + + + private fun toMutateAdd(list: List, callback: T): List { + val temp = list.toMutableList() + temp.add(callback) + + return temp } } @@ -54,7 +87,6 @@ const val WS_CLOSE_NORMAL = 1000 /** RFC 6455: indicates that the connection was closed abnormally */ const val WS_CLOSE_ABNORMAL = 1006 - /** * Connects to a Phoenix Server */ @@ -250,19 +282,19 @@ class Socket( } fun onOpen(callback: (() -> Unit)) { - this.stateChangeCallbacks.open.add(callback) + this.stateChangeCallbacks.onOpen(callback) } fun onClose(callback: () -> Unit) { - this.stateChangeCallbacks.close.add(callback) + this.stateChangeCallbacks.onClose(callback) } fun onError(callback: (Throwable, Response?) -> Unit) { - this.stateChangeCallbacks.error.add(callback) + this.stateChangeCallbacks.onError(callback) } fun onMessage(callback: (Message) -> Unit) { - this.stateChangeCallbacks.message.add(callback) + this.stateChangeCallbacks.onMessage(callback) } fun removeAllCallbacks() { diff --git a/src/test/kotlin/org/phoenixframework/SocketTest.kt b/src/test/kotlin/org/phoenixframework/SocketTest.kt index 4ca29c1..26434d0 100644 --- a/src/test/kotlin/org/phoenixframework/SocketTest.kt +++ b/src/test/kotlin/org/phoenixframework/SocketTest.kt @@ -397,7 +397,6 @@ class SocketTest { channel1.join().trigger("ok", emptyMap()) channel2.join().trigger("ok", emptyMap()) - var chan1Called = false channel1.onError { chan1Called = true } @@ -923,4 +922,97 @@ class SocketTest { /* End OnConnectionMessage */ } + + @Nested + @DisplayName("ConcurrentModificationException") + inner class ConcurrentModificationExceptionTests { + + @Test + internal fun `onOpen does not throw`() { + var oneCalled = 0 + var twoCalled = 0 + socket.onOpen { + socket.onOpen { twoCalled += 1 } + oneCalled += 1 + } + + socket.onConnectionOpened() + assertThat(oneCalled).isEqualTo(1) + assertThat(twoCalled).isEqualTo(0) + + socket.onConnectionOpened() + assertThat(oneCalled).isEqualTo(2) + assertThat(twoCalled).isEqualTo(1) + } + + @Test + internal fun `onClose does not throw`() { + var oneCalled = 0 + var twoCalled = 0 + socket.onClose { + socket.onClose { twoCalled += 1 } + oneCalled += 1 + } + + socket.onConnectionClosed(1000) + assertThat(oneCalled).isEqualTo(1) + assertThat(twoCalled).isEqualTo(0) + + socket.onConnectionClosed(1001) + assertThat(oneCalled).isEqualTo(2) + assertThat(twoCalled).isEqualTo(1) + } + + @Test + internal fun `onError does not throw`() { + var oneCalled = 0 + var twoCalled = 0 + socket.onError { _, _-> + socket.onError { _, _ -> twoCalled += 1 } + oneCalled += 1 + } + + socket.onConnectionError(Throwable(), null) + assertThat(oneCalled).isEqualTo(1) + assertThat(twoCalled).isEqualTo(0) + + socket.onConnectionError(Throwable(), null) + assertThat(oneCalled).isEqualTo(2) + assertThat(twoCalled).isEqualTo(1) + } + + @Test + internal fun `onMessage does not throw`() { + var oneCalled = 0 + var twoCalled = 0 + socket.onMessage { + socket.onMessage { twoCalled += 1 } + oneCalled += 1 + } + + socket.onConnectionMessage("{\"status\":\"ok\"}") + assertThat(oneCalled).isEqualTo(1) + assertThat(twoCalled).isEqualTo(0) + + socket.onConnectionMessage("{\"status\":\"ok\"}") + assertThat(oneCalled).isEqualTo(2) + assertThat(twoCalled).isEqualTo(1) + } + + @Test + internal fun `does not throw when adding channel`() { + var oneCalled = 0 + socket.onOpen { + val channel = socket.channel("foo") + oneCalled += 1 + } + + socket.onConnectionOpened() + assertThat(oneCalled).isEqualTo(1) + } + + /* End ConcurrentModificationExceptionTests */ + } + + } \ No newline at end of file From 017b58e2d60555556a858977f8cf27a739921de2 Mon Sep 17 00:00:00 2001 From: Daniel Rees Date: Thu, 8 Aug 2019 09:35:31 -0400 Subject: [PATCH 2/4] Channels are read-only --- .../kotlin/org/phoenixframework/Socket.kt | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/main/kotlin/org/phoenixframework/Socket.kt b/src/main/kotlin/org/phoenixframework/Socket.kt index e5116a8..c13443b 100644 --- a/src/main/kotlin/org/phoenixframework/Socket.kt +++ b/src/main/kotlin/org/phoenixframework/Socket.kt @@ -46,22 +46,22 @@ internal class StateChangeCallbacks { /** Safely adds an onOpen callback */ fun onOpen(callback: () -> Unit) { - this.open = toMutateAdd(open, callback) + this.open = this.open.copyAndAdd(callback) } /** Safely adds an onClose callback */ fun onClose(callback: () -> Unit) { - this.close = toMutateAdd(close, callback) + this.close = this.close.copyAndAdd(callback) } /** Safely adds an onError callback */ fun onError(callback: (Throwable, Response?) -> Unit) { - this.error = toMutateAdd(error, callback) + this.error = this.error.copyAndAdd(callback) } /** Safely adds an onMessage callback */ fun onMessage(callback: (Message) -> Unit) { - this.message = toMutateAdd(message, callback) + this.message = this.message.copyAndAdd(callback) } /** Clears all stored callbacks */ @@ -71,16 +71,17 @@ internal class StateChangeCallbacks { error = emptyList() message = emptyList() } +} +/** Converts the List to a MutableList, adds the value, and then returns as a read-only List */ +fun List.copyAndAdd(value: T): List { + val temp = this.toMutableList() + temp.add(value) - private fun toMutateAdd(list: List, callback: T): List { - val temp = list.toMutableList() - temp.add(callback) - - return temp - } + return temp } + /** RFC 6455: indicates a normal closure */ const val WS_CLOSE_NORMAL = 1000 @@ -157,7 +158,7 @@ class Socket( internal val stateChangeCallbacks: StateChangeCallbacks = StateChangeCallbacks() /** Collection of unclosed channels created by the Socket */ - internal var channels: MutableList = ArrayList() + internal var channels: List = ArrayList() /** Buffers messages that need to be sent once the socket has connected */ internal var sendBuffer: MutableList<() -> Unit> = ArrayList() @@ -303,7 +304,7 @@ class Socket( fun channel(topic: String, params: Payload = mapOf()): Channel { val channel = Channel(topic, params, this) - this.channels.add(channel) + this.channels.copyAndAdd(channel) return channel } @@ -314,7 +315,6 @@ class Socket( // that does not contain the channel that was removed. this.channels = channels .filter { it.joinRef != channel.joinRef } - .toMutableList() } //------------------------------------------------------------------------------ From fd0b22700c9b9dc8ff2433175b2c5a740f36c5ca Mon Sep 17 00:00:00 2001 From: Daniel Rees Date: Thu, 8 Aug 2019 09:46:29 -0400 Subject: [PATCH 3/4] Channel and Push hooks moved to read only list with copy --- src/main/kotlin/org/phoenixframework/Push.kt | 12 +++----- .../kotlin/org/phoenixframework/Socket.kt | 2 +- .../kotlin/org/phoenixframework/SocketTest.kt | 29 ++++++++++--------- .../utilities/TestUtilities.kt | 8 +++++ 4 files changed, 28 insertions(+), 23 deletions(-) diff --git a/src/main/kotlin/org/phoenixframework/Push.kt b/src/main/kotlin/org/phoenixframework/Push.kt index f6ee918..bac2a10 100644 --- a/src/main/kotlin/org/phoenixframework/Push.kt +++ b/src/main/kotlin/org/phoenixframework/Push.kt @@ -45,7 +45,7 @@ class Push( var timeoutTask: DispatchWorkItem? = null /** Hooks into a Push. Where .receive("ok", callback(Payload)) are stored */ - var receiveHooks: MutableMap Unit)>> = HashMap() + var receiveHooks: MutableMap Unit)>> = HashMap() /** True if the Push has been sent */ var sent: Boolean = false @@ -93,13 +93,9 @@ class Push( // If the message has already be received, pass it to the callback receivedMessage?.let { if (hasReceived(status)) callback(it) } - if (receiveHooks[status] == null) { - // Create a new array of hooks if no previous hook is associated with status - receiveHooks[status] = arrayListOf(callback) - } else { - // A previous hook for this status already exists. Just append the new hook - receiveHooks[status]?.add(callback) - } + // If a previous hook for this status already exists. Just append the new hook. If not, then + // create a new array of hooks if no previous hook is associated with status + receiveHooks[status] = receiveHooks[status]?.copyAndAdd(callback) ?: arrayListOf(callback) return this } diff --git a/src/main/kotlin/org/phoenixframework/Socket.kt b/src/main/kotlin/org/phoenixframework/Socket.kt index c13443b..410d929 100644 --- a/src/main/kotlin/org/phoenixframework/Socket.kt +++ b/src/main/kotlin/org/phoenixframework/Socket.kt @@ -304,7 +304,7 @@ class Socket( fun channel(topic: String, params: Payload = mapOf()): Channel { val channel = Channel(topic, params, this) - this.channels.copyAndAdd(channel) + this.channels = this.channels.copyAndAdd(channel) return channel } diff --git a/src/test/kotlin/org/phoenixframework/SocketTest.kt b/src/test/kotlin/org/phoenixframework/SocketTest.kt index 26434d0..fbb3ac2 100644 --- a/src/test/kotlin/org/phoenixframework/SocketTest.kt +++ b/src/test/kotlin/org/phoenixframework/SocketTest.kt @@ -19,6 +19,7 @@ import org.junit.jupiter.api.Nested import org.junit.jupiter.api.Test import org.mockito.Mock import org.mockito.MockitoAnnotations +import org.phoenixframework.utilities.copyAndRemove import java.net.URL import java.util.concurrent.TimeUnit @@ -755,8 +756,8 @@ class SocketTest { val spy = spy(channel) // Use the spy instance instead of the Channel instance - socket.channels.remove(channel) - socket.channels.add(spy) + socket.channels = socket.channels.copyAndRemove(channel) + socket.channels = socket.channels.copyAndAdd(spy) spy.join() assertThat(spy.state).isEqualTo(Channel.State.JOINING) @@ -771,8 +772,8 @@ class SocketTest { val spy = spy(channel) // Use the spy instance instead of the Channel instance - socket.channels.remove(channel) - socket.channels.add(spy) + socket.channels = socket.channels.copyAndRemove(channel) + socket.channels = socket.channels.copyAndAdd(spy) spy.join().trigger("ok", emptyMap()) @@ -788,8 +789,8 @@ class SocketTest { val spy = spy(channel) // Use the spy instance instead of the Channel instance - socket.channels.remove(channel) - socket.channels.add(spy) + socket.channels = socket.channels.copyAndRemove(channel) + socket.channels = socket.channels.copyAndAdd(spy) spy.join().trigger("ok", emptyMap()) spy.leave() @@ -827,8 +828,8 @@ class SocketTest { val spy = spy(channel) // Use the spy instance instead of the Channel instance - socket.channels.remove(channel) - socket.channels.add(spy) + socket.channels = socket.channels.copyAndRemove(channel) + socket.channels = socket.channels.copyAndAdd(spy) spy.join() assertThat(spy.state).isEqualTo(Channel.State.JOINING) @@ -843,8 +844,8 @@ class SocketTest { val spy = spy(channel) // Use the spy instance instead of the Channel instance - socket.channels.remove(channel) - socket.channels.add(spy) + socket.channels = socket.channels.copyAndRemove(channel) + socket.channels = socket.channels.copyAndAdd(spy) spy.join().trigger("ok", emptyMap()) @@ -860,8 +861,8 @@ class SocketTest { val spy = spy(channel) // Use the spy instance instead of the Channel instance - socket.channels.remove(channel) - socket.channels.add(spy) + socket.channels = socket.channels.copyAndRemove(channel) + socket.channels = socket.channels.copyAndAdd(spy) spy.join().trigger("ok", emptyMap()) spy.leave() @@ -885,8 +886,8 @@ class SocketTest { val otherChannel = mock() whenever(otherChannel.isMember(any())).thenReturn(false) - socket.channels.add(targetChannel) - socket.channels.add(otherChannel) + socket.channels = socket.channels.copyAndAdd(targetChannel) + socket.channels = socket.channels.copyAndRemove(otherChannel) val rawMessage = "{\"topic\":\"topic\",\"event\":\"event\",\"payload\":{\"one\":\"two\"},\"status\":\"ok\"}" diff --git a/src/test/kotlin/org/phoenixframework/utilities/TestUtilities.kt b/src/test/kotlin/org/phoenixframework/utilities/TestUtilities.kt index 015e95e..dcbba7a 100644 --- a/src/test/kotlin/org/phoenixframework/utilities/TestUtilities.kt +++ b/src/test/kotlin/org/phoenixframework/utilities/TestUtilities.kt @@ -5,4 +5,12 @@ import org.phoenixframework.Channel fun Channel.getBindings(event: String): List { return bindings.toList().filter { it.event == event } +} + +/** Converts the List to a MutableList, removes the value, and then returns as a read-only List */ +fun List.copyAndRemove(value: T): List { + val temp = this.toMutableList() + temp.remove(value) + + return temp } \ No newline at end of file From ec430fb4ef9779a08e8f254b21f26880d97ee025 Mon Sep 17 00:00:00 2001 From: Daniel Rees Date: Thu, 8 Aug 2019 09:53:08 -0400 Subject: [PATCH 4/4] only run on trusty --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 89f0cc1..1800eea 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,5 @@ language: java +dist: trusty after_success: - bash <(curl -s https://codecov.io/bash) jdk: