diff --git a/README.md b/README.md index 5b5193329..d54a42dad 100644 --- a/README.md +++ b/README.md @@ -26,8 +26,8 @@ repositories { mavenCentral() } dependencies { - implementation 'io.rsocket:rsocket-core:1.0.1' - implementation 'io.rsocket:rsocket-transport-netty:1.0.1' + implementation 'io.rsocket:rsocket-core:1.0.2' + implementation 'io.rsocket:rsocket-transport-netty:1.0.2' } ``` @@ -40,8 +40,8 @@ repositories { maven { url 'https://oss.jfrog.org/oss-snapshot-local' } } dependencies { - implementation 'io.rsocket:rsocket-core:1.0.2-SNAPSHOT' - implementation 'io.rsocket:rsocket-transport-netty:1.0.2-SNAPSHOT' + implementation 'io.rsocket:rsocket-core:1.0.3-SNAPSHOT' + implementation 'io.rsocket:rsocket-transport-netty:1.0.3-SNAPSHOT' } ``` diff --git a/build.gradle b/build.gradle index 1829a512f..a576f22f8 100644 --- a/build.gradle +++ b/build.gradle @@ -32,11 +32,10 @@ subprojects { apply plugin: 'io.spring.dependency-management' apply plugin: 'com.github.sherter.google-java-format' - ext['reactor-bom.version'] = 'Dysprosium-BUILD-SNAPSHOT' + ext['reactor-bom.version'] = 'Dysprosium-SR11' ext['logback.version'] = '1.2.3' - ext['findbugs.version'] = '3.0.2' - ext['netty-bom.version'] = '4.1.50.Final' - ext['netty-boringssl.version'] = '2.0.30.Final' + ext['netty-bom.version'] = '4.1.51.Final' + ext['netty-boringssl.version'] = '2.0.31.Final' ext['hdrhistogram.version'] = '2.1.10' ext['mockito.version'] = '3.2.0' ext['slf4j.version'] = '1.7.25' diff --git a/gradle.properties b/gradle.properties index 9021ebfab..09eb2d90f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -11,5 +11,5 @@ # See the License for the specific language governing permissions and # limitations under the License. # -version=1.0.2 -perfBaselineVersion=1.0.1 +version=1.0.3 +perfBaselineVersion=1.0.2 diff --git a/rsocket-core/src/main/java/io/rsocket/RSocket.java b/rsocket-core/src/main/java/io/rsocket/RSocket.java index 773c93dc2..b05241365 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocket.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocket.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,8 +34,7 @@ public interface RSocket extends Availability, Closeable { * handled, otherwise errors. */ default Mono fireAndForget(Payload payload) { - payload.release(); - return Mono.error(new UnsupportedOperationException("Fire-and-Forget not implemented.")); + return RSocketAdapter.fireAndForget(payload); } /** @@ -46,8 +45,7 @@ default Mono fireAndForget(Payload payload) { * response. */ default Mono requestResponse(Payload payload) { - payload.release(); - return Mono.error(new UnsupportedOperationException("Request-Response not implemented.")); + return RSocketAdapter.requestResponse(payload); } /** @@ -57,8 +55,7 @@ default Mono requestResponse(Payload payload) { * @return {@code Publisher} containing the stream of {@code Payload}s representing the response. */ default Flux requestStream(Payload payload) { - payload.release(); - return Flux.error(new UnsupportedOperationException("Request-Stream not implemented.")); + return RSocketAdapter.requestStream(payload); } /** @@ -68,7 +65,7 @@ default Flux requestStream(Payload payload) { * @return Stream of response payloads. */ default Flux requestChannel(Publisher payloads) { - return Flux.error(new UnsupportedOperationException("Request-Channel not implemented.")); + return RSocketAdapter.requestChannel(payloads); } /** @@ -79,8 +76,7 @@ default Flux requestChannel(Publisher payloads) { * handled, otherwise errors. */ default Mono metadataPush(Payload payload) { - payload.release(); - return Mono.error(new UnsupportedOperationException("Metadata-Push not implemented.")); + return RSocketAdapter.metadataPush(payload); } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketAdapter.java b/rsocket-core/src/main/java/io/rsocket/RSocketAdapter.java new file mode 100644 index 000000000..b5a64b8dd --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/RSocketAdapter.java @@ -0,0 +1,78 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Package private class with default implementations for use in {@link RSocket}. The main purpose + * is to hide static {@link UnsupportedOperationException} declarations. + * + * @since 1.0.3 + */ +class RSocketAdapter { + + private static final Mono UNSUPPORTED_FIRE_AND_FORGET = + Mono.error(new UnsupportedInteractionException("Fire-and-Forget")); + + private static final Mono UNSUPPORTED_REQUEST_RESPONSE = + Mono.error(new UnsupportedInteractionException("Request-Response")); + + private static final Flux UNSUPPORTED_REQUEST_STREAM = + Flux.error(new UnsupportedInteractionException("Request-Stream")); + + private static final Flux UNSUPPORTED_REQUEST_CHANNEL = + Flux.error(new UnsupportedInteractionException("Request-Channel")); + + private static final Mono UNSUPPORTED_METADATA_PUSH = + Mono.error(new UnsupportedInteractionException("Metadata-Push")); + + static Mono fireAndForget(Payload payload) { + payload.release(); + return RSocketAdapter.UNSUPPORTED_FIRE_AND_FORGET; + } + + static Mono requestResponse(Payload payload) { + payload.release(); + return RSocketAdapter.UNSUPPORTED_REQUEST_RESPONSE; + } + + static Flux requestStream(Payload payload) { + payload.release(); + return RSocketAdapter.UNSUPPORTED_REQUEST_STREAM; + } + + static Flux requestChannel(Publisher payloads) { + return RSocketAdapter.UNSUPPORTED_REQUEST_CHANNEL; + } + + static Mono metadataPush(Payload payload) { + payload.release(); + return RSocketAdapter.UNSUPPORTED_METADATA_PUSH; + } + + private static class UnsupportedInteractionException extends RuntimeException { + + private static final long serialVersionUID = 5084623297446471999L; + + UnsupportedInteractionException(String interactionName) { + super(interactionName + " not implemented.", null, false, false); + } + } +} diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index 2ecdec215..a249ea888 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -64,7 +64,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; -import reactor.core.publisher.Operators; import reactor.core.publisher.SignalType; import reactor.core.publisher.UnicastProcessor; import reactor.core.scheduler.Scheduler; @@ -267,68 +266,54 @@ private Mono handleRequestResponse(final Payload payload) { final UnboundedProcessor sendProcessor = this.sendProcessor; final UnicastProcessor receiver = UnicastProcessor.create(Queues.one().get()); - final AtomicBoolean once = new AtomicBoolean(); + return Mono.fromDirect( + new RequestOperator( + receiver.next(), "RequestResponseMono allows only a single subscriber") { - return Mono.defer( - () -> { - if (once.getAndSet(true)) { - return Mono.error( - new IllegalStateException("RequestResponseMono allows only a single subscriber")); - } + @Override + void hookOnFirstRequest(long n) { + if (isDisposed()) { + payload.release(); + final Throwable t = terminationError; + receiver.onError(t); + return; + } - return receiver - .next() - .transform( - Operators.lift( - (s, actual) -> - new RequestOperator(actual) { - - @Override - void hookOnFirstRequest(long n) { - if (isDisposed()) { - payload.release(); - final Throwable t = terminationError; - receiver.onError(t); - return; - } - - RequesterLeaseHandler lh = leaseHandler; - if (!lh.useLease()) { - payload.release(); - receiver.onError(lh.leaseError()); - return; - } - - int streamId = streamIdSupplier.nextStreamId(receivers); - this.streamId = streamId; - - ByteBuf requestResponseFrame = - RequestResponseFrameCodec.encodeReleasingPayload( - allocator, streamId, payload); - - receivers.put(streamId, receiver); - sendProcessor.onNext(requestResponseFrame); - } - - @Override - void hookOnCancel() { - if (receivers.remove(streamId, receiver)) { - sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId)); - } else { - if (this.firstRequest) { - payload.release(); - } - } - } - - @Override - public void hookOnTerminal(SignalType signalType) { - receivers.remove(streamId, receiver); - } - })) - .subscribeOn(serialScheduler) - .doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER); - }); + RequesterLeaseHandler lh = leaseHandler; + if (!lh.useLease()) { + payload.release(); + receiver.onError(lh.leaseError()); + return; + } + + int streamId = streamIdSupplier.nextStreamId(receivers); + this.streamId = streamId; + + ByteBuf requestResponseFrame = + RequestResponseFrameCodec.encodeReleasingPayload(allocator, streamId, payload); + + receivers.put(streamId, receiver); + sendProcessor.onNext(requestResponseFrame); + } + + @Override + void hookOnCancel() { + if (receivers.remove(streamId, receiver)) { + sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId)); + } else { + if (this.firstRequest) { + payload.release(); + } + } + } + + @Override + public void hookOnTerminal(SignalType signalType) { + receivers.remove(streamId, receiver); + } + }) + .subscribeOn(serialScheduler) + .doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER); } private Flux handleRequestStream(final Payload payload) { @@ -348,79 +333,65 @@ private Flux handleRequestStream(final Payload payload) { } final UnboundedProcessor sendProcessor = this.sendProcessor; - final UnicastProcessor receiver = UnicastProcessor.create(); - final AtomicBoolean once = new AtomicBoolean(); + final UnicastProcessor receiver = UnicastProcessor.create(Queues.one().get()); - return Flux.defer( - () -> { - if (once.getAndSet(true)) { - return Flux.error( - new IllegalStateException("RequestStreamFlux allows only a single subscriber")); - } + return Flux.from( + new RequestOperator(receiver, "RequestStreamFlux allows only a single subscriber") { - return receiver - .transform( - Operators.lift( - (s, actual) -> - new RequestOperator(actual) { - - @Override - void hookOnFirstRequest(long n) { - if (isDisposed()) { - payload.release(); - final Throwable t = terminationError; - receiver.onError(t); - return; - } - - RequesterLeaseHandler lh = leaseHandler; - if (!lh.useLease()) { - payload.release(); - receiver.onError(lh.leaseError()); - return; - } - - int streamId = streamIdSupplier.nextStreamId(receivers); - this.streamId = streamId; - - ByteBuf requestStreamFrame = - RequestStreamFrameCodec.encodeReleasingPayload( - allocator, streamId, n, payload); - - receivers.put(streamId, receiver); - - sendProcessor.onNext(requestStreamFrame); - } - - @Override - void hookOnRemainingRequests(long n) { - if (receiver.isDisposed()) { - return; - } - - sendProcessor.onNext( - RequestNFrameCodec.encode(allocator, streamId, n)); - } - - @Override - void hookOnCancel() { - if (receivers.remove(streamId, receiver)) { - sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId)); - } else { - if (this.firstRequest) { - payload.release(); - } - } - } - - @Override - void hookOnTerminal(SignalType signalType) { - receivers.remove(streamId); - } - })) - .subscribeOn(serialScheduler, false) - .doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER); - }); + @Override + void hookOnFirstRequest(long n) { + if (isDisposed()) { + payload.release(); + final Throwable t = terminationError; + receiver.onError(t); + return; + } + + RequesterLeaseHandler lh = leaseHandler; + if (!lh.useLease()) { + payload.release(); + receiver.onError(lh.leaseError()); + return; + } + + int streamId = streamIdSupplier.nextStreamId(receivers); + this.streamId = streamId; + + ByteBuf requestStreamFrame = + RequestStreamFrameCodec.encodeReleasingPayload(allocator, streamId, n, payload); + + receivers.put(streamId, receiver); + + sendProcessor.onNext(requestStreamFrame); + } + + @Override + void hookOnRemainingRequests(long n) { + if (receiver.isDisposed()) { + return; + } + + sendProcessor.onNext(RequestNFrameCodec.encode(allocator, streamId, n)); + } + + @Override + void hookOnCancel() { + if (receivers.remove(streamId, receiver)) { + sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId)); + } else { + if (this.firstRequest) { + payload.release(); + } + } + } + + @Override + void hookOnTerminal(SignalType signalType) { + receivers.remove(streamId); + } + }) + .subscribeOn(serialScheduler, false) + .doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER); } private Flux handleChannel(Flux request) { @@ -456,137 +427,135 @@ private Flux handleChannel(Flux request) { private Flux handleChannel(Payload initialPayload, Flux inboundFlux) { final UnboundedProcessor sendProcessor = this.sendProcessor; - final UnicastProcessor receiver = UnicastProcessor.create(); - - return receiver - .transform( - Operators.lift( - (s, actual) -> - new RequestOperator(actual) { - - final BaseSubscriber upstreamSubscriber = - new BaseSubscriber() { - - boolean first = true; - - @Override - protected void hookOnSubscribe(Subscription subscription) { - // noops - } - - @Override - protected void hookOnNext(Payload payload) { - if (first) { - // need to skip first since we have already sent it - // no need to release it since it was released earlier on the - // request - // establishment - // phase - first = false; - request(1); - return; - } - if (!PayloadValidationUtils.isValid(mtu, payload, maxFrameLength)) { - payload.release(); - cancel(); - final IllegalArgumentException t = - new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE); - // no need to send any errors. - sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId)); - receiver.onError(t); - return; - } - final ByteBuf frame = - PayloadFrameCodec.encodeNextReleasingPayload( - allocator, streamId, payload); - - sendProcessor.onNext(frame); - } - - @Override - protected void hookOnComplete() { - ByteBuf frame = PayloadFrameCodec.encodeComplete(allocator, streamId); - sendProcessor.onNext(frame); - } - - @Override - protected void hookOnError(Throwable t) { - ByteBuf frame = ErrorFrameCodec.encode(allocator, streamId, t); - sendProcessor.onNext(frame); - receiver.onError(t); - } - - @Override - protected void hookFinally(SignalType type) { - senders.remove(streamId, this); - } - }; - - @Override - void hookOnFirstRequest(long n) { - if (isDisposed()) { - initialPayload.release(); - final Throwable t = terminationError; - upstreamSubscriber.cancel(); - receiver.onError(t); - return; - } - - RequesterLeaseHandler lh = leaseHandler; - if (!lh.useLease()) { - initialPayload.release(); - receiver.onError(lh.leaseError()); - return; - } - - final int streamId = streamIdSupplier.nextStreamId(receivers); - this.streamId = streamId; - - final ByteBuf frame = - RequestChannelFrameCodec.encodeReleasingPayload( - allocator, streamId, false, n, initialPayload); - - senders.put(streamId, upstreamSubscriber); - receivers.put(streamId, receiver); - - inboundFlux - .doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER) - .subscribe(upstreamSubscriber); - - sendProcessor.onNext(frame); + final UnicastProcessor receiver = UnicastProcessor.create(Queues.one().get()); + + return Flux.from( + new RequestOperator( + receiver, "RequestStreamFlux allows only a " + "single subscriber") { + + final BaseSubscriber upstreamSubscriber = + new BaseSubscriber() { + + boolean first = true; + + @Override + protected void hookOnSubscribe(Subscription subscription) { + // noops + } + + @Override + protected void hookOnNext(Payload payload) { + if (first) { + // need to skip first since we have already sent it + // no need to release it since it was released earlier on the + // request + // establishment + // phase + first = false; + request(1); + return; } + if (!PayloadValidationUtils.isValid(mtu, payload, maxFrameLength)) { + payload.release(); + cancel(); + final IllegalArgumentException t = + new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE); + // no need to send any errors. + sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId)); + receiver.onError(t); + return; + } + final ByteBuf frame = + PayloadFrameCodec.encodeNextReleasingPayload( + allocator, streamId, payload); + + sendProcessor.onNext(frame); + } + + @Override + protected void hookOnComplete() { + ByteBuf frame = PayloadFrameCodec.encodeComplete(allocator, streamId); + sendProcessor.onNext(frame); + } + + @Override + protected void hookOnError(Throwable t) { + ByteBuf frame = ErrorFrameCodec.encode(allocator, streamId, t); + sendProcessor.onNext(frame); + receiver.onError(t); + } + + @Override + protected void hookFinally(SignalType type) { + senders.remove(streamId, this); + } + }; + + @Override + void hookOnFirstRequest(long n) { + if (isDisposed()) { + initialPayload.release(); + final Throwable t = terminationError; + upstreamSubscriber.cancel(); + receiver.onError(t); + return; + } - @Override - void hookOnRemainingRequests(long n) { - if (receiver.isDisposed()) { - return; - } + RequesterLeaseHandler lh = leaseHandler; + if (!lh.useLease()) { + initialPayload.release(); + receiver.onError(lh.leaseError()); + return; + } - sendProcessor.onNext(RequestNFrameCodec.encode(allocator, streamId, n)); - } + final int streamId = streamIdSupplier.nextStreamId(receivers); + this.streamId = streamId; - @Override - void hookOnCancel() { - senders.remove(streamId, upstreamSubscriber); - if (receivers.remove(streamId, receiver)) { - sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId)); - } - } + final ByteBuf frame = + RequestChannelFrameCodec.encodeReleasingPayload( + allocator, streamId, false, n, initialPayload); - @Override - void hookOnTerminal(SignalType signalType) { - if (signalType == SignalType.ON_ERROR) { - upstreamSubscriber.cancel(); - } - receivers.remove(streamId, receiver); - } + senders.put(streamId, upstreamSubscriber); + receivers.put(streamId, receiver); - @Override - public void cancel() { - upstreamSubscriber.cancel(); - super.cancel(); - } - })) + inboundFlux + .doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER) + .subscribe(upstreamSubscriber); + + sendProcessor.onNext(frame); + } + + @Override + void hookOnRemainingRequests(long n) { + if (receiver.isDisposed()) { + return; + } + + sendProcessor.onNext(RequestNFrameCodec.encode(allocator, streamId, n)); + } + + @Override + void hookOnCancel() { + senders.remove(streamId, upstreamSubscriber); + if (receivers.remove(streamId, receiver)) { + sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId)); + } + } + + @Override + void hookOnTerminal(SignalType signalType) { + if (signalType == SignalType.ON_ERROR) { + upstreamSubscriber.cancel(); + } + receivers.remove(streamId, receiver); + } + + @Override + public void cancel() { + upstreamSubscriber.cancel(); + super.cancel(); + } + }) .subscribeOn(serialScheduler, false); } diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index 581605ff4..3e2c06e92 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,6 +48,7 @@ import reactor.core.Exceptions; import reactor.core.publisher.*; import reactor.util.annotation.Nullable; +import reactor.util.concurrent.Queues; /** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */ class RSocketResponder implements RSocket { @@ -537,7 +538,7 @@ protected void hookOnError(Throwable throwable) { } private void handleChannel(int streamId, Payload payload, long initialRequestN) { - UnicastProcessor frames = UnicastProcessor.create(); + UnicastProcessor frames = UnicastProcessor.create(Queues.one().get()); channelProcessors.put(streamId, frames); Flux payloads = diff --git a/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java b/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java index 6123b0492..f95a5f66c 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java @@ -2,7 +2,9 @@ import io.rsocket.Payload; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import reactor.core.CorePublisher; import reactor.core.CoreSubscriber; import reactor.core.Fuseable; import reactor.core.publisher.Operators; @@ -16,9 +18,14 @@ * invocations. */ abstract class RequestOperator - implements CoreSubscriber, Fuseable.QueueSubscription { + implements CoreSubscriber, + CorePublisher, + Fuseable.QueueSubscription, + Fuseable { - final CoreSubscriber actual; + final String errorMessageOnSecondSubscription; + + CoreSubscriber actual; Subscription s; Fuseable.QueueSubscription qs; @@ -30,8 +37,25 @@ abstract class RequestOperator static final AtomicIntegerFieldUpdater WIP = AtomicIntegerFieldUpdater.newUpdater(RequestOperator.class, "wip"); - RequestOperator(CoreSubscriber actual) { - this.actual = actual; + RequestOperator(CorePublisher source, String errorMessageOnSecondSubscription) { + this.errorMessageOnSecondSubscription = errorMessageOnSecondSubscription; + source.subscribe(this); + WIP.lazySet(this, -1); + } + + @Override + public void subscribe(Subscriber actual) { + subscribe(Operators.toCoreSubscriber(actual)); + } + + @Override + public void subscribe(CoreSubscriber actual) { + if (this.wip == -1 && WIP.compareAndSet(this, -1, 0)) { + this.actual = actual; + actual.onSubscribe(this); + } else { + Operators.error(actual, new IllegalStateException(this.errorMessageOnSecondSubscription)); + } } /** @@ -129,7 +153,6 @@ public void onSubscribe(Subscription s) { if (s instanceof Fuseable.QueueSubscription) { this.qs = (Fuseable.QueueSubscription) s; } - this.actual.onSubscribe(this); } } diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java index 1e7bb337f..7320d9ade 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java @@ -158,13 +158,13 @@ public Flux requestChannel(Publisher payloads) { } @Test(timeout = 2000) - public void testStream() throws Exception { + public void testStream() { Flux responses = rule.crs.requestStream(DefaultPayload.create("Payload In")); StepVerifier.create(responses).expectNextCount(10).expectComplete().verify(); } @Test(timeout = 2000) - public void testChannel() throws Exception { + public void testChannel() { Flux requests = Flux.range(0, 10).map(i -> DefaultPayload.create("streaming in -> " + i)); Flux responses = rule.crs.requestChannel(requests);