From bc36d7bc3b8c6bcf155fccf6d046eeca1367d2ff Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 17 Jun 2021 18:22:38 +0200 Subject: [PATCH 1/3] Increment Request Before Serializing it in OutboundHandler If there are outside ways by which a request can be decremented (e.g. due to cancelling a recovery concurrently) we may run into a situation where we try to send a `refcount == 0` request. We have to avoid this by incrementing a request before serializing and decrementing after it returns to make sure we don't corrupt the request bytes while they're being serialized or sent over the wire. Closes #74253 --- .../org/elasticsearch/transport/OutboundHandler.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index a98e67856ce22..3e6da45538110 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -68,8 +68,14 @@ void sendRequest(final DiscoveryNode node, final TcpChannel channel, final long Version version = Version.min(this.version, channelVersion); OutboundMessage.Request message = new OutboundMessage.Request(threadPool.getThreadContext(), request, version, action, requestId, isHandshake, compressRequest); - ActionListener listener = ActionListener.wrap(() -> - messageListener.onRequestSent(node, requestId, action, request, options)); + request.incRef(); + ActionListener listener = ActionListener.wrap(() -> { + try { + messageListener.onRequestSent(node, requestId, action, request, options); + } finally { + request.decRef(); + } + }); sendMessage(channel, message, listener); } From b9c89dabd45923cf090123d9b2ace9e4620b3e46 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 17 Jun 2021 19:04:20 +0200 Subject: [PATCH 2/3] don't run into released requests --- .../recovery/RemoteRecoveryTargetHandler.java | 16 ++++++++++++++-- .../elasticsearch/transport/OutboundHandler.java | 6 +++++- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index db3147c52c2d1..a1f89da448acc 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.RateLimiter; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; @@ -243,8 +244,19 @@ private void executeRetryableAction(String action, @Override public void tryAction(ActionListener listener) { - transportService.sendRequest(targetNode, action, request, options, - new ActionListenerResponseHandler<>(listener, reader, ThreadPool.Names.GENERIC)); + if (request.tryIncRef()) { + transportService.sendRequest( + targetNode, + action, + request, + options, + new ActionListenerResponseHandler<>( + ActionListener.runBefore(listener, request::decRef), + reader, + ThreadPool.Names.GENERIC)); + } else { + listener.onFailure(new AlreadyClosedException("already closed")); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index 3e6da45538110..1108cb94f5c4c 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -68,7 +69,10 @@ void sendRequest(final DiscoveryNode node, final TcpChannel channel, final long Version version = Version.min(this.version, channelVersion); OutboundMessage.Request message = new OutboundMessage.Request(threadPool.getThreadContext(), request, version, action, requestId, isHandshake, compressRequest); - request.incRef(); + if (request.tryIncRef() == false) { + assert false : "request [" + request + "] has been released already"; + throw new AlreadyClosedException("request [" + request + "] has been released already"); + } ActionListener listener = ActionListener.wrap(() -> { try { messageListener.onRequestSent(node, requestId, action, request, options); From 9c51a0c2260fb7a34fbf4851d623a02d0718658d Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 17 Jun 2021 19:13:36 +0200 Subject: [PATCH 3/3] nevermind just a really dumb bug --- .../java/org/elasticsearch/transport/BytesTransportRequest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java b/server/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java index c07d97b362538..b650659e63680 100644 --- a/server/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java +++ b/server/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java @@ -67,7 +67,7 @@ public void incRef() { @Override public boolean tryIncRef() { - return bytes.decRef(); + return bytes.tryIncRef(); } @Override