Skip to content

Commit d79a10a

Browse files
committed
Multi-get requests should wait for search active (#46283)
When a shard has fallen search idle, and a non-realtime multi-get request is executed, today such requests do not wait for the shard to become search active and therefore such requests do not wait for a refresh to see the latest changes to the index. This also prevents such requests from triggering the shard as non-search idle, influencing the behavior of scheduled refreshes. This commit addresses this by attaching a listener to the shard search active state for multi-get requests. In this way, when the next scheduled refresh is executed, the multi-get request will then proceed.
1 parent 079a13c commit d79a10a

File tree

2 files changed

+55
-17
lines changed

2 files changed

+55
-17
lines changed

server/src/main/java/org/elasticsearch/action/get/TransportShardMultiGetAction.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.action.get;
2121

2222
import org.apache.logging.log4j.message.ParameterizedMessage;
23+
import org.elasticsearch.action.ActionListener;
2324
import org.elasticsearch.action.support.ActionFilters;
2425
import org.elasticsearch.action.support.TransportActions;
2526
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
@@ -37,6 +38,8 @@
3738
import org.elasticsearch.threadpool.ThreadPool;
3839
import org.elasticsearch.transport.TransportService;
3940

41+
import java.io.IOException;
42+
4043
public class TransportShardMultiGetAction extends TransportSingleShardAction<MultiGetShardRequest, MultiGetShardResponse> {
4144

4245
private static final String ACTION_NAME = MultiGetAction.NAME + "[shard]";
@@ -73,6 +76,24 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
7376
.getShards(state, request.request().index(), request.request().shardId(), request.request().preference());
7477
}
7578

79+
@Override
80+
protected void asyncShardOperation(
81+
MultiGetShardRequest request, ShardId shardId, ActionListener<MultiGetShardResponse> listener) throws IOException {
82+
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
83+
IndexShard indexShard = indexService.getShard(shardId.id());
84+
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
85+
super.asyncShardOperation(request, shardId, listener);
86+
} else {
87+
indexShard.awaitShardSearchActive(b -> {
88+
try {
89+
super.asyncShardOperation(request, shardId, listener);
90+
} catch (Exception ex) {
91+
listener.onFailure(ex);
92+
}
93+
});
94+
}
95+
}
96+
7697
@Override
7798
protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, ShardId shardId) {
7899
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());

server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
2727
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
2828
import org.elasticsearch.action.admin.indices.stats.IndexStats;
29+
import org.elasticsearch.action.get.MultiGetRequest;
2930
import org.elasticsearch.action.index.IndexRequest;
3031
import org.elasticsearch.action.index.IndexResponse;
3132
import org.elasticsearch.action.search.SearchRequest;
@@ -109,6 +110,7 @@
109110
import java.util.concurrent.atomic.AtomicInteger;
110111
import java.util.concurrent.atomic.AtomicLong;
111112
import java.util.concurrent.atomic.AtomicReference;
113+
import java.util.function.IntToLongFunction;
112114
import java.util.function.Predicate;
113115
import java.util.stream.Stream;
114116

@@ -688,7 +690,23 @@ private static ShardRouting getInitializingShardRouting(ShardRouting existingSha
688690
return shardRouting;
689691
}
690692

691-
public void testAutomaticRefresh() throws InterruptedException {
693+
public void testAutomaticRefreshSearch() throws InterruptedException {
694+
runTestAutomaticRefresh(numDocs -> client().prepareSearch("test").get().getHits().getTotalHits().value);
695+
}
696+
697+
public void testAutomaticRefreshMultiGet() throws InterruptedException {
698+
runTestAutomaticRefresh(
699+
numDocs -> {
700+
final MultiGetRequest request = new MultiGetRequest();
701+
request.realtime(false);
702+
for (int i = 0; i < numDocs; i++) {
703+
request.add("test", "" + i);
704+
}
705+
return Arrays.stream(client().multiGet(request).actionGet().getResponses()).filter(r -> r.getResponse().isExists()).count();
706+
});
707+
}
708+
709+
private void runTestAutomaticRefresh(final IntToLongFunction count) throws InterruptedException {
692710
TimeValue randomTimeValue = randomFrom(random(), null, TimeValue.ZERO, TimeValue.timeValueMillis(randomIntBetween(0, 1000)));
693711
Settings.Builder builder = Settings.builder();
694712
if (randomTimeValue != null) {
@@ -721,31 +739,31 @@ public void testAutomaticRefresh() throws InterruptedException {
721739
ensureNoPendingScheduledRefresh(indexService.getThreadPool());
722740
}
723741
}
742+
724743
CountDownLatch started = new CountDownLatch(1);
725744
Thread t = new Thread(() -> {
726-
SearchResponse searchResponse;
727745
started.countDown();
728746
do {
729-
searchResponse = client().prepareSearch().get();
730-
} while (searchResponse.getHits().getTotalHits().value != totalNumDocs.get());
747+
748+
} while (count.applyAsLong(totalNumDocs.get()) != totalNumDocs.get());
731749
});
732750
t.start();
733751
started.await();
734-
assertHitCount(client().prepareSearch().get(), 1);
752+
assertThat(count.applyAsLong(totalNumDocs.get()), equalTo(1L));
735753
for (int i = 1; i < numDocs; i++) {
736754
client().prepareIndex("test", "test", "" + i).setSource("{\"foo\" : \"bar\"}", XContentType.JSON)
737755
.execute(new ActionListener<IndexResponse>() {
738-
@Override
739-
public void onResponse(IndexResponse indexResponse) {
740-
indexingDone.countDown();
741-
}
742-
743-
@Override
744-
public void onFailure(Exception e) {
745-
indexingDone.countDown();
746-
throw new AssertionError(e);
747-
}
748-
});
756+
@Override
757+
public void onResponse(IndexResponse indexResponse) {
758+
indexingDone.countDown();
759+
}
760+
761+
@Override
762+
public void onFailure(Exception e) {
763+
indexingDone.countDown();
764+
throw new AssertionError(e);
765+
}
766+
});
749767
}
750768
indexingDone.await();
751769
t.join();
@@ -757,7 +775,6 @@ public void testPendingRefreshWithIntervalChange() throws Exception {
757775
IndexService indexService = createIndex("test", builder.build());
758776
assertFalse(indexService.getIndexSettings().isExplicitRefresh());
759777
ensureGreen();
760-
assertNoSearchHits(client().prepareSearch().get());
761778
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
762779
IndexShard shard = indexService.getShard(0);
763780
assertFalse(shard.scheduledRefresh());

0 commit comments

Comments
 (0)