From 5b90f0c165bbe848d7799f9073df3798061d9ab0 Mon Sep 17 00:00:00 2001 From: olme04 Date: Wed, 18 Aug 2021 13:00:30 +0300 Subject: [PATCH] Metadata, PayloadBuilder and CompositeMetadataBuilder implements Closeable remove Frame.release and Payload.release (use close instead) use close instead of release everywhere use sealed in composite and auth metadata --- .../benchmarks/RSocketKotlinBenchmark.kt | 8 +++--- .../src/clientMain/kotlin/ChatApi.kt | 2 +- .../kotlin/io/rsocket/kotlin/RSocket.kt | 10 +++---- .../rsocket/kotlin/core/RSocketConnector.kt | 4 +-- .../kotlin/core/RSocketConnectorBuilder.kt | 2 +- .../io/rsocket/kotlin/frame/CancelFrame.kt | 2 +- .../io/rsocket/kotlin/frame/ErrorFrame.kt | 2 +- .../io/rsocket/kotlin/frame/ExtensionFrame.kt | 4 +-- .../kotlin/io/rsocket/kotlin/frame/Frame.kt | 6 ----- .../io/rsocket/kotlin/frame/KeepAliveFrame.kt | 4 +-- .../io/rsocket/kotlin/frame/LeaseFrame.kt | 4 +-- .../rsocket/kotlin/frame/MetadataPushFrame.kt | 4 +-- .../io/rsocket/kotlin/frame/RequestFrame.kt | 4 +-- .../io/rsocket/kotlin/frame/RequestNFrame.kt | 2 +- .../io/rsocket/kotlin/frame/ResumeFrame.kt | 2 +- .../io/rsocket/kotlin/frame/ResumeOkFrame.kt | 2 +- .../io/rsocket/kotlin/frame/SetupFrame.kt | 6 ++--- .../io/rsocket/kotlin/frame/io/packet.kt | 2 +- .../io/rsocket/kotlin/internal/Connect.kt | 6 ++--- .../kotlin/internal/RSocketRequester.kt | 4 +-- .../kotlin/internal/RSocketResponder.kt | 2 +- .../rsocket/kotlin/internal/StreamsStorage.kt | 10 +++---- .../kotlin/internal/handler/FrameHandler.kt | 8 +++--- .../kotlin/metadata/CompositeMetadata.kt | 6 ++++- .../metadata/CompositeMetadataBuilder.kt | 22 ++++------------ .../metadata/CompositeMetadataExtensions.kt | 2 +- .../io/rsocket/kotlin/metadata/Metadata.kt | 2 +- ...erStreamAcceptableDataMimeTypesMetadata.kt | 2 ++ .../metadata/PerStreamDataMimeTypeMetadata.kt | 2 ++ .../io/rsocket/kotlin/metadata/RawMetadata.kt | 4 +++ .../kotlin/metadata/RoutingMetadata.kt | 2 ++ .../kotlin/metadata/ZipkinTracingMetadata.kt | 2 ++ .../kotlin/metadata/security/AuthMetadata.kt | 4 +-- .../metadata/security/BearerAuthMetadata.kt | 2 ++ .../metadata/security/RawAuthMetadata.kt | 6 ++++- .../metadata/security/SimpleAuthMetadata.kt | 3 +++ .../io/rsocket/kotlin/payload/Payload.kt | 8 ++---- .../rsocket/kotlin/payload/PayloadBuilder.kt | 20 +++++--------- .../io/rsocket/kotlin/core/RSocketTest.kt | 26 +++++++++---------- .../kotlin/internal/RSocketRequesterTest.kt | 2 +- .../kotlin/metadata/CompositeMetadataTest.kt | 2 +- .../kotlin/payload/PayloadBuilderTest.kt | 4 +-- .../kotlin/io/rsocket/kotlin/test/Packets.kt | 2 +- .../io/rsocket/kotlin/test/TestRSocket.kt | 10 +++---- .../io/rsocket/kotlin/test/TransportTest.kt | 14 +++++----- .../kotlin/transport/ktor/TcpServerTest.kt | 16 ++++++------ .../transport/ktor/WebSocketConnectionTest.kt | 4 +-- 47 files changed, 132 insertions(+), 135 deletions(-) diff --git a/benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt b/benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt index 821cada52..8557075f8 100644 --- a/benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt +++ b/benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt @@ -43,15 +43,15 @@ class RSocketKotlinBenchmark : RSocketBenchmark() { val server = RSocketServer().bindIn(CoroutineScope(benchJob + Dispatchers.Unconfined), LocalServerTransport()) { RSocketRequestHandler { requestResponse { - it.release() + it.close() payloadCopy() } requestStream { - it.release() + it.close() payloadsFlow } requestChannel { init, payloads -> - init.release() + init.close() payloads.flowOn(requestStrategy) } } @@ -74,7 +74,7 @@ class RSocketKotlinBenchmark : RSocketBenchmark() { ) override fun releasePayload(payload: Payload) { - payload.release() + payload.close() } override suspend fun doRequestResponse(): Payload = client.requestResponse(payloadCopy()) diff --git a/examples/multiplatform-chat/src/clientMain/kotlin/ChatApi.kt b/examples/multiplatform-chat/src/clientMain/kotlin/ChatApi.kt index 14a0b08b6..f2492f758 100644 --- a/examples/multiplatform-chat/src/clientMain/kotlin/ChatApi.kt +++ b/examples/multiplatform-chat/src/clientMain/kotlin/ChatApi.kt @@ -34,6 +34,6 @@ actual class ChatApi(private val rSocket: RSocket, private val proto: ProtoBuf) actual suspend fun delete(id: Int) { rSocket.requestResponse( proto.encodeToPayload(route = "chats.delete", DeleteChat(id)) - ).release() + ).close() } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocket.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocket.kt index 78873dc7e..4c6db67c9 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocket.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocket.kt @@ -24,27 +24,27 @@ import kotlinx.coroutines.flow.* public interface RSocket : CoroutineScope { public suspend fun metadataPush(metadata: ByteReadPacket) { - metadata.release() + metadata.close() notImplemented("Metadata Push") } public suspend fun fireAndForget(payload: Payload) { - payload.release() + payload.close() notImplemented("Fire and Forget") } public suspend fun requestResponse(payload: Payload): Payload { - payload.release() + payload.close() notImplemented("Request Response") } public fun requestStream(payload: Payload): Flow { - payload.release() + payload.close() notImplemented("Request Stream") } public fun requestChannel(initPayload: Payload, payloads: Flow): Flow { - initPayload.release() + initPayload.close() notImplemented("Request Channel") } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt index cdc62cf08..a003b6152 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt @@ -73,8 +73,8 @@ public class RSocketConnector internal constructor( connection.sendFrame(setupFrame) return requester } catch (cause: Throwable) { - connectionConfig.setupPayload.release() - setupFrame.release() + connectionConfig.setupPayload.close() + setupFrame.close() connection.cancel("Connection establishment failed", cause) throw cause } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt index 38a56c45e..2387c43dc 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt @@ -113,7 +113,7 @@ public class RSocketConnectorBuilder internal constructor() { private companion object { private val defaultAcceptor: ConnectionAcceptor = ConnectionAcceptor { - config.setupPayload.release() + config.setupPayload.close() EmptyRSocket() } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/CancelFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/CancelFrame.kt index 8e0b15240..e06e79215 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/CancelFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/CancelFrame.kt @@ -24,7 +24,7 @@ internal class CancelFrame( override val type: FrameType get() = FrameType.Cancel override val flags: Int get() = 0 - override fun release(): Unit = Unit + override fun close(): Unit = Unit override fun BytePacketBuilder.writeSelf(): Unit = Unit diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ErrorFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ErrorFrame.kt index cb9f80e92..1b5b1c122 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ErrorFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ErrorFrame.kt @@ -27,7 +27,7 @@ internal class ErrorFrame( override val flags: Int get() = 0 val errorCode get() = (throwable as? RSocketError)?.errorCode ?: ErrorCode.ApplicationError - override fun release(): Unit = Unit + override fun close(): Unit = Unit override fun BytePacketBuilder.writeSelf() { writeInt(errorCode) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ExtensionFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ExtensionFrame.kt index f4bc2d35f..aebc86b9b 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ExtensionFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ExtensionFrame.kt @@ -30,8 +30,8 @@ internal class ExtensionFrame( override val type: FrameType get() = FrameType.Extension override val flags: Int get() = if (payload.metadata != null) Flags.Metadata else 0 - override fun release() { - payload.release() + override fun close() { + payload.close() } override fun BytePacketBuilder.writeSelf() { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt index 1923745ea..a09abdd0d 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt @@ -29,8 +29,6 @@ public sealed class Frame : Closeable { public abstract val streamId: Int public abstract val flags: Int - internal abstract fun release() - protected abstract fun BytePacketBuilder.writeSelf() protected abstract fun StringBuilder.appendFlags() protected abstract fun StringBuilder.appendSelf() @@ -54,10 +52,6 @@ public sealed class Frame : Closeable { append(flag) if (value) append(1) else append(0) } - - override fun close() { - release() - } } internal fun ByteReadPacket.readFrame(pool: ObjectPool): Frame = use { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/KeepAliveFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/KeepAliveFrame.kt index 078c621e7..09c29778e 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/KeepAliveFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/KeepAliveFrame.kt @@ -32,8 +32,8 @@ internal class KeepAliveFrame( override val streamId: Int get() = 0 override val flags: Int get() = if (respond) RespondFlag else 0 - override fun release() { - data.release() + override fun close() { + data.close() } override fun BytePacketBuilder.writeSelf() { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/LeaseFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/LeaseFrame.kt index d3abdfb02..f7aafd98d 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/LeaseFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/LeaseFrame.kt @@ -30,8 +30,8 @@ internal class LeaseFrame( override val streamId: Int get() = 0 override val flags: Int get() = if (metadata != null) Flags.Metadata else 0 - override fun release() { - metadata?.release() + override fun close() { + metadata?.close() } override fun BytePacketBuilder.writeSelf() { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/MetadataPushFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/MetadataPushFrame.kt index 66ffd3713..56fcafd97 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/MetadataPushFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/MetadataPushFrame.kt @@ -28,8 +28,8 @@ internal class MetadataPushFrame( override val streamId: Int get() = 0 override val flags: Int get() = Flags.Metadata - override fun release() { - metadata.release() + override fun close() { + metadata.close() } override fun BytePacketBuilder.writeSelf() { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestFrame.kt index 3575a1479..b2c60706c 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestFrame.kt @@ -43,8 +43,8 @@ internal class RequestFrame( return flags } - override fun release() { - payload.release() + override fun close() { + payload.close() } override fun BytePacketBuilder.writeSelf() { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestNFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestNFrame.kt index bbf6dcc6f..fda5032bd 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestNFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestNFrame.kt @@ -25,7 +25,7 @@ internal class RequestNFrame( override val type: FrameType get() = FrameType.RequestN override val flags: Int get() = 0 - override fun release(): Unit = Unit + override fun close(): Unit = Unit override fun BytePacketBuilder.writeSelf() { writeInt(requestN) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeFrame.kt index d24aef4df..764bad1fc 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeFrame.kt @@ -31,7 +31,7 @@ internal class ResumeFrame( override val streamId: Int get() = 0 override val flags: Int get() = 0 - override fun release(): Unit = Unit + override fun close(): Unit = Unit override fun BytePacketBuilder.writeSelf() { writeVersion(version) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeOkFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeOkFrame.kt index 0aa87ba31..10aadc27b 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeOkFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeOkFrame.kt @@ -25,7 +25,7 @@ internal class ResumeOkFrame( override val streamId: Int get() = 0 override val flags: Int get() = 0 - override fun release(): Unit = Unit + override fun close(): Unit = Unit override fun BytePacketBuilder.writeSelf() { writeLong(lastReceivedClientPosition) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/SetupFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/SetupFrame.kt index d94d59d0f..8de6b0a72 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/SetupFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/SetupFrame.kt @@ -45,9 +45,9 @@ internal class SetupFrame( return flags } - override fun release() { - resumeToken?.release() - payload.release() + override fun close() { + resumeToken?.close() + payload.close() } override fun BytePacketBuilder.writeSelf() { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/packet.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/packet.kt index ac0f7e614..56b6f46f8 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/packet.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/packet.kt @@ -26,7 +26,7 @@ internal inline fun buildPacket(pool: ObjectPool, block: BytePacket block(builder) return builder.build() } catch (t: Throwable) { - builder.release() + builder.close() throw t } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt index 88bee3c4b..baf9b0a0a 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt @@ -41,7 +41,7 @@ internal suspend inline fun connect( requestJob.invokeOnCompletion { prioritizer.close(it) streamsStorage.cleanup(it) - connectionConfig.setupPayload.release() + connectionConfig.setupPayload.close() } val requester = interceptors.wrapRequester( @@ -76,8 +76,8 @@ internal suspend inline fun connect( is MetadataPushFrame -> responder.handleMetadataPush(frame.metadata) is ErrorFrame -> connection.cancel("Error frame received on 0 stream", frame.throwable) is KeepAliveFrame -> keepAliveHandler.mark(frame) - is LeaseFrame -> frame.release().also { error("lease isn't implemented") } - else -> frame.release() + is LeaseFrame -> frame.close().also { error("lease isn't implemented") } + else -> frame.close() } else -> streamsStorage.handleFrame(frame, responder) } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt index b14649664..d641422be 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt @@ -51,7 +51,7 @@ internal class RSocketRequester( try { sender.sendRequestPayload(FrameType.RequestFnF, id, payload) } catch (cause: Throwable) { - payload.release() + payload.close() if (isActive) sender.sendCancel(id) //if cancelled during fragmentation throw cause } @@ -128,7 +128,7 @@ internal class RSocketRequester( onReceiveComplete() return result } catch (cause: Throwable) { - payload.release() + payload.close() val isCancelled = onReceiveCancelled(cause) if (isActive && isCancelled) sender.sendCancel(id) throw cause diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt index b24623323..5a0c6dbef 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt @@ -78,7 +78,7 @@ internal class RSocketResponder( if (currentCoroutineContext().isActive && isFailed) sender.sendError(id, cause) throw cause } finally { - payload.release() + payload.close() } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt index 01b574791..1d422a5d0 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt @@ -32,7 +32,7 @@ internal class StreamsStorage(private val isServer: Boolean, private val pool: O } fun remove(id: Int): FrameHandler? { - return handlers.remove(id)?.also(FrameHandler::release) + return handlers.remove(id)?.also(FrameHandler::close) } fun contains(id: Int): Boolean { @@ -44,7 +44,7 @@ internal class StreamsStorage(private val isServer: Boolean, private val pool: O handlers.clear() values.forEach { it.cleanup(error) - it.release() + it.close() } } @@ -55,8 +55,8 @@ internal class StreamsStorage(private val isServer: Boolean, private val pool: O is CancelFrame -> handlers[id]?.handleCancel() is ErrorFrame -> handlers[id]?.handleError(frame.throwable) is RequestFrame -> when { - frame.type == FrameType.Payload -> handlers[id]?.handleRequest(frame) ?: frame.release() // release on unknown stream id - isServer.xor(id % 2 != 0) -> frame.release() // request frame on wrong stream id + frame.type == FrameType.Payload -> handlers[id]?.handleRequest(frame) ?: frame.close() // release on unknown stream id + isServer.xor(id % 2 != 0) -> frame.close() // request frame on wrong stream id else -> { val initialRequest = frame.initialRequest val handler = when (frame.type) { @@ -70,7 +70,7 @@ internal class StreamsStorage(private val isServer: Boolean, private val pool: O handler.handleRequest(frame) } } - else -> frame.release() + else -> frame.close() } } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt index 2191475e2..4d2fcf559 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/handler/FrameHandler.kt @@ -23,7 +23,7 @@ import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.payload.* import kotlinx.coroutines.* -internal abstract class FrameHandler(pool: ObjectPool) { +internal abstract class FrameHandler(pool: ObjectPool) : Closeable { private val data = BytePacketBuilder(0, pool) private val metadata = BytePacketBuilder(0, pool) protected abstract var hasMetadata: Boolean @@ -57,9 +57,9 @@ internal abstract class FrameHandler(pool: ObjectPool) { abstract fun cleanup(cause: Throwable?) - fun release() { - data.release() - metadata.release() + override fun close() { + data.close() + metadata.close() } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadata.kt index 24d567149..7cfd9b9b3 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadata.kt @@ -32,7 +32,7 @@ public fun CompositeMetadata(entries: List): CompositeMetadata = DefaultCompositeMetadata(entries.map(CompositeMetadata::Entry)) @ExperimentalMetadataApi -public interface CompositeMetadata : Metadata { +public sealed interface CompositeMetadata : Metadata { public val entries: List override val mimeType: MimeType get() = Reader.mimeType @@ -44,6 +44,10 @@ public interface CompositeMetadata : Metadata { } } + override fun close() { + entries.forEach { it.content.close() } + } + public class Entry(public val mimeType: MimeType, public val content: ByteReadPacket) { public constructor(metadata: Metadata) : this(metadata.mimeType, metadata.toPacket()) } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataBuilder.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataBuilder.kt index f83bc04fd..2086a18ec 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataBuilder.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataBuilder.kt @@ -22,22 +22,19 @@ import io.rsocket.kotlin.core.* import io.rsocket.kotlin.payload.* @ExperimentalMetadataApi -public interface CompositeMetadataBuilder { +public sealed interface CompositeMetadataBuilder : Closeable { public fun add(mimeType: MimeType, metadata: ByteReadPacket) public fun add(metadata: Metadata) - - public fun clean() - public fun build(): CompositeMetadata } @ExperimentalMetadataApi public inline fun buildCompositeMetadata(block: CompositeMetadataBuilder.() -> Unit): CompositeMetadata { - val builder = createCompositeMetadataBuilder() + val builder = CompositeMetadataFromBuilder() try { builder.block() - return builder.build() + return builder } catch (t: Throwable) { - builder.clean() + builder.close() throw t } } @@ -48,10 +45,7 @@ public inline fun PayloadBuilder.compositeMetadata(block: CompositeMetadataBuild @PublishedApi @ExperimentalMetadataApi -internal fun createCompositeMetadataBuilder(): CompositeMetadataBuilder = CompositeMetadataFromBuilder() - -@ExperimentalMetadataApi -private class CompositeMetadataFromBuilder : CompositeMetadataBuilder, CompositeMetadata { +internal class CompositeMetadataFromBuilder : CompositeMetadataBuilder, CompositeMetadata { private val _entries = mutableListOf() override val entries: List get() = _entries @@ -63,10 +57,4 @@ private class CompositeMetadataFromBuilder : CompositeMetadataBuilder, Composite override fun add(metadata: Metadata) { _entries += CompositeMetadata.Entry(metadata) } - - override fun clean() { - _entries.forEach { it.content.release() } - } - - override fun build(): CompositeMetadata = this } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataExtensions.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataExtensions.kt index 848c03bfa..ddb18e9c7 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataExtensions.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataExtensions.kt @@ -29,7 +29,7 @@ public fun CompositeMetadata.Entry.hasMimeTypeOf(reader: MetadataReader<*>): Boo public fun CompositeMetadata.Entry.read(reader: MetadataReader, pool: ObjectPool = ChunkBuffer.Pool): M { if (mimeType == reader.mimeType) return content.read(reader, pool) - content.release() + content.close() error("Expected mimeType '${reader.mimeType}' but was '$mimeType'") } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/Metadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/Metadata.kt index b8709705b..05d922943 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/Metadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/Metadata.kt @@ -25,7 +25,7 @@ import io.rsocket.kotlin.frame.io.* import io.rsocket.kotlin.payload.* @ExperimentalMetadataApi -public interface Metadata { +public interface Metadata : Closeable { public val mimeType: MimeType public fun BytePacketBuilder.writeSelf() } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamAcceptableDataMimeTypesMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamAcceptableDataMimeTypesMetadata.kt index 7b7eb61b2..15d04b762 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamAcceptableDataMimeTypesMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamAcceptableDataMimeTypesMetadata.kt @@ -37,6 +37,8 @@ public class PerStreamAcceptableDataMimeTypesMetadata(public val types: List { override val mimeType: MimeType get() = WellKnownMimeType.MessageRSocketAcceptMimeTypes override fun ByteReadPacket.read(pool: ObjectPool): PerStreamAcceptableDataMimeTypesMetadata { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamDataMimeTypeMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamDataMimeTypeMetadata.kt index cb98dc3ba..a3aa1a2ea 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamDataMimeTypeMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamDataMimeTypeMetadata.kt @@ -31,6 +31,8 @@ public class PerStreamDataMimeTypeMetadata(public val type: MimeType) : Metadata writeMimeType(type) } + override fun close(): Unit = Unit + public companion object Reader : MetadataReader { override val mimeType: MimeType get() = WellKnownMimeType.MessageRSocketMimeType override fun ByteReadPacket.read(pool: ObjectPool): PerStreamDataMimeTypeMetadata = diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RawMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RawMetadata.kt index 021401fb4..fec283598 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RawMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RawMetadata.kt @@ -32,6 +32,10 @@ public class RawMetadata( writePacket(content) } + override fun close() { + content.close() + } + private class Reader(override val mimeType: MimeType) : MetadataReader { override fun ByteReadPacket.read(pool: ObjectPool): RawMetadata = RawMetadata(mimeType, readPacket(pool)) } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RoutingMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RoutingMetadata.kt index 77a39cf1d..04c1125b4 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RoutingMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RoutingMetadata.kt @@ -43,6 +43,8 @@ public class RoutingMetadata(public val tags: List) : Metadata { } } + override fun close(): Unit = Unit + public companion object Reader : MetadataReader { override val mimeType: MimeType get() = WellKnownMimeType.MessageRSocketRouting override fun ByteReadPacket.read(pool: ObjectPool): RoutingMetadata { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/ZipkinTracingMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/ZipkinTracingMetadata.kt index 4824f5f99..0182d69ca 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/ZipkinTracingMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/ZipkinTracingMetadata.kt @@ -67,6 +67,8 @@ public class ZipkinTracingMetadata internal constructor( if (hasParentSpanId) writeLong(parentSpanId) } + override fun close(): Unit = Unit + public companion object Reader : MetadataReader { override val mimeType: MimeType get() = WellKnownMimeType.MessageRSocketTracingZipkin override fun ByteReadPacket.read(pool: ObjectPool): ZipkinTracingMetadata { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/AuthMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/AuthMetadata.kt index 40f5c008a..16f76f3b7 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/AuthMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/AuthMetadata.kt @@ -25,7 +25,7 @@ import io.rsocket.kotlin.frame.io.* import io.rsocket.kotlin.metadata.* @ExperimentalMetadataApi -public interface AuthMetadata : Metadata { +public sealed interface AuthMetadata : Metadata { public val type: AuthType public fun BytePacketBuilder.writeContent() @@ -38,7 +38,7 @@ public interface AuthMetadata : Metadata { } @ExperimentalMetadataApi -public interface AuthMetadataReader : MetadataReader { +public sealed interface AuthMetadataReader : MetadataReader { public fun ByteReadPacket.readContent(type: AuthType, pool: ObjectPool): AM override val mimeType: MimeType get() = WellKnownMimeType.MessageRSocketAuthentication diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/BearerAuthMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/BearerAuthMetadata.kt index d0771d2e8..f8f6b9c27 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/BearerAuthMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/BearerAuthMetadata.kt @@ -30,6 +30,8 @@ public class BearerAuthMetadata( writeText(token) } + override fun close(): Unit = Unit + public companion object Reader : AuthMetadataReader { override fun ByteReadPacket.readContent(type: AuthType, pool: ObjectPool): BearerAuthMetadata { require(type == WellKnowAuthType.Bearer) { "Metadata auth type should be 'bearer'" } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/RawAuthMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/RawAuthMetadata.kt index 7ecefc93b..1b601234e 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/RawAuthMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/RawAuthMetadata.kt @@ -32,6 +32,10 @@ public class RawAuthMetadata( writePacket(content) } + override fun close() { + content.close() + } + public companion object Reader : AuthMetadataReader { override fun ByteReadPacket.readContent(type: AuthType, pool: ObjectPool): RawAuthMetadata { val content = readPacket(pool) @@ -46,7 +50,7 @@ public fun RawAuthMetadata.hasAuthTypeOf(reader: AuthMetadataReader<*>): Boolean @ExperimentalMetadataApi public fun RawAuthMetadata.read(reader: AuthMetadataReader, pool: ObjectPool = ChunkBuffer.Pool): AM { return readOrNull(reader, pool) ?: run { - content.release() + content.close() error("Expected auth type '${reader.mimeType}' but was '$mimeType'") } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/SimpleAuthMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/SimpleAuthMetadata.kt index a44413a71..9998a2675 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/SimpleAuthMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/SimpleAuthMetadata.kt @@ -32,6 +32,7 @@ public class SimpleAuthMetadata( } override val type: AuthType get() = WellKnowAuthType.Simple + override fun BytePacketBuilder.writeContent() { val length = username.encodeToByteArray() writeShort(length.size.toShort()) @@ -39,6 +40,8 @@ public class SimpleAuthMetadata( writeText(password) } + override fun close(): Unit = Unit + public companion object Reader : AuthMetadataReader { override fun ByteReadPacket.readContent(type: AuthType, pool: ObjectPool): SimpleAuthMetadata { require(type == WellKnowAuthType.Simple) { "Metadata auth type should be 'simple'" } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/Payload.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/Payload.kt index 5eee9595f..5c1d61fa6 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/Payload.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/Payload.kt @@ -26,13 +26,9 @@ public sealed interface Payload : Closeable { public fun copy(): Payload = DefaultPayload(data.copy(), metadata?.copy()) - public fun release() { - data.release() - metadata?.release() - } - override fun close() { - release() + data.close() + metadata?.close() } public companion object { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/PayloadBuilder.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/PayloadBuilder.kt index a0f1a3f77..aa4b63a20 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/PayloadBuilder.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/PayloadBuilder.kt @@ -18,21 +18,18 @@ package io.rsocket.kotlin.payload import io.ktor.utils.io.core.* -public sealed interface PayloadBuilder { +public sealed interface PayloadBuilder : Closeable { public fun data(value: ByteReadPacket) public fun metadata(value: ByteReadPacket) - - public fun clean() - public fun build(): Payload } public inline fun buildPayload(block: PayloadBuilder.() -> Unit): Payload { - val builder = createPayloadBuilder() + val builder = PayloadFromBuilder() try { builder.block() return builder.build() } catch (t: Throwable) { - builder.clean() + builder.close() throw t } } @@ -47,9 +44,7 @@ public fun PayloadBuilder.metadata(value: ByteArray): Unit = metadata { writeFul @PublishedApi -internal fun createPayloadBuilder(): PayloadBuilder = PayloadFromBuilder() - -private class PayloadFromBuilder : PayloadBuilder, Payload { +internal class PayloadFromBuilder : PayloadBuilder, Payload { private var hasData = false private var hasMetadata = false @@ -60,7 +55,7 @@ private class PayloadFromBuilder : PayloadBuilder, Payload { override fun data(value: ByteReadPacket) { if (hasData) { - value.release() + value.close() error("Data already provided") } data = value @@ -69,15 +64,14 @@ private class PayloadFromBuilder : PayloadBuilder, Payload { override fun metadata(value: ByteReadPacket) { if (hasMetadata) { - value.release() + value.close() error("Metadata already provided") } metadata = value hasMetadata = true } - override fun clean(): Unit = release() - override fun build(): Payload { + fun build(): Payload { check(hasData) { "Data is required" } return this } diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt index 616681ce5..4b0311cb7 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt @@ -49,12 +49,12 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { handler ?: RSocketRequestHandler { requestResponse { it } requestStream { - it.release() + it.close() flow { repeat(10) { emitOrClose(payload("server got -> [$it]")) } } } requestChannel { init, payloads -> - init.release() - payloads.onEach { it.release() }.launchIn(this) + init.close() + payloads.onEach { it.close() }.launchIn(this) flow { repeat(10) { emitOrClose(payload("server got -> [$it]")) } } } } @@ -71,7 +71,7 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { @Test fun testRequestResponseNoError() = test { val requester = start() - requester.requestResponse(payload("HELLO")).release() + requester.requestResponse(payload("HELLO")).close() } @Test @@ -96,7 +96,7 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { val requester = start() requester.requestStream(payload("HELLO")).test { repeat(10) { - expectItem().release() + expectItem().close() } expectComplete() } @@ -122,7 +122,7 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { }) requester.requestStream(payload("HELLO")).flowOn(PrefetchStrategy(1, 0)).test { repeat(3) { - expectItem().release() + expectItem().close() } val error = expectError() assertTrue(error is RSocketError.ApplicationError) @@ -148,7 +148,7 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { .map { it.value } .test { repeat(23) { - expectItem().release() + expectItem().close() } val error = expectError() assertTrue(error is IllegalStateException) @@ -170,7 +170,7 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { .take(3) //canceled after 3 element .test { repeat(3) { - expectItem().release() + expectItem().close() } expectComplete() } @@ -191,7 +191,7 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { .produceIn(this) repeat(18) { - channel.receive().release() + channel.receive().close() } assertTrue(channel.receiveCatching().isClosed) } @@ -203,7 +203,7 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { val request = (1..10).asFlow().map { payload(it.toString()) }.onCompletion { awaiter.complete() } requester.requestChannel(payload(""), request).test { repeat(10) { - expectItem().release() + expectItem().close() } expectComplete() } @@ -216,7 +216,7 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { val error = CompletableDeferred() val requester = start(RSocketRequestHandler { requestChannel { init, payloads -> - init.release() + init.close() payloads.catch { error.complete(it) } } }) @@ -234,14 +234,14 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { fun testRequestPropagatesCorrectlyForRequestChannel() = test { val requester = start(RSocketRequestHandler { requestChannel { init, payloads -> - init.release() + init.close() payloads.flowOn(PrefetchStrategy(3, 0)).take(3) } }) val request = (1..3).asFlow().map { payload(it.toString()) } requester.requestChannel(payload("0"), request).flowOn(PrefetchStrategy(3, 0)).test { repeat(3) { - expectItem().release() + expectItem().close() } expectComplete() } diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt index 7a24b7b06..28dcc00b1 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt @@ -346,7 +346,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { val streamId = frame.streamId assertTrue(frame is RequestFrame) assertEquals(FrameType.RequestChannel, frame.type) - frame.release() + frame.close() connection.sendToReceiver(CancelFrame(streamId), CompletePayloadFrame(streamId)) } response.join() diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataTest.kt index b256759fe..c55ed6760 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataTest.kt @@ -89,7 +89,7 @@ class CompositeMetadataTest : TestWithLeakCheck { assertTrue(WellKnownMimeType.ApplicationAvro in decoded) assertTrue(WellKnownMimeType.MessageRSocketRouting !in decoded) - decoded.entries.forEach { it.content.release() } + decoded.entries.forEach { it.content.close() } } @Test diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/payload/PayloadBuilderTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/payload/PayloadBuilderTest.kt index ea8194a7e..9d50cc3bb 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/payload/PayloadBuilderTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/payload/PayloadBuilderTest.kt @@ -37,8 +37,8 @@ class PayloadBuilderTest : TestWithLeakCheck { } @Test - fun payloadRelease() { - Payload(packet("data"), packet("metadata")).release() + fun payloadclose() { + Payload(packet("data"), packet("metadata")).close() } @Test diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Packets.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Packets.kt index 09467f6ec..85635f4e2 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Packets.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Packets.kt @@ -43,7 +43,7 @@ private inline fun buildPacket(pool: ObjectPool, block: BytePacketB block(builder) return builder.build() } catch (t: Throwable) { - builder.release() + builder.close() throw t } } diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestRSocket.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestRSocket.kt index 1ba65f142..747f3cf63 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestRSocket.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestRSocket.kt @@ -26,24 +26,24 @@ import kotlin.coroutines.* class TestRSocket : RSocket { override val coroutineContext: CoroutineContext = Job() - override suspend fun metadataPush(metadata: ByteReadPacket): Unit = metadata.release() + override suspend fun metadataPush(metadata: ByteReadPacket): Unit = metadata.close() - override suspend fun fireAndForget(payload: Payload): Unit = payload.release() + override suspend fun fireAndForget(payload: Payload): Unit = payload.close() override suspend fun requestResponse(payload: Payload): Payload { - payload.release() + payload.close() return Payload(packet(data), packet(metadata)) } override fun requestStream(payload: Payload): Flow = flow { - payload.release() + payload.close() repeat(10000) { emitOrClose(Payload(packet(data), packet(metadata))) } } override fun requestChannel(initPayload: Payload, payloads: Flow): Flow = flow { - initPayload.release() + initPayload.close() payloads.collect { emitOrClose(it) } } diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TransportTest.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TransportTest.kt index 88f45fbf2..03c2059db 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TransportTest.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TransportTest.kt @@ -63,7 +63,7 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { @Test fun requestChannel1() = test(Duration.seconds(10)) { - val list = client.requestChannel(payload(0), flowOf(payload(0))).onEach { it.release() }.toList() + val list = client.requestChannel(payload(0), flowOf(payload(0))).onEach { it.close() }.toList() assertEquals(1, list.size) } @@ -72,7 +72,7 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { val request = flow { repeat(3) { emit(payload(it)) } } - val list = client.requestChannel(payload(0), request).flowOn(PrefetchStrategy(3, 0)).onEach { it.release() }.toList() + val list = client.requestChannel(payload(0), request).flowOn(PrefetchStrategy(3, 0)).onEach { it.close() }.toList() assertEquals(3, list.size) } @@ -84,7 +84,7 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { val list = client.requestChannel(LARGE_PAYLOAD, request) .flowOn(PrefetchStrategy(Int.MAX_VALUE, 0)) - .onEach { it.release() } + .onEach { it.close() } .toList() assertEquals(200, list.size) } @@ -106,7 +106,7 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { val request = flow { repeat(200_000) { emit(payload(it)) } } - val list = client.requestChannel(payload(0), request).flowOn(PrefetchStrategy(10000, 0)).onEach { it.release() }.toList() + val list = client.requestChannel(payload(0), request).flowOn(PrefetchStrategy(10000, 0)).onEach { it.close() }.toList() assertEquals(200_000, list.size) } @@ -119,7 +119,7 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { } (0..16).map { async(Dispatchers.Default) { - val list = client.requestChannel(payload(0), request).onEach { it.release() }.toList() + val list = client.requestChannel(payload(0), request).onEach { it.close() }.toList() assertEquals(256, list.size) } }.awaitAll() @@ -134,7 +134,7 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { } (0..256).map { async(Dispatchers.Default) { - val list = client.requestChannel(payload(0), request).onEach { it.release() }.toList() + val list = client.requestChannel(payload(0), request).onEach { it.close() }.toList() assertEquals(512, list.size) } }.awaitAll() @@ -174,7 +174,7 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { @Test fun largePayloadRequestResponse100() = test { - (1..100).map { async { client.requestResponse(LARGE_PAYLOAD) } }.awaitAll().onEach { it.release() } + (1..100).map { async { client.requestResponse(LARGE_PAYLOAD) } }.awaitAll().onEach { it.close() } } @Test diff --git a/rsocket-transport-ktor/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/TcpServerTest.kt b/rsocket-transport-ktor/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/TcpServerTest.kt index 399b152c7..3202dccfc 100644 --- a/rsocket-transport-ktor/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/TcpServerTest.kt +++ b/rsocket-transport-ktor/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/TcpServerTest.kt @@ -53,7 +53,7 @@ abstract class TcpServerTest : SuspendTest, TestWithLeakCheck { }.connect(clientTransport) val client1 = newClient("ok") - client1.requestResponse(payload("ok")).release() + client1.requestResponse(payload("ok")).close() val client2 = newClient("not ok") assertFails { @@ -62,8 +62,8 @@ abstract class TcpServerTest : SuspendTest, TestWithLeakCheck { val client3 = newClient("ok") - client3.requestResponse(payload("ok")).release() - client1.requestResponse(payload("ok")).release() + client3.requestResponse(payload("ok")).close() + client1.requestResponse(payload("ok")).close() assertTrue(client1.isActive) assertFalse(client2.isActive) @@ -90,18 +90,18 @@ abstract class TcpServerTest : SuspendTest, TestWithLeakCheck { val client1 = newClient() - client1.requestResponse(payload("1")).release() + client1.requestResponse(payload("1")).close() val client2 = newClient() - client2.requestResponse(payload("1")).release() + client2.requestResponse(payload("1")).close() handlers[1].coroutineContext.job.apply { cancel("FAILED") join() } - client1.requestResponse(payload("1")).release() + client1.requestResponse(payload("1")).close() assertFails { client2.requestResponse(payload("1")) @@ -109,9 +109,9 @@ abstract class TcpServerTest : SuspendTest, TestWithLeakCheck { val client3 = newClient() - client3.requestResponse(payload("1")).release() + client3.requestResponse(payload("1")).close() - client1.requestResponse(payload("1")).release() + client1.requestResponse(payload("1")).close() assertTrue(client1.isActive) assertFalse(client2.isActive) diff --git a/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/WebSocketConnectionTest.kt b/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/WebSocketConnectionTest.kt index 0328f9058..2ddc8f46a 100644 --- a/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/WebSocketConnectionTest.kt +++ b/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/WebSocketConnectionTest.kt @@ -59,7 +59,7 @@ class WebSocketConnectionTest : SuspendTest, TestWithLeakCheck { rSocket { RSocketRequestHandler { requestStream { - it.release() + it.close() flow { var i = 0 while (true) { @@ -90,7 +90,7 @@ class WebSocketConnectionTest : SuspendTest, TestWithLeakCheck { rSocket .requestStream(Payload.Empty) .take(2) - .onEach { delay(100); it.release() } + .onEach { delay(100); it.close() } .collect() assertTrue(requesterJob.isActive)