Skip to content

Commit 51fe20e

Browse files
authored
Add support for local cluster alias to SearchRequest (#36997)
With the upcoming cross-cluster search alternate execution mode, the CCS node will be able to split a CCS request into multiple search requests, one per remote cluster involved. In order to do that, the CCS node has to be able to signal to each remote cluster that such sub-requests are part of a CCS request. Each cluster does not know about the other clusters involved, and does not know either what alias it is given in the CCS node, hence the CCS coordinating node needs to be able to provide the alias as part of the search request so that it is used as index prefix in the returned search hits. The cluster alias is a notion that's already supported in the search shards iterator and search shard target, but it is currently used in CCS as both index prefix and connection lookup key when fanning out to all the shards. With CCS alternate execution mode the provided cluster alias needs to be used only as index prefix, as shards are local to each cluster hence no cluster alias should be used for connection lookups. The local cluster alias can be set to the SearchRequest at the transport layer only, and its constructor/getter methods are package private. Relates to #32125
1 parent 0cae979 commit 51fe20e

26 files changed

+408
-135
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,16 +318,16 @@ public final void onFailure(Exception e) {
318318
listener.onFailure(e);
319319
}
320320

321+
@Override
321322
public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt) {
322-
String clusterAlias = shardIt.getClusterAlias();
323323
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
324324
assert filter != null;
325325
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
326326
String indexName = shardIt.shardId().getIndex().getName();
327327
final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet())
328328
.toArray(new String[0]);
329329
return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
330-
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias, routings);
330+
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings);
331331
}
332332

333333
/**

server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,7 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
9090
final SearchShardIterator shardIt, Exception e) {
9191
// we always add the shard failure for a specific shard instance
9292
// we do make sure to clean it on a successful response from a shard
93-
SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId(), shardIt.getClusterAlias(),
94-
shardIt.getOriginalIndices());
93+
SearchShardTarget shardTarget = shardIt.newSearchShardTarget(nodeId);
9594
onShardFailure(shardIndex, shardTarget, e);
9695

9796
if (totalOps.incrementAndGet() == expectedTotalOps) {
@@ -257,8 +256,8 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator
257256
Runnable r = () -> {
258257
final Thread thread = Thread.currentThread();
259258
try {
260-
executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
261-
shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
259+
executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(
260+
shardIt.newSearchShardTarget(shard.currentNodeId()), shardIndex) {
262261
@Override
263262
public void innerOnResponse(FirstResult result) {
264263
try {

server/src/main/java/org/elasticsearch/action/search/ScrollIdForNode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public String getNode() {
3636
return node;
3737
}
3838

39+
@Nullable
3940
public String getClusterAlias() {
4041
return clusterAlias;
4142
}

server/src/main/java/org/elasticsearch/action/search/SearchRequest.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
6262
public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128;
6363
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512;
6464

65+
private final String localClusterAlias;
66+
6567
private SearchType searchType = SearchType.DEFAULT;
6668

6769
private String[] indices = Strings.EMPTY_ARRAY;
@@ -92,6 +94,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
9294
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
9395

9496
public SearchRequest() {
97+
this.localClusterAlias = null;
9598
}
9699

97100
/**
@@ -111,6 +114,7 @@ public SearchRequest(SearchRequest searchRequest) {
111114
this.searchType = searchRequest.searchType;
112115
this.source = searchRequest.source;
113116
this.types = searchRequest.types;
117+
this.localClusterAlias = searchRequest.localClusterAlias;
114118
}
115119

116120
/**
@@ -125,13 +129,23 @@ public SearchRequest(String... indices) {
125129
* Constructs a new search request against the provided indices with the given search source.
126130
*/
127131
public SearchRequest(String[] indices, SearchSourceBuilder source) {
132+
this();
128133
if (source == null) {
129134
throw new IllegalArgumentException("source must not be null");
130135
}
131136
indices(indices);
132137
this.source = source;
133138
}
134139

