Skip to content

Commit 7a9ab2e

Browse files
author
olme04
authored
Working nodejs TCP server and client (#191)
* Run tests for new transport * Don't depend on kotlinx.nodejs * Support publishing plain JS module
1 parent ebdb0d9 commit 7a9ab2e

File tree

15 files changed

+278
-174
lines changed

15 files changed

+278
-174
lines changed

.github/workflows/gradle-main.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,15 @@ jobs:
8686
distributions-cache-enabled: false
8787
dependencies-cache-enabled: false
8888
configuration-cache-enabled: false
89+
- name: Test rsocket-transport-nodejs module
90+
if: (matrix.target == 'jsIrNode' || matrix.target == 'jsLegacyNode') && (success() || failure())
91+
timeout-minutes: 15
92+
uses: gradle/gradle-build-action@v1
93+
with:
94+
arguments: rsocket-transport-nodejs-tcp:${{ matrix.target }}Test --scan --info
95+
distributions-cache-enabled: false
96+
dependencies-cache-enabled: false
97+
configuration-cache-enabled: false
8998

9099
- name: Publish Test Report
91100
if: always()

.github/workflows/gradle-pr.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,15 @@ jobs:
8181
distributions-cache-enabled: false
8282
dependencies-cache-enabled: false
8383
configuration-cache-enabled: false
84+
- name: Test rsocket-transport-nodejs module
85+
if: (matrix.target == 'jsIrNode' || matrix.target == 'jsLegacyNode') && (success() || failure())
86+
timeout-minutes: 15
87+
uses: gradle/gradle-build-action@v1
88+
with:
89+
arguments: rsocket-transport-nodejs-tcp:${{ matrix.target }}Test --scan --info
90+
distributions-cache-enabled: false
91+
dependencies-cache-enabled: false
92+
configuration-cache-enabled: false
8493

8594
- name: Publish Test Report
8695
if: always()

.github/workflows/run-tests.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,15 @@ jobs:
8080
distributions-cache-enabled: false
8181
dependencies-cache-enabled: false
8282
configuration-cache-enabled: false
83+
- name: Test rsocket-transport-nodejs module
84+
if: (matrix.target == 'jsIrNode' || matrix.target == 'jsLegacyNode') && (success() || failure())
85+
timeout-minutes: 15
86+
uses: gradle/gradle-build-action@v1
87+
with:
88+
arguments: rsocket-transport-nodejs-tcp:${{ matrix.target }}Test --scan --info
89+
distributions-cache-enabled: false
90+
dependencies-cache-enabled: false
91+
configuration-cache-enabled: false
8392

8493
- name: Publish Test Report
8594
if: always()

build.gradle.kts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,12 @@ subprojects {
6565
project.name != "rsocket-transport-ktor" &&
6666
project.name != "rsocket-transport-ktor-client"
6767

68+
val jsOnly = project.name == "rsocket-transport-nodejs-tcp"
69+
val nodejsOnly = project.name == "rsocket-transport-nodejs-tcp"
6870

6971
if (!isAutoConfigurable) return@configure
7072

71-
jvm {
73+
if (!jsOnly) jvm {
7274
testRuns.all {
7375
executionTask.configure {
7476
// ActiveProcessorCount is used here, to make sure local setup is similar as on CI
@@ -90,7 +92,7 @@ subprojects {
9092
}
9193
}
9294
}
93-
browser {
95+
if (!nodejsOnly) browser {
9496
testTask {
9597
useKarma {
9698
useConfigDirectory(rootDir.resolve("js").resolve("karma.config.d"))
@@ -100,6 +102,8 @@ subprojects {
100102
}
101103
}
102104

105+
if (jsOnly) return@configure
106+
103107
//native targets configuration
104108
val linuxTargets = listOf(linuxX64())
105109
val mingwTargets = if (supportMingw) listOf(mingwX64()) else emptyList()
@@ -317,7 +321,7 @@ subprojects {
317321
}
318322

319323
tasks.matching { it.name == "generatePomFileForKotlinMultiplatformPublication" }.configureEach {
320-
dependsOn(tasks["generatePomFileForJvmPublication"])
324+
tasks.findByName("generatePomFileForJvmPublication")?.let { dependsOn(it) }
321325
}
322326
}
323327
}

examples/nodejs-tcp-transport/src/jsMain/kotlin/Server.kt

Lines changed: 0 additions & 160 deletions
This file was deleted.

gradle.properties

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ kotlinxCoroutinesVersion=1.5.2-native-mt
2323
kotlinxAtomicfuVersion=0.17.0
2424
kotlinxSerializationVersion=1.3.1
2525
kotlinxBenchmarkVersion=0.4.0
26-
kotlinxNodejsVersion=0.0.7
2726
rsocketJavaVersion=1.1.1
2827
turbineVersion=0.7.0
2928
artifactoryVersion=4.25.1

examples/nodejs-tcp-transport/build.gradle.kts renamed to rsocket-transport-nodejs-tcp/build.gradle.kts

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,18 @@
1616

1717
plugins {
1818
kotlin("multiplatform")
19-
}
19+
id("kotlinx-atomicfu")
2020

21-
val kotlinxNodejsVersion: String by rootProject
21+
signing
22+
`maven-publish`
23+
id("com.jfrog.artifactory")
24+
}
2225

2326
kotlin {
24-
js(IR) {
25-
nodejs {
26-
binaries.executable()
27-
}
28-
}
29-
3027
sourceSets {
3128
val jsMain by getting {
3229
dependencies {
33-
implementation(project(":rsocket-core"))
34-
implementation("org.jetbrains.kotlinx:kotlinx-nodejs:$kotlinxNodejsVersion")
30+
api(project(":rsocket-core"))
3531
}
3632
}
3733
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package io.rsocket.kotlin.transport.nodejs.tcp
2+
3+
import io.ktor.utils.io.core.*
4+
import io.rsocket.kotlin.frame.io.*
5+
6+
internal fun ByteReadPacket.withLength(): ByteReadPacket = buildPacket {
7+
@Suppress("INVISIBLE_MEMBER") writeLength(this@withLength.remaining.toInt())
8+
writePacket(this@withLength)
9+
}
10+
11+
internal class FrameWithLengthAssembler(private val onFrame: (frame: ByteReadPacket) -> Unit) {
12+
private var expectedFrameLength = 0 //TODO atomic for native
13+
private val packetBuilder: BytePacketBuilder = BytePacketBuilder()
14+
inline fun write(write: BytePacketBuilder.() -> Unit) {
15+
packetBuilder.write()
16+
loop()
17+
}
18+
19+
private fun loop() {
20+
while (true) when {
21+
expectedFrameLength == 0 && packetBuilder.size < 3 -> return // no length
22+
expectedFrameLength == 0 -> withTemp { // has length
23+
expectedFrameLength = @Suppress("INVISIBLE_MEMBER") it.readLength()
24+
if (it.remaining >= expectedFrameLength) build(it) // if has length and frame
25+
}
26+
packetBuilder.size < expectedFrameLength -> return // not enough bytes to read frame
27+
else -> withTemp { build(it) } // enough bytes to read frame
28+
}
29+
}
30+
31+
private fun build(from: ByteReadPacket) {
32+
val frame = buildPacket {
33+
writePacket(from, expectedFrameLength)
34+
}
35+
expectedFrameLength = 0
36+
onFrame(frame)
37+
}
38+
39+
private inline fun withTemp(block: (tempPacket: ByteReadPacket) -> Unit) {
40+
val tempPacket = packetBuilder.build()
41+
block(tempPacket)
42+
packetBuilder.writePacket(tempPacket)
43+
}
44+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.rsocket.kotlin.transport.nodejs.tcp
2+
3+
import io.ktor.utils.io.core.internal.*
4+
import io.ktor.utils.io.pool.*
5+
import io.rsocket.kotlin.*
6+
import io.rsocket.kotlin.transport.*
7+
import io.rsocket.kotlin.transport.nodejs.tcp.internal.*
8+
import kotlinx.coroutines.*
9+
import kotlin.coroutines.*
10+
11+
public class TcpClientTransport(
12+
private val port: Int,
13+
private val hostname: String,
14+
private val pool: ObjectPool<ChunkBuffer> = ChunkBuffer.Pool,
15+
coroutineContext: CoroutineContext = EmptyCoroutineContext
16+
) : ClientTransport {
17+
18+
override val coroutineContext: CoroutineContext = coroutineContext + SupervisorJob(coroutineContext[Job])
19+
20+
@TransportApi
21+
override suspend fun connect(): Connection {
22+
val socket = connect(port, hostname)
23+
return TcpConnection(coroutineContext, pool, socket)
24+
}
25+
}

0 commit comments

Comments
 (0)