Skip to content

Commit b138d60

Browse files
Fix memory/breaker leaks for outbound responses (#76474)
Outbound responses would not get the expected `decRef`, resulting in memory and/or circuit breaker leaks. In particular, the `GetCcrRestoreFileChunkResponse` expects this, causing a leak when a follower bootstraps. Relates #65921
1 parent 04dd273 commit b138d60

File tree

3 files changed

+11
-1
lines changed

3 files changed

+11
-1
lines changed

server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,13 @@ void sendResponse(final Version nodeVersion, final TcpChannel channel, final lon
9595
Version version = Version.min(this.version, nodeVersion);
9696
OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), response, version,
9797
requestId, isHandshake, compressionScheme);
98-
ActionListener<Void> listener = ActionListener.wrap(() -> messageListener.onResponseSent(requestId, action, response));
98+
ActionListener<Void> listener = ActionListener.wrap(() -> {
99+
try {
100+
messageListener.onResponseSent(requestId, action, response);
101+
} finally {
102+
response.decRef();
103+
}
104+
});
99105
sendMessage(channel, message, listener);
100106
}
101107

server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public T read(StreamInput in) throws IOException {
8181
@Override
8282
public void handleResponse(T response) {
8383
try {
84+
response.incRef();
8485
channel.sendResponse(response);
8586
} catch (IOException e) {
8687
throw new UncheckedIOException(e);

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,9 @@ public void afterTest() throws Exception {
259259
clusterGroup.leaderCluster.wipe(Collections.emptySet());
260260
clusterGroup.followerCluster.wipe(Collections.emptySet());
261261
}
262+
263+
clusterGroup.leaderCluster.assertAfterTest();
264+
clusterGroup.followerCluster.assertAfterTest();
262265
}
263266

264267
private NodeConfigurationSource createNodeConfigurationSource(final String leaderSeedAddress, final boolean leaderCluster) {

0 commit comments

Comments
 (0)