diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
index e3a539a58b837..bf7afbe67d771 100644
--- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
+++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java
@@ -72,7 +72,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
     private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
     private final SearchTask task;
     protected final SearchPhaseResults<Result> results;
-    private final ClusterState clusterState;
+    protected final ClusterState clusterState;
     private final Map<String, AliasFilter> aliasFilter;
     private final Map<String, Float> concreteIndexBoosts;
     private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java
index bb7885de56d05..9f2b989ac0b97 100644
--- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java
+++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java
@@ -11,10 +11,13 @@
 import org.apache.lucene.util.FixedBitSet;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.index.query.CoordinatorRewriteContext;
 import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
+import org.elasticsearch.rollup.RollupV2;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.SearchService.CanMatchResponse;
 import org.elasticsearch.search.SearchShardTarget;
@@ -27,6 +30,7 @@
 import org.elasticsearch.transport.Transport;

 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
@@ -107,6 +111,11 @@ private GroupShardsIterator<SearchShardIterator> getIterator(CanMatchSearchPhase
             }
             possibleMatches.set(shardIndexToQuery);
         }
+
+        if (RollupV2.isEnabled()) {
+            possibleMatches = chooseOptimalRollupShards(results, shardsIts, possibleMatches);
+        }
+
         SearchSourceBuilder source = getRequest().source();
         int i = 0;
         for (SearchShardIterator iter : shardsIts) {
@@ -123,6 +132,69 @@ private GroupShardsIterator<SearchShardIterator> getIterator(CanMatchSearchPhase
         return new GroupShardsIterator<>(sortShards(shardsIts, results.minAndMaxes, fieldSort.order()));
     }

+    /**
+     * Iterates over the shard iterators and searches for overlapping rollup indices. If rollup indices exist,
+     * the method keeps the shards of the optimal rollup index that can match the query and ignores
+     * the shards of all other overlapping indices.
+     *
+     * @return a {@link FixedBitSet} with the shards of the optimal indices
+     */
+    private FixedBitSet chooseOptimalRollupShards(CanMatchSearchPhaseResults results,
+                                                  GroupShardsIterator<SearchShardIterator> shardsIts,
+                                                  FixedBitSet possibleMatches) {
+        // Map keyed by the UUID of each index that will be replaced by an optimal rollup index. The
+        // optimal index is its substitute. The value is a tuple containing the UUID of the substitute
+        // index and its priority. In the end, all indices contained in the key set are ignored.
+        Map<String, Tuple<String, Long>> indexSubstitute = new HashMap<>(shardsIts.size());
+
+        int i = 0;
+        for (SearchShardIterator iter : shardsIts) {
+            CanMatchResponse result = results.getResult(i++);
+            if (result.canMatch()) {
+                String indexName = iter.shardId().getIndexName();
+                String indexUuid = iter.shardId().getIndex().getUUID();
+                Long priority = result.priority();
+                String sourceIndexUuid = result.sourceIndexUuid();
+                IndexAbstraction originalIndex = clusterState.getMetadata().getIndicesLookup().get(indexName);
+
+                // If the index is not a member of a data stream or is not a rollup index, it will not be replaced.
+                // Also, if an index has already been marked to be replaced, it should be skipped.
+                if (originalIndex.getParentDataStream() == null || sourceIndexUuid == null || priority == null
+                    || indexSubstitute.containsKey(indexUuid)) {
+                    continue;
+                }
+
+                if (indexSubstitute.containsKey(sourceIndexUuid)) {
+                    // Retrieve the previous optimal index and compare it with the current index.
+                    // Find the new optimal index and replace both the source index and the suboptimal
+                    // rollup index with the new optimal index.
+                    Tuple<String, Long> previousOptimalIndex = indexSubstitute.get(sourceIndexUuid);
+                    String newOptimalIndex = previousOptimalIndex.v2() >= priority ? previousOptimalIndex.v1() : indexUuid;
+                    Tuple<String, Long> optimalTuple = Tuple.tuple(newOptimalIndex, Math.max(priority, previousOptimalIndex.v2()));
+
+                    // Replace the original index with the new optimal index
+                    indexSubstitute.put(sourceIndexUuid, optimalTuple);
+
+                    // Replace the suboptimal index with the new optimal index
+                    String suboptimalIndex = indexUuid.equals(newOptimalIndex) == false ? indexUuid : previousOptimalIndex.v1();
+                    indexSubstitute.put(suboptimalIndex, optimalTuple);
+                } else {
+                    indexSubstitute.put(sourceIndexUuid, Tuple.tuple(indexUuid, priority));
+                }
+            }
+        }
+
+        FixedBitSet newMatches = possibleMatches.clone();
+        i = 0;
+        for (SearchShardIterator iter : shardsIts) {
+            if (newMatches.get(i) && indexSubstitute.containsKey(iter.shardId().getIndex().getUUID())) {
+                newMatches.clear(i);
+            }
+            i++;
+        }
+        return newMatches;
+    }
+
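
To make the substitution bookkeeping above concrete, here is a minimal standalone sketch of the same logic (hypothetical UUIDs and priorities, not part of the patch):

    // Hypothetical UUIDs; priorities are the interval lengths in millis.
    Map<String, Tuple<String, Long>> indexSubstitute = new HashMap<>();
    String sourceUuid = "uuid-live";
    // First rollup candidate seen: daily (priority = 1d in millis)
    indexSubstitute.put(sourceUuid, Tuple.tuple("uuid-daily", 86_400_000L));
    // Second candidate for the same source: monthly, which has the higher priority
    Tuple<String, Long> previous = indexSubstitute.get(sourceUuid);
    long monthlyPriority = 2_592_000_000L; // ~30 days
    String optimal = previous.v2() >= monthlyPriority ? previous.v1() : "uuid-monthly";
    Tuple<String, Long> optimalTuple = Tuple.tuple(optimal, Math.max(monthlyPriority, previous.v2()));
    indexSubstitute.put(sourceUuid, optimalTuple);   // live index is replaced by the monthly rollup
    indexSubstitute.put("uuid-daily", optimalTuple); // suboptimal daily rollup is replaced as well
    // The key set is now {uuid-live, uuid-daily}; both are cleared from possibleMatches,
    // leaving only the monthly rollup's shards to be queried.
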
     @Override
     protected void performPhaseOnShard(int shardIndex, SearchShardIterator shardIt, SearchShardTarget shard) {
         CoordinatorRewriteContext coordinatorRewriteContext =
@@ -193,17 +265,19 @@ private static final class CanMatchSearchPhaseResults extends SearchPhaseResults
         private final FixedBitSet possibleMatches;
         private final MinAndMax<?>[] minAndMaxes;
         private int numPossibleMatches;
+        private final CanMatchResponse[] results;

         CanMatchSearchPhaseResults(int size) {
             super(size);
             possibleMatches = new FixedBitSet(size);
             minAndMaxes = new MinAndMax[size];
+            results = new CanMatchResponse[size];
         }

         @Override
         void consumeResult(CanMatchResponse result, Runnable next) {
             try {
-                consumeResult(result.getShardIndex(), result.canMatch(), result.estimatedMinAndMax());
+                consumeResult(result.getShardIndex(), result);
             } finally {
                 next.run();
             }
@@ -217,15 +291,16 @@ boolean hasResult(int shardIndex) {
         @Override
         void consumeShardFailure(int shardIndex) {
             // we have to carry over shard failures in order to account for them in the response.
-            consumeResult(shardIndex, true, null);
+            consumeResult(shardIndex, new CanMatchResponse(true, null, null, null));
         }

-        synchronized void consumeResult(int shardIndex, boolean canMatch, MinAndMax<?> minAndMax) {
-            if (canMatch) {
+        synchronized void consumeResult(int shardIndex, CanMatchResponse result) {
+            if (result.canMatch()) {
                 possibleMatches.set(shardIndex);
                 numPossibleMatches++;
             }
-            minAndMaxes[shardIndex] = minAndMax;
+            minAndMaxes[shardIndex] = result.estimatedMinAndMax();
+            results[shardIndex] = result;
         }

         synchronized int getNumPossibleMatches() {
@@ -240,5 +315,9 @@ synchronized FixedBitSet getPossibleMatches() {
         Stream<CanMatchResponse> getSuccessfulResults() {
             return Stream.empty();
         }
+
+        synchronized CanMatchResponse getResult(int shardIndex) {
+            return results[shardIndex];
+        }
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
index 5343acf636342..a214f77ea330e 100644
--- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
+++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
@@ -20,6 +20,8 @@
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -44,6 +46,7 @@
 import org.elasticsearch.index.query.Rewriteable;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
+import org.elasticsearch.rollup.RollupV2;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.SearchShardTarget;
@@ -710,9 +713,26 @@ static boolean shouldPreFilterSearchShards(ClusterState clusterState,
         } else if (preFilterShardSize == null) {
             preFilterShardSize = SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE;
         }
+        if (RollupV2.isEnabled() && hasRollupDatastream(indices, searchRequest.indices(), clusterState)) {
+            return true;
+        }
         return searchRequest.searchType() == QUERY_THEN_FETCH // we can't do this for DFS it needs to fan out to all shards all the time
-            && (SearchService.canRewriteToMatchNone(source) || hasPrimaryFieldSort(source))
-            && preFilterShardSize < numShards;
+            && (SearchService.canRewriteToMatchNone(source) || hasPrimaryFieldSort(source))
+            && preFilterShardSize < numShards;
+    }
+
+    private static boolean hasRollupDatastream(String[] indices, String[] requestIndices, ClusterState clusterState) {
+        Set<String> requestIndicesSet = Set.of(requestIndices);
+        for (String index : indices) {
+            IndexAbstraction originalIndex = clusterState.getMetadata().getIndicesLookup().get(index);
+            DataStream datastream = originalIndex.getParentDataStream() != null
+                ? originalIndex.getParentDataStream().getDataStream() : null;
+            IndexMetadata indexMetadata = clusterState.getMetadata().index(index);
+            if (datastream != null && indexMetadata.isRollupIndex() && requestIndicesSet.contains(index) == false) {
+                return true;
+            }
+        }
+        return false;
     }

     private static boolean hasReadOnlyIndices(String[] indices, ClusterState clusterState) {
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
index 0d364047ddd8f..df12b0bbabfdf 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java
@@ -577,6 +577,10 @@ public Index getResizeSourceIndex() {
     public static final Setting<String> INDEX_ROLLUP_SOURCE_NAME =
         Setting.simpleString(INDEX_ROLLUP_SOURCE_NAME_KEY, Property.IndexScope, Property.PrivateIndex);

+    public boolean isRollupIndex() {
+        return INDEX_ROLLUP_SOURCE_UUID.exists(settings);
+    }
+
     ImmutableOpenMap<String, DiffableStringMap> getCustomData() {
         return this.customData;
     }
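
Restated, the `shouldPreFilterSearchShards` change above always forces the can_match phase when the feature flag is on and a rollup backing index hides inside one of the resolved data streams. A minimal decision sketch (hypothetical helper with precomputed inputs, not part of the patch):

    // Minimal sketch of the pre-filter decision with the rollup branch folded in.
    static boolean shouldPreFilter(boolean rollupEnabled, boolean hasRollupDatastream,
                                   boolean queryThenFetch, boolean canRewriteToMatchNoneOrSort,
                                   int preFilterShardSize, int numShards) {
        if (rollupEnabled && hasRollupDatastream) {
            // Rollup shard pruning happens in the can_match phase, so it must always run,
            // even when the usual rewrite/shard-count heuristics would skip it.
            return true;
        }
        return queryThenFetch && canRewriteToMatchNoneOrSort && preFilterShardSize < numShards;
    }
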
diff --git a/server/src/main/java/org/elasticsearch/rollup/RollupShardDecider.java b/server/src/main/java/org/elasticsearch/rollup/RollupShardDecider.java
new file mode 100644
index 0000000000000..65a4c3f40cf8d
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/rollup/RollupShardDecider.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.rollup;
+
+import org.elasticsearch.cluster.metadata.IndexAbstraction;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.Rounding;
+import org.elasticsearch.index.query.SearchExecutionContext;
+import org.elasticsearch.search.SearchService;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
+import org.elasticsearch.search.internal.ShardSearchRequest;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+
+import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROLLUP_SOURCE_UUID;
+
+public class RollupShardDecider {
+
+    private static final List<String> ALLOWED_INTERVAL_TYPES = List.of("calendar_interval", "fixed_interval");
+    private static final Set<String> SUPPORTED_AGGS = Set.of(
+        DateHistogramAggregationBuilder.NAME,
+        TermsAggregationBuilder.NAME,
+        MinAggregationBuilder.NAME,
+        MaxAggregationBuilder.NAME,
+        SumAggregationBuilder.NAME,
+        ValueCountAggregationBuilder.NAME,
+        AvgAggregationBuilder.NAME
+    );
+
+    /**
+     * Decides if an index can match a query. When indices are rollup indices, all
+     * requirements for matching the query/aggregation are considered.
+     *
+     * @return the response of the can_match phase, enriched with information about the
+     * source index that a rollup index came from, as well as the priority assigned
+     * to rollup indices
+     */
+    public static SearchService.CanMatchResponse canMatch(ShardSearchRequest request,
+                                                          SearchExecutionContext context,
+                                                          IndexMetadata requestIndexMetadata,
+                                                          SortedMap<String, IndexAbstraction> indexLookup) throws IOException {
+        IndexAbstraction originalIndex = indexLookup.get(requestIndexMetadata.getIndex().getName());
+        String sourceIndexUuid = requestIndexMetadata.getSettings().get(INDEX_ROLLUP_SOURCE_UUID.getKey(),
+            requestIndexMetadata.getIndex().getUUID());
+
+        // The index must be a member of a data stream and rollup metadata must exist in the index metadata
+        if (originalIndex.getParentDataStream() == null || requestIndexMetadata.isRollupIndex() == false) {
+            return new SearchService.CanMatchResponse(true, null, sourceIndexUuid, null);
+        }
+
+        // A rollup index is being searched in a data stream
+        if (validateRollupConditions(request, context, requestIndexMetadata) == false) {
+            return new SearchService.CanMatchResponse(false, null, sourceIndexUuid, null);
+        }
+
+        // TODO (csoulios): Timezone and interval validations will eventually move to the AggregationBuilder
+        final AggregatorFactories.Builder aggregations = request.source() != null ? request.source().aggregations() : null;
+        DateHistogramAggregationBuilder source = getDateHistogramAggregationBuilder(aggregations);
+        if (source == null) {
+            // Without a date_histogram we cannot validate interval or timezone compatibility,
+            // so skip this rollup index
+            return new SearchService.CanMatchResponse(false, null, sourceIndexUuid, null);
+        }
+        String tsField = source.field();
+
+        Map<String, Map<String, Object>> mappingAsMap =
+            (Map<String, Map<String, Object>>) requestIndexMetadata.mapping().sourceAsMap().get("properties");
+
+        if (mappingAsMap == null || mappingAsMap.containsKey(tsField) == false) {
+            throw new IllegalStateException("Rollup index mapping does not contain time_zone information");
+        }
+
+        Map<String, String> timestampMeta = (Map<String, String>) mappingAsMap.get(tsField).get("meta");
+        ZoneId rollupTimeZone = timestampMeta.containsKey("time_zone") ? ZoneId.of(timestampMeta.get("time_zone")) : ZoneOffset.UTC;
+        ZoneId sourceTimeZone = source.timeZone() != null ?
+            ZoneId.of(source.timeZone().toString(), ZoneId.SHORT_IDS) : ZoneOffset.UTC; // Default timezone is UTC
+
+        DateHistogramInterval sourceInterval = source.getCalendarInterval();
+        DateHistogramInterval rollupInterval = null;
+        for (String intervalType : ALLOWED_INTERVAL_TYPES) {
+            if (timestampMeta.containsKey(intervalType)) {
+                rollupInterval = new DateHistogramInterval(timestampMeta.get(intervalType));
+                break;
+            }
+        }
+        if (rollupInterval == null) {
+            throw new IllegalStateException("Rollup index mapping does not contain time interval information");
+        }
+
+        // Incompatible timezone or interval => skip this rollup index
+        if (canMatchTimezone(sourceTimeZone, rollupTimeZone) == false
+            || canMatchCalendarInterval(sourceInterval, rollupInterval) == false) {
+            return new SearchService.CanMatchResponse(false, null);
+        }
+        // Assign the index priority to match the rollup interval. Higher intervals have higher
+        // priority. The priority is evaluated at the coordinator node to select the optimal shard.
+        long priority = rollupInterval.estimateMillis();
+
+        return new SearchService.CanMatchResponse(true, null, sourceIndexUuid, priority);
+    }
+
+    /**
+     * Parses the search request and checks if the required conditions are met so that results
+     * can be answered by a rollup index.
+     *
+     * Conditions that must be met are:
+     * - Request size must be 0
+     * - Metric aggregations must be supported
+     * - No use of runtime fields in the query
+     * - Check for any unmapped fields
+     *
+     * @param request the search request to parse
+     * @return true if a rollup index can match the request, otherwise false
+     */
+    private static boolean validateRollupConditions(ShardSearchRequest request, SearchExecutionContext context,
+                                                    IndexMetadata requestIndexMetadata) {
+        if (request.source() == null) {
+            return false;
+        }
+
+        // If the request size is not 0, rollup indices should not match
+        if (request.source().size() > 0) {
+            return false;
+        }
+
+        // Check for supported aggs
+        AggregatorFactories.Builder aggregations = request.source().aggregations();
+        if (checkSupportedAggregations(aggregations) == false) {
+            return false;
+        }
+
+        // If runtime fields are used in the query, rollup indices should not match
+        if (request.getRuntimeMappings().isEmpty() == false) {
+            return false;
+        }
+
+        // TODO: Check for unmapped fields
+        return true;
+    }
+
+    /**
+     * Checks if the requested aggregations are supported by rollup indices.
+     *
+     * @param aggregations the aggregation builders
+     * @return true if the aggregations are supported by rollups, otherwise false
+     */
+    private static boolean checkSupportedAggregations(AggregatorFactories.Builder aggregations) {
+        if (aggregations == null) {
+            return false;
+        }
+
+        for (AggregationBuilder builder : aggregations.getAggregatorFactories()) {
+            if (SUPPORTED_AGGS.contains(builder.getWriteableName()) == false) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Parses the aggregator factories and returns the date_histogram {@link AggregationBuilder} for the aggregation on rollups.
+     */
+    private static DateHistogramAggregationBuilder getDateHistogramAggregationBuilder(AggregatorFactories.Builder aggFactoryBuilders) {
+        if (aggFactoryBuilders == null) {
+            return null;
+        }
+        DateHistogramAggregationBuilder dateHistogramBuilder = null;
+        for (AggregationBuilder builder : aggFactoryBuilders.getAggregatorFactories()) {
+            if (builder.getWriteableName().equals(DateHistogramAggregationBuilder.NAME)) {
+                dateHistogramBuilder = (DateHistogramAggregationBuilder) builder;
+            }
+        }
+        return dateHistogramBuilder;
+    }
+
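
A worked example of the priority assignment in `canMatch` above (values are approximate, since `estimateMillis()` only estimates calendar intervals):

    // Approximate values; larger interval => coarser rollup => higher priority.
    long hourly  = new DateHistogramInterval("1h").estimateMillis(); //     3,600,000
    long daily   = new DateHistogramInterval("1d").estimateMillis(); //    86,400,000
    long monthly = new DateHistogramInterval("1M").estimateMillis(); // ~2,592,000,000
    // For a request with calendar_interval=1M, both the daily and the monthly rollup can
    // match; the coordinator keeps the candidate with the highest priority (the monthly one).
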
+    /**
+     * Validates that a candidate interval can match the required accuracy for a given interval. A candidate interval
+     * matches the required interval only if it has greater or equal accuracy to the required interval. This means
+     * that the base unit (1h, 1d, 1M etc.) of the candidate interval must be smaller than or equal to the base unit
+     * of the required interval.
+     *
+     * @param requiredInterval the required interval to match; if null, all candidate intervals will match
+     * @param candidateInterval the candidate interval to validate
+     * @return true if the candidate interval can match the required interval, otherwise false
+     */
+    static boolean canMatchCalendarInterval(DateHistogramInterval requiredInterval, DateHistogramInterval candidateInterval) {
+        // If no interval is required, any interval should do
+        if (requiredInterval == null) {
+            return true;
+        }
+
+        // If the candidate interval is missing, it cannot match anything
+        if (candidateInterval == null) {
+            return false;
+        }
+
+        // The required interval must be gte the candidate interval. We compare the base units of the
+        // two calendar intervals by the duration of each unit in milliseconds.
+        Rounding.DateTimeUnit requiredIntervalUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(requiredInterval.toString());
+        if (requiredIntervalUnit == null) {
+            return false;
+        }
+        Rounding.DateTimeUnit candidateIntervalUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(candidateInterval.toString());
+        if (candidateIntervalUnit == null) {
+            return false;
+        }
+
+        long requiredIntervalOrder = requiredIntervalUnit.getField().getBaseUnit().getDuration().toMillis();
+        long candidateIntervalOrder = candidateIntervalUnit.getField().getBaseUnit().getDuration().toMillis();
+
+        // All calendar units are natural multiples of each other, so we just care about gte
+        return requiredIntervalOrder >= candidateIntervalOrder;
+    }
+
+    /**
+     * Checks if two timezones are compatible.
+     *
+     * @return true if the timezones are compatible, otherwise false
+     */
+    static boolean canMatchTimezone(ZoneId tz1, ZoneId tz2) {
+        if (tz1 == null || tz2 == null) {
+            throw new IllegalArgumentException("Timezone cannot be null");
+        }
+        return tz1.getRules().equals(tz2.getRules());
+    }
+}
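
A quick sanity check of the interval-compatibility rule; the same pairs are exercised by `RollupShardDeciderTests` further down:

    // required = what the request asks for; candidate = what the rollup index stores.
    canMatchCalendarInterval(new DateHistogramInterval("1M"), new DateHistogramInterval("1d")); // true: daily buckets roll up into months
    canMatchCalendarInterval(new DateHistogramInterval("1h"), new DateHistogramInterval("1d")); // false: daily buckets cannot answer hours
    canMatchCalendarInterval(null, new DateHistogramInterval("1d"));                            // true: nothing to match against
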
diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java
index a95bc2d82e032..c12b0795ac08e 100644
--- a/server/src/main/java/org/elasticsearch/search/SearchService.java
+++ b/server/src/main/java/org/elasticsearch/search/SearchService.java
@@ -24,6 +24,7 @@
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.CheckedSupplier;
@@ -55,8 +56,8 @@
 import org.elasticsearch.index.query.MatchNoneQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryRewriteContext;
-import org.elasticsearch.index.query.SearchExecutionContext;
 import org.elasticsearch.index.query.Rewriteable;
+import org.elasticsearch.index.query.SearchExecutionContext;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.SearchOperationListener;
@@ -65,6 +66,8 @@
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
 import org.elasticsearch.node.ResponseCollectorService;
+import org.elasticsearch.rollup.RollupShardDecider;
+import org.elasticsearch.rollup.RollupV2;
 import org.elasticsearch.script.FieldScript;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.aggregations.AggregationInitializationException;
@@ -1242,6 +1245,16 @@ private CanMatchResponse canMatch(ShardSearchRequest request, boolean checkRefre
         if (canMatch || hasRefreshPending) {
             FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source());
             minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null;
+
+            if (RollupV2.isEnabled()) {
+                IndexMetadata requestIndexMetadata = context.getIndexSettings().getIndexMetadata();
+                CanMatchResponse rollupCanMatchResponse = RollupShardDecider.canMatch(request, context, requestIndexMetadata,
+                    clusterService.state().getMetadata().getIndicesLookup());
+
+                return new CanMatchResponse(rollupCanMatchResponse.canMatch(), minMax,
+                    rollupCanMatchResponse.sourceIndexUuid(),
+                    rollupCanMatchResponse.priority());
+            }
         } else {
             minMax = null;
         }
@@ -1341,6 +1354,8 @@ private static PipelineTree requestToPipelineTree(SearchRequest request) {
     public static final class CanMatchResponse extends SearchPhaseResult {
         private final boolean canMatch;
         private final MinAndMax<?> estimatedMinAndMax;
+        private final String sourceIndexUuid;
+        private final Long priority;

         public CanMatchResponse(StreamInput in) throws IOException {
             super(in);
@@ -1350,11 +1365,24 @@ public CanMatchResponse(StreamInput in) throws IOException {
             } else {
                 estimatedMinAndMax = null;
             }
+            if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+                sourceIndexUuid = in.readOptionalString();
+                priority = in.readOptionalLong();
+            } else {
+                sourceIndexUuid = null;
+                priority = null;
+            }
         }

-        public CanMatchResponse(boolean canMatch, MinAndMax<?> estimatedMinAndMax) {
+        public CanMatchResponse(boolean canMatch, MinAndMax<?> estimatedMinAndMax, String sourceIndexUuid, Long priority) {
             this.canMatch = canMatch;
             this.estimatedMinAndMax = estimatedMinAndMax;
+            this.sourceIndexUuid = sourceIndexUuid;
+            this.priority = priority;
+        }
+
+        public CanMatchResponse(boolean canMatch, MinAndMax<?> estimatedMinAndMax) {
+            this(canMatch, estimatedMinAndMax, null, null);
         }

         @Override
@@ -1363,6 +1391,10 @@ public void writeTo(StreamOutput out) throws IOException {
             if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
                 out.writeOptionalWriteable(estimatedMinAndMax);
             }
+            if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+                out.writeOptionalString(sourceIndexUuid);
+                out.writeOptionalLong(priority);
+            }
         }

         public boolean canMatch() {
@@ -1372,6 +1404,14 @@ public boolean canMatch() {
         public MinAndMax<?> estimatedMinAndMax() {
             return estimatedMinAndMax;
         }
+
+        public String sourceIndexUuid() {
+            return sourceIndexUuid;
+        }
+
+        public Long priority() {
+            return priority;
+        }
     }

     /**
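
The two new `CanMatchResponse` fields stay wire-compatible because both sides gate on the same stream version. A round-trip sketch in the usual `BytesStreamOutput` test idiom (a hedged illustration, not part of the patch):

    // Both sides must gate on the same version, or the stream becomes misaligned.
    BytesStreamOutput out = new BytesStreamOutput();
    out.setVersion(Version.V_8_0_0);
    new CanMatchResponse(true, null, "uuid-live", 86_400_000L).writeTo(out);

    StreamInput in = out.bytes().streamInput();
    in.setVersion(Version.V_8_0_0);
    CanMatchResponse copy = new CanMatchResponse(in);
    assert copy.sourceIndexUuid().equals("uuid-live") && copy.priority() == 86_400_000L;
    // Against a pre-8.0.0 stream version the optional fields are neither written nor read,
    // and sourceIndexUuid()/priority() deserialize as null.
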
diff --git a/server/src/test/java/org/elasticsearch/rollup/RollupShardDeciderTests.java b/server/src/test/java/org/elasticsearch/rollup/RollupShardDeciderTests.java
new file mode 100644
index 0000000000000..32c42bcae59e7
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/rollup/RollupShardDeciderTests.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.rollup;
+
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.test.ESTestCase;
+
+import java.time.ZoneId;
+
+public class RollupShardDeciderTests extends ESTestCase {
+
+    public void testCanMatchCalendarInterval() {
+        assertTrue(RollupShardDecider.canMatchCalendarInterval(DateHistogramInterval.HOUR, DateHistogramInterval.HOUR));
+        assertTrue(RollupShardDecider.canMatchCalendarInterval(DateHistogramInterval.DAY, DateHistogramInterval.DAY));
+        assertTrue(RollupShardDecider.canMatchCalendarInterval(DateHistogramInterval.WEEK, DateHistogramInterval.WEEK));
+        assertTrue(RollupShardDecider.canMatchCalendarInterval(DateHistogramInterval.MONTH, DateHistogramInterval.MONTH));
+        assertTrue(RollupShardDecider.canMatchCalendarInterval(DateHistogramInterval.YEAR, DateHistogramInterval.YEAR));
+
+        assertTrue(RollupShardDecider.canMatchCalendarInterval(DateHistogramInterval.MONTH, DateHistogramInterval.DAY));
+        assertFalse(RollupShardDecider.canMatchCalendarInterval(DateHistogramInterval.DAY, DateHistogramInterval.MONTH));
+        assertTrue(RollupShardDecider.canMatchCalendarInterval(DateHistogramInterval.WEEK, DateHistogramInterval.DAY));
+        assertFalse(RollupShardDecider.canMatchCalendarInterval(DateHistogramInterval.DAY, DateHistogramInterval.WEEK));
+        assertTrue(RollupShardDecider.canMatchCalendarInterval(DateHistogramInterval.DAY, DateHistogramInterval.HOUR));
+        assertFalse(RollupShardDecider.canMatchCalendarInterval(DateHistogramInterval.HOUR, DateHistogramInterval.DAY));
+
+        assertTrue(RollupShardDecider.canMatchCalendarInterval(null, DateHistogramInterval.HOUR));
+        assertFalse(RollupShardDecider.canMatchCalendarInterval(DateHistogramInterval.HOUR, null));
+    }
+
+    public void testCanMatchTimezone() {
+        assertTrue(RollupShardDecider.canMatchTimezone(ZoneId.of("Z"), ZoneId.of("Z")));
+        assertTrue(RollupShardDecider.canMatchTimezone(ZoneId.of("Z"), ZoneId.of("UTC")));
+        assertTrue(RollupShardDecider.canMatchTimezone(ZoneId.of("UTC"), ZoneId.of("+00:00")));
+        assertFalse(RollupShardDecider.canMatchTimezone(ZoneId.of("Europe/Paris"), ZoneId.of("+01:00")));
+        assertTrue(RollupShardDecider.canMatchTimezone(ZoneId.of("Europe/Paris"), ZoneId.of("Europe/Paris")));
+        assertFalse(RollupShardDecider.canMatchTimezone(ZoneId.of("Europe/Paris"), ZoneId.of("Europe/Athens")));
+        assertFalse(RollupShardDecider.canMatchTimezone(ZoneId.of("UTC"), ZoneId.of("+01:00")));
+    }
+
+}
diff --git a/x-pack/plugin/rollup/qa/rest/build.gradle b/x-pack/plugin/rollup/qa/rest/build.gradle
index 19e595acb0f53..99d87d5cc7891 100644
--- a/x-pack/plugin/rollup/qa/rest/build.gradle
+++ b/x-pack/plugin/rollup/qa/rest/build.gradle
@@ -7,10 +7,11 @@ import org.elasticsearch.gradle.info.BuildParams

 apply plugin: 'elasticsearch.yaml-rest-test'
-
+apply plugin: 'elasticsearch.java-rest-test'

 dependencies {
   yamlRestTestImplementation project(path: xpackModule('rollup'))
+  javaRestTestImplementation project(path: xpackModule('rollup'))
 }

 restResources {
@@ -20,15 +21,16 @@ restResources {
   }
 }

-testClusters.all {
-  testDistribution = 'DEFAULT'
-  setting 'xpack.license.self_generated.type', 'basic'
-  systemProperty 'es.rollup_v2_feature_flag_enabled', 'true'
-}
-
 tasks.named("test").configure{enabled = false }

 if (BuildParams.inFipsJvm){
   // This test cluster is using a BASIC license and FIPS 140 mode is not supported in BASIC
   tasks.named("yamlRestTest").configure{enabled = false }
+  tasks.named("javaRestTest").configure{enabled = false }
+}
+
+testClusters.all {
+  testDistribution = 'DEFAULT'
+  setting 'xpack.license.self_generated.type', 'basic'
+  systemProperty 'es.rollup_v2_feature_flag_enabled', 'true'
 }
diff --git a/x-pack/plugin/rollup/qa/rest/src/javaRestTest/java/org/elasticsearch/rollup/RollupSearchIT.java b/x-pack/plugin/rollup/qa/rest/src/javaRestTest/java/org/elasticsearch/rollup/RollupSearchIT.java
new file mode 100644
index 0000000000000..73231ab066933
--- /dev/null
+++ b/x-pack/plugin/rollup/qa/rest/src/javaRestTest/java/org/elasticsearch/rollup/RollupSearchIT.java
@@ -0,0 +1,644 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.rollup;
+
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.Template;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.test.rest.ESRestTestCase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * This class contains integration tests for searching rollup indices and data streams containing rollups.
+ */
+public class RollupSearchIT extends ESRestTestCase {
+
+    @SuppressWarnings("unchecked")
+    public void testSearchRollupDatastream() throws Exception {
+        Template template = new Template(Settings.builder().put("index.number_of_shards", 1).build(),
+            new CompressedXContent("{" +
+                "\"properties\": {\n" +
+                "    \"@timestamp\": { \"type\": \"date\" },\n" +
+                "    \"units\": { \"type\": \"keyword\"} \n" +
+                "}}"), null);
+        createComposableTemplate(client(), "logs-template", "logs-foo*", template);
+        String dataStream = "logs-foo";
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-01-04T12:10:30Z\", \"temperature\": 27.5, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-01-08T07:12:25Z\", \"temperature\": 28.1, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-02-02T11:05:37Z\", \"temperature\": 29.2, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-02-10T08:05:20Z\", \"temperature\": 19.5, \"units\": \"celsius\" }");
+        rolloverMaxOneDocCondition(client(), dataStream);
+        String firstGenerationIndex = DataStream.getDefaultBackingIndexName(dataStream, 1);
+        String firstGenerationRollupIndex = ".rollup-" + firstGenerationIndex;
+        assertBusy(() -> assertThat(indexExists(DataStream.getDefaultBackingIndexName(dataStream, 2)), is(true)), 30, TimeUnit.SECONDS);
+        rollupIndex(client(), firstGenerationIndex, firstGenerationRollupIndex, "1M",
+            Set.of("units"), Set.of("temperature"), Set.of("max", "sum", "avg"));
+        String query = "{\n" +
+            "  \"size\": 0,\n" +
+            "  \"aggs\": {\n" +
+            "    \"monthly_temperatures\": {\n" +
+            "      \"date_histogram\": {\n" +
+            "        \"field\": \"@timestamp\",\n" +
+            "        \"calendar_interval\": \"1M\"\n" +
+            "      },\n" +
+            "      \"aggs\": {\n" +
+            "        \"avg_temperature\": {\n" +
+            "          \"avg\": {\n" +
+            "            \"field\": \"temperature\"\n" +
+            "          }\n" +
+            "        },\n" +
+            "        \"max_temperature\": {\n" +
+            "          \"max\": {\n" +
+            "            \"field\": \"temperature\"\n" +
+            "          }\n" +
+            "        }\n" +
+            "      }\n" +
+            "    }\n" +
+            "  }\n" +
+            "}";
+        Map<String, Object> response = search(dataStream, query);
+        Map<String, Object> shards = (Map<String, Object>) response.get("_shards");
+        assertThat(shards.get("skipped"), equalTo(1));
+
+        // Total hits should only contain the rollup docs (and not the live docs)
+        Map<String, Object> hits = (Map<String, Object>) response.get("hits");
+        assertThat(hits.get("total"), equalTo(2));
+        Map<String, Object> aggs = (Map<String, Object>) response.get("aggregations");
+        Map<String, Object> monthlyTemps = (Map<String, Object>) aggs.get("monthly_temperatures");
+        List<Map<String, Object>> buckets = (List<Map<String, Object>>) monthlyTemps.get("buckets");
+        assertThat(buckets.size(), equalTo(2));
+        assertThat(buckets.get(0).get("doc_count"), equalTo(2));
+        assertEquals(28.1, (Double) ((Map<String, Object>) buckets.get(0).get("max_temperature")).get("value"), 0.0001);
+        assertEquals(27.8, (Double) ((Map<String, Object>) buckets.get(0).get("avg_temperature")).get("value"), 0.0001);
+        assertThat(buckets.get(1).get("doc_count"), equalTo(2));
+
+        // Index new docs so that we confirm merging of live and rollup indices.
+        // The following operation adds a new doc in the first bucket
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-01-30T12:10:30Z\", \"temperature\": 19.4, \"units\": \"celsius\" }");
+        // The following operation adds a new doc in a new bucket
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-03-10T02:10:30Z\", \"temperature\": 23.5, \"units\": \"celsius\" }");
+        response = search(dataStream, query);
+        aggs = (Map<String, Object>) response.get("aggregations");
+        monthlyTemps = (Map<String, Object>) aggs.get("monthly_temperatures");
+        buckets = (List<Map<String, Object>>) monthlyTemps.get("buckets");
monthlyTemps.get("buckets"); + assertThat(buckets.size(), equalTo(3)); + assertThat(buckets.get(0).get("doc_count"), equalTo(3)); + assertEquals(28.1, (Double) ((Map) buckets.get(0).get("max_temperature")).get("value"), 0.0001); + assertEquals(25.0, (Double) ((Map) buckets.get(0).get("avg_temperature")).get("value"), 0.0001); + assertThat(buckets.get(2).get("doc_count"), equalTo(1)); + assertEquals(23.5, (Double) ((Map) buckets.get(2).get("max_temperature")).get("value"), 0.0001); + assertEquals(23.5, (Double) ((Map) buckets.get(2).get("avg_temperature")).get("value"), 0.0001); + } + + /** + * Test that queries a concrete live index and its concrete rollup index using + * index wildcard in the search. Results from both live and rollup data should be + * returned. + */ + @SuppressWarnings("unchecked") + public void testSearchConcreteRollupIndices() throws Exception { + String liveIndex = "logs-foo"; + Settings settings = Settings.builder().put("index.number_of_shards", 1).build(); + String mapping = + "\"properties\": {\n" + + " \"@timestamp\": { \"type\": \"date\" },\n" + + " \"units\": { \"type\": \"keyword\"}, \n" + + " \"temperature\": { \"type\": \"double\"} \n" + + "}"; + createIndex(liveIndex, settings, mapping); + + indexDocument(client(), liveIndex, + "{ \"@timestamp\": \"2020-01-04T12:10:30Z\", \"temperature\": 27.5, \"units\": \"celsius\" }"); + indexDocument(client(), liveIndex, + "{ \"@timestamp\": \"2020-01-08T07:12:25Z\", \"temperature\": 28.1, \"units\": \"celsius\" }"); + indexDocument(client(), liveIndex, + "{ \"@timestamp\": \"2020-02-02T11:05:37Z\", \"temperature\": 29.2, \"units\": \"celsius\" }"); + indexDocument(client(), liveIndex, + "{ \"@timestamp\": \"2020-02-10T08:05:20Z\", \"temperature\": 19.5, \"units\": \"celsius\" }"); + String rollupIndex = "rollup-" + liveIndex; + + rollupIndex(client(), liveIndex, rollupIndex, "1M", + Set.of("units"), Set.of("temperature"), Set.of("max", "sum", "avg")); + + String query = "{\n" + + " \"size\": 0,\n" + + " \"aggs\": {\n" + + " \"monthly_temperatures\": {\n" + + " \"date_histogram\": {\n" + + " \"field\": \"@timestamp\",\n" + + " \"calendar_interval\": \"1M\"\n" + + " },\n" + + " \"aggs\": {\n" + + " \"avg_temperature\": {\n" + + " \"avg\": {\n" + + " \"field\": \"temperature\"\n" + + " }\n" + + " },\n" + + " \"max_temperature\": {\n" + + " \"max\": {\n" + + " \"field\": \"temperature\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + String indices = "*" + liveIndex + "*"; + Map response = search(indices, query); + Map shards = (Map) response.get("_shards"); + assertThat(shards.get("skipped"), equalTo(0)); + assertThat(shards.get("successful"), equalTo(2)); + Map hits = (Map) response.get("hits"); + assertThat(hits.get("total"), equalTo(6)); // 4 docs in the live index + 2 docs in the rollup index + + Map aggs = (Map) response.get("aggregations"); + Map monthlyTemps = (Map) aggs.get("monthly_temperatures"); + List> buckets = (List>) monthlyTemps.get("buckets"); + assertThat(buckets.size(), equalTo(2)); + assertThat(buckets.get(0).get("doc_count"), equalTo(4)); + assertEquals(28.1, (Double) ((Map) buckets.get(0).get("max_temperature")).get("value"), 0.0001); + assertEquals(27.8, (Double) ((Map) buckets.get(0).get("avg_temperature")).get("value"), 0.0001); + assertThat(buckets.get(1).get("doc_count"), equalTo(4)); + + // Delete the index and run the query again. 
+        deleteIndex(liveIndex);
+        response = search(indices, query);
+        hits = (Map<String, Object>) response.get("hits");
+        assertThat(hits.get("total"), equalTo(2));
+        aggs = (Map<String, Object>) response.get("aggregations");
+        monthlyTemps = (Map<String, Object>) aggs.get("monthly_temperatures");
+        buckets = (List<Map<String, Object>>) monthlyTemps.get("buckets");
+        assertThat(buckets.size(), equalTo(2)); // 2 docs in the rollup index
+        assertThat(buckets.get(0).get("doc_count"), equalTo(2));
+        assertEquals(28.1, (Double) ((Map<String, Object>) buckets.get(0).get("max_temperature")).get("value"), 0.0001);
+        assertEquals(27.8, (Double) ((Map<String, Object>) buckets.get(0).get("avg_temperature")).get("value"), 0.0001);
+        assertThat(buckets.get(1).get("doc_count"), equalTo(2));
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testSearchMultipleRollupIntervals() throws Exception {
+        Template template = new Template(Settings.builder().put("index.number_of_shards", 1).build(),
+            new CompressedXContent("{" +
+                "\"properties\": {\n" +
+                "    \"@timestamp\": { \"type\": \"date\" },\n" +
+                "    \"units\": { \"type\": \"keyword\"}, \n" +
+                "    \"temperature\": { \"type\": \"double\"} \n" +
+                "}}"), null);
+        createComposableTemplate(client(), "logs-template", "logs-foo*", template);
+        String dataStream = "logs-foo";
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-01-04T12:10:30Z\", \"temperature\": 27.5, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-01-04T17:12:25Z\", \"temperature\": 28.1, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-01-17T11:12:25Z\", \"temperature\": 25.4, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-02-02T11:00:37Z\", \"temperature\": 29, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-02-02T11:05:37Z\", \"temperature\": 27, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-02-17T08:05:20Z\", \"temperature\": 19.5, \"units\": \"celsius\" }");
+
+        rolloverMaxOneDocCondition(client(), dataStream);
+        String firstGenerationIndex = DataStream.getDefaultBackingIndexName(dataStream, 1);
+        assertBusy(() -> assertThat(indexExists(DataStream.getDefaultBackingIndexName(dataStream, 2)), is(true)), 30, TimeUnit.SECONDS);
+        // Rollup daily and monthly intervals
+        String dailyRollupIndex = ".rollup-daily-" + firstGenerationIndex;
+        rollupIndex(client(), firstGenerationIndex, dailyRollupIndex, "1d",
+            Set.of("units"), Set.of("temperature"), Set.of("max", "sum", "avg"));
+        String monthlyRollupIndex = ".rollup-monthly-" + firstGenerationIndex;
+        rollupIndex(client(), firstGenerationIndex, monthlyRollupIndex, "1M",
+            Set.of("units"), Set.of("temperature"), Set.of("max", "sum", "avg"));
+
+        String query = "{\n" +
+            "  \"size\": 0,\n" +
+            "  \"aggs\": {\n" +
+            "    \"temperatures\": {\n" +
+            "      \"date_histogram\": {\n" +
+            "        \"field\": \"@timestamp\",\n" +
+            "        \"calendar_interval\": \"%s\",\n" +
+            "        \"min_doc_count\": 1\n" +
+            "      },\n" +
+            "      \"aggs\": {\n" +
+            "        \"avg_temperature\": {\n" +
+            "          \"avg\": {\n" +
+            "            \"field\": \"temperature\"\n" +
+            "          }\n" +
+            "        }\n" +
+            "      }\n" +
+            "    }\n" +
+            "  }\n" +
+            "}";
+        Map<String, Object> response = search(dataStream, String.format(query, "1M"));
+        Map<String, Object> shards = (Map<String, Object>) response.get("_shards");
+        assertThat(shards.get("skipped"), equalTo(2));
+
+        // Total hits should only contain the docs in the monthly rollup index
+        Map<String, Object> hits = (Map<String, Object>) response.get("hits");
+        assertThat(hits.get("total"), equalTo(2)); // 2 docs in the monthly rollup index
+        Map<String, Object> aggs = (Map<String, Object>) response.get("aggregations");
+        Map<String, Object> temps = (Map<String, Object>) aggs.get("temperatures");
+        List<Map<String, Object>> buckets = (List<Map<String, Object>>) temps.get("buckets");
+        assertThat(buckets.size(), equalTo(2));
+        assertThat(buckets.get(0).get("doc_count"), equalTo(3));
+        assertEquals(27, (Double) ((Map<String, Object>) buckets.get(0).get("avg_temperature")).get("value"), 0.0001);
+        assertThat(buckets.get(1).get("doc_count"), equalTo(3));
+
+        // Search for daily results. It should return the results from the daily rollup index
+        response = search(dataStream, String.format(query, "1d"));
+        shards = (Map<String, Object>) response.get("_shards");
+        assertThat(shards.get("skipped"), equalTo(2));
+        hits = (Map<String, Object>) response.get("hits");
+        assertThat(hits.get("total"), equalTo(4)); // 4 docs in the daily rollup index
+
+        aggs = (Map<String, Object>) response.get("aggregations");
+        temps = (Map<String, Object>) aggs.get("temperatures");
+        buckets = (List<Map<String, Object>>) temps.get("buckets");
+        assertThat(buckets.size(), equalTo(4));
+        assertThat(buckets.get(0).get("doc_count"), equalTo(2));
+        assertEquals(27.8, (Double) ((Map<String, Object>) buckets.get(0).get("avg_temperature")).get("value"), 0.0001);
+        assertThat(buckets.get(1).get("doc_count"), equalTo(1));
+        assertThat(buckets.get(2).get("doc_count"), equalTo(2));
+        assertThat(buckets.get(3).get("doc_count"), equalTo(1));
+
+        // Search for hourly results. It should return the results from the live index, because there are no
+        // rollups that satisfy the hourly interval
+        response = search(dataStream, String.format(query, "1h"));
+        shards = (Map<String, Object>) response.get("_shards");
+        assertThat(shards.get("skipped"), equalTo(2));
+        hits = (Map<String, Object>) response.get("hits");
+        assertThat(hits.get("total"), equalTo(6)); // 6 docs in the live index
+    }
+
+    /**
+     * Test that when querying explicitly the indices inside a data stream, no result merging occurs
+     * and results from all indices are returned.
+     */
+    @SuppressWarnings("unchecked")
+    public void testSearchConcreteRollupIndicesInDatastream() throws Exception {
+        Template template = new Template(Settings.builder().put("index.number_of_shards", 1).build(),
+            new CompressedXContent("{" +
+                "\"properties\": {\n" +
+                "    \"@timestamp\": { \"type\": \"date\" },\n" +
+                "    \"units\": { \"type\": \"keyword\"}, \n" +
+                "    \"temperature\": { \"type\": \"double\"} \n" +
+                "}}"), null);
+        createComposableTemplate(client(), "logs-template", "logs-foo*", template);
+        String dataStream = "logs-foo";
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-01-04T12:10:30Z\", \"temperature\": 27.5, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-01-04T17:12:25Z\", \"temperature\": 28.1, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-01-17T11:12:25Z\", \"temperature\": 25.4, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-02-02T11:00:37Z\", \"temperature\": 29, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-02-02T11:05:37Z\", \"temperature\": 27, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-02-17T08:05:20Z\", \"temperature\": 19.5, \"units\": \"celsius\" }");
+
+        rolloverMaxOneDocCondition(client(), dataStream);
+        String firstGenerationIndex = DataStream.getDefaultBackingIndexName(dataStream, 1);
+        assertBusy(() -> assertThat(indexExists(DataStream.getDefaultBackingIndexName(dataStream, 2)), is(true)), 30, TimeUnit.SECONDS);
+        // Rollup daily and monthly intervals
+        String dailyRollupIndex = ".rollup-daily-" + firstGenerationIndex;
+        rollupIndex(client(), firstGenerationIndex, dailyRollupIndex, "1d",
+            Set.of("units"), Set.of("temperature"), Set.of("max", "sum", "avg"));
+        String monthlyRollupIndex = ".rollup-monthly-" + firstGenerationIndex;
+        rollupIndex(client(), firstGenerationIndex, monthlyRollupIndex, "1M",
+            Set.of("units"), Set.of("temperature"), Set.of("max", "sum", "avg"));
+
+        String query = "{\n" +
+            "  \"size\": 0,\n" +
+            "  \"aggs\": {\n" +
+            "    \"temperatures\": {\n" +
+            "      \"date_histogram\": {\n" +
+            "        \"field\": \"@timestamp\",\n" +
+            "        \"calendar_interval\": \"%s\",\n" +
+            "        \"min_doc_count\": 1\n" +
+            "      },\n" +
+            "      \"aggs\": {\n" +
+            "        \"avg_temperature\": {\n" +
+            "          \"avg\": {\n" +
+            "            \"field\": \"temperature\"\n" +
+            "          }\n" +
+            "        }\n" +
+            "      }\n" +
+            "    }\n" +
+            "  }\n" +
+            "}";
+
+        // Query the daily rollup index explicitly instead of the data stream. Even if the monthly rollup
+        // index exists, results should be computed on the daily rollups
+        Map<String, Object> response = search(dailyRollupIndex, String.format(query, "1M"));
+        Map<String, Object> shards = (Map<String, Object>) response.get("_shards");
+        assertThat(shards.get("total"), equalTo(1));
+        assertThat(shards.get("skipped"), equalTo(0));
+
+        // Total hits should contain the docs in the daily rollup index
+        Map<String, Object> hits = (Map<String, Object>) response.get("hits");
+        assertThat(hits.get("total"), equalTo(4)); // 4 docs in the daily rollup index
+        Map<String, Object> aggs = (Map<String, Object>) response.get("aggregations");
+        Map<String, Object> temps = (Map<String, Object>) aggs.get("temperatures");
+        List<Map<String, Object>> buckets = (List<Map<String, Object>>) temps.get("buckets");
+        assertThat(buckets.size(), equalTo(2));
+        assertThat(buckets.get(0).get("doc_count"), equalTo(3));
+        assertEquals(27, (Double) ((Map<String, Object>) buckets.get(0).get("avg_temperature")).get("value"), 0.0001);
+        assertThat(buckets.get(1).get("doc_count"), equalTo(3));
+
+        // Search for monthly results in the live index. No rollups should be queried
+        response = search(firstGenerationIndex, String.format(query, "1M"));
+        shards = (Map<String, Object>) response.get("_shards");
+        assertThat(shards.get("total"), equalTo(1));
+        assertThat(shards.get("skipped"), equalTo(0));
+        hits = (Map<String, Object>) response.get("hits");
+        assertThat(hits.get("total"), equalTo(6)); // 6 docs in the live index
+
+        aggs = (Map<String, Object>) response.get("aggregations");
+        temps = (Map<String, Object>) aggs.get("temperatures");
+        buckets = (List<Map<String, Object>>) temps.get("buckets");
+        assertThat(buckets.size(), equalTo(2));
+        assertThat(buckets.get(0).get("doc_count"), equalTo(3));
+        assertEquals(27, (Double) ((Map<String, Object>) buckets.get(0).get("avg_temperature")).get("value"), 0.0001);
+        assertThat(buckets.get(1).get("doc_count"), equalTo(3));
+
+        // Search for monthly results in all indices. It should return documents from all indices
+        String allIndices = firstGenerationIndex + "," + dailyRollupIndex + "," + monthlyRollupIndex;
+        response = search(allIndices, String.format(query, "1M"));
+        shards = (Map<String, Object>) response.get("_shards");
+        assertThat(shards.get("total"), equalTo(3));
+        assertThat(shards.get("skipped"), equalTo(0));
+        hits = (Map<String, Object>) response.get("hits");
+        assertThat(hits.get("total"), equalTo(12)); // 6 + 4 + 2 docs in the live, daily and monthly indices
+    }
+
+    /**
+     * Rollup a data stream with the default timezone and aggregate using different timezones in the
+     * date_histogram.
+     */
+    @SuppressWarnings("unchecked")
+    public void testSearchRollupDifferentTimezone() throws Exception {
+        Template template = new Template(Settings.builder().put("index.number_of_shards", 1).build(),
+            new CompressedXContent("{" +
+                "\"properties\": {\n" +
+                "    \"@timestamp\": { \"type\": \"date\" },\n" +
+                "    \"units\": { \"type\": \"keyword\"}, \n" +
+                "    \"temperature\": { \"type\": \"double\"} \n" +
+                "}}"), null);
+        createComposableTemplate(client(), "logs-template", "logs-foo*", template);
+        String dataStream = "logs-foo";
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-01-04T00:10:30Z\", \"temperature\": 27.5, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-01-04T23:12:25Z\", \"temperature\": 28.1, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-01-05T01:12:25Z\", \"temperature\": 25.4, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-01-06T11:00:37Z\", \"temperature\": 29, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-01-07T11:05:37Z\", \"temperature\": 27, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-01-07T23:05:20Z\", \"temperature\": 19.5, \"units\": \"celsius\" }");
+
+        rolloverMaxOneDocCondition(client(), dataStream);
+        String firstGenerationIndex = DataStream.getDefaultBackingIndexName(dataStream, 1);
+        assertBusy(() -> assertThat(indexExists(DataStream.getDefaultBackingIndexName(dataStream, 2)), is(true)), 30, TimeUnit.SECONDS);
+
+        // Rollup daily. The default rollup timezone is UTC
+        String dailyRollupIndex = ".rollup-utc-daily-" + firstGenerationIndex;
+        rollupIndex(client(), firstGenerationIndex, dailyRollupIndex, "1d",
+            Set.of("units"), Set.of("temperature"), Set.of("max", "sum", "avg"));
+
+        String query = "{\n" +
+            "  \"size\": 0,\n" +
+            "  \"aggs\": {\n" +
+            "    \"temperatures\": {\n" +
+            "      \"date_histogram\": {\n" +
+            "        \"field\": \"@timestamp\",\n" +
+            "        \"calendar_interval\": \"1d\",\n" +
+            "        \"time_zone\": \"%s\",\n" +
+            "        \"min_doc_count\": 1\n" +
+            "      },\n" +
+            "      \"aggs\": {\n" +
+            "        \"avg_temperature\": {\n" +
+            "          \"avg\": {\n" +
+            "            \"field\": \"temperature\"\n" +
+            "          }\n" +
+            "        }\n" +
+            "      }\n" +
+            "    }\n" +
+            "  }\n" +
+            "}";
+        // Search for daily results with the default tz. It should return the results from the daily rollup index
+        Map<String, Object> response = search(dataStream, String.format(query, "+00:00"));
+        Map<String, Object> shards = (Map<String, Object>) response.get("_shards");
+        assertThat(shards.get("skipped"), equalTo(1));
+
+        // Total hits should only contain the docs in the daily rollup index
+        Map<String, Object> hits = (Map<String, Object>) response.get("hits");
+        assertThat(hits.get("total"), equalTo(4)); // 4 docs in the daily rollup index
+        Map<String, Object> aggs = (Map<String, Object>) response.get("aggregations");
+        Map<String, Object> temps = (Map<String, Object>) aggs.get("temperatures");
+        List<Map<String, Object>> buckets = (List<Map<String, Object>>) temps.get("buckets");
+        assertThat(buckets.size(), equalTo(4));
+        assertThat(buckets.get(0).get("doc_count"), equalTo(2));
+        assertThat(buckets.get(1).get("doc_count"), equalTo(1));
+        assertThat(buckets.get(2).get("doc_count"), equalTo(1));
+        assertThat(buckets.get(3).get("doc_count"), equalTo(2));
+
+        // Search for daily results with a different tz than the rollups. It should return the results from
+        // the original index, because there are no rollups that satisfy the timezone
+        response = search(dataStream, String.format(query, "-02:00"));
+        shards = (Map<String, Object>) response.get("_shards");
+        assertThat(shards.get("skipped"), equalTo(1));
+        hits = (Map<String, Object>) response.get("hits");
+        assertThat(hits.get("total"), equalTo(6)); // 6 docs in the live index
+        aggs = (Map<String, Object>) response.get("aggregations");
+        temps = (Map<String, Object>) aggs.get("temperatures");
+        buckets = (List<Map<String, Object>>) temps.get("buckets");
+        assertThat(buckets.size(), equalTo(4));
+        assertThat(buckets.get(0).get("doc_count"), equalTo(1));
+        assertEquals(27.5, (Double) ((Map<String, Object>) buckets.get(0).get("avg_temperature")).get("value"), 0.0001);
+        assertThat(buckets.get(1).get("doc_count"), equalTo(2));
+    }
+
+    @AwaitsFix(bugUrl = "TODO")
+    @SuppressWarnings("unchecked")
+    public void testSearchRollupDatastreamMissingMetric() throws Exception {
+        Template template = new Template(Settings.builder().put("index.number_of_shards", 1).build(),
+            new CompressedXContent("{" +
+                "\"properties\": {\n" +
+                "    \"@timestamp\": { \"type\": \"date\" },\n" +
+                "    \"units\": { \"type\": \"keyword\"} \n" +
+                "}}"), null);
+        createComposableTemplate(client(), "logs-template", "logs-foo*", template);
+        String dataStream = "logs-foo";
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-01-04T12:10:30Z\", \"temperature\": 27.5, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-01-08T07:12:25Z\", \"temperature\": 28.1, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-02-02T11:05:37Z\", \"temperature\": 29.2, \"units\": \"celsius\" }");
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-02-10T08:05:20Z\", \"temperature\": 19.5, \"units\": \"celsius\" }");
+        rolloverMaxOneDocCondition(client(), dataStream);
+        String firstGenerationIndex = DataStream.getDefaultBackingIndexName(dataStream, 1);
+        String firstGenerationRollupIndex = ".rollup-" + firstGenerationIndex;
+        assertBusy(() -> assertThat(indexExists(DataStream.getDefaultBackingIndexName(dataStream, 2)), is(true)), 30, TimeUnit.SECONDS);
+        rollupIndex(client(), firstGenerationIndex, firstGenerationRollupIndex, "1M",
+            Set.of("units"), Set.of("temperature"), Set.of("max", "sum"));
+        String query = "{\n" +
+            "  \"size\": 0,\n" +
+            "  \"aggs\": {\n" +
+            "    \"monthly_temperatures\": {\n" +
+            "      \"date_histogram\": {\n" +
+            "        \"field\": \"@timestamp\",\n" +
+            "        \"calendar_interval\": \"1M\"\n" +
+            "      },\n" +
+            "      \"aggs\": {\n" +
+            "        \"min_temperature\": {\n" +
+            "          \"min\": {\n" +
+            "            \"field\": \"temperature\"\n" +
+            "          }\n" +
+            "        }\n" +
+            "      }\n" +
+            "    }\n" +
+            "  }\n" +
+            "}";
+        Map<String, Object> response = search(dataStream, query);
+        Map<String, Object> shards = (Map<String, Object>) response.get("_shards");
+        assertThat(shards.get("skipped"), equalTo(1));
+
+        // Total hits should only contain the live docs (and not the rollup docs)
+        Map<String, Object> hits = (Map<String, Object>) response.get("hits");
+        assertThat(hits.get("total"), equalTo(4));
+        Map<String, Object> aggs = (Map<String, Object>) response.get("aggregations");
+        Map<String, Object> monthlyTemps = (Map<String, Object>) aggs.get("monthly_temperatures");
+        List<Map<String, Object>> buckets = (List<Map<String, Object>>) monthlyTemps.get("buckets");
+        assertThat(buckets.size(), equalTo(2));
+        assertThat(buckets.get(0).get("doc_count"), equalTo(2));
+        assertEquals(27.5, (Double) ((Map<String, Object>) buckets.get(0).get("min_temperature")).get("value"), 0.0001);
+        assertThat(buckets.get(1).get("doc_count"), equalTo(2));
+        assertEquals(19.5, (Double) ((Map<String, Object>) buckets.get(1).get("min_temperature")).get("value"), 0.0001);
+
+        // Index new docs so that we confirm merging of live and rollup indices.
+        // The following operation adds a new doc in the first bucket
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-01-30T12:10:30Z\", \"temperature\": 19.4, \"units\": \"celsius\" }");
+        // The following operation adds a new doc in a new bucket
+        indexDocument(client(), dataStream,
+            "{ \"@timestamp\": \"2020-03-10T02:10:30Z\", \"temperature\": 23.5, \"units\": \"celsius\" }");
+        response = search(dataStream, query);
+        aggs = (Map<String, Object>) response.get("aggregations");
+        monthlyTemps = (Map<String, Object>) aggs.get("monthly_temperatures");
+        buckets = (List<Map<String, Object>>) monthlyTemps.get("buckets");
+        assertThat(buckets.size(), equalTo(3));
+        assertThat(buckets.get(0).get("doc_count"), equalTo(3));
+        assertEquals(19.4, (Double) ((Map<String, Object>) buckets.get(0).get("min_temperature")).get("value"), 0.0001);
+        assertThat(buckets.get(2).get("doc_count"), equalTo(1));
+        assertEquals(23.5, (Double) ((Map<String, Object>) buckets.get(2).get("min_temperature")).get("value"), 0.0001);
+    }
+
+    private static void createComposableTemplate(RestClient client, String templateName, String indexPattern, Template template)
+        throws IOException {
+        XContentBuilder builder = jsonBuilder();
+        template.toXContent(builder, ToXContent.EMPTY_PARAMS);
+        StringEntity templateJSON = new StringEntity(
+            String.format(Locale.ROOT, "{\n" +
+                "  \"index_patterns\": \"%s\",\n" +
+                "  \"data_stream\": {},\n" +
+                "  \"template\": %s\n" +
+                "}", indexPattern, Strings.toString(builder)),
+            ContentType.APPLICATION_JSON);
+        Request createIndexTemplateRequest = new Request("PUT", "_index_template/" + templateName);
+        createIndexTemplateRequest.setEntity(templateJSON);
+        client.performRequest(createIndexTemplateRequest);
+    }
+
+    private static void rolloverMaxOneDocCondition(RestClient client, String indexAbstractionName) throws IOException {
+        Request rolloverRequest = new Request("POST", "/" + indexAbstractionName + "/_rollover");
+        rolloverRequest.setJsonEntity("{\n" +
+            "  \"conditions\": {\n" +
+            "    \"max_docs\": \"1\"\n" +
+            "  }\n" +
+            "}"
+        );
+        client.performRequest(rolloverRequest);
+    }
+
+    private static void rollupIndex(RestClient client, String indexAbstractionName, String rollupIndex,
+                                    String rollupConfig) throws IOException {
+        Request rollupRequest = new Request("POST", "/" + indexAbstractionName + "/_rollup/" + rollupIndex);
+        rollupRequest.setJsonEntity(rollupConfig);
+        client.performRequest(rollupRequest);
+    }
+
+    private static void rollupIndex(RestClient client, String indexAbstractionName, String rollupIndex,
+                                    String interval, Set<String> terms, Set<String> metricFields, Set<String> metrics) throws IOException {
+        String rollupConfig = "{\n" +
+            "  \"groups\" : {\n" +
+            "    \"date_histogram\": {\n" +
+            "      \"field\": \"@timestamp\",\n" +
+            "      \"calendar_interval\": \"" + interval + "\"\n" +
+            "    },\n" +
+            "    \"terms\": {\n" +
+            "      \"fields\": [\"" + String.join("\", \"", terms) + "\"]\n" +
+            "    }\n" +
+            "  },\n" +
+            "  \"metrics\": [\n" +
+            "    {\n" +
+            "      \"field\": \"" + String.join("\", \"", metricFields) + "\",\n" +
+            "      \"metrics\": [\"" + String.join("\", \"", metrics) + "\"]\n" +
+            "    }\n" +
+            "  ]\n" +
+            "}";
+        rollupIndex(client, indexAbstractionName, rollupIndex, rollupConfig);
+    }
+
+    private static void indexDocument(RestClient client, String indexAbstractionName, String documentString) throws IOException {
+        Request indexRequest = new Request("POST", indexAbstractionName + "/_doc?refresh");
+        indexRequest.setEntity(new StringEntity(documentString, ContentType.APPLICATION_JSON));
+        client.performRequest(indexRequest);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Map<String, Object> search(String indexAbstractionName, String query) throws IOException {
+        Request searchRequest = new Request("GET", indexAbstractionName + "/_search?rest_total_hits_as_int");
+        searchRequest.setJsonEntity(query);
+        Response searchResponse = client().performRequest(searchRequest);
+        return entityAsMap(searchResponse);
+    }
+}