Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/reference/modules/threadpool.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ There are several thread pools, but the important ones include:
Thread pool type is `fixed_auto_queue_size` with a size of `1`, and initial
queue_size of `100`.

`search_coordination`::
For lightweight search-related coordination operations. Thread pool type is
`fixed` with a size of a max of `min(5, (`<<node.processors,
`# of allocated processors`>>`) / 2)`, and queue_size of `1000`.

`get`::
For get operations. Thread pool type is `fixed`
with a size of <<node.processors, `# of allocated processors`>>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ static int indexDocs(RestHighLevelClient client, String index, int numDocs) thro
return numDocs;
}

void verifySearch(String localIndex, int localNumDocs, String remoteIndex, int remoteNumDocs) {
void verifySearch(String localIndex, int localNumDocs, String remoteIndex, int remoteNumDocs, Integer preFilterShardSize) {
try (RestHighLevelClient localClient = newLocalClient(LOGGER)) {
Request request = new Request("POST", "/_search");
final int expectedDocs;
Expand All @@ -103,6 +103,12 @@ void verifySearch(String localIndex, int localNumDocs, String remoteIndex, int r
if (UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_0_0)) {
request.addParameter("ccs_minimize_roundtrips", Boolean.toString(randomBoolean()));
}
if (preFilterShardSize == null && randomBoolean()) {
preFilterShardSize = randomIntBetween(1, 100);
}
if (preFilterShardSize != null) {
request.addParameter("pre_filter_shard_size", Integer.toString(preFilterShardSize));
}
int size = between(1, 100);
request.setJsonEntity("{\"sort\": \"f\", \"size\": " + size + "}");
Response response = localClient.getLowLevelClient().performRequest(request);
Expand Down Expand Up @@ -142,7 +148,32 @@ public void testBWCSearchStates() throws Exception {
configureRemoteClusters(remoteNodes, CLUSTER_ALIAS, UPGRADE_FROM_VERSION, LOGGER);
int iterations = between(1, 20);
for (int i = 0; i < iterations; i++) {
verifySearch(localIndex, localNumDocs, CLUSTER_ALIAS + ":" + remoteIndex, remoteNumDocs);
verifySearch(localIndex, localNumDocs, CLUSTER_ALIAS + ":" + remoteIndex, remoteNumDocs, null);
}
localClient.indices().delete(new DeleteIndexRequest(localIndex), RequestOptions.DEFAULT);
remoteClient.indices().delete(new DeleteIndexRequest(remoteIndex), RequestOptions.DEFAULT);
}
}

public void testCanMatch() throws Exception {
String localIndex = "test_can_match_local_index";
String remoteIndex = "test_can_match_remote_index";
try (RestHighLevelClient localClient = newLocalClient(LOGGER);
RestHighLevelClient remoteClient = newRemoteClient()) {
localClient.indices().create(new CreateIndexRequest(localIndex)
.settings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(5, 20))),
RequestOptions.DEFAULT);
int localNumDocs = indexDocs(localClient, localIndex, between(10, 100));

remoteClient.indices().create(new CreateIndexRequest(remoteIndex)
.settings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(5, 20))),
RequestOptions.DEFAULT);
int remoteNumDocs = indexDocs(remoteClient, remoteIndex, between(10, 100));

