-
Notifications
You must be signed in to change notification settings - Fork 536
fix(network): fix potential ByteBuf LEAK in fetch #3027
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Robin Han <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR addresses potential ByteBuf memory leaks in the fetch request handling by ensuring proper resource cleanup when requests fail or connections are closed.
Key changes:
- Adds resource release logic for fetch responses when fetch session epoch validation fails
- Implements a
removeChannelContextmethod to properly clean up resources when connections are closed - Moves the
MAX_INFLIGHT_REQUESTS_PER_CONNECTIONconstant from config toSocketServerobject for better code organization
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java | Removes the MAX_INFLIGHT_REQUESTS_PER_CONNECTION constant from config class |
| core/src/main/scala/kafka/server/FetchSession.scala | Adds resource cleanup for pooled fetch records when session epoch validation fails |
| core/src/main/scala/kafka/network/SocketServer.scala | Moves constant definition, adds removeChannelContext method for proper resource cleanup, and updates references to use the new constant location |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| updates.forEach((_, response) => { | ||
| response.records() match { | ||
| case r: PooledResource => | ||
| r.release() |
Copilot
AI
Nov 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly, the resource release in the fetch session error handling lacks exception handling. If r.release() throws an exception for one record, subsequent records won't be released, leading to potential resource leaks. Consider adding try-catch around the release operation to ensure all resources are attempted to be released.
| r.release() | |
| try { | |
| r.release() | |
| } catch { | |
| case ex: Throwable => | |
| info(s"Exception while releasing PooledResource in fetch session error handling: ${ex.getMessage}", ex) | |
| } |
| val ListenerReconfigurableConfigs: Set[String] = Set(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG) | ||
|
|
||
| // AutoMQ inject start | ||
| val MaxInflightRequestsPerConnection = 64; |
Copilot
AI
Nov 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The semicolon at the end of this line is inconsistent with Scala conventions. Scala doesn't require semicolons at the end of statements, and they are typically omitted unless multiple statements are on the same line.
| val MaxInflightRequestsPerConnection = 64; | |
| val MaxInflightRequestsPerConnection = 64 |
| channelContext.synchronized { | ||
| channelContext.nextCorrelationId.clear() | ||
| channelContext.responses.forEach((_, response) => { | ||
| response match { | ||
| case sendResponse: SendResponse => | ||
| sendResponse.responseSend.release() | ||
| case _ => | ||
| } | ||
| }) | ||
| channelContext.responses.clear() | ||
| } |
Copilot
AI
Nov 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the removeChannelContext method, both clear() calls at lines 1456 and 1464 are executed inside a synchronized block on channelContext. However, the forEach iteration at lines 1457-1463 is also inside this synchronized block, which could potentially cause a deadlock or blocking issue if the release() method on responseSend acquires other locks or performs time-consuming operations. Consider releasing resources outside the synchronized block or ensure that the release() method doesn't acquire locks that could cause contention.
| channelContext.synchronized { | |
| channelContext.nextCorrelationId.clear() | |
| channelContext.responses.forEach((_, response) => { | |
| response match { | |
| case sendResponse: SendResponse => | |
| sendResponse.responseSend.release() | |
| case _ => | |
| } | |
| }) | |
| channelContext.responses.clear() | |
| } | |
| // Collect SendResponse objects to release outside the synchronized block | |
| val sendResponses = new ArrayBuffer[SendResponse]() | |
| channelContext.synchronized { | |
| channelContext.nextCorrelationId.clear() | |
| channelContext.responses.forEach((_, response) => { | |
| response match { | |
| case sendResponse: SendResponse => | |
| sendResponses += sendResponse | |
| case _ => | |
| } | |
| }) | |
| channelContext.responses.clear() | |
| } | |
| // Release resources outside the synchronized block | |
| sendResponses.foreach(_.responseSend.release()) |
| channelContext.responses.forEach((_, response) => { | ||
| response match { | ||
| case sendResponse: SendResponse => | ||
| sendResponse.responseSend.release() |
Copilot
AI
Nov 19, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error handling in removeChannelContext doesn't account for potential exceptions that could be thrown during the release() call. If responseSend.release() throws an exception, it could leave the channelContext in an inconsistent state with some responses released and others not. Consider wrapping the release operation in a try-catch block to ensure all resources are attempted to be released even if some fail.
| sendResponse.responseSend.release() | |
| try { | |
| sendResponse.responseSend.release() | |
| } catch { | |
| case e: Throwable => | |
| error(s"Error releasing responseSend for connection $connectionId", e) | |
| } |
No description provided.