140+
/**
141+
* Creates a new search request by providing the alias of the cluster where it will be executed. Used when a {@link SearchRequest}
142+
* is created and executed as part of a cross-cluster search request performing local reduction on each cluster.
143+
* The coordinating CCS node provides the alias to prefix index names with in the returned search results.
144+
*/
145+
SearchRequest(String localClusterAlias) {
146+
this.localClusterAlias = Objects.requireNonNull(localClusterAlias, "cluster alias must not be null");
147+
}
148+
135149
/**
136150
* Constructs a new search request from reading the specified stream.
137151
*
@@ -158,6 +172,12 @@ public SearchRequest(StreamInput in) throws IOException {
158172
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
159173
allowPartialSearchResults = in.readOptionalBoolean();
160174
}
175+
//TODO update version after backport
176+
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
177+
localClusterAlias = in.readOptionalString();
178+
} else {
179+
localClusterAlias = null;
180+
}
161181
}
162182

163183
@Override
@@ -181,6 +201,10 @@ public void writeTo(StreamOutput out) throws IOException {
181201
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
182202
out.writeOptionalBoolean(allowPartialSearchResults);
183203
}
204+
//TODO update version after backport
205+
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
206+
out.writeOptionalString(localClusterAlias);
207+
}
184208
}
185209

186210
@Override
@@ -209,6 +233,16 @@ public ActionRequestValidationException validate() {
209233
return validationException;
210234
}
211235

236+
/**
237+
* Returns the alias of the cluster that this search request is being executed on. A non-null value indicates that this search request
238+
* is being executed as part of a locally reduced cross-cluster search request. The cluster alias is used to prefix index names
239+
* returned as part of search hits with the alias of the cluster they came from.
240+
*/
241+
@Nullable
242+
String getLocalClusterAlias() {
243+
return localClusterAlias;
244+
}
245+
212246
/**
213247
* Sets the indices the search will be executed on.
214248
*/
@@ -529,14 +563,15 @@ public boolean equals(Object o) {
529563
Objects.equals(maxConcurrentShardRequests, that.maxConcurrentShardRequests) &&
530564
Objects.equals(preFilterShardSize, that.preFilterShardSize) &&
531565
Objects.equals(indicesOptions, that.indicesOptions) &&
532-
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults);
566+
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) &&
567+
Objects.equals(localClusterAlias, that.localClusterAlias);
533568
}
534569

535570
@Override
536571
public int hashCode() {
537572
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
538573
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
539-
allowPartialSearchResults);
574+
allowPartialSearchResults, localClusterAlias);
540575
}
541576

542577
@Override
@@ -554,6 +589,7 @@ public String toString() {
554589
", batchedReduceSize=" + batchedReduceSize +
555590
", preFilterShardSize=" + preFilterShardSize +
556591
", allowPartialSearchResults=" + allowPartialSearchResults +
592+
", localClusterAlias=" + localClusterAlias +
557593
", source=" + source + '}';
558594
}
559595
}

server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,28 +22,34 @@
2222
import org.elasticsearch.action.OriginalIndices;
2323
import org.elasticsearch.cluster.routing.PlainShardIterator;
2424
import org.elasticsearch.cluster.routing.ShardRouting;
25+
import org.elasticsearch.common.Nullable;
2526
import org.elasticsearch.index.shard.ShardId;
27+
import org.elasticsearch.search.SearchShardTarget;
2628

2729
import java.util.List;
2830

