From 26389ef9fbaf34979583e6eb69eb2c0ba126f82c Mon Sep 17 00:00:00 2001
From: Nik Everett
Date: Mon, 11 Oct 2021 10:22:01 -0400
Subject: [PATCH 1/2] Create a coordinating node level reader for tsdb

This creates an interface that reads data in a time series compatible
way on the coordinating node. We believe that it can one day smooth out
querying time series data at a high level. Right now there is a single
implementation of this interface that targets standard indices very
inefficiently. It delegates down to our standard `_search` APIs,
specifically `composite`, `top_hits`, and `search_after`. It is our
hope that when we have fancier TSDB support we can use it to speed up
the API.

The API itself looks like:
```
// The latest value for all time series in the range
void latestInRange(metric, from, to, callback);

// The latest value for all time series in ranges starting from
// `from`, stepping by `step` until `to`
void latestInRanges(metric, from, to, step, callback);

// All values for all time series in the range
void valuesInRange(metric, from, to, callback);
```
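As a sketch of how a caller might drive it (hypothetical wiring; the
`client`, `from`, and `to` variables, the batch sizes, and the "tsdb"
index name below are illustrative, not part of this change):
```
// Hypothetical caller on the coordinating node. The batch sizes
// (1000 buckets, 10000 docs) and the "tsdb" index are illustrative.
TimeSeriesMetricsService service = new TimeSeriesMetricsService(client, 1000, 10000);
service.newMetrics(new String[] { "tsdb" }, ActionListener.wrap(metrics -> {
    // Latest value of the "v" metric for every time series in the range
    metrics.latestInRange("v", from, to, new TimeSeriesMetrics.MetricsCallback() {
        @Override
        public void onTimeSeriesStart(Map<String, Object> dimensions) {
            // Called once per time series, before its metrics arrive
        }

        @Override
        public void onMetric(long time, double value) {
            // Called for each metric in the current time series
        }

        @Override
        public void onSuccess() {
            // All requested metrics have been returned
        }

        @Override
        public void onError(Exception e) {
            // Fetching failed; no more results will arrive
        }
    });
}, e -> { /* the field caps lookup failed */ }));
```
Note that the range is open/closed - `from` is excluded and `to` is
included - matching the `gt`/`lte` range query the implementation
builds.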
---
 .../search/tsdb/TimeSeriesMetricsIT.java      | 367 ++++++++++++++++++
 .../search/tsdb/TimeSeriesMetrics.java        | 352 +++++++++++++++++
 .../search/tsdb/TimeSeriesMetricsService.java |  69 ++++
 3 files changed, 788 insertions(+)
 create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/search/tsdb/TimeSeriesMetricsIT.java
 create mode 100644 server/src/main/java/org/elasticsearch/search/tsdb/TimeSeriesMetrics.java
 create mode 100644 server/src/main/java/org/elasticsearch/search/tsdb/TimeSeriesMetricsService.java

diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/tsdb/TimeSeriesMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/tsdb/TimeSeriesMetricsIT.java
new file mode 100644
index 0000000000000..fd7de24420bf4
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/elasticsearch/search/tsdb/TimeSeriesMetricsIT.java
@@ -0,0 +1,367 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.tsdb;
+
+import io.github.nik9000.mapmatcher.MapMatcher;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.support.ListenableActionFuture;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.time.DateFormatter;
+import org.elasticsearch.core.CheckedConsumer;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.json.JsonXContent;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalAccessor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.IntFunction;
+
+import static io.github.nik9000.mapmatcher.MapMatcher.assertMap;
+import static io.github.nik9000.mapmatcher.MapMatcher.matchesMap;
+
+@TestLogging(value = "org.elasticsearch.search.tsdb:debug", reason = "test")
+public class TimeSeriesMetricsIT extends ESIntegTestCase {
+    private static final int MAX_RESULT_WINDOW = IndexSettings.MAX_RESULT_WINDOW_SETTING.getDefault(Settings.EMPTY);
+    private static final DateFormatter FORMATTER = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
+
+    public void testKeywordDimension() throws Exception {
+        assertSmallSimple("a", "b", mapping -> mapping.field("type", "keyword"));
+    }
+
+    public void testByteDimension() throws Exception {
+        assertSmallSimple(0L, 1L, mapping -> mapping.field("type", "byte"));
+    }
+
+    public void testShortDimension() throws Exception {
+        assertSmallSimple(0L, 1L, mapping -> mapping.field("type", "short"));
+    }
+
+    public void testIntDimension() throws Exception {
+        assertSmallSimple(0L, 1L, mapping -> mapping.field("type", "integer"));
+    }
+
+    public void testLongDimension() throws Exception {
+        assertSmallSimple(0L, 1L, mapping -> mapping.field("type", "long"));
+    }
+
+    public void testIpDimension() throws Exception {
+        assertSmallSimple("192.168.0.1", "2001:db8::1:0:0:1", mapping -> mapping.field("type", "ip"));
+    }
+
+    // TODO unsigned long dimension
+
+    public void assertSmallSimple(Object d1, Object d2, CheckedConsumer<XContentBuilder, IOException> dimensionMapping) throws Exception {
+        createTsdbIndex(mapping -> {
+            mapping.startObject("dim");
+            dimensionMapping.accept(mapping);
+            mapping.field("time_series_dimension", true);
+            mapping.endObject();
+        });
+        String beforeAll = "2021-01-01T00:05:00Z";
+        String[] dates = new String[] {
+            "2021-01-01T00:10:00.000Z",
+            "2021-01-01T00:11:00.000Z",
+            "2021-01-01T00:15:00.000Z",
+            "2021-01-01T00:20:00.000Z", };
+        indexRandom(
+            true,
+            client().prepareIndex("tsdb").setSource(Map.of("@timestamp", dates[0], "dim", d1, "v", 1)),
+            client().prepareIndex("tsdb").setSource(Map.of("@timestamp", dates[1], "dim", d1, "v", 2)),
+            client().prepareIndex("tsdb").setSource(Map.of("@timestamp", dates[2], "dim", d1, "v", 3)),
+            client().prepareIndex("tsdb").setSource(Map.of("@timestamp", dates[3], "dim", d1, "v", 4)),
+            client().prepareIndex("tsdb").setSource(Map.of("@timestamp", dates[1], "dim", d2, "v", 5))
+        );
+        assertMap(
+            latestInRange(between(1, MultiBucketConsumerService.DEFAULT_MAX_BUCKETS), beforeAll, dates[0]),
+            matchesMap().entry(Map.of("dim", d1), List.of(Map.entry(dates[0], 1.0)))
+        );
+        assertMap(
+            valuesInRange(between(1, MAX_RESULT_WINDOW), beforeAll, dates[0]),
+            matchesMap().entry(Map.of("dim", d1), List.of(Map.entry(dates[0], 1.0)))
+        );
+        assertMap(
+            latestInRange(between(1, MultiBucketConsumerService.DEFAULT_MAX_BUCKETS), dates[0], dates[2]),
+            matchesMap().entry(Map.of("dim", d1), List.of(Map.entry(dates[2], 3.0)))
+                .entry(Map.of("dim", d2), List.of(Map.entry(dates[1], 5.0)))
+        );
+        assertMap(
+            valuesInRange(between(1, MAX_RESULT_WINDOW), dates[0], dates[2]),
+            matchesMap().entry(Map.of("dim", d1), List.of(Map.entry(dates[1], 2.0), Map.entry(dates[2], 3.0)))
+                .entry(Map.of("dim", d2), List.of(Map.entry(dates[1], 5.0)))
+        );
+        assertMap(
+            latestInRange(between(1, MultiBucketConsumerService.DEFAULT_MAX_BUCKETS), beforeAll, dates[3]),
+            matchesMap().entry(Map.of("dim", d1), List.of(Map.entry(dates[3], 4.0)))
+                .entry(Map.of("dim", d2), List.of(Map.entry(dates[1], 5.0)))
+        );
+        assertMap(
+            valuesInRange(between(1, MAX_RESULT_WINDOW), beforeAll, dates[3]),
+            matchesMap().entry(
+                Map.of("dim", d1),
+                List.of(Map.entry(dates[0], 1.0), Map.entry(dates[1], 2.0), Map.entry(dates[2], 3.0), Map.entry(dates[3], 4.0))
+            ).entry(Map.of("dim", d2), List.of(Map.entry(dates[1], 5.0)))
+        );
+        assertMap(
+            latestInRanges(
+                between(1, MultiBucketConsumerService.DEFAULT_MAX_BUCKETS),
+                beforeAll,
+                dates[3],
+                new DateHistogramInterval("5m")
+            ),
+            matchesMap().entry(Map.of("dim", d1), List.of(Map.entry(dates[0], 1.0), Map.entry(dates[2], 3.0), Map.entry(dates[3], 4.0)))
+                .entry(Map.of("dim", d2), List.of(Map.entry(dates[1], 5.0)))
+        );
+    }
+
+    public void testManyTimeSeries() throws InterruptedException, ExecutionException, IOException {
+        createTsdbIndex("dim");
+        assertManyTimeSeries(i -> Map.of("dim", Integer.toString(i, Character.MAX_RADIX)));
+    }
+
+    public void testManyTimeSeriesWithManyDimensions() throws InterruptedException, ExecutionException, IOException {
+        createTsdbIndex("dim0", "dim1", "dim2", "dim3", "dim4", "dim5", "dim6", "dim7");
+        assertManyTimeSeries(i -> {
+            int dimCount = (i & 0x07) + 1;
+            Map<String, Object> dims = new HashMap<>(dimCount);
+            int offset = (i >> 3) & 0x03;
+            String value = Integer.toString(i, Character.MAX_RADIX);
+            for (int d = 0; d < dimCount; d++) {
+                dims.put("dim" + ((d + offset) & 0x07), value);
+            }
+            return dims;
+        });
+    }
+
+    private void assertManyTimeSeries(IntFunction<Map<String, Object>> gen) throws InterruptedException {
+        MapMatcher expectedLatest = matchesMap();
+        MapMatcher expectedValues = matchesMap();
+        String min = "2021-01-01T00:10:00Z";
+        String max = "2021-01-01T00:15:00Z";
+        long minMillis = FORMATTER.parseMillis(min);
+        long maxMillis = FORMATTER.parseMillis(max);
+        int iterationSize = scaledRandomIntBetween(50, 100);
+        int docCount = scaledRandomIntBetween(iterationSize * 2, iterationSize * 100);
+        List<IndexRequestBuilder> docs = new ArrayList<>(docCount);
+        for (int i = 0; i < docCount; i++) {
+            int count = randomBoolean() ? 1 : 2;
+            Set<Long> times = new TreeSet<>(); // We're using the ordered sort below
+            while (times.size() < count) {
+                times.add(randomLongBetween(minMillis + 1, maxMillis));
+            }
+            List<Map.Entry<String, Double>> expectedValuesForTimeSeries = new ArrayList<>(count);
+            Map<String, Object> dimensions = gen.apply(i);
+            String timestamp = null;
+            double value = Double.NaN;
+            for (long time : times) {
+                timestamp = FORMATTER.formatMillis(time);
+                value = randomDouble();
+                Map<String, Object> source = new HashMap<>(dimensions);
+                source.put("@timestamp", timestamp);
+                source.put("v", value);
+                if (randomBoolean()) {
+                    int garbage = between(1, 10);
+                    for (int g = 0; g < garbage; g++) {
+                        source.put("garbage" + g, randomAlphaOfLength(5));
+                    }
+                }
+                docs.add(client().prepareIndex("tsdb").setSource(source));
+                expectedValuesForTimeSeries.add(Map.entry(timestamp, value));
+            }
+            expectedLatest = expectedLatest.entry(dimensions, List.of(Map.entry(timestamp, value)));
+            expectedValues = expectedValues.entry(dimensions, expectedValuesForTimeSeries);
+        }
+        indexRandom(true, docs);
+        assertMap(latestInRange(iterationSize, min, max), expectedLatest);
+        assertMap(valuesInRange(iterationSize, min, max), expectedValues);
+    }
+
+    public void testManySteps() throws InterruptedException, ExecutionException, IOException {
+        createTsdbIndex("dim");
+        List<Map.Entry<String, Double>> expectedLatest = new ArrayList<>();
+        List<Map.Entry<String, Double>> expectedValues = new ArrayList<>();
+        String min = "2021-01-01T00:00:00Z";
+        long minMillis = FORMATTER.parseMillis(min);
+        int iterationBuckets = scaledRandomIntBetween(50, 100);
+        int bucketCount = scaledRandomIntBetween(iterationBuckets * 2, iterationBuckets * 100);
+        long maxMillis = minMillis + bucketCount * TimeUnit.SECONDS.toMillis(5);
+        String max = FORMATTER.formatMillis(maxMillis);
+        List<IndexRequestBuilder> docs = new ArrayList<>(bucketCount);
+        for (long millis = minMillis; millis < maxMillis; millis += TimeUnit.SECONDS.toMillis(5)) {
+            String timestamp = FORMATTER.formatMillis(millis);
+            double v = randomDouble();
+            if (randomBoolean()) {
+                String beforeTimestamp = FORMATTER.formatMillis(millis - 1);
+                double beforeValue = randomDouble();
+                docs.add(client().prepareIndex("tsdb").setSource(Map.of("@timestamp", beforeTimestamp, "dim", "dim", "v", beforeValue)));
+                expectedValues.add(Map.entry(beforeTimestamp, beforeValue));
+            }
+            expectedLatest.add(Map.entry(timestamp, v));
+            expectedValues.add(Map.entry(timestamp, v));
+            docs.add(client().prepareIndex("tsdb").setSource(Map.of("@timestamp", timestamp, "dim", "dim", "v", v)));
+        }
+        indexRandom(true, docs);
+        assertMap(
+            latestInRanges(iterationBuckets, "2020-01-01T00:00:00Z", max, new DateHistogramInterval("5s")),
+            matchesMap(Map.of(Map.of("dim", "dim"), expectedLatest))
+        );
+        assertMap(valuesInRange(iterationBuckets, "2020-01-01T00:00:00Z", max), matchesMap(Map.of(Map.of("dim", "dim"), expectedValues)));
+    }
+
+    private void createTsdbIndex(String... keywordDimensions) throws IOException {
+        createTsdbIndex(mapping -> {
+            for (String d : keywordDimensions) {
+                mapping.startObject(d).field("type", "keyword").field("time_series_dimension", true).endObject();
+            }
+        });
+    }
+
+    private void createTsdbIndex(CheckedConsumer<XContentBuilder, IOException> dimensionMapping) throws IOException {
+        XContentBuilder mapping = JsonXContent.contentBuilder();
+        mapping.startObject().startObject("properties");
+        mapping.startObject("@timestamp").field("type", "date").endObject();
+        mapping.startObject("v").field("type", "double").endObject();
+        dimensionMapping.accept(mapping);
+        mapping.endObject().endObject();
+        client().admin().indices().prepareCreate("tsdb").setMapping(mapping).get();
+    }
+
+    private Map<Map<String, Object>, List<Map.Entry<String, Double>>> latestInRange(int bucketBatchSize, String min, String max) {
+        TemporalAccessor minT = FORMATTER.parse(min);
+        TemporalAccessor maxT = FORMATTER.parse(max);
+        if (randomBoolean()) {
+            long days = Instant.from(minT).until(Instant.from(maxT), ChronoUnit.DAYS) + 1;
+            DateHistogramInterval step = new DateHistogramInterval(days + "d");
+            return latestInRanges(bucketBatchSize, minT, maxT, step);
+        }
+        return latestInRange(bucketBatchSize, minT, maxT);
+    }
+
+    private Map<Map<String, Object>, List<Map.Entry<String, Double>>> latestInRange(
+        int bucketBatchSize,
+        TemporalAccessor min,
+        TemporalAccessor max
+    ) {
+        return withMetrics(
+            bucketBatchSize,
+            between(0, 10000), // Not used by this method
+            (future, metrics) -> metrics.latestInRange("v", min, max, new CollectingListener(future))
+        );
+    }
+
+    private Map<Map<String, Object>, List<Map.Entry<String, Double>>> latestInRanges(
+        int bucketBatchSize,
+        String min,
+        String max,
+        DateHistogramInterval step
+    ) {
+        return latestInRanges(bucketBatchSize, FORMATTER.parse(min), FORMATTER.parse(max), step);
+    }
+
+    private Map<Map<String, Object>, List<Map.Entry<String, Double>>> latestInRanges(
+        int bucketBatchSize,
+        TemporalAccessor min,
+        TemporalAccessor max,
+        DateHistogramInterval step
+    ) {
+        return withMetrics(
+            bucketBatchSize,
+            between(0, 10000), // Not used by this method
+            (future, metrics) -> metrics.latestInRanges("v", min, max, step, new CollectingListener(future))
+        );
+    }
+
+    private Map<Map<String, Object>, List<Map.Entry<String, Double>>> valuesInRange(int docBatchSize, String min, String max) {
+        return valuesInRange(docBatchSize, FORMATTER.parse(min), FORMATTER.parse(max));
+    }
+
+    private Map<Map<String, Object>, List<Map.Entry<String, Double>>> valuesInRange(
+        int docBatchSize,
+        TemporalAccessor min,
+        TemporalAccessor max
+    ) {
+        return withMetrics(
+            between(0, 10000), // Not used by this method
+            docBatchSize,
+            (future, metrics) -> metrics.valuesInRange("v", min, max, new CollectingListener(future))
+        );
+    }
+
+    private <R> R withMetrics(int bucketBatchSize, int docBatchSize, BiConsumer<ActionListener<R>, TimeSeriesMetrics> handle) {
+        ListenableActionFuture<R> result = new ListenableActionFuture<>();
+        new TimeSeriesMetricsService(client(), bucketBatchSize, docBatchSize).newMetrics(
+            new String[] { "tsdb" },
+            new ActionListener<TimeSeriesMetrics>() {
+                @Override
+                public void onResponse(TimeSeriesMetrics metrics) {
+                    handle.accept(result, metrics);
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    result.onFailure(e);
+                }
+            }
+        );
+        return result.actionGet();
+    }
+
+    private class CollectingListener implements TimeSeriesMetrics.MetricsCallback {
+        private final Map<Map<String, Object>, List<Map.Entry<String, Double>>> results = new HashMap<>();
+        private final ActionListener<Map<Map<String, Object>, List<Map.Entry<String, Double>>>> delegate;
+        private Map<String, Object> currentDimensions = null;
+        private List<Map.Entry<String, Double>> currentValues = null;
+
+        CollectingListener(ActionListener<Map<Map<String, Object>, List<Map.Entry<String, Double>>>> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public void onTimeSeriesStart(Map<String, Object> dimensions) {
+            if (currentDimensions != null) {
+                results.put(currentDimensions, currentValues);
+            }
+            currentDimensions = dimensions;
+            currentValues = new ArrayList<>();
+        }
+
+        @Override
+        public void onMetric(long time, double value) {
+            currentValues.add(Map.entry(FORMATTER.formatMillis(time), value));
+        }
+
+        @Override
+        public void onSuccess() {
+            results.put(currentDimensions, currentValues);
+            delegate.onResponse(results);
+        }
+
+        @Override
+        public void onError(Exception e) {
+            delegate.onFailure(e);
+        }
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/search/tsdb/TimeSeriesMetrics.java b/server/src/main/java/org/elasticsearch/search/tsdb/TimeSeriesMetrics.java
new file mode 100644
index 0000000000000..748c49eb6f2ad
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/search/tsdb/TimeSeriesMetrics.java
@@ -0,0 +1,352 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.tsdb;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.document.DocumentField;
+import org.elasticsearch.core.CheckedConsumer;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.query.RangeQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
+import org.elasticsearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder;
+import org.elasticsearch.search.aggregations.bucket.composite.InternalComposite;
+import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.search.aggregations.metrics.InternalTopHits;
+import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
+import org.elasticsearch.search.fetch.subphase.FieldAndFormat;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.elasticsearch.search.sort.SortBuilder;
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.time.temporal.TemporalAccessor;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * Reads data in a time series style way.
+ */
+public class TimeSeriesMetrics {
+    private static final Logger logger = LogManager.getLogger();
+
+    private final int bucketBatchSize;
+    private final int docBatchSize;
+    private final Client client;
+    private final String[] indices;
+    private final List<String> dimensionFieldNames;
+
+    TimeSeriesMetrics(int bucketBatchSize, int docBatchSize, Client client, String[] indices, List<String> dimensionFieldNames) {
+        this.bucketBatchSize = bucketBatchSize;
+        this.docBatchSize = docBatchSize;
+        this.client = client;
+        this.indices = indices;
+        this.dimensionFieldNames = dimensionFieldNames;
+    }
+
+    /**
+     * Called when metric data is available.
+     */
+    interface MetricsCallback {
+        /**
+         * Called when starting a new time series.
+         */
+        void onTimeSeriesStart(Map<String, Object> dimensions);
+
+        /**
+         * Called for each metric returned.
+         * @param time the {@code @timestamp} recorded in the metric
+         * @param value the metric value
+         */
+        void onMetric(long time, double value);
+
+        /**
+         * Called when all requested metrics have been returned.
+         */
+        void onSuccess();
+
+        /**
+         * Called when there is any error fetching metrics. No more results
+         * will be returned.
+         */
+        void onError(Exception e);
+    }
+
+    // TODO selector
+    /**
+     * Get the latest value for all time series in the range.
+     */
+    public void latestInRange(String metric, TemporalAccessor from, TemporalAccessor to, MetricsCallback callback) {
+        latestInRange(metric, from, to, null, null, null, callback);
+    }
+
+    // TODO selector
+    /**
+     * Get the latest value for all time series in many ranges.
+     */
+    public void latestInRanges(
+        String metric,
+        TemporalAccessor from,
+        TemporalAccessor to,
+        DateHistogramInterval step,
+        MetricsCallback callback
+    ) {
+        latestInRange(metric, from, to, step, null, null, callback);
+    }
+
+    /**
+     * Get the latest value for all time series in one or many ranges.
+     * @param step null if reading from a single range, the length of each range otherwise.
+     */
+    private void latestInRange(
+        String metric,
+        TemporalAccessor from,
+        TemporalAccessor to,
+        @Nullable DateHistogramInterval step,
+        @Nullable Map<String, Object> afterKey,
+        @Nullable Map<String, Object> previousTimeSeries,
+        MetricsCallback callback
+    ) {
+        // TODO test asserting forking
+        SearchRequest search = searchInRange(from, to);
+        search.source().size(0);
+        search.source().trackTotalHits(false);
+        search.source().aggregation(timeSeriesComposite(step, afterKey).subAggregation(latestMetric(metric)));
+        logger.debug("Requesting batch of latest {}", search);
+        client.search(
+            search,
+            ActionListener.wrap(
+                new LatestInRangeResponseHandler(metric, callback, from, to, step, search, previousTimeSeries),
+                callback::onError
+            )
+        );
+    }
+
+    private SearchRequest searchInRange(TemporalAccessor from, TemporalAccessor to) {
+        SearchRequest search = new SearchRequest(indices);
+        search.source()
+            .query(
+                new RangeQueryBuilder("@timestamp").format(DateFieldMapper.DEFAULT_DATE_TIME_NANOS_FORMATTER.pattern())
+                    .gt(DateFieldMapper.DEFAULT_DATE_TIME_NANOS_FORMATTER.format(from))
+                    .lte(DateFieldMapper.DEFAULT_DATE_TIME_NANOS_FORMATTER.format(to))
+            );
+        return search;
+    }
+
+    private CompositeAggregationBuilder timeSeriesComposite(@Nullable DateHistogramInterval step, @Nullable Map<String, Object> afterKey) {
+        Stream<CompositeValuesSourceBuilder<?>> sources = dimensionFieldNames.stream()
+            .map(d -> new TermsValuesSourceBuilder(d).field(d).missingBucket(true));
+        if (step != null) {
+            sources = Stream.concat(
+                sources,
+                /*
+                 * offset(1) *includes* the last milli of the range and excludes
+                 * the first milli of the range - effectively shifting us from a
+                 * closed/open range to an open/closed range.
+                 */
+                Stream.of(new DateHistogramValuesSourceBuilder("@timestamp").field("@timestamp").fixedInterval(step).offset(1))
+            );
+        }
+        return new CompositeAggregationBuilder("time_series", sources.collect(toList())).aggregateAfter(afterKey).size(bucketBatchSize);
+    }
+
+    private TopHitsAggregationBuilder latestMetric(String metric) {
+        // TODO top metrics would almost certainly be better here but it's in analytics.
+        return new TopHitsAggregationBuilder("latest").sort(new FieldSortBuilder("@timestamp").order(SortOrder.DESC))
+            .fetchField(metric)
+            .fetchField(new FieldAndFormat("@timestamp", "epoch_millis"))
+            .size(1);
+    }
+
+    /**
+     * Handler for each page of results from {@link TimeSeriesMetrics#latestInRange}.
+     */
+    private class LatestInRangeResponseHandler implements CheckedConsumer<SearchResponse, RuntimeException> {
+        private final String metric;
+        private final MetricsCallback callback;
+        private final TemporalAccessor from;
+        private final TemporalAccessor to;
+        @Nullable
+        private final DateHistogramInterval step;
+        private final SearchRequest search;
+        private Map<String, Object> previousDimensions;
+
+        LatestInRangeResponseHandler(
+            String metric,
+            MetricsCallback callback,
+            TemporalAccessor from,
+            TemporalAccessor to,
+            @Nullable DateHistogramInterval step,
+            SearchRequest search,
+            @Nullable Map<String, Object> previousDimensions
+        ) {
+            this.metric = metric;
+            this.callback = callback;
+            this.from = from;
+            this.to = to;
+            this.step = step;
+            this.search = search;
+            this.previousDimensions = previousDimensions;
+        }
+
+        @Override
+        public void accept(SearchResponse response) {
+            // TODO shard error handling
+            InternalComposite composite = response.getAggregations().get("time_series");
+            logger.debug("Received batch of latest {} with {} buckets", search, composite.getBuckets().size());
+            for (InternalComposite.InternalBucket bucket : composite.getBuckets()) {
+                Map<String, Object> dimensions = bucket.getKey()
+                    .entrySet()
+                    .stream()
+                    .filter(e -> false == e.getKey().equals("@timestamp") && e.getValue() != null)
+                    .collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
+                if (false == Objects.equals(previousDimensions, dimensions)) {
+                    previousDimensions = dimensions;
+                    callback.onTimeSeriesStart(dimensions);
+                }
+                InternalTopHits latest = bucket.getAggregations().get("latest");
+                SearchHit[] hits = latest.getHits().getHits();
+                if (hits.length == 0) {
+                    continue;
+                }
+                DocumentField metricField = hits[0].field(metric);
+                if (metricField == null) {
+                    // TODO skip in query?
+                    continue;
+                }
+                long time = Long.parseLong((String) hits[0].field("@timestamp").getValue());
+                double value = ((Number) metricField.getValue()).doubleValue();
+                callback.onMetric(time, value);
+            }
+            if (composite.afterKey() == null) {
+                callback.onSuccess();
+            } else {
+                latestInRange(metric, from, to, step, composite.afterKey(), previousDimensions, callback);
+            }
+        }
+    }
+
+    // TODO selector
+    /**
+     * Return all values for all time series in a range.
+     */
+    public void valuesInRange(String metric, TemporalAccessor from, TemporalAccessor to, MetricsCallback listener) {
+        valuesInRange(metric, from, to, null, null, listener);
+    }
+
+    /**
+     * Search for a page of values for all time series in a range.
+     */
+    private void valuesInRange(
+        String metric,
+        TemporalAccessor from,
+        TemporalAccessor to,
+        @Nullable Object[] searchAfter,
+        @Nullable Map<String, Object> previousTimeSeries,
+        MetricsCallback callback
+    ) {
+        SearchRequest search = searchInRange(from, to);
+        search.source().size(docBatchSize);
+        search.source().trackTotalHits(false);
+        List<SortBuilder<?>> sorts = Stream.concat(
+            dimensionFieldNames.stream().map(d -> new FieldSortBuilder(d).order(SortOrder.ASC)),
+            Stream.of(new FieldSortBuilder("@timestamp").order(SortOrder.ASC).setFormat("epoch_millis"))
+        ).collect(toList());
+        search.source().sort(sorts);
+        if (searchAfter != null) {
+            search.source().searchAfter(searchAfter);
+        }
+        search.source().fetchField(metric);
+        client.search(
+            search,
+            ActionListener.wrap(new ValuesInRangeResponseHandler(metric, callback, from, to, search, previousTimeSeries), callback::onError)
+        );
+    }
+
+    /**
+     * Handler for {@link TimeSeriesMetrics#valuesInRange}.
+     */
+    private class ValuesInRangeResponseHandler implements CheckedConsumer<SearchResponse, RuntimeException> {
+        private final String metric;
+        private final MetricsCallback callback;
+        private final TemporalAccessor from;
+        private final TemporalAccessor to;
+        private final SearchRequest search;
+        private Map<String, Object> previousDimensions;
+
+        ValuesInRangeResponseHandler(
+            String metric,
+            MetricsCallback callback,
+            TemporalAccessor from,
+            TemporalAccessor to,
+            SearchRequest search,
+            @Nullable Map<String, Object> previousDimensions
+        ) {
+            this.metric = metric;
+            this.callback = callback;
+            this.from = from;
+            this.to = to;
+            this.search = search;
+            this.previousDimensions = previousDimensions;
+        }
+
+        @Override
+        public void accept(SearchResponse response) {
+            // TODO shard error handling
+            logger.debug("Received batch of values {} with {} docs", search, response.getHits().getHits().length);
+            SearchHit[] hits = response.getHits().getHits();
+            for (SearchHit hit : hits) {
+                /*
+                 * Read the dimensions out of the sort. This is useful because
+                 * we already need the sort so we can do proper pagination but
+                 * it also converts numeric dimensions into a Long which is nice
+                 * and consistent.
+                 */
+                Map<String, Object> dimensions = new HashMap<>();
+                for (int d = 0; d < dimensionFieldNames.size(); d++) {
+                    Object dimensionValue = hit.getSortValues()[d];
+                    if (dimensionValue != null) {
+                        dimensions.put(dimensionFieldNames.get(d), dimensionValue);
+                    }
+                }
+                if (false == Objects.equals(previousDimensions, dimensions)) {
+                    previousDimensions = dimensions;
+                    callback.onTimeSeriesStart(dimensions);
+                }
+                DocumentField metricField = hit.field(metric);
+                if (metricField == null) {
+                    // TODO skip in query?
+                    continue;
+                }
+                long time = Long.parseLong((String) hit.getSortValues()[dimensionFieldNames.size()]);
+                double value = ((Number) metricField.getValue()).doubleValue();
+                callback.onMetric(time, value);
+            }
+            if (hits.length < docBatchSize) {
+                callback.onSuccess();
+            } else {
+                valuesInRange(metric, from, to, hits[hits.length - 1].getSortValues(), previousDimensions, callback);
+            }
+        }
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/search/tsdb/TimeSeriesMetricsService.java b/server/src/main/java/org/elasticsearch/search/tsdb/TimeSeriesMetricsService.java
new file mode 100644
index 0000000000000..f23c1e01c7124
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/search/tsdb/TimeSeriesMetricsService.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.search.tsdb;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.fieldcaps.FieldCapabilities;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
+import org.elasticsearch.client.Client;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class TimeSeriesMetricsService {
+    private final Client client;
+    private final int bucketBatchSize;
+    private final int docBatchSize;
+
+    public TimeSeriesMetricsService(Client client, int bucketBatchSize, int docBatchSize) { // TODO read maxBuckets at runtime
+        this.client = client;
+        this.bucketBatchSize = bucketBatchSize;
+        this.docBatchSize = docBatchSize;
+    }
+
+    public void newMetrics(String[] indices, ActionListener<TimeSeriesMetrics> listener) {
+        FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
+        request.indices(indices);
+        request.fields("*"); // TODO * can be a lot!
+        client.fieldCaps(request, listener.map(this::newMetrics));
+    }
+
+    private TimeSeriesMetrics newMetrics(FieldCapabilitiesResponse response) {
+        if (response.getFailures().isEmpty() == false) {
+            ElasticsearchException e = new ElasticsearchException(
+                "Failed to fetch field caps for " + Arrays.toString(response.getFailedIndices())
+            );
+            for (FieldCapabilitiesFailure f : response.getFailures()) {
+                e.addSuppressed(
+                    new ElasticsearchException("Failed to fetch field caps for " + Arrays.toString(f.getIndices()), f.getException())
+                );
+            }
+            throw e;
+        }
+        List<String> dimensionFieldNames = new ArrayList<>();
+        for (Map.Entry<String, Map<String, FieldCapabilities>> e : response.get().entrySet()) {
+            for (Map.Entry<String, FieldCapabilities> e2 : e.getValue().entrySet()) {
+                collectField(dimensionFieldNames, e.getKey(), e2.getKey(), e2.getValue());
+            }
+        }
+        return new TimeSeriesMetrics(bucketBatchSize, docBatchSize, client, response.getIndices(), List.copyOf(dimensionFieldNames));
+    }
+
+    private void collectField(List<String> dimensions, String fieldName, String fieldType, FieldCapabilities capabilities) {
+        // TODO collect metrics for selector
+        if (capabilities.isDimension()) {
+            dimensions.add(fieldName);
+        }
+    }
+}

From c9f9d83294cbc2acd74425e01437c0a92fb278c1 Mon Sep 17 00:00:00 2001
From: Nik Everett
Date: Fri, 15 Oct 2021 16:26:07 -0400
Subject: [PATCH 2/2] Move package

---
 .../tsdb => timeseries/support}/TimeSeriesMetricsIT.java      | 4 +++-
 .../tsdb => timeseries/support}/TimeSeriesMetrics.java        | 2 +-
 .../tsdb => timeseries/support}/TimeSeriesMetricsService.java | 2 +-
 3 files changed, 5 insertions(+), 3 deletions(-)
 rename server/src/internalClusterTest/java/org/elasticsearch/{search/tsdb => timeseries/support}/TimeSeriesMetricsIT.java (98%)
 rename server/src/main/java/org/elasticsearch/{search/tsdb => timeseries/support}/TimeSeriesMetrics.java (99%)
 rename server/src/main/java/org/elasticsearch/{search/tsdb => timeseries/support}/TimeSeriesMetricsService.java (98%)

diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/tsdb/TimeSeriesMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/timeseries/support/TimeSeriesMetricsIT.java
similarity index 98%
rename from server/src/internalClusterTest/java/org/elasticsearch/search/tsdb/TimeSeriesMetricsIT.java
rename to server/src/internalClusterTest/java/org/elasticsearch/timeseries/support/TimeSeriesMetricsIT.java
index fd7de24420bf4..450d6a4757847 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/search/tsdb/TimeSeriesMetricsIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/timeseries/support/TimeSeriesMetricsIT.java
@@ -6,7 +6,7 @@
  * Side Public License, v 1.
  */
 
-package org.elasticsearch.search.tsdb;
+package org.elasticsearch.timeseries.support;
 
 import io.github.nik9000.mapmatcher.MapMatcher;
 
@@ -22,6 +22,8 @@
 import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.elasticsearch.timeseries.support.TimeSeriesMetrics;
+import org.elasticsearch.timeseries.support.TimeSeriesMetricsService;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.json.JsonXContent;
 
diff --git a/server/src/main/java/org/elasticsearch/search/tsdb/TimeSeriesMetrics.java b/server/src/main/java/org/elasticsearch/timeseries/support/TimeSeriesMetrics.java
similarity index 99%
rename from server/src/main/java/org/elasticsearch/search/tsdb/TimeSeriesMetrics.java
rename to server/src/main/java/org/elasticsearch/timeseries/support/TimeSeriesMetrics.java
index 748c49eb6f2ad..1d5794ffca417 100644
--- a/server/src/main/java/org/elasticsearch/search/tsdb/TimeSeriesMetrics.java
+++ b/server/src/main/java/org/elasticsearch/timeseries/support/TimeSeriesMetrics.java
@@ -6,7 +6,7 @@
  * Side Public License, v 1.
  */
 
-package org.elasticsearch.search.tsdb;
+package org.elasticsearch.timeseries.support;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
diff --git a/server/src/main/java/org/elasticsearch/search/tsdb/TimeSeriesMetricsService.java b/server/src/main/java/org/elasticsearch/timeseries/support/TimeSeriesMetricsService.java
similarity index 98%
rename from server/src/main/java/org/elasticsearch/search/tsdb/TimeSeriesMetricsService.java
rename to server/src/main/java/org/elasticsearch/timeseries/support/TimeSeriesMetricsService.java
index f23c1e01c7124..699eb92ce3e24 100644
--- a/server/src/main/java/org/elasticsearch/search/tsdb/TimeSeriesMetricsService.java
+++ b/server/src/main/java/org/elasticsearch/timeseries/support/TimeSeriesMetricsService.java
@@ -6,7 +6,7 @@
  * Side Public License, v 1.
  */
 
-package org.elasticsearch.search.tsdb;
+package org.elasticsearch.timeseries.support;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;