configureRemoteClusters(getNodes(remoteClient.getLowLevelClient()), CLUSTER_ALIAS, UPGRADE_FROM_VERSION, LOGGER);
int iterations = between(1, 10);
for (int i = 0; i < iterations; i++) {
verifySearch(localIndex, localNumDocs, CLUSTER_ALIAS + ":" + remoteIndex, remoteNumDocs, between(1, 10));
}
localClient.indices().delete(new DeleteIndexRequest(localIndex), RequestOptions.DEFAULT);
remoteClient.indices().delete(new DeleteIndexRequest(remoteIndex), RequestOptions.DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,11 @@ private static void assumeMultiClusterSetup() {
private static SearchRequest initSearchRequest() {
List<String> indices = Arrays.asList(INDEX_NAME, "my_remote_cluster:" + INDEX_NAME);
Collections.shuffle(indices, random());
return new SearchRequest(indices.toArray(new String[0]));
final SearchRequest request = new SearchRequest(indices.toArray(new String[0]));
if (randomBoolean()) {
request.setPreFilterShardSize(between(1, 20));
}
return request;
}

private static void duelSearch(SearchRequest searchRequest, Consumer<SearchResponse> responseChecker) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchPhaseResult;
Expand Down Expand Up @@ -65,7 +63,6 @@
*/
abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends SearchPhase implements SearchPhaseContext {
private static final float DEFAULT_INDEX_BOOST = 1.0f;
private static final long[] EMPTY_LONG_ARRAY = new long[0];
private final Logger logger;
private final SearchTransportService searchTransportService;
private final Executor executor;
Expand Down Expand Up @@ -736,21 +733,9 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
final Map<String, long[]> indexToWaitForCheckpoints = request.getWaitForCheckpoints();
final TimeValue waitForCheckpointsTimeout = request.getWaitForCheckpointsTimeout();
final long[] waitForCheckpoints = indexToWaitForCheckpoints.getOrDefault(shardIt.shardId().getIndex().getName(), EMPTY_LONG_ARRAY);

long waitForCheckpoint;
if (waitForCheckpoints.length == 0) {
waitForCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
} else {
assert waitForCheckpoints.length > shardIndex;
waitForCheckpoint = waitForCheckpoints[shardIndex];
}
ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request,
shardIt.shardId(), shardIndex, getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(),
shardIt.getClusterAlias(), shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive(), waitForCheckpoint,
waitForCheckpointsTimeout);
shardIt.getClusterAlias(), shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive());
// if we already received a search result we can inform the shard that it
// can return a null response if the request rewrites to match none rather
// than creating an empty response in the search thread pool.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.search;

import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Node-level request used during can-match phase
*/
public class CanMatchNodeRequest extends TransportRequest implements IndicesRequest {

private final SearchSourceBuilder source;
private final List<Shard> shards;
private final SearchType searchType;
private final String[] types;
private final Boolean requestCache;
private final boolean allowPartialSearchResults;
private final Scroll scroll;
private final int numberOfShards;
private final long nowInMillis;
@Nullable
private final String clusterAlias;
private final String[] indices;
private final IndicesOptions indicesOptions;
private final TimeValue waitForCheckpointsTimeout;

public static class Shard implements Writeable {
private final String[] indices;
private final ShardId shardId;
private final int shardRequestIndex;
private final AliasFilter aliasFilter;
private final float indexBoost;
private final ShardSearchContextId readerId;
private final TimeValue keepAlive;
private final long waitForCheckpoint;

public Shard(String[] indices,
ShardId shardId,
int shardRequestIndex,
AliasFilter aliasFilter,
float indexBoost,
ShardSearchContextId readerId,
TimeValue keepAlive,
long waitForCheckpoint) {
this.indices = indices;
this.shardId = shardId;
this.shardRequestIndex = shardRequestIndex;
this.aliasFilter = aliasFilter;
this.indexBoost = indexBoost;
this.readerId = readerId;
this.keepAlive = keepAlive;
this.waitForCheckpoint = waitForCheckpoint;
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
}

public Shard(StreamInput in) throws IOException {
indices = in.readStringArray();
shardId = new ShardId(in);
shardRequestIndex = in.readVInt();
aliasFilter = new AliasFilter(in);
indexBoost = in.readFloat();
readerId = in.readOptionalWriteable(ShardSearchContextId::new);
keepAlive = in.readOptionalTimeValue();
waitForCheckpoint = in.readLong();
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(indices);
shardId.writeTo(out);
out.writeVInt(shardRequestIndex);
aliasFilter.writeTo(out);
out.writeFloat(indexBoost);
out.writeOptionalWriteable(readerId);
out.writeOptionalTimeValue(keepAlive);
out.writeLong(waitForCheckpoint);
}

public int getShardRequestIndex() {
return shardRequestIndex;
}

public String[] getOriginalIndices() {
return indices;
}

public ShardId shardId() {
return shardId;
}
}

public CanMatchNodeRequest(
SearchRequest searchRequest,
IndicesOptions indicesOptions,
List<Shard> shards,
int numberOfShards,
long nowInMillis,
@Nullable String clusterAlias
) {
this.source = searchRequest.source();
this.indicesOptions = indicesOptions;
this.shards = new ArrayList<>(shards);
this.searchType = searchRequest.searchType();
this.types = searchRequest.types();
this.requestCache = searchRequest.requestCache();
// If allowPartialSearchResults is unset (ie null), the cluster-level default should have been substituted
// at this stage. Any NPEs in the above are therefore an error in request preparation logic.
assert searchRequest.allowPartialSearchResults() != null;
this.allowPartialSearchResults = searchRequest.allowPartialSearchResults();
this.scroll = searchRequest.scroll();
this.numberOfShards = numberOfShards;
this.nowInMillis = nowInMillis;
this.clusterAlias = clusterAlias;
this.waitForCheckpointsTimeout = searchRequest.getWaitForCheckpointsTimeout();
indices = shards.stream().map(Shard::getOriginalIndices).flatMap(Arrays::stream).distinct()
.toArray(String[]::new);
}

public CanMatchNodeRequest(StreamInput in) throws IOException {
super(in);
source = in.readOptionalWriteable(SearchSourceBuilder::new);
indicesOptions = IndicesOptions.readIndicesOptions(in);
searchType = SearchType.fromId(in.readByte());
types = in.readStringArray();
scroll = in.readOptionalWriteable(Scroll::new);
requestCache = in.readOptionalBoolean();
allowPartialSearchResults = in.readBoolean();
numberOfShards = in.readVInt();
nowInMillis = in.readVLong();
clusterAlias = in.readOptionalString();
waitForCheckpointsTimeout = in.readTimeValue();
shards = in.readList(Shard::new);
indices = shards.stream().map(Shard::getOriginalIndices).flatMap(Arrays::stream).distinct()
.toArray(String[]::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalWriteable(source);
indicesOptions.writeIndicesOptions(out);
out.writeByte(searchType.id());
out.writeStringArray(types);
out.writeOptionalWriteable(scroll);
out.writeOptionalBoolean(requestCache);
out.writeBoolean(allowPartialSearchResults);
out.writeVInt(numberOfShards);
out.writeVLong(nowInMillis);
out.writeOptionalString(clusterAlias);
out.writeTimeValue(waitForCheckpointsTimeout);
out.writeList(shards);
}

public List<Shard> getShardLevelRequests() {
return shards;
}

public List<ShardSearchRequest> createShardSearchRequests() {
return shards.stream().map(this::createShardSearchRequest).collect(Collectors.toList());
}

public ShardSearchRequest createShardSearchRequest(Shard r) {
ShardSearchRequest shardSearchRequest = new ShardSearchRequest(
new OriginalIndices(r.indices, indicesOptions), r.shardId, r.shardRequestIndex, numberOfShards, searchType,
source, types, requestCache, r.aliasFilter, r.indexBoost, allowPartialSearchResults, scroll,
nowInMillis, clusterAlias, r.readerId, r.keepAlive, r.waitForCheckpoint, waitForCheckpointsTimeout
);
shardSearchRequest.setParentTask(getParentTask());
return shardSearchRequest;
}

@Override
public String[] indices() {
return indices;
}

@Override
public IndicesOptions indicesOptions() {
return indicesOptions;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers);
}

@Override
public String getDescription() {
// Shard id is enough here, the request itself can be found by looking at the parent task description
return "shardIds[" + shards.stream().map(slr -> slr.shardId).collect(Collectors.toList()) + "]";
}

}
Loading