From cdf43b8efd943af755c1fa2b9c344c6afaf28f80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Thu, 14 Aug 2025 09:50:29 +0200 Subject: [PATCH] Fix back-pressure in Netty frame handler A race condition can cause to see the channel as non-writable but end up with a new count down latch, not the one that was present when the writability flag was true. So the code waits on this new latch which will be never counted down. This can happen when the writability flips from false to true very fast. This commit makes sure to get the latch first, then check the writability a second time. In case we get an "old" latch, it is counted down automatically, so the enqueuing code will not be blocked. --- .../client/impl/NettyFrameHandlerFactory.java | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java index 4426847d8..263a0c01c 100644 --- a/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java +++ b/src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java @@ -318,17 +318,27 @@ public void writeFrame(Frame frame) throws IOException { // we do not wait in the event loop this.doWriteFrame(frame); } else { - try { - boolean canWriteNow = - this.handler.writableLatch().await(enqueuingTimeout.toMillis(), MILLISECONDS); - if (canWriteNow) { - this.doWriteFrame(frame); - } else { - this.handler.logEvents(); - throw new IOException("Frame enqueuing failed"); + // we get the current latch + CountDownLatch latch = this.handler.writableLatch(); + if (this.handler.isWritable()) { + // the channel became writable + this.doWriteFrame(frame); + } else { + try { + // the channel is still non-writable + // in case its writability flipped, we have a reference to a latch that has been + // counted down + // so, worst case scenario, we'll enqueue only one frame right away + boolean canWriteNow = latch.await(enqueuingTimeout.toMillis(), MILLISECONDS); + if (canWriteNow) { + this.doWriteFrame(frame); + } else { + this.handler.logEvents(); + throw new IOException("Frame enqueuing failed"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); } } }