From 11e5dada8324eaad44cf9673478a102abc2997f2 Mon Sep 17 00:00:00 2001
From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com>
Date: Fri, 2 Apr 2021 15:48:57 -0400
Subject: [PATCH 01/20] [ML] Log categorization multi-bucket agg
---
docs/build.gradle | 34 ++
docs/reference/aggregations/bucket.asciidoc | 2 +
.../categorize-text-aggregation.asciidoc | 396 +++++++++++++++++
.../elasticsearch/search/SearchService.java | 1 +
.../ParsedMultiBucketAggregation.java | 2 +-
.../support/AggregationContext.java | 13 +
.../aggregations/AggregatorTestCase.java | 20 +
.../test/AbstractBuilderTestCase.java | 10 +-
.../ml/qa/ml-with-security/build.gradle | 3 +
.../CategorizationAggregationIT.java | 160 +++++++
.../xpack/ml/MachineLearning.java | 15 +
.../CategorizationTokenTree.java | 161 +++++++
.../CategorizeTextAggregationBuilder.java | 275 ++++++++++++
.../CategorizeTextAggregator.java | 208 +++++++++
.../CategorizeTextAggregatorFactory.java | 90 ++++
.../InternalCategorizationAggregation.java | 415 ++++++++++++++++++
.../ml/aggs/categorization/LogGroup.java | 105 +++++
.../ml/aggs/categorization/TreeNode.java | 398 +++++++++++++++++
.../aggs/categorization/TreeNodeFactory.java | 16 +
.../CategorizationAnalyzer.java | 5 +
.../xpack/ml/LocalStateMachineLearning.java | 18 +
...CategorizeTextAggregationBuilderTests.java | 55 +++
.../CategorizeTextAggregatorTests.java | 297 +++++++++++++
.../categorization/InnerTreeNodeTests.java | 109 +++++
...nternalCategorizationAggregationTests.java | 117 +++++
.../categorization/LeafTreeNodeTests.java | 68 +++
.../ml/aggs/categorization/LogGroupTests.java | 51 +++
.../categorization/ParsedCategorization.java | 113 +++++
.../test/ml/categorization_agg.yml | 136 ++++++
29 files changed, 3290 insertions(+), 3 deletions(-)
create mode 100644 docs/reference/aggregations/bucket/categorize-text-aggregation.asciidoc
create mode 100644 x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/CategorizationAggregationIT.java
create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizationTokenTree.java
create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregationBuilder.java
create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java
create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregatorFactory.java
create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/InternalCategorizationAggregation.java
create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/LogGroup.java
create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/TreeNode.java
create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/TreeNodeFactory.java
create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregationBuilderTests.java
create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregatorTests.java
create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/categorization/InnerTreeNodeTests.java
create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/categorization/InternalCategorizationAggregationTests.java
create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/categorization/LeafTreeNodeTests.java
create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/categorization/LogGroupTests.java
create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/aggs/categorization/ParsedCategorization.java
create mode 100644 x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/categorization_agg.yml
diff --git a/docs/build.gradle b/docs/build.gradle
index ec7168061815e..c9e55a651dfa3 100644
--- a/docs/build.gradle
+++ b/docs/build.gradle
@@ -1071,6 +1071,40 @@ buildRestTests.setups['farequote_datafeed'] = buildRestTests.setups['farequote_j
"indexes":"farequote"
}
'''
+buildRestTests.setups['categorize_text'] = '''
+ - do:
+ indices.create:
+ index: log-messages
+ body:
+ settings:
+ number_of_shards: 1
+ number_of_replicas: 0
+ mappings:
+ metric:
+ properties:
+ time:
+ type: date
+ message:
+ type: text
+
+ - do:
+ bulk:
+ index: log-messages
+ refresh: true
+ body: |
+ {"index": {}}
+ {"time":"2016-02-07T00:00:00+0000", "message": "2016-02-07T00:00:00+0000 Node 3 shutting down"}
+ {"index": {}}
+ {"time":"2016-02-07T00:00:00+0000", "message": "2016-02-07T00:00:00+0000 Node 5 starting up"}
+ {"index": {}}
+ {"time":"2016-02-07T00:00:00+0000", "message": "2016-02-07T00:00:00+0000 Node 4 shutting down"}
+ {"index": {}}
+ {"time":"2016-02-08T00:00:00+0000", "message": "2016-02-08T00:00:00+0000 Node 5 shutting down"}
+ {"index": {}}
+ {"time":"2016-02-08T00:00:00+0000", "message": "2016-02-08T00:00:00+0000 User foo_325 logging on"}
+ {"index": {}}
+ {"time":"2016-02-08T00:00:00+0000", "message": "2016-02-08T00:00:00+0000 User foo_864 logged off"}
+'''
buildRestTests.setups['server_metrics_index'] = '''
- do:
indices.create:
diff --git a/docs/reference/aggregations/bucket.asciidoc b/docs/reference/aggregations/bucket.asciidoc
index 302e196caf3ce..dfdaca18e6cfb 100644
--- a/docs/reference/aggregations/bucket.asciidoc
+++ b/docs/reference/aggregations/bucket.asciidoc
@@ -20,6 +20,8 @@ include::bucket/adjacency-matrix-aggregation.asciidoc[]
include::bucket/autodatehistogram-aggregation.asciidoc[]
+include::bucket/categorize-text-aggregation.asciidoc[]
+
include::bucket/children-aggregation.asciidoc[]
include::bucket/composite-aggregation.asciidoc[]
diff --git a/docs/reference/aggregations/bucket/categorize-text-aggregation.asciidoc b/docs/reference/aggregations/bucket/categorize-text-aggregation.asciidoc
new file mode 100644
index 0000000000000..5aa486b9dcf40
--- /dev/null
+++ b/docs/reference/aggregations/bucket/categorize-text-aggregation.asciidoc
@@ -0,0 +1,396 @@
+[[search-aggregations-bucket-categorize-text-aggregation]]
+=== Categorize text aggregation
+++++
+Categorize text
+++++
+
+experimental::[]
+
+A multi-bucket aggregation that groups semi-structured text into buckets. Each `text` field is re-analyzed
+using a custom analyzer. The resulting tokens are then categorized, creating buckets of similarly formatted
+text values. This aggregation works best with machine-generated text such as system logs.
+
+WARNING: Re-analyzing _large_ result sets will require a lot of time and memory. This aggregation should be
+ used in conjunction with <>.
+
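+For example, the aggregation can be submitted through the async search API so the client does not block
+while large amounts of text are re-analyzed. The request below is an illustrative sketch only:
+
+[source,console]
+--------------------------------------------------
+POST log-messages/_async_search
+{
+  "size": 0,
+  "aggs": {
+    "categories": {
+      "categorize_text": {
+        "field": "message"
+      }
+    }
+  }
+}
+--------------------------------------------------
+// TEST[skip:illustrative sketch only]
+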
+[[bucket-categorize-text-agg-syntax]]
+==== Parameters
+
+`field`::
+(Required, string)
+The semi-structured text field to categorize.
+
+`max_children`::
+(Optional, integer, default: `100`)
+The maximum number of unique tokens at any given layer of the tokenization tree.
+Must be larger than 1. The smaller the value, the broader the text categories.
+Larger values may cause the aggregation to use more memory and run more slowly.
+
+`max_depth`::
+(Optional, integer, default: `5`)
+The maximum number of tokens matched on before attempting to merge categories.
+Larger values may cause the aggregation to use more memory and run more slowly.
+
+`similarity_threshold`::
+(Optional, double, default: `0.5`)
+The minimum percentage of tokens that must match for text to be added to the
+category bucket.
+Must be between 0.1 and 1.0. The larger the value, the more restrictive the log categories.
+Larger values may increase memory usage.
+
+`categorization_filters`::
+(Optional, array of strings, default: `[]`)
+This property expects an array of regular expressions. The expressions
+are used to filter out matching sequences from the categorization field values.
+You can use this functionality to fine tune the categorization by excluding
+sequences from consideration when categories are defined. For example, you can
+exclude SQL statements that appear in your log files.
+
+`shard_size`::
+(Optional, integer)
+The number of categorization buckets to return from each shard before merging
+all the results.
+
+`size`::
+(Optional, integer, default: `10`)
+The number of buckets to return.
+
+`min_doc_count`::
+(Optional, integer)
+The minimum number of documents required for a bucket to be returned in the results.
+
+`shard_min_doc_count`::
+(Optional, integer)
+The minimum number of documents for a bucket to be returned from the shard before
+merging.
+
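+The following request is an illustrative sketch that shows all of the parameters together; the values are
+examples only, not tuning recommendations:
+
+[source,console]
+--------------------------------------------------
+POST log-messages/_search?filter_path=aggregations
+{
+  "aggs": {
+    "categories": {
+      "categorize_text": {
+        "field": "message",
+        "max_children": 50,
+        "max_depth": 5,
+        "similarity_threshold": 0.5,
+        "categorization_filters": ["\\w+\\_\\d{3}"],
+        "shard_size": 100,
+        "size": 10,
+        "min_doc_count": 1,
+        "shard_min_doc_count": 1
+      }
+    }
+  }
+}
+--------------------------------------------------
+// TEST[skip:illustrative sketch only]
+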
+==== Basic use
+
+Example:
+
+[source,console,id=categorize-text-aggregation-example]
+--------------------------------------------------
+POST log-messages/_search?filter_path=aggregations
+{
+ "aggs": {
+ "categories": {
+ "categorize_text": {
+ "field": "message"
+ }
+ }
+ }
+}
+--------------------------------------------------
+// TEST[setup:categorize_text]
+
+Response:
+
+[source,console-result]
+--------------------------------------------------
+{
+ "aggregations" : {
+ "categories" : {
+ "buckets" : [
+ {
+ "doc_count" : 3,
+ "key" : "Node shutting down"
+ },
+ {
+ "doc_count" : 1,
+ "key" : "User foo_864 logged off"
+ },
+ {
+ "doc_count" : 1,
+ "key" : "User foo_325 logging on"
+ },
+ {
+ "doc_count" : 1,
+ "key" : "Node starting up"
+ }
+ ]
+ }
+ }
+}
+--------------------------------------------------
+// TESTRESPONSE
+
+
+Here is an example using `categorization_filters`:
+
+[source,console,id=categorize-text-aggregation-with-filters-example]
+--------------------------------------------------
+POST log-messages/_search?filter_path=aggregations
+{
+ "aggs": {
+ "categories": {
+ "categorize_text": {
+ "field": "message",
+ "categorization_filters": ["\\w+\\_\\d{3}"] <1>
+ }
+ }
+ }
+}
+--------------------------------------------------
+// TEST[setup:categorize_text]
+<1> The filters to apply to the analyzed tokens. They filter
+    out tokens like `bar_123`.
+
+Note how the `foo_` tokens are not part of the
+category results.
+
+[source,console-result]
+--------------------------------------------------
+{
+ "aggregations" : {
+ "categories" : {
+ "buckets" : [
+ {
+ "doc_count" : 3,
+ "key" : "Node shutting down"
+ },
+ {
+ "doc_count" : 1,
+ "key" : "User logged off"
+ },
+ {
+ "doc_count" : 1,
+ "key" : "User logging on"
+ },
+ {
+ "doc_count" : 1,
+ "key" : "Node starting up"
+ }
+ ]
+ }
+ }
+}
+--------------------------------------------------
+// TESTRESPONSE
+
+Here is an example using `categorization_filters`, `max_depth`, and `similarity_threshold` to create
+broader categories:
+
+[source,console,id=categorize-text-aggregation-with-broad-categories-example]
+--------------------------------------------------
+POST log-messages/_search?filter_path=aggregations
+{
+ "aggs": {
+ "categories": {
+ "categorize_text": {
+ "field": "message",
+ "categorization_filters": ["\\w+\\_\\d{3}"], <1>
+ "max_depth": 2, <2>
+ "similarity_threshold": 0.3 <3>
+ }
+ }
+ }
+}
+--------------------------------------------------
+// TEST[setup:categorize_text]
+<1> The filters to apply to the analyzed tokens. They filter
+out tokens like `bar_123`.
+<2> Match on at most 2 tokens before the log categories
+    attempt to merge together.
+<3> Require only 30% of the tokens to match before text is added
+    to an existing log category.
+
+The resulting categories are now broad, matching the first token
+and merging the log groups.
+
+[source,console-result]
+--------------------------------------------------
+{
+ "aggregations" : {
+ "categories" : {
+ "buckets" : [
+ {
+ "doc_count" : 4,
+ "key" : "Node *"
+ },
+ {
+ "doc_count" : 2,
+ "key" : "User *"
+ }
+ ]
+ }
+ }
+}
+--------------------------------------------------
+// TESTRESPONSE
+
+This aggregation can have sub-aggregations and can itself be used as a sub-aggregation.
+
+[source,console,id=categorize-text-aggregation-with-broad-categories-sub-aggs-example]
+--------------------------------------------------
+POST log-messages/_search?filter_path=aggregations
+{
+ "aggs": {
+ "daily": {
+ "date_histogram": {
+ "field": "time",
+ "fixed_interval": "1d"
+ },
+ "aggs": {
+ "categories": {
+ "categorize_text": {
+ "field": "message",
+ "categorization_filters": ["\\w+\\_\\d{3}"]
+ },
+ "aggs": {
+ "hit": {
+ "top_hits": {
+ "size": 1,
+ "_source": "message"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
+--------------------------------------------------
+// TEST[setup:categorize_text]
+
+[source,console-result]
+--------------------------------------------------
+{
+ "aggregations" : {
+ "daily" : {
+ "buckets" : [
+ {
+ "key_as_string" : "2016-02-07T00:00:00.000Z",
+ "key" : 1454803200000,
+ "doc_count" : 3,
+ "categories" : {
+ "buckets" : [
+ {
+ "doc_count" : 2,
+ "key" : "Node shutting down",
+ "hit" : {
+ "hits" : {
+ "total" : {
+ "value" : 2,
+ "relation" : "eq"
+ },
+ "max_score" : 1.0,
+ "hits" : [
+ {
+ "_index" : "log-messages",
+ "_id" : "DU9q4HsBtGA51sVjTrac",
+ "_score" : 1.0,
+ "_source" : {
+ "message" : "2016-02-07T00:00:00+0000 Node 3 shutting down"
+ }
+ }
+ ]
+ }
+ }
+ },
+ {
+ "doc_count" : 1,
+ "key" : "Node starting up",
+ "hit" : {
+ "hits" : {
+ "total" : {
+ "value" : 1,
+ "relation" : "eq"
+ },
+ "max_score" : 1.0,
+ "hits" : [
+ {
+ "_index" : "log-messages",
+ "_id" : "Dk9q4HsBtGA51sVjTrac",
+ "_score" : 1.0,
+ "_source" : {
+ "message" : "2016-02-07T00:00:00+0000 Node 5 starting up"
+ }
+ }
+ ]
+ }
+ }
+ }
+ ]
+ }
+ },
+ {
+ "key_as_string" : "2016-02-08T00:00:00.000Z",
+ "key" : 1454889600000,
+ "doc_count" : 3,
+ "categories" : {
+ "buckets" : [
+ {
+ "doc_count" : 1,
+ "key" : "User logged off",
+ "hit" : {
+ "hits" : {
+ "total" : {
+ "value" : 1,
+ "relation" : "eq"
+ },
+ "max_score" : 1.0,
+ "hits" : [
+ {
+ "_index" : "log-messages",
+ "_id" : "Ek9q4HsBtGA51sVjTrac",
+ "_score" : 1.0,
+ "_source" : {
+ "message" : "2016-02-08T00:00:00+0000 User foo_864 logged off"
+ }
+ }
+ ]
+ }
+ }
+ },
+ {
+ "doc_count" : 1,
+ "key" : "User logging on",
+ "hit" : {
+ "hits" : {
+ "total" : {
+ "value" : 1,
+ "relation" : "eq"
+ },
+ "max_score" : 1.0,
+ "hits" : [
+ {
+ "_index" : "log-messages",
+ "_id" : "EU9q4HsBtGA51sVjTrac",
+ "_score" : 1.0,
+ "_source" : {
+ "message" : "2016-02-08T00:00:00+0000 User foo_325 logging on"
+ }
+ }
+ ]
+ }
+ }
+ },
+ {
+ "doc_count" : 1,
+ "key" : "Node shutting down",
+ "hit" : {
+ "hits" : {
+ "total" : {
+ "value" : 1,
+ "relation" : "eq"
+ },
+ "max_score" : 1.0,
+ "hits" : [
+ {
+ "_index" : "log-messages",
+ "_id" : "EE9q4HsBtGA51sVjTrac",
+ "_score" : 1.0,
+ "_source" : {
+ "message" : "2016-02-08T00:00:00+0000 Node 5 shutting down"
+ }
+ }
+ ]
+ }
+ }
+ }
+ ]
+ }
+ }
+ ]
+ }
+ }
+}
+--------------------------------------------------
+// TESTRESPONSE
+
diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java
index 6b3c456ef18bc..46a23468505a6 100644
--- a/server/src/main/java/org/elasticsearch/search/SearchService.java
+++ b/server/src/main/java/org/elasticsearch/search/SearchService.java
@@ -985,6 +985,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
context.terminateAfter(source.terminateAfter());
if (source.aggregations() != null && includeAggregations) {
AggregationContext aggContext = new ProductionAggregationContext(
+ indicesService.getAnalysis(),
context.getSearchExecutionContext(),
bigArrays,
source.aggregations().bytesToPreallocate(),
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/ParsedMultiBucketAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/ParsedMultiBucketAggregation.java
index 76ca0a917fb5d..48bd678ce5f80 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/ParsedMultiBucketAggregation.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/ParsedMultiBucketAggregation.java
@@ -48,7 +48,7 @@ protected XContentBuilder doXContentBody(XContentBuilder builder, Params params)
return builder;
}
- protected static <B extends ParsedMultiBucketAggregation<T>, T extends ParsedBucket> void declareMultiBucketAggregationFields(
+ public static <B extends ParsedMultiBucketAggregation<T>, T extends ParsedBucket> void declareMultiBucketAggregationFields(
final ObjectParser<B, Void> objectParser,
final CheckedFunction<XContentParser, T, IOException> bucketParser,
final CheckedFunction<XContentParser, T, IOException> keyedBucketParser
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java
index 5008fbe08eac4..ad6366d75ce64 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java
@@ -18,6 +18,7 @@
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.fielddata.IndexFieldData;
@@ -95,6 +96,10 @@ public final FieldContext buildFieldContext(String field) {
return new FieldContext(field, buildFieldData(ft), ft);
}
+ public AnalysisRegistry getAnalysisRegistry() {
+ return null;
+ }
+
/**
* Lookup the context for an already resolved field type.
*/
@@ -277,10 +282,12 @@ public static class ProductionAggregationContext extends AggregationContext {
private final Supplier<Boolean> isCancelled;
private final Function<Query, Query> filterQuery;
private final boolean enableRewriteToFilterByFilter;
+ private final AnalysisRegistry analysisRegistry;
private final List<Releasable> releaseMe = new ArrayList<>();
public ProductionAggregationContext(
+ AnalysisRegistry analysisRegistry,
SearchExecutionContext context,
BigArrays bigArrays,
long bytesToPreallocate,
@@ -295,6 +302,7 @@ public ProductionAggregationContext(
Function<Query, Query> filterQuery,
boolean enableRewriteToFilterByFilter
) {
+ this.analysisRegistry = analysisRegistry;
this.context = context;
if (bytesToPreallocate == 0) {
/*
@@ -327,6 +335,11 @@ public ProductionAggregationContext(
this.enableRewriteToFilterByFilter = enableRewriteToFilterByFilter;
}
+ @Override
+ public AnalysisRegistry getAnalysisRegistry() {
+ return this.analysisRegistry;
+ }
+
@Override
public Query query() {
return topLevelQuery.get();
diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
index 0a0fe066677a4..906fc8b50366d 100644
--- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java
@@ -56,6 +56,8 @@
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalysisRegistry;
@@ -94,8 +96,10 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesModule;
+import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
+import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.script.ScriptCompiler;
import org.elasticsearch.script.ScriptService;
@@ -159,6 +163,7 @@
public abstract class AggregatorTestCase extends ESTestCase {
private List<Releasable> releasables = new ArrayList<>();
protected ValuesSourceRegistry valuesSourceRegistry;
+ protected AnalysisModule analysisModule;
// A list of field types that should not be tested, or are not currently supported
private static final List<String> TYPE_TEST_BLACKLIST = List.of(
@@ -178,6 +183,16 @@ public void initValuesSourceRegistry() {
valuesSourceRegistry = searchModule.getValuesSourceRegistry();
}
+ @Before
+ public void initAnalysisRegistry() throws IOException {
+ analysisModule = new AnalysisModule(
+ TestEnvironment.newEnvironment(
+ Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build()
+ ),
+ getAnalysisPlugins()
+ );
+ }
+
/**
* Test cases should override this if they have plugins that need to be loaded, e.g. the plugins their aggregators are in.
*/
@@ -185,6 +200,10 @@ protected List<SearchPlugin> getSearchPlugins() {
return List.of();
}
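+ /**
+ * Test cases should override this if their aggregations rely on custom analysis components,
+ * e.g. analyzers registered by an {@link AnalysisPlugin}.
+ */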
+ protected List<AnalysisPlugin> getAnalysisPlugins() {
+ return List.of();
+ }
+
protected <A extends Aggregator> A createAggregator(AggregationBuilder aggregationBuilder,
IndexSearcher searcher,
MappedFieldType... fieldTypes) throws IOException {
@@ -283,6 +302,7 @@ public void onCache(ShardId shardId, Accountable accountable) {}
MultiBucketConsumer consumer = new MultiBucketConsumer(maxBucket, breakerService.getBreaker(CircuitBreaker.REQUEST));
AggregationContext context = new ProductionAggregationContext(
+ analysisModule.getAnalysisRegistry(),
searchExecutionContext,
new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), breakerService),
bytesToPreallocate,
diff --git a/test/framework/src/main/java/org/elasticsearch/test/AbstractBuilderTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/AbstractBuilderTestCase.java
index 30bcfd62a8e99..f7475f8536153 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/AbstractBuilderTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/AbstractBuilderTestCase.java
@@ -142,6 +142,10 @@ protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singletonList(TestGeoShapeFieldMapperPlugin.class);
}
+ protected Collection<Class<? extends Plugin>> getExtraPlugins() {
+ return Collections.emptyList();
+ }
+
protected void initializeAdditionalMappings(MapperService mapperService) throws IOException {
}
@@ -208,9 +212,11 @@ public void beforeTest() throws Exception {
// this setup
long masterSeed = SeedUtils.parseSeed(RandomizedTest.getContext().getRunnerSeedAsString());
RandomizedTest.getContext().runWithPrivateRandomness(masterSeed, (Callable) () -> {
- serviceHolder = new ServiceHolder(nodeSettings, createTestIndexSettings(), getPlugins(), nowInMillis,
+ Collection<Class<? extends Plugin>> plugins = new ArrayList<>(getPlugins());
+ plugins.addAll(getExtraPlugins());
+ serviceHolder = new ServiceHolder(nodeSettings, createTestIndexSettings(), plugins, nowInMillis,
AbstractBuilderTestCase.this, true);
- serviceHolderWithNoType = new ServiceHolder(nodeSettings, createTestIndexSettings(), getPlugins(), nowInMillis,
+ serviceHolderWithNoType = new ServiceHolder(nodeSettings, createTestIndexSettings(), plugins, nowInMillis,
AbstractBuilderTestCase.this, false);
return null;
});
diff --git a/x-pack/plugin/ml/qa/ml-with-security/build.gradle b/x-pack/plugin/ml/qa/ml-with-security/build.gradle
index 0207b69ee9972..02cbd1ba601a9 100644
--- a/x-pack/plugin/ml/qa/ml-with-security/build.gradle
+++ b/x-pack/plugin/ml/qa/ml-with-security/build.gradle
@@ -35,6 +35,9 @@ tasks.named("yamlRestTest").configure {
'ml/calendar_crud/Test put calendar given id contains invalid chars',
'ml/calendar_crud/Test delete event from non existing calendar',
'ml/calendar_crud/Test delete job from non existing calendar',
+ // These are searching tests with aggregations, and do not call any ML endpoints
+ 'ml/categorization_agg/Test categorization agg simple',
+ 'ml/categorization_agg/Test categorization aggregation with poor settings',
'ml/custom_all_field/Test querying custom all field',
'ml/datafeeds_crud/Test delete datafeed with missing id',
'ml/datafeeds_crud/Test put datafeed referring to missing job_id',
diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/CategorizationAggregationIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/CategorizationAggregationIT.java
new file mode 100644
index 0000000000000..3eba0d46d0516
--- /dev/null
+++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/CategorizationAggregationIT.java
@@ -0,0 +1,160 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.ml.integration;
+
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.metrics.Max;
+import org.elasticsearch.search.aggregations.metrics.Min;
+import org.elasticsearch.xpack.ml.aggs.categorization.CategorizeTextAggregationBuilder;
+import org.elasticsearch.xpack.ml.aggs.categorization.InternalCategorizationAggregation;
+import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
+import org.junit.Before;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notANumber;
+
+public class CategorizationAggregationIT extends BaseMlIntegTestCase {
+
+ private static final String DATA_INDEX = "categorization-agg-data";
+
+ @Before
+ public void setupCluster() {
+ internalCluster().ensureAtLeastNumDataNodes(3);
+ ensureStableCluster();
+ createSourceData();
+ }
+
+ public void testAggregation() {
+ SearchResponse response = client().prepareSearch(DATA_INDEX)
+ .setSize(0)
+ .setTrackTotalHits(false)
+ .addAggregation(
+ new CategorizeTextAggregationBuilder("categorize", "msg")
+ .subAggregation(AggregationBuilders.max("max").field("time"))
+ .subAggregation(AggregationBuilders.min("min").field("time"))
+ ).get();
+
+ InternalCategorizationAggregation agg = response.getAggregations().get("categorize");
+ assertThat(agg.getBuckets(), hasSize(3));
+
+ assertCategorizationBucket(agg.getBuckets().get(0), "Node started", 3);
+ assertCategorizationBucket(
+ agg.getBuckets().get(1),
+ "Failed to shutdown error org.aaaa.bbbb.Cccc line caused by foo exception",
+ 2
+ );
+ assertCategorizationBucket(agg.getBuckets().get(2), "Node stopped", 1);
+ }
+
+ public void testAggregationWithOnlyOneBucket() {
+ SearchResponse response = client().prepareSearch(DATA_INDEX)
+ .setSize(0)
+ .setTrackTotalHits(false)
+ .addAggregation(
+ new CategorizeTextAggregationBuilder("categorize", "msg")
+ .size(1)
+ .subAggregation(AggregationBuilders.max("max").field("time"))
+ .subAggregation(AggregationBuilders.min("min").field("time"))
+ ).get();
+ InternalCategorizationAggregation agg = response.getAggregations().get("categorize");
+ assertThat(agg.getBuckets(), hasSize(1));
+
+ assertCategorizationBucket(agg.getBuckets().get(0), "Node started", 3);
+ }
+
+ public void testAggregationWithBroadCategories() {
+ SearchResponse response = client().prepareSearch(DATA_INDEX)
+ .setSize(0)
+ .setTrackTotalHits(false)
+ .addAggregation(
+ new CategorizeTextAggregationBuilder("categorize", "msg")
+ .setSimilarityThreshold(0.11)
+ .setMaxChildren(2)
+ .setMaxDepth(1)
+ .subAggregation(AggregationBuilders.max("max").field("time"))
+ .subAggregation(AggregationBuilders.min("min").field("time"))
+ ).get();
+ InternalCategorizationAggregation agg = response.getAggregations().get("categorize");
+ assertThat(agg.getBuckets(), hasSize(2));
+
+ assertCategorizationBucket(agg.getBuckets().get(0), "Node *", 4);
+ assertCategorizationBucket(
+ agg.getBuckets().get(1),
+ "Failed to shutdown error org.aaaa.bbbb.Cccc line caused by foo exception",
+ 2
+ );
+ }
+
+ private void assertCategorizationBucket(InternalCategorizationAggregation.Bucket bucket, String key, long docCount) {
+ assertThat(bucket.getKeyAsString(), equalTo(key));
+ assertThat(bucket.getDocCount(), equalTo(docCount));
+ assertThat(((Max)bucket.getAggregations().get("max")).getValue(), not(notANumber()));
+ assertThat(((Min)bucket.getAggregations().get("min")).getValue(), not(notANumber()));
+ }
+
+ private void ensureStableCluster() {
+ ensureStableCluster(internalCluster().getNodeNames().length, TimeValue.timeValueSeconds(60));
+ }
+
+ private void createSourceData() {
+ client().admin().indices().prepareCreate(DATA_INDEX)
+ .setMapping("time", "type=date,format=epoch_millis",
+ "msg", "type=text")
+ .get();
+
+ long nowMillis = System.currentTimeMillis();
+
+ BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
+ IndexRequest indexRequest = new IndexRequest(DATA_INDEX);
+ indexRequest.source("time", nowMillis - TimeValue.timeValueHours(2).millis(),
+ "msg", "Node 1 started",
+ "part", "nodes");
+ bulkRequestBuilder.add(indexRequest);
+ indexRequest = new IndexRequest(DATA_INDEX);
+ indexRequest.source("time", nowMillis - TimeValue.timeValueHours(2).millis() + 1,
+ "msg", "Failed to shutdown [error org.aaaa.bbbb.Cccc line 54 caused by foo exception]",
+ "part", "shutdowns");
+ bulkRequestBuilder.add(indexRequest);
+ indexRequest = new IndexRequest(DATA_INDEX);
+ indexRequest.source("time", nowMillis - TimeValue.timeValueHours(2).millis() + 1,
+ "msg", "Failed to shutdown [error org.aaaa.bbbb.Cccc line 55 caused by foo exception]",
+ "part", "shutdowns");
+ bulkRequestBuilder.add(indexRequest);
+ indexRequest = new IndexRequest(DATA_INDEX);
+ indexRequest.source("time", nowMillis - TimeValue.timeValueHours(1).millis(),
+ "msg", "Node 2 started",
+ "part", "nodes");
+ bulkRequestBuilder.add(indexRequest);
+ indexRequest = new IndexRequest(DATA_INDEX);
+ indexRequest.source("time", nowMillis,
+ "msg", "Node 3 started",
+ "part", "nodes");
+ bulkRequestBuilder.add(indexRequest);
+
+ indexRequest = new IndexRequest(DATA_INDEX);
+ indexRequest.source("time", nowMillis,
+ "msg", "Node 3 stopped",
+ "part", "nodes");
+ bulkRequestBuilder.add(indexRequest);
+
+ BulkResponse bulkResponse = bulkRequestBuilder
+ .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+ .get();
+ assertThat(bulkResponse.hasFailures(), is(false));
+ }
+
+}
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
index c1a25a6544bfa..330eb43029814 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
@@ -264,6 +264,8 @@
import org.elasticsearch.xpack.ml.action.TransportUpgradeJobModelSnapshotAction;
import org.elasticsearch.xpack.ml.action.TransportValidateDetectorAction;
import org.elasticsearch.xpack.ml.action.TransportValidateJobConfigAction;
+import org.elasticsearch.xpack.ml.aggs.categorization.CategorizeTextAggregationBuilder;
+import org.elasticsearch.xpack.ml.aggs.categorization.InternalCategorizationAggregation;
import org.elasticsearch.xpack.ml.aggs.correlation.BucketCorrelationAggregationBuilder;
import org.elasticsearch.xpack.ml.aggs.correlation.CorrelationNamedContentProvider;
import org.elasticsearch.xpack.ml.aggs.heuristic.PValueScore;
@@ -1218,6 +1220,7 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return Arrays.asList(jobComms, utility, datafeed);
}
+ @Override
public Map<String, AnalysisProvider<CharFilterFactory>> getCharFilters() {
return MapBuilder.<String, AnalysisProvider<CharFilterFactory>>newMapBuilder()
.put(FirstNonBlankLineCharFilter.NAME, FirstNonBlankLineCharFilterFactory::new)
@@ -1247,6 +1250,18 @@ public List<SignificanceHeuristicSpec<?>> getSignificanceHeuristics() {
);
}
+ @Override
+ public List<AggregationSpec> getAggregations() {
+ return Arrays.asList(
+ new AggregationSpec(
+ CategorizeTextAggregationBuilder.NAME,
+ CategorizeTextAggregationBuilder::new,
+ CategorizeTextAggregationBuilder.PARSER
+ ).addResultReader(InternalCategorizationAggregation::new)
+ .setAggregatorRegistrar(s -> s.registerUsage(CategorizeTextAggregationBuilder.NAME))
+ );
+ }
+
@Override
public UnaryOperator