Skip to content

Commit 13487b1

Browse files
authored
Node level can match action (#78765)
Changes can-match from a shard-level to a node-level action. This helps avoid the explosion of shard-level can-match subrequests that can cause stability issues in clusters with many shards. Also introduces a new search_coordination thread pool to handle the sending and handling of node-level can-match requests.
1 parent b1c3f55 commit 13487b1

File tree

20 files changed

+1168
-433
lines changed

20 files changed

+1168
-433
lines changed

docs/reference/modules/threadpool.asciidoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ There are several thread pools, but the important ones include:
2121
For count/search/suggest/get operations on `search_throttled indices`.
2222
Thread pool type is `fixed` with a size of `1`, and queue_size of `100`.
2323

24+
`search_coordination`::
25+
For lightweight search-related coordination operations. Thread pool type is
26+
`fixed` with a size of `min(5, (`<<node.processors,
27+
`# of allocated processors`>>`) / 2)`, and queue_size of `1000`.
28+
2429
`get`::
2530
For get operations. Thread pool type is `fixed`
2631
with a size of <<node.processors, `# of allocated processors`>>,

qa/ccs-rolling-upgrade-remote-cluster/src/test/java/org/elasticsearch/upgrades/SearchStatesIT.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ static int indexDocs(RestHighLevelClient client, String index, int numDocs) thro
198198
return numDocs;
199199
}
200200

201-
void verifySearch(String localIndex, int localNumDocs, String remoteIndex, int remoteNumDocs) {
201+
void verifySearch(String localIndex, int localNumDocs, String remoteIndex, int remoteNumDocs, Integer preFilterShardSize) {
202202
try (RestHighLevelClient localClient = newLocalClient()) {
203203
Request request = new Request("POST", "/_search");
204204
final int expectedDocs;
@@ -212,6 +212,12 @@ void verifySearch(String localIndex, int localNumDocs, String remoteIndex, int r
212212
if (UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_0_0)) {
213213
request.addParameter("ccs_minimize_roundtrips", Boolean.toString(randomBoolean()));
214214
}
215+
if (preFilterShardSize == null && randomBoolean()) {
216+
preFilterShardSize = randomIntBetween(1, 100);
217+
}
218+
if (preFilterShardSize != null) {
219+
request.addParameter("pre_filter_shard_size", Integer.toString(preFilterShardSize));
220+
}
215221
int size = between(1, 100);
216222
request.setJsonEntity("{\"sort\": \"f\", \"size\": " + size + "}");
217223
Response response = localClient.getLowLevelClient().performRequest(request);
@@ -245,7 +251,32 @@ public void testBWCSearchStates() throws Exception {
245251
configureRemoteClusters(getNodes(remoteClient.getLowLevelClient()));
246252
int iterations = between(1, 20);
247253
for (int i = 0; i < iterations; i++) {
248-
verifySearch(localIndex, localNumDocs, CLUSTER_ALIAS + ":" + remoteIndex, remoteNumDocs);
254+
verifySearch(localIndex, localNumDocs, CLUSTER_ALIAS + ":" + remoteIndex, remoteNumDocs, null);
255+
}
256+
localClient.indices().delete(new DeleteIndexRequest(localIndex), RequestOptions.DEFAULT);
257+
remoteClient.indices().delete(new DeleteIndexRequest(remoteIndex), RequestOptions.DEFAULT);
258+
}
259+
}
260+
261+
public void testCanMatch() throws Exception {
262+
String localIndex = "test_can_match_local_index";
263+
String remoteIndex = "test_can_match_remote_index";
264+
try (RestHighLevelClient localClient = newLocalClient();
265+
RestHighLevelClient remoteClient = newRemoteClient()) {
266+
localClient.indices().create(new CreateIndexRequest(localIndex)
267+
.settings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(5, 20))),
268+
RequestOptions.DEFAULT);
269+
int localNumDocs = indexDocs(localClient, localIndex, between(10, 100));
270+
271+
remoteClient.indices().create(new CreateIndexRequest(remoteIndex)
272+
.settings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(5, 20))),
273+
RequestOptions.DEFAULT);
274+
int remoteNumDocs = indexDocs(remoteClient, remoteIndex, between(10, 100));
275+
276+
configureRemoteClusters(getNodes(remoteClient.getLowLevelClient()));
277+
int iterations = between(1, 10);
278+
for (int i = 0; i < iterations; i++) {
279+
verifySearch(localIndex, localNumDocs, CLUSTER_ALIAS + ":" + remoteIndex, remoteNumDocs, between(1, 10));
249280
}
250281
localClient.indices().delete(new DeleteIndexRequest(localIndex), RequestOptions.DEFAULT);
251282
remoteClient.indices().delete(new DeleteIndexRequest(remoteIndex), RequestOptions.DEFAULT);

