Commit 12cdf1c

ML: Add support for single bucket aggs in Datafeeds (#37544)
Single bucket aggs are now supported in datafeed aggregation configurations.
1 parent 5384162 commit 12cdf1c

5 files changed (+163 -1 lines changed)
docs/reference/ml/aggregations.asciidoc

Lines changed: 47 additions & 0 deletions

@@ -145,6 +145,53 @@ pipeline aggregation to find the first order derivative of the counter
 ----------------------------------
 // NOTCONSOLE
 
+{dfeeds-cap} support not only multi-bucket aggregations but also single bucket aggregations.
+The following shows two `filter` aggregations, each gathering the number of entries for
+the `error` field.
+
+[source,js]
+----------------------------------
+{
+  "job_id":"servers-unique-errors",
+  "indices": ["logs-*"],
+  "aggregations": {
+    "buckets": {
+      "date_histogram": {
+        "field": "time",
+        "interval": "360s",
+        "time_zone": "UTC"
+      },
+      "aggregations": {
+        "time": {
+          "max": {"field": "time"}
+        },
+        "server1": {
+          "filter": {"term": {"source": "server-name-1"}},
+          "aggregations": {
+            "server1_error_count": {
+              "value_count": {
+                "field": "error"
+              }
+            }
+          }
+        },
+        "server2": {
+          "filter": {"term": {"source": "server-name-2"}},
+          "aggregations": {
+            "server2_error_count": {
+              "value_count": {
+                "field": "error"
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+}
+----------------------------------
+// NOTCONSOLE
+
 When you define an aggregation in a {dfeed}, it must have the following form:
 
 [source,js]
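
A note on the new example: the datafeed flattens every leaf aggregation under a histogram bucket
into one document per bucket, so the configuration above would feed the job records of roughly the
following shape (an illustrative sketch; the field values are invented):

[source,js]
----------------------------------
{"time": 1546300800000, "server1_error_count": 15, "server2_error_count": 3, "doc_count": 180}
----------------------------------
// NOTCONSOLE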

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java

Lines changed: 38 additions & 0 deletions

@@ -894,6 +894,44 @@ public void testLookbackWithoutPermissionsAndRollup() throws Exception {
             "action [indices:admin/xpack/rollup/search] is unauthorized for user [ml_admin_plus_data]\""));
     }
 
+    public void testLookbackWithSingleBucketAgg() throws Exception {
+        String jobId = "aggs-date-histogram-with-single-bucket-agg-job";
+        Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
+        createJobRequest.setJsonEntity("{\n"
+                + "  \"description\": \"Aggs job\",\n"
+                + "  \"analysis_config\": {\n"
+                + "    \"bucket_span\": \"3600s\",\n"
+                + "    \"summary_count_field_name\": \"doc_count\",\n"
+                + "    \"detectors\": [\n"
+                + "      {\n"
+                + "        \"function\": \"mean\",\n"
+                + "        \"field_name\": \"responsetime\"\n"
+                + "      }\n"
+                + "    ]\n"
+                + "  },\n"
+                + "  \"data_description\": {\"time_field\": \"time stamp\"}\n"
+                + "}");
+        client().performRequest(createJobRequest);
+
+        String datafeedId = "datafeed-" + jobId;
+        String aggregations = "{\"time stamp\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":\"1h\"},"
+                + "\"aggregations\":{"
+                + "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}},"
+                + "\"airlineFilter\":{\"filter\":{\"term\": {\"airline\":\"AAA\"}},"
+                + "  \"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}";
+        new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").setAggregations(aggregations).build();
+        openJob(client(), jobId);
+
+        startDatafeedAndWaitUntilStopped(datafeedId);
+        waitUntilJobIsClosed(jobId);
+        Response jobStatsResponse = client().performRequest(new Request("GET",
+                MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
+        String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity());
+        assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2"));
+        assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2"));
+        assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
+    }
+
     public void testRealtime() throws Exception {
         String jobId = "job-realtime-1";
         createJob(jobId, "airline");
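
Escaped inside a Java string, the datafeed aggregation this test installs is hard to read. Pretty-printed, it is equivalent to:

[source,js]
----------------------------------
{
  "time stamp": {
    "date_histogram": {"field": "time stamp", "interval": "1h"},
    "aggregations": {
      "time stamp": {"max": {"field": "time stamp"}},
      "airlineFilter": {
        "filter": {"term": {"airline": "AAA"}},
        "aggregations": {
          "responsetime": {"avg": {"field": "responsetime"}}
        }
      }
    }
  }
}
----------------------------------
// NOTCONSOLE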

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java

Lines changed: 36 additions & 1 deletion

@@ -13,6 +13,7 @@
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
+import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.search.aggregations.metrics.Max;
 import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;

@@ -34,6 +35,7 @@
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 /**
  * Processes {@link Aggregation} objects and writes flat JSON documents for each leaf aggregation.

@@ -93,18 +95,39 @@ private void processAggs(long docCount, List<Aggregation> aggregations) throws I
 
         List<Aggregation> leafAggregations = new ArrayList<>();
         List<MultiBucketsAggregation> bucketAggregations = new ArrayList<>();
+        List<SingleBucketAggregation> singleBucketAggregations = new ArrayList<>();
 
         // Sort into leaf and bucket aggregations.
         // The leaf aggregations will be processed first.
         for (Aggregation agg : aggregations) {
             if (agg instanceof MultiBucketsAggregation) {
                 bucketAggregations.add((MultiBucketsAggregation)agg);
+            } else if (agg instanceof SingleBucketAggregation) {
+                // Skip a level down for single bucket aggs; if they have a sub-agg that is not
+                // a bucketed agg, we should treat it like a leaf in this bucket
+                SingleBucketAggregation singleBucketAggregation = (SingleBucketAggregation)agg;
+                for (Aggregation subAgg : singleBucketAggregation.getAggregations()) {
+                    if (subAgg instanceof MultiBucketsAggregation || subAgg instanceof SingleBucketAggregation) {
+                        singleBucketAggregations.add(singleBucketAggregation);
+                    } else {
+                        leafAggregations.add(subAgg);
+                    }
+                }
             } else {
                 leafAggregations.add(agg);
             }
         }
 
-        if (bucketAggregations.size() > 1) {
+        // If the current level (indicated via bucketAggregations) or one of the next levels (singleBucketAggregations)
+        // has more than one `MultiBucketsAggregation`, we should error out.
+        // We need to make the check in this way as each of the items in `singleBucketAggregations` is treated as a
+        // separate branch in the recursive handling of this method.
+        int bucketAggLevelCount = Math.max(bucketAggregations.size(), (int)singleBucketAggregations.stream()
+            .flatMap(s -> asList(s.getAggregations()).stream())
+            .filter(MultiBucketsAggregation.class::isInstance)
+            .count());
+
+        if (bucketAggLevelCount > 1) {
             throw new IllegalArgumentException("Multiple bucket aggregations at the same level are not supported");
         }

@@ -137,6 +160,18 @@ private void processAggs(long docCount, List<Aggregation> aggregations) throws I
                 }
             }
         }
+        noMoreBucketsToProcess = singleBucketAggregations.isEmpty() && noMoreBucketsToProcess;
+        // We support more than one `SingleBucketAggregation` at each level.
+        // However, we only want to recurse with multi/single bucket aggs;
+        // non-bucketed sub-aggregations were handled as leaf aggregations at this level.
+        for (SingleBucketAggregation singleBucketAggregation : singleBucketAggregations) {
+            processAggs(singleBucketAggregation.getDocCount(),
+                asList(singleBucketAggregation.getAggregations())
+                    .stream()
+                    .filter(
+                        aggregation -> (aggregation instanceof MultiBucketsAggregation || aggregation instanceof SingleBucketAggregation))
+                    .collect(Collectors.toList()));
+        }
 
         // If there are no more bucket aggregations to process we've reached the end
         // and it's time to write the doc
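
To make the flattening concrete: a metric under a single bucket agg is treated as a leaf of the
enclosing bucket, so a histogram bucket shaped like the sketch below (names and values invented)
yields one flat document, with `doc_count` taken from the histogram bucket itself:

[source,js]
----------------------------------
// One date_histogram bucket from the search response (sketch):
// {"key": 1000, "doc_count": 4,
//  "time": {"value": 1000},
//  "my_filter": {"doc_count": 3, "my_max": {"value": 5.0}}}
//
// Flat document written by the processor:
{"time": 1000, "my_max": 5.0, "doc_count": 4}
----------------------------------
// NOTCONSOLE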

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java

Lines changed: 9 additions & 0 deletions

@@ -7,6 +7,7 @@
 
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;

@@ -37,6 +38,14 @@ static Histogram.Bucket createHistogramBucket(long timestamp, long docCount, Lis
         return bucket;
     }
 
+    static SingleBucketAggregation createSingleBucketAgg(String name, long docCount, List<Aggregation> subAggregations) {
+        SingleBucketAggregation singleBucketAggregation = mock(SingleBucketAggregation.class);
+        when(singleBucketAggregation.getName()).thenReturn(name);
+        when(singleBucketAggregation.getDocCount()).thenReturn(docCount);
+        when(singleBucketAggregation.getAggregations()).thenReturn(createAggs(subAggregations));
+        return singleBucketAggregation;
+    }
+
     static Histogram.Bucket createHistogramBucket(long timestamp, long docCount) {
         return createHistogramBucket(timestamp, docCount, Collections.emptyList());
     }

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java

Lines changed: 33 additions & 0 deletions

@@ -31,6 +31,7 @@
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramBucket;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createMax;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createPercentiles;
+import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createSingleBucketAgg;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createSingleValue;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createTerms;
 import static org.hamcrest.Matchers.containsString;

@@ -439,6 +440,38 @@ public void testBucketsBeforeStartArePruned() throws IOException {
             "{\"time\":4000,\"my_field\":4.0,\"doc_count\":14}"));
     }
 
+    public void testSingleBucketAgg() throws IOException {
+        List<Histogram.Bucket> histogramBuckets = Arrays.asList(
+            createHistogramBucket(1000L, 4, Arrays.asList(
+                createMax("time", 1000),
+                createSingleBucketAgg("agg1", 3, Collections.singletonList(createMax("field1", 5.0))),
+                createSingleBucketAgg("agg2", 1, Collections.singletonList(createMax("field2", 3.0))))),
+            createHistogramBucket(2000L, 7, Arrays.asList(
+                createMax("time", 2000),
+                createSingleBucketAgg("agg2", 3, Collections.singletonList(createMax("field2", 1.0))),
+                createSingleBucketAgg("agg1", 4, Collections.singletonList(createMax("field1", 7.0))))));
+
+        String json = aggToString(Sets.newHashSet("field1", "field2"), histogramBuckets);
+
+        assertThat(json, equalTo("{\"time\":1000,\"field1\":5.0,\"field2\":3.0,\"doc_count\":4}" +
+            " {\"time\":2000,\"field2\":1.0,\"field1\":7.0,\"doc_count\":7}"));
+    }
+
+    public void testSingleBucketAgg_failureWithSubMultiBucket() throws IOException {
+        List<Histogram.Bucket> histogramBuckets = Collections.singletonList(
+            createHistogramBucket(1000L, 4, Arrays.asList(
+                createMax("time", 1000),
+                createSingleBucketAgg("agg1", 3,
+                    Arrays.asList(createHistogramAggregation("histo", Collections.emptyList()), createMax("field1", 5.0))),
+                createSingleBucketAgg("agg2", 1,
+                    Arrays.asList(createHistogramAggregation("histo", Collections.emptyList()), createMax("field1", 3.0))))));
+
+        expectThrows(IllegalArgumentException.class,
+            () -> aggToString(Sets.newHashSet("my_field"), histogramBuckets));
+    }
+
     private String aggToString(Set<String> fields, Histogram.Bucket bucket) throws IOException {
         return aggToString(fields, Collections.singletonList(bucket));
     }
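
The failure case corresponds to a datafeed configuration shaped like the sketch below (names
invented): two sibling `filter` branches each carry their own multi-bucket `histogram`, so the
processor counts two `MultiBucketsAggregation`s across the single bucket branches and throws:

[source,js]
----------------------------------
{
  "agg1": {"filter": {"term": {"f": "a"}}, "aggregations": {"histo": {"histogram": {"field": "h", "interval": 5}}}},
  "agg2": {"filter": {"term": {"f": "b"}}, "aggregations": {"histo": {"histogram": {"field": "h", "interval": 5}}}}
}
----------------------------------
// NOTCONSOLE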
