Skip to content
Closed
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/

boolean bwc_tests_enabled = true
final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
boolean bwc_tests_enabled = false
final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/40177" /* place a PR link here when committing bwc changes */
if (bwc_tests_enabled == false) {
if (bwc_tests_disabled_issue.isEmpty()) {
throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")
Expand Down
28 changes: 14 additions & 14 deletions docs/reference/modules/cross-cluster-search.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
The _{ccs}_ feature allows any node to act as a federated client across
multiple clusters. A {ccs} node won't join the remote cluster, instead
it connects to a remote cluster in a light fashion in order to execute
federated search requests.
federated search requests. For details on communication and compatibility
between different clusters, see <<modules-remote-clusters>>.

[float]
=== Using {ccs}
Expand Down Expand Up @@ -43,7 +44,8 @@ PUT _cluster/settings
// TEST[s/127.0.0.1:9300/\${transport_host}/]

To search the `twitter` index on remote cluster `cluster_one` the index name
must be prefixed with the cluster alias separated by a `:` character:
must be prefixed with the alias of the remote cluster followed by the `:`
character:

[source,js]
--------------------------------------------------
Expand Down Expand Up @@ -104,7 +106,7 @@ GET /cluster_one:twitter/_search
// TESTRESPONSE[s/"_score": 1/"_score": "$body.hits.hits.0._score"/]


Indices can also be searched with the same name on different clusters:
Indices with the same name on different clusters can also be searched:

[source,js]
--------------------------------------------------
Expand All @@ -120,10 +122,10 @@ GET /cluster_one:twitter,twitter/_search
// CONSOLE
// TEST[continued]

Search results are disambiguated the same way as the indices are disambiguated in the request. Even if index names are
identical these indices will be treated as different indices when results are merged. All results retrieved from a
remote index
will be prefixed with their remote cluster name:
Search results are disambiguated the same way as the indices are disambiguated in the request.
Indices with same names are treated as different indices when results are merged. All results
retrieved from an index located in a remote cluster are prefixed with their corresponding
cluster alias:

[source,js]
--------------------------------------------------
Expand Down Expand Up @@ -185,10 +187,11 @@ will be prefixed with their remote cluster name:
[float]
=== Skipping disconnected clusters

By default all remote clusters that are searched via {ccs} need to be available when
the search request is executed, otherwise the whole request fails and no search results are returned
despite some of the clusters are available. Remote clusters can be made optional through the
boolean `skip_unavailable` setting, set to `false` by default.
By default, all remote clusters that are searched via {ccs} need to be
available when the search request is executed. Otherwise, the whole request
fails; even if some of the clusters are available, no search results are
returned. You can use the boolean `skip_unavailable` setting to make remote
clusters optional. By default, it is set to `false`.

[source,js]
--------------------------------
Expand Down Expand Up @@ -312,6 +315,3 @@ The <<search-request-body, search API>> supports the `ccs_minimize_roundtrips`
parameter, which defaults to `true` and can be set to `false` in case
minimizing network round-trips is not desirable.

Note that all the communication between the nodes, regardless of which cluster
they belong to and the selected reduce mode, happens through the
<<modules-transport,transport layer>>.
54 changes: 43 additions & 11 deletions docs/reference/modules/remote-clusters.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,55 @@ connections to a remote cluster. This functionality is used in
endif::[]

Remote cluster connections work by configuring a remote cluster and connecting
only to a limited number of nodes in the remote cluster. Each remote cluster is
referenced by a name and a list of seed nodes. When a remote cluster is
registered, its cluster state is retrieved from one of the seed nodes so that by
default up to three _gateway nodes_ are selected to be connected to as part of
remote cluster requests. Remote cluster connections consist of uni-directional
connections from the coordinating node to the previously selected remote nodes
only. You can tag which nodes should be selected by using node attributes (see <<remote-cluster-settings>>).
only to a limited number of nodes in that remote cluster. Each remote cluster
is referenced by a name and a list of seed nodes. When a remote cluster is
registered, its cluster state is retrieved from one of the seed nodes and up
to three _gateway nodes_ are selected to be connected to as part of remote
cluster requests. All the communication required between different clusters
goes through the <<modules-transport,transport layer>>. Remote cluster
connections consist of uni-directional connections from the coordinating
node to the selected remote _gateway nodes_ only.

Each node in a cluster that has remote clusters configured connects to one or
more _gateway nodes_ and uses them to federate requests to the remote cluster.
[float]
[[gateway-nodes-selection]]
=== Gateway nodes selection

The _gateway nodes_ selection depends on the following criteria:

- *version*: Remote nodes must be compatible with the cluster they are
registered to. This is subject to the same rules as <<rolling-upgrades>>.
Any node can communicate with any other node on the same major version (e.g.
6.0 can talk to any 6.x node). Only nodes on the last minor version of a
certain major version can communicate with nodes on the following major
version (e.g. 6.7 can communicate with 7.0, as well as any 7.x node, while
6.6 or earlier cannot talk to any 7.x node). Note that version compatibility
is symmetric, meaning that if 6.7 can communicate with 7.0, 7.0 can also
communicate with 6.7. The matrix below summarizes compatibility as described
above.

[cols="^,^,^,^,^,^"]
|====
| Compatibility | 5.0->5.5 | 5.6 | 6.0->6.6 | 6.7 | 7.x
| 5.0->5.5 | Yes | Yes | No | No | No
| 5.6 | Yes | Yes | Yes | Yes | No
| 6.0->6.6 | No | Yes | Yes | Yes | No
| 6.7 | No | Yes | Yes | Yes | Yes
| 7.x | No | No | No | Yes | Yes
|====

- *role*: Dedicated master nodes never get selected.
- *attributes*: You can tag which nodes should be selected
(see <<remote-cluster-settings>>), though such tagged nodes still have
to satisfy the two above requirements.

[float]
[[configuring-remote-clusters]]
=== Configuring remote clusters

You can configure remote clusters globally by using
<<cluster-update-settings,cluster settings>>, which you can update dynamically.
Alternatively, you can configure them locally on individual nodes by using the `elasticsearch.yml` file.
Alternatively, you can configure them locally on individual nodes by using the
`elasticsearch.yml` file.

If you specify the settings in `elasticsearch.yml` files, only the nodes with
those settings can connect to the remote cluster. In other words, functionality
Expand Down Expand Up @@ -59,7 +90,8 @@ between local and remote indices.
For more information about the optional transport settings, see
<<modules-transport>>.

If you use <<cluster-update-settings,cluster settings>>, the remote clusters are available on every node in the cluster. For example:
If you use <<cluster-update-settings,cluster settings>>, the remote clusters
are available on every node in the cluster. For example:

[source,js]
--------------------------------
Expand Down
4 changes: 4 additions & 0 deletions docs/reference/modules/snapshots.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ back up a cluster is by using the snapshot and restore functionality.
[float]
=== Version compatibility

IMPORTANT: Version compatibility refers to the underlying Lucene index
compatibility. Follow the <<setup-upgrade,Upgrade documentation>>
when migrating between versions.

A snapshot contains a copy of the on-disk data structures that make up an
index. This means that snapshots can only be restored to versions of
Elasticsearch that can read the indices:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
Expand All @@ -66,8 +65,6 @@
import java.util.Map;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public final class SearchPhaseController {

Expand Down Expand Up @@ -491,8 +488,8 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
}
final Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
firstResult.pipelineAggregators(), reduceContext);
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null :
InternalAggregations.reduce(aggregationsList, firstResult.pipelineAggregators(), reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
final TotalHits totalHits = topDocsStats.getTotalHits();
Expand All @@ -501,32 +498,6 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
firstResult.sortValueFormats(), numReducePhases, size, from, false);
}

