Skip to content

Commit e9f1ff8

Browse files
committed
netty TCP, WebSocket and QUIC transports
* QUIC implementation is still WIP
1 parent fc6b2a3 commit e9f1ff8

File tree

28 files changed

+2459
-0
lines changed

28 files changed

+2459
-0
lines changed

gradle/libs.versions.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ kotlinx-bcv = "0.14.0"
88

99
ktor = "2.3.11"
1010

11+
netty = "4.1.110.Final"
12+
netty-quic = "0.0.63.Final"
13+
14+
# for netty TLS tests
15+
bouncycastle = "1.78.1"
16+
1117
turbine = "1.1.0"
1218

1319
rsocket-java = "1.1.3"
@@ -39,6 +45,12 @@ ktor-server-cio = { module = "io.ktor:ktor-server-cio", version.ref = "ktor" }
3945
ktor-server-netty = { module = "io.ktor:ktor-server-netty", version.ref = "ktor" }
4046
ktor-server-jetty = { module = "io.ktor:ktor-server-jetty", version.ref = "ktor" }
4147

48+
netty-handler = { module = "io.netty:netty-handler", version.ref = "netty" }
49+
netty-codec-http = { module = "io.netty:netty-codec-http", version.ref = "netty" }
50+
netty-codec-quic = { module = "io.netty.incubator:netty-incubator-codec-native-quic", version.ref = "netty-quic" }
51+
52+
bouncycastle = { module = "org.bouncycastle:bcpkix-jdk18on", version.ref = "bouncycastle" }
53+
4254
turbine = { module = "app.cash.turbine:turbine", version.ref = "turbine" }
4355

