diff --git a/build.gradle.kts b/build.gradle.kts index c0688b38b..fb20ca1b7 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -14,9 +14,8 @@ * limitations under the License. */ -import groovy.util.Node -import groovy.util.NodeList -import org.gradle.api.publish.maven.internal.artifact.FileBasedMavenArtifact +import groovy.util.* +import org.gradle.api.publish.maven.internal.artifact.* import org.jetbrains.kotlin.gradle.dsl.* import org.jetbrains.kotlin.gradle.plugin.mpp.* import org.jetbrains.kotlin.konan.target.* @@ -162,20 +161,21 @@ subprojects { useExperimentalAnnotation("kotlinx.coroutines.InternalCoroutinesApi") useExperimentalAnnotation("kotlinx.coroutines.ObsoleteCoroutinesApi") useExperimentalAnnotation("kotlinx.coroutines.FlowPreview") + useExperimentalAnnotation("kotlinx.coroutines.DelicateCoroutinesApi") - useExperimentalAnnotation("io.ktor.util.KtorExperimentalAPI") useExperimentalAnnotation("io.ktor.util.InternalAPI") useExperimentalAnnotation("io.ktor.utils.io.core.internal.DangerousInternalIoApi") useExperimentalAnnotation("io.rsocket.kotlin.TransportApi") useExperimentalAnnotation("io.rsocket.kotlin.ExperimentalMetadataApi") useExperimentalAnnotation("io.rsocket.kotlin.ExperimentalStreamsApi") + useExperimentalAnnotation("io.rsocket.kotlin.RSocketLoggingApi") } } } if (isLibProject && !isTestProject) { - explicitApiWarning() //TODO change to strict before release + explicitApi() sourceSets["commonTest"].dependencies { implementation(project(":rsocket-test")) } @@ -204,8 +204,10 @@ subprojects { } } -fun publishPlatformArtifactsInRootModule(platformPublication:MavenPublication, - kotlinMultiplatformPublication: MavenPublication) { +fun publishPlatformArtifactsInRootModule( + platformPublication: MavenPublication, + kotlinMultiplatformPublication: MavenPublication +) { lateinit var platformXml: XmlProvider platformPublication.pom.withXml { platformXml = this } @@ -312,7 +314,7 @@ subprojects { dependsOn(tasks.withType()) } - tasks.matching { it.name == "generatePomFileForKotlinMultiplatformPublication"}.configureEach { + tasks.matching { it.name == "generatePomFileForKotlinMultiplatformPublication" }.configureEach { dependsOn(tasks["generatePomFileForJvmPublication"]) } } @@ -421,7 +423,7 @@ if (sonatypeUsername != null && sonatypePassword != null) { it ) } - else -> it.artifactId = "${project.name}-$type" + else -> it.artifactId = "${project.name}-$type" } } } diff --git a/examples/interactions/src/jvmMain/kotlin/RequestResponseErrorExample.kt b/examples/interactions/src/jvmMain/kotlin/RequestResponseErrorExample.kt index c0de5b898..187412305 100644 --- a/examples/interactions/src/jvmMain/kotlin/RequestResponseErrorExample.kt +++ b/examples/interactions/src/jvmMain/kotlin/RequestResponseErrorExample.kt @@ -16,7 +16,6 @@ import io.rsocket.kotlin.* import io.rsocket.kotlin.core.* -import io.rsocket.kotlin.payload.* import io.rsocket.kotlin.transport.local.* import kotlinx.coroutines.* diff --git a/examples/interactions/src/jvmMain/kotlin/RequestResponseExample.kt b/examples/interactions/src/jvmMain/kotlin/RequestResponseExample.kt index ff3e1555b..500d90910 100644 --- a/examples/interactions/src/jvmMain/kotlin/RequestResponseExample.kt +++ b/examples/interactions/src/jvmMain/kotlin/RequestResponseExample.kt @@ -16,7 +16,6 @@ import io.rsocket.kotlin.* import io.rsocket.kotlin.core.* -import io.rsocket.kotlin.payload.* import io.rsocket.kotlin.transport.local.* import kotlinx.coroutines.* diff --git a/examples/interactions/src/jvmMain/kotlin/ServerRequestExample.kt b/examples/interactions/src/jvmMain/kotlin/ServerRequestExample.kt index a81ff3277..374151576 100644 --- a/examples/interactions/src/jvmMain/kotlin/ServerRequestExample.kt +++ b/examples/interactions/src/jvmMain/kotlin/ServerRequestExample.kt @@ -16,7 +16,6 @@ import io.rsocket.kotlin.* import io.rsocket.kotlin.core.* -import io.rsocket.kotlin.payload.* import io.rsocket.kotlin.transport.local.* import kotlinx.coroutines.* diff --git a/examples/multiplatform-chat/src/clientMain/kotlin/Api.kt b/examples/multiplatform-chat/src/clientMain/kotlin/Api.kt index 60ad3fb96..0f4b72f9a 100644 --- a/examples/multiplatform-chat/src/clientMain/kotlin/Api.kt +++ b/examples/multiplatform-chat/src/clientMain/kotlin/Api.kt @@ -17,7 +17,6 @@ import io.ktor.client.* import io.ktor.client.features.websocket.* import io.ktor.network.selector.* -import io.ktor.network.sockets.* import io.ktor.util.* import io.rsocket.kotlin.* import io.rsocket.kotlin.core.* diff --git a/examples/multiplatform-chat/src/clientMain/kotlin/ChatApi.kt b/examples/multiplatform-chat/src/clientMain/kotlin/ChatApi.kt index 27f2644d7..14a0b08b6 100644 --- a/examples/multiplatform-chat/src/clientMain/kotlin/ChatApi.kt +++ b/examples/multiplatform-chat/src/clientMain/kotlin/ChatApi.kt @@ -16,7 +16,6 @@ import io.ktor.utils.io.core.* import io.rsocket.kotlin.* -import io.rsocket.kotlin.payload.* import kotlinx.serialization.* import kotlinx.serialization.protobuf.* diff --git a/examples/multiplatform-chat/src/serverJvmMain/kotlin/App.kt b/examples/multiplatform-chat/src/serverJvmMain/kotlin/App.kt index fe363ca5f..bbf7fc031 100644 --- a/examples/multiplatform-chat/src/serverJvmMain/kotlin/App.kt +++ b/examples/multiplatform-chat/src/serverJvmMain/kotlin/App.kt @@ -16,7 +16,6 @@ import io.ktor.application.* import io.ktor.network.selector.* -import io.ktor.network.sockets.* import io.ktor.routing.* import io.ktor.server.cio.* import io.ktor.server.engine.* diff --git a/examples/nodejs-tcp-transport/src/jsMain/kotlin/Server.kt b/examples/nodejs-tcp-transport/src/jsMain/kotlin/Server.kt index 03b47c09a..eef0aeadf 100644 --- a/examples/nodejs-tcp-transport/src/jsMain/kotlin/Server.kt +++ b/examples/nodejs-tcp-transport/src/jsMain/kotlin/Server.kt @@ -18,7 +18,6 @@ import io.ktor.utils.io.core.* import io.ktor.utils.io.js.* import io.rsocket.kotlin.* import io.rsocket.kotlin.core.* -import io.rsocket.kotlin.frame.io.* import io.rsocket.kotlin.payload.* import io.rsocket.kotlin.transport.* import kotlinx.coroutines.* @@ -145,3 +144,17 @@ class NodeJsTcpConnection(private val socket: Socket) : Connection { return receiveChannel.receive() } } + +private fun ByteReadPacket.readLength(): Int { + val b = readByte().toInt() and 0xFF shl 16 + val b1 = readByte().toInt() and 0xFF shl 8 + val b2 = readByte().toInt() and 0xFF + return b or b1 or b2 +} + +private fun BytePacketBuilder.writeLength(length: Int) { + require(length and 0xFFFFFF.inv() == 0) { "Length is larger than 24 bits" } + writeByte((length shr 16).toByte()) + writeByte((length shr 8).toByte()) + writeByte(length.toByte()) +} diff --git a/playground/src/commonMain/kotlin/TCP.kt b/playground/src/commonMain/kotlin/TCP.kt index 2e6c7c950..724266b32 100644 --- a/playground/src/commonMain/kotlin/TCP.kt +++ b/playground/src/commonMain/kotlin/TCP.kt @@ -15,7 +15,6 @@ */ import io.ktor.network.selector.* -import io.ktor.network.sockets.* import io.rsocket.kotlin.core.* import io.rsocket.kotlin.payload.* import io.rsocket.kotlin.transport.ktor.* diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Annotations.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Annotations.kt index 90060986e..37b90a366 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Annotations.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Annotations.kt @@ -37,3 +37,12 @@ public annotation class ExperimentalMetadataApi message = "This is an API to customize request strategy of streams. This API can change in future in non backwards-compatible manner." ) public annotation class ExperimentalStreamsApi + +@Retention(value = AnnotationRetention.BINARY) +@RequiresOptIn( + level = RequiresOptIn.Level.WARNING, + message = "This is mostly internal API used for logging. This API can change in future in non backwards-compatible manner." +) +public annotation class RSocketLoggingApi + + diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt index 2492e1b4d..64e92c827 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt @@ -30,18 +30,16 @@ import kotlinx.coroutines.* public interface Connection { public val job: Job - @DangerousInternalIoApi - public val pool: ObjectPool - get() = ChunkBuffer.Pool + public val pool: ObjectPool get() = ChunkBuffer.Pool public suspend fun send(packet: ByteReadPacket) public suspend fun receive(): ByteReadPacket } -@OptIn(DangerousInternalIoApi::class, TransportApi::class) +@OptIn(TransportApi::class) internal suspend fun Connection.receiveFrame(): Frame = receive().readFrame(pool) -@OptIn(DangerousInternalIoApi::class, TransportApi::class) +@OptIn(TransportApi::class) internal suspend fun Connection.sendFrame(frame: Frame) { val packet = frame.toPacket(pool) packet.closeOnError { send(packet) } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/ConnectionAcceptor.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/ConnectionAcceptor.kt index 267132996..a5bcb88b0 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/ConnectionAcceptor.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/ConnectionAcceptor.kt @@ -19,18 +19,10 @@ package io.rsocket.kotlin import io.rsocket.kotlin.keepalive.* import io.rsocket.kotlin.payload.* - -//TODO fun interfaces don't support `suspend` functions for now... (seems will work in kotlin 1.5) - -public interface ConnectionAcceptor { +public fun interface ConnectionAcceptor { public suspend fun ConnectionAcceptorContext.accept(): RSocket } -public inline fun ConnectionAcceptor(crossinline block: suspend ConnectionAcceptorContext.() -> RSocket): ConnectionAcceptor = - object : ConnectionAcceptor { - override suspend fun ConnectionAcceptorContext.accept(): RSocket = block() - } - public class ConnectionAcceptorContext internal constructor( public val config: ConnectionConfig, public val requester: RSocket, diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocketError.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocketError.kt index 4810080bd..24cbed6cf 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocketError.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/RSocketError.kt @@ -46,8 +46,8 @@ public sealed class RSocketError(public val errorCode: Int, message: String) : T public const val MinAllowedCode: Int = ErrorCode.CustomMin public const val MaxAllowedCode: Int = ErrorCode.CustomMax - public inline fun checkCodeInAllowedRange(errorCode: Int): Boolean = - MinAllowedCode <= errorCode || errorCode <= MaxAllowedCode + public fun checkCodeInAllowedRange(errorCode: Int): Boolean = + MinAllowedCode <= errorCode || errorCode <= MaxAllowedCode } } } @@ -55,7 +55,7 @@ public sealed class RSocketError(public val errorCode: Int, message: String) : T @Suppress("FunctionName") // function name intentionally starts with an uppercase letter internal fun RSocketError(streamId: Int, errorCode: Int, message: String): Throwable = when (streamId) { - 0 -> when (errorCode) { + 0 -> when (errorCode) { ErrorCode.InvalidSetup -> RSocketError.Setup.Invalid(message) ErrorCode.UnsupportedSetup -> RSocketError.Setup.Unsupported(message) ErrorCode.RejectedSetup -> RSocketError.Setup.Rejected(message) @@ -66,11 +66,11 @@ internal fun RSocketError(streamId: Int, errorCode: Int, message: String): Throw } else -> when (errorCode) { ErrorCode.ApplicationError -> RSocketError.ApplicationError(message) - ErrorCode.Rejected -> RSocketError.Rejected(message) - ErrorCode.Canceled -> RSocketError.Canceled(message) - ErrorCode.Invalid -> RSocketError.Invalid(message) - else -> when (RSocketError.Custom.checkCodeInAllowedRange(errorCode)) { - true -> RSocketError.Custom(errorCode, message) + ErrorCode.Rejected -> RSocketError.Rejected(message) + ErrorCode.Canceled -> RSocketError.Canceled(message) + ErrorCode.Invalid -> RSocketError.Invalid(message) + else -> when (RSocketError.Custom.checkCodeInAllowedRange(errorCode)) { + true -> RSocketError.Custom(errorCode, message) false -> IllegalArgumentException("Invalid Error frame in Stream ID $streamId: $errorCode '$message'") } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/MimeType.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/MimeType.kt index cf30ef1d4..7522de1c2 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/MimeType.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/MimeType.kt @@ -18,13 +18,13 @@ package io.rsocket.kotlin.core import io.rsocket.kotlin.frame.io.* -public interface MimeType +public sealed interface MimeType -public interface MimeTypeWithName : MimeType { +public sealed interface MimeTypeWithName : MimeType { public val text: String } -public interface MimeTypeWithId : MimeType { +public sealed interface MimeTypeWithId : MimeType { public val identifier: Byte } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt index 68b896f17..fbb0a3f72 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnector.kt @@ -19,11 +19,12 @@ package io.rsocket.kotlin.core import io.rsocket.kotlin.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.frame.io.* +import io.rsocket.kotlin.internal.* import io.rsocket.kotlin.logging.* import io.rsocket.kotlin.transport.* -@OptIn(TransportApi::class) -class RSocketConnector internal constructor( +@OptIn(TransportApi::class, RSocketLoggingApi::class) +public class RSocketConnector internal constructor( private val loggerFactory: LoggerFactory, private val interceptors: Interceptors, private val connectionConfigProvider: () -> ConnectionConfig, @@ -31,7 +32,7 @@ class RSocketConnector internal constructor( private val reconnectPredicate: ReconnectPredicate?, ) { - suspend fun connect(transport: ClientTransport): RSocket = when (reconnectPredicate) { + public suspend fun connect(transport: ClientTransport): RSocket = when (reconnectPredicate) { null -> connectOnce(transport) else -> ReconnectableRSocket( logger = loggerFactory.logger("io.rsocket.kotlin.connection"), diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt index 554645523..7c027b160 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketConnectorBuilder.kt @@ -17,12 +17,14 @@ package io.rsocket.kotlin.core import io.rsocket.kotlin.* +import io.rsocket.kotlin.internal.* import io.rsocket.kotlin.keepalive.* import io.rsocket.kotlin.logging.* import io.rsocket.kotlin.payload.* import kotlinx.coroutines.* public class RSocketConnectorBuilder internal constructor() { + @RSocketLoggingApi public var loggerFactory: LoggerFactory = DefaultLoggerFactory private val connectionConfig: ConnectionConfigBuilder = ConnectionConfigBuilder() @@ -42,10 +44,6 @@ public class RSocketConnectorBuilder internal constructor() { acceptor = block } - public fun acceptor(block: suspend ConnectionAcceptorContext.() -> RSocket) { - acceptor(ConnectionAcceptor(block)) - } - /** * When configured, [RSocketConnector.connect] will return custom [RSocket] implementation, * which will try to reconnect if connection lost and [retries] are not exhausted with [predicate] returning `true`. @@ -95,6 +93,7 @@ public class RSocketConnectorBuilder internal constructor() { } } + @OptIn(RSocketLoggingApi::class) internal fun build(): RSocketConnector = RSocketConnector( loggerFactory, interceptors.build(), diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt index 4b0271ccd..3c94e03a7 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServer.kt @@ -19,21 +19,17 @@ package io.rsocket.kotlin.core import io.rsocket.kotlin.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.frame.io.* +import io.rsocket.kotlin.internal.* import io.rsocket.kotlin.logging.* import io.rsocket.kotlin.transport.* import kotlinx.coroutines.* -@OptIn(TransportApi::class) +@OptIn(TransportApi::class, RSocketLoggingApi::class) public class RSocketServer internal constructor( private val loggerFactory: LoggerFactory, private val interceptors: Interceptors, ) { - public fun bind( - transport: ServerTransport, - block: suspend ConnectionAcceptorContext.() -> RSocket - ): T = bind(transport, ConnectionAcceptor(block)) - public fun bind( transport: ServerTransport, acceptor: ConnectionAcceptor, diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServerBuilder.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServerBuilder.kt index 14e1c8e74..58d169037 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServerBuilder.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/RSocketServerBuilder.kt @@ -16,9 +16,11 @@ package io.rsocket.kotlin.core +import io.rsocket.kotlin.* import io.rsocket.kotlin.logging.* public class RSocketServerBuilder internal constructor() { + @RSocketLoggingApi public var loggerFactory: LoggerFactory = DefaultLoggerFactory private val interceptors: InterceptorsBuilder = InterceptorsBuilder() @@ -27,6 +29,7 @@ public class RSocketServerBuilder internal constructor() { interceptors.configure() } + @OptIn(RSocketLoggingApi::class) internal fun build(): RSocketServer = RSocketServer(loggerFactory, interceptors.build()) } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/CancelFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/CancelFrame.kt index 55b009742..8e0b15240 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/CancelFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/CancelFrame.kt @@ -19,8 +19,9 @@ package io.rsocket.kotlin.frame import io.ktor.utils.io.core.* internal class CancelFrame( - override val streamId: Int, -) : Frame(FrameType.Cancel) { + override val streamId: Int +) : Frame() { + override val type: FrameType get() = FrameType.Cancel override val flags: Int get() = 0 override fun release(): Unit = Unit diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ErrorFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ErrorFrame.kt index 877cc90b3..cb9f80e92 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ErrorFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ErrorFrame.kt @@ -18,39 +18,31 @@ package io.rsocket.kotlin.frame import io.ktor.utils.io.core.* import io.rsocket.kotlin.* -import io.rsocket.kotlin.frame.io.* internal class ErrorFrame( override val streamId: Int, - val throwable: Throwable, - val data: ByteReadPacket? = null, -) : Frame(FrameType.Error) { + val throwable: Throwable +) : Frame() { + override val type: FrameType get() = FrameType.Error override val flags: Int get() = 0 val errorCode get() = (throwable as? RSocketError)?.errorCode ?: ErrorCode.ApplicationError - override fun release() { - data?.release() - } + override fun release(): Unit = Unit override fun BytePacketBuilder.writeSelf() { writeInt(errorCode) - when (data) { - null -> writeText(throwable.message ?: "") - else -> writePacket(data) - } + writeText(throwable.message ?: "") } override fun StringBuilder.appendFlags(): Unit = Unit override fun StringBuilder.appendSelf() { append("\nError code: ").append(errorCode).append("[").append(throwable::class.simpleName).append("]") if (throwable.message != null) append(" Message: ").append(throwable.message) - if (data != null) appendPacket("Data:", data) } } internal fun ByteReadPacket.readError(streamId: Int): ErrorFrame { val errorCode = readInt() - val data = copy() val message = readText() - return ErrorFrame(streamId, RSocketError(streamId, errorCode, message), data) + return ErrorFrame(streamId, RSocketError(streamId, errorCode, message)) } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ExtensionFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ExtensionFrame.kt index 526fb8527..f4bc2d35f 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ExtensionFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ExtensionFrame.kt @@ -17,6 +17,8 @@ package io.rsocket.kotlin.frame import io.ktor.utils.io.core.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.frame.io.* import io.rsocket.kotlin.payload.* @@ -24,7 +26,8 @@ internal class ExtensionFrame( override val streamId: Int, val extendedType: Int, val payload: Payload, -) : Frame(FrameType.Extension) { +) : Frame() { + override val type: FrameType get() = FrameType.Extension override val flags: Int get() = if (payload.metadata != null) Flags.Metadata else 0 override fun release() { @@ -46,7 +49,7 @@ internal class ExtensionFrame( } } -internal fun ByteReadPacket.readExtension(pool: BufferPool, streamId: Int, flags: Int): ExtensionFrame { +internal fun ByteReadPacket.readExtension(pool: ObjectPool, streamId: Int, flags: Int): ExtensionFrame { val extendedType = readInt() val payload = readPayload(pool, flags) return ExtensionFrame(streamId, extendedType, payload) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt index e5285ceae..1923745ea 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt @@ -24,18 +24,18 @@ import io.rsocket.kotlin.frame.io.* private const val FlagsMask: Int = 1023 private const val FrameTypeShift: Int = 10 -sealed class Frame(open val type: FrameType) : Closeable { - abstract val streamId: Int - abstract val flags: Int +public sealed class Frame : Closeable { + public abstract val type: FrameType + public abstract val streamId: Int + public abstract val flags: Int - abstract fun release() + internal abstract fun release() protected abstract fun BytePacketBuilder.writeSelf() protected abstract fun StringBuilder.appendFlags() protected abstract fun StringBuilder.appendSelf() - @DangerousInternalIoApi - fun toPacket(pool: ObjectPool): ByteReadPacket { + internal fun toPacket(pool: ObjectPool): ByteReadPacket { check(type.canHaveMetadata || !(flags check Flags.Metadata)) { "bad value for metadata flag" } return buildPacket(pool) { writeInt(streamId) @@ -60,8 +60,7 @@ sealed class Frame(open val type: FrameType) : Closeable { } } -@DangerousInternalIoApi -fun ByteReadPacket.readFrame(pool: ObjectPool): Frame = use { +internal fun ByteReadPacket.readFrame(pool: ObjectPool): Frame = use { val streamId = readInt() val typeAndFlags = readShort().toInt() and 0xFFFF val flags = typeAndFlags and FlagsMask diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/FrameType.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/FrameType.kt index 8ee6c72a9..1ad359380 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/FrameType.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/FrameType.kt @@ -18,7 +18,7 @@ package io.rsocket.kotlin.frame import io.rsocket.kotlin.frame.io.* -internal enum class FrameType(val encodedType: Int, flags: Int = Flags.Empty) { +public enum class FrameType(public val encodedType: Int, flags: Int = Flags.Empty) { Reserved(0x00), //CONNECTION @@ -49,11 +49,11 @@ internal enum class FrameType(val encodedType: Int, flags: Int = Flags.Empty) { Extension(0x3F, Flags.CanHaveData or Flags.CanHaveMetadata); - val hasInitialRequest: Boolean = flags check Flags.HasInitialRequest - val isRequestType: Boolean = flags check Flags.Request - val isFragmentable: Boolean = flags check Flags.Fragmentable - val canHaveMetadata: Boolean = flags check Flags.CanHaveMetadata - val canHaveData: Boolean = flags check Flags.CanHaveData + public val hasInitialRequest: Boolean = flags check Flags.HasInitialRequest + public val isRequestType: Boolean = flags check Flags.Request + public val isFragmentable: Boolean = flags check Flags.Fragmentable + public val canHaveMetadata: Boolean = flags check Flags.CanHaveMetadata + public val canHaveData: Boolean = flags check Flags.CanHaveData private object Flags { const val Empty = 0 @@ -64,7 +64,7 @@ internal enum class FrameType(val encodedType: Int, flags: Int = Flags.Empty) { const val CanHaveData = 16 } - companion object { + internal companion object { private val encodedTypes: Array init { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/KeepAliveFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/KeepAliveFrame.kt index dbc9c0e7e..078c621e7 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/KeepAliveFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/KeepAliveFrame.kt @@ -17,6 +17,8 @@ package io.rsocket.kotlin.frame import io.ktor.utils.io.core.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.frame.io.* private const val RespondFlag = 128 @@ -25,7 +27,8 @@ internal class KeepAliveFrame( val respond: Boolean, val lastPosition: Long, val data: ByteReadPacket, -) : Frame(FrameType.KeepAlive) { +) : Frame() { + override val type: FrameType get() = FrameType.KeepAlive override val streamId: Int get() = 0 override val flags: Int get() = if (respond) RespondFlag else 0 @@ -48,7 +51,7 @@ internal class KeepAliveFrame( } } -internal fun ByteReadPacket.readKeepAlive(pool: BufferPool, flags: Int): KeepAliveFrame { +internal fun ByteReadPacket.readKeepAlive(pool: ObjectPool, flags: Int): KeepAliveFrame { val respond = flags check RespondFlag val lastPosition = readLong() val data = readPacket(pool) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/LeaseFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/LeaseFrame.kt index 9f991dbaf..d3abdfb02 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/LeaseFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/LeaseFrame.kt @@ -17,13 +17,16 @@ package io.rsocket.kotlin.frame import io.ktor.utils.io.core.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.frame.io.* internal class LeaseFrame( val ttl: Int, val numberOfRequests: Int, val metadata: ByteReadPacket?, -) : Frame(FrameType.Lease) { +) : Frame() { + override val type: FrameType get() = FrameType.Lease override val streamId: Int get() = 0 override val flags: Int get() = if (metadata != null) Flags.Metadata else 0 @@ -47,7 +50,7 @@ internal class LeaseFrame( } } -internal fun ByteReadPacket.readLease(pool: BufferPool, flags: Int): LeaseFrame { +internal fun ByteReadPacket.readLease(pool: ObjectPool, flags: Int): LeaseFrame { val ttl = readInt() val numberOfRequests = readInt() val metadata = if (flags check Flags.Metadata) readMetadata(pool) else null diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/MetadataPushFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/MetadataPushFrame.kt index 43bb6c5d0..66ffd3713 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/MetadataPushFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/MetadataPushFrame.kt @@ -17,11 +17,14 @@ package io.rsocket.kotlin.frame import io.ktor.utils.io.core.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.frame.io.* internal class MetadataPushFrame( val metadata: ByteReadPacket, -) : Frame(FrameType.MetadataPush) { +) : Frame() { + override val type: FrameType get() = FrameType.MetadataPush override val streamId: Int get() = 0 override val flags: Int get() = Flags.Metadata @@ -42,4 +45,4 @@ internal class MetadataPushFrame( } } -internal fun ByteReadPacket.readMetadataPush(pool: BufferPool): MetadataPushFrame = MetadataPushFrame(readPacket(pool)) +internal fun ByteReadPacket.readMetadataPush(pool: ObjectPool): MetadataPushFrame = MetadataPushFrame(readPacket(pool)) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestFrame.kt index 53f7c9ad7..3575a1479 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestFrame.kt @@ -19,6 +19,8 @@ package io.rsocket.kotlin.frame import io.ktor.utils.io.core.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.frame.io.* import io.rsocket.kotlin.payload.* @@ -30,7 +32,7 @@ internal class RequestFrame( val next: Boolean, val initialRequest: Int, val payload: Payload, -) : Frame(type) { +) : Frame() { override val flags: Int get() { var flags = 0 @@ -63,7 +65,13 @@ internal class RequestFrame( } } -internal fun ByteReadPacket.readRequest(pool: BufferPool, type: FrameType, streamId: Int, flags: Int, withInitial: Boolean): RequestFrame { +internal fun ByteReadPacket.readRequest( + pool: ObjectPool, + type: FrameType, + streamId: Int, + flags: Int, + withInitial: Boolean +): RequestFrame { val fragmentFollows = flags check Flags.Follows val complete = flags check Flags.Complete val next = flags check Flags.Next diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestNFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestNFrame.kt index a3405b691..bbf6dcc6f 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestNFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/RequestNFrame.kt @@ -21,7 +21,8 @@ import io.ktor.utils.io.core.* internal class RequestNFrame( override val streamId: Int, val requestN: Int, -) : Frame(FrameType.RequestN) { +) : Frame() { + override val type: FrameType get() = FrameType.RequestN override val flags: Int get() = 0 override fun release(): Unit = Unit diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeFrame.kt index e9b0b21f6..d24aef4df 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeFrame.kt @@ -17,6 +17,8 @@ package io.rsocket.kotlin.frame import io.ktor.utils.io.core.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.frame.io.* internal class ResumeFrame( @@ -24,7 +26,8 @@ internal class ResumeFrame( val resumeToken: ByteReadPacket, val lastReceivedServerPosition: Long, val firstAvailableClientPosition: Long, -) : Frame(FrameType.Resume) { +) : Frame() { + override val type: FrameType get() = FrameType.Resume override val streamId: Int get() = 0 override val flags: Int get() = 0 @@ -47,7 +50,7 @@ internal class ResumeFrame( } } -internal fun ByteReadPacket.readResume(pool: BufferPool): ResumeFrame { +internal fun ByteReadPacket.readResume(pool: ObjectPool): ResumeFrame { val version = readVersion() val resumeToken = readResumeToken(pool) val lastReceivedServerPosition = readLong() diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeOkFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeOkFrame.kt index 27a63f520..0aa87ba31 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeOkFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/ResumeOkFrame.kt @@ -20,7 +20,8 @@ import io.ktor.utils.io.core.* internal class ResumeOkFrame( val lastReceivedClientPosition: Long, -) : Frame(FrameType.ResumeOk) { +) : Frame() { + override val type: FrameType get() = FrameType.ResumeOk override val streamId: Int get() = 0 override val flags: Int get() = 0 diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/SetupFrame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/SetupFrame.kt index 6b98c1906..d94d59d0f 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/SetupFrame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/SetupFrame.kt @@ -17,6 +17,8 @@ package io.rsocket.kotlin.frame import io.ktor.utils.io.core.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.frame.io.* import io.rsocket.kotlin.keepalive.* import io.rsocket.kotlin.payload.* @@ -31,7 +33,8 @@ internal class SetupFrame( val resumeToken: ByteReadPacket?, val payloadMimeType: PayloadMimeType, val payload: Payload, -) : Frame(FrameType.Setup) { +) : Frame() { + override val type: FrameType get() = FrameType.Setup override val streamId: Int get() = 0 override val flags: Int get() { @@ -73,7 +76,7 @@ internal class SetupFrame( } } -internal fun ByteReadPacket.readSetup(pool: BufferPool, flags: Int): SetupFrame { +internal fun ByteReadPacket.readSetup(pool: ObjectPool, flags: Int): SetupFrame { val version = readVersion() val keepAlive = run { val interval = readInt() diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/length.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/length.kt index 3c2155a88..d6af496f4 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/length.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/length.kt @@ -20,14 +20,14 @@ import io.ktor.utils.io.core.* private const val lengthMask: Int = 0xFFFFFF.inv() -fun ByteReadPacket.readLength(): Int { +internal fun ByteReadPacket.readLength(): Int { val b = readByte().toInt() and 0xFF shl 16 val b1 = readByte().toInt() and 0xFF shl 8 val b2 = readByte().toInt() and 0xFF return b or b1 or b2 } -fun BytePacketBuilder.writeLength(length: Int) { +internal fun BytePacketBuilder.writeLength(length: Int) { require(length and lengthMask == 0) { "Length is larger than 24 bits" } writeByte((length shr 16).toByte()) writeByte((length shr 8).toByte()) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/mimeType.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/mimeType.kt index cc13f2d5e..416fa7231 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/mimeType.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/mimeType.kt @@ -25,7 +25,6 @@ internal fun BytePacketBuilder.writeMimeType(type: MimeType) { when (type) { is MimeTypeWithId -> writeIdentifier(type.identifier) is MimeTypeWithName -> writeTextWithLength(type.text) - else -> error("Unknown mime type") } } @@ -38,7 +37,6 @@ internal fun BytePacketBuilder.writeAuthType(type: AuthType) { when (type) { is AuthTypeWithId -> writeIdentifier(type.identifier) is AuthTypeWithName -> writeTextWithLength(type.text) - else -> error("Unknown mime type") } } @@ -74,5 +72,5 @@ private inline fun ByteReadPacket.readType( } internal fun String.requireAscii() { - require(all { it.toInt() <= 0x7f }) { "String should be an ASCII encodded string" } + require(all { it.code <= 0x7f }) { "String should be an ASCII encodded string" } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/packet.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/packet.kt index 5d9bb0c57..ac0f7e614 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/packet.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/packet.kt @@ -20,10 +20,7 @@ import io.ktor.utils.io.core.* import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.pool.* -@OptIn(DangerousInternalIoApi::class) -internal typealias BufferPool = ObjectPool - -internal inline fun buildPacket(pool: BufferPool, block: BytePacketBuilder.() -> Unit): ByteReadPacket { +internal inline fun buildPacket(pool: ObjectPool, block: BytePacketBuilder.() -> Unit): ByteReadPacket { val builder = BytePacketBuilder(0, pool) try { block(builder) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/payload.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/payload.kt index 85d2fc6d4..228482bae 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/payload.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/payload.kt @@ -17,9 +17,11 @@ package io.rsocket.kotlin.frame.io import io.ktor.utils.io.core.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* import io.rsocket.kotlin.payload.* -internal fun ByteReadPacket.readMetadata(pool: BufferPool): ByteReadPacket { +internal fun ByteReadPacket.readMetadata(pool: ObjectPool): ByteReadPacket { val length = readLength() return readPacket(pool, length) } @@ -31,7 +33,7 @@ internal fun BytePacketBuilder.writeMetadata(metadata: ByteReadPacket?) { } } -internal fun ByteReadPacket.readPayload(pool: BufferPool, flags: Int): Payload { +internal fun ByteReadPacket.readPayload(pool: ObjectPool, flags: Int): Payload { val metadata = if (flags check Flags.Metadata) readMetadata(pool) else null val data = readPacket(pool) return Payload(data = data, metadata = metadata) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/util.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/util.kt index 24f98ee9a..a181d59f9 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/util.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/io/util.kt @@ -17,11 +17,10 @@ package io.rsocket.kotlin.frame.io import io.ktor.utils.io.core.* -import io.rsocket.kotlin.keepalive.* -import io.rsocket.kotlin.payload.* -import kotlin.time.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* -internal fun ByteReadPacket.readResumeToken(pool: BufferPool): ByteReadPacket { +internal fun ByteReadPacket.readResumeToken(pool: ObjectPool): ByteReadPacket { val length = readShort().toInt() and 0xFFFF return readPacket(pool, length) } @@ -34,14 +33,14 @@ internal fun BytePacketBuilder.writeResumeToken(resumeToken: ByteReadPacket?) { } } -internal fun ByteReadPacket.readPacket(pool: BufferPool): ByteReadPacket { +internal fun ByteReadPacket.readPacket(pool: ObjectPool): ByteReadPacket { if (isEmpty) return ByteReadPacket.Empty return buildPacket(pool) { writePacket(this@readPacket) } } -internal fun ByteReadPacket.readPacket(pool: BufferPool, length: Int): ByteReadPacket { +internal fun ByteReadPacket.readPacket(pool: ObjectPool, length: Int): ByteReadPacket { if (length == 0) return ByteReadPacket.Empty return buildPacket(pool) { writePacket(this@readPacket, length) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/Connect.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt similarity index 95% rename from rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/Connect.kt rename to rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt index 23d98681c..a7fa760eb 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/Connect.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt @@ -14,10 +14,10 @@ * limitations under the License. */ -package io.rsocket.kotlin.core +package io.rsocket.kotlin.internal import io.rsocket.kotlin.* -import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.core.* @OptIn(TransportApi::class) internal suspend inline fun Connection.connect( diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/keepalive/KeepAliveHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/KeepAliveHandler.kt similarity index 96% rename from rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/keepalive/KeepAliveHandler.kt rename to rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/KeepAliveHandler.kt index 41ab9d275..072e3b235 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/keepalive/KeepAliveHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/KeepAliveHandler.kt @@ -14,11 +14,12 @@ * limitations under the License. */ -package io.rsocket.kotlin.keepalive +package io.rsocket.kotlin.internal import io.ktor.utils.io.core.* import io.rsocket.kotlin.* import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.keepalive.* import kotlinx.atomicfu.* import kotlinx.coroutines.* diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/LoggingConnection.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/LoggingConnection.kt similarity index 91% rename from rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/LoggingConnection.kt rename to rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/LoggingConnection.kt index 65eff5613..e283e7ce2 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/LoggingConnection.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/LoggingConnection.kt @@ -14,12 +14,11 @@ * limitations under the License. */ -@file:OptIn(TransportApi::class) +@file:OptIn(TransportApi::class, RSocketLoggingApi::class) -package io.rsocket.kotlin.core +package io.rsocket.kotlin.internal import io.ktor.utils.io.core.* -import io.ktor.utils.io.core.internal.* import io.rsocket.kotlin.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.logging.* @@ -27,7 +26,6 @@ import io.rsocket.kotlin.logging.* internal fun Connection.logging(logger: Logger): Connection = if (logger.isLoggable(LoggingLevel.DEBUG)) LoggingConnection(this, logger) else this -@OptIn(DangerousInternalIoApi::class) private class LoggingConnection( private val delegate: Connection, private val logger: Logger, diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketState.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketState.kt index 93a47aaf3..bf10a243d 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketState.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/RSocketState.kt @@ -120,10 +120,7 @@ internal class RSocketState( private fun handleFrame(responder: RSocketResponder, frame: Frame) { when (val streamId = frame.streamId) { 0 -> when (frame) { - is ErrorFrame -> { - job.cancel("Error frame received on 0 stream", frame.throwable) - frame.release() //TODO - } + is ErrorFrame -> job.cancel("Error frame received on 0 stream", frame.throwable) is KeepAliveFrame -> keepAliveHandler.receive(frame) is LeaseFrame -> { frame.release() @@ -139,10 +136,7 @@ internal class RSocketState( else -> when (frame) { is RequestNFrame -> limits[streamId]?.updateRequests(frame.requestN) is CancelFrame -> senders.remove(streamId)?.cancel() - is ErrorFrame -> { - receivers.remove(streamId)?.close(frame.throwable) - frame.release() - } + is ErrorFrame -> receivers.remove(streamId)?.close(frame.throwable) is RequestFrame -> when (frame.type) { FrameType.Payload -> receivers[streamId]?.safeOffer(frame) ?: frame.release() FrameType.RequestFnF -> responder.handleFireAndForget(frame) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/ReconnectableRSocket.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/ReconnectableRSocket.kt similarity index 98% rename from rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/ReconnectableRSocket.kt rename to rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/ReconnectableRSocket.kt index 7b8f938f8..7ae189a0f 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/core/ReconnectableRSocket.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/ReconnectableRSocket.kt @@ -14,11 +14,10 @@ * limitations under the License. */ -package io.rsocket.kotlin.core +package io.rsocket.kotlin.internal import io.ktor.utils.io.core.* import io.rsocket.kotlin.* -import io.rsocket.kotlin.internal.* import io.rsocket.kotlin.logging.* import io.rsocket.kotlin.payload.* import kotlinx.coroutines.* @@ -26,6 +25,7 @@ import kotlinx.coroutines.flow.* internal typealias ReconnectPredicate = suspend (cause: Throwable, attempt: Long) -> Boolean +@OptIn(RSocketLoggingApi::class) @Suppress("FunctionName") internal suspend fun ReconnectableRSocket( logger: Logger, diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/flow/LimitingFlowCollector.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/flow/LimitingFlowCollector.kt index 1e63b6bf8..60ee453b4 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/flow/LimitingFlowCollector.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/flow/LimitingFlowCollector.kt @@ -39,12 +39,7 @@ internal class LimitingFlowCollector( } override suspend fun emit(value: Payload): Unit = value.closeOnError { - try { - useRequest() - } catch (t: Throwable) { - value.release() - throw t - } + useRequest() state.send(NextPayloadFrame(streamId, value)) } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/Logging.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/Logging.kt index d786a6cb8..e7577bb4c 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/Logging.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/Logging.kt @@ -16,21 +16,28 @@ package io.rsocket.kotlin.logging -enum class LoggingLevel { TRACE, DEBUG, INFO, WARN, ERROR } +import io.rsocket.kotlin.* -fun interface LoggerFactory { - fun logger(tag: String): Logger +@RSocketLoggingApi +public enum class LoggingLevel { TRACE, DEBUG, INFO, WARN, ERROR } + +@RSocketLoggingApi +public fun interface LoggerFactory { + public fun logger(tag: String): Logger } -expect val DefaultLoggerFactory: LoggerFactory +@RSocketLoggingApi +internal expect val DefaultLoggerFactory: LoggerFactory -interface Logger { - val tag: String - fun isLoggable(level: LoggingLevel): Boolean - fun rawLog(level: LoggingLevel, throwable: Throwable?, message: Any?) +@RSocketLoggingApi +public interface Logger { + public val tag: String + public fun isLoggable(level: LoggingLevel): Boolean + public fun rawLog(level: LoggingLevel, throwable: Throwable?, message: Any?) } -inline fun Logger.log(level: LoggingLevel, throwable: Throwable? = null, message: () -> Any?) { +@RSocketLoggingApi +public inline fun Logger.log(level: LoggingLevel, throwable: Throwable? = null, message: () -> Any?) { if (!isLoggable(level)) return val msg = try { @@ -41,22 +48,27 @@ inline fun Logger.log(level: LoggingLevel, throwable: Throwable? = null, message rawLog(level, throwable, msg) } -inline fun Logger.trace(throwable: Throwable? = null, message: () -> Any?) { +@RSocketLoggingApi +public inline fun Logger.trace(throwable: Throwable? = null, message: () -> Any?) { log(LoggingLevel.TRACE, throwable, message) } -inline fun Logger.debug(throwable: Throwable? = null, message: () -> Any?) { +@RSocketLoggingApi +public inline fun Logger.debug(throwable: Throwable? = null, message: () -> Any?) { log(LoggingLevel.DEBUG, throwable, message) } -inline fun Logger.info(throwable: Throwable? = null, message: () -> Any?) { +@RSocketLoggingApi +public inline fun Logger.info(throwable: Throwable? = null, message: () -> Any?) { log(LoggingLevel.INFO, throwable, message) } -inline fun Logger.warn(throwable: Throwable? = null, message: () -> Any?) { +@RSocketLoggingApi +public inline fun Logger.warn(throwable: Throwable? = null, message: () -> Any?) { log(LoggingLevel.WARN, throwable, message) } -inline fun Logger.error(throwable: Throwable? = null, message: () -> Any?) { +@RSocketLoggingApi +public inline fun Logger.error(throwable: Throwable? = null, message: () -> Any?) { log(LoggingLevel.ERROR, throwable, message) } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/NoopLogger.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/NoopLogger.kt index eb43ac305..5ba77476f 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/NoopLogger.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/NoopLogger.kt @@ -16,10 +16,13 @@ package io.rsocket.kotlin.logging +import io.rsocket.kotlin.* + /** * Logger implementation, that never print */ -object NoopLogger : Logger, LoggerFactory { +@RSocketLoggingApi +public object NoopLogger : Logger, LoggerFactory { override val tag: String get() = "noop" override fun isLoggable(level: LoggingLevel): Boolean = false override fun rawLog(level: LoggingLevel, throwable: Throwable?, message: Any?): Unit = Unit diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/PrintLogger.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/PrintLogger.kt index 2f80e94fd..a3aff9b4a 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/PrintLogger.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/logging/PrintLogger.kt @@ -16,7 +16,10 @@ package io.rsocket.kotlin.logging -class PrintLogger( +import io.rsocket.kotlin.* + +@RSocketLoggingApi +public class PrintLogger( override val tag: String, private val minLevel: LoggingLevel = LoggingLevel.INFO, ) : Logger { @@ -26,9 +29,9 @@ class PrintLogger( println("[$level] ($tag) $message $error") } - companion object : LoggerFactory { + public companion object : LoggerFactory { override fun logger(tag: String): Logger = PrintLogger(tag) - fun withLevel(minLevel: LoggingLevel): LoggerFactory = LoggerFactory { PrintLogger(it, minLevel) } + public fun withLevel(minLevel: LoggingLevel): LoggerFactory = LoggerFactory { PrintLogger(it, minLevel) } } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadata.kt index d88f3339e..24d567149 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadata.kt @@ -50,8 +50,6 @@ public interface CompositeMetadata : Metadata { public companion object Reader : MetadataReader { override val mimeType: MimeType get() = WellKnownMimeType.MessageRSocketCompositeMetadata - - @DangerousInternalIoApi override fun ByteReadPacket.read(pool: ObjectPool): CompositeMetadata { val list = mutableListOf() while (isNotEmpty) { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataExtensions.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataExtensions.kt index 034de0f5c..848c03bfa 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataExtensions.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataExtensions.kt @@ -26,26 +26,19 @@ import io.rsocket.kotlin.core.* public fun CompositeMetadata.Entry.hasMimeTypeOf(reader: MetadataReader<*>): Boolean = mimeType == reader.mimeType @ExperimentalMetadataApi -@OptIn(DangerousInternalIoApi::class) -public fun CompositeMetadata.Entry.read(reader: MetadataReader): M = read(ChunkBuffer.Pool, reader) - -@ExperimentalMetadataApi -@OptIn(DangerousInternalIoApi::class) -public fun CompositeMetadata.Entry.readOrNull(reader: MetadataReader): M? = readOrNull(ChunkBuffer.Pool, reader) - -@ExperimentalMetadataApi -@DangerousInternalIoApi -public fun CompositeMetadata.Entry.read(pool: ObjectPool, reader: MetadataReader): M { - if (mimeType == reader.mimeType) return content.read(pool, reader) +public fun CompositeMetadata.Entry.read(reader: MetadataReader, pool: ObjectPool = ChunkBuffer.Pool): M { + if (mimeType == reader.mimeType) return content.read(reader, pool) content.release() error("Expected mimeType '${reader.mimeType}' but was '$mimeType'") } @ExperimentalMetadataApi -@DangerousInternalIoApi -public fun CompositeMetadata.Entry.readOrNull(pool: ObjectPool, reader: MetadataReader): M? { - return if (mimeType == reader.mimeType) content.read(pool, reader) else null +public fun CompositeMetadata.Entry.readOrNull( + reader: MetadataReader, + pool: ObjectPool = ChunkBuffer.Pool +): M? { + return if (mimeType == reader.mimeType) content.read(reader, pool) else null } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/Metadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/Metadata.kt index a94d1ad70..b8709705b 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/Metadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/Metadata.kt @@ -33,8 +33,6 @@ public interface Metadata { @ExperimentalMetadataApi public interface MetadataReader { public val mimeType: MimeType - - @DangerousInternalIoApi public fun ByteReadPacket.read(pool: ObjectPool): M } @@ -43,19 +41,9 @@ public interface MetadataReader { public fun PayloadBuilder.metadata(metadata: Metadata): Unit = metadata(metadata.toPacket()) @ExperimentalMetadataApi -@OptIn(DangerousInternalIoApi::class) -public fun ByteReadPacket.read(reader: MetadataReader): M = read(ChunkBuffer.Pool, reader) - -@ExperimentalMetadataApi -public fun Metadata.toPacket(): ByteReadPacket = buildPacket { writeSelf() } - - -@ExperimentalMetadataApi -@DangerousInternalIoApi -public fun ByteReadPacket.read(pool: ObjectPool, reader: MetadataReader): M = use { +public fun ByteReadPacket.read(reader: MetadataReader, pool: ObjectPool = ChunkBuffer.Pool): M = use { with(reader) { read(pool) } } @ExperimentalMetadataApi -@DangerousInternalIoApi -public fun Metadata.toPacket(pool: ObjectPool): ByteReadPacket = buildPacket(pool) { writeSelf() } +public fun Metadata.toPacket(pool: ObjectPool = ChunkBuffer.Pool): ByteReadPacket = buildPacket(pool) { writeSelf() } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamAcceptableDataMimeTypesMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamAcceptableDataMimeTypesMetadata.kt index 1e8e6a8ac..7b7eb61b2 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamAcceptableDataMimeTypesMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamAcceptableDataMimeTypesMetadata.kt @@ -39,8 +39,6 @@ public class PerStreamAcceptableDataMimeTypesMetadata(public val types: List { override val mimeType: MimeType get() = WellKnownMimeType.MessageRSocketAcceptMimeTypes - - @DangerousInternalIoApi override fun ByteReadPacket.read(pool: ObjectPool): PerStreamAcceptableDataMimeTypesMetadata { val list = mutableListOf() while (isNotEmpty) list.add(readMimeType()) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamDataMimeTypeMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamDataMimeTypeMetadata.kt index 77a72b951..cb98dc3ba 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamDataMimeTypeMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/PerStreamDataMimeTypeMetadata.kt @@ -33,8 +33,6 @@ public class PerStreamDataMimeTypeMetadata(public val type: MimeType) : Metadata public companion object Reader : MetadataReader { override val mimeType: MimeType get() = WellKnownMimeType.MessageRSocketMimeType - - @DangerousInternalIoApi override fun ByteReadPacket.read(pool: ObjectPool): PerStreamDataMimeTypeMetadata = PerStreamDataMimeTypeMetadata(readMimeType()) } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RawMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RawMetadata.kt index f18d3abb5..021401fb4 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RawMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RawMetadata.kt @@ -33,7 +33,6 @@ public class RawMetadata( } private class Reader(override val mimeType: MimeType) : MetadataReader { - @DangerousInternalIoApi override fun ByteReadPacket.read(pool: ObjectPool): RawMetadata = RawMetadata(mimeType, readPacket(pool)) } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RoutingMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RoutingMetadata.kt index 94d545b97..77a39cf1d 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RoutingMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/RoutingMetadata.kt @@ -45,8 +45,6 @@ public class RoutingMetadata(public val tags: List) : Metadata { public companion object Reader : MetadataReader { override val mimeType: MimeType get() = WellKnownMimeType.MessageRSocketRouting - - @DangerousInternalIoApi override fun ByteReadPacket.read(pool: ObjectPool): RoutingMetadata { val list = mutableListOf() while (isNotEmpty) { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/ZipkinTracingMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/ZipkinTracingMetadata.kt index 5f50c8e24..4824f5f99 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/ZipkinTracingMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/ZipkinTracingMetadata.kt @@ -69,8 +69,6 @@ public class ZipkinTracingMetadata internal constructor( public companion object Reader : MetadataReader { override val mimeType: MimeType get() = WellKnownMimeType.MessageRSocketTracingZipkin - - @DangerousInternalIoApi override fun ByteReadPacket.read(pool: ObjectPool): ZipkinTracingMetadata { val flags = readByte() diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/AuthMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/AuthMetadata.kt index bd1e584bc..40f5c008a 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/AuthMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/AuthMetadata.kt @@ -39,12 +39,9 @@ public interface AuthMetadata : Metadata { @ExperimentalMetadataApi public interface AuthMetadataReader : MetadataReader { - @DangerousInternalIoApi public fun ByteReadPacket.readContent(type: AuthType, pool: ObjectPool): AM override val mimeType: MimeType get() = WellKnownMimeType.MessageRSocketAuthentication - - @DangerousInternalIoApi override fun ByteReadPacket.read(pool: ObjectPool): AM { val type = readAuthType() return readContent(type, pool) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/AuthType.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/AuthType.kt index d6d2bc101..17f5e8e57 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/AuthType.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/AuthType.kt @@ -18,13 +18,13 @@ package io.rsocket.kotlin.metadata.security import io.rsocket.kotlin.frame.io.* -public interface AuthType +public sealed interface AuthType -public interface AuthTypeWithName : AuthType { +public sealed interface AuthTypeWithName : AuthType { public val text: String } -public interface AuthTypeWithId : AuthType { +public sealed interface AuthTypeWithId : AuthType { public val identifier: Byte } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/BearerAuthMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/BearerAuthMetadata.kt index a22f69472..d0771d2e8 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/BearerAuthMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/BearerAuthMetadata.kt @@ -31,7 +31,6 @@ public class BearerAuthMetadata( } public companion object Reader : AuthMetadataReader { - @DangerousInternalIoApi override fun ByteReadPacket.readContent(type: AuthType, pool: ObjectPool): BearerAuthMetadata { require(type == WellKnowAuthType.Bearer) { "Metadata auth type should be 'bearer'" } val token = readText() diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/RawAuthMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/RawAuthMetadata.kt index fba48b397..7ecefc93b 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/RawAuthMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/RawAuthMetadata.kt @@ -33,7 +33,6 @@ public class RawAuthMetadata( } public companion object Reader : AuthMetadataReader { - @DangerousInternalIoApi override fun ByteReadPacket.readContent(type: AuthType, pool: ObjectPool): RawAuthMetadata { val content = readPacket(pool) return RawAuthMetadata(type, content) @@ -45,26 +44,19 @@ public class RawAuthMetadata( public fun RawAuthMetadata.hasAuthTypeOf(reader: AuthMetadataReader<*>): Boolean = type == reader.mimeType @ExperimentalMetadataApi -@OptIn(DangerousInternalIoApi::class) -public fun RawAuthMetadata.read(reader: AuthMetadataReader): AM = read(ChunkBuffer.Pool, reader) - -@ExperimentalMetadataApi -@OptIn(DangerousInternalIoApi::class) -public fun RawAuthMetadata.readOrNull(reader: AuthMetadataReader): AM? = readOrNull(ChunkBuffer.Pool, reader) - - -@ExperimentalMetadataApi -@DangerousInternalIoApi -public fun RawAuthMetadata.read(pool: ObjectPool, reader: AuthMetadataReader): AM { - return readOrNull(pool, reader) ?: run { +public fun RawAuthMetadata.read(reader: AuthMetadataReader, pool: ObjectPool = ChunkBuffer.Pool): AM { + return readOrNull(reader, pool) ?: run { content.release() error("Expected auth type '${reader.mimeType}' but was '$mimeType'") } } @ExperimentalMetadataApi -@DangerousInternalIoApi -public fun RawAuthMetadata.readOrNull(pool: ObjectPool, reader: AuthMetadataReader): AM? { + +public fun RawAuthMetadata.readOrNull( + reader: AuthMetadataReader, + pool: ObjectPool = ChunkBuffer.Pool +): AM? { if (type != reader.mimeType) return null with(reader) { diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/SimpleAuthMetadata.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/SimpleAuthMetadata.kt index 089eca748..a44413a71 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/SimpleAuthMetadata.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/metadata/security/SimpleAuthMetadata.kt @@ -40,7 +40,6 @@ public class SimpleAuthMetadata( } public companion object Reader : AuthMetadataReader { - @DangerousInternalIoApi override fun ByteReadPacket.readContent(type: AuthType, pool: ObjectPool): SimpleAuthMetadata { require(type == WellKnowAuthType.Simple) { "Metadata auth type should be 'simple'" } val length = readShort().toInt() diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/Payload.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/Payload.kt index 91afce1f3..5eee9595f 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/Payload.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/Payload.kt @@ -20,7 +20,7 @@ import io.ktor.utils.io.core.* public fun Payload(data: ByteReadPacket, metadata: ByteReadPacket? = null): Payload = DefaultPayload(data, metadata) -public interface Payload : Closeable { +public sealed interface Payload : Closeable { public val data: ByteReadPacket public val metadata: ByteReadPacket? diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/PayloadBuilder.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/PayloadBuilder.kt index 5935a655b..a0f1a3f77 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/PayloadBuilder.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/payload/PayloadBuilder.kt @@ -18,7 +18,7 @@ package io.rsocket.kotlin.payload import io.ktor.utils.io.core.* -public interface PayloadBuilder { +public sealed interface PayloadBuilder { public fun data(value: ByteReadPacket) public fun metadata(value: ByteReadPacket) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/ClientTransport.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/ClientTransport.kt index 90f002aa8..499336aea 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/ClientTransport.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/ClientTransport.kt @@ -18,14 +18,7 @@ package io.rsocket.kotlin.transport import io.rsocket.kotlin.* -//TODO fun interfaces don't support `suspend` functions for now... (seems will work in kotlin 1.5) - -public /*fun*/ interface ClientTransport { +public fun interface ClientTransport { @TransportApi public suspend fun connect(): Connection } - -@TransportApi -public inline fun ClientTransport(crossinline block: suspend () -> Connection): ClientTransport = object : ClientTransport { - override suspend fun connect(): Connection = block() -} diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/RSocketCustomErrorTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/RSocketCustomErrorTest.kt index 5aff5f73a..cd80282e1 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/RSocketCustomErrorTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/RSocketCustomErrorTest.kt @@ -1,8 +1,6 @@ package io.rsocket.kotlin -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertFailsWith +import kotlin.test.* class RSocketCustomErrorTest { diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/SetupRejectionTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/SetupRejectionTest.kt index 3a795f35e..75d560ec0 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/SetupRejectionTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/SetupRejectionTest.kt @@ -52,7 +52,6 @@ class SetupRejectionTest : SuspendTest, TestWithLeakCheck { assertTrue(frame is ErrorFrame) assertTrue(frame.throwable is RSocketError.Setup.Rejected) assertEquals(errorMessage, frame.throwable.message) - assertEquals(errorMessage, frame.data?.readText()) } val sender = sendingRSocket.await() assertFalse(sender.job.isActive) diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt index fcdf8f28e..28691951a 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt @@ -61,7 +61,7 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { return RSocketConnector { loggerFactory = NoopLogger connectionConfig { - keepAlive = KeepAlive(1000.seconds, 1000.seconds) + keepAlive = KeepAlive(Duration.seconds(1000), Duration.seconds(1000)) } }.connect(localServer) } diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/ReconnectableRSocketTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/ReconnectableRSocketTest.kt index f8548a129..f2431fd39 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/ReconnectableRSocketTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/ReconnectableRSocketTest.kt @@ -19,6 +19,7 @@ package io.rsocket.kotlin.core import app.cash.turbine.* import io.ktor.utils.io.core.* import io.rsocket.kotlin.* +import io.rsocket.kotlin.internal.* import io.rsocket.kotlin.logging.* import io.rsocket.kotlin.payload.* import io.rsocket.kotlin.test.* diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/frame/ErrorFrameTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/frame/ErrorFrameTest.kt index d726e0f64..9267fccb5 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/frame/ErrorFrameTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/frame/ErrorFrameTest.kt @@ -44,7 +44,6 @@ class ErrorFrameTest : TestWithLeakCheck { assertEquals(ErrorCode.ApplicationError, frame.errorCode) assertTrue(frame.throwable is RSocketError.ApplicationError) assertEquals("d", frame.throwable.message) - assertEquals("d", frame.data?.readText()) } } diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/frame/SetupFrameTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/frame/SetupFrameTest.kt index 1cb9dba86..4533be2a0 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/frame/SetupFrameTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/frame/SetupFrameTest.kt @@ -28,7 +28,7 @@ import kotlin.time.* class SetupFrameTest : TestWithLeakCheck { private val version = Version.Current - private val keepAlive = KeepAlive(10.seconds, 500.seconds) + private val keepAlive = KeepAlive(Duration.seconds(10), Duration.seconds(500)) private val payloadMimeType = PayloadMimeType(WellKnownMimeType.ApplicationOctetStream, CustomMimeType("mime")) @Test diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt index fff4d66a7..34dccb712 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt @@ -33,7 +33,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { override suspend fun before() { super.before() - val state = RSocketState(connection, KeepAlive(1000.seconds, 1000.seconds)) + val state = RSocketState(connection, KeepAlive(Duration.seconds(1000), Duration.seconds(1000))) requester = RSocketRequester(state, StreamId.client()) state.start(RSocketRequestHandler { }) } diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt index 7e64e59b4..6fabeb30b 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt @@ -27,7 +27,7 @@ import kotlin.time.* class KeepAliveTest : TestWithConnection(), TestWithLeakCheck { - private fun requester(keepAlive: KeepAlive = KeepAlive(100.milliseconds, 1.seconds)): RSocket = run { + private fun requester(keepAlive: KeepAlive = KeepAlive(Duration.milliseconds(100), Duration.seconds(1))): RSocket = run { val state = RSocketState(connection, keepAlive) val requester = RSocketRequester(state, StreamId.client()) state.start(RSocketRequestHandler { }) @@ -49,14 +49,14 @@ class KeepAliveTest : TestWithConnection(), TestWithLeakCheck { @Test fun rSocketNotCanceledOnPresentKeepAliveTicks() = test { - val rSocket = requester(KeepAlive(100.seconds, 100.seconds)) + val rSocket = requester(KeepAlive(Duration.seconds(100), Duration.seconds(100))) connection.launch { repeat(50) { - delay(100.milliseconds) + delay(Duration.milliseconds(100)) connection.sendToReceiver(KeepAliveFrame(true, 0, ByteReadPacket.Empty)) } } - delay(1.5.seconds) + delay(Duration.seconds(1.5)) assertTrue(rSocket.job.isActive) connection.test { repeat(50) { @@ -67,10 +67,10 @@ class KeepAliveTest : TestWithConnection(), TestWithLeakCheck { @Test fun requesterRespondsToKeepAlive() = test { - requester(KeepAlive(100.seconds, 100.seconds)) + requester(KeepAlive(Duration.seconds(100), Duration.seconds(100))) connection.launch { while (isActive) { - delay(100.milliseconds) + delay(Duration.milliseconds(100)) connection.sendToReceiver(KeepAliveFrame(true, 0, ByteReadPacket.Empty)) } } diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataTest.kt index 2d1476f3a..b256759fe 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/CompositeMetadataTest.kt @@ -29,7 +29,7 @@ class CompositeMetadataTest : TestWithLeakCheck { add(CustomMimeType("w"), ByteReadPacket.Empty) } - val decoded = cm.toPacket(InUseTrackingPool).read(InUseTrackingPool, CompositeMetadata) + val decoded = cm.readLoop(CompositeMetadata) assertEquals(1, decoded.entries.size) val entry = decoded.entries.first() @@ -44,7 +44,7 @@ class CompositeMetadataTest : TestWithLeakCheck { add(ReservedMimeType(120), packet("reserved metadata")) add(WellKnownMimeType.ApplicationAvro, packet("avro metadata")) } - val decoded = cm.toPacket(InUseTrackingPool).read(InUseTrackingPool, CompositeMetadata) + val decoded = cm.readLoop(CompositeMetadata) assertEquals(3, decoded.entries.size) decoded.entries[0].let { custom -> @@ -67,7 +67,7 @@ class CompositeMetadataTest : TestWithLeakCheck { writeByte(120) } assertFails { - packet.read(InUseTrackingPool, CompositeMetadata) + packet.read(CompositeMetadata, InUseTrackingPool) } } @@ -78,7 +78,7 @@ class CompositeMetadataTest : TestWithLeakCheck { add(ReservedMimeType(120), packet("reserved metadata")) add(WellKnownMimeType.ApplicationAvro, packet("avro metadata")) } - val decoded = cm.toPacket(InUseTrackingPool).read(InUseTrackingPool, CompositeMetadata) + val decoded = cm.readLoop(CompositeMetadata) assertTrue(CustomMimeType("custom") in decoded) assertTrue(CustomMimeType("custom2") !in decoded) @@ -99,7 +99,7 @@ class CompositeMetadataTest : TestWithLeakCheck { add(ReservedMimeType(120), packet("reserved metadata")) add(WellKnownMimeType.ApplicationAvro, packet("avro metadata")) } - val decoded = cm.toPacket(InUseTrackingPool).read(InUseTrackingPool, CompositeMetadata) + val decoded = cm.readLoop(CompositeMetadata) assertEquals("custom metadata", decoded[CustomMimeType("custom")].readText()) assertEquals("reserved metadata", decoded[ReservedMimeType(120)].readText()) @@ -113,7 +113,7 @@ class CompositeMetadataTest : TestWithLeakCheck { add(ReservedMimeType(120), packet("reserved metadata")) add(WellKnownMimeType.ApplicationAvro, packet("avro metadata")) } - val decoded = cm.toPacket(InUseTrackingPool).read(InUseTrackingPool, CompositeMetadata) + val decoded = cm.readLoop(CompositeMetadata) assertNull(decoded.getOrNull(ReservedMimeType(121))) assertNull(decoded.getOrNull(CustomMimeType("custom2"))) @@ -133,7 +133,7 @@ class CompositeMetadataTest : TestWithLeakCheck { add(WellKnownMimeType.MessageRSocketRouting, packet("routing metadata")) add(CustomMimeType("custom"), packet("custom metadata - 2")) } - val decoded = cm.toPacket(InUseTrackingPool).read(InUseTrackingPool, CompositeMetadata) + val decoded = cm.readLoop(CompositeMetadata) assertEquals(5, decoded.entries.size) @@ -183,15 +183,17 @@ class CompositeMetadataTest : TestWithLeakCheck { fun testCombine() { val cm = buildCompositeMetadata { add(RoutingMetadata("tag1", "tag2")) - add(PerStreamAcceptableDataMimeTypesMetadata( - WellKnownMimeType.ApplicationAvro, - CustomMimeType("application/custom"), - ReservedMimeType(120) - )) + add( + PerStreamAcceptableDataMimeTypesMetadata( + WellKnownMimeType.ApplicationAvro, + CustomMimeType("application/custom"), + ReservedMimeType(120) + ) + ) add(WellKnownMimeType.ApplicationJson, packet("{}")) } - val decoded = cm.toPacket(InUseTrackingPool).read(InUseTrackingPool, CompositeMetadata) + val decoded = cm.readLoop(CompositeMetadata) assertEquals(3, decoded.entries.size) assertEquals(listOf("tag1", "tag2"), decoded[RoutingMetadata].tags) diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/PerStreamAcceptableDataMimeTypesMetadataTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/PerStreamAcceptableDataMimeTypesMetadataTest.kt index 20378d42a..6890add80 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/PerStreamAcceptableDataMimeTypesMetadataTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/PerStreamAcceptableDataMimeTypesMetadataTest.kt @@ -31,7 +31,7 @@ class PerStreamAcceptableDataMimeTypesMetadataTest : TestWithLeakCheck { ReservedMimeType(120), CustomMimeType("custom2"), ) - val decoded = metadata.toPacket(InUseTrackingPool).read(InUseTrackingPool, PerStreamAcceptableDataMimeTypesMetadata) + val decoded = metadata.readLoop(PerStreamAcceptableDataMimeTypesMetadata) assertEquals(WellKnownMimeType.MessageRSocketAcceptMimeTypes, decoded.mimeType) assertEquals(6, decoded.types.size) assertEquals( diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/PerStreamDataMimeTypeMetadataTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/PerStreamDataMimeTypeMetadataTest.kt index 7734aba76..1b19b5b39 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/PerStreamDataMimeTypeMetadataTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/PerStreamDataMimeTypeMetadataTest.kt @@ -24,7 +24,7 @@ class PerStreamDataMimeTypeMetadataTest : TestWithLeakCheck { @Test fun encodeReserved() { val metadata = PerStreamDataMimeTypeMetadata(ReservedMimeType(110)) - val decoded = metadata.toPacket(InUseTrackingPool).read(InUseTrackingPool, PerStreamDataMimeTypeMetadata) + val decoded = metadata.readLoop(PerStreamDataMimeTypeMetadata) assertEquals(WellKnownMimeType.MessageRSocketMimeType, decoded.mimeType) assertEquals(ReservedMimeType(110), decoded.type) } @@ -32,7 +32,7 @@ class PerStreamDataMimeTypeMetadataTest : TestWithLeakCheck { @Test fun encodeCustom() { val metadata = PerStreamDataMimeTypeMetadata(CustomMimeType("custom-2")) - val decoded = metadata.toPacket(InUseTrackingPool).read(InUseTrackingPool, PerStreamDataMimeTypeMetadata) + val decoded = metadata.readLoop(PerStreamDataMimeTypeMetadata) assertEquals(WellKnownMimeType.MessageRSocketMimeType, decoded.mimeType) assertEquals(CustomMimeType("custom-2"), decoded.type) } @@ -40,7 +40,7 @@ class PerStreamDataMimeTypeMetadataTest : TestWithLeakCheck { @Test fun encodeWellKnown() { val metadata = PerStreamDataMimeTypeMetadata(WellKnownMimeType.ApplicationGraphql) - val decoded = metadata.toPacket(InUseTrackingPool).read(InUseTrackingPool, PerStreamDataMimeTypeMetadata) + val decoded = metadata.readLoop(PerStreamDataMimeTypeMetadata) assertEquals(WellKnownMimeType.MessageRSocketMimeType, decoded.mimeType) assertEquals(WellKnownMimeType.ApplicationGraphql, decoded.type) } diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/RoutingMetadataTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/RoutingMetadataTest.kt index 2ee2a4876..47e16ed87 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/RoutingMetadataTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/RoutingMetadataTest.kt @@ -24,7 +24,7 @@ class RoutingMetadataTest : TestWithLeakCheck { fun encodeMetadata() { val tags = listOf("ws://localhost:8080/rsocket", "x".repeat(200)) val metadata = RoutingMetadata(tags) - val decodedMetadata = metadata.toPacket(InUseTrackingPool).read(InUseTrackingPool, RoutingMetadata) + val decodedMetadata = metadata.readLoop(RoutingMetadata) assertEquals(tags, decodedMetadata.tags) } diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/Util.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/Util.kt index 03a72aeaf..95918a63a 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/Util.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/Util.kt @@ -18,4 +18,4 @@ package io.rsocket.kotlin.metadata import io.rsocket.kotlin.test.* -fun Metadata.readLoop(reader: MetadataReader): M = toPacket(InUseTrackingPool).read(InUseTrackingPool, reader) +fun Metadata.readLoop(reader: MetadataReader): M = toPacket(InUseTrackingPool).read(reader, InUseTrackingPool) diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/security/AuthMetadataTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/security/AuthMetadataTest.kt index cb086a580..0007fdc6c 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/security/AuthMetadataTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/metadata/security/AuthMetadataTest.kt @@ -52,7 +52,8 @@ class AuthMetadataTest : TestWithLeakCheck { @Test fun encodeBearer() { - val token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyLCJpYXQxIjoxNTE2MjM5MDIyLCJpYXQyIjoxNTE2MjM5MDIyLCJpYXQzIjoxNTE2MjM5MDIyLCJpYXQ0IjoxNTE2MjM5MDIyfQ.ljYuH-GNyyhhLcx-rHMchRkGbNsR2_4aSxo8XjrYrSM"; + val token = + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyLCJpYXQxIjoxNTE2MjM5MDIyLCJpYXQyIjoxNTE2MjM5MDIyLCJpYXQzIjoxNTE2MjM5MDIyLCJpYXQ0IjoxNTE2MjM5MDIyfQ.ljYuH-GNyyhhLcx-rHMchRkGbNsR2_4aSxo8XjrYrSM"; val metadata = BearerAuthMetadata(token) val decoded = metadata.readLoop(BearerAuthMetadata) diff --git a/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/keepalive/currentMillis.kt b/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/currentMillis.kt similarity index 95% rename from rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/keepalive/currentMillis.kt rename to rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/currentMillis.kt index 7d7044464..7d34b3374 100644 --- a/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/keepalive/currentMillis.kt +++ b/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/internal/currentMillis.kt @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.rsocket.kotlin.keepalive +package io.rsocket.kotlin.internal import kotlin.js.* diff --git a/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt b/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt index 1d81792b1..6d85fa4e3 100644 --- a/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt +++ b/rsocket-core/src/jsMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt @@ -16,9 +16,14 @@ package io.rsocket.kotlin.logging -actual val DefaultLoggerFactory: LoggerFactory get() = ConsoleLogger +import io.rsocket.kotlin.* -class ConsoleLogger( +@RSocketLoggingApi +internal actual val DefaultLoggerFactory: LoggerFactory + get() = ConsoleLogger + +@RSocketLoggingApi +public class ConsoleLogger( override val tag: String, private val minLevel: LoggingLevel = LoggingLevel.INFO, ) : Logger { @@ -27,16 +32,16 @@ class ConsoleLogger( val meta = "[$level] ($tag)" when (level) { LoggingLevel.ERROR -> throwable?.let { console.error(meta, message, "Error:", it) } ?: console.error(meta, message) - LoggingLevel.WARN -> throwable?.let { console.warn(meta, message, "Error:", it) } ?: console.warn(meta, message) - LoggingLevel.INFO -> throwable?.let { console.info(meta, message, "Error:", it) } ?: console.info(meta, message) + LoggingLevel.WARN -> throwable?.let { console.warn(meta, message, "Error:", it) } ?: console.warn(meta, message) + LoggingLevel.INFO -> throwable?.let { console.info(meta, message, "Error:", it) } ?: console.info(meta, message) LoggingLevel.DEBUG -> throwable?.let { console.log(meta, message, "Error:", it) } ?: console.log(meta, message) LoggingLevel.TRACE -> throwable?.let { console.log(meta, message, "Error:", it) } ?: console.log(meta, message) } } - companion object : LoggerFactory { + public companion object : LoggerFactory { override fun logger(tag: String): Logger = ConsoleLogger(tag) - fun withLevel(minLevel: LoggingLevel): LoggerFactory = LoggerFactory { ConsoleLogger(it, minLevel) } + public fun withLevel(minLevel: LoggingLevel): LoggerFactory = LoggerFactory { ConsoleLogger(it, minLevel) } } } diff --git a/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/keepalive/currentMillis.kt b/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/currentMillis.kt similarity index 95% rename from rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/keepalive/currentMillis.kt rename to rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/currentMillis.kt index 96d728147..c1d912da2 100644 --- a/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/keepalive/currentMillis.kt +++ b/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/internal/currentMillis.kt @@ -14,6 +14,6 @@ * limitations under the License. */ -package io.rsocket.kotlin.keepalive +package io.rsocket.kotlin.internal internal actual fun currentMillis(): Long = System.currentTimeMillis() diff --git a/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt b/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt index 16296b10f..8c436f504 100644 --- a/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt +++ b/rsocket-core/src/jvmMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt @@ -16,21 +16,25 @@ package io.rsocket.kotlin.logging +import io.rsocket.kotlin.* import java.util.logging.* import java.util.logging.Level as JLevel import java.util.logging.Logger as JLogger -actual val DefaultLoggerFactory: LoggerFactory get() = JavaLogger +@RSocketLoggingApi +internal actual val DefaultLoggerFactory: LoggerFactory + get() = JavaLogger -class JavaLogger(override val tag: String) : Logger { +@RSocketLoggingApi +public class JavaLogger(override val tag: String) : Logger { private val jLogger = JLogger.getLogger(tag) private val LoggingLevel.jLevel: JLevel get() = when (this) { LoggingLevel.TRACE -> JLevel.FINEST LoggingLevel.DEBUG -> JLevel.FINE - LoggingLevel.INFO -> JLevel.INFO - LoggingLevel.WARN -> JLevel.WARNING + LoggingLevel.INFO -> JLevel.INFO + LoggingLevel.WARN -> JLevel.WARNING LoggingLevel.ERROR -> JLevel.SEVERE } @@ -46,7 +50,7 @@ class JavaLogger(override val tag: String) : Logger { jLogger.log(record) } - companion object : LoggerFactory { + public companion object : LoggerFactory { override fun logger(tag: String): Logger = JavaLogger(tag) } } diff --git a/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/keepalive/currentMillis.kt b/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/currentMillis.kt similarity index 95% rename from rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/keepalive/currentMillis.kt rename to rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/currentMillis.kt index 2ad9d33c2..6c9aae617 100644 --- a/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/keepalive/currentMillis.kt +++ b/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/internal/currentMillis.kt @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.rsocket.kotlin.keepalive +package io.rsocket.kotlin.internal import kotlin.system.* diff --git a/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt b/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt index eff845a55..d86342197 100644 --- a/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt +++ b/rsocket-core/src/nativeMain/kotlin/io/rsocket/kotlin/logging/DefaultLoggerFactory.kt @@ -16,4 +16,8 @@ package io.rsocket.kotlin.logging -actual val DefaultLoggerFactory: LoggerFactory get() = PrintLogger +import io.rsocket.kotlin.* + +@RSocketLoggingApi +public actual val DefaultLoggerFactory: LoggerFactory + get() = PrintLogger diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/SuspendTest.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/SuspendTest.kt index bbbd39106..aabca398c 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/SuspendTest.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/SuspendTest.kt @@ -21,10 +21,10 @@ import kotlin.coroutines.* import kotlin.time.* interface SuspendTest { - val testTimeout: Duration get() = 1.minutes + val testTimeout: Duration get() = Duration.minutes(1) - val beforeTimeout: Duration get() = 10.seconds - val afterTimeout: Duration get() = 10.seconds + val beforeTimeout: Duration get() = Duration.seconds(10) + val afterTimeout: Duration get() = Duration.seconds(10) val debug: Boolean get() = true //change to turn off debug logs locally (useful for CI) diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt index 50ee14bf9..5188a8501 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt @@ -51,10 +51,12 @@ class TestConnection : Connection, CoroutineScope { return receiveChannel.receive() } + @Suppress("INVISIBLE_MEMBER") //for toPacket suspend fun sendToReceiver(vararg frames: Frame) { frames.forEach { receiveChannel.send(it.toPacket(InUseTrackingPool)) } } + @Suppress("INVISIBLE_MEMBER") //for readFrame private fun sentAsFlow(): Flow = sendChannel.receiveAsFlow().map { it.readFrame(InUseTrackingPool) } suspend fun test(validate: suspend FlowTurbine.() -> Unit) { diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TransportTest.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TransportTest.kt index ec23c5885..9a7606ca3 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TransportTest.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TransportTest.kt @@ -27,7 +27,7 @@ import kotlin.test.* import kotlin.time.* abstract class TransportTest : SuspendTest, TestWithLeakCheck { - override val testTimeout: Duration = 2.minutes + override val testTimeout: Duration = Duration.minutes(2) lateinit var client: RSocket //should be assigned in `before` @@ -56,13 +56,13 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { } @Test - fun requestChannel0() = test(10.seconds) { + fun requestChannel0() = test(Duration.seconds(10)) { val list = client.requestChannel(payload(0), emptyFlow()).toList() assertTrue(list.isEmpty()) } @Test - fun requestChannel1() = test(10.seconds) { + fun requestChannel1() = test(Duration.seconds(10)) { val list = client.requestChannel(payload(0), flowOf(payload(0))).onEach { it.release() }.toList() assertEquals(1, list.size) } @@ -221,7 +221,7 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { loggerFactory = NoopLogger connectionConfig { - keepAlive = KeepAlive(10.minutes, 100.minutes) + keepAlive = KeepAlive(Duration.minutes(10), Duration.minutes(100)) } } diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/client/WebSocketClientTransport.kt b/rsocket-transport-ktor/rsocket-transport-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/client/WebSocketClientTransport.kt index 6d8f6e464..0613f47fc 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/client/WebSocketClientTransport.kt +++ b/rsocket-transport-ktor/rsocket-transport-ktor-client/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/client/WebSocketClientTransport.kt @@ -32,6 +32,7 @@ public fun WebSocketClientTransport( request: HttpRequestBuilder.() -> Unit, ): ClientTransport = ClientTransport { val session = httpClient.webSocketSession(request) + @Suppress("INVISIBLE_MEMBER") WebSocketConnection(session) } diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-server/src/jvmMain/kotlin/io/rsocket/kotlin/transport/ktor/server/Routing.kt b/rsocket-transport-ktor/rsocket-transport-ktor-server/src/jvmMain/kotlin/io/rsocket/kotlin/transport/ktor/server/Routing.kt index 456d9b4d6..06d26e173 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-server/src/jvmMain/kotlin/io/rsocket/kotlin/transport/ktor/server/Routing.kt +++ b/rsocket-transport-ktor/rsocket-transport-ktor-server/src/jvmMain/kotlin/io/rsocket/kotlin/transport/ktor/server/Routing.kt @@ -20,14 +20,6 @@ import io.ktor.application.* import io.ktor.routing.* import io.rsocket.kotlin.* -public fun Route.rSocket( - path: String? = null, - protocol: String? = null, - block: suspend ConnectionAcceptorContext.() -> RSocket -) { - rSocket(path, protocol, ConnectionAcceptor(block)) -} - public fun Route.rSocket( path: String? = null, protocol: String? = null, diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-server/src/jvmMain/kotlin/io/rsocket/kotlin/transport/ktor/server/WebSocketServerTransport.kt b/rsocket-transport-ktor/rsocket-transport-ktor-server/src/jvmMain/kotlin/io/rsocket/kotlin/transport/ktor/server/WebSocketServerTransport.kt index 4fca34514..a10312307 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-server/src/jvmMain/kotlin/io/rsocket/kotlin/transport/ktor/server/WebSocketServerTransport.kt +++ b/rsocket-transport-ktor/rsocket-transport-ktor-server/src/jvmMain/kotlin/io/rsocket/kotlin/transport/ktor/server/WebSocketServerTransport.kt @@ -29,10 +29,12 @@ internal fun Route.serverTransport( ): ServerTransport = ServerTransport { acceptor -> when (path) { null -> webSocket(protocol) { + @Suppress("INVISIBLE_MEMBER") val connection = WebSocketConnection(this) acceptor(connection) } else -> webSocket(path, protocol) { + @Suppress("INVISIBLE_MEMBER") val connection = WebSocketConnection(this) acceptor(connection) } diff --git a/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpClientTransport.kt b/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpClientTransport.kt index 93d773bb3..9b4e8a18e 100644 --- a/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpClientTransport.kt +++ b/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpClientTransport.kt @@ -21,12 +21,10 @@ package io.rsocket.kotlin.transport.ktor import io.ktor.network.selector.* import io.ktor.network.sockets.* -import io.ktor.util.* import io.ktor.util.network.* import io.rsocket.kotlin.* import io.rsocket.kotlin.transport.* -@InternalAPI //because of selector public fun TcpClientTransport( selector: SelectorManager, hostname: String, port: Int, @@ -34,7 +32,6 @@ public fun TcpClientTransport( configure: SocketOptions.TCPClientSocketOptions.() -> Unit = {}, ): ClientTransport = TcpClientTransport(selector, NetworkAddress(hostname, port), intercept, configure) -@InternalAPI //because of selector public fun TcpClientTransport( selector: SelectorManager, remoteAddress: NetworkAddress, diff --git a/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpConnection.kt b/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpConnection.kt index 64c4b7b5b..1bc143aa4 100644 --- a/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpConnection.kt +++ b/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpConnection.kt @@ -20,29 +20,30 @@ import io.ktor.network.sockets.* import io.ktor.util.cio.* import io.ktor.utils.io.* import io.ktor.utils.io.core.* -import io.ktor.utils.io.core.internal.* import io.rsocket.kotlin.* import io.rsocket.kotlin.Connection import io.rsocket.kotlin.frame.io.* +import io.rsocket.kotlin.internal.* import kotlinx.coroutines.* import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.channels.* import kotlin.coroutines.* import kotlin.native.concurrent.* @SharedImmutable internal val ignoreExceptionHandler = CoroutineExceptionHandler { _, _ -> } -@OptIn(TransportApi::class, DangerousInternalIoApi::class, ExperimentalCoroutinesApi::class) +@OptIn(TransportApi::class) internal class TcpConnection(private val socket: Socket) : Connection, CoroutineScope { override val job: Job = socket.socketContext override val coroutineContext: CoroutineContext = job + Dispatchers.Unconfined + ignoreExceptionHandler + @Suppress("INVISIBLE_MEMBER") private val sendChannel = SafeChannel(8) + + @Suppress("INVISIBLE_MEMBER") private val receiveChannel = SafeChannel(8) init { - val channelCloseJob = Job(job) launch { socket.openWriteChannel(autoFlush = true).use { while (isActive) { @@ -50,6 +51,7 @@ internal class TcpConnection(private val socket: Socket) : Connection, Coroutine val length = packet.remaining.toInt() try { writePacket { + @Suppress("INVISIBLE_MEMBER") writeLength(length) writePacket(packet) } @@ -63,6 +65,7 @@ internal class TcpConnection(private val socket: Socket) : Connection, Coroutine launch { socket.openReadChannel().apply { while (isActive) { + @Suppress("INVISIBLE_MEMBER") val length = readPacket(3).readLength() val packet = readPacket(length) try { @@ -78,14 +81,6 @@ internal class TcpConnection(private val socket: Socket) : Connection, Coroutine val error = cause?.let { it as? CancellationException ?: CancellationException("Connection failed", it) } sendChannel.cancel(error) receiveChannel.cancel(error) - CoroutineScope(job).launch { - while (!sendChannel.isClosedForReceive || !sendChannel.isClosedForSend - || !receiveChannel.isClosedForReceive || !receiveChannel.isClosedForSend - ) { - delay(1) - } - channelCloseJob.complete() - } } } @@ -93,9 +88,3 @@ internal class TcpConnection(private val socket: Socket) : Connection, Coroutine override suspend fun receive(): ByteReadPacket = receiveChannel.receive() } - -@SharedImmutable -private val onUndeliveredCloseable: (Closeable) -> Unit = Closeable::close - -@Suppress("FunctionName") -private fun SafeChannel(capacity: Int): Channel = Channel(capacity, onUndeliveredElement = onUndeliveredCloseable) diff --git a/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpServerTransport.kt b/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpServerTransport.kt index 0202426a9..65467f297 100644 --- a/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpServerTransport.kt +++ b/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpServerTransport.kt @@ -20,19 +20,17 @@ package io.rsocket.kotlin.transport.ktor import io.ktor.network.selector.* import io.ktor.network.sockets.* -import io.ktor.util.* import io.ktor.util.network.* import io.rsocket.kotlin.transport.* import kotlinx.coroutines.* -@InternalAPI //because of selector public fun TcpServerTransport( selector: SelectorManager, hostname: String = "0.0.0.0", port: Int = 0, configure: SocketOptions.AcceptorOptions.() -> Unit = {}, ): ServerTransport = TcpServerTransport(selector, NetworkAddress(hostname, port), configure) -@InternalAPI //because of selector +@OptIn(DelicateCoroutinesApi::class) //TODO ? public fun TcpServerTransport( selector: SelectorManager, localAddress: NetworkAddress? = null, diff --git a/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/WebSocketConnection.kt b/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/WebSocketConnection.kt index 52d9f1295..ff8f22dbd 100644 --- a/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/WebSocketConnection.kt +++ b/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/WebSocketConnection.kt @@ -22,7 +22,7 @@ import io.rsocket.kotlin.* import kotlinx.coroutines.* @TransportApi -public class WebSocketConnection(private val session: WebSocketSession) : Connection { +internal class WebSocketConnection(private val session: WebSocketSession) : Connection { override val job: Job = session.coroutineContext.job diff --git a/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/WebSocketConnectionTest.kt b/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/WebSocketConnectionTest.kt index 9fba17e3e..a830abc03 100644 --- a/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/WebSocketConnectionTest.kt +++ b/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/WebSocketConnectionTest.kt @@ -31,7 +31,6 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlin.random.* import kotlin.test.* -import kotlin.time.* import io.ktor.client.engine.cio.CIO as ClientCIO import io.ktor.client.features.websocket.WebSockets as ClientWebSockets import io.ktor.server.cio.CIO as ServerCIO @@ -46,7 +45,7 @@ class WebSocketConnectionTest : SuspendTest, TestWithLeakCheck { install(ClientRSocketSupport) { connector = RSocketConnector { connectionConfig { - keepAlive = KeepAlive(500.milliseconds) + keepAlive = KeepAlive(500) } } } diff --git a/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/WebSocketTransportTest.kt b/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/WebSocketTransportTest.kt index 807296b6d..afc3daf35 100644 --- a/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/WebSocketTransportTest.kt +++ b/rsocket-transport-ktor/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/WebSocketTransportTest.kt @@ -24,9 +24,8 @@ import io.ktor.server.engine.* import io.ktor.websocket.* import io.rsocket.kotlin.* import io.rsocket.kotlin.test.* -import io.rsocket.kotlin.transport.ServerTransport +import io.rsocket.kotlin.transport.* import io.rsocket.kotlin.transport.ktor.client.* -import io.rsocket.kotlin.transport.ktor.server.* import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlin.random.* diff --git a/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalConnection.kt b/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalConnection.kt index c78ede478..72b44156b 100644 --- a/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalConnection.kt +++ b/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalConnection.kt @@ -23,7 +23,7 @@ import io.rsocket.kotlin.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* -@OptIn(DangerousInternalIoApi::class, TransportApi::class) +@OptIn(TransportApi::class) internal class LocalConnection( private val sender: SendChannel, private val receiver: ReceiveChannel, diff --git a/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalServer.kt b/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalServer.kt index 9c64d38d4..f3a1c1573 100644 --- a/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalServer.kt +++ b/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalServer.kt @@ -22,41 +22,29 @@ import io.ktor.utils.io.core.* import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* +import io.rsocket.kotlin.internal.* import io.rsocket.kotlin.transport.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* -import kotlin.native.concurrent.* -@Suppress("FunctionName") -@OptIn(DangerousInternalIoApi::class) -public fun LocalServer(parentJob: Job? = null): LocalServer = LocalServer(parentJob, ChunkBuffer.Pool) - -public class LocalServer -@DangerousInternalIoApi -internal constructor( - parentJob: Job?, - private val pool: ObjectPool, +public class LocalServer( + parentJob: Job? = null, + private val pool: ObjectPool = ChunkBuffer.Pool, ) : ServerTransport, ClientTransport { public val job: Job = SupervisorJob(parentJob) private val connections = Channel() override suspend fun connect(): Connection { + @Suppress("INVISIBLE_MEMBER") val clientChannel = SafeChannel(Channel.UNLIMITED) + + @Suppress("INVISIBLE_MEMBER") val serverChannel = SafeChannel(Channel.UNLIMITED) val connectionJob = Job(job) - val channelCloseJob = Job(job) connectionJob.invokeOnCompletion { val error = CancellationException("Connection failed", it) clientChannel.cancel(error) serverChannel.cancel(error) - CoroutineScope(job).launch { - while (!clientChannel.isClosedForReceive || !clientChannel.isClosedForSend - || !serverChannel.isClosedForReceive || !serverChannel.isClosedForSend - ) { - delay(1) - } - channelCloseJob.complete() - } } val clientConnection = LocalConnection(serverChannel, clientChannel, pool, connectionJob) val serverConnection = LocalConnection(clientChannel, serverChannel, pool, connectionJob) @@ -64,6 +52,7 @@ internal constructor( return clientConnection } + @OptIn(DelicateCoroutinesApi::class) override fun start(accept: suspend (Connection) -> Unit): Job = GlobalScope.launch(job + Dispatchers.Unconfined, CoroutineStart.UNDISPATCHED) { supervisorScope { @@ -73,9 +62,3 @@ internal constructor( } } } - -@SharedImmutable -private val onUndeliveredCloseable: (Closeable) -> Unit = Closeable::close - -@Suppress("FunctionName") -private fun SafeChannel(capacity: Int): Channel = Channel(capacity, onUndeliveredElement = onUndeliveredCloseable)