From 91e6ab7bfa846013e917e4b09c4a767052d09c62 Mon Sep 17 00:00:00 2001 From: olme04 Date: Mon, 23 Aug 2021 14:47:48 +0300 Subject: [PATCH] set version 0.14.0 gradle 7.2 kotlin 1.5.31 ktor 1.6.4 turbine 0.6.0 update README.md --- README.md | 103 +++---- build.gradle.kts | 38 +-- gradle.properties | 16 +- gradle/wrapper/gradle-wrapper.properties | 2 +- gradlew | 257 +++++++++++------- .../kotlin/transport/ClientTransport.kt | 2 +- .../kotlin/ConnectionEstablishmentTest.kt | 2 +- .../io/rsocket/kotlin/core/RSocketTest.kt | 24 +- .../kotlin/core/ReconnectableRSocketTest.kt | 4 +- .../kotlin/internal/RSocketRequesterTest.kt | 52 ++-- .../rsocket/kotlin/keepalive/KeepAliveTest.kt | 8 +- .../io/rsocket/kotlin/test/TestConnection.kt | 4 +- .../kotlin/transport/ktor/server/Routing.kt | 2 + .../kotlin/transport/ktor/TcpConnection.kt | 2 +- .../transport/ktor/TcpServerTransport.kt | 2 + .../kotlin/transport/local/LocalConnection.kt | 2 +- 16 files changed, 279 insertions(+), 241 deletions(-) diff --git a/README.md b/README.md index c0c4bd4a5..2b928e146 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ # rsocket-kotlin + RSocket Kotlin multi-platform implementation based on [kotlinx.coroutines](https://github.com/Kotlin/kotlinx.coroutines). RSocket is a binary protocol for use on byte stream transports such as TCP, WebSockets and Aeron. @@ -13,92 +14,72 @@ It enables the following symmetric interaction models via async message passing Learn more at http://rsocket.io ## Supported platforms and transports : -Transports are implemented based on [ktor](https://github.com/ktorio/ktor) to ensure Kotlin multiplatform. -So it depends on `ktor` engines for available transports and platforms (JVM, JS, Native): + +Transports are implemented based on [ktor](https://github.com/ktorio/ktor) to ensure Kotlin multiplatform. So it depends on `ktor` engines +for available transports and platforms (JVM, JS, Native): + * JVM - TCP and WebSocket for both client and server * JS - WebSocket client only -* Native - TCP client only (linux x64, macos, ios, watchos, tvos) +* Native - TCP (linux x64, macos, ios, watchos, tvos) for both client and server ## Interactions RSocket interface contains 5 methods: -* Fire and Forget: + +* Fire and Forget: `suspend fun fireAndForget(payload: Payload)` * Request-Response: - + `suspend requestResponse(payload: Payload): Payload` -* Request-Stream: - +* Request-Stream: + `fun requestStream(payload: Payload): Flow` * Request-Channel: `fun requestChannel(initPayload: Payload, payloads: Flow): Flow` * Metadata-Push: - + `suspend fun metadataPush(metadata: ByteReadPacket)` ## Using in your projects + Make sure, that you use Kotlin 1.5.20 ### Gradle: - -```groovy +```kotlin repositories { - jcenter() + mavenCentral() } dependencies { - implementation 'io.rsocket.kotlin:rsocket-core:0.13.1' - implementation 'io.rsocket.kotlin:rsocket-transport-ktor:0.13.1' - -// client feature for ktor -// implementation 'io.rsocket.kotlin:rsocket-transport-ktor-client:0.13.1' - -// server feature for ktor -// implementation 'io.rsocket.kotlin:rsocket-transport-ktor-server:0.13.1' - -// one of ktor engines to work with websockets -// client engines -// implementation 'io.ktor:ktor-client-js:1.6.1' //js -// implementation 'io.ktor:ktor-client-cio:1.6.1' //jvm -// implementation 'io.ktor:ktor-client-okhttp:1.6.1' //jvm - -// server engines (jvm only) -// implementation 'io.ktor:ktor-server-cio:1.6.1' -// implementation 'io.ktor:ktor-server-netty:1.6.1' -// implementation 'io.ktor:ktor-server-jetty:1.6.1' -// implementation 'io.ktor:ktor-server-tomcat:1.6.1' + implementation("io.rsocket.kotlin:rsocket-core:0.14.0") + + // TCP ktor transport + implementation("io.rsocket.kotlin:rsocket-transport-ktor:0.14.0") + + // WS ktor transport client plugin + implementation("io.rsocket.kotlin:rsocket-transport-ktor-client:0.14.0") + + // WS ktor transport server plugin + implementation("io.rsocket.kotlin:rsocket-transport-ktor-server:0.14.0") } ``` -### Gradle Kotlin DSL: +For WS ktor transport, available client or server engine should be added: ```kotlin -repositories { - jcenter() -} dependencies { - implementation("io.rsocket.kotlin:rsocket-core:0.13.1") - implementation("io.rsocket.kotlin:rsocket-transport-ktor:0.13.1") - -// client feature for ktor -// implementation("io.rsocket.kotlin:rsocket-transport-ktor-client:0.13.1") - -// server feature for ktor -// implementation("io.rsocket.kotlin:rsocket-transport-ktor-server:0.13.1") - -// one of ktor engines to work with websockets -// client engines -// implementation("io.ktor:ktor-client-js:1.6.1") //js -// implementation("io.ktor:ktor-client-cio:1.6.1") //jvm -// implementation("io.ktor:ktor-client-okhttp:1.6.1") //jvm - -// server engines (jvm only) -// implementation("io.ktor:ktor-server-cio:1.6.1") -// implementation("io.ktor:ktor-server-netty:1.6.1") -// implementation("io.ktor:ktor-server-jetty:1.6.1") -// implementation("io.ktor:ktor-server-tomcat:1.6.1") + // client engines for WS transport + implementation("io.ktor:ktor-client-js:1.6.2") //js + implementation("io.ktor:ktor-client-cio:1.6.2") //jvm + implementation("io.ktor:ktor-client-okhttp:1.6.2") //jvm + + // server engines for WS transport (jvm only) + implementation("io.ktor:ktor-server-cio:1.6.2") + implementation("io.ktor:ktor-server-netty:1.6.2") + implementation("io.ktor:ktor-server-jetty:1.6.2") + implementation("io.ktor:ktor-server-tomcat:1.6.2") } ``` @@ -116,7 +97,7 @@ val client = HttpClient(CIO) { connectionConfig { keepAlive = KeepAlive( interval = 30.seconds, - maxLifetime = 2.minutes + maxLifetime = 2.minutes ) //payload for setup frame @@ -128,7 +109,7 @@ val client = HttpClient(CIO) { metadata = "application/json" ) } - + //optional acceptor for server requests acceptor { RSocketRequestHandler { @@ -136,7 +117,7 @@ val client = HttpClient(CIO) { } } } - } + } } //connect to some url @@ -196,9 +177,9 @@ embeddedServer(CIO) { ### More examples: - [interactions](examples/interactions) - contains usages of some supported functions -- [multiplatform-chat](examples/multiplatform-chat) - chat implementation with JVM server and JS/JVM client with shared classes -and serializing data using [kotlinx.serialization](https://github.com/Kotlin/kotlinx.serialization) -- [nodejs-tcp-transport](examples/nodejs-tcp-transport) - implementation of TCP transport for nodejs +- [multiplatform-chat](examples/multiplatform-chat) - chat implementation with JVM server and JS/JVM client with shared classes and + serializing data using [kotlinx.serialization](https://github.com/Kotlin/kotlinx.serialization) +- [nodejs-tcp-transport](examples/nodejs-tcp-transport) - implementation of TCP transport for nodejs ## Reactive Streams Semantics diff --git a/build.gradle.kts b/build.gradle.kts index 6447c1dd0..bd73ed7c3 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -152,25 +152,29 @@ subprojects { languageVersion = "1.5" apiVersion = "1.5" - useExperimentalAnnotation("kotlin.RequiresOptIn") + optIn("kotlin.RequiresOptIn") + + //TODO: kludge, this is needed now, as ktor isn't fully supports kotlin 1.5.3x opt-in changes + // will be not needed in future with ktor 2.0.0 + optIn("io.ktor.utils.io.core.ExperimentalIoApi") if (name.contains("test", ignoreCase = true) || isTestProject || isPlaygroundProject) { - useExperimentalAnnotation("kotlin.time.ExperimentalTime") - useExperimentalAnnotation("kotlin.ExperimentalStdlibApi") - - useExperimentalAnnotation("kotlinx.coroutines.ExperimentalCoroutinesApi") - useExperimentalAnnotation("kotlinx.coroutines.InternalCoroutinesApi") - useExperimentalAnnotation("kotlinx.coroutines.ObsoleteCoroutinesApi") - useExperimentalAnnotation("kotlinx.coroutines.FlowPreview") - useExperimentalAnnotation("kotlinx.coroutines.DelicateCoroutinesApi") - - useExperimentalAnnotation("io.ktor.util.InternalAPI") - useExperimentalAnnotation("io.ktor.utils.io.core.internal.DangerousInternalIoApi") - - useExperimentalAnnotation("io.rsocket.kotlin.TransportApi") - useExperimentalAnnotation("io.rsocket.kotlin.ExperimentalMetadataApi") - useExperimentalAnnotation("io.rsocket.kotlin.ExperimentalStreamsApi") - useExperimentalAnnotation("io.rsocket.kotlin.RSocketLoggingApi") + optIn("kotlin.time.ExperimentalTime") + optIn("kotlin.ExperimentalStdlibApi") + + optIn("kotlinx.coroutines.ExperimentalCoroutinesApi") + optIn("kotlinx.coroutines.InternalCoroutinesApi") + optIn("kotlinx.coroutines.ObsoleteCoroutinesApi") + optIn("kotlinx.coroutines.FlowPreview") + optIn("kotlinx.coroutines.DelicateCoroutinesApi") + + optIn("io.ktor.util.InternalAPI") + optIn("io.ktor.utils.io.core.internal.DangerousInternalIoApi") + + optIn("io.rsocket.kotlin.TransportApi") + optIn("io.rsocket.kotlin.ExperimentalMetadataApi") + optIn("io.rsocket.kotlin.ExperimentalStreamsApi") + optIn("io.rsocket.kotlin.RSocketLoggingApi") } } } diff --git a/gradle.properties b/gradle.properties index e01243605..d4936647f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -16,19 +16,19 @@ #Project group=io.rsocket.kotlin -version=0.13.1 +version=0.14.0 #Versions -kotlinVersion=1.5.21 -ktorVersion=1.6.1 -kotlinxCoroutinesVersion=1.5.1-native-mt -kotlinxAtomicfuVersion=0.16.2 -kotlinxSerializationVersion=1.2.2 +kotlinVersion=1.5.31 +ktorVersion=1.6.4 +kotlinxCoroutinesVersion=1.5.2-native-mt +kotlinxAtomicfuVersion=0.16.3 +kotlinxSerializationVersion=1.3.0 kotlinxBenchmarkVersion=0.3.1 kotlinxNodejsVersion=0.0.7 rsocketJavaVersion=1.1.1 -turbineVersion=0.5.2 -artifactoryVersion=4.24.12 +turbineVersion=0.6.1 +artifactoryVersion=4.24.20 versionUpdatesVersion=0.39.0 gradleEnterpriseVersion=3.4.1 diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 05679dc3c..ffed3a254 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.1.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.2-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index 744e882ed..1b6c78733 100755 --- a/gradlew +++ b/gradlew @@ -1,7 +1,7 @@ -#!/usr/bin/env sh +#!/bin/sh # -# Copyright 2015 the original author or authors. +# Copyright © 2015-2021 the original authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,67 +17,101 @@ # ############################################################################## -## -## Gradle start up script for UN*X -## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# ############################################################################## # Attempt to set APP_HOME + # Resolve links: $0 may be a link -PRG="$0" -# Need this for relative symlinks. -while [ -h "$PRG" ] ; do - ls=`ls -ld "$PRG"` - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '/.*' > /dev/null; then - PRG="$link" - else - PRG=`dirname "$PRG"`"/$link" - fi +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac done -SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" >/dev/null -APP_HOME="`pwd -P`" -cd "$SAVED" >/dev/null + +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit APP_NAME="Gradle" -APP_BASE_NAME=`basename "$0"` +APP_BASE_NAME=${0##*/} # Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' # Use the maximum available, or set MAX_FD != -1 to use that value. -MAX_FD="maximum" +MAX_FD=maximum warn () { echo "$*" -} +} >&2 die () { echo echo "$*" echo exit 1 -} +} >&2 # OS specific support (must be 'true' or 'false'). cygwin=false msys=false darwin=false nonstop=false -case "`uname`" in - CYGWIN* ) - cygwin=true - ;; - Darwin* ) - darwin=true - ;; - MSYS* | MINGW* ) - msys=true - ;; - NONSTOP* ) - nonstop=true - ;; +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; esac CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar @@ -87,9 +121,9 @@ CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar if [ -n "$JAVA_HOME" ] ; then if [ -x "$JAVA_HOME/jre/sh/java" ] ; then # IBM's JDK on AIX uses strange locations for the executables - JAVACMD="$JAVA_HOME/jre/sh/java" + JAVACMD=$JAVA_HOME/jre/sh/java else - JAVACMD="$JAVA_HOME/bin/java" + JAVACMD=$JAVA_HOME/bin/java fi if [ ! -x "$JAVACMD" ] ; then die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME @@ -98,7 +132,7 @@ Please set the JAVA_HOME variable in your environment to match the location of your Java installation." fi else - JAVACMD="java" + JAVACMD=java which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. Please set the JAVA_HOME variable in your environment to match the @@ -106,80 +140,95 @@ location of your Java installation." fi # Increase the maximum file descriptors if we can. -if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then - MAX_FD_LIMIT=`ulimit -H -n` - if [ $? -eq 0 ] ; then - if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then - MAX_FD="$MAX_FD_LIMIT" - fi - ulimit -n $MAX_FD - if [ $? -ne 0 ] ; then - warn "Could not set maximum file descriptor limit: $MAX_FD" - fi - else - warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" - fi +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac fi -# For Darwin, add options to specify how the application appears in the dock -if $darwin; then - GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" -fi +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. # For Cygwin or MSYS, switch paths to Windows format before running java -if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then - APP_HOME=`cygpath --path --mixed "$APP_HOME"` - CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` - - JAVACMD=`cygpath --unix "$JAVACMD"` - - # We build the pattern for arguments to be converted via cygpath - ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` - SEP="" - for dir in $ROOTDIRSRAW ; do - ROOTDIRS="$ROOTDIRS$SEP$dir" - SEP="|" - done - OURCYGPATTERN="(^($ROOTDIRS))" - # Add a user-defined pattern to the cygpath arguments - if [ "$GRADLE_CYGPATTERN" != "" ] ; then - OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" - fi +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + # Now convert the arguments - kludge to limit ourselves to /bin/sh - i=0 - for arg in "$@" ; do - CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` - CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option - - if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition - eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` - else - eval `echo args$i`="\"$arg\"" + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) fi - i=`expr $i + 1` + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg done - case $i in - 0) set -- ;; - 1) set -- "$args0" ;; - 2) set -- "$args0" "$args1" ;; - 3) set -- "$args0" "$args1" "$args2" ;; - 4) set -- "$args0" "$args1" "$args2" "$args3" ;; - 5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; - 6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; - 7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; - 8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; - 9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; - esac fi -# Escape application args -save () { - for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done - echo " " -} -APP_ARGS=`save "$@"` +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# -# Collect all arguments for the java command, following the shell quoting and substitution rules -eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' exec "$JAVACMD" "$@" diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/ClientTransport.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/ClientTransport.kt index 4c439a6b6..918a405ee 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/ClientTransport.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/transport/ClientTransport.kt @@ -27,7 +27,7 @@ public fun interface ClientTransport : CoroutineScope { public suspend fun connect(): Connection } -@OptIn(TransportApi::class) +@TransportApi public fun ClientTransport(coroutineContext: CoroutineContext, transport: ClientTransport): ClientTransport = object : ClientTransport { override val coroutineContext: CoroutineContext get() = coroutineContext diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/ConnectionEstablishmentTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/ConnectionEstablishmentTest.kt index 57b555c61..1e7a44b5d 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/ConnectionEstablishmentTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/ConnectionEstablishmentTest.kt @@ -58,7 +58,7 @@ class ConnectionEstablishmentTest : SuspendTest, TestWithLeakCheck { assertFailsWith(RSocketError.Setup.Rejected::class, errorMessage) { deferred.await() } connection.test { - expectFrame { frame -> + awaitFrame { frame -> assertTrue(frame is ErrorFrame) assertTrue(frame.throwable is RSocketError.Setup.Rejected) assertEquals(errorMessage, frame.throwable.message) diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt index 4b0311cb7..d2209a2b4 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/RSocketTest.kt @@ -96,9 +96,9 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { val requester = start() requester.requestStream(payload("HELLO")).test { repeat(10) { - expectItem().close() + awaitItem().close() } - expectComplete() + awaitComplete() } } @@ -122,9 +122,9 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { }) requester.requestStream(payload("HELLO")).flowOn(PrefetchStrategy(1, 0)).test { repeat(3) { - expectItem().close() + awaitItem().close() } - val error = expectError() + val error = awaitError() assertTrue(error is RSocketError.ApplicationError) assertEquals("FAIL", error.message) } @@ -148,9 +148,9 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { .map { it.value } .test { repeat(23) { - expectItem().close() + awaitItem().close() } - val error = expectError() + val error = awaitError() assertTrue(error is IllegalStateException) assertEquals("oops", error.message) } @@ -170,9 +170,9 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { .take(3) //canceled after 3 element .test { repeat(3) { - expectItem().close() + awaitItem().close() } - expectComplete() + awaitComplete() } } @@ -203,9 +203,9 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { val request = (1..10).asFlow().map { payload(it.toString()) }.onCompletion { awaiter.complete() } requester.requestChannel(payload(""), request).test { repeat(10) { - expectItem().close() + awaitItem().close() } - expectComplete() + awaitComplete() } awaiter.join() delay(500) @@ -241,9 +241,9 @@ class RSocketTest : SuspendTest, TestWithLeakCheck { val request = (1..3).asFlow().map { payload(it.toString()) } requester.requestChannel(payload("0"), request).flowOn(PrefetchStrategy(3, 0)).test { repeat(3) { - expectItem().close() + awaitItem().close() } - expectComplete() + awaitComplete() } } diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/ReconnectableRSocketTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/ReconnectableRSocketTest.kt index c5d6c8097..6d2db3a87 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/ReconnectableRSocketTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/core/ReconnectableRSocketTest.kt @@ -204,9 +204,9 @@ class ReconnectableRSocketTest : SuspendTest, TestWithLeakCheck { rSocket.requestStream(Payload.Empty).test { repeat(5) { - assertEquals(Payload.Empty, expectItem()) + assertEquals(Payload.Empty, awaitItem()) } - expectComplete() + awaitComplete() } assertTrue(rSocket.isActive) diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt index 28dcc00b1..ffb872aa1 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/internal/RSocketRequesterTest.kt @@ -62,7 +62,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { expectNoEventsIn(200) flow.launchIn(connection) - expectFrame { frame -> + awaitFrame { frame -> assertTrue(frame is RequestFrame) assertEquals(FrameType.RequestStream, frame.type) assertEquals(5, frame.initialRequest) @@ -80,7 +80,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { expectNoEventsIn(200) flow.launchIn(connection) - expectFrame { frame -> + awaitFrame { frame -> assertTrue(frame is RequestFrame) assertEquals(FrameType.RequestStream, frame.type) assertEquals(2, frame.initialRequest) @@ -92,7 +92,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { expectNoEventsIn(200) connection.sendToReceiver(NextPayloadFrame(1, Payload.Empty)) - expectFrame { frame -> + awaitFrame { frame -> assertTrue(frame is CancelFrame) } @@ -108,7 +108,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { expectNoEventsIn(200) flow.launchIn(connection + anotherDispatcher) - expectFrame { frame -> + awaitFrame { frame -> assertTrue(frame is RequestFrame) assertEquals(FrameType.RequestStream, frame.type) assertEquals(1, frame.initialRequest) @@ -117,7 +117,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { expectNoEventsIn(200) connection.sendToReceiver(NextPayloadFrame(1, Payload.Empty)) - expectFrame { frame -> + awaitFrame { frame -> assertTrue(frame is RequestNFrame) assertEquals(1, frame.requestN) } @@ -125,7 +125,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { expectNoEventsIn(200) connection.sendToReceiver(NextPayloadFrame(1, Payload.Empty)) - expectFrame { frame -> + awaitFrame { frame -> assertTrue(frame is CancelFrame) } @@ -141,7 +141,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { expectNoEventsIn(200) flow.launchIn(connection) - expectFrame { frame -> + awaitFrame { frame -> assertTrue(frame is RequestFrame) assertEquals(FrameType.RequestStream, frame.type) assertEquals(3, frame.initialRequest) @@ -156,7 +156,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { expectNoEventsIn(200) connection.sendToReceiver(NextPayloadFrame(1, Payload.Empty)) - expectFrame { frame -> + awaitFrame { frame -> assertTrue(frame is RequestNFrame) assertEquals(3, frame.requestN) } @@ -179,7 +179,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { expectNoEventsIn(200) flow.launchIn(connection) - expectFrame { frame -> + awaitFrame { frame -> assertTrue(frame is RequestFrame) assertEquals(FrameType.RequestStream, frame.type) assertEquals(5, frame.initialRequest) @@ -194,7 +194,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { expectNoEventsIn(200) connection.sendToReceiver(NextPayloadFrame(1, Payload.Empty)) - expectFrame { frame -> + awaitFrame { frame -> assertTrue(frame is RequestNFrame) assertEquals(5, frame.requestN) } @@ -220,7 +220,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { expectNoEventsIn(200) flow.launchIn(connection) - expectFrame { frame -> + awaitFrame { frame -> assertTrue(frame is RequestFrame) assertEquals(FrameType.RequestStream, frame.type) assertEquals(1, frame.initialRequest) @@ -229,7 +229,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { expectNoEventsIn(200) connection.sendToReceiver(NextPayloadFrame(1, Payload.Empty)) - expectFrame { frame -> + awaitFrame { frame -> assertTrue(frame is RequestNFrame) assertEquals(1, frame.requestN) } @@ -237,7 +237,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { expectNoEventsIn(200) connection.sendToReceiver(NextPayloadFrame(1, Payload.Empty)) - expectFrame { frame -> + awaitFrame { frame -> assertTrue(frame is RequestNFrame) assertEquals(1, frame.requestN) } @@ -245,7 +245,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { expectNoEventsIn(200) connection.sendToReceiver(NextPayloadFrame(1, Payload.Empty)) - expectFrame { frame -> + awaitFrame { frame -> assertTrue(frame is CancelFrame) } @@ -270,7 +270,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { val deferred = GlobalScope.async { requester.requestResponse(Payload.Empty) } connection.test { - expectFrame { frame -> + awaitFrame { frame -> val streamId = frame.streamId connection.sendToReceiver(ErrorFrame(streamId, RSocketError.ApplicationError(errorMessage))) } @@ -282,7 +282,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { fun testHandleValidFrame() = test { connection.test { val deferred = async { requester.requestResponse(Payload.Empty) } - expectFrame { frame -> + awaitFrame { frame -> val streamId = frame.streamId connection.sendToReceiver(NextPayloadFrame(streamId, Payload.Empty)) } @@ -296,8 +296,8 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { connection.test { withTimeoutOrNull(100) { requester.requestResponse(Payload.Empty) } - expectFrame { assertTrue(it is RequestFrame) } - expectFrame { assertTrue(it is CancelFrame) } + awaitFrame { assertTrue(it is RequestFrame) } + awaitFrame { assertTrue(it is CancelFrame) } expectNoEventsIn(200) } @@ -309,10 +309,10 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { val request = flow { Job().join() }.onCompletion { job.complete() } val response = requester.requestChannel(Payload.Empty, request).launchIn(connection) connection.test { - expectFrame { assertTrue(it is RequestFrame) } + awaitFrame { assertTrue(it is RequestFrame) } expectNoEventsIn(200) response.cancelAndJoin() - expectFrame { assertTrue(it is CancelFrame) } + awaitFrame { assertTrue(it is CancelFrame) } expectNoEventsIn(200) assertTrue(job.isCompleted) } @@ -324,10 +324,10 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { val request = flow { repeat(100) { emit(Payload.Empty) } }.onCompletion { job.complete() } val response = requester.requestChannel(Payload.Empty, request).launchIn(connection) connection.test { - expectFrame { assertTrue(it is RequestFrame) } + awaitFrame { assertTrue(it is RequestFrame) } expectNoEventsIn(200) response.cancelAndJoin() - expectFrame { assertTrue(it is CancelFrame) } + awaitFrame { assertTrue(it is CancelFrame) } expectNoEventsIn(200) assertTrue(job.isCompleted) } @@ -342,7 +342,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { } val response = requester.requestChannel(payload(byteArrayOf(1), byteArrayOf(2)), request).launchIn(connection) connection.test { - expectFrame { frame -> + awaitFrame { frame -> val streamId = frame.streamId assertTrue(frame is RequestFrame) assertEquals(FrameType.RequestChannel, frame.type) @@ -368,7 +368,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { requester.requestChannel(payload("INIT"), request).flowOn(PrefetchStrategy(Int.MAX_VALUE, 0)).launchIn(connection) connection.test { - expectFrame { frame -> + awaitFrame { frame -> assertTrue(frame is RequestFrame) assertEquals(FrameType.RequestChannel, frame.type) assertEquals(Int.MAX_VALUE, frame.initialRequest) @@ -384,9 +384,9 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck { connection.launch { delay(200) connection.test { - expectFrame { assertTrue(it is RequestFrame) } + awaitFrame { assertTrue(it is RequestFrame) } connection.cancel() - expectComplete() + awaitComplete() } } assertFailsWith(CancellationException::class) { request() } diff --git a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt index f918627c3..a9bd24727 100644 --- a/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt +++ b/rsocket-core/src/commonTest/kotlin/io/rsocket/kotlin/keepalive/KeepAliveTest.kt @@ -44,7 +44,7 @@ class KeepAliveTest : TestWithConnection(), TestWithLeakCheck { requester() connection.test { repeat(5) { - expectFrame { frame -> + awaitFrame { frame -> assertTrue(frame is KeepAliveFrame) assertTrue(frame.respond) } @@ -65,7 +65,7 @@ class KeepAliveTest : TestWithConnection(), TestWithLeakCheck { assertTrue(rSocket.isActive) connection.test { repeat(50) { - expectItem() + awaitItem() } } } @@ -82,7 +82,7 @@ class KeepAliveTest : TestWithConnection(), TestWithLeakCheck { connection.test { repeat(5) { - expectFrame { frame -> + awaitFrame { frame -> assertTrue(frame is KeepAliveFrame) assertFalse(frame.respond) } @@ -102,7 +102,7 @@ class KeepAliveTest : TestWithConnection(), TestWithLeakCheck { fun rSocketCanceledOnMissingKeepAliveTicks() = test { val rSocket = requester() connection.test { - while (rSocket.isActive) kotlin.runCatching { expectItem() } + while (rSocket.isActive) kotlin.runCatching { awaitItem() } } assertTrue(rSocket.coroutineContext.job.getCancellationException().cause is RSocketError.ConnectionError) } diff --git a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt index 7e92b12b4..3406f1c33 100644 --- a/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt +++ b/rsocket-test/src/commonMain/kotlin/io/rsocket/kotlin/test/TestConnection.kt @@ -76,6 +76,6 @@ suspend fun FlowTurbine<*>.expectNoEventsIn(timeMillis: Long) { expectNoEvents() } -suspend inline fun FlowTurbine.expectFrame(block: (frame: Frame) -> Unit) { - block(expectItem()) +suspend inline fun FlowTurbine.awaitFrame(block: (frame: Frame) -> Unit) { + block(awaitItem()) } diff --git a/rsocket-transport-ktor/rsocket-transport-ktor-server/src/jvmMain/kotlin/io/rsocket/kotlin/transport/ktor/server/Routing.kt b/rsocket-transport-ktor/rsocket-transport-ktor-server/src/jvmMain/kotlin/io/rsocket/kotlin/transport/ktor/server/Routing.kt index 06d26e173..3ba0fee6c 100644 --- a/rsocket-transport-ktor/rsocket-transport-ktor-server/src/jvmMain/kotlin/io/rsocket/kotlin/transport/ktor/server/Routing.kt +++ b/rsocket-transport-ktor/rsocket-transport-ktor-server/src/jvmMain/kotlin/io/rsocket/kotlin/transport/ktor/server/Routing.kt @@ -19,7 +19,9 @@ package io.rsocket.kotlin.transport.ktor.server import io.ktor.application.* import io.ktor.routing.* import io.rsocket.kotlin.* +import kotlinx.coroutines.* +@OptIn(DelicateCoroutinesApi::class) public fun Route.rSocket( path: String? = null, protocol: String? = null, diff --git a/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpConnection.kt b/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpConnection.kt index 139b2dae6..6c3fb42be 100644 --- a/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpConnection.kt +++ b/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpConnection.kt @@ -29,7 +29,7 @@ import io.rsocket.kotlin.internal.* import kotlinx.coroutines.* import kotlin.coroutines.* -@OptIn(TransportApi::class) +@TransportApi internal class TcpConnection( socket: Socket, override val coroutineContext: CoroutineContext, diff --git a/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpServerTransport.kt b/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpServerTransport.kt index 6026fc20c..12e80cc65 100644 --- a/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpServerTransport.kt +++ b/rsocket-transport-ktor/src/commonMain/kotlin/io/rsocket/kotlin/transport/ktor/TcpServerTransport.kt @@ -14,6 +14,7 @@ * limitations under the License. */ +@file:OptIn(TransportApi::class) @file:Suppress("FunctionName") package io.rsocket.kotlin.transport.ktor @@ -25,6 +26,7 @@ import io.ktor.util.network.* import io.ktor.utils.io.core.* import io.ktor.utils.io.core.internal.* import io.ktor.utils.io.pool.* +import io.rsocket.kotlin.* import io.rsocket.kotlin.transport.* import kotlinx.coroutines.* diff --git a/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalConnection.kt b/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalConnection.kt index 58972a217..fcc1bd787 100644 --- a/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalConnection.kt +++ b/rsocket-transport-local/src/commonMain/kotlin/io/rsocket/kotlin/transport/local/LocalConnection.kt @@ -23,7 +23,7 @@ import io.rsocket.kotlin.* import kotlinx.coroutines.channels.* import kotlin.coroutines.* -@OptIn(TransportApi::class) +@TransportApi internal class LocalConnection( private val sender: SendChannel, private val receiver: ReceiveChannel,