We are experiencing a memory leak with RSocket that we were able to reduce to a simple example. It only occurs when using the resume functionality and seems to be tied to the keepAlive mechanism.
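For reference, the leak only shows up when the client connector is configured with both a keepAlive interval and resume; without the .resume(...) call it does not occur. The relevant client configuration looks like this (a minimal sketch; tcpClient and createResume() are defined in the full reproducer under Steps to Reproduce below):

RSocketConnector.create()
    .keepAlive(Duration.ofMillis(10), Duration.ofMillis(5000)) // very frequent KEEPALIVE frames
    .resume(createResume())                                    // the leak only occurs with resume enabled
    .connect(TcpClientTransport.create(tcpClient))
    .subscribe();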
Expected Behavior
No memory leak.
Actual Behavior
We are seeing a lot of these log messages:
[reactor-tcp-nio-3] ERROR i.n.u.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
#1:
io.netty.buffer.CompositeByteBuf$Component.free(CompositeByteBuf.java:1941)
io.netty.buffer.CompositeByteBuf.deallocate(CompositeByteBuf.java:2246)
io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110)
io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
io.netty.buffer.WrappedCompositeByteBuf.release(WrappedCompositeByteBuf.java:43)
io.netty.buffer.SimpleLeakAwareCompositeByteBuf.release(SimpleLeakAwareCompositeByteBuf.java:38)
io.netty.buffer.AdvancedLeakAwareCompositeByteBuf.release(AdvancedLeakAwareCompositeByteBuf.java:1029)
io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
io.netty.util.ReferenceCountUtil.safeRelease(ReferenceCountUtil.java:113)
io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:271)
io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:352)
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:431)
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:898)
io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
reactor.netty.channel.MonoSendMany$SendManyInner$AsyncFlush.run(MonoSendMany.java:758)
io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.base/java.lang.Thread.run(Thread.java:832)
#2:
...
We also see the off-heap memory usage constantly increase, up to the point where we get OutOfMemoryErrors:
18-05-2021 15:57:24.816 [parallel-1] ERROR r.c.s.Schedulers - Scheduler worker in group main failed with an uncaught exception
java.lang.OutOfMemoryError: Cannot reserve 16777216 bytes of direct buffer memory (allocated: 100679681, limit: 104857600)
at java.base/java.nio.Bits.reserveMemory(Bits.java:178)
at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:120)
at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:329)
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:645)
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:621)
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:204)
at io.netty.buffer.PoolArena.tcacheAllocateSmall(PoolArena.java:174)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:136)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:128)
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:378)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:173)
at io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:107)
at io.rsocket.frame.FrameHeaderCodec.encode(FrameHeaderCodec.java:56)
at io.rsocket.frame.FrameHeaderCodec.encodeStreamZero(FrameHeaderCodec.java:45)
at io.rsocket.frame.KeepAliveFrameCodec.encode(KeepAliveFrameCodec.java:23)
at io.rsocket.keepalive.KeepAliveSupport$ClientKeepAliveSupport.onIntervalTick(KeepAliveSupport.java:154)
at io.rsocket.keepalive.KeepAliveSupport.lambda$start$0(KeepAliveSupport.java:63)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:124)
at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:305)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
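Not strictly needed for the reproduction, but since the failing allocation above goes through java.nio (Bits.reserveMemory in the trace), the growth can also be watched from inside the JVM with the JDK's direct BufferPoolMXBean. A minimal sketch of such a probe (class name and polling interval are just placeholders):

import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class DirectMemoryProbe {
  // Prints the JDK's view of direct buffer usage every 10 seconds.
  static void start() {
    Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
      for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
        if ("direct".equals(pool.getName())) {
          System.out.printf("direct buffers: count=%d, used=%d bytes%n",
              pool.getCount(), pool.getMemoryUsed());
        }
      }
    }, 0, 10, TimeUnit.SECONDS);
  }
}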
Steps to Reproduce
We reduced the problem to a very basic setup (NOTE the important VM parameters below!):
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.core.RSocketServer;
import io.rsocket.core.Resume;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import reactor.core.publisher.Mono;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpServer;
import reactor.util.retry.Retry;

import java.time.Duration;

class KeepAliveMemoryLeak2 {

  public static void main(String[] args) throws InterruptedException {
    createRawRSocketServer(9876);
    createRawRSocketClient(9876);
    // Keep the process alive so the keepalive ticks continue.
    Thread.sleep(1000_000_000);
  }

  private static void createRawRSocketServer(int port) {
    TcpServer tcpServer = TcpServer.create()
        .host("localhost")
        .port(port);
    // Responder with no behaviour; no application traffic is needed to trigger the leak.
    RSocketServer.create((setup, sendingRSocket) -> Mono.just(new RSocket() {
        }))
        .resume(createResume())
        .bind(TcpServerTransport.create(tcpServer))
        .block();
  }

  private static void createRawRSocketClient(int port) {
    TcpClient tcpClient = TcpClient.create()
        .host("localhost")
        .port(port);
    RSocketConnector.create()
        // Intentionally tiny keepalive interval, see the notes below.
        .keepAlive(Duration.ofMillis(10), Duration.ofMillis(5000))
        .resume(createResume())
        .connect(TcpClientTransport.create(tcpClient))
        .subscribe();
  }

  private static Resume createResume() {
    return new Resume()
        .sessionDuration(Duration.ofSeconds(30))
        .retry(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1)));
  }
}
Notes:
- As you can see, we set the keepAlive to a ridiculously small interval. This is intentional. The more keepAlives sent, the more memory leak logs we see.
- In order to see more leak logs, set the VM option
-Dio.netty.leakDetection.level=PARANOID
(a programmatic equivalent is sketched after this list).
- In order to see the OutOfMemoryError, set
-Xmx100M
or a similar value. Even then it takes about 15 minutes on our machine.
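For convenience, the leak detection level can also be raised programmatically at the start of main instead of via the VM option above (just a sketch of the equivalent Netty call):

import io.netty.util.ResourceLeakDetector;

// Equivalent to -Dio.netty.leakDetection.level=PARANOID; set it before any buffers are allocated.
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);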
Possible Solution
No suggestion from our side. We hope you can figure it out using our example.
Your Environment
- RSocket version(s) used: 1.1.0
- Netty: 4.1.52.Final
- JVM: 14 / 16
- OS: MacOS, Alpine