-
Notifications
You must be signed in to change notification settings - Fork 357
Closed
Labels
Description
Rogue clients may not respect requestN from server, send frames firehose which are queued by receivers - UnicastProcessors, backed by unbounded queue.
Affects server Responder
channel; also Requester
stream & channel - if server sends requests to client.
Reproducible with test below, It roughly models system that processes batches of messages asynchronously, calls new batch with requestN once current one is completed
@Test
void serverOom() {
EmitterProcessor<Void> completed = EmitterProcessor.create();
TestServerTransport testTransport = new TestServerTransport();
Closeable server =
RSocketServer.create()
.acceptor(
(setup, sendingSocket) ->
Mono.just(
new RSocket() {
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
Flux.from(payloads)
.doFinally(s -> completed.onComplete())
.subscribe(new BatchProcessingSubscriber());
return completed.cast(Payload.class);
}
}))
.bind(testTransport)
.block();
byte[] bytes = bytes(1_000_000);
TestDuplexConnection testDuplexConnection = testTransport.connect();
/*setup*/
testDuplexConnection.addToReceivedBuffer(
SetupFrameCodec.encode(
ByteBufAllocator.DEFAULT,
false,
1111111,
1111111,
Unpooled.EMPTY_BUFFER,
"application/binary",
"application/binary",
EmptyPayload.INSTANCE));
/*request-channel which sends many frames regardless received requestN*/
testDuplexConnection.addToReceivedBuffer(
RequestChannelFrameCodec.encode(
ByteBufAllocator.DEFAULT,
1,
false,
false,
1111111,
Unpooled.EMPTY_BUFFER,
Unpooled.EMPTY_BUFFER));
int framesCount = 8000;
for (int i = 0; i < framesCount; i++) {
boolean complete = i == framesCount - 1;
testDuplexConnection.addToReceivedBuffer(
PayloadFrameCodec.encode(
ByteBufAllocator.DEFAULT,
1,
false,
complete,
true,
Unpooled.EMPTY_BUFFER,
Unpooled.wrappedBuffer(bytes)));
}
completed
.mergeWith(testDuplexConnection.onClose())
.as(StepVerifier::create)
.expectComplete()
.verify(Duration.ofSeconds(30));
}
private static final Random random = new Random();
static byte[] bytes(int size) {
byte[] data = new byte[size];
random.nextBytes(data);
return data;
}
private static class BatchProcessingSubscriber implements Subscriber<Payload>, Runnable {
static final int BATCH_SIZE = 8;
final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
volatile Subscription s;
int received;
@Override
public void onSubscribe(Subscription s) {
this.s = s;
/*request initial batch*/
s.request(BATCH_SIZE);
}
@Override
public void onNext(Payload payload) {
/*emulate batch processing: request next batch once current is finished*/
payload.release();
if (++received == BATCH_SIZE) {
received = 0;
executorService.schedule(this, 1_000, TimeUnit.MILLISECONDS);
}
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
@Override
public void run() {
s.request(BATCH_SIZE);
}
}
Test fails with error
Caused by: java.lang.OutOfMemoryError: Direct buffer memory at
java.base/java.nio.Bits.reserveMemory(Bits.java:175) at
java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) at
java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) at
io.rsocket.frame.decoder.DefaultPayloadDecoder.apply(DefaultPayloadDecoder.java:55) at
io.rsocket.frame.decoder.DefaultPayloadDecoder.apply(DefaultPayloadDecoder.java:18) at
io.rsocket.core.RSocketResponder.handleFrame(RSocketResponder.java:332) at
reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) at
reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) at
reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) at
reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) at
reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:670) at
reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:205) at
reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:112) at
reactor.core.publisher.DirectProcessor$DirectInner.onNext(DirectProcessor.java:333) at
reactor.core.publisher.DirectProcessor.onNext(DirectProcessor.java:142) at
reactor.core.publisher.FluxCreate$IgnoreSink.next(FluxCreate.java:618) at
reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153) at
io.rsocket.test.util.TestDuplexConnection.addToReceivedBuffer(TestDuplexConnection.java:132) at
io.rsocket.ServerOomTest.serverOom(ServerOomTest.java:86)