Skip to content

Commit 91e2bb1

Browse files
committed
TransportClient: Ensure netty I/O thread is not blocked
Whenever a transport client executes a request, it uses a built-in RetryListener which tries to execute the request on another node. However, if a connection error occurs, the onFailure() callback of the listener is triggered, the netty I/O thread might still be used to whatever failure has been added. This commit offloads the onFailure handling to the generic thread pool.
1 parent fe331b5 commit 91e2bb1

File tree

1 file changed

+33
-7
lines changed

1 file changed

+33
-7
lines changed

src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,11 @@
3939
import org.elasticsearch.cluster.node.DiscoveryNode;
4040
import org.elasticsearch.common.component.AbstractComponent;
4141
import org.elasticsearch.common.inject.Inject;
42+
import org.elasticsearch.common.logging.ESLogger;
4243
import org.elasticsearch.common.settings.Settings;
4344
import org.elasticsearch.common.transport.TransportAddress;
4445
import org.elasticsearch.common.unit.TimeValue;
46+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4547
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
4648
import org.elasticsearch.common.util.concurrent.FutureUtils;
4749
import org.elasticsearch.threadpool.ThreadPool;
@@ -198,7 +200,7 @@ public <Response> void execute(NodeListenerCallback<Response> callback, ActionLi
198200
ImmutableList<DiscoveryNode> nodes = this.nodes;
199201
ensureNodesAreAvailable(nodes);
200202
int index = getNodeNumber();
201-
RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index);
203+
RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index, threadPool, logger);
202204
DiscoveryNode node = nodes.get((index) % nodes.size());
203205
try {
204206
callback.doWithNode(node, retryListener);
@@ -212,15 +214,20 @@ public static class RetryListener<Response> implements ActionListener<Response>
212214
private final NodeListenerCallback<Response> callback;
213215
private final ActionListener<Response> listener;
214216
private final ImmutableList<DiscoveryNode> nodes;
217+
private final ESLogger logger;
215218
private final int index;
219+
private ThreadPool threadPool;
216220

217221
private volatile int i;
218222

219-
public RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener, ImmutableList<DiscoveryNode> nodes, int index) {
223+
public RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener, ImmutableList<DiscoveryNode> nodes,
224+
int index, ThreadPool threadPool, ESLogger logger) {
220225
this.callback = callback;
221226
this.listener = listener;
222227
this.nodes = nodes;
223228
this.index = index;
229+
this.threadPool = threadPool;
230+
this.logger = logger;
224231
}
225232

226233
@Override
@@ -233,19 +240,38 @@ public void onFailure(Throwable e) {
233240
if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) {
234241
int i = ++this.i;
235242
if (i >= nodes.size()) {
236-
listener.onFailure(new NoNodeAvailableException("None of the configured nodes were available: " + nodes, e));
243+
runFailureInListenerThreadPool(new NoNodeAvailableException("None of the configured nodes were available: " + nodes, e));
237244
} else {
238245
try {
239246
callback.doWithNode(nodes.get((index + i) % nodes.size()), this);
240-
} catch(Throwable t) {
241-
//this exception can't come from the TransportService as it doesn't throw exceptions at all
242-
listener.onFailure(t);
247+
} catch(final Throwable t) {
248+
// this exception can't come from the TransportService as it doesn't throw exceptions at all
249+
runFailureInListenerThreadPool(t);
243250
}
244251
}
245252
} else {
246-
listener.onFailure(e);
253+
runFailureInListenerThreadPool(e);
247254
}
248255
}
256+
257+
// need to ensure to not block the netty I/O thread, in case of retry due to the node sampling
258+
private void runFailureInListenerThreadPool(final Throwable t) {
259+
threadPool.executor(ThreadPool.Names.LISTENER).execute(new AbstractRunnable() {
260+
@Override
261+
protected void doRun() throws Exception {
262+
listener.onFailure(t);
263+
}
264+
265+
@Override
266+
public void onFailure(Throwable t) {
267+
if (logger.isDebugEnabled()) {
268+
logger.debug("Could not execute failure listener: [{}]", t, t.getMessage());
269+
} else {
270+
logger.error("Could not execute failure listener: [{}]", t.getMessage());
271+
}
272+
}
273+
});
274+
}
249275
}
250276

251277
public void close() {

0 commit comments

Comments
 (0)