Skip to content

Commit d4a5e7e

Browse files
author
olme04
authored
Improve tests: (#210)
* turn on frame logging in all tests except transport tests * fix flakiness of some tests * improve `SuspendTest` test logs * new way to ignore several native tests * move TestConnection to core module, as it used only there * make frame/frameType fully internal for now * extract transport related tests into separate module * add build file template for transport * merge tests with test server * simplify transport tests configuration
1 parent e4a4fab commit d4a5e7e

File tree

39 files changed

+414
-426
lines changed

39 files changed

+414
-426
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
plugins {
2+
id("rsocket.template.library")
3+
}
4+
5+
kotlin {
6+
sourceSets {
7+
commonTest {
8+
dependencies {
9+
implementation(project(":rsocket-transport-tests"))
10+
}
11+
}
12+
}
13+
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ import io.rsocket.kotlin.frame.io.*
2424
private const val FlagsMask: Int = 1023
2525
private const val FrameTypeShift: Int = 10
2626

27-
public sealed class Frame : Closeable {
28-
public abstract val type: FrameType
29-
public abstract val streamId: Int
30-
public abstract val flags: Int
27+
internal sealed class Frame : Closeable {
28+
abstract val type: FrameType
29+
abstract val streamId: Int
30+
abstract val flags: Int
3131

3232
protected abstract fun BytePacketBuilder.writeSelf()
3333
protected abstract fun StringBuilder.appendFlags()

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/FrameType.kt

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package io.rsocket.kotlin.frame
1818

1919
import io.rsocket.kotlin.frame.io.*
2020

21-
public enum class FrameType(public val encodedType: Int, flags: Int = Flags.Empty) {
21+
internal enum class FrameType(val encodedType: Int, flags: Int = Flags.Empty) {
2222
Reserved(0x00),
2323

2424
//CONNECTION
@@ -32,8 +32,14 @@ public enum class FrameType(public val encodedType: Int, flags: Int = Flags.Empt
3232
//REQUEST
3333
RequestFnF(0x05, Flags.CanHaveData or Flags.CanHaveMetadata or Flags.Fragmentable or Flags.Request),
3434
RequestResponse(0x04, Flags.CanHaveData or Flags.CanHaveMetadata or Flags.Fragmentable or Flags.Request),
35-
RequestStream(0x06, Flags.CanHaveMetadata or Flags.CanHaveData or Flags.HasInitialRequest or Flags.Fragmentable or Flags.Request),
36-
RequestChannel(0x07, Flags.CanHaveMetadata or Flags.CanHaveData or Flags.HasInitialRequest or Flags.Fragmentable or Flags.Request),
35+
RequestStream(
36+
0x06,
37+
Flags.CanHaveMetadata or Flags.CanHaveData or Flags.HasInitialRequest or Flags.Fragmentable or Flags.Request
38+
),
39+
RequestChannel(
40+
0x07,
41+
Flags.CanHaveMetadata or Flags.CanHaveData or Flags.HasInitialRequest or Flags.Fragmentable or Flags.Request
42+
),
3743

3844
// DURING REQUEST
3945
RequestN(0x08),
@@ -49,11 +55,11 @@ public enum class FrameType(public val encodedType: Int, flags: Int = Flags.Empt
4955

5056
Extension(0x3F, Flags.CanHaveData or Flags.CanHaveMetadata);
5157

52-
public val hasInitialRequest: Boolean = flags check Flags.HasInitialRequest
53-
public val isRequestType: Boolean = flags check Flags.Request
54-
public val isFragmentable: Boolean = flags check Flags.Fragmentable
55-
public val canHaveMetadata: Boolean = flags check Flags.CanHaveMetadata
56-
public val canHaveData: Boolean = flags check Flags.CanHaveData
58+
val hasInitialRequest: Boolean = flags check Flags.HasInitialRequest
59+
val isRequestType: Boolean = flags check Flags.Request
60+
val isFragmentable: Boolean = flags check Flags.Fragmentable
61+
val canHaveMetadata: Boolean = flags check Flags.CanHaveMetadata
62+
val canHaveData: Boolean = flags check Flags.CanHaveData
5763

5864
private object Flags {
5965
const val Empty = 0

rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/ConnectionEstablishmentTest.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package io.rsocket.kotlin
1818

1919
import io.ktor.utils.io.core.*
20-
import io.rsocket.kotlin.core.*
2120
import io.rsocket.kotlin.frame.*
2221
import io.rsocket.kotlin.frame.io.*
2322
import io.rsocket.kotlin.keepalive.*
@@ -39,7 +38,7 @@ class ConnectionEstablishmentTest : SuspendTest, TestWithLeakCheck {
3938
GlobalScope.async { accept(connection) }
4039
}
4140

42-
val deferred = RSocketServer().bind(serverTransport) {
41+
val deferred = TestServer().bind(serverTransport) {
4342
sendingRSocket.complete(requester)
4443
error(errorMessage)
4544
}
@@ -67,6 +66,7 @@ class ConnectionEstablishmentTest : SuspendTest, TestWithLeakCheck {
6766
assertFalse(sender.isActive)
6867
expectNoEventsIn(100)
6968
}
69+
connection.coroutineContext.job.join()
7070
val error = connection.coroutineContext.job.getCancellationException().cause
7171
assertTrue(error is RSocketError.Setup.Rejected)
7272
assertEquals(errorMessage, error.message)
@@ -77,7 +77,7 @@ class ConnectionEstablishmentTest : SuspendTest, TestWithLeakCheck {
7777
val connection = TestConnection()
7878
val p = payload("setup")
7979
assertFailsWith(IllegalStateException::class, "failed") {
80-
RSocketConnector {
80+
TestConnector {
8181
connectionConfig {
8282
setupPayload { p }
8383
}
@@ -86,8 +86,9 @@ class ConnectionEstablishmentTest : SuspendTest, TestWithLeakCheck {
8686
assertTrue(p.data.isNotEmpty)
8787
error("failed")
8888
}
89-
}.connect { connection }
89+
}.connect(connection)
9090
}
91+
connection.coroutineContext.job.join()
9192
assertTrue(p.data.isEmpty)
9293
}
9394

rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt renamed to rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestConnection.kt

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,36 +14,41 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.rsocket.kotlin.test
17+
package io.rsocket.kotlin
1818

1919
import app.cash.turbine.*
2020
import io.ktor.utils.io.core.*
2121
import io.ktor.utils.io.core.internal.*
2222
import io.ktor.utils.io.pool.*
23-
import io.rsocket.kotlin.*
2423
import io.rsocket.kotlin.frame.*
2524
import io.rsocket.kotlin.internal.*
25+
import io.rsocket.kotlin.test.*
26+
import io.rsocket.kotlin.transport.*
2627
import kotlinx.coroutines.*
2728
import kotlinx.coroutines.channels.*
2829
import kotlinx.coroutines.flow.*
2930
import kotlin.coroutines.*
31+
import kotlin.test.*
3032
import kotlin.time.*
33+
import kotlin.time.Duration.Companion.seconds
3134

32-
class TestConnection : Connection {
35+
class TestConnection : Connection, ClientTransport {
3336
override val pool: ObjectPool<ChunkBuffer> = InUseTrackingPool
3437
override val coroutineContext: CoroutineContext =
35-
Job() + Dispatchers.Unconfined + CoroutineExceptionHandler { c, e -> println("$c -> $e") }
38+
Job() + Dispatchers.Unconfined + TestExceptionHandler
3639

3740
private val sendChannel = Channel<ByteReadPacket>(Channel.UNLIMITED)
3841
private val receiveChannel = Channel<ByteReadPacket>(Channel.UNLIMITED)
3942

4043
init {
4144
coroutineContext.job.invokeOnCompletion {
4245
sendChannel.close(it)
43-
@Suppress("INVISIBLE_MEMBER") receiveChannel.fullClose(it)
46+
receiveChannel.fullClose(it)
4447
}
4548
}
4649

50+
override suspend fun connect(): Connection = this
51+
4752
override suspend fun send(packet: ByteReadPacket) {
4853
sendChannel.send(packet)
4954
}
@@ -52,17 +57,21 @@ class TestConnection : Connection {
5257
return receiveChannel.receive()
5358
}
5459

55-
suspend fun sendToReceiver(vararg frames: Frame) {
60+
suspend fun ignoreSetupFrame() {
61+
assertEquals(FrameType.Setup, sendChannel.receive().readFrame(InUseTrackingPool).type)
62+
}
63+
64+
internal suspend fun sendToReceiver(vararg frames: Frame) {
5665
frames.forEach {
57-
val packet = @Suppress("INVISIBLE_MEMBER") it.toPacket(InUseTrackingPool)
66+
val packet = it.toPacket(InUseTrackingPool)
5867
receiveChannel.send(packet)
5968
}
6069
}
6170

62-
suspend fun test(validate: suspend FlowTurbine<Frame>.() -> Unit) {
71+
internal suspend fun test(validate: suspend FlowTurbine<Frame>.() -> Unit) {
6372
sendChannel.consumeAsFlow().map {
64-
@Suppress("INVISIBLE_MEMBER") it.readFrame(InUseTrackingPool)
65-
}.test(validate = validate)
73+
it.readFrame(InUseTrackingPool)
74+
}.test(5.seconds, validate = validate)
6675
}
6776
}
6877

@@ -76,6 +85,6 @@ suspend fun FlowTurbine<*>.expectNoEventsIn(timeMillis: Long) {
7685
expectNoEvents()
7786
}
7887

79-
suspend inline fun FlowTurbine<Frame>.awaitFrame(block: (frame: Frame) -> Unit) {
88+
internal suspend inline fun FlowTurbine<Frame>.awaitFrame(block: (frame: Frame) -> Unit) {
8089
block(awaitItem())
8190
}

rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestWithConnection.kt renamed to rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/TestWithConnection.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.rsocket.kotlin.test
17+
package io.rsocket.kotlin
1818

19+
import io.rsocket.kotlin.test.*
1920
import kotlinx.coroutines.*
2021

2122
abstract class TestWithConnection : SuspendTest {

rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import app.cash.turbine.*
2020
import io.ktor.utils.io.core.*
2121
import io.rsocket.kotlin.*
2222
import io.rsocket.kotlin.keepalive.*
23-
import io.rsocket.kotlin.logging.*
2423
import io.rsocket.kotlin.payload.*
2524
import io.rsocket.kotlin.test.*
2625
import io.rsocket.kotlin.transport.local.*
@@ -40,10 +39,8 @@ class RSocketTest : SuspendTest, TestWithLeakCheck {
4039
}
4140

4241
private suspend fun start(handler: RSocket? = null): RSocket {
43-
val localServer = RSocketServer {
44-
loggerFactory = LoggerFactory { PrintLogger.withLevel(LoggingLevel.DEBUG).logger("SERVER |$it") }
45-
}.bindIn(
46-
CoroutineScope(Dispatchers.Unconfined + testJob + CoroutineExceptionHandler { c, e -> println("$c -> $e") }),
42+
val localServer = TestServer().bindIn(
43+
CoroutineScope(Dispatchers.Unconfined + testJob + TestExceptionHandler),
4744
LocalServerTransport(InUseTrackingPool)
4845
) {
4946
handler ?: RSocketRequestHandler {
@@ -60,8 +57,7 @@ class RSocketTest : SuspendTest, TestWithLeakCheck {
6057
}
6158
}
6259

63-
return RSocketConnector {
64-
loggerFactory = LoggerFactory { PrintLogger.withLevel(LoggingLevel.DEBUG).logger("CLIENT |$it") }
60+
return TestConnector {
6561
connectionConfig {
6662
keepAlive = KeepAlive(1000.seconds, 1000.seconds)
6763
}
@@ -102,8 +98,8 @@ class RSocketTest : SuspendTest, TestWithLeakCheck {
10298
}
10399
}
104100

105-
@Test //ignored on native because of bug inside native coroutines
106-
fun testStreamResponderError() = test(ignoreNative = true) {
101+
@Test
102+
fun testStreamResponderError() = test {
107103
var p: Payload? = null
108104
val requester = start(RSocketRequestHandler {
109105
requestStream {
@@ -361,13 +357,13 @@ class RSocketTest : SuspendTest, TestWithLeakCheck {
361357
private suspend inline fun complete(sendChannel: SendChannel<Payload>, receiveChannel: ReceiveChannel<Payload>) {
362358
sendChannel.close()
363359
delay(100)
364-
assertTrue(receiveChannel.isClosedForReceive)
360+
assertTrue(receiveChannel.isClosedForReceive, "receiveChannel.isClosedForReceive=true")
365361
}
366362

367363
private suspend inline fun cancel(requesterChannel: SendChannel<Payload>, responderChannel: ReceiveChannel<Payload>) {
368364
responderChannel.cancel()
369365
delay(100)
370-
assertTrue(requesterChannel.isClosedForSend)
366+
assertTrue(requesterChannel.isClosedForSend, "requesterChannel.isClosedForSend=true")
371367
}
372368

373369
private suspend fun sendAndCheckReceived(
@@ -376,8 +372,8 @@ class RSocketTest : SuspendTest, TestWithLeakCheck {
376372
payloads: List<Payload>,
377373
) {
378374
delay(100)
379-
assertFalse(requesterChannel.isClosedForSend)
380-
assertFalse(responderChannel.isClosedForReceive)
375+
assertFalse(requesterChannel.isClosedForSend, "requesterChannel.isClosedForSend=false")
376+
assertFalse(responderChannel.isClosedForReceive, "responderChannel.isClosedForReceive=false")
381377
payloads.forEach { requesterChannel.send(it.copy()) } //TODO?
382378
payloads.forEach { responderChannel.checkReceived(it) }
383379
}

rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/ReconnectableRSocketTest.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,14 @@ import kotlinx.atomicfu.*
2727
import kotlinx.coroutines.*
2828
import kotlinx.coroutines.flow.*
2929
import kotlin.test.*
30+
import kotlin.time.Duration.Companion.seconds
3031

3132
class ReconnectableRSocketTest : SuspendTest, TestWithLeakCheck {
3233

3334
//needed for native
3435
private val fails = atomic(0)
3536
private val first = atomic(true)
36-
private val logger = DefaultLoggerFactory.logger("io.rsocket.kotlin.connection")
37+
private val logger = PrintLogger.withLevel(LoggingLevel.DEBUG).logger("io.rsocket.kotlin.connection")
3738

3839
private suspend fun connectWithReconnect(
3940
connect: suspend () -> RSocket,
@@ -202,7 +203,7 @@ class ReconnectableRSocketTest : SuspendTest, TestWithLeakCheck {
202203
rSocket.requestStream(Payload.Empty).collect()
203204
}
204205

205-
rSocket.requestStream(Payload.Empty).test {
206+
rSocket.requestStream(Payload.Empty).test(5.seconds) {
206207
repeat(5) {
207208
assertEquals(Payload.Empty, awaitItem())
208209
}

rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/frame/Util.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,16 @@ import io.rsocket.kotlin.frame.io.*
2121
import io.rsocket.kotlin.test.*
2222
import kotlin.test.*
2323

24-
fun Frame.toPacketWithLength(): ByteReadPacket = buildPacket(InUseTrackingPool) {
24+
internal fun Frame.toPacketWithLength(): ByteReadPacket = buildPacket(InUseTrackingPool) {
2525
val packet = toPacket(InUseTrackingPool)
2626
writeLength(packet.remaining.toInt())
2727
writePacket(packet)
2828
}
2929

30-
fun ByteReadPacket.toFrameWithLength(): Frame {
30+
internal fun ByteReadPacket.toFrameWithLength(): Frame {
3131
val length = readLength()
3232
assertEquals(length, remaining.toInt())
3333
return readFrame(InUseTrackingPool)
3434
}
3535

36-
fun Frame.loopFrame(): Frame = toPacket(InUseTrackingPool).readFrame(InUseTrackingPool)
36+
internal fun Frame.loopFrame(): Frame = toPacket(InUseTrackingPool).readFrame(InUseTrackingPool)

rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package io.rsocket.kotlin.internal
1818

1919
import io.rsocket.kotlin.*
20-
import io.rsocket.kotlin.core.*
2120
import io.rsocket.kotlin.frame.*
2221
import io.rsocket.kotlin.keepalive.*
2322
import io.rsocket.kotlin.payload.*
@@ -34,17 +33,13 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck {
3433
override suspend fun before() {
3534
super.before()
3635

37-
requester = connect(
38-
connection = connection,
39-
isServer = false,
40-
maxFragmentSize = 0,
41-
interceptors = InterceptorsBuilder().build(),
42-
connectionConfig = ConnectionConfig(
43-
keepAlive = KeepAlive(1000.seconds, 1000.seconds),
44-
payloadMimeType = DefaultPayloadMimeType,
45-
setupPayload = Payload.Empty
46-
)
47-
) { RSocketRequestHandler { } }
36+
requester = TestConnector {
37+
connectionConfig {
38+
keepAlive = KeepAlive(1000.seconds, 1000.seconds)
39+
}
40+
}.connect(connection)
41+
42+
connection.ignoreSetupFrame()
4843
}
4944

5045
@Test
@@ -333,8 +328,8 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck {
333328
}
334329
}
335330

336-
@Test //ignored on native because of coroutines bug with channels
337-
fun testChannelRequestServerSideCancellation() = test(ignoreNative = true) {
331+
@Test
332+
fun testChannelRequestServerSideCancellation() = test {
338333
var ch: SendChannel<Payload>? = null
339334
val request = channelFlow<Payload> {
340335
ch = this
@@ -366,7 +361,8 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck {
366361
}
367362
}
368363

369-
requester.requestChannel(payload("INIT"), request).flowOn(PrefetchStrategy(Int.MAX_VALUE, 0)).launchIn(connection)
364+
requester.requestChannel(payload("INIT"), request).flowOn(PrefetchStrategy(Int.MAX_VALUE, 0))
365+
.launchIn(connection)
370366
connection.test {
371367
awaitFrame { frame ->
372368
assertTrue(frame is RequestFrame)
@@ -397,10 +393,12 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck {
397393
}
398394

399395
@Test
400-
fun rrTerminatedOnConnectionClose() = streamIsTerminatedOnConnectionClose { requester.requestResponse(Payload.Empty) }
396+
fun rrTerminatedOnConnectionClose() =
397+
streamIsTerminatedOnConnectionClose { requester.requestResponse(Payload.Empty) }
401398

402399
@Test
403-
fun rsTerminatedOnConnectionClose() = streamIsTerminatedOnConnectionClose { requester.requestStream(Payload.Empty).collect() }
400+
fun rsTerminatedOnConnectionClose() =
401+
streamIsTerminatedOnConnectionClose { requester.requestStream(Payload.Empty).collect() }
404402

405403
@Test
406404
fun rcTerminatedOnConnectionClose() =

0 commit comments

Comments
 (0)