Skip to content

Commit b768fbf

Browse files
committed
Rollup V2 Search Resolution Setup
this commit introduces initial search-time indices resolution POC for the rollup indices and how it interacts with DataStream. needs to be put behind feature-flag for rollups (TODO) relates #42720.
1 parent 7a890e8 commit b768fbf

File tree

7 files changed

+303
-43
lines changed

7 files changed

+303
-43
lines changed

server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
5959

6060
private final Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory;
6161
private final GroupShardsIterator<SearchShardIterator> shardsIts;
62+
private final boolean preFilterRollup;
6263

6364
CanMatchPreFilterSearchPhase(Logger logger, SearchTransportService searchTransportService,
6465
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
@@ -68,13 +69,14 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
6869
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
6970
TransportSearchAction.SearchTimeProvider timeProvider, ClusterState clusterState,
7071
SearchTask task, Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
71-
SearchResponse.Clusters clusters) {
72+
SearchResponse.Clusters clusters, boolean preFilterRollup) {
7273
//We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
7374
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
7475
executor, request, listener, shardsIts, timeProvider, clusterState, task,
7576
new CanMatchSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters);
7677
this.phaseFactory = phaseFactory;
7778
this.shardsIts = shardsIts;
79+
this.preFilterRollup = preFilterRollup;
7880
}
7981

8082
@Override

server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
365365
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);
366366

