Skip to content

Commit 3ed0674

Browse files
author
olme04
authored
improves transport api and lifecycle (#178)
Co-authored-by: olme04 <olme04>
1 parent 899e9b7 commit 3ed0674

File tree

59 files changed

+454
-451
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+454
-451
lines changed

benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ import kotlinx.coroutines.*
2525
import kotlinx.coroutines.flow.*
2626
import kotlin.random.*
2727

28-
@OptIn(ExperimentalStreamsApi::class)
28+
@OptIn(ExperimentalStreamsApi::class, DelicateCoroutinesApi::class)
2929
class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
3030
private val requestStrategy = PrefetchStrategy(64, 0)
3131

32+
private val benchJob = Job()
3233
lateinit var client: RSocket
33-
lateinit var server: Job
3434

3535
lateinit var payload: Payload
3636
lateinit var payloadsFlow: Flow<Payload>
@@ -40,9 +40,7 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
4040
override fun setup() {
4141
payload = createPayload(payloadSize)
4242
payloadsFlow = flow { repeat(5000) { emit(payloadCopy()) } }
43-
44-
val localServer = LocalServer()
45-
server = RSocketServer().bind(localServer) {
43+
val server = RSocketServer().bindIn(CoroutineScope(benchJob + Dispatchers.Unconfined), LocalServerTransport()) {
4644
RSocketRequestHandler {
4745
requestResponse {
4846
it.release()
@@ -59,14 +57,14 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
5957
}
6058
}
6159
client = runBlocking {
62-
RSocketConnector().connect(localServer)
60+
RSocketConnector().connect(server)
6361
}
6462
}
6563

6664
override fun cleanup() {
6765
runBlocking {
68-
client.job.runCatching { cancelAndJoin() }
69-
server.runCatching { cancelAndJoin() }
66+
client.coroutineContext.job.cancelAndJoin()
67+
benchJob.cancelAndJoin()
7068
}
7169
}
7270

examples/interactions/src/jvmMain/kotlin/ReconnectExample.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ import kotlinx.coroutines.flow.*
2525

2626
@TransportApi
2727
fun main(): Unit = runBlocking {
28-
val server = LocalServer()
29-
RSocketServer().bind(server) {
28+
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
3029
RSocketRequestHandler {
3130
requestStream { requestPayload ->
3231
val data = requestPayload.data.readText()

examples/interactions/src/jvmMain/kotlin/ReconnectOnConnectFailExample.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ import kotlinx.coroutines.flow.*
2323

2424
@TransportApi
2525
fun main(): Unit = runBlocking {
26-
val server = LocalServer()
27-
RSocketServer().bind(server) {
26+
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
2827
RSocketRequestHandler {
2928
requestStream { requestPayload ->
3029
val data = requestPayload.data.readText()

examples/interactions/src/jvmMain/kotlin/RequestChannelExample.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ import kotlinx.coroutines.flow.*
2222

2323

2424
fun main(): Unit = runBlocking {
25-
val server = LocalServer()
26-
RSocketServer().bind(server) {
25+
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
2726
RSocketRequestHandler {
2827
requestChannel { init, request ->
2928
println("Init with: ${init.data.readText()}")

examples/interactions/src/jvmMain/kotlin/RequestResponseErrorExample.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ import io.rsocket.kotlin.transport.local.*
2020
import kotlinx.coroutines.*
2121

2222
fun main(): Unit = runBlocking {
23-
val server = LocalServer()
24-
RSocketServer().bind(server) {
23+
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
2524
RSocketRequestHandler {
2625
requestResponse {
2726
val data = it.data.readText()

examples/interactions/src/jvmMain/kotlin/RequestResponseExample.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ import io.rsocket.kotlin.transport.local.*
2020
import kotlinx.coroutines.*
2121

2222
fun main(): Unit = runBlocking {
23-
val server = LocalServer()
24-
RSocketServer().bind(server) {
23+
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
2524
RSocketRequestHandler {
2625
requestResponse {
2726
val data = it.data.readText()

examples/interactions/src/jvmMain/kotlin/RequestStreamExample.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ import kotlinx.coroutines.*
2121
import kotlinx.coroutines.flow.*
2222

2323
fun main(): Unit = runBlocking {
24-
val server = LocalServer()
25-
RSocketServer().bind(server) {
24+
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
2625
RSocketRequestHandler {
2726
requestStream {
2827
val data = it.data.readText()

examples/interactions/src/jvmMain/kotlin/ServerRequestExample.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ import io.rsocket.kotlin.transport.local.*
2020
import kotlinx.coroutines.*
2121

2222
fun main(): Unit = runBlocking {
23-
val server = LocalServer()
24-
RSocketServer().bind(server) {
23+
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
2524
RSocketRequestHandler {
2625
requestResponse {
2726
val clientRequest = it.data.readText()

examples/interactions/src/jvmMain/kotlin/ServerSetupExample.kt

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@ import kotlinx.coroutines.flow.*
2323

2424

2525
fun main(): Unit = runBlocking {
26-
27-
val server = LocalServer()
28-
RSocketServer().bind(server) {
26+
val server = RSocketServer().bindIn(this, LocalServerTransport()) {
2927
val data = config.setupPayload.metadata?.readText() ?: error("Empty metadata")
3028
RSocketRequestHandler {
3129
when (data) {
@@ -43,8 +41,8 @@ fun main(): Unit = runBlocking {
4341

4442
suspend fun client1() {
4543
val rSocketClient = RSocketConnector().connect(server)
46-
rSocketClient.job.join()
47-
println("Client 1 canceled: ${rSocketClient.job.isCancelled}")
44+
rSocketClient.coroutineContext.job.join()
45+
println("Client 1 canceled: ${rSocketClient.coroutineContext.job.isCancelled}")
4846
try {
4947
rSocketClient.requestResponse(Payload.Empty)
5048
} catch (e: Throwable) {

examples/multiplatform-chat/src/clientMain/kotlin/Api.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@
1616

1717
import io.ktor.client.*
1818
import io.ktor.client.features.websocket.*
19-
import io.ktor.network.selector.*
20-
import io.ktor.util.*
2119
import io.rsocket.kotlin.*
2220
import io.rsocket.kotlin.core.*
2321
import io.rsocket.kotlin.payload.*
2422
import io.rsocket.kotlin.transport.ktor.*
2523
import io.rsocket.kotlin.transport.ktor.client.*
24+
import kotlinx.coroutines.*
2625

2726
class Api(rSocket: RSocket) {
2827
private val proto = ConfiguredProtoBuf
@@ -42,9 +41,10 @@ suspend fun connectToApiUsingWS(name: String): Api {
4241
return Api(client.rSocket(port = 9000))
4342
}
4443

45-
@OptIn(InternalAPI::class)
4644
suspend fun connectToApiUsingTCP(name: String): Api {
47-
val transport = TcpClientTransport(SelectorManager(), "0.0.0.0", 8000)
45+
val transport = TcpClientTransport("0.0.0.0", 8000, CoroutineExceptionHandler { coroutineContext, throwable ->
46+
println("FAIL: $coroutineContext, $throwable")
47+
})
4848
return Api(connector(name).connect(transport))
4949
}
5050

0 commit comments

Comments
 (0)