diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index 39d14f84145bc..1ccdb1ae58df8 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -42,6 +42,8 @@ import org.elasticsearch.aggregations.bucket.adjacency.ParsedAdjacencyMatrix; import org.elasticsearch.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder; import org.elasticsearch.aggregations.bucket.histogram.ParsedAutoDateHistogram; +import org.elasticsearch.aggregations.bucket.timeseries.ParsedTimeSeries; +import org.elasticsearch.aggregations.bucket.timeseries.TimeSeriesAggregationBuilder; import org.elasticsearch.aggregations.pipeline.DerivativePipelineAggregationBuilder; import org.elasticsearch.client.analytics.ParsedStringStats; import org.elasticsearch.client.analytics.ParsedTopMetrics; @@ -163,8 +165,6 @@ import org.elasticsearch.search.aggregations.pipeline.ParsedStatsBucket; import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; -import org.elasticsearch.search.aggregations.timeseries.ParsedTimeSeries; -import org.elasticsearch.search.aggregations.timeseries.TimeSeriesAggregationBuilder; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder; diff --git a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/SearchCancellationIT.java b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/SearchCancellationIT.java new file mode 100644 index 0000000000000..48f4bcf800d61 --- /dev/null +++ b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/SearchCancellationIT.java @@ -0,0 +1,136 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.aggregations.bucket; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.aggregations.AggregationsPlugin; +import org.elasticsearch.aggregations.bucket.timeseries.TimeSeriesAggregationBuilder; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptType; +import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder; +import org.elasticsearch.test.AbstractSearchCancellationTestCase; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.index.IndexSettings.TIME_SERIES_END_TIME; +import static org.elasticsearch.index.IndexSettings.TIME_SERIES_START_TIME; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class SearchCancellationIT extends AbstractSearchCancellationTestCase { + + @Override + protected Collection> nodePlugins() { + List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(AggregationsPlugin.class); + return List.copyOf(plugins); + } + + public void testCancellationDuringTimeSeriesAggregation() throws Exception { + List plugins = initBlockFactory(); + int numberOfShards = between(2, 5); + long now = Instant.now().toEpochMilli(); + int numberOfRefreshes = between(1, 5); + // After a few initial checks we check every 2048 - number of shards records so we need to ensure all + // shards have enough records to trigger a check + int numberOfDocsPerRefresh = numberOfShards * between(3000, 3500) / numberOfRefreshes; + assertAcked( + prepareCreate("test").setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES.name()) + .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "dim") + .put(TIME_SERIES_START_TIME.getKey(), now) + .put(TIME_SERIES_END_TIME.getKey(), now + (long) numberOfRefreshes * numberOfDocsPerRefresh + 1) + .build() + ).setMapping(""" + { + "properties": { + "@timestamp": {"type": "date", "format": "epoch_millis"}, + "dim": {"type": "keyword", "time_series_dimension": true} + } + } + """) + ); + + for (int i = 0; i < numberOfRefreshes; i++) { + // Make sure we sometimes have a few segments + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int j = 0; j < numberOfDocsPerRefresh; j++) { + bulkRequestBuilder.add( + client().prepareIndex("test") + .setOpType(DocWriteRequest.OpType.CREATE) + .setSource( + "@timestamp", + now + (long) i * numberOfDocsPerRefresh + j, + "val", + (double) j, + "dim", + String.valueOf(j % 100) + ) + ); + } + assertNoFailures(bulkRequestBuilder.get()); + } + + logger.info("Executing search"); + TimeSeriesAggregationBuilder timeSeriesAggregationBuilder = new TimeSeriesAggregationBuilder("test_agg"); + ActionFuture searchResponse = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .addAggregation( + timeSeriesAggregationBuilder.subAggregation( + new ScriptedMetricAggregationBuilder("sub_agg").initScript( + new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.INIT_SCRIPT_NAME, Collections.emptyMap()) + ) + .mapScript( + new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.MAP_BLOCK_SCRIPT_NAME, Collections.emptyMap()) + ) + .combineScript( + new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.COMBINE_SCRIPT_NAME, Collections.emptyMap()) + ) + .reduceScript( + new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.REDUCE_FAIL_SCRIPT_NAME, Collections.emptyMap()) + ) + ) + ) + .execute(); + awaitForBlock(plugins); + cancelSearch(SearchAction.NAME); + disableBlocks(plugins); + + SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, searchResponse::actionGet); + assertThat(ExceptionsHelper.status(ex), equalTo(RestStatus.BAD_REQUEST)); + logger.info("All shards failed with", ex); + if (lowLevelCancellation) { + // Ensure that we cancelled in TimeSeriesIndexSearcher and not in reduce phase + assertThat(ExceptionsHelper.stackTrace(ex), containsString("TimeSeriesIndexSearcher")); + } + } +} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/TimeSeriesAggregationsIT.java b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/TimeSeriesAggregationsIT.java similarity index 94% rename from server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/TimeSeriesAggregationsIT.java rename to modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/TimeSeriesAggregationsIT.java index 97f13449350ec..045433e2182d0 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/TimeSeriesAggregationsIT.java +++ b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/TimeSeriesAggregationsIT.java @@ -6,7 +6,7 @@ * Side Public License, v 1. */ -package org.elasticsearch.search.aggregations; +package org.elasticsearch.aggregations.bucket; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.DocWriteRequest; @@ -14,11 +14,17 @@ import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.aggregations.AggregationIntegTestCase; +import org.elasticsearch.aggregations.bucket.timeseries.InternalTimeSeries; +import org.elasticsearch.aggregations.bucket.timeseries.TimeSeriesAggregationBuilder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.Aggregator; +import org.elasticsearch.search.aggregations.PipelineAggregatorBuilders; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import org.elasticsearch.search.aggregations.bucket.global.Global; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; @@ -28,7 +34,6 @@ import org.elasticsearch.search.aggregations.metrics.Stats; import org.elasticsearch.search.aggregations.metrics.Sum; import org.elasticsearch.search.aggregations.pipeline.SimpleValue; -import org.elasticsearch.search.aggregations.timeseries.TimeSeries; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xcontent.XContentBuilder; @@ -49,7 +54,6 @@ import static org.elasticsearch.search.aggregations.AggregationBuilders.stats; import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; -import static org.elasticsearch.search.aggregations.AggregationBuilders.timeSeries; import static org.elasticsearch.search.aggregations.AggregationBuilders.topHits; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; @@ -61,7 +65,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; @ESIntegTestCase.SuiteScopeTestCase -public class TimeSeriesAggregationsIT extends ESIntegTestCase { +public class TimeSeriesAggregationsIT extends AggregationIntegTestCase { private static final Map, Map>> data = new HashMap<>(); private static int numberOfDimensions; @@ -175,12 +179,12 @@ public void testStandAloneTimeSeriesAgg() { assertSearchResponse(response); Aggregations aggregations = response.getAggregations(); assertNotNull(aggregations); - TimeSeries timeSeries = aggregations.get("by_ts"); + InternalTimeSeries timeSeries = aggregations.get("by_ts"); assertThat( timeSeries.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getKey).collect(Collectors.toSet()), equalTo(data.keySet()) ); - for (TimeSeries.Bucket bucket : timeSeries.getBuckets()) { + for (InternalTimeSeries.Bucket bucket : timeSeries.getBuckets()) { @SuppressWarnings("unchecked") Map key = (Map) bucket.getKey(); assertThat((long) data.get(key).size(), equalTo(bucket.getDocCount())); @@ -204,8 +208,8 @@ public void testTimeSeriesGroupedByADimension() { Terms terms = aggregations.get("by_dim"); Set> keys = new HashSet<>(); for (Terms.Bucket term : terms.getBuckets()) { - TimeSeries timeSeries = term.getAggregations().get("by_ts"); - for (TimeSeries.Bucket bucket : timeSeries.getBuckets()) { + InternalTimeSeries timeSeries = term.getAggregations().get("by_ts"); + for (InternalTimeSeries.Bucket bucket : timeSeries.getBuckets()) { @SuppressWarnings("unchecked") Map key = (Map) bucket.getKey(); assertThat((long) data.get(key).size(), equalTo(bucket.getDocCount())); @@ -234,8 +238,8 @@ public void testTimeSeriesGroupedByDateHistogram() { for (Histogram.Bucket interval : histogram.getBuckets()) { long intervalStart = ((ZonedDateTime) interval.getKey()).toEpochSecond() * 1000; long intervalEnd = intervalStart + fixedInterval.estimateMillis(); - TimeSeries timeSeries = interval.getAggregations().get("by_ts"); - for (TimeSeries.Bucket bucket : timeSeries.getBuckets()) { + InternalTimeSeries timeSeries = interval.getAggregations().get("by_ts"); + for (InternalTimeSeries.Bucket bucket : timeSeries.getBuckets()) { @SuppressWarnings("unchecked") Map key = (Map) bucket.getKey(); keys.compute(key, (k, v) -> (v == null ? 0 : v) + bucket.getDocCount()); @@ -269,13 +273,13 @@ public void testStandAloneTimeSeriesAggWithDimFilter() { assertSearchResponse(response); Aggregations aggregations = response.getAggregations(); assertNotNull(aggregations); - TimeSeries timeSeries = aggregations.get("by_ts"); + InternalTimeSeries timeSeries = aggregations.get("by_ts"); Map, Map>> filteredData = dataFilteredByDimension("dim_" + dim, val, include); assertThat( timeSeries.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getKey).collect(Collectors.toSet()), equalTo(filteredData.keySet()) ); - for (TimeSeries.Bucket bucket : timeSeries.getBuckets()) { + for (InternalTimeSeries.Bucket bucket : timeSeries.getBuckets()) { @SuppressWarnings("unchecked") Map key = (Map) bucket.getKey(); assertThat(bucket.getDocCount(), equalTo((long) filteredData.get(key).size())); @@ -301,13 +305,13 @@ public void testStandAloneTimeSeriesAggWithGlobalAggregation() { assertSearchResponse(response); Aggregations aggregations = response.getAggregations(); assertNotNull(aggregations); - TimeSeries timeSeries = aggregations.get("by_ts"); + InternalTimeSeries timeSeries = aggregations.get("by_ts"); Map, Map>> filteredData = dataFilteredByDimension("dim_" + dim, val, include); assertThat( timeSeries.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getKey).collect(Collectors.toSet()), equalTo(filteredData.keySet()) ); - for (TimeSeries.Bucket bucket : timeSeries.getBuckets()) { + for (InternalTimeSeries.Bucket bucket : timeSeries.getBuckets()) { @SuppressWarnings("unchecked") Map key = (Map) bucket.getKey(); assertThat(bucket.getDocCount(), equalTo((long) filteredData.get(key).size())); @@ -348,13 +352,13 @@ public void testStandAloneTimeSeriesAggWithMetricFilter() { assertSearchResponse(response); Aggregations aggregations = response.getAggregations(); assertNotNull(aggregations); - TimeSeries timeSeries = aggregations.get("by_ts"); + InternalTimeSeries timeSeries = aggregations.get("by_ts"); Map, Map>> filteredData = dataFilteredByMetric(data, "metric_" + metric, val, above); assertThat( timeSeries.getBuckets().stream().map(MultiBucketsAggregation.Bucket::getKey).collect(Collectors.toSet()), equalTo(filteredData.keySet()) ); - for (TimeSeries.Bucket bucket : timeSeries.getBuckets()) { + for (InternalTimeSeries.Bucket bucket : timeSeries.getBuckets()) { @SuppressWarnings("unchecked") Map key = (Map) bucket.getKey(); assertThat(bucket.getDocCount(), equalTo((long) filteredData.get(key).size())); @@ -521,4 +525,8 @@ public void testGetHitsFailure() throws Exception { } + public static TimeSeriesAggregationBuilder timeSeries(String name) { + return new TimeSeriesAggregationBuilder(name); + } + } diff --git a/modules/aggregations/src/main/java/module-info.java b/modules/aggregations/src/main/java/module-info.java index 5c3acfcc3beeb..3e378d1082558 100644 --- a/modules/aggregations/src/main/java/module-info.java +++ b/modules/aggregations/src/main/java/module-info.java @@ -15,6 +15,7 @@ exports org.elasticsearch.aggregations.bucket.histogram; exports org.elasticsearch.aggregations.bucket.adjacency; + exports org.elasticsearch.aggregations.bucket.timeseries; exports org.elasticsearch.aggregations.pipeline; opens org.elasticsearch.aggregations to org.elasticsearch.painless.spi; // whitelist resource access diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/AggregationsPlugin.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/AggregationsPlugin.java index b9c2723d4ad78..89a629e05aefd 100644 --- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/AggregationsPlugin.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/AggregationsPlugin.java @@ -12,27 +12,34 @@ import org.elasticsearch.aggregations.bucket.adjacency.InternalAdjacencyMatrix; import org.elasticsearch.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder; import org.elasticsearch.aggregations.bucket.histogram.InternalAutoDateHistogram; +import org.elasticsearch.aggregations.bucket.timeseries.InternalTimeSeries; +import org.elasticsearch.aggregations.bucket.timeseries.TimeSeriesAggregationBuilder; import org.elasticsearch.aggregations.pipeline.BucketSortPipelineAggregationBuilder; import org.elasticsearch.aggregations.pipeline.Derivative; import org.elasticsearch.aggregations.pipeline.DerivativePipelineAggregationBuilder; import org.elasticsearch.aggregations.pipeline.MovFnPipelineAggregationBuilder; import org.elasticsearch.aggregations.pipeline.MovingFunctionScript; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.ScriptPlugin; import org.elasticsearch.plugins.SearchPlugin; import org.elasticsearch.script.ScriptContext; +import java.util.ArrayList; import java.util.List; public class AggregationsPlugin extends Plugin implements SearchPlugin, ScriptPlugin { @Override public List getAggregations() { - return List.of( + List specs = new ArrayList<>(); + specs.add( new AggregationSpec( AdjacencyMatrixAggregationBuilder.NAME, AdjacencyMatrixAggregationBuilder::new, AdjacencyMatrixAggregationBuilder::parse - ).addResultReader(InternalAdjacencyMatrix::new), + ).addResultReader(InternalAdjacencyMatrix::new) + ); + specs.add( new AggregationSpec( AutoDateHistogramAggregationBuilder.NAME, AutoDateHistogramAggregationBuilder::new, @@ -40,6 +47,16 @@ public List getAggregations() { ).addResultReader(InternalAutoDateHistogram::new) .setAggregatorRegistrar(AutoDateHistogramAggregationBuilder::registerAggregators) ); + if (IndexSettings.isTimeSeriesModeEnabled()) { + specs.add( + new AggregationSpec( + TimeSeriesAggregationBuilder.NAME, + TimeSeriesAggregationBuilder::new, + TimeSeriesAggregationBuilder.PARSER + ).addResultReader(InternalTimeSeries::new) + ); + } + return List.copyOf(specs); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeries.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java similarity index 97% rename from server/src/main/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeries.java rename to modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java index 3b15f3fe3c2c1..a4ad967b79602 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeries.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java @@ -6,7 +6,7 @@ * Side Public License, v 1. */ -package org.elasticsearch.search.aggregations.timeseries; +package org.elasticsearch.aggregations.bucket.timeseries; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -26,9 +26,7 @@ import static org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation.declareMultiBucketAggregationFields; -public class InternalTimeSeries extends InternalMultiBucketAggregation - implements - TimeSeries { +public class InternalTimeSeries extends InternalMultiBucketAggregation { private static final ObjectParser PARSER = new ObjectParser<>( ParsedTimeSeries.class.getSimpleName(), @@ -43,7 +41,10 @@ public class InternalTimeSeries extends InternalMultiBucketAggregation key; @@ -251,7 +252,6 @@ public List getBuckets() { return buckets; } - @Override public InternalBucket getBucketByKey(String key) { if (bucketMap == null) { bucketMap = new HashMap<>(buckets.size()); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/ParsedTimeSeries.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/ParsedTimeSeries.java similarity index 89% rename from server/src/main/java/org/elasticsearch/search/aggregations/timeseries/ParsedTimeSeries.java rename to modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/ParsedTimeSeries.java index eab469b90b94c..f4eff3e878b59 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/ParsedTimeSeries.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/ParsedTimeSeries.java @@ -6,7 +6,7 @@ * Side Public License, v 1. */ -package org.elasticsearch.search.aggregations.timeseries; +package org.elasticsearch.aggregations.bucket.timeseries; import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation; import org.elasticsearch.xcontent.ObjectParser; @@ -18,7 +18,7 @@ import java.util.Map; import java.util.TreeMap; -public class ParsedTimeSeries extends ParsedMultiBucketAggregation implements TimeSeries { +public class ParsedTimeSeries extends ParsedMultiBucketAggregation { private transient Map bucketMap; @@ -28,12 +28,11 @@ public String getType() { } @Override - public List getBuckets() { + public List getBuckets() { return buckets; } - @Override - public TimeSeries.Bucket getBucketByKey(String key) { + public ParsedBucket getBucketByKey(String key) { if (bucketMap == null) { bucketMap = new HashMap<>(buckets.size()); for (ParsedTimeSeries.ParsedBucket bucket : buckets) { @@ -62,7 +61,7 @@ public static ParsedTimeSeries fromXContent(XContentParser parser, String name) return aggregation; } - static class ParsedBucket extends ParsedMultiBucketAggregation.ParsedBucket implements TimeSeries.Bucket { + static class ParsedBucket extends ParsedMultiBucketAggregation.ParsedBucket { private Map key; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregationBuilder.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationBuilder.java similarity index 98% rename from server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregationBuilder.java rename to modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationBuilder.java index 3c6678e418856..5815b0e499672 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregationBuilder.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationBuilder.java @@ -6,7 +6,7 @@ * Side Public License, v 1. */ -package org.elasticsearch.search.aggregations.timeseries; +package org.elasticsearch.aggregations.bucket.timeseries; import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregationFactory.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationFactory.java similarity index 96% rename from server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregationFactory.java rename to modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationFactory.java index 69d7462c1b7d7..881d23cd8c4fb 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregationFactory.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationFactory.java @@ -6,7 +6,7 @@ * Side Public License, v 1. */ -package org.elasticsearch.search.aggregations.timeseries; +package org.elasticsearch.aggregations.bucket.timeseries; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregator.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java similarity index 98% rename from server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregator.java rename to modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java index 8a077b100e1ea..d6764dd97af19 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregator.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java @@ -6,7 +6,7 @@ * Side Public License, v 1. */ -package org.elasticsearch.search.aggregations.timeseries; +package org.elasticsearch.aggregations.bucket.timeseries; import org.apache.lucene.util.BytesRef; import org.elasticsearch.core.Releasables; diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeriesTests.java b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java similarity index 85% rename from server/src/test/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeriesTests.java rename to modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java index 5f374ca9c7425..38cccb969c8bc 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/InternalTimeSeriesTests.java +++ b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeriesTests.java @@ -6,10 +6,12 @@ * Side Public License, v 1. */ -package org.elasticsearch.search.aggregations.timeseries; +package org.elasticsearch.aggregations.bucket.timeseries; +import org.elasticsearch.aggregations.bucket.AggregationMultiBucketAggregationTestCase; +import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.test.InternalMultiBucketAggregationTestCase; +import org.elasticsearch.xcontent.ContextParser; import java.util.ArrayList; import java.util.HashMap; @@ -20,7 +22,12 @@ import static org.hamcrest.Matchers.arrayContainingInAnyOrder; -public class InternalTimeSeriesTests extends InternalMultiBucketAggregationTestCase { +public class InternalTimeSeriesTests extends AggregationMultiBucketAggregationTestCase { + + @Override + protected Map.Entry> getParser() { + return Map.entry(TimeSeriesAggregationBuilder.NAME, (p, c) -> ParsedTimeSeries.fromXContent(p, (String) c)); + } private List randomBuckets(boolean keyed, InternalAggregations aggregations) { int numberOfBuckets = randomNumberOfBuckets(); diff --git a/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationBuilderTests.java b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationBuilderTests.java new file mode 100644 index 0000000000000..2c87c9f298dca --- /dev/null +++ b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregationBuilderTests.java @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.aggregations.bucket.timeseries; + +import org.elasticsearch.aggregations.bucket.AggregationBuilderTestCase; + +public class TimeSeriesAggregationBuilderTests extends AggregationBuilderTestCase { + + @Override + protected TimeSeriesAggregationBuilder createTestAggregatorBuilder() { + return new TimeSeriesAggregationBuilder(randomAlphaOfLength(10), randomBoolean()); + } + +} diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregatorTests.java b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregatorTests.java similarity index 97% rename from server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregatorTests.java rename to modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregatorTests.java index 2435d8cc6c636..b98e464daefb3 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregatorTests.java +++ b/modules/aggregations/src/test/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregatorTests.java @@ -6,7 +6,7 @@ * Side Public License, v 1. */ -package org.elasticsearch.search.aggregations.timeseries; +package org.elasticsearch.aggregations.bucket.timeseries; import org.apache.lucene.document.DoubleDocValuesField; import org.apache.lucene.document.FloatDocValuesField; @@ -17,6 +17,7 @@ import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.tests.index.RandomIndexWriter; +import org.elasticsearch.aggregations.bucket.AggregationTestCase; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper; import org.elasticsearch.index.mapper.DateFieldMapper; @@ -25,7 +26,6 @@ import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper.TimeSeriesIdBuilder; -import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.metrics.Sum; import org.elasticsearch.search.aggregations.support.ValuesSourceType; @@ -38,7 +38,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; -public class TimeSeriesAggregatorTests extends AggregatorTestCase { +public class TimeSeriesAggregatorTests extends AggregationTestCase { @Override protected List getSupportedValuesSourceTypes() { diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java index 9e48265f1b56b..525b8677a0de7 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java @@ -8,13 +8,8 @@ package org.elasticsearch.search; -import org.apache.logging.log4j.LogManager; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; -import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.search.MultiSearchAction; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchAction; @@ -24,154 +19,37 @@ import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.IndexMode; -import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.script.MockScriptPlugin; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder; -import org.elasticsearch.search.aggregations.timeseries.TimeSeriesAggregationBuilder; -import org.elasticsearch.search.lookup.LeafStoredFieldsLookup; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; -import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.test.AbstractSearchCancellationTestCase; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.transport.TransportService; -import org.junit.BeforeClass; -import java.time.Instant; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; -import static org.elasticsearch.index.IndexSettings.TIME_SERIES_END_TIME; -import static org.elasticsearch.index.IndexSettings.TIME_SERIES_START_TIME; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.scriptQuery; -import static org.elasticsearch.search.SearchCancellationIT.ScriptedBlockPlugin.SEARCH_BLOCK_SCRIPT_NAME; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.AbstractSearchCancellationTestCase.ScriptedBlockPlugin.SEARCH_BLOCK_SCRIPT_NAME; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.notNullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE) -public class SearchCancellationIT extends ESIntegTestCase { - - private static boolean lowLevelCancellation; - - @BeforeClass - public static void init() { - lowLevelCancellation = randomBoolean(); - } - - @Override - protected Collection> nodePlugins() { - return Collections.singleton(ScriptedBlockPlugin.class); - } - - @Override - protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { - logger.info("Using lowLevelCancellation: {}", lowLevelCancellation); - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal, otherSettings)) - .put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), lowLevelCancellation) - .build(); - } - - private void indexTestData() { - for (int i = 0; i < 5; i++) { - // Make sure we have a few segments - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int j = 0; j < 20; j++) { - bulkRequestBuilder.add(client().prepareIndex("test").setId(Integer.toString(i * 5 + j)).setSource("field", "value")); - } - assertNoFailures(bulkRequestBuilder.get()); - } - } - - private List initBlockFactory() { - List plugins = new ArrayList<>(); - for (PluginsService pluginsService : internalCluster().getInstances(PluginsService.class)) { - plugins.addAll(pluginsService.filterPlugins(ScriptedBlockPlugin.class)); - } - for (ScriptedBlockPlugin plugin : plugins) { - plugin.reset(); - plugin.enableBlock(); - } - return plugins; - } - - private void awaitForBlock(List plugins) throws Exception { - int numberOfShards = getNumShards("test").numPrimaries; - assertBusy(() -> { - int numberOfBlockedPlugins = 0; - for (ScriptedBlockPlugin plugin : plugins) { - numberOfBlockedPlugins += plugin.hits.get(); - } - logger.info("The plugin blocked on {} out of {} shards", numberOfBlockedPlugins, numberOfShards); - assertThat(numberOfBlockedPlugins, greaterThan(0)); - }); - } - - private void disableBlocks(List plugins) throws Exception { - for (ScriptedBlockPlugin plugin : plugins) { - plugin.disableBlock(); - } - } - - private void cancelSearch(String action) { - ListTasksResponse listTasksResponse = client().admin().cluster().prepareListTasks().setActions(action).get(); - assertThat(listTasksResponse.getTasks(), hasSize(1)); - TaskInfo searchTask = listTasksResponse.getTasks().get(0); - - logger.info("Cancelling search"); - CancelTasksResponse cancelTasksResponse = client().admin() - .cluster() - .prepareCancelTasks() - .setTargetTaskId(searchTask.taskId()) - .get(); - assertThat(cancelTasksResponse.getTasks(), hasSize(1)); - assertThat(cancelTasksResponse.getTasks().get(0).taskId(), equalTo(searchTask.taskId())); - } - - private SearchResponse ensureSearchWasCancelled(ActionFuture searchResponse) { - try { - SearchResponse response = searchResponse.actionGet(); - logger.info("Search response {}", response); - assertNotEquals("At least one shard should have failed", 0, response.getFailedShards()); - for (ShardSearchFailure failure : response.getShardFailures()) { - // We should have fail because the search has been cancel. The status of the exceptions should be 400. - assertThat(ExceptionsHelper.status(failure.getCause()), equalTo(RestStatus.BAD_REQUEST)); - } - return response; - } catch (SearchPhaseExecutionException ex) { - // We should have fail because the search has been cancel. The status of the response should be 400. - assertThat(ExceptionsHelper.status(ex), equalTo(RestStatus.BAD_REQUEST)); - logger.info("All shards failed with", ex); - return null; - } - } +public class SearchCancellationIT extends AbstractSearchCancellationTestCase { public void testCancellationDuringQueryPhase() throws Exception { @@ -258,89 +136,6 @@ public void testCancellationDuringAggregation() throws Exception { ensureSearchWasCancelled(searchResponse); } - public void testCancellationDuringTimeSeriesAggregation() throws Exception { - List plugins = initBlockFactory(); - int numberOfShards = between(2, 5); - long now = Instant.now().toEpochMilli(); - int numberOfRefreshes = between(1, 5); - // After a few initial checks we check every 2048 - number of shards records so we need to ensure all - // shards have enough records to trigger a check - int numberOfDocsPerRefresh = numberOfShards * between(3000, 3500) / numberOfRefreshes; - assertAcked( - prepareCreate("test").setSettings( - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES.name()) - .put(IndexMetadata.INDEX_ROUTING_PATH.getKey(), "dim") - .put(TIME_SERIES_START_TIME.getKey(), now) - .put(TIME_SERIES_END_TIME.getKey(), now + (long) numberOfRefreshes * numberOfDocsPerRefresh + 1) - .build() - ).setMapping(""" - { - "properties": { - "@timestamp": {"type": "date", "format": "epoch_millis"}, - "dim": {"type": "keyword", "time_series_dimension": true} - } - } - """) - ); - - for (int i = 0; i < numberOfRefreshes; i++) { - // Make sure we sometimes have a few segments - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int j = 0; j < numberOfDocsPerRefresh; j++) { - bulkRequestBuilder.add( - client().prepareIndex("test") - .setOpType(DocWriteRequest.OpType.CREATE) - .setSource( - "@timestamp", - now + (long) i * numberOfDocsPerRefresh + j, - "val", - (double) j, - "dim", - String.valueOf(j % 100) - ) - ); - } - assertNoFailures(bulkRequestBuilder.get()); - } - - logger.info("Executing search"); - TimeSeriesAggregationBuilder timeSeriesAggregationBuilder = new TimeSeriesAggregationBuilder("test_agg"); - ActionFuture searchResponse = client().prepareSearch("test") - .setQuery(matchAllQuery()) - .addAggregation( - timeSeriesAggregationBuilder.subAggregation( - new ScriptedMetricAggregationBuilder("sub_agg").initScript( - new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.INIT_SCRIPT_NAME, Collections.emptyMap()) - ) - .mapScript( - new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.MAP_BLOCK_SCRIPT_NAME, Collections.emptyMap()) - ) - .combineScript( - new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.COMBINE_SCRIPT_NAME, Collections.emptyMap()) - ) - .reduceScript( - new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.REDUCE_FAIL_SCRIPT_NAME, Collections.emptyMap()) - ) - ) - ) - .execute(); - awaitForBlock(plugins); - cancelSearch(SearchAction.NAME); - disableBlocks(plugins); - - SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, searchResponse::actionGet); - assertThat(ExceptionsHelper.status(ex), equalTo(RestStatus.BAD_REQUEST)); - logger.info("All shards failed with", ex); - if (lowLevelCancellation) { - // Ensure that we cancelled in TimeSeriesIndexSearcher and not in reduce phase - assertThat(ExceptionsHelper.stackTrace(ex), containsString("TimeSeriesIndexSearcher")); - } - - } - public void testCancellationOfScrollSearches() throws Exception { List plugins = initBlockFactory(); @@ -513,124 +308,4 @@ List getSearchTasks() { return tasks; } - public static class ScriptedBlockPlugin extends MockScriptPlugin { - static final String SEARCH_BLOCK_SCRIPT_NAME = "search_block"; - static final String INIT_SCRIPT_NAME = "init"; - static final String MAP_SCRIPT_NAME = "map"; - static final String MAP_BLOCK_SCRIPT_NAME = "map_block"; - static final String COMBINE_SCRIPT_NAME = "combine"; - static final String REDUCE_SCRIPT_NAME = "reduce"; - static final String REDUCE_FAIL_SCRIPT_NAME = "reduce_fail"; - static final String REDUCE_BLOCK_SCRIPT_NAME = "reduce_block"; - static final String TERM_SCRIPT_NAME = "term"; - - private final AtomicInteger hits = new AtomicInteger(); - - private final AtomicBoolean shouldBlock = new AtomicBoolean(true); - - private final AtomicReference beforeExecution = new AtomicReference<>(); - - public void reset() { - hits.set(0); - } - - public void disableBlock() { - shouldBlock.set(false); - } - - public void enableBlock() { - shouldBlock.set(true); - } - - public void setBeforeExecution(Runnable runnable) { - beforeExecution.set(runnable); - } - - @Override - public Map, Object>> pluginScripts() { - return Map.of( - SEARCH_BLOCK_SCRIPT_NAME, - this::searchBlockScript, - INIT_SCRIPT_NAME, - this::nullScript, - MAP_SCRIPT_NAME, - this::nullScript, - MAP_BLOCK_SCRIPT_NAME, - this::mapBlockScript, - COMBINE_SCRIPT_NAME, - this::nullScript, - REDUCE_BLOCK_SCRIPT_NAME, - this::blockScript, - REDUCE_SCRIPT_NAME, - this::termScript, - REDUCE_FAIL_SCRIPT_NAME, - this::reduceFailScript, - TERM_SCRIPT_NAME, - this::termScript - ); - } - - private Object searchBlockScript(Map params) { - final Runnable runnable = beforeExecution.get(); - if (runnable != null) { - runnable.run(); - } - LeafStoredFieldsLookup fieldsLookup = (LeafStoredFieldsLookup) params.get("_fields"); - LogManager.getLogger(SearchCancellationIT.class).info("Blocking on the document {}", fieldsLookup.get("_id")); - hits.incrementAndGet(); - try { - assertBusy(() -> assertFalse(shouldBlock.get())); - } catch (Exception e) { - throw new RuntimeException(e); - } - return true; - } - - private Object reduceFailScript(Map params) { - fail("Shouldn't reach reduce"); - return true; - } - - private Object nullScript(Map params) { - return null; - } - - private Object blockScript(Map params) { - final Runnable runnable = beforeExecution.get(); - if (runnable != null) { - runnable.run(); - } - if (shouldBlock.get()) { - LogManager.getLogger(SearchCancellationIT.class).info("Blocking in reduce"); - } - hits.incrementAndGet(); - try { - assertBusy(() -> assertFalse(shouldBlock.get())); - } catch (Exception e) { - throw new RuntimeException(e); - } - return 42; - } - - private Object mapBlockScript(Map params) { - final Runnable runnable = beforeExecution.get(); - if (runnable != null) { - runnable.run(); - } - if (shouldBlock.get()) { - LogManager.getLogger(SearchCancellationIT.class).info("Blocking in map"); - } - hits.incrementAndGet(); - try { - assertBusy(() -> assertFalse(shouldBlock.get())); - } catch (Exception e) { - throw new RuntimeException(e); - } - return 1; - } - - private Object termScript(Map params) { - return 1; - } - } } diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index e4393f6983da2..ff48a883724ca 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -318,7 +318,6 @@ exports org.elasticsearch.search.aggregations.pipeline; exports org.elasticsearch.search.aggregations.support; exports org.elasticsearch.search.aggregations.support.values; - exports org.elasticsearch.search.aggregations.timeseries; exports org.elasticsearch.search.builder; exports org.elasticsearch.search.collapse; exports org.elasticsearch.search.dfs; diff --git a/server/src/main/java/org/elasticsearch/search/SearchModule.java b/server/src/main/java/org/elasticsearch/search/SearchModule.java index 42c20def183d8..53a456c55b07b 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/server/src/main/java/org/elasticsearch/search/SearchModule.java @@ -18,7 +18,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.RestApiVersion; -import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.query.AbstractQueryBuilder; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.BoostingQueryBuilder; @@ -206,8 +205,6 @@ import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; -import org.elasticsearch.search.aggregations.timeseries.InternalTimeSeries; -import org.elasticsearch.search.aggregations.timeseries.TimeSeriesAggregationBuilder; import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.search.fetch.FetchSubPhase; import org.elasticsearch.search.fetch.subphase.ExplainPhase; @@ -633,17 +630,6 @@ private ValuesSourceRegistry registerAggregations(List plugins) { .setAggregatorRegistrar(CompositeAggregationBuilder::registerAggregators), builder ); - if (IndexSettings.isTimeSeriesModeEnabled()) { - registerAggregation( - new AggregationSpec( - TimeSeriesAggregationBuilder.NAME, - TimeSeriesAggregationBuilder::new, - TimeSeriesAggregationBuilder.PARSER - ).addResultReader(InternalTimeSeries::new), - builder - ); - } - if (RestApiVersion.minimumSupported() == RestApiVersion.V_7) { registerQuery( new QuerySpec<>( diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java index 1c242b7a3f592..3f998bffd1860 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationBuilders.java @@ -76,7 +76,6 @@ import org.elasticsearch.search.aggregations.metrics.ValueCount; import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.WeightedAvgAggregationBuilder; -import org.elasticsearch.search.aggregations.timeseries.TimeSeriesAggregationBuilder; import java.util.List; @@ -349,11 +348,4 @@ public static CompositeAggregationBuilder composite(String name, List getBuckets(); - - TimeSeries.Bucket getBucketByKey(String key); -} diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/AggregationsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/AggregationsTests.java index e47e1c3ab9748..87299e0f5645d 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/AggregationsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/AggregationsTests.java @@ -60,7 +60,6 @@ import org.elasticsearch.search.aggregations.pipeline.InternalExtendedStatsBucketTests; import org.elasticsearch.search.aggregations.pipeline.InternalPercentilesBucketTests; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValueTests; -import org.elasticsearch.search.aggregations.timeseries.InternalTimeSeriesTests; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalAggregationTestCase; import org.elasticsearch.test.InternalMultiBucketAggregationTestCase; @@ -139,8 +138,7 @@ public class AggregationsTests extends ESTestCase { new InternalBinaryRangeTests(), new InternalTopHitsTests(), new InternalCompositeTests(), - new InternalMedianAbsoluteDeviationTests(), - new InternalTimeSeriesTests() + new InternalMedianAbsoluteDeviationTests() ); @Override diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesCancellationTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/support/TimeSeriesCancellationTests.java similarity index 98% rename from server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesCancellationTests.java rename to server/src/test/java/org/elasticsearch/search/aggregations/support/TimeSeriesCancellationTests.java index 1419e356f14d0..42b1c33dd26e7 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesCancellationTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/support/TimeSeriesCancellationTests.java @@ -5,7 +5,7 @@ * in compliance with, at your election, the Elastic License 2.0 or the Server * Side Public License, v 1. */ -package org.elasticsearch.search.aggregations.timeseries; +package org.elasticsearch.search.aggregations.support; import org.apache.lucene.document.Document; import org.apache.lucene.document.NumericDocValuesField; diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcherTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcherTests.java similarity index 99% rename from server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcherTests.java rename to server/src/test/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcherTests.java index c103b7dc63488..0f64093597678 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesIndexSearcherTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/support/TimeSeriesIndexSearcherTests.java @@ -6,7 +6,7 @@ * Side Public License, v 1. */ -package org.elasticsearch.search.aggregations.timeseries; +package org.elasticsearch.search.aggregations.support; import org.apache.lucene.document.Document; import org.apache.lucene.document.NumericDocValuesField; diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregationBuilderTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregationBuilderTests.java deleted file mode 100644 index 6ed704326cf34..0000000000000 --- a/server/src/test/java/org/elasticsearch/search/aggregations/timeseries/TimeSeriesAggregationBuilderTests.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.search.aggregations.timeseries; - -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.search.SearchModule; -import org.elasticsearch.search.aggregations.AggregatorFactories; -import org.elasticsearch.test.AbstractXContentSerializingTestCase; -import org.elasticsearch.xcontent.NamedXContentRegistry; -import org.elasticsearch.xcontent.XContentParser; - -import java.io.IOException; -import java.util.Collections; - -import static org.hamcrest.Matchers.hasSize; - -public class TimeSeriesAggregationBuilderTests extends AbstractXContentSerializingTestCase { - - @Override - protected TimeSeriesAggregationBuilder doParseInstance(XContentParser parser) throws IOException { - assertSame(XContentParser.Token.START_OBJECT, parser.nextToken()); - AggregatorFactories.Builder parsed = AggregatorFactories.parseAggregators(parser); - assertThat(parsed.getAggregatorFactories(), hasSize(1)); - assertThat(parsed.getPipelineAggregatorFactories(), hasSize(0)); - TimeSeriesAggregationBuilder agg = (TimeSeriesAggregationBuilder) parsed.getAggregatorFactories().iterator().next(); - assertNull(parser.nextToken()); - assertNotNull(agg); - return agg; - } - - @Override - protected Writeable.Reader instanceReader() { - return TimeSeriesAggregationBuilder::new; - } - - @Override - protected TimeSeriesAggregationBuilder createTestInstance() { - return new TimeSeriesAggregationBuilder(randomAlphaOfLength(10), randomBoolean()); - } - - @Override - protected NamedWriteableRegistry getNamedWriteableRegistry() { - return new NamedWriteableRegistry(new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedWriteables()); - } - - @Override - protected NamedXContentRegistry xContentRegistry() { - return new NamedXContentRegistry(new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents()); - } - -} diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 67e8b955ec9d9..b0befd0911cc7 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -127,10 +127,10 @@ import org.elasticsearch.search.aggregations.support.AggregationContext.ProductionAggregationContext; import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; import org.elasticsearch.search.aggregations.support.SamplingContext; +import org.elasticsearch.search.aggregations.support.TimeSeriesIndexSearcher; import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; import org.elasticsearch.search.aggregations.support.ValuesSourceType; -import org.elasticsearch.search.aggregations.timeseries.TimeSeriesIndexSearcher; import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.search.fetch.subphase.FetchDocValuesPhase; import org.elasticsearch.search.fetch.subphase.FetchSourcePhase; diff --git a/test/framework/src/main/java/org/elasticsearch/test/AbstractSearchCancellationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/AbstractSearchCancellationTestCase.java new file mode 100644 index 0000000000000..8e7d895ee23d9 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/test/AbstractSearchCancellationTestCase.java @@ -0,0 +1,263 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.test; + +import org.apache.logging.log4j.LogManager; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.script.MockScriptPlugin; +import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.lookup.LeafStoredFieldsLookup; +import org.elasticsearch.tasks.TaskInfo; +import org.junit.BeforeClass; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; + +public class AbstractSearchCancellationTestCase extends ESIntegTestCase { + + protected static boolean lowLevelCancellation; + + @BeforeClass + public static void init() { + lowLevelCancellation = randomBoolean(); + } + + @Override + protected Collection> nodePlugins() { + return Collections.singleton(ScriptedBlockPlugin.class); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + logger.info("Using lowLevelCancellation: {}", lowLevelCancellation); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), lowLevelCancellation) + .build(); + } + + protected void indexTestData() { + for (int i = 0; i < 5; i++) { + // Make sure we have a few segments + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int j = 0; j < 20; j++) { + bulkRequestBuilder.add(client().prepareIndex("test").setId(Integer.toString(i * 5 + j)).setSource("field", "value")); + } + assertNoFailures(bulkRequestBuilder.get()); + } + } + + protected List initBlockFactory() { + List plugins = new ArrayList<>(); + for (PluginsService pluginsService : internalCluster().getInstances(PluginsService.class)) { + plugins.addAll(pluginsService.filterPlugins(ScriptedBlockPlugin.class)); + } + for (ScriptedBlockPlugin plugin : plugins) { + plugin.reset(); + plugin.enableBlock(); + } + return plugins; + } + + protected void awaitForBlock(List plugins) throws Exception { + int numberOfShards = getNumShards("test").numPrimaries; + assertBusy(() -> { + int numberOfBlockedPlugins = 0; + for (ScriptedBlockPlugin plugin : plugins) { + numberOfBlockedPlugins += plugin.hits.get(); + } + logger.info("The plugin blocked on {} out of {} shards", numberOfBlockedPlugins, numberOfShards); + assertThat(numberOfBlockedPlugins, greaterThan(0)); + }); + } + + protected void disableBlocks(List plugins) throws Exception { + for (ScriptedBlockPlugin plugin : plugins) { + plugin.disableBlock(); + } + } + + protected void cancelSearch(String action) { + ListTasksResponse listTasksResponse = client().admin().cluster().prepareListTasks().setActions(action).get(); + assertThat(listTasksResponse.getTasks(), hasSize(1)); + TaskInfo searchTask = listTasksResponse.getTasks().get(0); + + logger.info("Cancelling search"); + CancelTasksResponse cancelTasksResponse = client().admin() + .cluster() + .prepareCancelTasks() + .setTargetTaskId(searchTask.taskId()) + .get(); + assertThat(cancelTasksResponse.getTasks(), hasSize(1)); + assertThat(cancelTasksResponse.getTasks().get(0).taskId(), equalTo(searchTask.taskId())); + } + + protected SearchResponse ensureSearchWasCancelled(ActionFuture searchResponse) { + try { + SearchResponse response = searchResponse.actionGet(); + logger.info("Search response {}", response); + assertNotEquals("At least one shard should have failed", 0, response.getFailedShards()); + for (ShardSearchFailure failure : response.getShardFailures()) { + // We should have fail because the search has been cancel. The status of the exceptions should be 400. + assertThat(ExceptionsHelper.status(failure.getCause()), equalTo(RestStatus.BAD_REQUEST)); + } + return response; + } catch (SearchPhaseExecutionException ex) { + // We should have fail because the search has been cancel. The status of the response should be 400. + assertThat(ExceptionsHelper.status(ex), equalTo(RestStatus.BAD_REQUEST)); + logger.info("All shards failed with", ex); + return null; + } + } + + public static class ScriptedBlockPlugin extends MockScriptPlugin { + public static final String SEARCH_BLOCK_SCRIPT_NAME = "search_block"; + public static final String INIT_SCRIPT_NAME = "init"; + public static final String MAP_SCRIPT_NAME = "map"; + public static final String MAP_BLOCK_SCRIPT_NAME = "map_block"; + public static final String COMBINE_SCRIPT_NAME = "combine"; + static final String REDUCE_SCRIPT_NAME = "reduce"; + public static final String REDUCE_FAIL_SCRIPT_NAME = "reduce_fail"; + public static final String REDUCE_BLOCK_SCRIPT_NAME = "reduce_block"; + public static final String TERM_SCRIPT_NAME = "term"; + + private final AtomicInteger hits = new AtomicInteger(); + + private final AtomicBoolean shouldBlock = new AtomicBoolean(true); + + private final AtomicReference beforeExecution = new AtomicReference<>(); + + public void reset() { + hits.set(0); + } + + public void disableBlock() { + shouldBlock.set(false); + } + + public void enableBlock() { + shouldBlock.set(true); + } + + public void setBeforeExecution(Runnable runnable) { + beforeExecution.set(runnable); + } + + @Override + public Map, Object>> pluginScripts() { + return Map.of( + SEARCH_BLOCK_SCRIPT_NAME, + this::searchBlockScript, + INIT_SCRIPT_NAME, + this::nullScript, + MAP_SCRIPT_NAME, + this::nullScript, + MAP_BLOCK_SCRIPT_NAME, + this::mapBlockScript, + COMBINE_SCRIPT_NAME, + this::nullScript, + REDUCE_BLOCK_SCRIPT_NAME, + this::blockScript, + REDUCE_SCRIPT_NAME, + this::termScript, + REDUCE_FAIL_SCRIPT_NAME, + this::reduceFailScript, + TERM_SCRIPT_NAME, + this::termScript + ); + } + + private Object searchBlockScript(Map params) { + final Runnable runnable = beforeExecution.get(); + if (runnable != null) { + runnable.run(); + } + LeafStoredFieldsLookup fieldsLookup = (LeafStoredFieldsLookup) params.get("_fields"); + LogManager.getLogger(AbstractSearchCancellationTestCase.class).info("Blocking on the document {}", fieldsLookup.get("_id")); + hits.incrementAndGet(); + try { + assertBusy(() -> assertFalse(shouldBlock.get())); + } catch (Exception e) { + throw new RuntimeException(e); + } + return true; + } + + private Object reduceFailScript(Map params) { + fail("Shouldn't reach reduce"); + return true; + } + + private Object nullScript(Map params) { + return null; + } + + private Object blockScript(Map params) { + final Runnable runnable = beforeExecution.get(); + if (runnable != null) { + runnable.run(); + } + if (shouldBlock.get()) { + LogManager.getLogger(AbstractSearchCancellationTestCase.class).info("Blocking in reduce"); + } + hits.incrementAndGet(); + try { + assertBusy(() -> assertFalse(shouldBlock.get())); + } catch (Exception e) { + throw new RuntimeException(e); + } + return 42; + } + + private Object mapBlockScript(Map params) { + final Runnable runnable = beforeExecution.get(); + if (runnable != null) { + runnable.run(); + } + if (shouldBlock.get()) { + LogManager.getLogger(AbstractSearchCancellationTestCase.class).info("Blocking in map"); + } + hits.incrementAndGet(); + try { + assertBusy(() -> assertFalse(shouldBlock.get())); + } catch (Exception e) { + throw new RuntimeException(e); + } + return 1; + } + + private Object termScript(Map params) { + return 1; + } + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index ffed9ea6ef8a4..150eff8f84ee1 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -137,8 +137,6 @@ import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.SamplingContext; -import org.elasticsearch.search.aggregations.timeseries.ParsedTimeSeries; -import org.elasticsearch.search.aggregations.timeseries.TimeSeriesAggregationBuilder; import org.elasticsearch.xcontent.ContextParser; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ParseField; @@ -293,7 +291,6 @@ public AggregationReduceContext forFinalReduction() { map.put(IpRangeAggregationBuilder.NAME, (p, c) -> ParsedBinaryRange.fromXContent(p, (String) c)); map.put(TopHitsAggregationBuilder.NAME, (p, c) -> ParsedTopHits.fromXContent(p, (String) c)); map.put(CompositeAggregationBuilder.NAME, (p, c) -> ParsedComposite.fromXContent(p, (String) c)); - map.put(TimeSeriesAggregationBuilder.NAME, (p, c) -> ParsedTimeSeries.fromXContent(p, (String) c)); namedXContents = map.entrySet() .stream() diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java index 856fcb8d53c3c..479e0e9cddc1c 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java @@ -42,7 +42,7 @@ import org.elasticsearch.search.aggregations.BucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.bucket.DocCountProvider; -import org.elasticsearch.search.aggregations.timeseries.TimeSeriesIndexSearcher; +import org.elasticsearch.search.aggregations.support.TimeSeriesIndexSearcher; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentType;