From 4f38d6b07597dddb65ac6cfa59e47ca6e87c85a5 Mon Sep 17 00:00:00 2001 From: olme04 Date: Thu, 3 Feb 2022 09:20:05 +0000 Subject: [PATCH] Improve tests: * turn on frame logging in all tests except transport tests * fix flakiness of some tests * improve `SuspendTest` test logs * new way to ignore several native tests * move TestConnection to core module, as it used only there * make frame/frameType fully internal for now * extract transport related tests into separate module * add build file template for transport * merge tests with test server * simplify transport tests configuration --- .../rsocket.template.transport.gradle.kts | 13 ++ .../kotlin/io/rsocket/kotlin/frame/Frame.kt | 8 +- .../io/rsocket/kotlin/frame/FrameType.kt | 22 ++-- .../kotlin/ConnectionEstablishmentTest.kt | 9 +- .../io/rsocket/kotlin}/TestConnection.kt | 31 +++-- .../io/rsocket/kotlin}/TestWithConnection.kt | 3 +- .../io/rsocket/kotlin/core/RSocketTest.kt | 22 ++-- .../kotlin/core/ReconnectableRSocketTest.kt | 5 +- .../kotlin/io/rsocket/kotlin/frame/Util.kt | 6 +- .../kotlin/internal/RSocketRequesterTest.kt | 32 +++-- .../rsocket/kotlin/keepalive/KeepAliveTest.kt | 19 ++- .../src/jvmTest/resources/logging.properties | 21 ---- .../io/rsocket/kotlin/test/server/App.kt | 75 ------------ .../io/rsocket/kotlin/test/SuspendTest.kt | 55 +++++++-- .../io/rsocket/kotlin/test/Test.common.kt | 9 +- .../io/rsocket/kotlin/test/TestConfig.kt | 46 +++++++ .../io/rsocket/kotlin/test/TestRSocket.kt | 54 --------- .../kotlin/io/rsocket/kotlin/test/Test.kt | 13 +- .../kotlin/io/rsocket/kotlin/test/Test.kt | 21 +--- .../kotlin/io/rsocket/kotlin/test/Test.kt | 15 +-- rsocket-transport-ktor/build.gradle.kts | 2 +- .../build.gradle.kts | 2 +- .../transport/ktor/tcp/TcpServerTest.kt | 12 +- .../transport/ktor/tcp/TcpTransportTest.kt | 18 +-- .../build.gradle.kts | 2 +- .../build.gradle.kts | 2 +- .../build.gradle.kts | 4 +- .../websocket/ClientWebSocketTransportTest.kt | 2 +- .../ktor/websocket/WebSocketConnectionTest.kt | 11 +- .../ktor/websocket/WebSocketTransportTest.kt | 33 ++--- rsocket-transport-local/build.gradle.kts | 2 +- .../transport/local/LocalTransportTest.kt | 17 +-- rsocket-transport-nodejs-tcp/build.gradle.kts | 2 +- .../transport/nodejs/tcp/TcpTransportTest.kt | 20 +-- .../build.gradle.kts | 55 +++++---- .../kotlin/transport/tests}/PortProvider.kt | 5 +- .../kotlin/transport/tests}/TransportTest.kt | 114 ++++++++++++------ .../kotlin/transport/tests/server/App.kt | 55 +++++++++ settings.gradle.kts | 3 +- 39 files changed, 414 insertions(+), 426 deletions(-) create mode 100644 buildSrc/src/main/kotlin/rsocket.template.transport.gradle.kts rename {rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test => rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin}/TestConnection.kt (68%) rename {rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test => rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin}/TestWithConnection.kt (93%) delete mode 100644 rsocket-core/src/jvmTest/resources/logging.properties delete mode 100644 rsocket-test/rsocket-test-server/src/jvmMain/kotlin/io/rsocket/kotlin/test/server/App.kt create mode 100644 rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConfig.kt delete mode 100644 rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestRSocket.kt rename {rsocket-test/rsocket-test-server => rsocket-transport-tests}/build.gradle.kts (57%) rename {rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test => rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests}/PortProvider.kt (62%) rename {rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test => rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests}/TransportTest.kt (64%) create mode 100644 rsocket-transport-tests/src/jvmTest/kotlin/io/rsocket/kotlin/transport/tests/server/App.kt diff --git a/buildSrc/src/main/kotlin/rsocket.template.transport.gradle.kts b/buildSrc/src/main/kotlin/rsocket.template.transport.gradle.kts new file mode 100644 index 000000000..16c0baba6 --- /dev/null +++ b/buildSrc/src/main/kotlin/rsocket.template.transport.gradle.kts @@ -0,0 +1,13 @@ +plugins { + id("rsocket.template.library") +} + +kotlin { + sourceSets { + commonTest { + dependencies { + implementation(project(":rsocket-transport-tests")) + } + } + } +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt index a09abdd0d..856a1d711 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt @@ -24,10 +24,10 @@ import io.rsocket.kotlin.frame.io.* private const val FlagsMask: Int = 1023 private const val FrameTypeShift: Int = 10 -public sealed class Frame : Closeable { - public abstract val type: FrameType - public abstract val streamId: Int - public abstract val flags: Int +internal sealed class Frame : Closeable { + abstract val type: FrameType + abstract val streamId: Int + abstract val flags: Int protected abstract fun BytePacketBuilder.writeSelf() protected abstract fun StringBuilder.appendFlags() diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/FrameType.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/FrameType.kt index 1ad359380..265a144ac 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/FrameType.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/FrameType.kt @@ -18,7 +18,7 @@ package io.rsocket.kotlin.frame import io.rsocket.kotlin.frame.io.* -public enum class FrameType(public val encodedType: Int, flags: Int = Flags.Empty) { +internal enum class FrameType(val encodedType: Int, flags: Int = Flags.Empty) { Reserved(0x00), //CONNECTION @@ -32,8 +32,14 @@ public enum class FrameType(public val encodedType: Int, flags: Int = Flags.Empt //REQUEST RequestFnF(0x05, Flags.CanHaveData or Flags.CanHaveMetadata or Flags.Fragmentable or Flags.Request), RequestResponse(0x04, Flags.CanHaveData or Flags.CanHaveMetadata or Flags.Fragmentable or Flags.Request), - RequestStream(0x06, Flags.CanHaveMetadata or Flags.CanHaveData or Flags.HasInitialRequest or Flags.Fragmentable or Flags.Request), - RequestChannel(0x07, Flags.CanHaveMetadata or Flags.CanHaveData or Flags.HasInitialRequest or Flags.Fragmentable or Flags.Request), + RequestStream( + 0x06, + Flags.CanHaveMetadata or Flags.CanHaveData or Flags.HasInitialRequest or Flags.Fragmentable or Flags.Request + ), + RequestChannel( + 0x07, + Flags.CanHaveMetadata or Flags.CanHaveData or Flags.HasInitialRequest or Flags.Fragmentable or Flags.Request + ), // DURING REQUEST RequestN(0x08), @@ -49,11 +55,11 @@ public enum class FrameType(public val encodedType: Int, flags: Int = Flags.Empt Extension(0x3F, Flags.CanHaveData or Flags.CanHaveMetadata); - public val hasInitialRequest: Boolean = flags check Flags.HasInitialRequest - public val isRequestType: Boolean = flags check Flags.Request - public val isFragmentable: Boolean = flags check Flags.Fragmentable - public val canHaveMetadata: Boolean = flags check Flags.CanHaveMetadata - public val canHaveData: Boolean = flags check Flags.CanHaveData + val hasInitialRequest: Boolean = flags check Flags.HasInitialRequest + val isRequestType: Boolean = flags check Flags.Request + val isFragmentable: Boolean = flags check Flags.Fragmentable + val canHaveMetadata: Boolean = flags check Flags.CanHaveMetadata + val canHaveData: Boolean = flags check Flags.CanHaveData private object Flags { const val Empty = 0 diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/ConnectionEstablishmentTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/ConnectionEstablishmentTest.kt index 1e7a44b5d..f9e08391e 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/ConnectionEstablishmentTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/ConnectionEstablishmentTest.kt @@ -17,7 +17,6 @@ package io.rsocket.kotlin import io.ktor.utils.io.core.* -import io.rsocket.kotlin.core.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.frame.io.* import io.rsocket.kotlin.keepalive.* @@ -39,7 +38,7 @@ class ConnectionEstablishmentTest : SuspendTest, TestWithLeakCheck { GlobalScope.async { accept(connection) } } - val deferred = RSocketServer().bind(serverTransport) { + val deferred = TestServer().bind(serverTransport) { sendingRSocket.complete(requester) error(errorMessage) } @@ -67,6 +66,7 @@ class ConnectionEstablishmentTest : SuspendTest, TestWithLeakCheck { assertFalse(sender.isActive) expectNoEventsIn(100) } + connection.coroutineContext.job.join() val error = connection.coroutineContext.job.getCancellationException().cause assertTrue(error is RSocketError.Setup.Rejected) assertEquals(errorMessage, error.message) @@ -77,7 +77,7 @@ class ConnectionEstablishmentTest : SuspendTest, TestWithLeakCheck { val connection = TestConnection() val p = payload("setup") assertFailsWith(IllegalStateException::class, "failed") { - RSocketConnector { + TestConnector { connectionConfig { setupPayload { p } } @@ -86,8 +86,9 @@ class ConnectionEstablishmentTest : SuspendTest, TestWithLeakCheck { assertTrue(p.data.isNotEmpty) error("failed") } - }.connect { connection } + }.connect(connection) } + connection.coroutineContext.job.join() assertTrue(p.data.isEmpty) } diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestConnection.kt similarity index 68% rename from rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt rename to rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestConnection.kt index 3406f1c33..8f61337bf 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestConnection.kt @@ -14,25 +14,28 @@ * limitations under the License. */ -package io.rsocket.kotlin.test +package io.rsocket.kotlin import app.cash.turbine.* import io.ktor.utils.io.core.* import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.pool.* -import io.rsocket.kotlin.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.test.* +import io.rsocket.kotlin.transport.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlin.coroutines.* +import kotlin.test.* import kotlin.time.* +import kotlin.time.Duration.Companion.seconds -class TestConnection : Connection { +class TestConnection : Connection, ClientTransport { override val pool: ObjectPool = InUseTrackingPool override val coroutineContext: CoroutineContext = - Job() + Dispatchers.Unconfined + CoroutineExceptionHandler { c, e -> println("$c -> $e") } + Job() + Dispatchers.Unconfined + TestExceptionHandler private val sendChannel = Channel(Channel.UNLIMITED) private val receiveChannel = Channel(Channel.UNLIMITED) @@ -40,10 +43,12 @@ class TestConnection : Connection { init { coroutineContext.job.invokeOnCompletion { sendChannel.close(it) - @Suppress("INVISIBLE_MEMBER") receiveChannel.fullClose(it) + receiveChannel.fullClose(it) } } + override suspend fun connect(): Connection = this + override suspend fun send(packet: ByteReadPacket) { sendChannel.send(packet) } @@ -52,17 +57,21 @@ class TestConnection : Connection { return receiveChannel.receive() } - suspend fun sendToReceiver(vararg frames: Frame) { + suspend fun ignoreSetupFrame() { + assertEquals(FrameType.Setup, sendChannel.receive().readFrame(InUseTrackingPool).type) + } + + internal suspend fun sendToReceiver(vararg frames: Frame) { frames.forEach { - val packet = @Suppress("INVISIBLE_MEMBER") it.toPacket(InUseTrackingPool) + val packet = it.toPacket(InUseTrackingPool) receiveChannel.send(packet) } } - suspend fun test(validate: suspend FlowTurbine.() -> Unit) { + internal suspend fun test(validate: suspend FlowTurbine.() -> Unit) { sendChannel.consumeAsFlow().map { - @Suppress("INVISIBLE_MEMBER") it.readFrame(InUseTrackingPool) - }.test(validate = validate) + it.readFrame(InUseTrackingPool) + }.test(5.seconds, validate = validate) } } @@ -76,6 +85,6 @@ suspend fun FlowTurbine<*>.expectNoEventsIn(timeMillis: Long) { expectNoEvents() } -suspend inline fun FlowTurbine.awaitFrame(block: (frame: Frame) -> Unit) { +internal suspend inline fun FlowTurbine.awaitFrame(block: (frame: Frame) -> Unit) { block(awaitItem()) } diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestWithConnection.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestWithConnection.kt similarity index 93% rename from rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestWithConnection.kt rename to rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestWithConnection.kt index ca169e793..b98685e58 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestWithConnection.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestWithConnection.kt @@ -14,8 +14,9 @@ * limitations under the License. */ -package io.rsocket.kotlin.test +package io.rsocket.kotlin +import io.rsocket.kotlin.test.* import kotlinx.coroutines.* abstract class TestWithConnection : SuspendTest { diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt index 548c975ee..52c9f3bbe 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt @@ -20,7 +20,6 @@ import app.cash.turbine.* import io.ktor.utils.io.core.* import io.rsocket.kotlin.* import io.rsocket.kotlin.keepalive.* -import io.rsocket.kotlin.logging.* import io.rsocket.kotlin.payload.* import io.rsocket.kotlin.test.* import io.rsocket.kotlin.transport.local.* @@ -40,10 +39,8 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { } private suspend fun start(handler: RSocket? = null): RSocket { - val localServer = RSocketServer { - loggerFactory = LoggerFactory { PrintLogger.withLevel(LoggingLevel.DEBUG).logger("SERVER |$it") } - }.bindIn( - CoroutineScope(Dispatchers.Unconfined + testJob + CoroutineExceptionHandler { c, e -> println("$c -> $e") }), + val localServer = TestServer().bindIn( + CoroutineScope(Dispatchers.Unconfined + testJob + TestExceptionHandler), LocalServerTransport(InUseTrackingPool) ) { handler ?: RSocketRequestHandler { @@ -60,8 +57,7 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { } } - return RSocketConnector { - loggerFactory = LoggerFactory { PrintLogger.withLevel(LoggingLevel.DEBUG).logger("CLIENT |$it") } + return TestConnector { connectionConfig { keepAlive = KeepAlive(1000.seconds, 1000.seconds) } @@ -102,8 +98,8 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { } } - @Test //ignored on native because of bug inside native coroutines - fun testStreamResponderError() = test(ignoreNative = true) { + @Test + fun testStreamResponderError() = test { var p: Payload? = null val requester = start(RSocketRequestHandler { requestStream { @@ -361,13 +357,13 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { private suspend inline fun complete(sendChannel: SendChannel, receiveChannel: ReceiveChannel) { sendChannel.close() delay(100) - assertTrue(receiveChannel.isClosedForReceive) + assertTrue(receiveChannel.isClosedForReceive, "receiveChannel.isClosedForReceive=true") } private suspend inline fun cancel(requesterChannel: SendChannel, responderChannel: ReceiveChannel) { responderChannel.cancel() delay(100) - assertTrue(requesterChannel.isClosedForSend) + assertTrue(requesterChannel.isClosedForSend, "requesterChannel.isClosedForSend=true") } private suspend fun sendAndCheckReceived( @@ -376,8 +372,8 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { payloads: List, ) { delay(100) - assertFalse(requesterChannel.isClosedForSend) - assertFalse(responderChannel.isClosedForReceive) + assertFalse(requesterChannel.isClosedForSend, "requesterChannel.isClosedForSend=false") + assertFalse(responderChannel.isClosedForReceive, "responderChannel.isClosedForReceive=false") payloads.forEach { requesterChannel.send(it.copy()) } //TODO? payloads.forEach { responderChannel.checkReceived(it) } } diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/ReconnectableRSocketTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/ReconnectableRSocketTest.kt index 6d2db3a87..8871cdc5e 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/ReconnectableRSocketTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/ReconnectableRSocketTest.kt @@ -27,13 +27,14 @@ import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlin.test.* +import kotlin.time.Duration.Companion.seconds class ReconnectableRSocketTest : SuspendTest, TestWithLeakCheck { //needed for native private val fails = atomic(0) private val first = atomic(true) - private val logger = DefaultLoggerFactory.logger("io.rsocket.kotlin.connection") + private val logger = PrintLogger.withLevel(LoggingLevel.DEBUG).logger("io.rsocket.kotlin.connection") private suspend fun connectWithReconnect( connect: suspend () -> RSocket, @@ -202,7 +203,7 @@ class ReconnectableRSocketTest : SuspendTest, TestWithLeakCheck { rSocket.requestStream(Payload.Empty).collect() } - rSocket.requestStream(Payload.Empty).test { + rSocket.requestStream(Payload.Empty).test(5.seconds) { repeat(5) { assertEquals(Payload.Empty, awaitItem()) } diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/frame/Util.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/frame/Util.kt index c06f37de4..33dbff25b 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/frame/Util.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/frame/Util.kt @@ -21,16 +21,16 @@ import io.rsocket.kotlin.frame.io.* import io.rsocket.kotlin.test.* import kotlin.test.* -fun Frame.toPacketWithLength(): ByteReadPacket = buildPacket(InUseTrackingPool) { +internal fun Frame.toPacketWithLength(): ByteReadPacket = buildPacket(InUseTrackingPool) { val packet = toPacket(InUseTrackingPool) writeLength(packet.remaining.toInt()) writePacket(packet) } -fun ByteReadPacket.toFrameWithLength(): Frame { +internal fun ByteReadPacket.toFrameWithLength(): Frame { val length = readLength() assertEquals(length, remaining.toInt()) return readFrame(InUseTrackingPool) } -fun Frame.loopFrame(): Frame = toPacket(InUseTrackingPool).readFrame(InUseTrackingPool) +internal fun Frame.loopFrame(): Frame = toPacket(InUseTrackingPool).readFrame(InUseTrackingPool) diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt index 367c1b43f..5c51beae0 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt @@ -17,7 +17,6 @@ package io.rsocket.kotlin.internal import io.rsocket.kotlin.* -import io.rsocket.kotlin.core.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.keepalive.* import io.rsocket.kotlin.payload.* @@ -34,17 +33,13 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { override suspend fun before() { super.before() - requester = connect( - connection = connection, - isServer = false, - maxFragmentSize = 0, - interceptors = InterceptorsBuilder().build(), - connectionConfig = ConnectionConfig( - keepAlive = KeepAlive(1000.seconds, 1000.seconds), - payloadMimeType = DefaultPayloadMimeType, - setupPayload = Payload.Empty - ) - ) { RSocketRequestHandler { } } + requester = TestConnector { + connectionConfig { + keepAlive = KeepAlive(1000.seconds, 1000.seconds) + } + }.connect(connection) + + connection.ignoreSetupFrame() } @Test @@ -333,8 +328,8 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { } } - @Test //ignored on native because of coroutines bug with channels - fun testChannelRequestServerSideCancellation() = test(ignoreNative = true) { + @Test + fun testChannelRequestServerSideCancellation() = test { var ch: SendChannel? = null val request = channelFlow { ch = this @@ -366,7 +361,8 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { } } - requester.requestChannel(payload("INIT"), request).flowOn(PrefetchStrategy(Int.MAX_VALUE, 0)).launchIn(connection) + requester.requestChannel(payload("INIT"), request).flowOn(PrefetchStrategy(Int.MAX_VALUE, 0)) + .launchIn(connection) connection.test { awaitFrame { frame -> assertTrue(frame is RequestFrame) @@ -397,10 +393,12 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { } @Test - fun rrTerminatedOnConnectionClose() = streamIsTerminatedOnConnectionClose { requester.requestResponse(Payload.Empty) } + fun rrTerminatedOnConnectionClose() = + streamIsTerminatedOnConnectionClose { requester.requestResponse(Payload.Empty) } @Test - fun rsTerminatedOnConnectionClose() = streamIsTerminatedOnConnectionClose { requester.requestStream(Payload.Empty).collect() } + fun rsTerminatedOnConnectionClose() = + streamIsTerminatedOnConnectionClose { requester.requestStream(Payload.Empty).collect() } @Test fun rcTerminatedOnConnectionClose() = diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt index c34562411..ff8f6742f 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt @@ -18,10 +18,7 @@ package io.rsocket.kotlin.keepalive import io.ktor.utils.io.core.* import io.rsocket.kotlin.* -import io.rsocket.kotlin.core.* import io.rsocket.kotlin.frame.* -import io.rsocket.kotlin.internal.* -import io.rsocket.kotlin.payload.* import io.rsocket.kotlin.test.* import kotlinx.coroutines.* import kotlin.test.* @@ -32,17 +29,17 @@ class KeepAliveTest : TestWithConnection(), TestWithLeakCheck { private suspend fun requester( keepAlive: KeepAlive = KeepAlive(100.milliseconds, 1.seconds) - ): RSocket = connect( - connection = connection, - isServer = false, - maxFragmentSize = 0, - interceptors = InterceptorsBuilder().build(), - connectionConfig = ConnectionConfig(keepAlive, DefaultPayloadMimeType, Payload.Empty) - ) { RSocketRequestHandler { } } + ): RSocket = TestConnector { + connectionConfig { + this.keepAlive = keepAlive + } + }.connect(connection).also { + connection.ignoreSetupFrame() + } @Test fun requesterSendKeepAlive() = test { - requester() + requester(KeepAlive(1.seconds, 10.seconds)) connection.test { repeat(5) { awaitFrame { frame -> diff --git a/rsocket-core/src/jvmTest/resources/logging.properties b/rsocket-core/src/jvmTest/resources/logging.properties deleted file mode 100644 index a52a09f71..000000000 --- a/rsocket-core/src/jvmTest/resources/logging.properties +++ /dev/null @@ -1,21 +0,0 @@ -# -# Copyright 2015-2020 the original author or authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -handlers=java.util.logging.ConsoleHandler -.level=FINEST -java.util.logging.ConsoleHandler.level=FINEST -java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter -java.util.logging.SimpleFormatter.format=[%1$tF %1$tT] [%4$-7s] %5$s %n diff --git a/rsocket-test/rsocket-test-server/src/jvmMain/kotlin/io/rsocket/kotlin/test/server/App.kt b/rsocket-test/rsocket-test-server/src/jvmMain/kotlin/io/rsocket/kotlin/test/server/App.kt deleted file mode 100644 index 5e0cc789b..000000000 --- a/rsocket-test/rsocket-test-server/src/jvmMain/kotlin/io/rsocket/kotlin/test/server/App.kt +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.test.server - -import io.ktor.server.application.* -import io.ktor.server.cio.* -import io.ktor.server.engine.* -import io.ktor.server.routing.* -import io.ktor.server.websocket.* -import io.rsocket.kotlin.core.* -import io.rsocket.kotlin.test.* -import io.rsocket.kotlin.transport.ktor.tcp.* -import io.rsocket.kotlin.transport.ktor.websocket.server.* -import kotlinx.coroutines.* -import java.io.* - -fun main() { - start().await() -} - -fun start(): TestServer { - val server = TestServer() - server.start() - return server -} - -class TestServer : Closeable { - private val job = Job() - private var wsServer: ApplicationEngine? = null - private val rSocketServer = RSocketServer { -// loggerFactory = PrintLogger.withLevel(LoggingLevel.DEBUG) - } - - fun start(): Unit = runCatching { - val scope = CoroutineScope(job) - - //start TCP server - rSocketServer.bindIn(scope, TcpServerTransport(port = 8000)) { TestRSocket() } - - //start WS server - wsServer = scope.embeddedServer(CIO, port = 9000) { - install(WebSockets) - install(RSocketSupport) { server = rSocketServer } - - routing { - rSocket { TestRSocket() } - } - }.start() - - Thread.sleep(1000) //await start - }.onFailure { close() }.getOrThrow() - - fun await() { - runBlocking { job.join() } - } - - override fun close() { - runBlocking { job.cancelAndJoin() } - wsServer?.stop(0, 1000) - } -} diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/SuspendTest.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/SuspendTest.kt index eb09252c8..016af5881 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/SuspendTest.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/SuspendTest.kt @@ -22,6 +22,10 @@ import kotlin.time.* import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds +val TestExceptionHandler = CoroutineExceptionHandler { c, e -> + println("Error in $c -> ${e.stackTraceToString()}") +} + interface SuspendTest { val testTimeout: Duration get() = 1.minutes @@ -35,9 +39,8 @@ interface SuspendTest { fun test( timeout: Duration = testTimeout, - ignoreNative: Boolean = false, block: suspend CoroutineScope.() -> Unit, - ) = runTest(ignoreNative = ignoreNative) { + ) = runTest { val beforeError = runPhase("BEFORE", beforeTimeout) { before() } @@ -66,17 +69,47 @@ interface SuspendTest { } private suspend fun runPhase(tag: String, timeout: Duration, block: suspend CoroutineScope.() -> Unit): Throwable? { - if (debug) println("[TEST] $tag started") - val error = runCatching { - withTimeout(timeout, block) - }.exceptionOrNull() - if (debug) when (error) { - null -> println("[TEST] $tag completed") - is TimeoutCancellationException -> println("[TEST] $tag failed by timeout: $timeout") - else -> println("[TEST] $tag failed with error: $error") + println("[TEST] $tag started") + return when (val result = runWithTimeout(timeout, block)) { + is TestResult.Success -> { + println("[TEST] $tag completed in ${result.duration}") + null + } + is TestResult.Failed -> { + println("[TEST] $tag failed in ${result.duration} with error: ${result.cause.stackTraceToString()}") + result.cause + } + is TestResult.Timeout -> { + println("[TEST] $tag failed by timeout: ${result.timeout}") + result.cause + } } - return error } + private sealed interface TestResult { + class Success(val duration: Duration) : TestResult + class Failed(val duration: Duration, val cause: Throwable) : TestResult + class Timeout(val timeout: Duration, val cause: Throwable) : TestResult + } + + private suspend fun runWithTimeout(timeout: Duration, block: suspend CoroutineScope.() -> Unit): TestResult = + runCatching { + withTimeout(timeout) { + measureTimedValue { + runCatching { + block() + } + } + } + }.fold( + onSuccess = { (result, duration) -> + result.fold( + onSuccess = { TestResult.Success(duration) }, + onFailure = { TestResult.Failed(duration, it) } + ) + }, + onFailure = { TestResult.Timeout(timeout, it) } + ) + suspend fun currentJob(): Job = coroutineContext[Job]!! } diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Test.common.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Test.common.kt index 09a32c34a..a68880034 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Test.common.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/Test.common.kt @@ -16,13 +16,14 @@ package io.rsocket.kotlin.test -import io.rsocket.kotlin.logging.* import kotlinx.coroutines.* -internal expect fun runTest(ignoreNative: Boolean, block: suspend CoroutineScope.() -> Unit) +internal expect fun runTest(block: suspend CoroutineScope.() -> Unit) -expect val anotherDispatcher: CoroutineDispatcher +expect annotation class IgnoreJs() +expect annotation class IgnoreJvm() +expect annotation class IgnoreNative() -expect val TestLoggerFactory: LoggerFactory +expect val anotherDispatcher: CoroutineDispatcher expect fun identityHashCode(instance: Any): Int diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConfig.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConfig.kt new file mode 100644 index 000000000..397415c50 --- /dev/null +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConfig.kt @@ -0,0 +1,46 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@file:Suppress("FunctionName") + +package io.rsocket.kotlin.test + +import io.rsocket.kotlin.core.* +import io.rsocket.kotlin.logging.* + +fun TestServer( + logging: Boolean = true, + block: RSocketServerBuilder.() -> Unit = {} +): RSocketServer = RSocketServer { + loggerFactory = if (logging) { + LoggerFactory { PrintLogger.withLevel(LoggingLevel.DEBUG).logger("SERVER |$it") } + } else { + NoopLogger + } + block() +} + +fun TestConnector( + logging: Boolean = true, + block: RSocketConnectorBuilder.() -> Unit = {} +): RSocketConnector = RSocketConnector { + loggerFactory = if (logging) { + LoggerFactory { PrintLogger.withLevel(LoggingLevel.DEBUG).logger("CLIENT |$it") } + } else { + NoopLogger + } + block() +} diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestRSocket.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestRSocket.kt deleted file mode 100644 index 747f3cf63..000000000 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestRSocket.kt +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.test - -import io.ktor.utils.io.core.* -import io.rsocket.kotlin.* -import io.rsocket.kotlin.payload.* -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* -import kotlin.coroutines.* - -class TestRSocket : RSocket { - override val coroutineContext: CoroutineContext = Job() - - override suspend fun metadataPush(metadata: ByteReadPacket): Unit = metadata.close() - - override suspend fun fireAndForget(payload: Payload): Unit = payload.close() - - override suspend fun requestResponse(payload: Payload): Payload { - payload.close() - return Payload(packet(data), packet(metadata)) - } - - override fun requestStream(payload: Payload): Flow = flow { - payload.close() - repeat(10000) { - emitOrClose(Payload(packet(data), packet(metadata))) - } - } - - override fun requestChannel(initPayload: Payload, payloads: Flow): Flow = flow { - initPayload.close() - payloads.collect { emitOrClose(it) } - } - - companion object { - const val data = "hello world" - const val metadata = "metadata" - } -} diff --git a/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/Test.kt b/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/Test.kt index b8c212ae1..601d8bdad 100644 --- a/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/Test.kt +++ b/rsocket-test/src/jsMain/kotlin/io/rsocket/kotlin/test/Test.kt @@ -16,17 +16,16 @@ package io.rsocket.kotlin.test -import io.rsocket.kotlin.logging.* import kotlinx.coroutines.* -internal actual fun runTest( - ignoreNative: Boolean, - block: suspend CoroutineScope.() -> Unit, -): dynamic = GlobalScope.promise(block = block) +internal actual fun runTest(block: suspend CoroutineScope.() -> Unit): dynamic = GlobalScope.promise(block = block) + +actual typealias IgnoreJs = kotlin.test.Ignore + +actual annotation class IgnoreJvm +actual annotation class IgnoreNative //JS is single threaded, so it have only one dispatcher backed by one threed actual val anotherDispatcher: CoroutineDispatcher get() = Dispatchers.Default -actual val TestLoggerFactory: LoggerFactory = ConsoleLogger - actual fun identityHashCode(instance: Any): Int = instance.hashCode() diff --git a/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Test.kt b/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Test.kt index f7d295ace..686bb756c 100644 --- a/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Test.kt +++ b/rsocket-test/src/jvmMain/kotlin/io/rsocket/kotlin/test/Test.kt @@ -16,26 +16,15 @@ package io.rsocket.kotlin.test -import io.rsocket.kotlin.logging.* import kotlinx.coroutines.* -import java.io.* -import java.util.logging.* -internal actual fun runTest( - ignoreNative: Boolean, - block: suspend CoroutineScope.() -> Unit, -) { - runBlocking(block = block) -} +internal actual fun runTest(block: suspend CoroutineScope.() -> Unit): Unit = runBlocking(block = block) -actual val anotherDispatcher: CoroutineDispatcher get() = Dispatchers.IO +actual annotation class IgnoreJs +actual typealias IgnoreJvm = org.junit.Ignore -actual val TestLoggerFactory: LoggerFactory = run { - //init logger - val file = File("src/jvmTest/resources/logging.properties") - if (file.exists()) LogManager.getLogManager().readConfiguration(file.inputStream()) +actual annotation class IgnoreNative - JavaLogger -} +actual val anotherDispatcher: CoroutineDispatcher get() = Dispatchers.IO actual fun identityHashCode(instance: Any): Int = System.identityHashCode(instance) diff --git a/rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/Test.kt b/rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/Test.kt index 22d546a78..c7b87a02b 100644 --- a/rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/Test.kt +++ b/rsocket-test/src/nativeMain/kotlin/io/rsocket/kotlin/test/Test.kt @@ -16,22 +16,15 @@ package io.rsocket.kotlin.test -import io.rsocket.kotlin.logging.* import kotlinx.coroutines.* import kotlin.native.* -internal actual fun runTest( - ignoreNative: Boolean, - block: suspend CoroutineScope.() -> Unit, -) { - if (ignoreNative) return +internal actual fun runTest(block: suspend CoroutineScope.() -> Unit) = runBlocking(block = block) - runBlocking(block = block) -} +actual annotation class IgnoreJs +actual annotation class IgnoreJvm +actual typealias IgnoreNative = kotlin.test.Ignore actual val anotherDispatcher: CoroutineDispatcher get() = newSingleThreadContext("another") -@SharedImmutable -actual val TestLoggerFactory: LoggerFactory = PrintLogger - actual fun identityHashCode(instance: Any): Int = instance.identityHashCode() diff --git a/rsocket-transport-ktor/build.gradle.kts b/rsocket-transport-ktor/build.gradle.kts index 5e2acd72f..013eb8a9f 100644 --- a/rsocket-transport-ktor/build.gradle.kts +++ b/rsocket-transport-ktor/build.gradle.kts @@ -15,7 +15,7 @@ */ plugins { - rsocket.template.library + rsocket.template.transport } kotlin { diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-tcp/build.gradle.kts b/rsocket-transport-ktor/rsocket-transport-ktor-tcp/build.gradle.kts index 998f83f0b..89709e5c3 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-tcp/build.gradle.kts +++ b/rsocket-transport-ktor/rsocket-transport-ktor-tcp/build.gradle.kts @@ -15,7 +15,7 @@ */ plugins { - rsocket.template.library + rsocket.template.transport } kotlin { diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-tcp/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpServerTest.kt b/rsocket-transport-ktor/rsocket-transport-ktor-tcp/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpServerTest.kt index 828e8a684..e8f93aff3 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-tcp/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpServerTest.kt +++ b/rsocket-transport-ktor/rsocket-transport-ktor-tcp/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpServerTest.kt @@ -18,14 +18,14 @@ package io.rsocket.kotlin.transport.ktor.tcp import io.ktor.network.sockets.* import io.rsocket.kotlin.* -import io.rsocket.kotlin.core.* import io.rsocket.kotlin.test.* +import io.rsocket.kotlin.transport.tests.* import kotlinx.coroutines.* import kotlin.test.* class TcpServerTest : SuspendTest, TestWithLeakCheck { private val testJob = Job() - private val testContext = testJob + CoroutineExceptionHandler { c, e -> println("$c -> $e") } + private val testContext = testJob + TestExceptionHandler private val address = InetSocketAddress("0.0.0.0", PortProvider.next()) private val serverTransport = TcpServerTransport(address, InUseTrackingPool) private val clientTransport = TcpClientTransport(address, testContext, InUseTrackingPool) @@ -36,7 +36,7 @@ class TcpServerTest : SuspendTest, TestWithLeakCheck { @Test fun testFailedConnection() = test { - val server = RSocketServer().bindIn(CoroutineScope(testContext), serverTransport) { + val server = TestServer().bindIn(CoroutineScope(testContext), serverTransport) { if (config.setupPayload.data.readText() == "ok") { RSocketRequestHandler { requestResponse { it } @@ -44,7 +44,7 @@ class TcpServerTest : SuspendTest, TestWithLeakCheck { } else error("FAILED") }.also { it.serverSocket.await() } - suspend fun newClient(text: String) = RSocketConnector { + suspend fun newClient(text: String) = TestConnector { connectionConfig { setupPayload { payload(text) @@ -80,13 +80,13 @@ class TcpServerTest : SuspendTest, TestWithLeakCheck { @Test fun testFailedHandler() = test { val handlers = mutableListOf() - val server = RSocketServer().bindIn(CoroutineScope(testContext), serverTransport) { + val server = TestServer().bindIn(CoroutineScope(testContext), serverTransport) { RSocketRequestHandler { requestResponse { it } }.also { handlers += it } }.also { it.serverSocket.await() } - suspend fun newClient() = RSocketConnector().connect(clientTransport) + suspend fun newClient() = TestConnector().connect(clientTransport) val client1 = newClient() diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-tcp/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpTransportTest.kt b/rsocket-transport-ktor/rsocket-transport-ktor-tcp/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpTransportTest.kt index 0a6594d62..985c92da8 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-tcp/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpTransportTest.kt +++ b/rsocket-transport-ktor/rsocket-transport-ktor-tcp/src/commonTest/kotlin/io/rsocket/kotlin/transport/ktor/tcp/TcpTransportTest.kt @@ -18,24 +18,12 @@ package io.rsocket.kotlin.transport.ktor.tcp import io.ktor.network.sockets.* import io.rsocket.kotlin.test.* -import kotlinx.coroutines.* +import io.rsocket.kotlin.transport.tests.* class TcpTransportTest : TransportTest() { - private val testJob = Job() - override suspend fun before() { val address = InetSocketAddress("0.0.0.0", PortProvider.next()) - val context = testJob + CoroutineExceptionHandler { c, e -> println("$c -> $e") } - SERVER.bindIn( - CoroutineScope(context), - TcpServerTransport(address, InUseTrackingPool), - ACCEPTOR - ).serverSocket.await() - client = CONNECTOR.connect(TcpClientTransport(address, context, InUseTrackingPool)) - } - - override suspend fun after() { - super.after() - testJob.cancelAndJoin() + startServer(TcpServerTransport(address, InUseTrackingPool)).serverSocket.await() + client = connectClient(TcpClientTransport(address, testContext, InUseTrackingPool)) } } diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/build.gradle.kts b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/build.gradle.kts index 36ec34f08..206edacc6 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/build.gradle.kts +++ b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-client/build.gradle.kts @@ -15,7 +15,7 @@ */ plugins { - rsocket.template.library + rsocket.template.transport } kotlin { diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/build.gradle.kts b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/build.gradle.kts index 4f0ff5857..3d1475e35 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/build.gradle.kts +++ b/rsocket-transport-ktor/rsocket-transport-ktor-websocket-server/build.gradle.kts @@ -15,7 +15,7 @@ */ plugins { - rsocket.template.library + rsocket.template.transport } kotlin { diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket/build.gradle.kts b/rsocket-transport-ktor/rsocket-transport-ktor-websocket/build.gradle.kts index 2eef9fcf5..f8b225d27 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket/build.gradle.kts +++ b/rsocket-transport-ktor/rsocket-transport-ktor-websocket/build.gradle.kts @@ -15,7 +15,7 @@ */ plugins { - rsocket.template.library + rsocket.template.transport } kotlin { @@ -52,4 +52,4 @@ kotlin { description = "Ktor WebSocket RSocket transport implementation" -evaluationDependsOn(":rsocket-test-server") +evaluationDependsOn(":rsocket-transport-tests") diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket/src/jsTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/ClientWebSocketTransportTest.kt b/rsocket-transport-ktor/rsocket-transport-ktor-websocket/src/jsTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/ClientWebSocketTransportTest.kt index ca87d33e8..51e19e823 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket/src/jsTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/ClientWebSocketTransportTest.kt +++ b/rsocket-transport-ktor/rsocket-transport-ktor-websocket/src/jsTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/ClientWebSocketTransportTest.kt @@ -19,8 +19,8 @@ package io.rsocket.kotlin.transport.ktor.websocket import io.ktor.client.* import io.ktor.client.engine.js.* import io.ktor.client.plugins.websocket.* -import io.rsocket.kotlin.test.* import io.rsocket.kotlin.transport.ktor.websocket.client.* +import io.rsocket.kotlin.transport.tests.* import kotlinx.coroutines.* class ClientWebSocketTransportTest : TransportTest() { diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/WebSocketConnectionTest.kt b/rsocket-transport-ktor/rsocket-transport-ktor-websocket/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/WebSocketConnectionTest.kt index 50939fa11..9748f00a2 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/WebSocketConnectionTest.kt +++ b/rsocket-transport-ktor/rsocket-transport-ktor-websocket/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/WebSocketConnectionTest.kt @@ -21,12 +21,12 @@ import io.ktor.server.application.* import io.ktor.server.engine.* import io.ktor.server.routing.* import io.rsocket.kotlin.* -import io.rsocket.kotlin.core.* import io.rsocket.kotlin.keepalive.* import io.rsocket.kotlin.payload.* import io.rsocket.kotlin.test.* import io.rsocket.kotlin.transport.ktor.websocket.client.* import io.rsocket.kotlin.transport.ktor.websocket.server.* +import io.rsocket.kotlin.transport.tests.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlin.test.* @@ -42,7 +42,7 @@ class WebSocketConnectionTest : SuspendTest, TestWithLeakCheck { private val client = HttpClient(ClientCIO) { install(ClientWebSockets) install(ClientRSocketSupport) { - connector = RSocketConnector { + connector = TestConnector { connectionConfig { keepAlive = KeepAlive(500) } @@ -54,7 +54,9 @@ class WebSocketConnectionTest : SuspendTest, TestWithLeakCheck { private val server = embeddedServer(ServerCIO, port) { install(ServerWebSockets) - install(ServerRSocketSupport) + install(ServerRSocketSupport) { + server = TestServer() + } install(Routing) { rSocket { RSocketRequestHandler { @@ -79,7 +81,8 @@ class WebSocketConnectionTest : SuspendTest, TestWithLeakCheck { } override suspend fun after() { - server.stop(0, 0) + server.stop() + client.coroutineContext.job.cancelAndJoin() } @Test diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-websocket/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/WebSocketTransportTest.kt b/rsocket-transport-ktor/rsocket-transport-ktor-websocket/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/WebSocketTransportTest.kt index 55251adb4..dd317ebd4 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-websocket/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/WebSocketTransportTest.kt +++ b/rsocket-transport-ktor/rsocket-transport-ktor-websocket/src/jvmTest/kotlin/io/rsocket/kotlin/transport/ktor/websocket/WebSocketTransportTest.kt @@ -21,9 +21,9 @@ import io.ktor.client.engine.* import io.ktor.server.application.* import io.ktor.server.engine.* import io.ktor.server.routing.* -import io.rsocket.kotlin.test.* import io.rsocket.kotlin.transport.ktor.websocket.client.* import io.rsocket.kotlin.transport.ktor.websocket.server.* +import io.rsocket.kotlin.transport.tests.* import kotlinx.coroutines.* import io.ktor.client.plugins.websocket.WebSockets as ClientWebSockets import io.ktor.server.websocket.WebSockets as ServerWebSockets @@ -32,43 +32,26 @@ import io.rsocket.kotlin.transport.ktor.websocket.server.RSocketSupport as Serve abstract class WebSocketTransportTest( clientEngine: HttpClientEngineFactory<*>, - serverEngine: ApplicationEngineFactory<*, *>, + private val serverEngine: ApplicationEngineFactory<*, *>, ) : TransportTest() { private val port = PortProvider.next() - private val testJob = Job() private val httpClient = HttpClient(clientEngine) { install(ClientWebSockets) install(ClientRSocketSupport) { connector = CONNECTOR } } - private val server = (GlobalScope + testJob).embeddedServer(serverEngine, port) { - install(ServerWebSockets) - install(ServerRSocketSupport) { server = SERVER } - install(Routing) { rSocket(acceptor = ACCEPTOR) } - }.apply { start() } - override suspend fun before() { - super.before() - client = trySeveralTimes { httpClient.rSocket(port = port) } + (GlobalScope + testJob).embeddedServer(serverEngine, port) { + install(ServerWebSockets) + install(ServerRSocketSupport) { server = SERVER } + install(Routing) { rSocket(acceptor = ACCEPTOR) } + }.start() + client = httpClient.rSocket(port = port) } override suspend fun after() { super.after() - testJob.cancelAndJoin() httpClient.coroutineContext.job.cancelAndJoin() } - - private suspend inline fun trySeveralTimes(block: () -> R): R { - lateinit var error: Throwable - repeat(10) { - try { - return block() - } catch (e: Throwable) { - error = e - delay(500) //sometimes address isn't yet available (server isn't started) - } - } - throw error - } } diff --git a/rsocket-transport-local/build.gradle.kts b/rsocket-transport-local/build.gradle.kts index df932cfde..033d7f2ad 100644 --- a/rsocket-transport-local/build.gradle.kts +++ b/rsocket-transport-local/build.gradle.kts @@ -15,7 +15,7 @@ */ plugins { - rsocket.template.library + rsocket.template.transport } kotlin { diff --git a/rsocket-transport-local/src/commonTest/kotlin/io/rsocket/kotlin/transport/local/LocalTransportTest.kt b/rsocket-transport-local/src/commonTest/kotlin/io/rsocket/kotlin/transport/local/LocalTransportTest.kt index 2be32e46b..5c4ea312f 100644 --- a/rsocket-transport-local/src/commonTest/kotlin/io/rsocket/kotlin/transport/local/LocalTransportTest.kt +++ b/rsocket-transport-local/src/commonTest/kotlin/io/rsocket/kotlin/transport/local/LocalTransportTest.kt @@ -17,22 +17,11 @@ package io.rsocket.kotlin.transport.local import io.rsocket.kotlin.test.* -import kotlinx.coroutines.* +import io.rsocket.kotlin.transport.tests.* class LocalTransportTest : TransportTest() { - private val testJob = Job() - override suspend fun before() { - val server = SERVER.bindIn( - CoroutineScope(testJob + CoroutineExceptionHandler { c, e -> println("$c -> $e") }), - LocalServerTransport(InUseTrackingPool), - ACCEPTOR - ) - client = CONNECTOR.connect(server) - } - - override suspend fun after() { - super.after() - testJob.cancelAndJoin() + val server = startServer(LocalServerTransport(InUseTrackingPool)) + client = connectClient(server) } } diff --git a/rsocket-transport-nodejs-tcp/build.gradle.kts b/rsocket-transport-nodejs-tcp/build.gradle.kts index 7e2ce0918..ec2f27329 100644 --- a/rsocket-transport-nodejs-tcp/build.gradle.kts +++ b/rsocket-transport-nodejs-tcp/build.gradle.kts @@ -15,7 +15,7 @@ */ plugins { - rsocket.template.library + rsocket.template.transport } kotlin { diff --git a/rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt b/rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt index d32c9144a..7c39666e2 100644 --- a/rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt +++ b/rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt @@ -1,35 +1,21 @@ package io.rsocket.kotlin.transport.nodejs.tcp import io.rsocket.kotlin.test.* -import kotlinx.atomicfu.* +import io.rsocket.kotlin.transport.tests.* import kotlinx.coroutines.* -import kotlin.random.* - -object PortProvider { - private val port = atomic(Random.nextInt(20, 90) * 100) - fun next(): Int = port.incrementAndGet() -} - class TcpTransportTest : TransportTest() { - private val testJob = Job() - private lateinit var server: TcpServer override suspend fun before() { val port = PortProvider.next() - server = SERVER.bindIn( - CoroutineScope(testJob + CoroutineExceptionHandler { c, e -> println("$c -> $e") }), - TcpServerTransport(port, "127.0.0.1", InUseTrackingPool), - ACCEPTOR - ) - client = CONNECTOR.connect(TcpClientTransport(port, "127.0.0.1", InUseTrackingPool, testJob)) + server = startServer(TcpServerTransport(port, "127.0.0.1", InUseTrackingPool)) + client = connectClient(TcpClientTransport(port, "127.0.0.1", InUseTrackingPool, testJob)) } override suspend fun after() { delay(100) //TODO close race super.after() - testJob.cancelAndJoin() server.close() } } diff --git a/rsocket-test/rsocket-test-server/build.gradle.kts b/rsocket-transport-tests/build.gradle.kts similarity index 57% rename from rsocket-test/rsocket-test-server/build.gradle.kts rename to rsocket-transport-tests/build.gradle.kts index a6724c534..f55141c77 100644 --- a/rsocket-test/rsocket-test-server/build.gradle.kts +++ b/rsocket-transport-tests/build.gradle.kts @@ -26,20 +26,27 @@ kotlin { sourceSets.all { languageSettings.optInForTest() } - configureJvm { + configureCommon { main { dependencies { - implementation(projects.rsocketTest) + api(projects.rsocketTest) + } + } + } + configureJvm { + test { + dependencies { implementation(projects.rsocketTransportKtor.rsocketTransportKtorTcp) implementation(projects.rsocketTransportKtor.rsocketTransportKtorWebsocketServer) - implementation(libs.ktor.server.cio) } } } + configureJs() + configureNative() } -open class RSocketTestServer : DefaultTask() { +open class StartTransportTestServer : DefaultTask() { @Internal var server: Closeable? = null private set @@ -50,40 +57,44 @@ open class RSocketTestServer : DefaultTask() { @TaskAction fun exec() { try { - println("[TestServer] start") - val loader = URLClassLoader(classpath.map { it.toURI().toURL() }.toTypedArray(), ClassLoader.getSystemClassLoader()) - server = loader.loadClass("io.rsocket.kotlin.test.server.AppKt").getMethod("start").invoke(null) as Closeable - println("[TestServer] started") + println("[TransportTestServer] start") + server = URLClassLoader( + classpath.map { it.toURI().toURL() }.toTypedArray(), + ClassLoader.getSystemClassLoader() + ) + .loadClass("io.rsocket.kotlin.transport.tests.server.AppKt") + .getMethod("start") + .invoke(null) as Closeable + println("[TransportTestServer] started") } catch (cause: Throwable) { - println("[TestServer] failed: ${cause.message}") + println("[TransportTestServer] failed to start: ${cause.message}") cause.printStackTrace() } } } -val startTestServer by tasks.registering(RSocketTestServer::class) { - dependsOn(tasks["jvmJar"]) +val startTransportTestServer by tasks.registering(StartTransportTestServer::class) { + dependsOn(tasks["jvmTest"]) //TODO? classpath = (kotlin.targets["jvm"].compilations["test"] as KotlinJvmCompilation).runtimeDependencyFiles } -val testTasks = setOf( - "jsLegacyNodeTest", - "jsIrNodeTest", - "jsLegacyBrowserTest", - "jsIrBrowserTest", -) - rootProject.allprojects { if (name == "rsocket-transport-ktor-websocket") { - tasks.matching { it.name in testTasks }.all { - dependsOn(startTestServer) + val names = setOf( + "jsLegacyNodeTest", + "jsIrNodeTest", + "jsLegacyBrowserTest", + "jsIrBrowserTest", + ) + tasks.all { + if (name in names) dependsOn(startTransportTestServer) } } } gradle.buildFinished { - startTestServer.get().server?.run { + startTransportTestServer.get().server?.run { close() - println("[TestServer] stop") + println("[TransportTestServer] stopped") } } diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/PortProvider.kt b/rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/PortProvider.kt similarity index 62% rename from rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/PortProvider.kt rename to rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/PortProvider.kt index 4f6c2798f..577511070 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/PortProvider.kt +++ b/rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/PortProvider.kt @@ -1,4 +1,4 @@ -package io.rsocket.kotlin.test +package io.rsocket.kotlin.transport.tests import kotlinx.atomicfu.* import kotlin.random.* @@ -6,4 +6,7 @@ import kotlin.random.* object PortProvider { private val port = atomic(Random.nextInt(20, 90) * 100) fun next(): Int = port.incrementAndGet() + + val testServerTcp = 8000 + val testServerWebSocket = 9000 } diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TransportTest.kt b/rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/TransportTest.kt similarity index 64% rename from rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TransportTest.kt rename to rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/TransportTest.kt index ceb61be94..39772a3a8 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TransportTest.kt +++ b/rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/TransportTest.kt @@ -14,27 +14,40 @@ * limitations under the License. */ -package io.rsocket.kotlin.test +package io.rsocket.kotlin.transport.tests +import io.ktor.utils.io.core.* import io.rsocket.kotlin.* -import io.rsocket.kotlin.core.* import io.rsocket.kotlin.keepalive.* -import io.rsocket.kotlin.logging.* import io.rsocket.kotlin.payload.* +import io.rsocket.kotlin.test.* +import io.rsocket.kotlin.transport.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* +import kotlin.coroutines.* import kotlin.test.* import kotlin.time.* import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds abstract class TransportTest : SuspendTest, TestWithLeakCheck { - override val testTimeout: Duration = 2.minutes + override val testTimeout: Duration = 3.minutes - lateinit var client: RSocket //should be assigned in `before` + protected val testJob = Job() + protected val testContext = testJob + TestExceptionHandler + protected val testScope = CoroutineScope(testContext) + + protected lateinit var client: RSocket + + protected suspend fun connectClient(clientTransport: ClientTransport): RSocket = + CONNECTOR.connect(clientTransport) + + protected fun startServer(serverTransport: ServerTransport): T = + SERVER.bindIn(testScope, serverTransport, ACCEPTOR) override suspend fun after() { client.coroutineContext.job.cancelAndJoin() + testJob.cancelAndJoin() } @Test @@ -44,17 +57,17 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { @Test fun largePayloadFireAndForget10() = test { - (1..10).map { async { client.fireAndForget(LARGE_PAYLOAD) } }.awaitAll() + (1..10).map { async { client.fireAndForget(requesterLargeMetadata) } }.awaitAll() } @Test fun metadataPush10() = test { - (1..10).map { async { client.metadataPush(packet(MOCK_DATA)) } }.awaitAll() + (1..10).map { async { client.metadataPush(packet(requesterData)) } }.awaitAll() } @Test fun largePayloadMetadataPush10() = test { - (1..10).map { async { client.metadataPush(packet(LARGE_DATA)) } }.awaitAll() + (1..10).map { async { client.metadataPush(packet(requesterLargeData)) } }.awaitAll() } @Test @@ -82,10 +95,10 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { @Test fun largePayloadRequestChannel200() = test { val request = flow { - repeat(200) { emit(LARGE_PAYLOAD) } + repeat(200) { emit(requesterLargeMetadata) } } val list = - client.requestChannel(LARGE_PAYLOAD, request) + client.requestChannel(requesterLargeMetadata, request) .flowOn(PrefetchStrategy(Int.MAX_VALUE, 0)) .onEach { it.close() } .toList() @@ -98,14 +111,15 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { repeat(20_000) { emit(payload(7)) } } val list = client.requestChannel(payload(7), request).flowOn(PrefetchStrategy(Int.MAX_VALUE, 0)).onEach { - assertEquals(MOCK_DATA, it.data.readText()) - assertEquals(MOCK_METADATA, it.metadata?.readText()) + assertEquals(requesterData, it.data.readText()) + assertEquals(requesterMetadata, it.metadata?.readText()) }.toList() assertEquals(20_000, list.size) } @Test - fun requestChannel200000() = test(ignoreNative = true) { + @IgnoreNative //long test + fun requestChannel200000() = test { val request = flow { repeat(200_000) { emit(payload(it)) } } @@ -130,7 +144,8 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { } @Test - fun requestChannel256x512() = test(ignoreNative = true) { + @IgnoreNative //long test + fun requestChannel256x512() = test { val request = flow { repeat(512) { emit(payload(it)) @@ -155,8 +170,8 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { .flowOn(PrefetchStrategy(Int.MAX_VALUE, 0)) .take(500) .onEach { - assertEquals(MOCK_DATA, it.data.readText()) - assertEquals(MOCK_METADATA, it.metadata?.readText()) + assertEquals(requesterData, it.data.readText()) + assertEquals(requesterMetadata, it.metadata?.readText()) }.toList() assertEquals(500, list.size) } @@ -178,7 +193,7 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { @Test fun largePayloadRequestResponse100() = test { - (1..100).map { async { client.requestResponse(LARGE_PAYLOAD) } }.awaitAll().onEach { it.close() } + (1..100).map { async { client.requestResponse(requesterLargeMetadata) } }.awaitAll().onEach { it.close() } } @Test @@ -187,7 +202,8 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { } @Test - fun requestResponse100000() = test(ignoreNative = true) { + @IgnoreNative //long test + fun requestResponse100000() = test { repeat(100000) { client.requestResponse(payload(3)).let(Companion::checkPayload) } } @@ -217,40 +233,62 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { } companion object { - - val SERVER = RSocketServer { - loggerFactory = NoopLogger - } - - val CONNECTOR = RSocketConnector { - loggerFactory = NoopLogger - + val SERVER = TestServer(logging = false) + val CONNECTOR = TestConnector(logging = false) { connectionConfig { keepAlive = KeepAlive(10.minutes, 100.minutes) } } - val ACCEPTOR = ConnectionAcceptor { - TestRSocket() - } + val ACCEPTOR = ConnectionAcceptor { ResponderRSocket() } + + const val responderData = "hello world" + const val responderMetadata = "metadata" - const val MOCK_DATA: String = "test-data" - const val MOCK_METADATA: String = "metadata" - val LARGE_DATA = "large.text.12345".repeat(2000) - val LARGE_PAYLOAD get() = payload(LARGE_DATA, LARGE_DATA) + const val requesterData: String = "test-data" + const val requesterMetadata: String = "metadata" - private fun payload(metadataPresent: Int): Payload { + val requesterLargeData = "large.text.12345".repeat(2000) + val requesterLargeMetadata get() = payload(requesterLargeData, requesterLargeData) + + fun payload(metadataPresent: Int): Payload { val metadata = when (metadataPresent % 5) { 0 -> null 1 -> "" - else -> MOCK_METADATA + else -> requesterMetadata } - return payload(MOCK_DATA, metadata) + return payload(requesterData, metadata) } fun checkPayload(payload: Payload) { - assertEquals(TestRSocket.data, payload.data.readText()) - assertEquals(TestRSocket.metadata, payload.metadata?.readText()) + assertEquals(responderData, payload.data.readText()) + assertEquals(responderMetadata, payload.metadata?.readText()) + } + } + + private class ResponderRSocket : RSocket { + override val coroutineContext: CoroutineContext = Job() + + override suspend fun metadataPush(metadata: ByteReadPacket): Unit = metadata.close() + + override suspend fun fireAndForget(payload: Payload): Unit = payload.close() + + override suspend fun requestResponse(payload: Payload): Payload { + payload.close() + return Payload(packet(responderData), packet(responderMetadata)) + } + + override fun requestStream(payload: Payload): Flow = flow { + payload.close() + repeat(10000) { + emitOrClose(Payload(packet(responderData), packet(responderMetadata))) + } + } + + override fun requestChannel(initPayload: Payload, payloads: Flow): Flow = flow { + initPayload.close() + payloads.collect { emitOrClose(it) } } } + } diff --git a/rsocket-transport-tests/src/jvmTest/kotlin/io/rsocket/kotlin/transport/tests/server/App.kt b/rsocket-transport-tests/src/jvmTest/kotlin/io/rsocket/kotlin/transport/tests/server/App.kt new file mode 100644 index 000000000..af2dba805 --- /dev/null +++ b/rsocket-transport-tests/src/jvmTest/kotlin/io/rsocket/kotlin/transport/tests/server/App.kt @@ -0,0 +1,55 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.transport.tests.server + +import io.ktor.server.application.* +import io.ktor.server.cio.* +import io.ktor.server.engine.* +import io.ktor.server.routing.* +import io.ktor.server.websocket.* +import io.rsocket.kotlin.transport.ktor.tcp.* +import io.rsocket.kotlin.transport.ktor.websocket.server.* +import io.rsocket.kotlin.transport.tests.* +import kotlinx.coroutines.* +import java.io.* + +fun start(): Closeable { + val job = Job() + val scope = CoroutineScope(job) + + runBlocking { + TransportTest.SERVER.bindIn( + scope, + TcpServerTransport(port = PortProvider.testServerTcp), + TransportTest.ACCEPTOR + ).serverSocket.await() //await server start + } + + scope.embeddedServer(CIO, port = PortProvider.testServerWebSocket) { + install(WebSockets) + install(RSocketSupport) { server = TransportTest.SERVER } + install(Routing) { rSocket(acceptor = TransportTest.ACCEPTOR) } + }.start() + + Thread.sleep(1000) //await start + + return Closeable { + runBlocking { + job.cancelAndJoin() + } + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index c7d3b3c8d..3806760ef 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -51,9 +51,8 @@ include("benchmarks") include("rsocket-core") include("rsocket-test") -include("rsocket-test-server") -project(":rsocket-test-server").projectDir = file("rsocket-test/rsocket-test-server") +include("rsocket-transport-tests") include("rsocket-transport-local") //ktor transport modules