Skip to content

Commit d4462ea

Browse files
authored
ensures Payload released in LimitingFlowCollector (#172)
1 parent 7c5d44c commit d4462ea

File tree

3 files changed

+24
-1
lines changed
  • rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/flow
  • rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor
  • rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local

3 files changed

+24
-1
lines changed

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/flow/LimitingFlowCollector.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,12 @@ internal class LimitingFlowCollector(
3939
}
4040

4141
override suspend fun emit(value: Payload): Unit = value.closeOnError {
42-
useRequest()
42+
try {
43+
useRequest()
44+
} catch (t: Throwable) {
45+
value.release()
46+
throw t
47+
}
4348
state.send(NextPayloadFrame(streamId, value))
4449
}
4550

rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpConnection.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ internal class TcpConnection(private val socket: Socket) : Connection, Coroutine
4242
private val receiveChannel = SafeChannel<ByteReadPacket>(8)
4343

4444
init {
45+
val channelCloseJob = Job(job)
4546
launch {
4647
socket.openWriteChannel(autoFlush = true).use {
4748
while (isActive) {
@@ -77,6 +78,14 @@ internal class TcpConnection(private val socket: Socket) : Connection, Coroutine
7778
val error = cause?.let { it as? CancellationException ?: CancellationException("Connection failed", it) }
7879
sendChannel.cancel(error)
7980
receiveChannel.cancel(error)
81+
CoroutineScope(job).launch {
82+
while (!sendChannel.isClosedForReceive || !sendChannel.isClosedForSend
83+
|| !receiveChannel.isClosedForReceive || !receiveChannel.isClosedForSend
84+
) {
85+
delay(1)
86+
}
87+
channelCloseJob.complete()
88+
}
8089
}
8190
}
8291

rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalServer.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,19 @@ internal constructor(
4444
val clientChannel = SafeChannel<ByteReadPacket>(Channel.UNLIMITED)
4545
val serverChannel = SafeChannel<ByteReadPacket>(Channel.UNLIMITED)
4646
val connectionJob = Job(job)
47+
val channelCloseJob = Job(job)
4748
connectionJob.invokeOnCompletion {
4849
val error = CancellationException("Connection failed", it)
4950
clientChannel.cancel(error)
5051
serverChannel.cancel(error)
52+
CoroutineScope(job).launch {
53+
while (!clientChannel.isClosedForReceive || !clientChannel.isClosedForSend
54+
|| !serverChannel.isClosedForReceive || !serverChannel.isClosedForSend
55+
) {
56+
delay(1)
57+
}
58+
channelCloseJob.complete()
59+
}
5160
}
5261
val clientConnection = LocalConnection(serverChannel, clientChannel, pool, connectionJob)
5362
val serverConnection = LocalConnection(clientChannel, serverChannel, pool, connectionJob)

0 commit comments

Comments
 (0)