From c872cec8977a5e59f76121a489f9e18dd66f2ce0 Mon Sep 17 00:00:00 2001 From: olme04 Date: Tue, 13 Jul 2021 10:00:32 +0300 Subject: [PATCH] fragmentation and reassembly --- .../rsocket/kotlin/core/RSocketConnector.kt | 2 + .../kotlin/core/RSocketConnectorBuilder.kt | 8 ++ .../io/rsocket/kotlin/core/RSocketServer.kt | 2 + .../kotlin/core/RSocketServerBuilder.kt | 9 +- .../io/rsocket/kotlin/internal/Connect.kt | 8 +- .../io/rsocket/kotlin/internal/FrameSender.kt | 120 ++++++++++++++++++ .../io/rsocket/kotlin/internal/Prioritizer.kt | 2 +- .../kotlin/internal/RSocketRequester.kt | 63 ++++----- .../kotlin/internal/RSocketResponder.kt | 21 ++- .../rsocket/kotlin/internal/StreamsStorage.kt | 15 ++- .../kotlin/internal/handler/FrameHandler.kt | 40 ++++-- .../RequesterRequestChannelFrameHandler.kt | 5 +- .../RequesterRequestResponseFrameHandler.kt | 7 +- .../RequesterRequestStreamFrameHandler.kt | 7 +- .../ResponderFireAndForgetFrameHandler.kt | 5 +- .../ResponderRequestChannelFrameHandler.kt | 7 +- .../ResponderRequestResponseFrameHandler.kt | 7 +- .../ResponderRequestStreamFrameHandler.kt | 5 +- .../kotlin/internal/FrameSenderTest.kt | 57 +++++++++ .../kotlin/internal/PrioritizerTest.kt | 100 +++++++++++++++ .../kotlin/internal/RSocketRequesterTest.kt | 1 + ...equesterRequestResponseFrameHandlerTest.kt | 64 ++++++++++ .../rsocket/kotlin/keepalive/KeepAliveTest.kt | 1 + .../kotlin/internal/handler/FrameHandler.kt | 8 +- .../kotlin/internal/handler/FrameHandler.kt | 8 +- .../kotlin/internal/handler/FrameHandler.kt | 8 +- 26 files changed, 502 insertions(+), 78 deletions(-) create mode 100644 rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/FrameSender.kt create mode 100644 rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/FrameSenderTest.kt create mode 100644 rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/PrioritizerTest.kt create mode 100644 rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestResponseFrameHandlerTest.kt diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt index 37989d720..55e89179c 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt @@ -27,6 +27,7 @@ import kotlinx.coroutines.* @OptIn(TransportApi::class, RSocketLoggingApi::class) public class RSocketConnector internal constructor( private val loggerFactory: LoggerFactory, + private val maxFragmentSize: Int, private val interceptors: Interceptors, private val connectionConfigProvider: () -> ConnectionConfig, private val acceptor: ConnectionAcceptor, @@ -61,6 +62,7 @@ public class RSocketConnector internal constructor( try { val requester = connection.connect( isServer = false, + maxFragmentSize = maxFragmentSize, interceptors = interceptors, connectionConfig = connectionConfig, acceptor = acceptor diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt index 47c157659..3eacc2f27 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt @@ -26,6 +26,13 @@ import kotlinx.coroutines.* public class RSocketConnectorBuilder internal constructor() { @RSocketLoggingApi public var loggerFactory: LoggerFactory = DefaultLoggerFactory + public var maxFragmentSize: Int = 0 + set(value) { + require(value == 0 || value >= 64) { + "maxFragmentSize should be zero (no fragmentation) or greater than or equal to 64, but was $value" + } + field = value + } private val connectionConfig: ConnectionConfigBuilder = ConnectionConfigBuilder() private val interceptors: InterceptorsBuilder = InterceptorsBuilder() @@ -96,6 +103,7 @@ public class RSocketConnectorBuilder internal constructor() { @OptIn(RSocketLoggingApi::class) internal fun build(): RSocketConnector = RSocketConnector( loggerFactory, + maxFragmentSize, interceptors.build(), connectionConfig.producer(), acceptor ?: defaultAcceptor, diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt index d12ba1e2d..1c7092c43 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt @@ -27,6 +27,7 @@ import kotlinx.coroutines.* @OptIn(TransportApi::class, RSocketLoggingApi::class) public class RSocketServer internal constructor( private val loggerFactory: LoggerFactory, + private val maxFragmentSize: Int, private val interceptors: Interceptors, ) { @@ -44,6 +45,7 @@ public class RSocketServer internal constructor( else -> try { connect( isServer = true, + maxFragmentSize = maxFragmentSize, interceptors = interceptors, connectionConfig = ConnectionConfig( keepAlive = setupFrame.keepAlive, diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServerBuilder.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServerBuilder.kt index 58d169037..519a8ad9f 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServerBuilder.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServerBuilder.kt @@ -22,6 +22,13 @@ import io.rsocket.kotlin.logging.* public class RSocketServerBuilder internal constructor() { @RSocketLoggingApi public var loggerFactory: LoggerFactory = DefaultLoggerFactory + public var maxFragmentSize: Int = 0 + set(value) { + require(value == 0 || value >= 64) { + "maxFragmentSize should be zero (no fragmentation) or greater than or equal to 64, but was $value" + } + field = value + } private val interceptors: InterceptorsBuilder = InterceptorsBuilder() @@ -30,7 +37,7 @@ public class RSocketServerBuilder internal constructor() { } @OptIn(RSocketLoggingApi::class) - internal fun build(): RSocketServer = RSocketServer(loggerFactory, interceptors.build()) + internal fun build(): RSocketServer = RSocketServer(loggerFactory, maxFragmentSize, interceptors.build()) } public fun RSocketServer(configure: RSocketServerBuilder.() -> Unit = {}): RSocketServer { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt index b5fc409f2..ff1ec5b69 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt @@ -25,13 +25,15 @@ import kotlinx.coroutines.* @OptIn(TransportApi::class) internal suspend inline fun Connection.connect( isServer: Boolean, + maxFragmentSize: Int, interceptors: Interceptors, connectionConfig: ConnectionConfig, acceptor: ConnectionAcceptor ): RSocket { val keepAliveHandler = KeepAliveHandler(connectionConfig.keepAlive) val prioritizer = Prioritizer() - val streamsStorage = StreamsStorage(isServer) + val frameSender = FrameSender(prioritizer, pool, maxFragmentSize) + val streamsStorage = StreamsStorage(isServer, pool) val requestJob = SupervisorJob(job) requestJob.invokeOnCompletion { @@ -43,7 +45,7 @@ internal suspend inline fun Connection.connect( val requestScope = CoroutineScope(requestJob + Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> }) val connectionScope = CoroutineScope(job + Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> }) - val requester = interceptors.wrapRequester(RSocketRequester(job, prioritizer, streamsStorage, requestScope)) + val requester = interceptors.wrapRequester(RSocketRequester(job, frameSender, streamsStorage, requestScope, pool)) val requestHandler = interceptors.wrapResponder( with(interceptors.wrapAcceptor(acceptor)) { ConnectionAcceptorContext(connectionConfig, requester).accept() @@ -71,7 +73,7 @@ internal suspend inline fun Connection.connect( // start frame handling connectionScope.launch { - val rSocketResponder = RSocketResponder(prioritizer, requestHandler, requestScope) + val rSocketResponder = RSocketResponder(frameSender, requestHandler, requestScope) while (isActive) { receiveFrame().closeOnError { frame -> when (frame.streamId) { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/FrameSender.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/FrameSender.kt new file mode 100644 index 000000000..866a4fa96 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/FrameSender.kt @@ -0,0 +1,120 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal + +import io.ktor.utils.io.core.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* +import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.frame.io.* +import io.rsocket.kotlin.payload.* +import kotlinx.coroutines.* +import kotlin.math.* + +private const val lengthSize = 3 +private const val headerSize = 6 +private const val fragmentOffset = lengthSize + headerSize +private const val fragmentOffsetWithMetadata = fragmentOffset + lengthSize + +internal class FrameSender( + private val prioritizer: Prioritizer, + private val pool: ObjectPool, + private val maxFragmentSize: Int +) { + + suspend fun sendKeepAlive(respond: Boolean, lastPosition: Long, data: ByteReadPacket): Unit = + prioritizer.send(KeepAliveFrame(respond, lastPosition, data)) + + suspend fun sendMetadataPush(metadata: ByteReadPacket): Unit = prioritizer.send(MetadataPushFrame(metadata)) + + suspend fun sendCancel(id: Int): Unit = withContext(NonCancellable) { prioritizer.send(CancelFrame(id)) } + suspend fun sendError(id: Int, throwable: Throwable): Unit = withContext(NonCancellable) { prioritizer.send(ErrorFrame(id, throwable)) } + suspend fun sendRequestN(id: Int, n: Int): Unit = prioritizer.send(RequestNFrame(id, n)) + + suspend fun sendRequestPayload(type: FrameType, streamId: Int, payload: Payload, initialRequest: Int = 0) { + sendFragmented(type, streamId, payload, false, false, initialRequest) + } + + suspend fun sendNextPayload(streamId: Int, payload: Payload) { + sendFragmented(FrameType.Payload, streamId, payload, false, true, 0) + } + + suspend fun sendNextCompletePayload(streamId: Int, payload: Payload) { + sendFragmented(FrameType.Payload, streamId, payload, true, true, 0) + } + + suspend fun sendCompletePayload(streamId: Int) { + sendFragmented(FrameType.Payload, streamId, Payload.Empty, true, false, 0) + } + + private suspend fun sendFragmented( + type: FrameType, + streamId: Int, + payload: Payload, + complete: Boolean, + next: Boolean, + initialRequest: Int + ) { + //TODO release on fail ? + if (!payload.isFragmentable(type.hasInitialRequest)) { + prioritizer.send(RequestFrame(type, streamId, false, complete, next, initialRequest, payload)) + return + } + + val data = payload.data + val metadata = payload.metadata + + val fragmentSize = maxFragmentSize - fragmentOffset - (if (type.hasInitialRequest) Int.SIZE_BYTES else 0) + + var first = true + var remaining = fragmentSize + if (metadata != null) remaining -= lengthSize + + do { + val metadataFragment = if (metadata != null && metadata.isNotEmpty) { + if (!first) remaining -= lengthSize + val length = min(metadata.remaining.toInt(), remaining) + remaining -= length + metadata.readPacket(pool, length) + } else null + + val dataFragment = if (remaining > 0 && data.isNotEmpty) { + val length = min(data.remaining.toInt(), remaining) + remaining -= length + data.readPacket(pool, length) + } else { + ByteReadPacket.Empty + } + + val fType = if (first && type.isRequestType) type else FrameType.Payload + val fragment = Payload(dataFragment, metadataFragment) + val follows = metadata != null && metadata.isNotEmpty || data.isNotEmpty + prioritizer.send(RequestFrame(fType, streamId, follows, (!follows && complete), !fType.isRequestType, initialRequest, fragment)) + first = false + remaining = fragmentSize + } while (follows) + } + + private fun Payload.isFragmentable(hasInitialRequest: Boolean) = when (maxFragmentSize) { + 0 -> false + else -> when (val meta = metadata) { + null -> data.remaining > maxFragmentSize - fragmentOffset - (if (hasInitialRequest) Int.SIZE_BYTES else 0) + else -> data.remaining + meta.remaining > maxFragmentSize - fragmentOffsetWithMetadata - (if (hasInitialRequest) Int.SIZE_BYTES else 0) + } + } + +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt index 54ee83496..63fbd64eb 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt @@ -30,7 +30,7 @@ internal class Prioritizer { private val commonChannel = SafeChannel(Channel.UNLIMITED) suspend fun send(frame: Frame) { - if (frame.type != FrameType.Cancel && frame.type != FrameType.Error) currentCoroutineContext().ensureActive() + currentCoroutineContext().ensureActive() val channel = if (frame.streamId == 0) priorityChannel else commonChannel channel.send(frame) } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt index cabcc9d5d..933db562b 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt @@ -17,6 +17,8 @@ package io.rsocket.kotlin.internal import io.ktor.utils.io.core.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.internal.handler.* @@ -28,28 +30,29 @@ import kotlinx.coroutines.flow.* @OptIn(ExperimentalStreamsApi::class) internal class RSocketRequester( connectionJob: Job, - private val prioritizer: Prioritizer, + private val sender: FrameSender, private val streamsStorage: StreamsStorage, - private val requestScope: CoroutineScope + private val requestScope: CoroutineScope, + private val pool: ObjectPool ) : RSocket { override val job: Job = connectionJob override suspend fun metadataPush(metadata: ByteReadPacket) { ensureActiveOrRelease(metadata) metadata.closeOnError { - prioritizer.send(MetadataPushFrame(metadata)) + sender.sendMetadataPush(metadata) } } override suspend fun fireAndForget(payload: Payload) { ensureActiveOrRelease(payload) - val streamId = streamsStorage.nextId() + val id = streamsStorage.nextId() try { - prioritizer.send(RequestFireAndForgetFrame(streamId, payload)) + sender.sendRequestPayload(FrameType.RequestFnF, id, payload) } catch (cause: Throwable) { payload.release() - if (job.isActive) prioritizer.send(CancelFrame(streamId)) //if cancelled during fragmentation + if (job.isActive) sender.sendCancel(id) //if cancelled during fragmentation throw cause } } @@ -57,14 +60,14 @@ internal class RSocketRequester( override suspend fun requestResponse(payload: Payload): Payload { ensureActiveOrRelease(payload) - val streamId = streamsStorage.nextId() + val id = streamsStorage.nextId() val deferred = CompletableDeferred() - val handler = RequesterRequestResponseFrameHandler(streamId, streamsStorage, deferred) - streamsStorage.save(streamId, handler) + val handler = RequesterRequestResponseFrameHandler(id, streamsStorage, deferred, pool) + streamsStorage.save(id, handler) - return handler.receiveOrCancel(streamId, payload) { - prioritizer.send(RequestResponseFrame(streamId, payload)) + return handler.receiveOrCancel(id, payload) { + sender.sendRequestPayload(FrameType.RequestResponse, id, payload) deferred.await() } } @@ -72,39 +75,39 @@ internal class RSocketRequester( override fun requestStream(payload: Payload): Flow = requestFlow { strategy, initialRequest -> ensureActiveOrRelease(payload) - val streamId = streamsStorage.nextId() + val id = streamsStorage.nextId() val channel = SafeChannel(Channel.UNLIMITED) - val handler = RequesterRequestStreamFrameHandler(streamId, streamsStorage, channel) - streamsStorage.save(streamId, handler) + val handler = RequesterRequestStreamFrameHandler(id, streamsStorage, channel, pool) + streamsStorage.save(id, handler) - handler.receiveOrCancel(streamId, payload) { - prioritizer.send(RequestStreamFrame(streamId, initialRequest, payload)) - emitAllWithRequestN(channel, strategy) { prioritizer.send(RequestNFrame(streamId, it)) } + handler.receiveOrCancel(id, payload) { + sender.sendRequestPayload(FrameType.RequestStream, id, payload, initialRequest) + emitAllWithRequestN(channel, strategy) { sender.sendRequestN(id, it) } } } override fun requestChannel(initPayload: Payload, payloads: Flow): Flow = requestFlow { strategy, initialRequest -> ensureActiveOrRelease(initPayload) - val streamId = streamsStorage.nextId() + val id = streamsStorage.nextId() val channel = SafeChannel(Channel.UNLIMITED) val limiter = Limiter(0) - val sender = Job(requestScope.coroutineContext.job) - val handler = RequesterRequestChannelFrameHandler(streamId, streamsStorage, limiter, sender, channel) - streamsStorage.save(streamId, handler) + val payloadsJob = Job(requestScope.coroutineContext.job) + val handler = RequesterRequestChannelFrameHandler(id, streamsStorage, limiter, payloadsJob, channel, pool) + streamsStorage.save(id, handler) - handler.receiveOrCancel(streamId, initPayload) { - prioritizer.send(RequestChannelFrame(streamId, initialRequest, initPayload)) + handler.receiveOrCancel(id, initPayload) { + sender.sendRequestPayload(FrameType.RequestChannel, id, initPayload, initialRequest) //TODO lazy? - requestScope.launch(sender) { - handler.sendOrFail(streamId) { - payloads.collectLimiting(limiter) { prioritizer.send(NextPayloadFrame(streamId, it)) } - prioritizer.send(CompletePayloadFrame(streamId)) + requestScope.launch(payloadsJob) { + handler.sendOrFail(id) { + payloads.collectLimiting(limiter) { sender.sendNextPayload(id, it) } + sender.sendCompletePayload(id) } } - emitAllWithRequestN(channel, strategy) { prioritizer.send(RequestNFrame(streamId, it)) } + emitAllWithRequestN(channel, strategy) { sender.sendRequestN(id, it) } } } @@ -114,7 +117,7 @@ internal class RSocketRequester( onSendComplete() } catch (cause: Throwable) { val isFailed = onSendFailed(cause) - if (job.isActive && isFailed) prioritizer.send(ErrorFrame(id, cause)) + if (job.isActive && isFailed) sender.sendError(id, cause) throw cause } } @@ -127,7 +130,7 @@ internal class RSocketRequester( } catch (cause: Throwable) { payload.release() val isCancelled = onReceiveCancelled(cause) - if (job.isActive && isCancelled) prioritizer.send(CancelFrame(id)) + if (job.isActive && isCancelled) sender.sendCancel(id) throw cause } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt index f5e8a878c..ca25c468e 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt @@ -18,14 +18,13 @@ package io.rsocket.kotlin.internal import io.ktor.utils.io.core.* import io.rsocket.kotlin.* -import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.internal.handler.* import io.rsocket.kotlin.payload.* import kotlinx.coroutines.* @OptIn(ExperimentalStreamsApi::class) internal class RSocketResponder( - private val prioritizer: Prioritizer, + private val sender: FrameSender, private val requestHandler: RSocket, private val requestScope: CoroutineScope, ) { @@ -52,27 +51,27 @@ internal class RSocketResponder( fun handleRequestResponse(payload: Payload, id: Int, handler: ResponderRequestResponseFrameHandler): Job = requestScope.launch { handler.sendOrFail(id, payload) { val response = requestHandler.requestResponse(payload) - prioritizer.send(NextCompletePayloadFrame(id, response)) + sender.sendNextCompletePayload(id, response) } }.closeOnCompletion(payload) fun handleRequestStream(payload: Payload, id: Int, handler: ResponderRequestStreamFrameHandler): Job = requestScope.launch { handler.sendOrFail(id, payload) { - requestHandler.requestStream(payload).collectLimiting(handler.limiter) { prioritizer.send(NextPayloadFrame(id, it)) } - prioritizer.send(CompletePayloadFrame(id)) + requestHandler.requestStream(payload).collectLimiting(handler.limiter) { sender.sendNextPayload(id, it) } + sender.sendCompletePayload(id) } }.closeOnCompletion(payload) fun handleRequestChannel(payload: Payload, id: Int, handler: ResponderRequestChannelFrameHandler): Job = requestScope.launch { val payloads = requestFlow { strategy, initialRequest -> handler.receiveOrCancel(id) { - prioritizer.send(RequestNFrame(id, initialRequest)) - emitAllWithRequestN(handler.channel, strategy) { prioritizer.send(RequestNFrame(id, it)) } + sender.sendRequestN(id, initialRequest) + emitAllWithRequestN(handler.channel, strategy) { sender.sendRequestN(id, it) } } } handler.sendOrFail(id, payload) { - requestHandler.requestChannel(payload, payloads).collectLimiting(handler.limiter) { prioritizer.send(NextPayloadFrame(id, it)) } - prioritizer.send(CompletePayloadFrame(id)) + requestHandler.requestChannel(payload, payloads).collectLimiting(handler.limiter) { sender.sendNextPayload(id, it) } + sender.sendCompletePayload(id) } }.closeOnCompletion(payload) @@ -82,7 +81,7 @@ internal class RSocketResponder( onSendComplete() } catch (cause: Throwable) { val isFailed = onSendFailed(cause) - if (currentCoroutineContext().isActive && isFailed) prioritizer.send(ErrorFrame(id, cause)) + if (currentCoroutineContext().isActive && isFailed) sender.sendError(id, cause) throw cause } finally { payload.release() @@ -95,7 +94,7 @@ internal class RSocketResponder( onReceiveComplete() } catch (cause: Throwable) { val isCancelled = onReceiveCancelled(cause) - if (requestScope.isActive && isCancelled) prioritizer.send(CancelFrame(id)) + if (requestScope.isActive && isCancelled) sender.sendCancel(id) throw cause } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt index 206bf0b9e..6ca1a3fc6 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt @@ -16,10 +16,12 @@ package io.rsocket.kotlin.internal +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.internal.handler.* -internal class StreamsStorage(private val isServer: Boolean) { +internal class StreamsStorage(private val isServer: Boolean, private val pool: ObjectPool) { private val streamId: StreamId = StreamId(isServer) private val handlers: IntMap = IntMap() @@ -30,7 +32,7 @@ internal class StreamsStorage(private val isServer: Boolean) { } fun remove(id: Int): FrameHandler? { - return handlers.remove(id) + return handlers.remove(id)?.also(FrameHandler::release) } fun contains(id: Int): Boolean { @@ -42,6 +44,7 @@ internal class StreamsStorage(private val isServer: Boolean) { handlers.clear() values.forEach { it.cleanup(error) + it.release() } } @@ -57,10 +60,10 @@ internal class StreamsStorage(private val isServer: Boolean) { else -> { val initialRequest = frame.initialRequest val handler = when (frame.type) { - FrameType.RequestFnF -> ResponderFireAndForgetFrameHandler(id, this, responder) - FrameType.RequestResponse -> ResponderRequestResponseFrameHandler(id, this, responder) - FrameType.RequestStream -> ResponderRequestStreamFrameHandler(id, this, responder, initialRequest) - FrameType.RequestChannel -> ResponderRequestChannelFrameHandler(id, this, responder, initialRequest) + FrameType.RequestFnF -> ResponderFireAndForgetFrameHandler(id, this, responder, pool) + FrameType.RequestResponse -> ResponderRequestResponseFrameHandler(id, this, responder, pool) + FrameType.RequestStream -> ResponderRequestStreamFrameHandler(id, this, responder, initialRequest, pool) + FrameType.RequestChannel -> ResponderRequestChannelFrameHandler(id, this, responder, initialRequest, pool) else -> error("Wrong request frame type") // should never happen } handlers[id] = handler diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt index 1dcb1402e..2191475e2 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt @@ -16,11 +16,17 @@ package io.rsocket.kotlin.internal.handler +import io.ktor.utils.io.core.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.payload.* import kotlinx.coroutines.* -internal abstract class FrameHandler { +internal abstract class FrameHandler(pool: ObjectPool) { + private val data = BytePacketBuilder(0, pool) + private val metadata = BytePacketBuilder(0, pool) + protected abstract var hasMetadata: Boolean fun handleRequest(frame: RequestFrame) { if (frame.next || frame.type.isRequestType) handleNextFragment(frame) @@ -28,8 +34,19 @@ internal abstract class FrameHandler { } private fun handleNextFragment(frame: RequestFrame) { - //TODO fragmentation will be here - handleNext(frame.payload) + data.writePacket(frame.payload.data) + when (val meta = frame.payload.metadata) { + null -> Unit + else -> { + hasMetadata = true + metadata.writePacket(meta) + } + } + if (frame.follows && !frame.complete) return + + val payload = Payload(data.build(), if (hasMetadata) metadata.build() else null) + hasMetadata = false + handleNext(payload) } protected abstract fun handleNext(payload: Payload) @@ -39,6 +56,11 @@ internal abstract class FrameHandler { abstract fun handleRequestN(n: Int) abstract fun cleanup(cause: Throwable?) + + fun release() { + data.release() + metadata.release() + } } internal interface ReceiveFrameHandler { @@ -51,7 +73,7 @@ internal interface SendFrameHandler { fun onSendFailed(cause: Throwable): Boolean // if true, then request is failed } -internal abstract class BaseRequesterFrameHandler : FrameHandler(), ReceiveFrameHandler { +internal abstract class BaseRequesterFrameHandler(pool: ObjectPool) : FrameHandler(pool), ReceiveFrameHandler { override fun handleCancel() { //should be called only for RC } @@ -61,7 +83,7 @@ internal abstract class BaseRequesterFrameHandler : FrameHandler(), ReceiveFrame } } -internal abstract class BaseResponderFrameHandler : FrameHandler(), SendFrameHandler { +internal abstract class BaseResponderFrameHandler(pool: ObjectPool) : FrameHandler(pool), SendFrameHandler { protected abstract var job: Job? protected abstract fun start(payload: Payload): Job @@ -84,11 +106,11 @@ internal abstract class BaseResponderFrameHandler : FrameHandler(), SendFrameHan } } -internal expect abstract class ResponderFrameHandler() : BaseResponderFrameHandler { +internal expect abstract class ResponderFrameHandler(pool: ObjectPool) : BaseResponderFrameHandler { override var job: Job? - //TODO fragmentation will be here + override var hasMetadata: Boolean } -internal expect abstract class RequesterFrameHandler() : BaseRequesterFrameHandler { - //TODO fragmentation will be here +internal expect abstract class RequesterFrameHandler(pool: ObjectPool) : BaseRequesterFrameHandler { + override var hasMetadata: Boolean } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestChannelFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestChannelFrameHandler.kt index 154bf5936..220ef11fc 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestChannelFrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestChannelFrameHandler.kt @@ -16,6 +16,8 @@ package io.rsocket.kotlin.internal.handler +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.internal.* import io.rsocket.kotlin.payload.* import kotlinx.coroutines.* @@ -27,7 +29,8 @@ internal class RequesterRequestChannelFrameHandler( private val limiter: Limiter, private val sender: Job, private val channel: Channel, -) : RequesterFrameHandler(), SendFrameHandler { + pool: ObjectPool +) : RequesterFrameHandler(pool), SendFrameHandler { override fun handleNext(payload: Payload) { channel.safeTrySend(payload) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestResponseFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestResponseFrameHandler.kt index b7838c1b1..f81bd40e8 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestResponseFrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestResponseFrameHandler.kt @@ -16,6 +16,8 @@ package io.rsocket.kotlin.internal.handler +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.internal.* import io.rsocket.kotlin.payload.* import kotlinx.coroutines.* @@ -23,8 +25,9 @@ import kotlinx.coroutines.* internal class RequesterRequestResponseFrameHandler( private val id: Int, private val streamsStorage: StreamsStorage, - private val deferred: CompletableDeferred -) : RequesterFrameHandler() { + private val deferred: CompletableDeferred, + pool: ObjectPool +) : RequesterFrameHandler(pool) { override fun handleNext(payload: Payload) { deferred.complete(payload) } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestStreamFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestStreamFrameHandler.kt index eba58a903..df54bea80 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestStreamFrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestStreamFrameHandler.kt @@ -16,6 +16,8 @@ package io.rsocket.kotlin.internal.handler +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.internal.* import io.rsocket.kotlin.payload.* import kotlinx.coroutines.channels.* @@ -23,8 +25,9 @@ import kotlinx.coroutines.channels.* internal class RequesterRequestStreamFrameHandler( private val id: Int, private val streamsStorage: StreamsStorage, - private val channel: Channel -) : RequesterFrameHandler() { + private val channel: Channel, + pool: ObjectPool +) : RequesterFrameHandler(pool) { override fun handleNext(payload: Payload) { channel.safeTrySend(payload) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderFireAndForgetFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderFireAndForgetFrameHandler.kt index ae049e9ba..1df2fc6b9 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderFireAndForgetFrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderFireAndForgetFrameHandler.kt @@ -16,6 +16,8 @@ package io.rsocket.kotlin.internal.handler +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.internal.* import io.rsocket.kotlin.payload.* import kotlinx.coroutines.* @@ -24,7 +26,8 @@ internal class ResponderFireAndForgetFrameHandler( private val id: Int, private val streamsStorage: StreamsStorage, private val responder: RSocketResponder, -) : ResponderFrameHandler() { + pool: ObjectPool +) : ResponderFrameHandler(pool) { override fun start(payload: Payload): Job = responder.handleFireAndForget(payload, this) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestChannelFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestChannelFrameHandler.kt index 0be3d7f15..28422534d 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestChannelFrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestChannelFrameHandler.kt @@ -16,6 +16,8 @@ package io.rsocket.kotlin.internal.handler +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.internal.* import io.rsocket.kotlin.payload.* @@ -26,8 +28,9 @@ internal class ResponderRequestChannelFrameHandler( private val id: Int, private val streamsStorage: StreamsStorage, private val responder: RSocketResponder, - initialRequest: Int -) : ResponderFrameHandler(), ReceiveFrameHandler { + initialRequest: Int, + pool: ObjectPool +) : ResponderFrameHandler(pool), ReceiveFrameHandler { val limiter = Limiter(initialRequest) val channel = SafeChannel(Channel.UNLIMITED) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestResponseFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestResponseFrameHandler.kt index 77f2411b8..7c6c39d3f 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestResponseFrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestResponseFrameHandler.kt @@ -16,6 +16,8 @@ package io.rsocket.kotlin.internal.handler +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.internal.* import io.rsocket.kotlin.payload.* import kotlinx.coroutines.* @@ -23,8 +25,9 @@ import kotlinx.coroutines.* internal class ResponderRequestResponseFrameHandler( private val id: Int, private val streamsStorage: StreamsStorage, - private val responder: RSocketResponder -) : ResponderFrameHandler() { + private val responder: RSocketResponder, + pool: ObjectPool +) : ResponderFrameHandler(pool) { override fun start(payload: Payload): Job = responder.handleRequestResponse(payload, id, this) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestStreamFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestStreamFrameHandler.kt index 41b3ccfc6..4d526ea59 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestStreamFrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/ResponderRequestStreamFrameHandler.kt @@ -16,6 +16,8 @@ package io.rsocket.kotlin.internal.handler +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.internal.* import io.rsocket.kotlin.payload.* import kotlinx.coroutines.* @@ -25,7 +27,8 @@ internal class ResponderRequestStreamFrameHandler( private val streamsStorage: StreamsStorage, private val responder: RSocketResponder, initialRequest: Int, -) : ResponderFrameHandler() { + pool: ObjectPool +) : ResponderFrameHandler(pool) { val limiter = Limiter(initialRequest) override fun start(payload: Payload): Job = responder.handleRequestStream(payload, id, this) diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/FrameSenderTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/FrameSenderTest.kt new file mode 100644 index 000000000..d9e1eba83 --- /dev/null +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/FrameSenderTest.kt @@ -0,0 +1,57 @@ +package io.rsocket.kotlin.internal + +import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.payload.* +import io.rsocket.kotlin.test.* +import kotlin.test.* + +class FrameSenderTest : SuspendTest, TestWithLeakCheck { + + private val prioritizer = Prioritizer() + private fun sender(maxFragmentSize: Int) = FrameSender(prioritizer, InUseTrackingPool, maxFragmentSize) + + @Test + fun testFrameFragmented() = test { + val sender = sender(99) + + sender.sendNextPayload(1, buildPayload { + data("1234567890".repeat(50)) + }) + + repeat(6) { + val frame = prioritizer.receive() + assertIs(frame) + assertTrue(frame.next) + assertNull(frame.payload.metadata) + if (it != 5) { + assertTrue(frame.follows) + assertEquals("1234567890".repeat(9), frame.payload.data.readText()) + } else { //last frame + assertFalse(frame.follows) + assertEquals("1234567890".repeat(5), frame.payload.data.readText()) + } + } + } + + @Test + fun testFrameFragmentedFully() = test { + val sender = sender(99) + + sender.sendNextPayload(1, buildPayload { + data("1234567890".repeat(18)) + }) + + repeat(2) { + val frame = prioritizer.receive() + assertIs(frame) + assertTrue(frame.next) + assertNull(frame.payload.metadata) + assertEquals("1234567890".repeat(9), frame.payload.data.readText()) + if (it != 1) { + assertTrue(frame.follows) + } else { //last frame + assertFalse(frame.follows) + } + } + } +} diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/PrioritizerTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/PrioritizerTest.kt new file mode 100644 index 000000000..aa1b1a95d --- /dev/null +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/PrioritizerTest.kt @@ -0,0 +1,100 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal + +import io.ktor.utils.io.core.* +import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.test.* +import kotlinx.coroutines.* +import kotlin.test.* + +class PrioritizerTest : SuspendTest, TestWithLeakCheck { + private val prioritizer = Prioritizer() + + @Test + fun testOrdering() = test { + prioritizer.send(CancelFrame(1)) + prioritizer.send(CancelFrame(2)) + prioritizer.send(CancelFrame(3)) + + assertEquals(1, prioritizer.receive().streamId) + assertEquals(2, prioritizer.receive().streamId) + assertEquals(3, prioritizer.receive().streamId) + } + + @Test + fun testOrderingPriority() = test { + prioritizer.send(MetadataPushFrame(ByteReadPacket.Empty)) + prioritizer.send(KeepAliveFrame(true, 0, ByteReadPacket.Empty)) + + assertTrue(prioritizer.receive() is MetadataPushFrame) + assertTrue(prioritizer.receive() is KeepAliveFrame) + } + + @Test + fun testPrioritization() = test { + prioritizer.send(CancelFrame(5)) + prioritizer.send(MetadataPushFrame(ByteReadPacket.Empty)) + prioritizer.send(CancelFrame(1)) + prioritizer.send(MetadataPushFrame(ByteReadPacket.Empty)) + + assertEquals(0, prioritizer.receive().streamId) + assertEquals(0, prioritizer.receive().streamId) + assertEquals(5, prioritizer.receive().streamId) + assertEquals(1, prioritizer.receive().streamId) + } + + @Test + fun testAsyncReceive() = test { + val deferred = CompletableDeferred() + launch(anotherDispatcher) { + deferred.complete(prioritizer.receive()) + } + delay(100) + prioritizer.send(CancelFrame(5)) + assertTrue(deferred.await() is CancelFrame) + } + + @Test + fun testPrioritizationAndOrdering() = test { + prioritizer.send(RequestNFrame(1, 1)) + prioritizer.send(MetadataPushFrame(ByteReadPacket.Empty)) + prioritizer.send(CancelFrame(1)) + prioritizer.send(KeepAliveFrame(true, 0, ByteReadPacket.Empty)) + + assertTrue(prioritizer.receive() is MetadataPushFrame) + assertTrue(prioritizer.receive() is KeepAliveFrame) + assertTrue(prioritizer.receive() is RequestNFrame) + assertTrue(prioritizer.receive() is CancelFrame) + } + + @Test + fun testReleaseOnClose() = test { + val packet = packet("metadata") + val payload = payload("data") + prioritizer.send(MetadataPushFrame(packet)) + prioritizer.send(NextPayloadFrame(1, payload)) + + assertTrue(packet.isNotEmpty) + assertTrue(payload.data.isNotEmpty) + + prioritizer.close(null) + + assertTrue(packet.isEmpty) + assertTrue(payload.data.isEmpty) + } +} diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt index cdfbc6c8e..ce260b3a5 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt @@ -36,6 +36,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { requester = connection.connect( isServer = false, + maxFragmentSize = 0, interceptors = InterceptorsBuilder().build(), connectionConfig = ConnectionConfig( keepAlive = KeepAlive(Duration.seconds(1000), Duration.seconds(1000)), diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestResponseFrameHandlerTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestResponseFrameHandlerTest.kt new file mode 100644 index 000000000..58186c83d --- /dev/null +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/handler/RequesterRequestResponseFrameHandlerTest.kt @@ -0,0 +1,64 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.handler + +import io.rsocket.kotlin.* +import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.payload.* +import io.rsocket.kotlin.test.* +import kotlinx.coroutines.* +import kotlin.test.* + +class RequesterRequestResponseFrameHandlerTest : SuspendTest, TestWithLeakCheck { + private val storage = StreamsStorage(true, InUseTrackingPool) + private val deferred = CompletableDeferred() + private val handler = RequesterRequestResponseFrameHandler(1, storage, deferred, InUseTrackingPool).also { storage.save(1, it) } + + @Test + fun testCompleteOnPayloadReceive() = test { + handler.handleRequest(RequestFrame(FrameType.Payload, 1, false, false, true, 0, payload("hello"))) + assertTrue(deferred.isCompleted) + assertEquals("hello", deferred.await().data.readText()) + handler.onReceiveComplete() + assertFalse(storage.contains(1)) + } + + @Test + fun testFailOnPayloadReceive() = test { + handler.handleError(RSocketError.ApplicationError("failed")) + assertTrue(deferred.isCompleted) + assertFailsWith(RSocketError.ApplicationError::class, "failed") { deferred.await() } + assertFalse(storage.contains(1)) + } + + @Test + fun testFailOnCleanup() = test { + handler.cleanup(IllegalStateException("failed")) + assertTrue(deferred.isCompleted) + assertFailsWith(CancellationException::class, "Connection closed") { deferred.await() } + } + + @Test + fun testReassembly() = test { + handler.handleRequest(RequestFrame(FrameType.Payload, 1, true, false, true, 0, payload("hello"))) + assertFalse(deferred.isCompleted) + handler.handleRequest(RequestFrame(FrameType.Payload, 1, false, false, true, 0, payload(" world"))) + assertTrue(deferred.isCompleted) + assertEquals("hello world", deferred.await().data.readText()) + } +} diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt index d021bf32b..2bc4e44c4 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt @@ -33,6 +33,7 @@ class KeepAliveTest : TestWithConnection(), TestWithLeakCheck { keepAlive: KeepAlive = KeepAlive(Duration.milliseconds(100), Duration.seconds(1)) ): RSocket = connection.connect( isServer = false, + maxFragmentSize = 0, interceptors = InterceptorsBuilder().build(), connectionConfig = ConnectionConfig(keepAlive, DefaultPayloadMimeType, Payload.Empty), acceptor = { RSocketRequestHandler { } } diff --git a/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt b/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt index 917419f0c..4e40aa98d 100644 --- a/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt +++ b/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt @@ -16,11 +16,15 @@ package io.rsocket.kotlin.internal.handler +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import kotlinx.coroutines.* -internal actual abstract class ResponderFrameHandler : BaseResponderFrameHandler() { +internal actual abstract class ResponderFrameHandler actual constructor(pool: ObjectPool) : BaseResponderFrameHandler(pool) { actual override var job: Job? = null + actual override var hasMetadata: Boolean = false } -internal actual abstract class RequesterFrameHandler : BaseRequesterFrameHandler() { +internal actual abstract class RequesterFrameHandler actual constructor(pool: ObjectPool) : BaseRequesterFrameHandler(pool) { + actual override var hasMetadata: Boolean = false } diff --git a/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt b/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt index 917419f0c..4e40aa98d 100644 --- a/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt +++ b/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt @@ -16,11 +16,15 @@ package io.rsocket.kotlin.internal.handler +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import kotlinx.coroutines.* -internal actual abstract class ResponderFrameHandler : BaseResponderFrameHandler() { +internal actual abstract class ResponderFrameHandler actual constructor(pool: ObjectPool) : BaseResponderFrameHandler(pool) { actual override var job: Job? = null + actual override var hasMetadata: Boolean = false } -internal actual abstract class RequesterFrameHandler : BaseRequesterFrameHandler() { +internal actual abstract class RequesterFrameHandler actual constructor(pool: ObjectPool) : BaseRequesterFrameHandler(pool) { + actual override var hasMetadata: Boolean = false } diff --git a/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt b/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt index 51db7aebc..a27c3f94d 100644 --- a/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt +++ b/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt @@ -16,12 +16,16 @@ package io.rsocket.kotlin.internal.handler +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import kotlinx.atomicfu.* import kotlinx.coroutines.* -internal actual abstract class ResponderFrameHandler : BaseResponderFrameHandler() { +internal actual abstract class ResponderFrameHandler actual constructor(pool: ObjectPool) : BaseResponderFrameHandler(pool) { actual override var job: Job? by atomic(null) + actual override var hasMetadata: Boolean by atomic(false) } -internal actual abstract class RequesterFrameHandler : BaseRequesterFrameHandler() { +internal actual abstract class RequesterFrameHandler actual constructor(pool: ObjectPool) : BaseRequesterFrameHandler(pool) { + actual override var hasMetadata: Boolean by atomic(false) }