Skip to content

Commit 2b6891b

Browse files
author
Hendrik Muhs
authored
[7.x][Transform] implement test suite to test continuous transforms (#60725)
implements a test suite for testing continuous transform with randomization in terms of mappings, index settings, transform configuration. Add a test case for terms and date histogram. The test covers: - continuous mode with several checkpoints created - correctness of results - optimizations (minimal necessary writes) - permutations of features (index settings, aggs, data types, index or data stream)
1 parent 35b9f2b commit 2b6891b

File tree

5 files changed

+953
-6
lines changed

5 files changed

+953
-6
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/DestConfig.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,11 @@ public class DestConfig implements ToXContentObject {
3838
public static final ParseField INDEX = new ParseField("index");
3939
public static final ParseField PIPELINE = new ParseField("pipeline");
4040

41-
public static final ConstructingObjectParser<DestConfig, Void> PARSER = new ConstructingObjectParser<>("transform_config_dest",
41+
public static final ConstructingObjectParser<DestConfig, Void> PARSER = new ConstructingObjectParser<>(
42+
"transform_config_dest",
4243
true,
43-
args -> new DestConfig((String)args[0], (String)args[1]));
44+
args -> new DestConfig((String) args[0], (String) args[1])
45+
);
4446

4547
static {
4648
PARSER.declareString(constructorArg(), INDEX);
@@ -50,7 +52,7 @@ public class DestConfig implements ToXContentObject {
5052
private final String index;
5153
private final String pipeline;
5254

53-
DestConfig(String index, String pipeline) {
55+
public DestConfig(String index, String pipeline) {
5456
this.index = Objects.requireNonNull(index, INDEX.getPreferredName());
5557
this.pipeline = pipeline;
5658
}
@@ -84,12 +86,11 @@ public boolean equals(Object other) {
8486
}
8587

8688
DestConfig that = (DestConfig) other;
87-
return Objects.equals(index, that.index) &&
88-
Objects.equals(pipeline, that.pipeline);
89+
return Objects.equals(index, that.index) && Objects.equals(pipeline, that.pipeline);
8990
}
9091

9192
@Override
92-
public int hashCode(){
93+
public int hashCode() {
9394
return Objects.hash(index, pipeline);
9495
}
9596

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.transform.integration.continuous;
8+
9+
import org.elasticsearch.action.search.SearchRequest;
10+
import org.elasticsearch.action.search.SearchResponse;
11+
import org.elasticsearch.client.RequestOptions;
12+
import org.elasticsearch.client.RestHighLevelClient;
13+
import org.elasticsearch.client.transform.transforms.SettingsConfig;
14+
import org.elasticsearch.client.transform.transforms.SyncConfig;
15+
import org.elasticsearch.client.transform.transforms.TimeSyncConfig;
16+
import org.elasticsearch.client.transform.transforms.TransformConfig;
17+
import org.elasticsearch.common.settings.Settings;
18+
import org.elasticsearch.common.unit.TimeValue;
19+
import org.elasticsearch.common.util.concurrent.ThreadContext;
20+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
21+
import org.elasticsearch.search.SearchModule;
22+
import org.elasticsearch.search.aggregations.AggregationBuilders;
23+
import org.elasticsearch.search.aggregations.AggregatorFactories;
24+
import org.elasticsearch.test.rest.ESRestTestCase;
25+
26+
import java.io.IOException;
27+
import java.nio.charset.StandardCharsets;
28+
import java.time.format.DateTimeFormatter;
29+
import java.time.format.DateTimeFormatterBuilder;
30+
import java.util.Base64;
31+
import java.util.Collections;
32+
import java.util.List;
33+
import java.util.Locale;
34+
import java.util.concurrent.TimeUnit;
35+
36+
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
37+
import static java.time.temporal.ChronoField.HOUR_OF_DAY;
38+
import static java.time.temporal.ChronoField.MINUTE_OF_HOUR;
39+
import static java.time.temporal.ChronoField.NANO_OF_SECOND;
40+
import static java.time.temporal.ChronoField.SECOND_OF_MINUTE;
41+
42+
public abstract class ContinuousTestCase extends ESRestTestCase {
43+
44+
public static final String CONTINUOUS_EVENTS_SOURCE_INDEX = "test-transform-continuous-events";
45+
public static final String INGEST_PIPELINE = "transform-ingest";
46+
public static final String MAX_RUN_FIELD = "run.max";
47+
public static final String INGEST_RUN_FIELD = "run.ingest";
48+
public static final DateTimeFormatter STRICT_DATE_OPTIONAL_TIME_PRINTER_NANOS = new DateTimeFormatterBuilder().parseCaseInsensitive()
49+
.append(ISO_LOCAL_DATE)
50+
.appendLiteral('T')
51+
.appendValue(HOUR_OF_DAY, 2)
52+
.appendLiteral(':')
53+
.appendValue(MINUTE_OF_HOUR, 2)
54+
.appendLiteral(':')
55+
.appendValue(SECOND_OF_MINUTE, 2)
56+
.appendFraction(NANO_OF_SECOND, 3, 9, true)
57+
.appendOffsetId()
58+
.toFormatter(Locale.ROOT);
59+
60+
/**
61+
* Get the name of the transform/test
62+
*
63+
* @return name of the transform(used for start/stop)
64+
*/
65+
public abstract String getName();
66+
67+
/**
68+
* Create the transform configuration for the test.
69+
*
70+
* @return the transform configuration
71+
*/
72+
public abstract TransformConfig createConfig();
73+
74+
/**
75+
* Test results after 1 iteration in the test runner.
76+
*
77+
* @param iteration the current iteration
78+
*/
79+
public abstract void testIteration(int iteration) throws IOException;
80+
81+
protected TransformConfig.Builder addCommonBuilderParameters(TransformConfig.Builder builder) {
82+
return builder.setSyncConfig(getSyncConfig())
83+
.setSettings(addCommonSetings(new SettingsConfig.Builder()).build())
84+
.setFrequency(new TimeValue(1, TimeUnit.SECONDS));
85+
}
86+
87+
protected AggregatorFactories.Builder addCommonAggregations(AggregatorFactories.Builder builder) {
88+
builder.addAggregator(AggregationBuilders.max(MAX_RUN_FIELD).field("run"))
89+
.addAggregator(AggregationBuilders.count("count").field("run"));
90+
return builder;
91+
}
92+
93+
protected SettingsConfig.Builder addCommonSetings(SettingsConfig.Builder builder) {
94+
// enforce paging, to see we run through all of the options
95+
builder.setMaxPageSearchSize(10);
96+
return builder;
97+
}
98+
99+
protected SearchResponse search(SearchRequest searchRequest) throws IOException {
100+
try (RestHighLevelClient restClient = new TestRestHighLevelClient()) {
101+
return restClient.search(searchRequest, RequestOptions.DEFAULT);
102+
} catch (Exception e) {
103+
logger.error("Search failed with an exception.", e);
104+
throw e;
105+
}
106+
}
107+
108+
@Override
109+
protected Settings restClientSettings() {
110+
final String token = "Basic "
111+
+ Base64.getEncoder().encodeToString(("x_pack_rest_user:x-pack-test-password").getBytes(StandardCharsets.UTF_8));
112+
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
113+
}
114+
115+
private static class TestRestHighLevelClient extends RestHighLevelClient {
116+
private static final List<NamedXContentRegistry.Entry> X_CONTENT_ENTRIES = new SearchModule(
117+
Settings.EMPTY,
118+
false,
119+
Collections.emptyList()
120+
).getNamedXContents();
121+
122+
TestRestHighLevelClient() {
123+
super(client(), restClient -> {}, X_CONTENT_ENTRIES);
124+
}
125+
}
126+
127+
private SyncConfig getSyncConfig() {
128+
return new TimeSyncConfig("timestamp", new TimeValue(1, TimeUnit.SECONDS));
129+
}
130+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.transform.integration.continuous;
8+
9+
import org.elasticsearch.action.search.SearchRequest;
10+
import org.elasticsearch.action.search.SearchResponse;
11+
import org.elasticsearch.action.support.IndicesOptions;
12+
import org.elasticsearch.client.transform.transforms.DestConfig;
13+
import org.elasticsearch.client.transform.transforms.SourceConfig;
14+
import org.elasticsearch.client.transform.transforms.TransformConfig;
15+
import org.elasticsearch.client.transform.transforms.pivot.DateHistogramGroupSource;
16+
import org.elasticsearch.client.transform.transforms.pivot.GroupConfig;
17+
import org.elasticsearch.client.transform.transforms.pivot.PivotConfig;
18+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
19+
import org.elasticsearch.search.SearchHit;
20+
import org.elasticsearch.search.aggregations.AggregatorFactories;
21+
import org.elasticsearch.search.aggregations.BucketOrder;
22+
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
23+
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
24+
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
25+
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
26+
import org.elasticsearch.search.builder.SearchSourceBuilder;
27+
28+
import java.io.IOException;
29+
import java.time.Instant;
30+
import java.time.ZoneId;
31+
import java.time.ZonedDateTime;
32+
import java.util.Iterator;
33+
import java.util.List;
34+
import java.util.Map;
35+
36+
import static org.hamcrest.Matchers.equalTo;
37+
import static org.hamcrest.Matchers.is;
38+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
39+
40+
public class DataHistogramGroupByIT extends ContinuousTestCase {
41+
private static final String NAME = "continuous-date-histogram-pivot-test";
42+
private static final String MISSING_BUCKET_KEY = ContinuousTestCase.STRICT_DATE_OPTIONAL_TIME_PRINTER_NANOS.withZone(ZoneId.of("UTC"))
43+
.format(Instant.ofEpochMilli(42));
44+
45+
private final boolean missing;
46+
47+
public DataHistogramGroupByIT() {
48+
missing = randomBoolean();
49+
}
50+
51+
@Override
52+
public TransformConfig createConfig() {
53+
TransformConfig.Builder transformConfigBuilder = new TransformConfig.Builder();
54+
addCommonBuilderParameters(transformConfigBuilder);
55+
transformConfigBuilder.setSource(new SourceConfig(CONTINUOUS_EVENTS_SOURCE_INDEX));
56+
transformConfigBuilder.setDest(new DestConfig(NAME, INGEST_PIPELINE));
57+
transformConfigBuilder.setId(NAME);
58+
PivotConfig.Builder pivotConfigBuilder = new PivotConfig.Builder();
59+
pivotConfigBuilder.setGroups(
60+
new GroupConfig.Builder().groupBy(
61+
"second",
62+
new DateHistogramGroupSource.Builder().setField("timestamp")
63+
.setInterval(new DateHistogramGroupSource.FixedInterval(DateHistogramInterval.SECOND))
64+
.setMissingBucket(missing)
65+
.build()
66+
).build()
67+
);
68+
AggregatorFactories.Builder aggregations = new AggregatorFactories.Builder();
69+
addCommonAggregations(aggregations);
70+
71+
pivotConfigBuilder.setAggregations(aggregations);
72+
transformConfigBuilder.setPivotConfig(pivotConfigBuilder.build());
73+
return transformConfigBuilder.build();
74+
}
75+
76+
@Override
77+
public String getName() {
78+
return NAME;
79+
}
80+
81+
@Override
82+
public void testIteration(int iteration) throws IOException {
83+
SearchRequest searchRequestSource = new SearchRequest(CONTINUOUS_EVENTS_SOURCE_INDEX).allowPartialSearchResults(false)
84+
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
85+
SearchSourceBuilder sourceBuilderSource = new SearchSourceBuilder().size(0);
86+
DateHistogramAggregationBuilder bySecond = new DateHistogramAggregationBuilder("second").field("timestamp")
87+
.fixedInterval(DateHistogramInterval.SECOND)
88+
.order(BucketOrder.key(true));
89+
if (missing) {
90+
// missing_bucket produces `null`, we can't use `null` in aggs, so we have to use a magic value, see gh#60043
91+
bySecond.missing(MISSING_BUCKET_KEY);
92+
}
93+
sourceBuilderSource.aggregation(bySecond);
94+
searchRequestSource.source(sourceBuilderSource);
95+
SearchResponse responseSource = search(searchRequestSource);
96+
97+
SearchRequest searchRequestDest = new SearchRequest(NAME).allowPartialSearchResults(false)
98+
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
99+
SearchSourceBuilder sourceBuilderDest = new SearchSourceBuilder().size(100).sort("second");
100+
searchRequestDest.source(sourceBuilderDest);
101+
SearchResponse responseDest = search(searchRequestDest);
102+
103+
List<? extends Bucket> buckets = ((Histogram) responseSource.getAggregations().get("second")).getBuckets();
104+
105+
Iterator<? extends Bucket> sourceIterator = buckets.iterator();
106+
Iterator<SearchHit> destIterator = responseDest.getHits().iterator();
107+
108+
while (sourceIterator.hasNext() && destIterator.hasNext()) {
109+
Bucket bucket = sourceIterator.next();
110+
SearchHit searchHit = destIterator.next();
111+
Map<String, Object> source = searchHit.getSourceAsMap();
112+
113+
Long transformBucketKey = (Long) XContentMapValues.extractValue("second", source);
114+
if (transformBucketKey == null) {
115+
transformBucketKey = 42L;
116+
}
117+
118+
// aggs return buckets with 0 doc_count while composite aggs skip over them
119+
while (bucket.getDocCount() == 0L) {
120+
assertTrue(sourceIterator.hasNext());
121+
bucket = sourceIterator.next();
122+
}
123+
long bucketKey = ((ZonedDateTime) bucket.getKey()).toEpochSecond() * 1000;
124+
125+
// test correctness, the results from the aggregation and the results from the transform should be the same
126+
assertThat(
127+
"Buckets did not match, source: " + source + ", expected: " + bucketKey + ", iteration: " + iteration,
128+
transformBucketKey,
129+
equalTo(bucketKey)
130+
);
131+
assertThat(
132+
"Doc count did not match, source: " + source + ", expected: " + bucket.getDocCount() + ", iteration: " + iteration,
133+
XContentMapValues.extractValue("count", source),
134+
equalTo(Double.valueOf(bucket.getDocCount()))
135+
);
136+
137+
// transform should only rewrite documents that require it
138+
if (missing == false) {
139+
assertThat(
140+
"Ingest run: "
141+
+ XContentMapValues.extractValue(INGEST_RUN_FIELD, source)
142+
+ " did not match max run: "
143+
+ XContentMapValues.extractValue(MAX_RUN_FIELD, source)
144+
+ ", iteration: "
145+
+ iteration,
146+
// we use a fixed_interval of `1s`, the transform runs every `1s` so it the bucket might be recalculated at the next run
147+
// but
148+
// should NOT be recalculated for the 2nd/3rd/... run
149+
Double.valueOf((Integer) XContentMapValues.extractValue(INGEST_RUN_FIELD, source)) - (Double) XContentMapValues
150+
.extractValue(MAX_RUN_FIELD, source),
151+
is(lessThanOrEqualTo(1.0))
152+
);
153+
}
154+
}
155+
assertFalse(sourceIterator.hasNext());
156+
assertFalse(destIterator.hasNext());
157+
}
158+
}

0 commit comments

Comments
 (0)