Skip to content

Commit e7db7fa

Browse files
dustinconraddsrees
authored andcommitted
Manage bindings in concurrent data structures. (#26)
* Use concurrenthshmap and concurrentlinkedqueue for managing bindings * Change bindings to a val (from a var)
1 parent 6691cf8 commit e7db7fa

File tree

2 files changed

+38
-9
lines changed

2 files changed

+38
-9
lines changed

src/main/kotlin/org/phoenixframework/PhxChannel.kt

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package org.phoenixframework
22

33
import java.lang.IllegalStateException
4+
import java.util.concurrent.ConcurrentHashMap
5+
import java.util.concurrent.ConcurrentLinkedQueue
46

57

68
class PhxChannel(
@@ -44,7 +46,7 @@ class PhxChannel(
4446

4547

4648
var state: PhxChannel.PhxState
47-
var bindings: MutableList<Triple<String, Int, (PhxMessage) -> Unit>>
49+
val bindings: ConcurrentHashMap<String, ConcurrentLinkedQueue<Pair<Int, (PhxMessage) -> Unit>>>
4850
var bindingRef: Int
4951
var timeout: Long
5052
var joinedOnce: Boolean
@@ -58,7 +60,7 @@ class PhxChannel(
5860

5961
init {
6062
this.state = PhxChannel.PhxState.CLOSED
61-
this.bindings = ArrayList()
63+
this.bindings = ConcurrentHashMap()
6264
this.bindingRef = 0
6365
this.timeout = socket.timeout
6466
this.joinedOnce = false
@@ -197,7 +199,9 @@ class PhxChannel(
197199
val ref = bindingRef
198200
this.bindingRef = ref + 1
199201

200-
this.bindings.add(Triple(event, ref, callback))
202+
this.bindings.getOrPut(event) { ConcurrentLinkedQueue() }
203+
.add(ref to callback)
204+
201205
return ref
202206
}
203207

@@ -219,9 +223,11 @@ class PhxChannel(
219223
public fun off(event: String, ref: Int? = null) {
220224
// Remove any subscriptions that match the given event and ref ID. If no ref
221225
// ID is given, then remove all subscriptions for an event.
222-
this.bindings = bindings
223-
.filterNot { it.first == event && (ref == null || ref == it.second) }
224-
.toMutableList()
226+
if (ref != null) {
227+
this.bindings[event]?.removeIf{ ref == it.first }
228+
} else {
229+
this.bindings.remove(event)
230+
}
225231
}
226232

227233
/**
@@ -333,9 +339,7 @@ class PhxChannel(
333339
*/
334340
fun trigger(message: PhxMessage) {
335341
val handledMessage = onMessage(message)
336-
this.bindings
337-
.filter { it.first == message.event }
338-
.forEach { it.third(handledMessage) }
342+
this.bindings[message.event]?.forEach { it.second(handledMessage) }
339343
}
340344

341345
/**

src/test/kotlin/org/phoenixframework/PhxChannelTest.kt

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import org.junit.Test
66
import org.mockito.Mockito
77
import org.mockito.MockitoAnnotations
88
import org.mockito.Spy
9+
import java.util.concurrent.CompletableFuture
10+
import java.util.concurrent.TimeUnit
911

1012
class PhxChannelTest {
1113

@@ -124,5 +126,28 @@ class PhxChannelTest {
124126
assertThat(aCalled).isFalse()
125127
assertThat(bCalled).isTrue()
126128
}
129+
130+
@Test
131+
fun `Issue 22`() {
132+
// This reproduces a concurrent modification exception. The original cause is most likely as follows:
133+
// 1. Push (And receive) messages very quickly
134+
// 2. PhxChannel.push, calls PhxPush.send()
135+
// 3. PhxPush calls startTimeout().
136+
// 4. PhxPush.startTimeout() calls this.channel.on(refEvent) - This modifies the bindings list
137+
// 5. any trigger (possibly from a timeout) can be iterating through the binding list that was modified in step 4.
138+
139+
val f1 = CompletableFuture.runAsync {
140+
for (i in 0..1000) {
141+
channel.on("event-$i") { /** do nothing **/ }
142+
}
143+
}
144+
val f3 = CompletableFuture.runAsync {
145+
for (i in 0..1000) {
146+
channel.trigger(PhxMessage(event = "event-$i", ref = defaultRef))
147+
}
148+
}
149+
150+
CompletableFuture.allOf(f1, f3).get(10, TimeUnit.SECONDS)
151+
}
127152
}
128153

0 commit comments

Comments
 (0)