Skip to content

Commit 26389ef

Browse files
committed
Create a coordinating node level reader for tsdb
This creates an interface that reads data in a time series compatible way on the coordinating node. We believe that it can one day smooth out querying time series data at a high level. Right now there is a single implementation of this interface that targets standard indices very inefficiently. It delegates down to our standard `_search` APIs, specifically `composite`, `top_hits`, and `search_after`. It is our hope that when we have fancier TSDB support we can use it to speed the API. The API itself looks like: ``` // The latest value for all time series in the range void latestInRange(metric, from, to, callback); // The latest value for all time series in ranges starting from // `from`, st void latestInRanges(metric, from, to, step, callback); void valuesInRange(metric, from, to, callback); ```
1 parent cff383f commit 26389ef

File tree

3 files changed

+788
-0
lines changed

3 files changed

+788
-0
lines changed
Lines changed: 367 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,367 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.search.tsdb;
10+
11+
import io.github.nik9000.mapmatcher.MapMatcher;
12+
13+
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.action.index.IndexRequestBuilder;
15+
import org.elasticsearch.action.support.ListenableActionFuture;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.common.time.DateFormatter;
18+
import org.elasticsearch.core.CheckedConsumer;
19+
import org.elasticsearch.index.IndexSettings;
20+
import org.elasticsearch.index.mapper.DateFieldMapper;
21+
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
22+
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
23+
import org.elasticsearch.test.ESIntegTestCase;
24+
import org.elasticsearch.test.junit.annotations.TestLogging;
25+
import org.elasticsearch.xcontent.XContentBuilder;
26+
import org.elasticsearch.xcontent.json.JsonXContent;
27+
28+
import java.io.IOException;
29+
import java.time.Instant;
30+
import java.time.temporal.ChronoUnit;
31+
import java.time.temporal.TemporalAccessor;
32+
import java.util.ArrayList;
33+
import java.util.HashMap;
34+
import java.util.List;
35+
import java.util.Map;
36+
import java.util.Set;
37+
import java.util.TreeSet;
38+
import java.util.concurrent.ExecutionException;
39+
import java.util.concurrent.TimeUnit;
40+
import java.util.function.BiConsumer;
41+
import java.util.function.IntFunction;
42+
43+
import static io.github.nik9000.mapmatcher.MapMatcher.assertMap;
44+
import static io.github.nik9000.mapmatcher.MapMatcher.matchesMap;
45+
46+
@TestLogging(value = "org.elasticsearch.search.tsdb:debug", reason = "test")
47+
public class TimeSeriesMetricsIT extends ESIntegTestCase {
48+
private static final int MAX_RESULT_WINDOW = IndexSettings.MAX_RESULT_WINDOW_SETTING.getDefault(Settings.EMPTY);
49+
private static final DateFormatter FORMATTER = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
50+
51+
public void testKeywordDimension() throws Exception {
52+
assertSmallSimple("a", "b", mapping -> mapping.field("type", "keyword"));
53+
}
54+
55+
public void testByteDimension() throws Exception {
56+
assertSmallSimple(0L, 1L, mapping -> mapping.field("type", "byte"));
57+
}
58+
59+
public void testShortDimension() throws Exception {
60+
assertSmallSimple(0L, 1L, mapping -> mapping.field("type", "short"));
61+
}
62+
63+
public void testIntDimension() throws Exception {
64+
assertSmallSimple(0L, 1L, mapping -> mapping.field("type", "integer"));
65+
}
66+
67+
public void testLongDimension() throws Exception {
68+
assertSmallSimple(0L, 1L, mapping -> mapping.field("type", "long"));
69+
}
70+
71+
public void testIpDimension() throws Exception {
72+
assertSmallSimple("192.168.0.1", "2001:db8::1:0:0:1", mapping -> mapping.field("type", "ip"));
73+
}
74+
75+
// TODO unsigned long dimension
76+
77+
public void assertSmallSimple(Object d1, Object d2, CheckedConsumer<XContentBuilder, IOException> dimensionMapping) throws Exception {
78+
createTsdbIndex(mapping -> {
79+
mapping.startObject("dim");
80+
dimensionMapping.accept(mapping);
81+
mapping.field("time_series_dimension", true);
82+
mapping.endObject();
83+
});
84+
String beforeAll = "2021-01-01T00:05:00Z";
85+
String[] dates = new String[] {
86+
"2021-01-01T00:10:00.000Z",
87+
"2021-01-01T00:11:00.000Z",
88+
"2021-01-01T00:15:00.000Z",
89+
"2021-01-01T00:20:00.000Z", };
90+
indexRandom(
91+
true,
92+
client().prepareIndex("tsdb").setSource(Map.of("@timestamp", dates[0], "dim", d1, "v", 1)),
93+
client().prepareIndex("tsdb").setSource(Map.of("@timestamp", dates[1], "dim", d1, "v", 2)),
94+
client().prepareIndex("tsdb").setSource(Map.of("@timestamp", dates[2], "dim", d1, "v", 3)),
95+
client().prepareIndex("tsdb").setSource(Map.of("@timestamp", dates[3], "dim", d1, "v", 4)),
96+
client().prepareIndex("tsdb").setSource(Map.of("@timestamp", dates[1], "dim", d2, "v", 5))
97+
);
98+
assertMap(
99+
latestInRange(between(1, MultiBucketConsumerService.DEFAULT_MAX_BUCKETS), beforeAll, dates[0]),
100+
matchesMap().entry(Map.of("dim", d1), List.of(Map.entry(dates[0], 1.0)))
101+
);
102+
assertMap(
103+
valuesInRange(between(1, MAX_RESULT_WINDOW), beforeAll, dates[0]),
104+
matchesMap().entry(Map.of("dim", d1), List.of(Map.entry(dates[0], 1.0)))
105+
);
106+
assertMap(
107+
latestInRange(between(1, MultiBucketConsumerService.DEFAULT_MAX_BUCKETS), dates[0], dates[2]),
108+
matchesMap().entry(Map.of("dim", d1), List.of(Map.entry(dates[2], 3.0)))
109+
.entry(Map.of("dim", d2), List.of(Map.entry(dates[1], 5.0)))
110+
);
111+
assertMap(
112+
valuesInRange(between(1, MAX_RESULT_WINDOW), dates[0], dates[2]),
113+
matchesMap().entry(Map.of("dim", d1), List.of(Map.entry(dates[1], 2.0), Map.entry(dates[2], 3.0)))
114+
.entry(Map.of("dim", d2), List.of(Map.entry(dates[1], 5.0)))
115+
);
116+
assertMap(
117+
latestInRange(between(1, MultiBucketConsumerService.DEFAULT_MAX_BUCKETS), beforeAll, dates[3]),
118+
matchesMap().entry(Map.of("dim", d1), List.of(Map.entry(dates[3], 4.0)))
119+
.entry(Map.of("dim", d2), List.of(Map.entry(dates[1], 5.0)))
120+
);
121+
assertMap(
122+
valuesInRange(between(1, MAX_RESULT_WINDOW), beforeAll, dates[3]),
123+
matchesMap().entry(
124+
Map.of("dim", d1),
125+
List.of(Map.entry(dates[0], 1.0), Map.entry(dates[1], 2.0), Map.entry(dates[2], 3.0), Map.entry(dates[3], 4.0))
126+
).entry(Map.of("dim", d2), List.of(Map.entry(dates[1], 5.0)))
127+
);
128+
assertMap(
129+
latestInRanges(
130+
between(1, MultiBucketConsumerService.DEFAULT_MAX_BUCKETS),
131+
beforeAll,
132+
dates[3],
133+
new DateHistogramInterval("5m")
134+
),
135+
matchesMap().entry(Map.of("dim", d1), List.of(Map.entry(dates[0], 1.0), Map.entry(dates[2], 3.0), Map.entry(dates[3], 4.0)))
136+
.entry(Map.of("dim", d2), List.of(Map.entry(dates[1], 5.0)))
137+
);
138+
}
139+
140+
public void testManyTimeSeries() throws InterruptedException, ExecutionException, IOException {
141+
createTsdbIndex("dim");
142+
assertManyTimeSeries(i -> Map.of("dim", Integer.toString(i, Character.MAX_RADIX)));
143+
}
144+
145+
public void testManyTimeSeriesWithManyDimensions() throws InterruptedException, ExecutionException, IOException {
146+
createTsdbIndex("dim0", "dim1", "dim2", "dim3", "dim4", "dim5", "dim6", "dim7");
147+
assertManyTimeSeries(i -> {
148+
int dimCount = (i & 0x07) + 1;
149+
Map<String, Object> dims = new HashMap<>(dimCount);
150+
int offset = (i >> 3) & 0x03;
151+
String value = Integer.toString(i, Character.MAX_RADIX);
152+
for (int d = 0; d < dimCount; d++) {
153+
dims.put("dim" + ((d + offset) & 0x07), value);
154+
}
155+
return dims;
156+
});
157+
}
158+
159+
private void assertManyTimeSeries(IntFunction<Map<String, Object>> gen) throws InterruptedException {
160+
MapMatcher expectedLatest = matchesMap();
161+
MapMatcher expectedValues = matchesMap();
162+
String min = "2021-01-01T00:10:00Z";
163+
String max = "2021-01-01T00:15:00Z";
164+
long minMillis = FORMATTER.parseMillis(min);
165+
long maxMillis = FORMATTER.parseMillis(max);
166+
int iterationSize = scaledRandomIntBetween(50, 100);
167+
int docCount = scaledRandomIntBetween(iterationSize * 2, iterationSize * 100);
168+
List<IndexRequestBuilder> docs = new ArrayList<>(docCount);
169+
for (int i = 0; i < docCount; i++) {
170+
int count = randomBoolean() ? 1 : 2;
171+
Set<Long> times = new TreeSet<>(); // We're using the ordered sort below
172+
while (times.size() < count) {
173+
times.add(randomLongBetween(minMillis + 1, maxMillis));
174+
}
175+
List<Map.Entry<String, Double>> expectedValuesForTimeSeries = new ArrayList<>(count);
176+
Map<String, Object> dimensions = gen.apply(i);
177+
String timestamp = null;
178+
double value = Double.NaN;
179+
for (long time : times) {
180+
timestamp = FORMATTER.formatMillis(time);
181+
value = randomDouble();
182+
Map<String, Object> source = new HashMap<>(dimensions);
183+
source.put("@timestamp", timestamp);
184+
source.put("v", value);
185+
if (randomBoolean()) {
186+
int garbage = between(1, 10);
187+
for (int g = 0; g < garbage; g++) {
188+
source.put("garbage" + g, randomAlphaOfLength(5));
189+
}
190+
}
191+
docs.add(client().prepareIndex("tsdb").setSource(source));
192+
expectedValuesForTimeSeries.add(Map.entry(timestamp, value));
193+
}
194+
expectedLatest = expectedLatest.entry(dimensions, List.of(Map.entry(timestamp, value)));
195+
expectedValues = expectedValues.entry(dimensions, expectedValuesForTimeSeries);
196+
}
197+
indexRandom(true, docs);
198+
assertMap(latestInRange(iterationSize, min, max), expectedLatest);
199+
assertMap(valuesInRange(iterationSize, min, max), expectedValues);
200+
}
201+
202+
public void testManySteps() throws InterruptedException, ExecutionException, IOException {
203+
createTsdbIndex("dim");
204+
List<Map.Entry<String, Double>> expectedLatest = new ArrayList<>();
205+
List<Map.Entry<String, Double>> expectedValues = new ArrayList<>();
206+
String min = "2021-01-01T00:00:00Z";
207+
long minMillis = FORMATTER.parseMillis(min);
208+
int iterationBuckets = scaledRandomIntBetween(50, 100);
209+
int bucketCount = scaledRandomIntBetween(iterationBuckets * 2, iterationBuckets * 100);
210+
long maxMillis = minMillis + bucketCount * TimeUnit.SECONDS.toMillis(5);
211+
String max = FORMATTER.formatMillis(maxMillis);
212+
List<IndexRequestBuilder> docs = new ArrayList<>(bucketCount);
213+
for (long millis = minMillis; millis < maxMillis; millis += TimeUnit.SECONDS.toMillis(5)) {
214+
String timestamp = FORMATTER.formatMillis(millis);
215+
double v = randomDouble();
216+
if (randomBoolean()) {
217+
String beforeTimestamp = FORMATTER.formatMillis(millis - 1);
218+
double beforeValue = randomDouble();
219+
docs.add(client().prepareIndex("tsdb").setSource(Map.of("@timestamp", beforeTimestamp, "dim", "dim", "v", beforeValue)));
220+
expectedValues.add(Map.entry(beforeTimestamp, beforeValue));
221+
}
222+
expectedLatest.add(Map.entry(timestamp, v));
223+
expectedValues.add(Map.entry(timestamp, v));
224+
docs.add(client().prepareIndex("tsdb").setSource(Map.of("@timestamp", timestamp, "dim", "dim", "v", v)));
225+
}
226+
indexRandom(true, docs);
227+
assertMap(
228+
latestInRanges(iterationBuckets, "2020-01-01T00:00:00Z", max, new DateHistogramInterval("5s")),
229+
matchesMap(Map.of(Map.of("dim", "dim"), expectedLatest))
230+
);
231+
assertMap(valuesInRange(iterationBuckets, "2020-01-01T00:00:00Z", max), matchesMap(Map.of(Map.of("dim", "dim"), expectedValues)));
232+
}
233+
234+
private void createTsdbIndex(String... keywordDimensions) throws IOException {
235+
createTsdbIndex(mapping -> {
236+
for (String d : keywordDimensions) {
237+
mapping.startObject(d).field("type", "keyword").field("time_series_dimension", true).endObject();
238+
}
239+
});
240+
}
241+
242+
private void createTsdbIndex(CheckedConsumer<XContentBuilder, IOException> dimensionMapping) throws IOException {
243+
XContentBuilder mapping = JsonXContent.contentBuilder();
244+
mapping.startObject().startObject("properties");
245+
mapping.startObject("@timestamp").field("type", "date").endObject();
246+
mapping.startObject("v").field("type", "double").endObject();
247+
dimensionMapping.accept(mapping);
248+
mapping.endObject().endObject();
249+
client().admin().indices().prepareCreate("tsdb").setMapping(mapping).get();
250+
}
251+
252+
private Map<Map<String, Object>, List<Map.Entry<String, Double>>> latestInRange(int bucketBatchSize, String min, String max) {
253+
TemporalAccessor minT = FORMATTER.parse(min);
254+
TemporalAccessor maxT = FORMATTER.parse(max);
255+
if (randomBoolean()) {
256+
long days = Instant.from(maxT).until(Instant.from(minT), ChronoUnit.DAYS) + 1;
257+
DateHistogramInterval step = new DateHistogramInterval(days + "d");
258+
return latestInRanges(bucketBatchSize, minT, maxT, step);
259+
}
260+
return latestInRange(bucketBatchSize, minT, maxT);
261+
}
262+
263+
private Map<Map<String, Object>, List<Map.Entry<String, Double>>> latestInRange(
264+
int bucketBatchSize,
265+
TemporalAccessor min,
266+
TemporalAccessor max
267+
) {
268+
return withMetrics(
269+
bucketBatchSize,
270+
between(0, 10000), // Not used by this method
271+
(future, metrics) -> metrics.latestInRange("v", min, max, new CollectingListener(future))
272+
);
273+
}
274+
275+
private Map<Map<String, Object>, List<Map.Entry<String, Double>>> latestInRanges(
276+
int bucketBatchSize,
277+
String min,
278+
String max,
279+
DateHistogramInterval step
280+
) {
281+
return latestInRanges(bucketBatchSize, FORMATTER.parse(min), FORMATTER.parse(max), step);
282+
}
283+
284+
private Map<Map<String, Object>, List<Map.Entry<String, Double>>> latestInRanges(
285+
int bucketBatchSize,
286+
TemporalAccessor min,
287+
TemporalAccessor max,
288+
DateHistogramInterval step
289+
) {
290+
return withMetrics(
291+
bucketBatchSize,
292+
between(0, 10000), // Not used by this method
293+
(future, metrics) -> metrics.latestInRanges("v", min, max, step, new CollectingListener(future))
294+
);
295+
}
296+
297+
private Map<Map<String, Object>, List<Map.Entry<String, Double>>> valuesInRange(int docBatchSize, String min, String max) {
298+
return valuesInRange(docBatchSize, FORMATTER.parse(min), FORMATTER.parse(max));
299+
}
300+
301+
private Map<Map<String, Object>, List<Map.Entry<String, Double>>> valuesInRange(
302+
int docBatchSize,
303+
TemporalAccessor min,
304+
TemporalAccessor max
305+
) {
306+
return withMetrics(
307+
between(0, 10000), // Not used by this method
308+
docBatchSize,
309+
(future, metrics) -> metrics.valuesInRange("v", min, max, new CollectingListener(future))
310+
);
311+
}
312+
313+
private <R> R withMetrics(int bucketBatchSize, int docBatchSize, BiConsumer<ListenableActionFuture<R>, TimeSeriesMetrics> handle) {
314+
ListenableActionFuture<R> result = new ListenableActionFuture<>();
315+
new TimeSeriesMetricsService(client(), bucketBatchSize, docBatchSize).newMetrics(
316+
new String[] { "tsdb" },
317+
new ActionListener<TimeSeriesMetrics>() {
318+
@Override
319+
public void onResponse(TimeSeriesMetrics metrics) {
320+
handle.accept(result, metrics);
321+
}
322+
323+
@Override
324+
public void onFailure(Exception e) {
325+
result.onFailure(e);
326+
}
327+
}
328+
);
329+
return result.actionGet();
330+
}
331+
332+
private class CollectingListener implements TimeSeriesMetrics.MetricsCallback {
333+
private final Map<Map<String, Object>, List<Map.Entry<String, Double>>> results = new HashMap<>();
334+
private final ActionListener<Map<Map<String, Object>, List<Map.Entry<String, Double>>>> delegate;
335+
private Map<String, Object> currentDimensions = null;
336+
private List<Map.Entry<String, Double>> currentValues = null;
337+
338+
CollectingListener(ActionListener<Map<Map<String, Object>, List<Map.Entry<String, Double>>>> delegate) {
339+
this.delegate = delegate;
340+
}
341+
342+
@Override
343+
public void onTimeSeriesStart(Map<String, Object> dimensions) {
344+
if (currentDimensions != null) {
345+
results.put(currentDimensions, currentValues);
346+
}
347+
currentDimensions = dimensions;
348+
currentValues = new ArrayList<>();
349+
}
350+
351+
@Override
352+
public void onMetric(long time, double value) {
353+
currentValues.add(Map.entry(FORMATTER.formatMillis(time), value));
354+
}
355+
356+
@Override
357+
public void onSuccess() {
358+
results.put(currentDimensions, currentValues);
359+
delegate.onResponse(results);
360+
}
361+
362+
@Override
363+
public void onError(Exception e) {
364+
delegate.onFailure(e);
365+
}
366+
}
367+
}

0 commit comments

Comments
 (0)