qa/multi-cluster-search/src/test/java/org/elasticsearch/search/CCSDuelIT.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,11 @@ private static void assumeMultiClusterSetup() {
724724
private static SearchRequest initSearchRequest() {
725725
List<String> indices = Arrays.asList(INDEX_NAME, "my_remote_cluster:" + INDEX_NAME);
726726
Collections.shuffle(indices, random());
727-
return new SearchRequest(indices.toArray(new String[0]));
727+
final SearchRequest request = new SearchRequest(indices.toArray(new String[0]));
728+
if (randomBoolean()) {
729+
request.setPreFilterShardSize(between(1, 20));
730+
}
731+
return request;
728732
}
729733

730734
private static void duelSearch(SearchRequest searchRequest, Consumer<SearchResponse> responseChecker) throws Exception {

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,10 @@
2323
import org.elasticsearch.action.support.TransportActions;
2424
import org.elasticsearch.cluster.ClusterState;
2525
import org.elasticsearch.cluster.routing.GroupShardsIterator;
26-
import org.elasticsearch.core.Releasable;
27-
import org.elasticsearch.core.Releasables;
2826
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2927
import org.elasticsearch.common.util.concurrent.AtomicArray;
30-
import org.elasticsearch.core.TimeValue;
31-
import org.elasticsearch.index.seqno.SequenceNumbers;
28+
import org.elasticsearch.core.Releasable;
29+
import org.elasticsearch.core.Releasables;
3230
import org.elasticsearch.index.shard.ShardId;
3331
import org.elasticsearch.search.SearchContextMissingException;
3432
import org.elasticsearch.search.SearchPhaseResult;
@@ -65,7 +63,6 @@
6563
*/
6664
abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends SearchPhase implements SearchPhaseContext {
6765
private static final float DEFAULT_INDEX_BOOST = 1.0f;
68-
private static final long[] EMPTY_LONG_ARRAY = new long[0];
6966
private final Logger logger;
7067
private final SearchTransportService searchTransportService;
7168
private final Executor executor;
@@ -736,21 +733,9 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar
736733
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
737734
assert filter != null;
738735
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
739-
final Map<String, long[]> indexToWaitForCheckpoints = request.getWaitForCheckpoints();
740-
final TimeValue waitForCheckpointsTimeout = request.getWaitForCheckpointsTimeout();
741-
final long[] waitForCheckpoints = indexToWaitForCheckpoints.getOrDefault(shardIt.shardId().getIndex().getName(), EMPTY_LONG_ARRAY);
742-
743-
long waitForCheckpoint;
744-
if (waitForCheckpoints.length == 0) {
745-
waitForCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
746-
} else {
747-
assert waitForCheckpoints.length > shardIndex;
748-
waitForCheckpoint = waitForCheckpoints[shardIndex];
749-
}
750736
ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request,
751737
shardIt.shardId(), shardIndex, getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(),
752-
shardIt.getClusterAlias(), shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive(), waitForCheckpoint,
753-
waitForCheckpointsTimeout);
738+
shardIt.getClusterAlias(), shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive());
754739
// if we already received a search result we can inform the shard that it
755740
// can return a null response if the request rewrites to match none rather
756741
// than creating an empty response in the search thread pool.
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.search;
10+
11+
import org.elasticsearch.action.IndicesRequest;
12+
import org.elasticsearch.action.OriginalIndices;
13+
import org.elasticsearch.action.support.IndicesOptions;
14+
import org.elasticsearch.common.io.stream.StreamInput;
15+
import org.elasticsearch.common.io.stream.StreamOutput;
16+
import org.elasticsearch.common.io.stream.Writeable;
17+
import org.elasticsearch.core.Nullable;
18+
import org.elasticsearch.core.TimeValue;
19+
import org.elasticsearch.index.shard.ShardId;
20+
import org.elasticsearch.search.Scroll;
21+
import org.elasticsearch.search.builder.SearchSourceBuilder;
22+
import org.elasticsearch.search.internal.AliasFilter;
23+
import org.elasticsearch.search.internal.ShardSearchContextId;
24+
import org.elasticsearch.search.internal.ShardSearchRequest;
25+
import org.elasticsearch.tasks.Task;
26+
import org.elasticsearch.tasks.TaskId;
27+
import org.elasticsearch.transport.TransportRequest;
28+
29+
import java.io.IOException;
30+
import java.util.ArrayList;
31+
import java.util.Arrays;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.stream.Collectors;
35+
36+
/**
37+
* Node-level request used during the can-match phase
38+
*/
39+
public class CanMatchNodeRequest extends TransportRequest implements IndicesRequest {
40+
41+
private final SearchSourceBuilder source;
42+
private final List<Shard> shards;
43+
private final SearchType searchType;
44+
private final Boolean requestCache;
45+
private final boolean allowPartialSearchResults;
46+
private final Scroll scroll;
47+
private final int numberOfShards;
48+
private final long nowInMillis;
49+
@Nullable
50+
private final String clusterAlias;
51+
private final String[] indices;
52+
private final IndicesOptions indicesOptions;
53+
private final TimeValue waitForCheckpointsTimeout;
54+
55+
public static class Shard implements Writeable {
56+
private final String[] indices;
57+
private final ShardId shardId;
58+
private final int shardRequestIndex;
59+
private final AliasFilter aliasFilter;
60+
private final float indexBoost;
61+
private final ShardSearchContextId readerId;
62+
private final TimeValue keepAlive;
63+
private final long waitForCheckpoint;
64+
65+
public Shard(String[] indices,
66+
ShardId shardId,
67+
int shardRequestIndex,
68+
AliasFilter aliasFilter,
69+
float indexBoost,
70+
ShardSearchContextId readerId,
71+
TimeValue keepAlive,
72+
long waitForCheckpoint) {
73+
this.indices = indices;
74+
this.shardId = shardId;
75+
this.shardRequestIndex = shardRequestIndex;
76+
this.aliasFilter = aliasFilter;
77+
this.indexBoost = indexBoost;
78+
this.readerId = readerId;
79+
this.keepAlive = keepAlive;
80+
this.waitForCheckpoint = waitForCheckpoint;
81+
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
82+
}
83+
84+
public Shard(StreamInput in) throws IOException {
85+
indices = in.readStringArray();
86+
shardId = new ShardId(in);
87+
shardRequestIndex = in.readVInt();
88+
aliasFilter = new AliasFilter(in);
89+
indexBoost = in.readFloat();
90+
readerId = in.readOptionalWriteable(ShardSearchContextId::new);
91+
keepAlive = in.readOptionalTimeValue();
92+
waitForCheckpoint = in.readLong();
93+
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
94+
}
95+
96+
@Override
97+
public void writeTo(StreamOutput out) throws IOException {
98+
out.writeStringArray(indices);
99+
shardId.writeTo(out);
100+
out.writeVInt(shardRequestIndex);
101+
aliasFilter.writeTo(out);
102+
out.writeFloat(indexBoost);
103+
out.writeOptionalWriteable(readerId);
104+
out.writeOptionalTimeValue(keepAlive);
105+
out.writeLong(waitForCheckpoint);
106+
}
107+
108+
public int getShardRequestIndex() {
109+
return shardRequestIndex;
110+
}
111+
112+
public String[] getOriginalIndices() {
113+
return indices;
114+
}
115+
116+
public ShardId shardId() {
117+
return shardId;
118+
}
119+
}
120+
121+
public CanMatchNodeRequest(
122+
SearchRequest searchRequest,
123+
IndicesOptions indicesOptions,
124+
List<Shard> shards,
125+
int numberOfShards,
126+
long nowInMillis,
127+
@Nullable String clusterAlias
128+
) {
129+
this.source = searchRequest.source();
130+
this.indicesOptions = indicesOptions;
131+
this.shards = new ArrayList<>(shards);
132+
this.searchType = searchRequest.searchType();
133+
this.requestCache = searchRequest.requestCache();
134+
// If allowPartialSearchResults is unset (ie null), the cluster-level default should have been substituted
135+
// at this stage. Any NPE from the unboxing below is therefore an error in request preparation logic.
136+
assert searchRequest.allowPartialSearchResults() != null;
137+
this.allowPartialSearchResults = searchRequest.allowPartialSearchResults();
138+
this.scroll = searchRequest.scroll();
139+
this.numberOfShards = numberOfShards;
140+
this.nowInMillis = nowInMillis;
141+
this.clusterAlias = clusterAlias;
142+
this.waitForCheckpointsTimeout = searchRequest.getWaitForCheckpointsTimeout();
143+
indices = shards.stream().map(Shard::getOriginalIndices).flatMap(Arrays::stream).distinct()
144+
.toArray(String[]::new);
145+
}
146+
147+
public CanMatchNodeRequest(StreamInput in) throws IOException {
148+
super(in);
149+
source = in.readOptionalWriteable(SearchSourceBuilder::new);
150+
indicesOptions = IndicesOptions.readIndicesOptions(in);
151+
searchType = SearchType.fromId(in.readByte());
152+
scroll = in.readOptionalWriteable(Scroll::new);
153+
requestCache = in.readOptionalBoolean();
154+
allowPartialSearchResults = in.readBoolean();
155+
numberOfShards = in.readVInt();
156+
nowInMillis = in.readVLong();
157+
clusterAlias = in.readOptionalString();
158+
waitForCheckpointsTimeout = in.readTimeValue();
159+
shards = in.readList(Shard::new);
160+
indices = shards.stream().map(Shard::getOriginalIndices).flatMap(Arrays::stream).distinct()
161+
.toArray(String[]::new);
162+
}
163+
164+
@Override
165+
public void writeTo(StreamOutput out) throws IOException {
166+
super.writeTo(out);
167+
out.writeOptionalWriteable(source);
168+
indicesOptions.writeIndicesOptions(out);
169+
out.writeByte(searchType.id());
170+
out.writeOptionalWriteable(scroll);
171+
out.writeOptionalBoolean(requestCache);
172+
out.writeBoolean(allowPartialSearchResults);
173+
out.writeVInt(numberOfShards);
174+
out.writeVLong(nowInMillis);
175+
out.writeOptionalString(clusterAlias);
176+
out.writeTimeValue(waitForCheckpointsTimeout);
177+
out.writeList(shards);
178+
}
179+
180+
public List<Shard> getShardLevelRequests() {
181+
return shards;
182+
}
183+
184+
public List<ShardSearchRequest> createShardSearchRequests() {
185+
return shards.stream().map(this::createShardSearchRequest).collect(Collectors.toList());
186+
}
187+
188+
public ShardSearchRequest createShardSearchRequest(Shard r) {
189+
ShardSearchRequest shardSearchRequest = new ShardSearchRequest(
190+
new OriginalIndices(r.indices, indicesOptions), r.shardId, r.shardRequestIndex, numberOfShards, searchType,
191+
source, requestCache, r.aliasFilter, r.indexBoost, allowPartialSearchResults, scroll,
192+
nowInMillis, clusterAlias, r.readerId, r.keepAlive, r.waitForCheckpoint, waitForCheckpointsTimeout
193+
);
194+
shardSearchRequest.setParentTask(getParentTask());
195+
return shardSearchRequest;
196+
}
197+
198+
@Override
199+
public String[] indices() {
200+
return indices;
201+
}
202+
203+
@Override
204+
public IndicesOptions indicesOptions() {
205+
return indicesOptions;
206+
}
207+
208+
@Override
209+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
210+
return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers);
211+
}
212+
213+
@Override
214+
public String getDescription() {
215+
// Shard id is enough here, the request itself can be found by looking at the parent task description
216+
return "shardIds[" + shards.stream().map(slr -> slr.shardId).collect(Collectors.toList()) + "]";
217+
}
218+
219+
}

0 commit comments

Comments
 (0)