/**
* Performs an intermediate reduce phase on the aggregations. For instance with this reduce phase never prune information
* that relevant for the final reduce step. For final reduce see {@link #reduceAggs(List, List, ReduceContext)}
*/
private InternalAggregations reduceAggsIncrementally(List<InternalAggregations> aggregationsList) {
ReduceContext reduceContext = reduceContextFunction.apply(false);
return aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
null, reduceContext);
}

private static InternalAggregations reduceAggs(List<InternalAggregations> aggregationsList,
List<SiblingPipelineAggregator> pipelineAggregators, ReduceContext reduceContext) {
InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
if (pipelineAggregators != null) {
List<InternalAggregation> newAggs = StreamSupport.stream(aggregations.spliterator(), false)
.map((p) -> (InternalAggregation) p)
.collect(Collectors.toList());
for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) {
InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext);
newAggs.add(newAgg);
}
return new InternalAggregations(newAggs);
}
return aggregations;
}

public static final class ReducedQueryPhase {
// the sum of all hits across all reduces shards
final TotalHits totalHits;
Expand Down Expand Up @@ -646,7 +617,8 @@ public void consumeResult(SearchPhaseResult result) {
private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
if (index == bufferSize) {
if (hasAggs) {
InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(aggsBuffer));
ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
InternalAggregations reducedAggs = InternalAggregations.reduce(Arrays.asList(aggsBuffer), reduceContext);
Arrays.fill(aggsBuffer, null);
aggsBuffer[0] = reducedAggs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,16 @@ SearchResponse getMergedResponse(Clusters clusters) {
assert trackTotalHits == null || trackTotalHits;
trackTotalHits = true;
}

TopDocs topDocs = searchHitsToTopDocs(searchHits, totalHits, shards);
topDocsStats.add(new TopDocsAndMaxScore(topDocs, searchHits.getMaxScore()),
searchResponse.isTimedOut(), searchResponse.isTerminatedEarly());
topDocsList.add(topDocs);
if (searchHits.getHits().length > 0) {
//there is no point in adding empty search hits and merging them with the others. Also, empty search hits always come
//without sort fields and collapse info, despite sort by field and/or field collapsing was requested, which causes
//issues reconstructing the proper TopDocs instance and breaks mergeTopDocs which expects the same type for each result.
topDocsList.add(topDocs);
}
}

//after going through all the hits and collecting all their distinct shards, we can assign shardIndex and set it to the ScoreDocs
Expand Down Expand Up @@ -297,12 +303,17 @@ private static void setShardIndex(Map<ShardIdAndClusterAlias, Integer> shards, L
}

private static SearchHits topDocsToSearchHits(TopDocs topDocs, TopDocsStats topDocsStats) {
SearchHit[] searchHits = new SearchHit[topDocs.scoreDocs.length];
for (int i = 0; i < topDocs.scoreDocs.length; i++) {
FieldDocAndSearchHit scoreDoc = (FieldDocAndSearchHit)topDocs.scoreDocs[i];
searchHits[i] = scoreDoc.searchHit;
SearchHit[] searchHits;
if (topDocs == null) {
//merged TopDocs is null whenever all clusters have returned empty hits
searchHits = new SearchHit[0];
} else {
searchHits = new SearchHit[topDocs.scoreDocs.length];
for (int i = 0; i < topDocs.scoreDocs.length; i++) {
FieldDocAndSearchHit scoreDoc = (FieldDocAndSearchHit)topDocs.scoreDocs[i];
searchHits[i] = scoreDoc.searchHit;
}
}

SortField[] sortFields = null;
String collapseField = null;
Object[] collapseValues = null;
Expand Down Expand Up @@ -370,7 +381,7 @@ public int compareTo(ShardIdAndClusterAlias o) {
}
int clusterAliasCompareTo = clusterAlias.compareTo(o.clusterAlias);
if (clusterAliasCompareTo != 0) {
//TODO we may want to fix this, CCS returns remote results before local ones (TransportSearchAction#mergeShardsIterators)
//CCS returns remote results before local ones (TransportSearchAction#mergeShardsIterators), fixed from 7.1 on
if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
return 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.function.Function;
import java.util.function.LongSupplier;

import static org.elasticsearch.action.search.SearchType.DFS_QUERY_THEN_FETCH;
import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;

public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
Expand Down Expand Up @@ -248,6 +249,9 @@ static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) {
if (searchRequest.scroll() != null) {
return false;
}
if (searchRequest.searchType() == DFS_QUERY_THEN_FETCH) {
return false;
}
SearchSourceBuilder source = searchRequest.source();
return source == null || source.collapse() == null || source.collapse().getInnerHits() == null ||
source.collapse().getInnerHits().isEmpty();
Expand Down Expand Up @@ -311,8 +315,6 @@ public void onFailure(Exception e) {
if (localIndices != null) {
ActionListener<SearchResponse> ccsListener = createCCSListener(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
false, countDown, skippedClusters, exceptions, searchResponseMerger, totalClusters, listener);
//here we provide the empty string a cluster alias, which means no prefix in index name,
//but the coord node will perform non final reduce as it's not null.
SearchRequest ccsLocalSearchRequest = SearchRequest.withLocalReduction(searchRequest, localIndices.indices(),
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
localSearchConsumer.accept(ccsLocalSearchRequest, ccsListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ public AggParseContext(String name) {
}
}

public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory<?>[0],
new ArrayList<PipelineAggregationBuilder>());
public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory<?>[0], new ArrayList<>());

private AggregatorFactory<?>[] factories;
private List<PipelineAggregationBuilder> pipelineAggregatorFactories;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public ReduceContext(BigArrays bigArrays, ScriptService scriptService, IntConsum
/**
* Returns <code>true</code> iff the current reduce phase is the final reduce phase. This indicates if operations like
* pipeline aggregations should be applied or if specific features like {@code minDocCount} should be taken into account.
* Operations that are potentially loosing information can only be applied during the final reduce phase.
* Operations that are potentially losing information can only be applied during the final reduce phase.
*/
public boolean isFinalReduce() {
return isFinalReduce;
Expand Down
Loading