Skip to content

Commit 3a7d15b

Browse files
committed
Fix TransportResponse reference counting in DirectResponseChannel (elastic#91289)
In elastic#76474 we fixed a circuit breaker leak in TransportActionProxy by incrementing a reference on the TransportResponse that is later decremented by the OutboundHandler. This works well in all cases except when the request targets the node that is also the proxy node. In that case the reference is incremented but is never decremented, because the local execution (using TransportService#localNodeConnection and DirectResponseChannel) bypasses the OutboundHandler. This change fixes the ref counting by also decrementing the TransportResponse in DirectResponseChannel. As a consequence, the used bytes of the request circuit breaker are now correctly decremented when GetCcrRestoreFileChunkResponse responses are sent from a node that is also a proxy node.
1 parent 297a0e2 commit 3a7d15b

File tree

3 files changed

+132
-29
lines changed

3 files changed

+132
-29
lines changed

docs/changelog/91289.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 91289
2+
summary: Fix `TransportActionProxy` for local execution
3+
area: Network
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1411,33 +1411,43 @@ public String getProfileName() {
14111411

14121412
@Override
14131413
public void sendResponse(TransportResponse response) throws IOException {
1414-
service.onResponseSent(requestId, action, response);
1415-
try (var shutdownBlock = service.pendingDirectHandlers.withRef()) {
1416-
if (shutdownBlock == null) {
1417-
// already shutting down, the handler will be completed by sendRequestInternal or doStop
1418-
return;
1419-
}
1420-
final TransportResponseHandler<?> handler = service.responseHandlers.onResponseReceived(requestId, service);
1421-
if (handler == null) {
1422-
// handler already completed, likely by a timeout which is logged elsewhere
1423-
return;
1424-
}
1425-
final String executor = handler.executor();
1426-
if (ThreadPool.Names.SAME.equals(executor)) {
1427-
processResponse(handler, response);
1428-
} else {
1429-
threadPool.executor(executor).execute(new ForkingResponseHandlerRunnable(handler, null) {
1430-
@Override
1431-
protected void doRun() {
1432-
processResponse(handler, response);
1433-
}
1414+
try {
1415+
service.onResponseSent(requestId, action, response);
1416+
try (var shutdownBlock = service.pendingDirectHandlers.withRef()) {
1417+
if (shutdownBlock == null) {
1418+
// already shutting down, the handler will be completed by sendRequestInternal or doStop
1419+
return;
1420+
}
1421+
final TransportResponseHandler<?> handler = service.responseHandlers.onResponseReceived(requestId, service);
1422+
if (handler == null) {
1423+
// handler already completed, likely by a timeout which is logged elsewhere
1424+
return;
1425+
}
1426+
final String executor = handler.executor();
1427+
if (ThreadPool.Names.SAME.equals(executor)) {
1428+
processResponse(handler, response);
1429+
} else {
1430+
response.incRef();
1431+
threadPool.executor(executor).execute(new ForkingResponseHandlerRunnable(handler, null) {
1432+
@Override
1433+
protected void doRun() {
1434+
processResponse(handler, response);
1435+
}
14341436

1435-
@Override
1436-
public String toString() {
1437-
return "delivery of response to [" + requestId + "][" + action + "]: " + response;
1438-
}
1439-
});
1437+
@Override
1438+
public void onAfter() {
1439+
response.decRef();
1440+
}
1441+
1442+
@Override
1443+
public String toString() {
1444+
return "delivery of response to [" + requestId + "][" + action + "]: " + response;
1445+
}
1446+
});
1447+
}
14401448
}
1449+
} finally {
1450+
response.decRef();
14411451
}
14421452
}
14431453

server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java

Lines changed: 92 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
import org.elasticsearch.common.io.stream.StreamInput;
1515
import org.elasticsearch.common.io.stream.StreamOutput;
1616
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.core.AbstractRefCounted;
1718
import org.elasticsearch.core.IOUtils;
19+
import org.elasticsearch.core.RefCounted;
1820
import org.elasticsearch.tasks.CancellableTask;
1921
import org.elasticsearch.tasks.Task;
2022
import org.elasticsearch.tasks.TaskId;
@@ -27,8 +29,10 @@
2729
import java.io.IOException;
2830
import java.util.Map;
2931
import java.util.concurrent.CountDownLatch;
32+
import java.util.concurrent.atomic.AtomicReference;
3033

3134
import static org.hamcrest.Matchers.equalTo;
35+
import static org.hamcrest.Matchers.notNullValue;
3236

3337
public class TransportActionProxyTests extends ESTestCase {
3438
protected ThreadPool threadPool;
@@ -76,8 +80,9 @@ private MockTransportService buildService(final Version version) {
7680
public void testSendMessage() throws InterruptedException {
7781
serviceA.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new, (request, channel, task) -> {
7882
assertEquals(request.sourceNode, "TS_A");
79-
SimpleTestResponse response = new SimpleTestResponse("TS_A");
83+
final SimpleTestResponse response = new SimpleTestResponse("TS_A");
8084
channel.sendResponse(response);
85+
assertThat(response.hasReferences(), equalTo(false));
8186
});
8287
final boolean cancellable = randomBoolean();
8388
TransportActionProxy.registerProxyAction(serviceA, "internal:test", cancellable, SimpleTestResponse::new);
@@ -86,21 +91,24 @@ public void testSendMessage() throws InterruptedException {
8691
serviceB.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new, (request, channel, task) -> {
8792
assertThat(task instanceof CancellableTask, equalTo(cancellable));
8893
assertEquals(request.sourceNode, "TS_A");
89-
SimpleTestResponse response = new SimpleTestResponse("TS_B");
94+
final SimpleTestResponse response = new SimpleTestResponse("TS_B");
9095
channel.sendResponse(response);
96+
assertThat(response.hasReferences(), equalTo(false));
9197
});
9298
TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, SimpleTestResponse::new);
9399
AbstractSimpleTransportTestCase.connectToNode(serviceB, nodeC);
94100
serviceC.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new, (request, channel, task) -> {
95101
assertThat(task instanceof CancellableTask, equalTo(cancellable));
96102
assertEquals(request.sourceNode, "TS_A");
97-
SimpleTestResponse response = new SimpleTestResponse("TS_C");
103+
final SimpleTestResponse response = new SimpleTestResponse("TS_C");
98104
channel.sendResponse(response);
105+
assertThat(response.hasReferences(), equalTo(false));
99106
});
100107

101108
TransportActionProxy.registerProxyAction(serviceC, "internal:test", cancellable, SimpleTestResponse::new);
102109

103-
CountDownLatch latch = new CountDownLatch(1);
110+
final CountDownLatch latch = new CountDownLatch(1);
111+
// Node A -> Node B -> Node C
104112
serviceA.sendRequest(
105113
nodeB,
106114
TransportActionProxy.getProxyAction("internal:test"),
@@ -133,6 +141,61 @@ public void handleException(TransportException exp) {
133141
latch.await();
134142
}
135143

144+
public void testSendLocalRequest() throws InterruptedException {
145+
final AtomicReference<SimpleTestResponse> response = new AtomicReference<>();
146+
final boolean cancellable = randomBoolean();
147+
serviceB.registerRequestHandler(
148+
"internal:test",
149+
randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
150+
SimpleTestRequest::new,
151+
(request, channel, task) -> {
152+
assertThat(task instanceof CancellableTask, equalTo(cancellable));
153+
assertEquals(request.sourceNode, "TS_A");
154+
final SimpleTestResponse responseB = new SimpleTestResponse("TS_B");
155+
channel.sendResponse(responseB);
156+
response.set(responseB);
157+
}
158+
);
159+
TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, SimpleTestResponse::new);
160+
AbstractSimpleTransportTestCase.connectToNode(serviceA, nodeB);
161+
162+
final CountDownLatch latch = new CountDownLatch(1);
163+
// Node A -> Proxy Node B (Local execution)
164+
serviceA.sendRequest(
165+
nodeB,
166+
TransportActionProxy.getProxyAction("internal:test"),
167+
TransportActionProxy.wrapRequest(nodeB, new SimpleTestRequest("TS_A", cancellable)), // Request
168+
new TransportResponseHandler<SimpleTestResponse>() {
169+
@Override
170+
public SimpleTestResponse read(StreamInput in) throws IOException {
171+
return new SimpleTestResponse(in);
172+
}
173+
174+
@Override
175+
public void handleResponse(SimpleTestResponse response) {
176+
try {
177+
assertEquals("TS_B", response.targetNode);
178+
} finally {
179+
latch.countDown();
180+
}
181+
}
182+
183+
@Override
184+
public void handleException(TransportException exp) {
185+
try {
186+
throw new AssertionError(exp);
187+
} finally {
188+
latch.countDown();
189+
}
190+
}
191+
}
192+
);
193+
latch.await();
194+
195+
assertThat(response.get(), notNullValue());
196+
assertThat(response.get().hasReferences(), equalTo(false));
197+
}
198+
136199
public void testException() throws InterruptedException {
137200
boolean cancellable = randomBoolean();
138201
serviceA.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new, (request, channel, task) -> {
@@ -230,7 +293,12 @@ public boolean shouldCancelChildrenOnCancellation() {
230293
}
231294

232295
public static class SimpleTestResponse extends TransportResponse {
296+
233297
final String targetNode;
298+
final RefCounted refCounted = new AbstractRefCounted() {
299+
@Override
300+
protected void closeInternal() {}
301+
};
234302

235303
SimpleTestResponse(String targetNode) {
236304
this.targetNode = targetNode;
@@ -245,6 +313,26 @@ public static class SimpleTestResponse extends TransportResponse {
245313
public void writeTo(StreamOutput out) throws IOException {
246314
out.writeString(targetNode);
247315
}
316+
317+
@Override
318+
public void incRef() {
319+
refCounted.incRef();
320+
}
321+
322+
@Override
323+
public boolean tryIncRef() {
324+
return refCounted.tryIncRef();
325+
}
326+
327+
@Override
328+
public boolean decRef() {
329+
return refCounted.decRef();
330+
}
331+
332+
@Override
333+
public boolean hasReferences() {
334+
return refCounted.hasReferences();
335+
}
248336
}
249337

250338
public void testGetAction() {

0 commit comments

Comments
 (0)