367367
// this is cheap, it does not fetch during the rewrite phase, so we can let it quickly execute on a networking thread
368+
// TODO(talevy): update request here potentially to pass parameters to the can match phase to say whether to filter rollup or not
368369
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchRequest::new,
369370
(request, channel, task) -> {
370371
searchService.canMatch(request, new ChannelActionListener<>(channel, QUERY_CAN_MATCH_NAME, request));

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,11 @@ long buildTookInMillis() {
218218

219219
@Override
220220
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
221-
executeRequest(task, searchRequest, this::searchAsyncAction, listener);
221+
executeRequest(task, searchRequest, (task1, searchRequest1, executor, shardIterators, timeProvider, connectionLookup,
222+
clusterState, aliasFilter, concreteIndexBoosts, indexRoutings, listener1, preFilter,
223+
preFilterRollup, clusters, threadPool1) -> searchAsyncAction(task1, searchRequest1, executor,
224+
shardIterators, timeProvider, connectionLookup, clusterState, aliasFilter, concreteIndexBoosts, indexRoutings, listener1,
225+
preFilter, preFilterRollup, threadPool1, clusters), listener);
222226
}
223227

224228
public interface SinglePhaseSearchAction {
@@ -236,7 +240,8 @@ public AbstractSearchAsyncAction<? extends SearchPhaseResult> asyncSearchAction(
236240
SearchTimeProvider timeProvider, BiFunction<String, String, Transport.Connection> connectionLookup,
237241
ClusterState clusterState, Map<String, AliasFilter> aliasFilter,
238242
Map<String, Float> concreteIndexBoosts, Map<String, Set<String>> indexRoutings,
239-
ActionListener<SearchResponse> listener, boolean preFilter, ThreadPool threadPool, SearchResponse.Clusters clusters) {
243+
ActionListener<SearchResponse> listener, boolean preFilter, boolean preFilterRollup, SearchResponse.Clusters clusters,
244+
ThreadPool threadPool) {
240245
return new AbstractSearchAsyncAction<>(
241246
actionName, logger, searchTransportService, connectionLookup, aliasFilter, concreteIndexBoosts,
242247
indexRoutings, executor, searchRequest, listener, shardsIts, timeProvider, clusterState, task,
@@ -260,7 +265,7 @@ public void run() {
260265
}
261266

262267
@Override
263-
boolean buildPointInTimeFromSearchResults() {
268+
boolean includeSearchContextInResponse() {
264269
return includeSearchContext;
265270
}
266271
};
@@ -673,7 +678,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
673678
searchAsyncActionProvider.asyncSearchAction(
674679
task, searchRequest, asyncSearchExecutor, shardIterators, timeProvider, connectionLookup, clusterState,
675680
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, indexRoutings, listener,
676-
preFilterSearchShards, threadPool, clusters).start();
681+
preFilterSearchShards, false, clusters, threadPool).start();
677682
}
678683

679684
Executor asyncSearchExecutor(final String[] indices, final ClusterState clusterState) {
@@ -719,9 +724,13 @@ static boolean shouldPreFilterSearchShards(ClusterState clusterState,
719724
} else if (preFilterShardSize == null) {
720725
preFilterShardSize = SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE;
721726
}
722-
return searchRequest.searchType() == QUERY_THEN_FETCH // we can't do this for DFS it needs to fan out to all shards all the time
723-
&& (SearchService.canRewriteToMatchNone(source) || hasPrimaryFieldSort(source))
724-
&& preFilterShardSize < numShards;
727+
728+
boolean shouldFilterSearchShardsByQuery =
729+
searchRequest.searchType() == QUERY_THEN_FETCH // we can't do this for DFS it needs to fan out to all shards all the time
730+
&& (SearchService.canRewriteToMatchNone(source) || hasPrimaryFieldSort(source))
731+
&& preFilterShardSize < numShards;
732+
boolean shouldFilterSearchShardsByRollupGroup = true;
733+
return shouldFilterSearchShardsByQuery || shouldFilterSearchShardsByRollupGroup;
725734
}
726735

727736
private static boolean hasReadOnlyIndices(String[] indices, ClusterState clusterState) {
@@ -747,7 +756,7 @@ AbstractSearchAsyncAction<? extends SearchPhaseResult> asyncSearchAction(
747756
SearchTimeProvider timeProvider, BiFunction<String, String, Transport.Connection> connectionLookup,
748757
ClusterState clusterState, Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
749758
Map<String, Set<String>> indexRoutings, ActionListener<SearchResponse> listener, boolean preFilter,
750-
ThreadPool threadPool, SearchResponse.Clusters clusters);
759+
boolean preFilterRollup, SearchResponse.Clusters clusters, ThreadPool threadPool);
751760
}
752761

753762
private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction(
@@ -763,6 +772,7 @@ private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction
763772
Map<String, Set<String>> indexRoutings,
764773
ActionListener<SearchResponse> listener,
765774
boolean preFilter,
775+
boolean preFilterRollup,
766776
ThreadPool threadPool,
767777
SearchResponse.Clusters clusters) {
768778
if (preFilter) {
@@ -782,6 +792,7 @@ private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction
782792
indexRoutings,
783793
listener,
784794
false,
795+
false,
785796
threadPool,
786797
clusters);
787798
return new SearchPhase(action.getName()) {
@@ -790,7 +801,7 @@ public void run() {
790801
action.start();
791802
}
792803
};
793-
}, clusters);
804+
}, clusters, preFilterRollup);
794805
} else {
795806
final QueryPhaseResultConsumer queryResultConsumer = searchPhaseController.newSearchPhaseResults(executor,
796807
circuitBreaker, task.getProgressListener(), searchRequest, shardIterators.size(),
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.rollup;
21+
22+
import org.apache.logging.log4j.LogManager;
23+
import org.apache.logging.log4j.Logger;
24+
import org.apache.lucene.index.Term;
25+
import org.apache.lucene.search.Query;
26+
import org.apache.lucene.search.QueryVisitor;
27+
import org.elasticsearch.cluster.metadata.IndexAbstraction;
28+
import org.elasticsearch.cluster.metadata.IndexMetadata;
29+
import org.elasticsearch.cluster.metadata.RollupGroup;
30+
import org.elasticsearch.cluster.metadata.RollupMetadata;
31+
import org.elasticsearch.common.Rounding;
32+
import org.elasticsearch.index.query.QueryBuilder;
33+
import org.elasticsearch.index.query.QueryShardContext;
34+
import org.elasticsearch.search.aggregations.AggregationBuilder;
35+
import org.elasticsearch.search.aggregations.AggregationInitializationException;
36+
import org.elasticsearch.search.aggregations.AggregatorFactories;
37+
import org.elasticsearch.search.aggregations.AggregatorFactory;
38+
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
39+
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
40+
import org.elasticsearch.search.aggregations.support.AggregationContext;
41+
42+
import java.io.IOException;
43+
import java.time.ZoneId;
44+
import java.time.ZoneOffset;
45+
import java.util.Map;
46+
import java.util.SortedMap;
47+
48+
public class RollupShardDecider {
49+
private static final Logger logger = LogManager.getLogger(RollupShardDecider.class);
50+
51+
public static boolean shouldMatchRollup(QueryShardContext context,
52+
QueryBuilder queryBuilder,
53+
AggregatorFactories.Builder aggFactoryBuilders,
54+
RollupMetadata rollupMetadata,
55+
Map<String, String> indexRollupMetadata,
56+
IndexMetadata requestIndexMetadata,
57+
String[] indices,
58+
SortedMap<String, IndexAbstraction> indexLookup) {
59+
// TODO(talevy): currently assumes that all indices part of the same rollup group are searched as well, since this is the case
60+
// for data-streams. should not be here in the non-datastream case...
61+
62+
if (aggFactoryBuilders == null && indexRollupMetadata != null) {
63+
return false;
64+
} else if (aggFactoryBuilders != null && rollupMetadata != null && indexRollupMetadata != null && queryBuilder != null) {
65+
Query query = context.toQuery(queryBuilder).query();
66+
try {
67+
AggregatorFactory[] factories = aggFactoryBuilders
68+
.build(new AggregationContext.ProductionAggregationContext(context, query), null)
69+
.factories();
70+
} catch (IOException e) {
71+
throw new AggregationInitializationException("Failed to create aggregators, shard not supported", e);
72+
}
73+
74+
if (queryShard(rollupMetadata, indices, requestIndexMetadata, aggFactoryBuilders) == false) {
75+
return false;
76+
}
77+
// do something with query?
78+
query.visit(new QueryVisitor() {
79+
@Override
80+
public void consumeTerms(Query query, Term... terms) {
81+
super.consumeTerms(query, terms);
82+
}
83+
});
84+
} else if (aggFactoryBuilders != null && rollupMetadata != null) {
85+
if (queryShard(rollupMetadata, indices, requestIndexMetadata, aggFactoryBuilders) == false) {
86+
return false;
87+
}
88+
}
89+
90+
return true;
91+
}
92+
93+
static boolean queryShard(RollupMetadata rollupMetadata, String[] indices, IndexMetadata requestIndexMetadata,
94+
AggregatorFactories.Builder aggFactoryBuilders) {
95+
String requestIndexName = requestIndexMetadata.getIndex().getName();
96+
Map<String, String> indexRollupMetadata = requestIndexMetadata.getCustomData(RollupMetadata.TYPE);
97+
boolean isRollupIndex = requestIndexMetadata.getCustomData(RollupMetadata.TYPE) != null;
98+
final String originalIndexName;
99+
final RollupGroup rollupGroup;
100+
if (isRollupIndex) {
101+
// rollup is being searched
102+
originalIndexName = indexRollupMetadata.get(RollupMetadata.SOURCE_INDEX_NAME_META_FIELD);
103+
rollupGroup = rollupMetadata.rollupGroups().get(originalIndexName);
104+
} else if (rollupMetadata.contains(requestIndexName)) {
105+
originalIndexName = requestIndexName;
106+
rollupGroup = rollupMetadata.rollupGroups().get(requestIndexName);
107+
} else {
108+
// not part of a rollup group, search away!
109+
return true;
110+
}
111+
String bestIndexForAgg = bestIndexForAgg(originalIndexName, rollupGroup, aggFactoryBuilders);
112+
logger.warn("original index: " + originalIndexName);
113+
logger.warn("best index: " + bestIndexForAgg);
114+
return requestIndexName.equals(bestIndexForAgg);
115+
}
116+
117+
static String bestIndexForAgg(String originalIndexName, RollupGroup rollupGroup, AggregatorFactories.Builder aggFactoryBuilders) {
118+
for (AggregationBuilder builder : aggFactoryBuilders.getAggregatorFactories()) {
119+
if (builder.getWriteableName().equals(DateHistogramAggregationBuilder.NAME)) {
120+
return bestIndexForDateHisto(originalIndexName, rollupGroup, (DateHistogramAggregationBuilder) builder);
121+
}
122+
}
123+
return originalIndexName;
124+
}
125+
126+
static String bestIndexForDateHisto(String originalIndexName, RollupGroup rollupGroup, DateHistogramAggregationBuilder source) {
127+
DateHistogramInterval maxInterval = null;
128+
String bestIndex = originalIndexName;
129+
for (String rollupIndex : rollupGroup.getIndices()) {
130+
DateHistogramInterval thisInterval = rollupGroup.getDateInterval(rollupIndex);
131+
ZoneId thisTimezone = rollupGroup.getDateTimezone(rollupIndex).zoneId();
132+
133+
ZoneId sourceTimeZone = source.timeZone() == null ? ZoneOffset.UTC : ZoneId.of(source.timeZone().toString(), ZoneId.SHORT_IDS);
134+
DateHistogramInterval sourceInterval = source.getCalendarInterval();
135+
136+
if (thisTimezone.getRules().equals(sourceTimeZone.getRules()) == false) {
137+
continue;
138+
}
139+
if (validateCalendarInterval(sourceInterval, thisInterval)) {
140+
if (maxInterval == null) {
141+
bestIndex = rollupIndex;
142+
maxInterval = thisInterval;
143+
} else if (validateCalendarInterval(thisInterval, maxInterval)) {
144+
bestIndex = rollupIndex;
145+
maxInterval = thisInterval;
146+
}
147+
}
148+
}
149+
return bestIndex;
150+
}
151+
152+
static boolean validateCalendarInterval(DateHistogramInterval requestInterval,
153+
DateHistogramInterval configInterval) {
154+
if (requestInterval == null || configInterval == null) {
155+
return false;
156+
}
157+
158+
// The request must be gte the config. The CALENDAR_ORDERING map values are integers representing
159+
// relative orders between the calendar units
160+
Rounding.DateTimeUnit requestUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(requestInterval.toString());
161+
if (requestUnit == null) {
162+
return false;
163+
}
164+
Rounding.DateTimeUnit configUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(configInterval.toString());
165+
if (configUnit == null) {
166+
return false;
167+
}
168+
169+
long requestOrder = requestUnit.getField().getBaseUnit().getDuration().toMillis();
170+
long configOrder = configUnit.getField().getBaseUnit().getDuration().toMillis();
171+
172+
// All calendar units are multiples naturally, so we just care about gte
173+
return requestOrder >= configOrder;
174+
}
175+
}

0 commit comments

Comments
 (0)