diff --git a/rsocket-transports/nodejs-tcp/build.gradle.kts b/rsocket-transports/nodejs-tcp/build.gradle.kts index c048c5d6..a297a744 100644 --- a/rsocket-transports/nodejs-tcp/build.gradle.kts +++ b/rsocket-transports/nodejs-tcp/build.gradle.kts @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,7 @@ plugins { id("rsocketbuild.multiplatform-library") } -description = "rsocket-kotlin NodeJS TCP client/server transport implementation" +description = "[DEPRECATED] rsocket-kotlin NodeJS TCP client/server transport implementation" kotlin { jsTarget(supportsBrowser = false) diff --git a/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpClientTransport.kt b/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpClientTransport.kt deleted file mode 100644 index 498dbc8a..00000000 --- a/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpClientTransport.kt +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright 2015-2024 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.transport.nodejs.tcp - -import io.rsocket.kotlin.internal.io.* -import io.rsocket.kotlin.transport.* -import io.rsocket.kotlin.transport.nodejs.tcp.internal.* -import kotlinx.coroutines.* -import kotlin.coroutines.* - -@OptIn(RSocketTransportApi::class) -public sealed interface NodejsTcpClientTransport : RSocketTransport { - public fun target(host: String, port: Int): RSocketClientTarget - - public companion object Factory : - RSocketTransportFactory(::NodejsTcpClientTransportBuilderImpl) -} - -@OptIn(RSocketTransportApi::class) -public sealed interface NodejsTcpClientTransportBuilder : RSocketTransportBuilder { - public fun dispatcher(context: CoroutineContext) - public fun inheritDispatcher(): Unit = dispatcher(EmptyCoroutineContext) -} - -private class NodejsTcpClientTransportBuilderImpl : NodejsTcpClientTransportBuilder { - private var dispatcher: CoroutineContext = Dispatchers.Default - - override fun dispatcher(context: CoroutineContext) { - check(context[Job] == null) { "Dispatcher shouldn't contain job" } - this.dispatcher = context - } - - @RSocketTransportApi - override fun buildTransport(context: CoroutineContext): NodejsTcpClientTransport = NodejsTcpClientTransportImpl( - coroutineContext = context.supervisorContext() + dispatcher, - ) -} - -private class NodejsTcpClientTransportImpl( - override val coroutineContext: CoroutineContext, -) : NodejsTcpClientTransport { - override fun target(host: String, port: Int): RSocketClientTarget = NodejsTcpClientTargetImpl( - coroutineContext = coroutineContext.supervisorContext(), - host = host, - port = port - ) -} - -@OptIn(RSocketTransportApi::class) -private class NodejsTcpClientTargetImpl( - override val coroutineContext: CoroutineContext, - private val host: String, - private val port: Int, -) : RSocketClientTarget { - @RSocketTransportApi - override fun connectClient(handler: RSocketConnectionHandler): Job = launch { - val socket = connect(port, host) - handler.handleNodejsTcpConnection(socket) - } -} diff --git a/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpConnection.kt b/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpConnection.kt deleted file mode 100644 index 189b5191..00000000 --- a/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpConnection.kt +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 2015-2024 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.transport.nodejs.tcp - -import io.rsocket.kotlin.internal.io.* -import io.rsocket.kotlin.transport.* -import io.rsocket.kotlin.transport.internal.* -import io.rsocket.kotlin.transport.nodejs.tcp.internal.* -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import kotlinx.io.* - -@RSocketTransportApi -internal suspend fun RSocketConnectionHandler.handleNodejsTcpConnection(socket: Socket): Unit = coroutineScope { - val outboundQueue = PrioritizationFrameQueue(Channel.BUFFERED) - val inbound = bufferChannel(Channel.UNLIMITED) - - val closed = CompletableDeferred() - val frameAssembler = FrameWithLengthAssembler { inbound.trySend(it) } - socket.on( - onData = frameAssembler::write, - onError = { closed.completeExceptionally(it) }, - onClose = { - frameAssembler.close() - if (!it) closed.complete(Unit) - } - ) - - val writerJob = launch { - while (true) socket.writeFrame(outboundQueue.dequeueFrame() ?: break) - }.onCompletion { outboundQueue.cancel() } - - try { - handleConnection(NodejsTcpConnection(outboundQueue, inbound)) - } finally { - inbound.cancel() - outboundQueue.close() // will cause `writerJob` completion - // even if it was cancelled, we still need to close socket and await it closure - withContext(NonCancellable) { - writerJob.join() - // close socket - socket.destroy() - closed.join() - } - } -} - -@RSocketTransportApi -private class NodejsTcpConnection( - private val outboundQueue: PrioritizationFrameQueue, - private val inbound: ReceiveChannel, -) : RSocketSequentialConnection { - override val isClosedForSend: Boolean get() = outboundQueue.isClosedForSend - override suspend fun sendFrame(streamId: Int, frame: Buffer) { - return outboundQueue.enqueueFrame(streamId, frame) - } - - override suspend fun receiveFrame(): Buffer? { - return inbound.receiveCatching().getOrNull() - } -} diff --git a/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpServerTransport.kt b/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpServerTransport.kt deleted file mode 100644 index f30d16e0..00000000 --- a/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/NodejsTcpServerTransport.kt +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright 2015-2024 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.transport.nodejs.tcp - -import io.rsocket.kotlin.internal.io.* -import io.rsocket.kotlin.transport.* -import io.rsocket.kotlin.transport.nodejs.tcp.internal.* -import kotlinx.coroutines.* -import kotlin.coroutines.* - -@OptIn(RSocketTransportApi::class) -public sealed interface NodejsTcpServerInstance : RSocketServerInstance { - public val host: String - public val port: Int -} - -@OptIn(RSocketTransportApi::class) -public sealed interface NodejsTcpServerTransport : RSocketTransport { - public fun target(host: String, port: Int): RSocketServerTarget - - public companion object Factory : - RSocketTransportFactory({ NodejsTcpServerTransportBuilderImpl }) -} - -@OptIn(RSocketTransportApi::class) -public sealed interface NodejsTcpServerTransportBuilder : RSocketTransportBuilder { - public fun dispatcher(context: CoroutineContext) - public fun inheritDispatcher(): Unit = dispatcher(EmptyCoroutineContext) -} - -private object NodejsTcpServerTransportBuilderImpl : NodejsTcpServerTransportBuilder { - private var dispatcher: CoroutineContext = Dispatchers.Default - - override fun dispatcher(context: CoroutineContext) { - check(context[Job] == null) { "Dispatcher shouldn't contain job" } - this.dispatcher = context - } - - @RSocketTransportApi - override fun buildTransport(context: CoroutineContext): NodejsTcpServerTransport = NodejsTcpServerTransportImpl( - coroutineContext = context.supervisorContext() + dispatcher, - ) -} - -private class NodejsTcpServerTransportImpl( - override val coroutineContext: CoroutineContext, -) : NodejsTcpServerTransport { - override fun target(host: String, port: Int): RSocketServerTarget = NodejsTcpServerTargetImpl( - coroutineContext = coroutineContext.supervisorContext(), - host = host, - port = port - ) -} - -@OptIn(RSocketTransportApi::class) -private class NodejsTcpServerTargetImpl( - override val coroutineContext: CoroutineContext, - private val host: String, - private val port: Int, -) : RSocketServerTarget { - - @RSocketTransportApi - override suspend fun startServer(handler: RSocketConnectionHandler): NodejsTcpServerInstance { - currentCoroutineContext().ensureActive() - coroutineContext.ensureActive() - - val serverJob = launch { - val handlerScope = CoroutineScope(coroutineContext.supervisorContext()) - val server = createServer(port, host, { - coroutineContext.job.cancel("Server closed") - }) { - handlerScope.launch { handler.handleNodejsTcpConnection(it) } - } - try { - awaitCancellation() - } finally { - suspendCoroutine { cont -> server.close { cont.resume(Unit) } } - } - } - - return NodejsTcpServerInstanceImpl( - coroutineContext = coroutineContext + serverJob, - host = host, - port = port - ) - } -} - -@RSocketTransportApi -private class NodejsTcpServerInstanceImpl( - override val coroutineContext: CoroutineContext, - override val host: String, - override val port: Int, -) : NodejsTcpServerInstance diff --git a/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpClientTransport.kt b/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpClientTransport.kt index 31d93906..332e9093 100644 --- a/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpClientTransport.kt +++ b/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpClientTransport.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,7 @@ import kotlinx.coroutines.* import kotlin.coroutines.* @Suppress("DEPRECATION_ERROR") -@Deprecated(level = DeprecationLevel.ERROR, message = "Deprecated in favor of new Transport API, use NodejsTcpClientTransport") +@Deprecated(level = DeprecationLevel.ERROR, message = "Deprecated in favor of `rsocket-transport-ktor-tcp` with ktor 3.1") public class TcpClientTransport( private val port: Int, private val hostname: String, diff --git a/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpServerTransport.kt b/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpServerTransport.kt index f12d9fb8..8daed12a 100644 --- a/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpServerTransport.kt +++ b/rsocket-transports/nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpServerTransport.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,7 @@ import io.rsocket.kotlin.transport.nodejs.tcp.internal.* import kotlinx.coroutines.* import kotlin.coroutines.* -@Deprecated(level = DeprecationLevel.ERROR, message = "Deprecated in favor of new Transport API, use NodejsTcpServerInstance") +@Deprecated(level = DeprecationLevel.ERROR, message = "Deprecated in favor of `rsocket-transport-ktor-tcp` with ktor 3.1") public class TcpServer internal constructor( public val job: Job, private val server: Server ) { @@ -32,7 +32,7 @@ public class TcpServer internal constructor( } @Suppress("DEPRECATION_ERROR") -@Deprecated(level = DeprecationLevel.ERROR, message = "Deprecated in favor of new Transport API, use NodejsTcpServerTransport") +@Deprecated(level = DeprecationLevel.ERROR, message = "Deprecated in favor of `rsocket-transport-ktor-tcp` with ktor 3.1") public class TcpServerTransport( private val port: Int, private val hostname: String, ) : ServerTransport { diff --git a/rsocket-transports/nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt b/rsocket-transports/nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt index 545331e0..c81983f7 100644 --- a/rsocket-transports/nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt +++ b/rsocket-transports/nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2024 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,11 +35,3 @@ class TcpTransportTest : TransportTest() { server.close() } } - -class NodejsTcpTransportTest : TransportTest() { - override suspend fun before() { - val port = PortProvider.next() - startServer(NodejsTcpServerTransport(testContext).target("127.0.0.1", port)) - client = connectClient(NodejsTcpClientTransport(testContext).target("127.0.0.1", port)) - } -}