-
Notifications
You must be signed in to change notification settings - Fork 357
Closed
Description
Hi, RSocket maintainers!
We noticed that in some cases we observe NPEs on cancelling streams. E.g.
java.lang.NullPointerException
at io.rsocket.util.ByteBufPayload.sliceData(ByteBufPayload.java:149)
at io.rsocket.RSocketRequester$3$1.hookOnNext(RSocketRequester.java:351)
at io.rsocket.RSocketRequester$3$1.hookOnNext(RSocketRequester.java:333)
at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
at io.rsocket.internal.RateLimitableRequestPublisher$InnerOperator.onNext(RateLimitableRequestPublisher.java:173)
at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:99)
at io.rsocket.internal.RateLimitableRequestPublisher.requestN(RateLimitableRequestPublisher.java:124)
at io.rsocket.internal.RateLimitableRequestPublisher.access$400(RateLimitableRequestPublisher.java:29)
at io.rsocket.internal.RateLimitableRequestPublisher$InnerOperator.onSubscribe(RateLimitableRequestPublisher.java:167)
at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:70)
at reactor.core.publisher.Flux.subscribe(Flux.java:8134)
at io.rsocket.internal.RateLimitableRequestPublisher.subscribe(RateLimitableRequestPublisher.java:74)
at io.rsocket.RSocketRequester$3.accept(RSocketRequester.java:332)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:131)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:137)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:137)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.request(FluxDoFinally.java:150)
at reactor.core.publisher.FluxCancelOn$CancelSubscriber.request(FluxCancelOn.java:111)
at reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:119)
at reactor.core.publisher.FluxCancelOn$CancelSubscriber.onSubscribe(FluxCancelOn.java:70)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onSubscribe(FluxDoFinally.java:117)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:171)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:171)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:171)
at reactor.core.publisher.UnicastProcessor.subscribe(UnicastProcessor.java:422)
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:53)
at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
at reactor.core.publisher.Flux.subscribe(Flux.java:8134)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8298)
at reactor.core.publisher.Flux.subscribe(Flux.java:8105)
at reactor.core.publisher.Flux.subscribe(Flux.java:8032)
at reactor.core.publisher.Flux.subscribe(Flux.java:7950)
Recently, I found a way to trigger this by subscribing on a cancelled stream twice, here's a reproducer.
@Test
public void testCancellation() {
TcpServerTransport serverTransport = TcpServerTransport.create("localhost", 0);
CloseableChannel server =
RSocketFactory.receive()
.acceptor((setup, rsocket) -> Mono.just(new EchoRSocket()))
.transport(serverTransport)
.start()
.block(Duration.ofSeconds(10));
TcpClientTransport transport = TcpClientTransport.create(server.address());
RSocket client =
RSocketFactory.connect().transport(transport).start().block(Duration.ofSeconds(10));
Flux<Payload> channel = client.requestChannel(Flux.just(ByteBufPayload.create("foo")));
Flux<Payload> channel = client.requestChannel(Flux.just(ByteBufPayload.create("foo")));
channel.subscribe().dispose();
// operates on a released payload
channel.subscribe();
}
However, I'm not entirely sure how a patch for this would look like. IIUC, cancelling Flux
requests all elements, and the same payload gets processed twice after being released. However, maybe it's expected behaviour since the stream is cancelled.