Skip to content

Commit 9b6a006

Browse files
committed
Finalize new Transport API
* use CoroutineScope for both connection and streams * drop isClosedForSend * drop RSocketConnectionHandler, use just connection
1 parent 23ba2d5 commit 9b6a006

File tree

2 files changed

+9
-24
lines changed

2 files changed

+9
-24
lines changed
Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,24 +16,17 @@
1616

1717
package io.rsocket.kotlin.transport
1818

19+
import kotlinx.coroutines.*
1920
import kotlinx.io.*
2021

2122
// all methods can be called from any thread/context at any time
2223
// should be accessed only internally
2324
// should be implemented only by transports
2425
@RSocketTransportApi
25-
public sealed interface RSocketConnection
26-
27-
@RSocketTransportApi
28-
public fun interface RSocketConnectionHandler {
29-
public suspend fun handleConnection(connection: RSocketConnection)
30-
}
26+
public sealed interface RSocketConnection : CoroutineScope
3127

3228
@RSocketTransportApi
3329
public interface RSocketSequentialConnection : RSocketConnection {
34-
// TODO: is it needed for connection?
35-
public val isClosedForSend: Boolean
36-
3730
// throws if frame not sent
3831
// streamId=0 should be sent earlier
3932
public suspend fun sendFrame(streamId: Int, frame: Buffer)
@@ -47,9 +40,8 @@ public interface RSocketMultiplexedConnection : RSocketConnection {
4740
public suspend fun createStream(): Stream
4841
public suspend fun acceptStream(): Stream?
4942

50-
public interface Stream : AutoCloseable {
51-
public val isClosedForSend: Boolean
52-
43+
@RSocketTransportApi
44+
public interface Stream : CoroutineScope {
5345
// 0 - highest priority
5446
// Int.MAX - lowest priority
5547
public fun setSendPriority(priority: Int)
@@ -59,10 +51,5 @@ public interface RSocketMultiplexedConnection : RSocketConnection {
5951

6052
// null if no more frames could be received
6153
public suspend fun receiveFrame(): Buffer?
62-
63-
// closing stream will send buffered frames (if needed)
64-
// sending/receiving frames will be not possible after it
65-
// should not throw
66-
override fun close()
6754
}
6855
}

rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/RSocketTransport.kt

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -44,17 +44,15 @@ public interface RSocketTransport : CoroutineScope {
4444

4545
@SubclassOptInRequired(RSocketTransportApi::class)
4646
public interface RSocketClientTarget : CoroutineScope {
47-
// cancelling Job will cancel connection
48-
// Job will be completed when the connection is finished
4947
@RSocketTransportApi
50-
public fun connectClient(handler: RSocketConnectionHandler): Job
48+
public suspend fun connectClient(): RSocketConnection
5149
}
5250

5351
@SubclassOptInRequired(RSocketTransportApi::class)
5452
public interface RSocketServerTarget<Instance : RSocketServerInstance> : CoroutineScope {
55-
// handler will be called for all new connections
53+
// onConnection shouldn't throw.
5654
@RSocketTransportApi
57-
public suspend fun startServer(handler: RSocketConnectionHandler): Instance
55+
public suspend fun startServer(onConnection: (RSocketConnection) -> Unit): Instance
5856
}
5957

6058
// cancelling it will cancel server

0 commit comments

Comments
 (0)