From bd7b674d070921d2e99f9408c6d6c9626b1f4caa Mon Sep 17 00:00:00 2001 From: Tomas Kolda Date: Wed, 3 Mar 2021 21:43:45 +0100 Subject: [PATCH] Fix performance degradation when fragmentation is used (#994) Signed-off-by: Tomas Kolda --- .../FragmentationDuplexConnection.java | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java index 6eebd676c..84338d1df 100644 --- a/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java @@ -84,7 +84,18 @@ public static int assertMtu(int mtu) { @Override public Mono send(Publisher frames) { - return Flux.from(frames).concatMap(this::sendOne).then(); + return delegate.send( + Flux.from(frames) + .concatMap( + frame -> { + FrameType frameType = FrameHeaderCodec.frameType(frame); + int readableBytes = frame.readableBytes(); + if (!shouldFragment(frameType, readableBytes)) { + return Flux.just(frame); + } + + return logFragments(Flux.from(fragmentFrame(alloc(), mtu, frame, frameType))); + })); } @Override @@ -95,6 +106,11 @@ public Mono sendOne(ByteBuf frame) { return delegate.sendOne(frame); } Flux fragments = Flux.from(fragmentFrame(alloc(), mtu, frame, frameType)); + fragments = logFragments(fragments); + return delegate.send(fragments); + } + + protected Flux logFragments(Flux fragments) { if (logger.isDebugEnabled()) { fragments = fragments.doOnNext( @@ -107,6 +123,6 @@ public Mono sendOne(ByteBuf frame) { ByteBufUtil.prettyHexDump(byteBuf)); }); } - return delegate.send(fragments); + return fragments; } }