2931
/**
3032
* Extension of {@link PlainShardIterator} used in the search api, which also holds the {@link OriginalIndices}
31-
* of the search request. Useful especially with cross cluster search, as each cluster has its own set of original indices.
33+
* of the search request (useful especially with cross-cluster search, as each cluster has its own set of original indices) as well as
34+
* the cluster alias.
35+
* @see OriginalIndices
3236
*/
3337
public final class SearchShardIterator extends PlainShardIterator {
3438

3539
private final OriginalIndices originalIndices;
36-
private String clusterAlias;
40+
private final String clusterAlias;
3741
private boolean skip = false;
3842

3943
/**
4044
* Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards
4145
* this the a given <code>shardId</code>.
4246
*
47+
* @param clusterAlias the alias of the cluster where the shard is located
4348
* @param shardId shard id of the group
4449
* @param shards shards to iterate
50+
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
4551
*/
46-
public SearchShardIterator(String clusterAlias, ShardId shardId, List<ShardRouting> shards, OriginalIndices originalIndices) {
52+
public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, List<ShardRouting> shards, OriginalIndices originalIndices) {
4753
super(shardId, shards);
4854
this.originalIndices = originalIndices;
4955
this.clusterAlias = clusterAlias;
@@ -56,10 +62,22 @@ public OriginalIndices getOriginalIndices() {
5662
return originalIndices;
5763
}
5864

65+
/**
66+
* Returns the alias of the cluster where the shard is located.
67+
*/
68+
@Nullable
5969
public String getClusterAlias() {
6070
return clusterAlias;
6171
}
6272

73+
/**
74+
* Creates a new shard target from this iterator, pointing at the node identified by the provided identifier.
75+
* @see SearchShardTarget
76+
*/
77+
SearchShardTarget newSearchShardTarget(String nodeId) {
78+
return new SearchShardTarget(nodeId, shardId(), clusterAlias, originalIndices);
79+
}
80+
6381
/**
6482
* Reset the iterator and mark it as skippable
6583
* @see #skip()

server/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ public static ShardSearchFailure readShardSearchFailure(StreamInput in) throws I
9898

9999
@Override
100100
public void readFrom(StreamInput in) throws IOException {
101-
if (in.readBoolean()) {
102-
shardTarget = new SearchShardTarget(in);
101+
shardTarget = in.readOptionalWriteable(SearchShardTarget::new);
102+
if (shardTarget != null) {
103103
index = shardTarget.getFullyQualifiedIndexName();
104104
shardId = shardTarget.getShardId().getId();
105105
}
@@ -110,12 +110,7 @@ public void readFrom(StreamInput in) throws IOException {
110110

111111
@Override
112112
public void writeTo(StreamOutput out) throws IOException {
113-
if (shardTarget == null) {
114-
out.writeBoolean(false);
115-
} else {
116-
out.writeBoolean(true);
117-
shardTarget.writeTo(out);
118-
}
113+
out.writeOptionalWriteable(shardTarget);
119114
out.writeString(reason);
120115
RestStatus.writeTo(out, status);
121116
out.writeException(cause);
@@ -175,7 +170,7 @@ public static ShardSearchFailure fromXContent(XContentParser parser) throws IOEx
175170
SearchShardTarget searchShardTarget = null;
176171
if (nodeId != null) {
177172
searchShardTarget = new SearchShardTarget(nodeId,
178-
new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), clusterAlias, OriginalIndices.NONE);
173+
new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), clusterAlias, OriginalIndices.NONE);
179174
}
180175
return new ShardSearchFailure(exception, searchShardTarget);
181176
}

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.cluster.routing.GroupShardsIterator;
3535
import org.elasticsearch.cluster.routing.ShardIterator;
3636
import org.elasticsearch.cluster.service.ClusterService;
37+
import org.elasticsearch.common.Nullable;
3738
import org.elasticsearch.common.inject.Inject;
3839
import org.elasticsearch.common.io.stream.Writeable;
3940
import org.elasticsearch.common.settings.Setting;
@@ -60,6 +61,7 @@
6061
import java.util.Set;
6162
import java.util.concurrent.Executor;
6263
import java.util.function.BiFunction;
64+
import java.util.function.Function;
6365
import java.util.function.LongSupplier;
6466

6567
import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;
@@ -311,7 +313,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
311313
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
312314
concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
313315
GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
314-
remoteShardIterators);
316+
searchRequest.getLocalClusterAlias(), remoteShardIterators);
315317

316318
failIfOverShardCountLimit(clusterService, shardIterators.size());
317319

@@ -338,19 +340,34 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
338340
}
339341

340342
final DiscoveryNodes nodes = clusterState.nodes();
341-
BiFunction<String, String, Transport.Connection> connectionLookup = (clusterName, nodeId) -> {
342-
final DiscoveryNode discoveryNode = clusterName == null ? nodes.get(nodeId) : remoteConnections.apply(clusterName, nodeId);
343+
BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),
344+
nodes::get, remoteConnections, searchTransportService::getConnection);
345+
boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
346+
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
347+
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
348+
}
349+
350+
static BiFunction<String, String, Transport.Connection> buildConnectionLookup(String requestClusterAlias,
351+
Function<String, DiscoveryNode> localNodes,
352+
BiFunction<String, String, DiscoveryNode> remoteNodes,
353+
BiFunction<String, DiscoveryNode, Transport.Connection> nodeToConnection) {
354+
return (clusterAlias, nodeId) -> {
355+
final DiscoveryNode discoveryNode;
356+
if (clusterAlias == null || requestClusterAlias != null) {
357+
assert requestClusterAlias == null || requestClusterAlias.equals(clusterAlias);
358+
discoveryNode = localNodes.apply(nodeId);
359+
} else {
360+
discoveryNode = remoteNodes.apply(clusterAlias, nodeId);
361+
}
343362
if (discoveryNode == null) {
344363
throw new IllegalStateException("no node found for id: " + nodeId);
345364
}
346-
return searchTransportService.getConnection(clusterName, discoveryNode);
365+
return nodeToConnection.apply(clusterAlias, discoveryNode);
347366
};
348-
boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
349-
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
350-
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
351367
}
352368

353-
private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupShardsIterator<SearchShardIterator> shardIterators) {
369+
private static boolean shouldPreFilterSearchShards(SearchRequest searchRequest,
370+
GroupShardsIterator<SearchShardIterator> shardIterators) {
354371
SearchSourceBuilder source = searchRequest.source();
355372
return searchRequest.searchType() == QUERY_THEN_FETCH && // we can't do this for DFS it needs to fan out to all shards all the time
356373
SearchService.canRewriteToMatchNone(source) &&
@@ -359,10 +376,11 @@ private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupSh
359376

360377
static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShardsIterator<ShardIterator> localShardsIterator,
361378
OriginalIndices localIndices,
379+
@Nullable String localClusterAlias,
362380
List<SearchShardIterator> remoteShardIterators) {
363381
List<SearchShardIterator> shards = new ArrayList<>(remoteShardIterators);
364382
for (ShardIterator shardIterator : localShardsIterator) {
365-
shards.add(new SearchShardIterator(null, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
383+
shards.add(new SearchShardIterator(localClusterAlias, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
366384
}
367385
return new GroupShardsIterator<>(shards);
368386
}

server/src/main/java/org/elasticsearch/action/search/TransportSearchHelper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ static String buildScrollId(AtomicArray<? extends SearchPhaseResult> searchPhase
4444
out.writeLong(searchPhaseResult.getRequestId());
4545
SearchShardTarget searchShardTarget = searchPhaseResult.getSearchShardTarget();
4646
if (searchShardTarget.getClusterAlias() != null) {
47-
out.writeString(RemoteClusterAware.buildRemoteIndexName(searchShardTarget.getClusterAlias(),
48-
searchShardTarget.getNodeId()));
47+
out.writeString(
48+
RemoteClusterAware.buildRemoteIndexName(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()));
4949
} else {
5050
out.writeString(searchShardTarget.getNodeId());
5151
}

server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ final class DefaultSearchContext extends SearchContext {
159159
DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
160160
Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService,
161161
IndexShard indexShard, BigArrays bigArrays, Counter timeEstimateCounter, TimeValue timeout,
162-
FetchPhase fetchPhase, String clusterAlias, Version minNodeVersion) {
162+
FetchPhase fetchPhase, Version minNodeVersion) {
163163
this.id = id;
164164
this.request = request;
165165
this.fetchPhase = fetchPhase;
@@ -179,7 +179,7 @@ final class DefaultSearchContext extends SearchContext {
179179
this.timeout = timeout;
180180
this.minNodeVersion = minNodeVersion;
181181
queryShardContext = indexService.newQueryShardContext(request.shardId().id(), searcher.getIndexReader(), request::nowInMillis,
182-
clusterAlias);
182+
shardTarget.getClusterAlias());
183183
queryShardContext.setTypes(request.types());
184184
queryBoost = request.indexBoost();
185185
}

0 commit comments

Comments
 (0)