diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index 9d8c1b2ecbeb9..5da5fd425b666 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -109,7 +109,13 @@ void sendResponse(final Version nodeVersion, final TcpChannel channel, final lon } OutboundMessage.Response message = new OutboundMessage.Response(threadPool.getThreadContext(), response, version, requestId, isHandshake, compressionScheme); - ActionListener listener = ActionListener.wrap(() -> messageListener.onResponseSent(requestId, action, response)); + ActionListener listener = ActionListener.wrap(() -> { + try { + messageListener.onResponseSent(requestId, action, response); + } finally { + response.decRef(); + } + }); sendMessage(channel, message, listener); } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java index 68bb590a7df9c..a7ae4537b1f14 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -81,6 +81,7 @@ public T read(StreamInput in) throws IOException { @Override public void handleResponse(T response) { try { + response.incRef(); channel.sendResponse(response); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 5f2f34d69d8c3..ecaf82e8e1104 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -259,6 +259,9 @@ public void afterTest() throws Exception { clusterGroup.leaderCluster.wipe(Collections.emptySet()); clusterGroup.followerCluster.wipe(Collections.emptySet()); } + + clusterGroup.leaderCluster.assertAfterTest(); + clusterGroup.followerCluster.assertAfterTest(); } private NodeConfigurationSource createNodeConfigurationSource(final String leaderSeedAddress, final boolean leaderCluster) {