From 99ebd13e4b42f7977ff19b9568566b30d019dc6b Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Sun, 5 Apr 2020 11:15:11 -0400 Subject: [PATCH 1/3] Fix scripted metric in ccs `scripted_metric` did not work with cross cluster search because it assumed that you'd never perform a partial reduction, serialize the results, and then perform a final reduction. That serialized-after-partial-reduction step was broken. This is also required to support #54758. --- .../test/multi_cluster/10_basic.yml | 39 ++++++++++++++++ .../metrics/InternalScriptedMetric.java | 45 +++++++++++++------ .../metrics/InternalScriptedMetricTests.java | 2 +- .../test/InternalAggregationTestCase.java | 11 ++++- 4 files changed, 81 insertions(+), 16 deletions(-) diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml index 77dafaf76ca91..edf5579439364 100644 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml @@ -196,6 +196,45 @@ - match: { aggregations.cluster.buckets.1.animal.buckets.1.s.value: 0 } - match: { aggregations.cluster.buckets.1.average_sum.value: 1 } + # scripted_metric + - do: + search: + rest_total_hits_as_int: true + index: test_index,my_remote_cluster:test_index + body: + seq_no_primary_term: true + aggs: + cluster: + terms: + field: f1.keyword + aggs: + animal_length: + scripted_metric: + init_script: | + state.sum = 0 + map_script: | + state.sum += doc['animal.keyword'].value.length() + combine_script: | + state.sum + reduce_script: | + long sum = 0; + for (s in states) { + sum += s; + } + return sum + - match: { num_reduce_phases: 3 } + - match: {_clusters.total: 2} + - match: {_clusters.successful: 2} + - match: {_clusters.skipped: 0} + - match: { _shards.total: 5 } + - match: { hits.total: 11 } + - length: { aggregations.cluster.buckets: 2 } + - match: { aggregations.cluster.buckets.0.key: "remote_cluster" } + - match: { aggregations.cluster.buckets.0.doc_count: 6 } + - match: { aggregations.cluster.buckets.0.animal_length.value: 34 } + - match: { aggregations.cluster.buckets.1.key: "local_cluster" } + - match: { aggregations.cluster.buckets.1.animal_length.value: 15 } + --- "Add transient remote cluster based on the preset cluster": - do: diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalScriptedMetric.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalScriptedMetric.java index e010000747bc5..6d68a1007b784 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalScriptedMetric.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalScriptedMetric.java @@ -19,12 +19,13 @@ package org.elasticsearch.search.aggregations.metrics; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.script.ScriptedMetricAggContexts; import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptedMetricAggContexts; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -36,19 +37,21 @@ import java.util.Map; import java.util.Objects; +import static java.util.Collections.singletonList; + public class InternalScriptedMetric extends InternalAggregation implements ScriptedMetric { final Script reduceScript; - private final List aggregation; + private final List aggregations; InternalScriptedMetric(String name, Object aggregation, Script reduceScript, List pipelineAggregators, Map metadata) { this(name, Collections.singletonList(aggregation), reduceScript, pipelineAggregators, metadata); } - private InternalScriptedMetric(String name, List aggregation, Script reduceScript, List pipelineAggregators, - Map metadata) { + private InternalScriptedMetric(String name, List aggregations, Script reduceScript, + List pipelineAggregators, Map metadata) { super(name, pipelineAggregators, metadata); - this.aggregation = aggregation; + this.aggregations = aggregations; this.reduceScript = reduceScript; } @@ -58,13 +61,29 @@ private InternalScriptedMetric(String name, List aggregation, Script red public InternalScriptedMetric(StreamInput in) throws IOException { super(in); reduceScript = in.readOptionalWriteable(Script::new); - aggregation = Collections.singletonList(in.readGenericValue()); + if (in.getVersion().before(Version.V_7_8_0)) { + aggregations = singletonList(in.readGenericValue()); + } else { + aggregations = in.readList(StreamInput::readGenericValue); + } } @Override protected void doWriteTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(reduceScript); - out.writeGenericValue(aggregation()); + if (out.getVersion().before(Version.V_7_8_0)) { + if (aggregations.size() > 0) { + /* + * I *believe* that this situation can only happen in cross + * cluster search right now. Thus the message. But computers + * are hard. + */ + throw new IllegalArgumentException("scripted_metric doesn't support cross cluster search until 7.8.0"); + } + out.writeGenericValue(aggregations.get(0)); + } else { + out.writeCollection(aggregations, StreamOutput::writeGenericValue); + } } @Override @@ -74,14 +93,14 @@ public String getWriteableName() { @Override public Object aggregation() { - if (aggregation.size() != 1) { + if (aggregations.size() != 1) { throw new IllegalStateException("aggregation was not reduced"); } - return aggregation.get(0); + return aggregations.get(0); } List getAggregation() { - return aggregation; + return aggregations; } @Override @@ -89,7 +108,7 @@ public InternalAggregation reduce(List aggregations, Reduce List aggregationObjects = new ArrayList<>(); for (InternalAggregation aggregation : aggregations) { InternalScriptedMetric mapReduceAggregation = (InternalScriptedMetric) aggregation; - aggregationObjects.addAll(mapReduceAggregation.aggregation); + aggregationObjects.addAll(mapReduceAggregation.aggregations); } InternalScriptedMetric firstAggregation = ((InternalScriptedMetric) aggregations.get(0)); List aggregation; @@ -142,12 +161,12 @@ public boolean equals(Object obj) { InternalScriptedMetric other = (InternalScriptedMetric) obj; return Objects.equals(reduceScript, other.reduceScript) && - Objects.equals(aggregation, other.aggregation); + Objects.equals(aggregations, other.aggregations); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), reduceScript, aggregation); + return Objects.hash(super.hashCode(), reduceScript, aggregations); } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalScriptedMetricTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalScriptedMetricTests.java index 176ed21fd275c..17fbb58b37a92 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalScriptedMetricTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalScriptedMetricTests.java @@ -132,7 +132,7 @@ protected void assertReduced(InternalScriptedMetric reduced, List) reduced.aggregation()).size()); + assertEquals(inputs.size(), ((List) reduced.aggregation()).size()); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index ef5d6c69b79f1..5bd5fd04f2a0d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -281,7 +281,7 @@ protected T createUnmappedInstance(String name, Map metadata) { return createTestInstance(name, metadata); } - public void testReduceRandom() { + public void testReduceRandom() throws IOException { String name = randomAlphaOfLength(5); List inputs = new ArrayList<>(); List toReduce = new ArrayList<>(); @@ -296,7 +296,7 @@ public void testReduceRandom() { ScriptService mockScriptService = mockScriptService(); MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); if (randomBoolean() && toReduce.size() > 1) { - // sometimes do an incremental reduce + // sometimes do an partial reduce Collections.shuffle(toReduce, random()); int r = randomIntBetween(1, toReduceSize); List internalAggregations = toReduce.subList(0, r); @@ -312,6 +312,13 @@ public void testReduceRandom() { //check that non final reduction never adds buckets assertThat(reducedBucketCount, lessThanOrEqualTo(initialBucketCount)); toReduce = new ArrayList<>(toReduce.subList(r, toReduceSize)); + /* + * sometimes simulate cross cluster search by serializing and + * deserializing the partially reduced result. + */ + if (randomBoolean()) { + reduced = copyInstance(reduced); + } toReduce.add(reduced); } MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, From 89439f9540542c7eaaa5d2dc8b9e3d72498df33b Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 6 Apr 2020 11:33:32 -0400 Subject: [PATCH 2/3] itr --- .../rest-api-spec/test/multi_cluster/10_basic.yml | 3 +-- .../elasticsearch/test/InternalAggregationTestCase.java | 7 ++++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml index edf5579439364..29e459490c71b 100644 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml @@ -199,7 +199,6 @@ # scripted_metric - do: search: - rest_total_hits_as_int: true index: test_index,my_remote_cluster:test_index body: seq_no_primary_term: true @@ -227,7 +226,7 @@ - match: {_clusters.successful: 2} - match: {_clusters.skipped: 0} - match: { _shards.total: 5 } - - match: { hits.total: 11 } + - match: { hits.total.value: 11 } - length: { aggregations.cluster.buckets: 2 } - match: { aggregations.cluster.buckets.0.key: "remote_cluster" } - match: { aggregations.cluster.buckets.0.doc_count: 6 } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index 5bd5fd04f2a0d..cb147db1757f9 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -296,7 +296,7 @@ public void testReduceRandom() throws IOException { ScriptService mockScriptService = mockScriptService(); MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); if (randomBoolean() && toReduce.size() > 1) { - // sometimes do an partial reduce + // sometimes do a partial reduce Collections.shuffle(toReduce, random()); int r = randomIntBetween(1, toReduceSize); List internalAggregations = toReduce.subList(0, r); @@ -313,8 +313,9 @@ public void testReduceRandom() throws IOException { assertThat(reducedBucketCount, lessThanOrEqualTo(initialBucketCount)); toReduce = new ArrayList<>(toReduce.subList(r, toReduceSize)); /* - * sometimes simulate cross cluster search by serializing and - * deserializing the partially reduced result. + * Sometimes serializing and deserializing the partially reduced + * result to simulate the compaction that we attempt after a + * partial reduce. And to simulate cross cluster search. */ if (randomBoolean()) { reduced = copyInstance(reduced); From f525d4cec333b5e155755f67e731c8eaa5cf6d7e Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 6 Apr 2020 16:41:58 -0400 Subject: [PATCH 3/3] Sneaky --- .../test/InternalAggregationTestCase.java | 2 +- .../stringstats/InternalStringStatsTests.java | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java index cb147db1757f9..834f3333af5bb 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalAggregationTestCase.java @@ -311,7 +311,6 @@ public void testReduceRandom() throws IOException { int reducedBucketCount = countInnerBucket(reduced); //check that non final reduction never adds buckets assertThat(reducedBucketCount, lessThanOrEqualTo(initialBucketCount)); - toReduce = new ArrayList<>(toReduce.subList(r, toReduceSize)); /* * Sometimes serializing and deserializing the partially reduced * result to simulate the compaction that we attempt after a @@ -320,6 +319,7 @@ public void testReduceRandom() throws IOException { if (randomBoolean()) { reduced = copyInstance(reduced); } + toReduce = new ArrayList<>(toReduce.subList(r, toReduceSize)); toReduce.add(reduced); } MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(DEFAULT_MAX_BUCKETS, diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/stringstats/InternalStringStatsTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/stringstats/InternalStringStatsTests.java index afc210477f832..4e2649d358a63 100644 --- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/stringstats/InternalStringStatsTests.java +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/stringstats/InternalStringStatsTests.java @@ -42,8 +42,16 @@ protected InternalStringStats createTestInstance(String name, Map