From 93c30b6dd2ba57d5b8eb2458da0a2634cc40d1e2 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 9 Nov 2021 16:18:25 +0200 Subject: [PATCH 01/10] adds first frame handling timeout Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka --- .../java/io/rsocket/core/RSocketServer.java | 29 +++++++++++++++---- .../java/io/rsocket/core/ServerSetup.java | 13 +++++++++ .../core/SetupHandlingDuplexConnection.java | 1 + .../io/rsocket/core/RSocketServerTest.java | 28 ++++++++++++++++++ 4 files changed, 66 insertions(+), 5 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java index 5ec33e76f..c11b181b0 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java @@ -41,6 +41,7 @@ import io.rsocket.plugins.RequestInterceptor; import io.rsocket.resume.SessionManager; import io.rsocket.transport.ServerTransport; +import java.time.Duration; import java.util.Objects; import java.util.function.Consumer; import java.util.function.Supplier; @@ -70,6 +71,7 @@ public final class RSocketServer { private int mtu = 0; private int maxInboundPayloadSize = Integer.MAX_VALUE; private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT; + private Duration timeout = Duration.ofMinutes(1); private RSocketServer() {} @@ -223,6 +225,22 @@ public RSocketServer maxInboundPayloadSize(int maxInboundPayloadSize) { return this; } + /** + * Specifies timeout for the first incoming frame from the accepted connection. + * + *

By default this is set to 1 minute. + * + * @param timeout duration + * @return the same instance for method chaining + */ + public RSocketServer setupHandlingTimeout(Duration timeout) { + if (timeout.isNegative() || timeout.isZero()) { + throw new IllegalArgumentException("Setup Handling Timeout should be greater than zero"); + } + this.timeout = timeout; + return this; + } + /** * When this is set, frames larger than the given maximum transmission unit (mtu) size value are * fragmented. @@ -287,7 +305,7 @@ public RSocketServer payloadDecoder(PayloadDecoder decoder) { public Mono bind(ServerTransport transport) { return Mono.defer( new Supplier>() { - final ServerSetup serverSetup = serverSetup(); + final ServerSetup serverSetup = serverSetup(timeout); @Override public Mono get() { @@ -326,7 +344,7 @@ public ServerTransport.ConnectionAcceptor asConnectionAcceptor() { public ServerTransport.ConnectionAcceptor asConnectionAcceptor(int maxFrameLength) { assertValidateSetup(maxFrameLength, maxInboundPayloadSize, mtu); return new ServerTransport.ConnectionAcceptor() { - private final ServerSetup serverSetup = serverSetup(); + private final ServerSetup serverSetup = serverSetup(timeout); @Override public Mono apply(DuplexConnection connection) { @@ -469,12 +487,13 @@ private Mono acceptSetup( }); } - private ServerSetup serverSetup() { - return resume != null ? createSetup() : new ServerSetup.DefaultServerSetup(); + private ServerSetup serverSetup(Duration timeout) { + return resume != null ? createSetup(timeout) : new ServerSetup.DefaultServerSetup(timeout); } - ServerSetup createSetup() { + ServerSetup createSetup(Duration timeout) { return new ServerSetup.ResumableServerSetup( + timeout, new SessionManager(), resume.getSessionDuration(), resume.getStreamTimeout(), diff --git a/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java b/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java index e716b8fcb..2d367bd73 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java +++ b/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java @@ -36,9 +36,16 @@ abstract class ServerSetup { + final Duration timeout; + + protected ServerSetup(Duration timeout) { + this.timeout = timeout; + } + Mono> init(DuplexConnection connection) { return Mono.>create( sink -> sink.onRequest(__ -> new SetupHandlingDuplexConnection(connection, sink))) + .timeout(this.timeout) .or(connection.onClose().then(Mono.error(ClosedChannelException::new))); } @@ -57,6 +64,10 @@ void sendError(DuplexConnection duplexConnection, RSocketErrorException exceptio static class DefaultServerSetup extends ServerSetup { + DefaultServerSetup(Duration timeout) { + super(timeout); + } + @Override public Mono acceptRSocketSetup( ByteBuf frame, @@ -86,11 +97,13 @@ static class ResumableServerSetup extends ServerSetup { private final boolean cleanupStoreOnKeepAlive; ResumableServerSetup( + Duration timeout, SessionManager sessionManager, Duration resumeSessionDuration, Duration resumeStreamTimeout, Function resumeStoreFactory, boolean cleanupStoreOnKeepAlive) { + super(timeout); this.sessionManager = sessionManager; this.resumeSessionDuration = resumeSessionDuration; this.resumeStreamTimeout = resumeStreamTimeout; diff --git a/rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java index b6bc87513..2da572de3 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/core/SetupHandlingDuplexConnection.java @@ -96,6 +96,7 @@ public void request(long n) { @Override public void cancel() { + source.dispose(); s.cancel(); } diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java index 08555740c..f0b92d5a5 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java @@ -29,11 +29,13 @@ import io.rsocket.test.util.TestServerTransport; import java.time.Duration; import java.util.Random; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import reactor.core.Scannable; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import reactor.test.StepVerifier; +import reactor.test.scheduler.VirtualTimeScheduler; public class RSocketServerTest { @@ -60,6 +62,32 @@ public void unexpectedFramesBeforeSetupFrame() { .hasNoLeaks(); } + @Test + public void timeoutOnNoFirstFrame() { + final VirtualTimeScheduler scheduler = VirtualTimeScheduler.getOrSet(); + try { + TestServerTransport transport = new TestServerTransport(); + RSocketServer.create().setupHandlingTimeout(Duration.ofMinutes(2)).bind(transport).block(); + + final TestDuplexConnection duplexConnection = transport.connect(); + + scheduler.advanceTimeBy(Duration.ofMinutes(1)); + + Assertions.assertThat(duplexConnection.isDisposed()).isFalse(); + + scheduler.advanceTimeBy(Duration.ofMinutes(1)); + + StepVerifier.create(duplexConnection.onClose()) + .expectSubscription() + .expectComplete() + .verify(Duration.ofSeconds(10)); + + FrameAssert.assertThat(duplexConnection.pollFrame()).isNull(); + } finally { + VirtualTimeScheduler.reset(); + } + } + @Test public void ensuresMaxFrameLengthCanNotBeLessThenMtu() { RSocketServer.create() From e916ab904869f0b7df237200b4fa2754e2d2432c Mon Sep 17 00:00:00 2001 From: Oleh Dokuka <5380167+OlegDokuka@users.noreply.github.com> Date: Tue, 9 Nov 2021 16:52:57 +0200 Subject: [PATCH 02/10] Update rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java Co-authored-by: Rossen Stoyanchev --- rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java index c11b181b0..a4a8f852f 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java @@ -226,7 +226,7 @@ public RSocketServer maxInboundPayloadSize(int maxInboundPayloadSize) { } /** - * Specifies timeout for the first incoming frame from the accepted connection. + * Specify the max time to wait for the first frame (e.g. {@code SETUP}) on an accepted connection. * *

By default this is set to 1 minute. * From b0131e85a447f72ae116d47edf9752ed9444f02d Mon Sep 17 00:00:00 2001 From: Oleh Dokuka <5380167+OlegDokuka@users.noreply.github.com> Date: Tue, 9 Nov 2021 16:53:51 +0200 Subject: [PATCH 03/10] Update rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java --- rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java index a4a8f852f..3e3b2590c 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java @@ -233,7 +233,7 @@ public RSocketServer maxInboundPayloadSize(int maxInboundPayloadSize) { * @param timeout duration * @return the same instance for method chaining */ - public RSocketServer setupHandlingTimeout(Duration timeout) { + public RSocketServer maxTimeToFirstFrame(Duration timeout) { if (timeout.isNegative() || timeout.isZero()) { throw new IllegalArgumentException("Setup Handling Timeout should be greater than zero"); } From d6c11ba5cedb6e44e269337201e52d36897614b9 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka <5380167+OlegDokuka@users.noreply.github.com> Date: Tue, 9 Nov 2021 16:54:23 +0200 Subject: [PATCH 04/10] Update rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java --- .../src/test/java/io/rsocket/core/RSocketServerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java index f0b92d5a5..24bf95215 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java @@ -67,7 +67,7 @@ public void timeoutOnNoFirstFrame() { final VirtualTimeScheduler scheduler = VirtualTimeScheduler.getOrSet(); try { TestServerTransport transport = new TestServerTransport(); - RSocketServer.create().setupHandlingTimeout(Duration.ofMinutes(2)).bind(transport).block(); + RSocketServer.create().maxTimeToFirstFrame(Duration.ofMinutes(2)).bind(transport).block(); final TestDuplexConnection duplexConnection = transport.connect(); From 1da568daa68e2b2d06302acd290df0c1dda80362 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 9 Nov 2021 16:57:26 +0200 Subject: [PATCH 05/10] fixes format Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka --- rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java index 3e3b2590c..3208bb4fd 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java @@ -226,7 +226,8 @@ public RSocketServer maxInboundPayloadSize(int maxInboundPayloadSize) { } /** - * Specify the max time to wait for the first frame (e.g. {@code SETUP}) on an accepted connection. + * Specify the max time to wait for the first frame (e.g. {@code SETUP}) on an accepted + * connection. * *

By default this is set to 1 minute. * From 33ec987a5f5e0a62a345c6948f4b1caf803ad062 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 9 Nov 2021 18:18:11 +0200 Subject: [PATCH 06/10] improves error info Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka --- .../src/main/java/io/rsocket/test/TransportTest.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java index 5384c7e8d..4f81f9bc3 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java +++ b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java @@ -96,7 +96,12 @@ default void close() { getTransportPair().responder.awaitAllInteractionTermination(getTimeout()); getTransportPair().dispose(); getTransportPair().awaitClosed(); - RuntimeException throwable = new RuntimeException(); + RuntimeException throwable = new RuntimeException() { + @Override + public synchronized Throwable fillInStackTrace() { + return this; + } + }; try { getTransportPair().byteBufAllocator2.assertHasNoLeaks(); From 68ddec3e3086a37122510c2a60ea10ad7c156843 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 9 Nov 2021 18:18:35 +0200 Subject: [PATCH 07/10] improves snapshot version name Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka --- .github/workflows/gradle-all.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/gradle-all.yml b/.github/workflows/gradle-all.yml index 03e6a4e68..8826f511a 100644 --- a/.github/workflows/gradle-all.yml +++ b/.github/workflows/gradle-all.yml @@ -142,7 +142,10 @@ jobs: run: chmod +x gradlew - name: Publish Packages to Artifactory if: ${{ matrix.jdk == '1.8' }} - run: ./gradlew -PversionSuffix="-${githubRef#refs/heads/}-SNAPSHOT" -PbuildNumber="${buildNumber}" publishMavenPublicationToGitHubPackagesRepository --no-daemon --stacktrace + run: | + githubRef="${githubRef#refs/heads/}" + githubRef="${githubRef////-}" + ./gradlew -PversionSuffix="-${githubRef}-SNAPSHOT" -PbuildNumber="${buildNumber}" publishMavenPublicationToGitHubPackagesRepository --no-daemon --stacktrace env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} githubRef: ${{ github.ref }} From ece8d503f2dde4490006e81c533a935acba54be2 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 9 Nov 2021 18:20:41 +0200 Subject: [PATCH 08/10] fixes format Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka --- .../main/java/io/rsocket/test/TransportTest.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java index 4f81f9bc3..03b9646fa 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java +++ b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java @@ -96,12 +96,13 @@ default void close() { getTransportPair().responder.awaitAllInteractionTermination(getTimeout()); getTransportPair().dispose(); getTransportPair().awaitClosed(); - RuntimeException throwable = new RuntimeException() { - @Override - public synchronized Throwable fillInStackTrace() { - return this; - } - }; + RuntimeException throwable = + new RuntimeException() { + @Override + public synchronized Throwable fillInStackTrace() { + return this; + } + }; try { getTransportPair().byteBufAllocator2.assertHasNoLeaks(); From 2607b389607b1302389837a89e2f464a908a8b9d Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 9 Nov 2021 18:33:56 +0200 Subject: [PATCH 09/10] improves failure message Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka --- .../src/main/java/io/rsocket/test/TransportTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java index 03b9646fa..9ef253453 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java +++ b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java @@ -39,6 +39,7 @@ import java.io.InputStreamReader; import java.net.SocketAddress; import java.time.Duration; +import java.util.Arrays; import java.util.concurrent.CancellationException; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; @@ -102,6 +103,11 @@ default void close() { public synchronized Throwable fillInStackTrace() { return this; } + + @Override + public String getMessage() { + return Arrays.toString(getSuppressed()); + } }; try { From 80cb6746b66506d8c070e2079b5ca6305d09ebce Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 9 Nov 2021 20:30:34 +0200 Subject: [PATCH 10/10] improvements Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka --- .../src/main/java/io/rsocket/test/TransportTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java index 9ef253453..902a5844c 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java +++ b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java @@ -788,8 +788,11 @@ public void onSubscribe(Subscription s) { @Override public void onNext(ByteBuf buf) { - actual.onNext(buf); - buf.release(); + try { + actual.onNext(buf); + } finally { + buf.release(); + } } Mono onClose() {