Skip to content

Commit 7aa0d42

Browse files
committed
Introduce async shard operation
This commit backports the ability for a single shard operation to execute asynchronously. That is, rather than forking execution of the operation to another executor, this mechanism allows us to defer execution of the operation until some condition holds (for example, the global checkpoint being larger than some specified value). At this point, a callback could be triggered to complete execution of the operation on the executor specified for the operation.
1 parent bd8aa49 commit 7aa0d42

File tree

1 file changed

+37
-4
lines changed

1 file changed

+37
-4
lines changed

server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.common.Nullable;
3939
import org.elasticsearch.common.logging.LoggerMessageFormat;
4040
import org.elasticsearch.common.settings.Settings;
41+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
4142
import org.elasticsearch.index.shard.ShardId;
4243
import org.elasticsearch.threadpool.ThreadPool;
4344
import org.elasticsearch.transport.TransportChannel;
@@ -47,6 +48,7 @@
4748
import org.elasticsearch.transport.TransportService;
4849

4950
import java.io.IOException;
51+
import java.io.UncheckedIOException;
5052
import java.util.function.Supplier;
5153

5254
import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
@@ -78,7 +80,7 @@ protected TransportSingleShardAction(Settings settings, String actionName, Threa
7880
if (!isSubAction()) {
7981
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler());
8082
}
81-
transportService.registerRequestHandler(transportShardAction, request, executor, new ShardTransportHandler());
83+
transportService.registerRequestHandler(transportShardAction, request, ThreadPool.Names.SAME, new ShardTransportHandler());
8284
}
8385

8486
/**
@@ -97,6 +99,21 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
9799

98100
protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException;
99101

102+
103+
protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
104+
threadPool.executor(this.executor).execute(new AbstractRunnable() {
105+
@Override
106+
public void onFailure(Exception e) {
107+
listener.onFailure(e);
108+
}
109+
110+
@Override
111+
protected void doRun() throws Exception {
112+
listener.onResponse(shardOperation(request, shardId));
113+
}
114+
});
115+
}
116+
100117
protected abstract Response newResponse();
101118

102119
protected abstract boolean resolveIndex(Request request);
@@ -284,11 +301,27 @@ public void messageReceived(final Request request, final TransportChannel channe
284301
if (logger.isTraceEnabled()) {
285302
logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
286303
}
287-
Response response = shardOperation(request, request.internalShardId);
288-
channel.sendResponse(response);
304+
asyncShardOperation(request, request.internalShardId, new ActionListener<Response>() {
305+
@Override
306+
public void onResponse(Response response) {
307+
try {
308+
channel.sendResponse(response);
309+
} catch (IOException e) {
310+
onFailure(e);
311+
}
312+
}
313+
314+
@Override
315+
public void onFailure(Exception e) {
316+
try {
317+
channel.sendResponse(e);
318+
} catch (IOException e1) {
319+
throw new UncheckedIOException(e1);
320+
}
321+
}
322+
});
289323
}
290324
}
291-
292325
/**
293326
* Internal request class that gets built on each node. Holds the original request plus additional info.
294327
*/

0 commit comments

Comments
 (0)