From 4caace556849794446eb38a6b3b79668fe1c6843 Mon Sep 17 00:00:00 2001 From: Josemy Duarte Date: Sun, 27 Oct 2019 22:33:25 +0100 Subject: [PATCH 1/2] Fix #48469 - Refactor and DRY up Kahan Sum algorithm Add license Fix #48469 - Refactor and DRY up Kahan Sum algorithm --- .../aggregations/metrics/AvgAggregator.java | 15 +-- .../aggregations/metrics/CompensatedSum.java | 105 ++++++++++++++++++ .../metrics/ExtendedStatsAggregator.java | 32 ++---- .../metrics/GeoCentroidAggregator.java | 21 ++-- .../aggregations/metrics/InternalAvg.java | 14 +-- .../aggregations/metrics/InternalStats.java | 16 +-- .../aggregations/metrics/InternalSum.java | 14 +-- .../metrics/InternalWeightedAvg.java | 34 ++---- .../aggregations/metrics/StatsAggregator.java | 14 +-- .../aggregations/metrics/SumAggregator.java | 16 +-- .../metrics/WeightedAvgAggregator.java | 15 +-- .../metrics/CompensatedSumTests.java | 88 +++++++++++++++ 12 files changed, 252 insertions(+), 132 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/search/aggregations/metrics/CompensatedSum.java create mode 100644 server/src/test/java/org/elasticsearch/search/aggregations/metrics/CompensatedSumTests.java diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java index 22142799a9358..3a70f2b74c8a0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java @@ -87,20 +87,15 @@ public void collect(int doc, long bucket) throws IOException { // accurate than naive summation. double sum = sums.get(bucket); double compensation = compensations.get(bucket); + CompensatedSum kahanSummation = CompensatedSum.newInstance(sum, compensation); for (int i = 0; i < valueCount; i++) { double value = values.nextValue(); - if (Double.isFinite(value) == false) { - sum += value; - } else if (Double.isFinite(sum)) { - double corrected = value - compensation; - double newSum = sum + corrected; - compensation = (newSum - sum) - corrected; - sum = newSum; - } + kahanSummation = kahanSummation.add(value); } - sums.set(bucket, sum); - compensations.set(bucket, compensation); + + sums.set(bucket, kahanSummation.value()); + compensations.set(bucket, kahanSummation.delta()); } } }; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CompensatedSum.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CompensatedSum.java new file mode 100644 index 0000000000000..95dc6f0f15b33 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CompensatedSum.java @@ -0,0 +1,105 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.metrics; + + +/** + * Used to calculate sums using the Kahan summation algorithm. + * + *