4456
rsocket-java-core = { module = 'io.rsocket:rsocket-core', version.ref = "rsocket-java" }
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
public final class io/rsocket/kotlin/transport/netty/internal/CoroutinesKt {
2+
public static final fun awaitChannel (Lio/netty/channel/ChannelFuture;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
3+
public static final fun awaitFuture (Lio/netty/util/concurrent/Future;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
4+
public static final fun callOnCancellation (Lkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function1;)V
5+
public static final fun toByteBuf (Lio/ktor/utils/io/core/ByteReadPacket;)Lio/netty/buffer/ByteBuf;
6+
public static final fun toByteReadPacket (Lio/netty/buffer/ByteBuf;)Lio/ktor/utils/io/core/ByteReadPacket;
7+
}
8+
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import rsocketbuild.*
18+
19+
plugins {
20+
id("rsocketbuild.multiplatform-library")
21+
}
22+
23+
description = "rsocket-kotlin Netty transport utils"
24+
25+
kotlin {
26+
jvmTarget()
27+
28+
sourceSets {
29+
jvmMain.dependencies {
30+
implementation(projects.rsocketInternalIo)
31+
api(projects.rsocketCore)
32+
api(libs.netty.handler)
33+
}
34+
}
35+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.transport.netty.internal
18+
19+
import io.ktor.utils.io.core.*
20+
import io.netty.buffer.*
21+
import io.netty.channel.*
22+
import io.netty.util.concurrent.*
23+
import kotlinx.coroutines.*
24+
import kotlin.coroutines.*
25+
26+
@Suppress("UNCHECKED_CAST")
27+
public suspend inline fun <T> Future<T>.awaitFuture(): T = suspendCancellableCoroutine { cont ->
28+
addListener {
29+
when {
30+
it.isSuccess -> cont.resume(it.now as T)
31+
else -> cont.resumeWithException(it.cause())
32+
}
33+
}
34+
cont.invokeOnCancellation {
35+
cancel(true)
36+
}
37+
}
38+
39+
public suspend fun ChannelFuture.awaitChannel(): Channel {
40+
awaitFuture()
41+
return channel()
42+
}
43+
44+
// it should be used only for cleanup and so should not really block, only suspend
45+
public inline fun CoroutineScope.callOnCancellation(crossinline block: suspend () -> Unit) {
46+
launch(Dispatchers.Unconfined) {
47+
try {
48+
awaitCancellation()
49+
} catch (cause: Throwable) {
50+
withContext(NonCancellable) {
51+
try {
52+
block()
53+
} catch (suppressed: Throwable) {
54+
cause.addSuppressed(suppressed)
55+
}
56+
}
57+
throw cause
58+
}
59+
}
60+
}
61+
62+
// TODO: what to use: this or ByteReadPacket(msg.nioBuffer())
63+
public fun ByteBuf.toByteReadPacket(): ByteReadPacket = buildPacket { writeFully(nioBuffer()) }
64+
public fun ByteReadPacket.toByteBuf(): ByteBuf = Unpooled.wrappedBuffer(readByteBuffer())
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
public abstract interface class io/rsocket/kotlin/transport/netty/quic/NettyQuicClientTransport : io/rsocket/kotlin/transport/RSocketTransport {
2+
public static final field Factory Lio/rsocket/kotlin/transport/netty/quic/NettyQuicClientTransport$Factory;
3+
public abstract fun target (Ljava/lang/String;I)Lio/rsocket/kotlin/transport/RSocketClientTarget;
4+
public abstract fun target (Ljava/net/InetSocketAddress;)Lio/rsocket/kotlin/transport/RSocketClientTarget;
5+
}
6+
7+
public final class io/rsocket/kotlin/transport/netty/quic/NettyQuicClientTransport$Factory : io/rsocket/kotlin/transport/RSocketTransportFactory {
8+
}
9+
10+
public abstract interface class io/rsocket/kotlin/transport/netty/quic/NettyQuicClientTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder {
11+
public abstract fun bootstrap (Lkotlin/jvm/functions/Function1;)V
12+
public abstract fun channel (Lkotlin/reflect/KClass;)V
13+
public abstract fun channelFactory (Lio/netty/channel/ChannelFactory;)V
14+
public abstract fun codec (Lkotlin/jvm/functions/Function1;)V
15+
public abstract fun eventLoopGroup (Lio/netty/channel/EventLoopGroup;Z)V
16+
public abstract fun quicBootstrap (Lkotlin/jvm/functions/Function1;)V
17+
public abstract fun ssl (Lkotlin/jvm/functions/Function1;)V
18+
}
19+
20+
public abstract interface class io/rsocket/kotlin/transport/netty/quic/NettyQuicServerInstance : io/rsocket/kotlin/transport/RSocketServerInstance {
21+
public abstract fun getLocalAddress ()Ljava/net/InetSocketAddress;
22+
}
23+
24+
public abstract interface class io/rsocket/kotlin/transport/netty/quic/NettyQuicServerTransport : io/rsocket/kotlin/transport/RSocketTransport {
25+
public static final field Factory Lio/rsocket/kotlin/transport/netty/quic/NettyQuicServerTransport$Factory;
26+
public abstract fun target (Ljava/lang/String;I)Lio/rsocket/kotlin/transport/RSocketServerTarget;
27+
public abstract fun target (Ljava/net/InetSocketAddress;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
28+
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/netty/quic/NettyQuicServerTransport;Ljava/lang/String;IILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
29+
public static synthetic fun target$default (Lio/rsocket/kotlin/transport/netty/quic/NettyQuicServerTransport;Ljava/net/InetSocketAddress;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketServerTarget;
30+
}
31+
32+
public final class io/rsocket/kotlin/transport/netty/quic/NettyQuicServerTransport$Factory : io/rsocket/kotlin/transport/RSocketTransportFactory {
33+
}
34+
35+
public abstract interface class io/rsocket/kotlin/transport/netty/quic/NettyQuicServerTransportBuilder : io/rsocket/kotlin/transport/RSocketTransportBuilder {
36+
public abstract fun bootstrap (Lkotlin/jvm/functions/Function1;)V
37+
public abstract fun channel (Lkotlin/reflect/KClass;)V
38+
public abstract fun channelFactory (Lio/netty/channel/ChannelFactory;)V
39+
public abstract fun codec (Lkotlin/jvm/functions/Function1;)V
40+
public abstract fun eventLoopGroup (Lio/netty/channel/EventLoopGroup;Z)V
41+
public abstract fun ssl (Lkotlin/jvm/functions/Function1;)V
42+
}
43+
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import rsocketbuild.*
18+
19+
plugins {
20+
id("rsocketbuild.multiplatform-library")
21+
}
22+
23+
description = "rsocket-kotlin Netty QUIC client/server transport implementation"
24+
25+
kotlin {
26+
jvmTarget()
27+
28+
sourceSets {
29+
jvmMain.dependencies {
30+
implementation(projects.rsocketTransportNettyInternal)
31+
implementation(projects.rsocketInternalIo)
32+
api(projects.rsocketCore)
33+
api(libs.netty.handler)
34+
api(libs.netty.codec.quic)
35+
}
36+
jvmTest.dependencies {
37+
implementation(projects.rsocketTransportTests)
38+
implementation(libs.bouncycastle)
39+
implementation(libs.netty.codec.quic.map {
40+
val javaOsName = System.getProperty("os.name")
41+
val javaOsArch = System.getProperty("os.arch")
42+
val suffix = when {
43+
javaOsName.contains("mac", ignoreCase = true) -> "osx"
44+
javaOsName.contains("linux", ignoreCase = true) -> "linux"
45+
javaOsName.contains("windows", ignoreCase = true) -> "windows"
46+
else -> error("Unknown os.name: $javaOsName")
47+
} + "-" + when (javaOsArch) {
48+
"x86_64", "amd64" -> "x86_64"
49+
"arm64", "aarch64" -> "aarch_64"
50+
else -> error("Unknown os.arch: $javaOsArch")
51+
}
52+
"$it:$suffix"
53+
})
54+
//implementation("ch.qos.logback:logback-classic:1.2.11")
55+
}
56+
}
57+
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Copyright 2015-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.kotlin.transport.netty.quic
18+
19+
import io.netty.bootstrap.*
20+
import io.netty.channel.*
21+
import io.netty.channel.ChannelFactory
22+
import io.netty.channel.nio.*
23+
import io.netty.channel.socket.*
24+
import io.netty.channel.socket.nio.*
25+
import io.netty.incubator.codec.quic.*
26+
import io.rsocket.kotlin.internal.io.*
27+
import io.rsocket.kotlin.transport.*
28+
import io.rsocket.kotlin.transport.netty.internal.*
29+
import kotlinx.coroutines.*
30+
import java.net.*
31+
import kotlin.coroutines.*
32+
import kotlin.reflect.*
33+
34+
@OptIn(RSocketTransportApi::class)
35+
public sealed interface NettyQuicClientTransport : RSocketTransport {
36+
public fun target(remoteAddress: InetSocketAddress): RSocketClientTarget
37+
public fun target(host: String, port: Int): RSocketClientTarget
38+
39+
public companion object Factory :
40+
RSocketTransportFactory<NettyQuicClientTransport, NettyQuicClientTransportBuilder>(::NettyQuicClientTransportBuilderImpl)
41+
}
42+
43+
@OptIn(RSocketTransportApi::class)
44+
public sealed interface NettyQuicClientTransportBuilder : RSocketTransportBuilder<NettyQuicClientTransport> {
45+
public fun channel(cls: KClass<out DatagramChannel>)
46+
public fun channelFactory(factory: ChannelFactory<out DatagramChannel>)
47+
public fun eventLoopGroup(group: EventLoopGroup, manage: Boolean)
48+
49+
public fun bootstrap(block: Bootstrap.() -> Unit)
50+
public fun codec(block: QuicClientCodecBuilder.() -> Unit)
51+
public fun ssl(block: QuicSslContextBuilder.() -> Unit)
52+
public fun quicBootstrap(block: QuicChannelBootstrap.() -> Unit)
53+
}
54+
55+
private class NettyQuicClientTransportBuilderImpl : NettyQuicClientTransportBuilder {
56+
private var channelFactory: ChannelFactory<out DatagramChannel>? = null
57+
private var eventLoopGroup: EventLoopGroup? = null
58+
private var manageEventLoopGroup: Boolean = false
59+
private var bootstrap: (Bootstrap.() -> Unit)? = null
60+
private var codec: (QuicClientCodecBuilder.() -> Unit)? = null
61+
private var ssl: (QuicSslContextBuilder.() -> Unit)? = null
62+
private var quicBootstrap: (QuicChannelBootstrap.() -> Unit)? = null
63+
64+
override fun channel(cls: KClass<out DatagramChannel>) {
65+
this.channelFactory = ReflectiveChannelFactory(cls.java)
66+
}
67+
68+
override fun channelFactory(factory: ChannelFactory<out DatagramChannel>) {
69+
this.channelFactory = factory
70+
}
71+
72+
override fun eventLoopGroup(group: EventLoopGroup, manage: Boolean) {
73+
this.eventLoopGroup = group
74+
this.manageEventLoopGroup = manage
75+
}
76+
77+
override fun bootstrap(block: Bootstrap.() -> Unit) {
78+
bootstrap = block
79+
}
80+
81+
override fun codec(block: QuicClientCodecBuilder.() -> Unit) {
82+
codec = block
83+
}
84+
85+
override fun ssl(block: QuicSslContextBuilder.() -> Unit) {
86+
ssl = block
87+
}
88+
89+
override fun quicBootstrap(block: QuicChannelBootstrap.() -> Unit) {
90+
quicBootstrap = block
91+
}
92+
93+
@RSocketTransportApi
94+
override fun buildTransport(context: CoroutineContext): NettyQuicClientTransport {
95+
val codecHandler = QuicClientCodecBuilder().apply {
96+
// by default, we allow Int.MAX_VALUE of active stream
97+
initialMaxData(Int.MAX_VALUE.toLong())
98+
initialMaxStreamDataBidirectionalLocal(Int.MAX_VALUE.toLong())
99+
initialMaxStreamDataBidirectionalRemote(Int.MAX_VALUE.toLong())
100+
initialMaxStreamsBidirectional(Int.MAX_VALUE.toLong())
101+
codec?.invoke(this)
102+
ssl?.let {
103+
sslContext(QuicSslContextBuilder.forClient().apply(it).build())
104+
}
105+
}.build()
106+
val bootstrap = Bootstrap().apply {
107+
bootstrap?.invoke(this)
108+
localAddress(0)
109+
handler(codecHandler)
110+
channelFactory(channelFactory ?: ReflectiveChannelFactory(NioDatagramChannel::class.java))
111+
group(eventLoopGroup ?: NioEventLoopGroup())
112+
}
113+
114+
return NettyQuicClientTransportImpl(
115+
coroutineContext = context.supervisorContext() + bootstrap.config().group().asCoroutineDispatcher(),
116+
bootstrap = bootstrap,
117+
quicBootstrap = quicBootstrap,
118+
manageBootstrap = manageEventLoopGroup
119+
)
120+
}
121+
}
122+
123+
private class NettyQuicClientTransportImpl(
124+
override val coroutineContext: CoroutineContext,
125+
private val bootstrap: Bootstrap,
126+
private val quicBootstrap: (QuicChannelBootstrap.() -> Unit)?,
127+
manageBootstrap: Boolean,
128+
) : NettyQuicClientTransport {
129+
init {
130+
if (manageBootstrap) callOnCancellation {
131+
bootstrap.config().group().shutdownGracefully().awaitFuture()
132+
}
133+
}
134+
135+
override fun target(remoteAddress: InetSocketAddress): NettyQuicClientTargetImpl = NettyQuicClientTargetImpl(
136+
coroutineContext = coroutineContext.supervisorContext(),
137+
bootstrap = bootstrap,
138+
quicBootstrap = quicBootstrap,
139+
remoteAddress = remoteAddress
140+
)
141+
142+
override fun target(host: String, port: Int): RSocketClientTarget = target(InetSocketAddress(host, port))
143+
}
144+
145+
@OptIn(RSocketTransportApi::class)
146+
private class NettyQuicClientTargetImpl(
147+
override val coroutineContext: CoroutineContext,
148+
private val bootstrap: Bootstrap,
149+
private val quicBootstrap: (QuicChannelBootstrap.() -> Unit)?,
150+
private val remoteAddress: SocketAddress,
151+
) : RSocketClientTarget {
152+
@RSocketTransportApi
153+
override fun connectClient(handler: RSocketConnectionHandler): Job = launch {
154+
QuicChannel.newBootstrap(bootstrap.bind().awaitChannel()).also { quicBootstrap?.invoke(it) }
155+
.handler(
156+
NettyQuicConnectionInitializer(handler, coroutineContext, isClient = true)
157+
).remoteAddress(remoteAddress).connect().awaitFuture()
158+
}
159+
}

0 commit comments

Comments
 (0)