Skip to content

Commit af5ca83

Browse files
Increment Request Before Serializing it in OutboundHandler (#74256)
If there are outside ways by which a request can be decremented (e.g. due to cancelling a recovery concurrently) we may run into a situation where we try to send a `refcount == 0` request. We have to avoid this by incrementing a request before serializing and decrementing after it returns to make sure we don't corrupt the request bytes while they're being serialized or sent over the wire. Closes #74253
1 parent 1eacdcb commit af5ca83

File tree

3 files changed

+27
-5
lines changed

3 files changed

+27
-5
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
13+
import org.apache.lucene.store.AlreadyClosedException;
1314
import org.apache.lucene.store.RateLimiter;
1415
import org.elasticsearch.ElasticsearchException;
1516
import org.elasticsearch.ExceptionsHelper;
@@ -243,8 +244,19 @@ private <T extends TransportResponse> void executeRetryableAction(String action,
243244

244245
@Override
245246
public void tryAction(ActionListener<T> listener) {
246-
transportService.sendRequest(targetNode, action, request, options,
247-
new ActionListenerResponseHandler<>(listener, reader, ThreadPool.Names.GENERIC));
247+
if (request.tryIncRef()) {
248+
transportService.sendRequest(
249+
targetNode,
250+
action,
251+
request,
252+
options,
253+
new ActionListenerResponseHandler<>(
254+
ActionListener.runBefore(listener, request::decRef),
255+
reader,
256+
ThreadPool.Names.GENERIC));
257+
} else {
258+
listener.onFailure(new AlreadyClosedException("already closed"));
259+
}
248260
}
249261

250262
@Override

server/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void incRef() {
6767

6868
@Override
6969
public boolean tryIncRef() {
70-
return bytes.decRef();
70+
return bytes.tryIncRef();
7171
}
7272

7373
@Override

server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.apache.logging.log4j.message.ParameterizedMessage;
14+
import org.apache.lucene.store.AlreadyClosedException;
1415
import org.elasticsearch.Version;
1516
import org.elasticsearch.action.ActionListener;
1617
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -68,8 +69,17 @@ void sendRequest(final DiscoveryNode node, final TcpChannel channel, final long
6869
Version version = Version.min(this.version, channelVersion);
6970
OutboundMessage.Request message =
7071
new OutboundMessage.Request(threadPool.getThreadContext(), request, version, action, requestId, isHandshake, compressRequest);
71-
ActionListener<Void> listener = ActionListener.wrap(() ->
72-
messageListener.onRequestSent(node, requestId, action, request, options));
72+
if (request.tryIncRef() == false) {
73+
assert false : "request [" + request + "] has been released already";
74+
throw new AlreadyClosedException("request [" + request + "] has been released already");
75+
}
76+
ActionListener<Void> listener = ActionListener.wrap(() -> {
77+
try {
78+
messageListener.onRequestSent(node, requestId, action, request, options);
79+
} finally {
80+
request.decRef();
81+
}
82+
});
7383
sendMessage(channel, message, listener);
7484
}
7585

0 commit comments

Comments
 (0)