The Kahan summation algorithm (also known as compensated summation) reduces the numerical errors that + * occur when adding a sequence of finite precision floating point numbers. Numerical errors arise due to + * truncation and rounding. These errors can lead to numerical instability. + * + * @see Kahan Summation Algorithm + */ +public class CompensatedSum { + + private static final double NO_CORRECTION = 0.0; + + private final double value; + private final double delta; + + /** + * Used to calculate sums using the Kahan summation algorithm. + * + * @param value the sum + * @param delta correction term + */ + private CompensatedSum(double value, double delta) { + this.value = value; + this.delta = delta; + } + + public static CompensatedSum newInstance(double value, double delta) { + return new CompensatedSum(value, delta); + } + + public static CompensatedSum newZeroInstance() { + return new CompensatedSum(0.0, 0.0); + } + + /** + * The value of the sum. + */ + public double value() { + return value; + } + + /** + * The correction term. + */ + public double delta() { + return delta; + } + + /** + * Increments the Kahan sum by adding a value and a correction term. + */ + public CompensatedSum add(double value, double delta) { + return add(new CompensatedSum(value, delta)); + } + + /** + * Increments the Kahan sum by adding a value without a correction term. + */ + public CompensatedSum add(double value) { + return add(new CompensatedSum(value, NO_CORRECTION)); + } + + /** + * Increments the Kahan sum by adding two sums, and updating the correction term for reducing numeric errors. + */ + public CompensatedSum add(CompensatedSum other) { + + if (!Double.isFinite(other.value())) { + return new CompensatedSum(other.value() + this.value, this.delta); + } + + if (Double.isFinite(this.value)) { + double correctedSum = other.value() + (this.delta + other.delta()); + double updatedValue = this.value + correctedSum; + double updatedDelta = correctedSum - (updatedValue - this.value); + return new CompensatedSum(updatedValue, updatedDelta); + } + return this; + } + +} + diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java index 4774bec573e42..fb345b075321e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java @@ -117,34 +117,24 @@ public void collect(int doc, long bucket) throws IOException { // which is more accurate than naive summation. double sum = sums.get(bucket); double compensation = compensations.get(bucket); + CompensatedSum compensatedSum = CompensatedSum.newInstance(sum, compensation); + double sumOfSqr = sumOfSqrs.get(bucket); double compensationOfSqr = compensationOfSqrs.get(bucket); + CompensatedSum compensatedSumOfSqr = CompensatedSum.newInstance(sumOfSqr, compensationOfSqr); + for (int i = 0; i < valuesCount; i++) { double value = values.nextValue(); - if (Double.isFinite(value) == false) { - sum += value; - sumOfSqr += value * value; - } else { - if (Double.isFinite(sum)) { - double corrected = value - compensation; - double newSum = sum + corrected; - compensation = (newSum - sum) - corrected; - sum = newSum; - } - if (Double.isFinite(sumOfSqr)) { - double correctedOfSqr = value * value - compensationOfSqr; - double newSumOfSqr = sumOfSqr + correctedOfSqr; - compensationOfSqr = (newSumOfSqr - sumOfSqr) - correctedOfSqr; - sumOfSqr = newSumOfSqr; - } - } + compensatedSum = compensatedSum.add(value); + compensatedSumOfSqr = compensatedSumOfSqr.add(value * value); min = Math.min(min, value); max = Math.max(max, value); } - sums.set(bucket, sum); - compensations.set(bucket, compensation); - sumOfSqrs.set(bucket, sumOfSqr); - compensationOfSqrs.set(bucket, compensationOfSqr); + + sums.set(bucket, compensatedSum.value()); + compensations.set(bucket, compensatedSum.delta()); + sumOfSqrs.set(bucket, compensatedSumOfSqr.value()); + compensationOfSqrs.set(bucket, compensatedSumOfSqr.delta()); mins.set(bucket, min); maxes.set(bucket, max); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GeoCentroidAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GeoCentroidAggregator.java index 414679f6e2e42..3911bf4ed1582 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GeoCentroidAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GeoCentroidAggregator.java @@ -88,24 +88,21 @@ public void collect(int doc, long bucket) throws IOException { double sumLon = lonSum.get(bucket); double compensationLon = lonCompensations.get(bucket); + CompensatedSum compensatedSumLat = CompensatedSum.newInstance(sumLat, compensationLat); + CompensatedSum compensatedSumLon = CompensatedSum.newInstance(sumLon, compensationLon); + // update the sum for (int i = 0; i < valueCount; ++i) { GeoPoint value = values.nextValue(); //latitude - double correctedLat = value.getLat() - compensationLat; - double newSumLat = sumLat + correctedLat; - compensationLat = (newSumLat - sumLat) - correctedLat; - sumLat = newSumLat; + compensatedSumLat = compensatedSumLat.add(value.getLat()); //longitude - double correctedLon = value.getLon() - compensationLon; - double newSumLon = sumLon + correctedLon; - compensationLon = (newSumLon - sumLon) - correctedLon; - sumLon = newSumLon; + compensatedSumLon = compensatedSumLon.add(value.getLon()); } - lonSum.set(bucket, sumLon); - lonCompensations.set(bucket, compensationLon); - latSum.set(bucket, sumLat); - latCompensations.set(bucket, compensationLat); + lonSum.set(bucket, compensatedSumLon.value()); + lonCompensations.set(bucket, compensatedSumLon.delta()); + latSum.set(bucket, compensatedSumLat.value()); + latCompensations.set(bucket, compensatedSumLat.delta()); } } }; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalAvg.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalAvg.java index 089407314de99..a6b4821faa79b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalAvg.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalAvg.java @@ -88,24 +88,16 @@ public String getWriteableName() { @Override public InternalAvg doReduce(List aggregations, ReduceContext reduceContext) { + CompensatedSum kahanSummation = CompensatedSum.newZeroInstance(); long count = 0; - double sum = 0; - double compensation = 0; // Compute the sum of double values with Kahan summation algorithm which is more // accurate than naive summation. for (InternalAggregation aggregation : aggregations) { InternalAvg avg = (InternalAvg) aggregation; count += avg.count; - if (Double.isFinite(avg.sum) == false) { - sum += avg.sum; - } else if (Double.isFinite(sum)) { - double corrected = avg.sum - compensation; - double newSum = sum + corrected; - compensation = (newSum - sum) - corrected; - sum = newSum; - } + kahanSummation = kahanSummation.add(avg.sum); } - return new InternalAvg(getName(), sum, count, format, pipelineAggregators(), getMetaData()); + return new InternalAvg(getName(), kahanSummation.value(), count, format, pipelineAggregators(), getMetaData()); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalStats.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalStats.java index 3c0aa8c7313bc..061c655c45cb8 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalStats.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalStats.java @@ -149,8 +149,8 @@ public InternalStats doReduce(List aggregations, ReduceCont long count = 0; double min = Double.POSITIVE_INFINITY; double max = Double.NEGATIVE_INFINITY; - double sum = 0; - double compensation = 0; + CompensatedSum kahanSummation = CompensatedSum.newZeroInstance(); + for (InternalAggregation aggregation : aggregations) { InternalStats stats = (InternalStats) aggregation; count += stats.getCount(); @@ -158,17 +158,9 @@ public InternalStats doReduce(List aggregations, ReduceCont max = Math.max(max, stats.getMax()); // Compute the sum of double values with Kahan summation algorithm which is more // accurate than naive summation. - double value = stats.getSum(); - if (Double.isFinite(value) == false) { - sum += value; - } else if (Double.isFinite(sum)) { - double corrected = value - compensation; - double newSum = sum + corrected; - compensation = (newSum - sum) - corrected; - sum = newSum; - } + kahanSummation = kahanSummation.add(stats.getSum()); } - return new InternalStats(name, count, sum, min, max, format, pipelineAggregators(), getMetaData()); + return new InternalStats(name, count, kahanSummation.value(), min, max, format, pipelineAggregators(), getMetaData()); } static class Fields { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java index 2e1c9635aa896..cc8ca0c322e4e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java @@ -74,20 +74,12 @@ public double getValue() { public InternalSum doReduce(List aggregations, ReduceContext reduceContext) { // Compute the sum of double values with Kahan summation algorithm which is more // accurate than naive summation. - double sum = 0; - double compensation = 0; + CompensatedSum kahanSummation = CompensatedSum.newZeroInstance(); for (InternalAggregation aggregation : aggregations) { double value = ((InternalSum) aggregation).sum; - if (Double.isFinite(value) == false) { - sum += value; - } else if (Double.isFinite(sum)) { - double corrected = value - compensation; - double newSum = sum + corrected; - compensation = (newSum - sum) - corrected; - sum = newSum; - } + kahanSummation = kahanSummation.add(value); } - return new InternalSum(name, sum, format, pipelineAggregators(), getMetaData()); + return new InternalSum(name, kahanSummation.value(), format, pipelineAggregators(), getMetaData()); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalWeightedAvg.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalWeightedAvg.java index 6165803299296..80c601506cfb7 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalWeightedAvg.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalWeightedAvg.java @@ -88,37 +88,21 @@ public String getWriteableName() { @Override public InternalWeightedAvg doReduce(List aggregations, ReduceContext reduceContext) { - double weight = 0; - double sum = 0; - double sumCompensation = 0; - double weightCompensation = 0; + CompensatedSum sumCompensation = CompensatedSum.newZeroInstance(); + CompensatedSum weightCompensation = CompensatedSum.newZeroInstance(); + // Compute the sum of double values with Kahan summation algorithm which is more // accurate than naive summation. for (InternalAggregation aggregation : aggregations) { InternalWeightedAvg avg = (InternalWeightedAvg) aggregation; - // If the weight is Inf or NaN, just add it to the running tally to "convert" to - // Inf/NaN. This keeps the behavior bwc from before kahan summing - if (Double.isFinite(avg.weight) == false) { - weight += avg.weight; - } else if (Double.isFinite(weight)) { - double corrected = avg.weight - weightCompensation; - double newWeight = weight + corrected; - weightCompensation = (newWeight - weight) - corrected; - weight = newWeight; - } - // If the avg is Inf or NaN, just add it to the running tally to "convert" to - // Inf/NaN. This keeps the behavior bwc from before kahan summing - if (Double.isFinite(avg.sum) == false) { - sum += avg.sum; - } else if (Double.isFinite(sum)) { - double corrected = avg.sum - sumCompensation; - double newSum = sum + corrected; - sumCompensation = (newSum - sum) - corrected; - sum = newSum; - } + weightCompensation = weightCompensation.add(avg.weight); + sumCompensation = sumCompensation.add(avg.sum); } - return new InternalWeightedAvg(getName(), sum, weight, format, pipelineAggregators(), getMetaData()); + + return new InternalWeightedAvg(getName(), sumCompensation.value(), weightCompensation.value(), + format, pipelineAggregators(), getMetaData()); } + @Override public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { builder.field(CommonFields.VALUE.getPreferredName(), weight != 0 ? getValue() : null); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java index 1093ecb0692ab..a22a805ec9961 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java @@ -105,22 +105,16 @@ public void collect(int doc, long bucket) throws IOException { // accurate than naive summation. double sum = sums.get(bucket); double compensation = compensations.get(bucket); + CompensatedSum kahanSummation = CompensatedSum.newInstance(sum, compensation); for (int i = 0; i < valuesCount; i++) { double value = values.nextValue(); - if (Double.isFinite(value) == false) { - sum += value; - } else if (Double.isFinite(sum)) { - double corrected = value - compensation; - double newSum = sum + corrected; - compensation = (newSum - sum) - corrected; - sum = newSum; - } + kahanSummation = kahanSummation.add(value); min = Math.min(min, value); max = Math.max(max, value); } - sums.set(bucket, sum); - compensations.set(bucket, compensation); + sums.set(bucket, kahanSummation.value()); + compensations.set(bucket, kahanSummation.delta()); mins.set(bucket, min); maxes.set(bucket, max); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java index 07e91f5e12bec..ae1f390d3300b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java @@ -81,19 +81,15 @@ public void collect(int doc, long bucket) throws IOException { // accurate than naive summation. double sum = sums.get(bucket); double compensation = compensations.get(bucket); + CompensatedSum kahanSummation = CompensatedSum.newInstance(sum, compensation); + for (int i = 0; i < valuesCount; i++) { double value = values.nextValue(); - if (Double.isFinite(value) == false) { - sum += value; - } else if (Double.isFinite(sum)) { - double corrected = value - compensation; - double newSum = sum + corrected; - compensation = (newSum - sum) - corrected; - sum = newSum; - } + kahanSummation = kahanSummation.add(value); } - compensations.set(bucket, compensation); - sums.set(bucket, sum); + + compensations.set(bucket, kahanSummation.delta()); + sums.set(bucket, kahanSummation.value()); } } }; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/WeightedAvgAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/WeightedAvgAggregator.java index 08d06cf21eda3..613216d1423a7 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/WeightedAvgAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/WeightedAvgAggregator.java @@ -117,16 +117,11 @@ private static void kahanSum(double value, DoubleArray values, DoubleArray compe double sum = values.get(bucket); double compensation = compensations.get(bucket); - if (Double.isFinite(value) == false) { - sum += value; - } else if (Double.isFinite(sum)) { - double corrected = value - compensation; - double newSum = sum + corrected; - compensation = (newSum - sum) - corrected; - sum = newSum; - } - values.set(bucket, sum); - compensations.set(bucket, compensation); + CompensatedSum kahanSummation = CompensatedSum.newInstance(sum, compensation) + .add(value); + + values.set(bucket, kahanSummation.value()); + compensations.set(bucket, kahanSummation.delta()); } @Override diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/CompensatedSumTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/CompensatedSumTests.java new file mode 100644 index 0000000000000..a785136e9e172 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/CompensatedSumTests.java @@ -0,0 +1,88 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.metrics; + +import org.elasticsearch.test.ESTestCase; +import org.junit.Assert; + +public class CompensatedSumTests extends ESTestCase { + + /** + * When adding a series of numbers the order of the numbers should not impact the results. + * + *

This test shows that a naive summation comes up with a different result than Kahan + * Summation when you start with either a smaller or larger number in some cases and + * helps prove our Kahan Summation is working. + */ + public void testAdd() { + final CompensatedSum smallSum = CompensatedSum.newInstance(0.001, 0.0); + final CompensatedSum largeSum = CompensatedSum.newInstance(1000, 0.0); + + CompensatedSum compensatedResult1 = smallSum; + CompensatedSum compensatedResult2 = largeSum; + double naiveResult1 = smallSum.value(); + double naiveResult2 = largeSum.value(); + + for (int i = 0; i < 10; i++) { + compensatedResult1 = compensatedResult1.add(smallSum); + compensatedResult2 = compensatedResult2.add(smallSum); + naiveResult1 += smallSum.value(); + naiveResult2 += smallSum.value(); + } + + compensatedResult1 = compensatedResult1.add(largeSum); + compensatedResult2 = compensatedResult2.add(smallSum); + naiveResult1 += largeSum.value(); + naiveResult2 += smallSum.value(); + + // Kahan summation gave the same result no matter what order we added + Assert.assertEquals(1000.011, compensatedResult1.value(), 0.0); + Assert.assertEquals(1000.011, compensatedResult2.value(), 0.0); + + // naive addition gave a small floating point error + Assert.assertEquals(1000.011, naiveResult1, 0.0); + Assert.assertEquals(1000.0109999999997, naiveResult2, 0.0); + + Assert.assertEquals(compensatedResult1.value(), compensatedResult2.value(), 0.0); + Assert.assertEquals(naiveResult1, naiveResult2, 0.0001); + Assert.assertNotEquals(naiveResult1, naiveResult2, 0.0); + } + + public void testDelta() { + CompensatedSum compensatedResult1 = CompensatedSum.newInstance(0.001, 0.0); + for (int i = 0; i < 10; i++) { + compensatedResult1 = compensatedResult1.add(0.001); + } + + Assert.assertEquals(0.011, compensatedResult1.value(), 0.0); + Assert.assertEquals(Double.parseDouble("8.673617379884035E-19"), compensatedResult1.delta(), 0.0); + } + + public void testInfiniteAndNaN() { + CompensatedSum compensatedResult1 = CompensatedSum.newZeroInstance(); + double[] doubles = {Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY, Double.NaN}; + for (double d : doubles) { + compensatedResult1 = compensatedResult1.add(d); + + } + + Assert.assertTrue(Double.isNaN(compensatedResult1.value())); + } +} From 15051defc133f758ab0cc81f258ecc8a5bb23acf Mon Sep 17 00:00:00 2001 From: Josemy Duarte Date: Sun, 3 Nov 2019 18:04:34 +0100 Subject: [PATCH 2/2] Remove static factory & immutability --- .../aggregations/metrics/AvgAggregator.java | 4 +- .../aggregations/metrics/CompensatedSum.java | 40 +++++++------------ .../metrics/ExtendedStatsAggregator.java | 8 ++-- .../metrics/GeoCentroidAggregator.java | 8 ++-- .../aggregations/metrics/InternalAvg.java | 4 +- .../aggregations/metrics/InternalStats.java | 4 +- .../aggregations/metrics/InternalSum.java | 4 +- .../metrics/InternalWeightedAvg.java | 8 ++-- .../aggregations/metrics/StatsAggregator.java | 4 +- .../aggregations/metrics/SumAggregator.java | 4 +- .../metrics/WeightedAvgAggregator.java | 2 +- .../metrics/CompensatedSumTests.java | 24 +++++------ 12 files changed, 51 insertions(+), 63 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java index 3a70f2b74c8a0..843e380e425ea 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/AvgAggregator.java @@ -87,11 +87,11 @@ public void collect(int doc, long bucket) throws IOException { // accurate than naive summation. double sum = sums.get(bucket); double compensation = compensations.get(bucket); - CompensatedSum kahanSummation = CompensatedSum.newInstance(sum, compensation); + CompensatedSum kahanSummation = new CompensatedSum(sum, compensation); for (int i = 0; i < valueCount; i++) { double value = values.nextValue(); - kahanSummation = kahanSummation.add(value); + kahanSummation.add(value); } sums.set(bucket, kahanSummation.value()); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CompensatedSum.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CompensatedSum.java index 95dc6f0f15b33..965ac665159a0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CompensatedSum.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/CompensatedSum.java @@ -33,8 +33,8 @@ public class CompensatedSum { private static final double NO_CORRECTION = 0.0; - private final double value; - private final double delta; + private double value; + private double delta; /** * Used to calculate sums using the Kahan summation algorithm. @@ -42,19 +42,11 @@ public class CompensatedSum { * @param value the sum * @param delta correction term */ - private CompensatedSum(double value, double delta) { + public CompensatedSum(double value, double delta) { this.value = value; this.delta = delta; } - public static CompensatedSum newInstance(double value, double delta) { - return new CompensatedSum(value, delta); - } - - public static CompensatedSum newZeroInstance() { - return new CompensatedSum(0.0, 0.0); - } - /** * The value of the sum. */ @@ -69,37 +61,33 @@ public double delta() { return delta; } - /** - * Increments the Kahan sum by adding a value and a correction term. - */ - public CompensatedSum add(double value, double delta) { - return add(new CompensatedSum(value, delta)); - } - /** * Increments the Kahan sum by adding a value without a correction term. */ public CompensatedSum add(double value) { - return add(new CompensatedSum(value, NO_CORRECTION)); + return add(value, NO_CORRECTION); } /** * Increments the Kahan sum by adding two sums, and updating the correction term for reducing numeric errors. */ - public CompensatedSum add(CompensatedSum other) { - - if (!Double.isFinite(other.value())) { - return new CompensatedSum(other.value() + this.value, this.delta); + public CompensatedSum add(double value, double delta) { + // If the value is Inf or NaN, just add it to the running tally to "convert" to + // Inf/NaN. This keeps the behavior bwc from before kahan summing + if (Double.isFinite(value) == false) { + this.value = value + this.value; } if (Double.isFinite(this.value)) { - double correctedSum = other.value() + (this.delta + other.delta()); + double correctedSum = value + (this.delta + delta); double updatedValue = this.value + correctedSum; - double updatedDelta = correctedSum - (updatedValue - this.value); - return new CompensatedSum(updatedValue, updatedDelta); + this.delta = correctedSum - (updatedValue - this.value); + this.value = updatedValue; } + return this; } + } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java index fb345b075321e..c4dcfebf5e1be 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/ExtendedStatsAggregator.java @@ -117,16 +117,16 @@ public void collect(int doc, long bucket) throws IOException { // which is more accurate than naive summation. double sum = sums.get(bucket); double compensation = compensations.get(bucket); - CompensatedSum compensatedSum = CompensatedSum.newInstance(sum, compensation); + CompensatedSum compensatedSum = new CompensatedSum(sum, compensation); double sumOfSqr = sumOfSqrs.get(bucket); double compensationOfSqr = compensationOfSqrs.get(bucket); - CompensatedSum compensatedSumOfSqr = CompensatedSum.newInstance(sumOfSqr, compensationOfSqr); + CompensatedSum compensatedSumOfSqr = new CompensatedSum(sumOfSqr, compensationOfSqr); for (int i = 0; i < valuesCount; i++) { double value = values.nextValue(); - compensatedSum = compensatedSum.add(value); - compensatedSumOfSqr = compensatedSumOfSqr.add(value * value); + compensatedSum.add(value); + compensatedSumOfSqr.add(value * value); min = Math.min(min, value); max = Math.max(max, value); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GeoCentroidAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GeoCentroidAggregator.java index 3911bf4ed1582..d5a91b002213e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GeoCentroidAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/GeoCentroidAggregator.java @@ -88,16 +88,16 @@ public void collect(int doc, long bucket) throws IOException { double sumLon = lonSum.get(bucket); double compensationLon = lonCompensations.get(bucket); - CompensatedSum compensatedSumLat = CompensatedSum.newInstance(sumLat, compensationLat); - CompensatedSum compensatedSumLon = CompensatedSum.newInstance(sumLon, compensationLon); + CompensatedSum compensatedSumLat = new CompensatedSum(sumLat, compensationLat); + CompensatedSum compensatedSumLon = new CompensatedSum(sumLon, compensationLon); // update the sum for (int i = 0; i < valueCount; ++i) { GeoPoint value = values.nextValue(); //latitude - compensatedSumLat = compensatedSumLat.add(value.getLat()); + compensatedSumLat.add(value.getLat()); //longitude - compensatedSumLon = compensatedSumLon.add(value.getLon()); + compensatedSumLon.add(value.getLon()); } lonSum.set(bucket, compensatedSumLon.value()); lonCompensations.set(bucket, compensatedSumLon.delta()); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalAvg.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalAvg.java index a6b4821faa79b..3e3b2ae03ea0d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalAvg.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalAvg.java @@ -88,14 +88,14 @@ public String getWriteableName() { @Override public InternalAvg doReduce(List aggregations, ReduceContext reduceContext) { - CompensatedSum kahanSummation = CompensatedSum.newZeroInstance(); + CompensatedSum kahanSummation = new CompensatedSum(0, 0); long count = 0; // Compute the sum of double values with Kahan summation algorithm which is more // accurate than naive summation. for (InternalAggregation aggregation : aggregations) { InternalAvg avg = (InternalAvg) aggregation; count += avg.count; - kahanSummation = kahanSummation.add(avg.sum); + kahanSummation.add(avg.sum); } return new InternalAvg(getName(), kahanSummation.value(), count, format, pipelineAggregators(), getMetaData()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalStats.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalStats.java index 061c655c45cb8..adb879999a410 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalStats.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalStats.java @@ -149,7 +149,7 @@ public InternalStats doReduce(List aggregations, ReduceCont long count = 0; double min = Double.POSITIVE_INFINITY; double max = Double.NEGATIVE_INFINITY; - CompensatedSum kahanSummation = CompensatedSum.newZeroInstance(); + CompensatedSum kahanSummation = new CompensatedSum(0, 0); for (InternalAggregation aggregation : aggregations) { InternalStats stats = (InternalStats) aggregation; @@ -158,7 +158,7 @@ public InternalStats doReduce(List aggregations, ReduceCont max = Math.max(max, stats.getMax()); // Compute the sum of double values with Kahan summation algorithm which is more // accurate than naive summation. - kahanSummation = kahanSummation.add(stats.getSum()); + kahanSummation.add(stats.getSum()); } return new InternalStats(name, count, kahanSummation.value(), min, max, format, pipelineAggregators(), getMetaData()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java index cc8ca0c322e4e..6e6315eded101 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalSum.java @@ -74,10 +74,10 @@ public double getValue() { public InternalSum doReduce(List aggregations, ReduceContext reduceContext) { // Compute the sum of double values with Kahan summation algorithm which is more // accurate than naive summation. - CompensatedSum kahanSummation = CompensatedSum.newZeroInstance(); + CompensatedSum kahanSummation = new CompensatedSum(0, 0); for (InternalAggregation aggregation : aggregations) { double value = ((InternalSum) aggregation).sum; - kahanSummation = kahanSummation.add(value); + kahanSummation.add(value); } return new InternalSum(name, kahanSummation.value(), format, pipelineAggregators(), getMetaData()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalWeightedAvg.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalWeightedAvg.java index 80c601506cfb7..4b3523b03ac3d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalWeightedAvg.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalWeightedAvg.java @@ -88,15 +88,15 @@ public String getWriteableName() { @Override public InternalWeightedAvg doReduce(List aggregations, ReduceContext reduceContext) { - CompensatedSum sumCompensation = CompensatedSum.newZeroInstance(); - CompensatedSum weightCompensation = CompensatedSum.newZeroInstance(); + CompensatedSum sumCompensation = new CompensatedSum(0, 0); + CompensatedSum weightCompensation = new CompensatedSum(0, 0); // Compute the sum of double values with Kahan summation algorithm which is more // accurate than naive summation. for (InternalAggregation aggregation : aggregations) { InternalWeightedAvg avg = (InternalWeightedAvg) aggregation; - weightCompensation = weightCompensation.add(avg.weight); - sumCompensation = sumCompensation.add(avg.sum); + weightCompensation.add(avg.weight); + sumCompensation.add(avg.sum); } return new InternalWeightedAvg(getName(), sumCompensation.value(), weightCompensation.value(), diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java index a22a805ec9961..7799f498dd491 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/StatsAggregator.java @@ -105,11 +105,11 @@ public void collect(int doc, long bucket) throws IOException { // accurate than naive summation. double sum = sums.get(bucket); double compensation = compensations.get(bucket); - CompensatedSum kahanSummation = CompensatedSum.newInstance(sum, compensation); + CompensatedSum kahanSummation = new CompensatedSum(sum, compensation); for (int i = 0; i < valuesCount; i++) { double value = values.nextValue(); - kahanSummation = kahanSummation.add(value); + kahanSummation.add(value); min = Math.min(min, value); max = Math.max(max, value); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java index ae1f390d3300b..cc440fd7d0554 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/SumAggregator.java @@ -81,11 +81,11 @@ public void collect(int doc, long bucket) throws IOException { // accurate than naive summation. double sum = sums.get(bucket); double compensation = compensations.get(bucket); - CompensatedSum kahanSummation = CompensatedSum.newInstance(sum, compensation); + CompensatedSum kahanSummation = new CompensatedSum(sum, compensation); for (int i = 0; i < valuesCount; i++) { double value = values.nextValue(); - kahanSummation = kahanSummation.add(value); + kahanSummation.add(value); } compensations.set(bucket, kahanSummation.delta()); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/WeightedAvgAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/WeightedAvgAggregator.java index 613216d1423a7..11b4a5df951dd 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/WeightedAvgAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/WeightedAvgAggregator.java @@ -117,7 +117,7 @@ private static void kahanSum(double value, DoubleArray values, DoubleArray compe double sum = values.get(bucket); double compensation = compensations.get(bucket); - CompensatedSum kahanSummation = CompensatedSum.newInstance(sum, compensation) + CompensatedSum kahanSummation = new CompensatedSum(sum, compensation) .add(value); values.set(bucket, kahanSummation.value()); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/CompensatedSumTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/CompensatedSumTests.java index a785136e9e172..c26de0d2c2399 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/CompensatedSumTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/CompensatedSumTests.java @@ -32,23 +32,23 @@ public class CompensatedSumTests extends ESTestCase { * helps prove our Kahan Summation is working. */ public void testAdd() { - final CompensatedSum smallSum = CompensatedSum.newInstance(0.001, 0.0); - final CompensatedSum largeSum = CompensatedSum.newInstance(1000, 0.0); + final CompensatedSum smallSum = new CompensatedSum(0.001, 0.0); + final CompensatedSum largeSum = new CompensatedSum(1000, 0.0); - CompensatedSum compensatedResult1 = smallSum; - CompensatedSum compensatedResult2 = largeSum; + CompensatedSum compensatedResult1 = new CompensatedSum(0.001, 0.0); + CompensatedSum compensatedResult2 = new CompensatedSum(1000, 0.0); double naiveResult1 = smallSum.value(); double naiveResult2 = largeSum.value(); for (int i = 0; i < 10; i++) { - compensatedResult1 = compensatedResult1.add(smallSum); - compensatedResult2 = compensatedResult2.add(smallSum); + compensatedResult1.add(smallSum.value()); + compensatedResult2.add(smallSum.value()); naiveResult1 += smallSum.value(); naiveResult2 += smallSum.value(); } - compensatedResult1 = compensatedResult1.add(largeSum); - compensatedResult2 = compensatedResult2.add(smallSum); + compensatedResult1.add(largeSum.value()); + compensatedResult2.add(smallSum.value()); naiveResult1 += largeSum.value(); naiveResult2 += smallSum.value(); @@ -66,9 +66,9 @@ public void testAdd() { } public void testDelta() { - CompensatedSum compensatedResult1 = CompensatedSum.newInstance(0.001, 0.0); + CompensatedSum compensatedResult1 = new CompensatedSum(0.001, 0.0); for (int i = 0; i < 10; i++) { - compensatedResult1 = compensatedResult1.add(0.001); + compensatedResult1.add(0.001); } Assert.assertEquals(0.011, compensatedResult1.value(), 0.0); @@ -76,10 +76,10 @@ public void testDelta() { } public void testInfiniteAndNaN() { - CompensatedSum compensatedResult1 = CompensatedSum.newZeroInstance(); + CompensatedSum compensatedResult1 = new CompensatedSum(0, 0); double[] doubles = {Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY, Double.NaN}; for (double d : doubles) { - compensatedResult1 = compensatedResult1.add(d); + compensatedResult1.add(d); }