diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/330_auto_date_histogram.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/330_auto_date_histogram.yml index 93285a802ab90..68b4d8538a6d2 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/330_auto_date_histogram.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/330_auto_date_histogram.yml @@ -4,6 +4,7 @@ setup: index: test body: settings: + number_of_shards: 1 number_of_replicas: 0 mappings: properties: @@ -75,3 +76,25 @@ setup: - match: { aggregations.histo.buckets.1.doc_count: 2 } - match: { aggregations.histo.buckets.1.v.value: 7 } - match: { aggregations.histo_avg_v.value: 5 } + +--- +"profile at top level": + - skip: + version: " - 7.99.99" + reason: Debug information added in 8.0.0 (to be backported to 7.9.0) + + - do: + search: + body: + profile: true + size: 0 + aggs: + histo: + auto_date_histogram: + field: date + buckets: 2 + + - match: { hits.total.value: 4 } + - length: { aggregations.histo.buckets: 2 } + - match: { profile.shards.0.aggregations.0.type: AutoDateHistogramAggregator.FromSingle } + - match: { profile.shards.0.aggregations.0.debug.surviving_buckets: 4 } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index 75cf163074251..3ed81bc4a8a0a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -30,6 +30,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregator; import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.search.internal.SearchContext; @@ -329,7 +330,7 @@ protected final InternalAggregation[] buildAggregationsForVariableBuckets(lo * @param bucketOrds hash of values to the bucket ordinal */ protected final InternalAggregation[] buildAggregationsForVariableBuckets(long[] owningBucketOrds, LongKeyedBucketOrds bucketOrds, - BucketBuilderForVariable bucketBuilder, Function, InternalAggregation> resultBuilder) throws IOException { + BucketBuilderForVariable bucketBuilder, ResultBuilderForVariable resultBuilder) throws IOException { long totalOrdsToCollect = 0; for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { totalOrdsToCollect += bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]); @@ -360,7 +361,7 @@ protected final InternalAggregation[] buildAggregationsForVariableBuckets(lo } buckets.add(bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults[b++])); } - results[ordIdx] = resultBuilder.apply(buckets); + results[ordIdx] = resultBuilder.build(owningBucketOrds[ordIdx], buckets); } return results; } @@ -368,6 +369,10 @@ protected final InternalAggregation[] buildAggregationsForVariableBuckets(lo protected interface BucketBuilderForVariable { B build(long bucketValue, int docCount, InternalAggregations subAggregationResults); } + @FunctionalInterface + protected interface ResultBuilderForVariable { + InternalAggregation 
build(long owningBucketOrd, List buckets); + }
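The BucketsAggregator change above swaps the result callback from a plain Function over the bucket list to the new ResultBuilderForVariable, so the lambda that builds the final aggregation also receives the owning bucket ordinal and can look up per-owning-bucket state (the auto date histogram uses this to pick the rounding index for each owning bucket). Below is a minimal, self-contained sketch of that callback shape; it is not Elasticsearch code, and ResultBuilder, Demo, buildFor and roundingIndexPerOwningOrd are hypothetical names.

import java.util.List;

public class Demo {
    // Stand-in for ResultBuilderForVariable: the builder sees the owning bucket
    // ordinal as well as the buckets collected under that ordinal.
    interface ResultBuilder<B, R> {
        R build(long owningBucketOrd, List<B> buckets);
    }

    // Hypothetical per-owning-bucket state, e.g. a rounding index per ordinal.
    private static final int[] roundingIndexPerOwningOrd = {0, 2, 1};

    static <B, R> R buildFor(long owningBucketOrd, List<B> buckets, ResultBuilder<B, R> builder) {
        return builder.build(owningBucketOrd, buckets);
    }

    public static void main(String[] args) {
        String result = buildFor(1L, List.of("a", "b"),
            (ord, buckets) -> "rounding=" + roundingIndexPerOwningOrd[(int) ord] + " buckets=" + buckets);
        System.out.println(result); // prints: rounding=2 buckets=[a, b]
    }
}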
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AbstractHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AbstractHistogramAggregator.java index f026bcdc01f41..4f639368ea745 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AbstractHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AbstractHistogramAggregator.java @@ -90,7 +90,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I double roundKey = Double.longBitsToDouble(bucketValue); double key = roundKey * interval + offset; return new InternalHistogram.Bucket(key, docCount, keyed, formatter, subAggregationResults); - }, buckets -> { + }, (owningBucketOrd, buckets) -> { // the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregationBuilder.java index 3ac8b5b314e66..a97c947744792 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregationBuilder.java @@ -290,6 +290,10 @@ public long getRoughEstimateDurationMillis() { return roughEstimateDurationMillis; } + public long getMaximumRoughEstimateDurationMillis() { + return getRoughEstimateDurationMillis() * getMaximumInnerInterval(); + } + @Override public int hashCode() { return Objects.hash(rounding, Arrays.hashCode(innerIntervals), dateTimeUnit); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 49340ed5aae62..59946f272a4a8 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -23,8 +23,11 @@ import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.common.Rounding; +import org.elasticsearch.common.Rounding.Prepared; import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.util.LongHash; +import org.elasticsearch.common.util.ByteArray; +import org.elasticsearch.common.util.IntArray; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; @@ -36,6 +39,7 @@ import org.elasticsearch.search.aggregations.bucket.DeferringBucketCollector; import org.elasticsearch.search.aggregations.bucket.MergingBucketsDeferringCollector; import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo; +import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; import org.elasticsearch.search.aggregations.support.ValuesSource; import 
org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import org.elasticsearch.search.internal.SearchContext; @@ -43,30 +47,76 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; +import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.LongToIntFunction; /** * An aggregator for date values that attempts to return a specific number of * buckets, reconfiguring how it rounds dates to buckets on the fly as new * data arrives. + *
<p>
+ * This class is abstract because there is a simple implementation for when the + * aggregator only collects from a single bucket and a more complex + * implementation when it doesn't. This ain't great from a test coverage + * standpoint but the simpler implementation is between 7% and 15% faster + * when you can use it. This is an important aggregation and we need that + * performance. */ -class AutoDateHistogramAggregator extends DeferableBucketAggregator { +abstract class AutoDateHistogramAggregator extends DeferableBucketAggregator { + static AutoDateHistogramAggregator build( + String name, + AggregatorFactories factories, + int targetBuckets, + RoundingInfo[] roundingInfos, + Function roundingPreparer, + ValuesSourceConfig valuesSourceConfig, + SearchContext aggregationContext, + Aggregator parent, + boolean collectsFromSingleBucket, + Map metadata + ) throws IOException { + return collectsFromSingleBucket + ? new FromSingle( + name, + factories, + targetBuckets, + roundingInfos, + roundingPreparer, + valuesSourceConfig, + aggregationContext, + parent, + metadata + ) + : new FromMany( + name, + factories, + targetBuckets, + roundingInfos, + roundingPreparer, + valuesSourceConfig, + aggregationContext, + parent, + metadata + ); + } private final ValuesSource.Numeric valuesSource; private final DocValueFormat formatter; - private final RoundingInfo[] roundingInfos; private final Function roundingPreparer; - private int roundingIdx = 0; - private Rounding.Prepared preparedRounding; - - private LongHash bucketOrds; - private int targetBuckets; + /** + * A reference to the collector so we can + * {@link MergingBucketsDeferringCollector#mergeBuckets(long[])}. + */ private MergingBucketsDeferringCollector deferringCollector; - AutoDateHistogramAggregator( + protected final RoundingInfo[] roundingInfos; + protected final int targetBuckets; + + private AutoDateHistogramAggregator( String name, AggregatorFactories factories, - int numBuckets, + int targetBuckets, RoundingInfo[] roundingInfos, Function roundingPreparer, ValuesSourceConfig valuesSourceConfig, @@ -76,20 +126,16 @@ class AutoDateHistogramAggregator extends DeferableBucketAggregator { ) throws IOException { super(name, factories, aggregationContext, parent, metadata); - this.targetBuckets = numBuckets; + this.targetBuckets = targetBuckets; // TODO: Remove null usage here, by using a different aggregator for create this.valuesSource = valuesSourceConfig.hasValues() ? 
(ValuesSource.Numeric) valuesSourceConfig.getValuesSource() : null; this.formatter = valuesSourceConfig.format(); this.roundingInfos = roundingInfos; this.roundingPreparer = roundingPreparer; - preparedRounding = roundingPreparer.apply(roundingInfos[roundingIdx].rounding); - - bucketOrds = new LongHash(1, aggregationContext.bigArrays()); - } @Override - public ScoreMode scoreMode() { + public final ScoreMode scoreMode() { if (valuesSource != null && valuesSource.needsScores()) { return ScoreMode.COMPLETE; } @@ -97,29 +143,145 @@ public ScoreMode scoreMode() { } @Override - protected boolean shouldDefer(Aggregator aggregator) { + protected final boolean shouldDefer(Aggregator aggregator) { return true; } @Override - public DeferringBucketCollector getDeferringCollector() { + public final DeferringBucketCollector getDeferringCollector() { deferringCollector = new MergingBucketsDeferringCollector(context, descendsFromGlobalAggregator(parent())); return deferringCollector; } + protected abstract LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException; + @Override - public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, - final LeafBucketCollector sub) throws IOException { + public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { if (valuesSource == null) { return LeafBucketCollector.NO_OP_COLLECTOR; } - final SortedNumericDocValues values = valuesSource.longValues(ctx); - return new LeafBucketCollectorBase(sub, values) { - @Override - public void collect(int doc, long bucket) throws IOException { - assert bucket == 0; - if (values.advanceExact(doc)) { - final int valuesCount = values.docValueCount(); + return getLeafCollector(valuesSource.longValues(ctx), sub); + } + + protected final InternalAggregation[] buildAggregations( + LongKeyedBucketOrds bucketOrds, + LongToIntFunction roundingIndexFor, + long[] owningBucketOrds + ) throws IOException { + return buildAggregationsForVariableBuckets( + owningBucketOrds, + bucketOrds, + (bucketValue, docCount, subAggregationResults) -> new InternalAutoDateHistogram.Bucket( + bucketValue, + docCount, + formatter, + subAggregationResults + ), + (owningBucketOrd, buckets) -> { + // the contract of the histogram aggregation is that shards must return + // buckets ordered by key in ascending order + CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); + + // value source will be null for unmapped fields + InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo( + roundingInfos, + roundingIndexFor.applyAsInt(owningBucketOrd), + buildEmptySubAggregations() + ); + + return new InternalAutoDateHistogram(name, buckets, targetBuckets, emptyBucketInfo, formatter, metadata(), 1); + } + ); + } + + @Override + public final InternalAggregation buildEmptyAggregation() { + InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo( + roundingInfos, + 0, + buildEmptySubAggregations() + ); + return new InternalAutoDateHistogram(name, Collections.emptyList(), targetBuckets, emptyBucketInfo, formatter, metadata(), 1); + } + + protected final Rounding.Prepared prepareRounding(int index) { + return roundingPreparer.apply(roundingInfos[index].rounding); + } + + protected final void merge(long[] mergeMap, long newNumBuckets) { + mergeBuckets(mergeMap, newNumBuckets); + if (deferringCollector != null) { + deferringCollector.mergeBuckets(mergeMap); + } + } + + 
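The merge helper above forwards a mergeMap to both mergeBuckets and the deferring collector; its contract is that mergeMap[oldBucketOrd] holds the ordinal of the bucket the old key lands in under the new, coarser rounding, with negative add() results decoded as -1 - ord exactly as in the rebucketing loops below. Here is a self-contained sketch of building such a map; it is not Elasticsearch code, and integer division stands in for a real Rounding while a LinkedHashMap stands in for LongKeyedBucketOrds.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class MergeMapSketch {
    public static void main(String[] args) {
        long[] oldKeys = {10, 20, 30, 40};   // key of each old bucket, indexed by old bucket ordinal
        long coarserInterval = 25;           // hypothetical coarser rounding interval

        Map<Long, Long> newKeyToOrd = new LinkedHashMap<>();  // new key -> new bucket ordinal
        long[] mergeMap = new long[oldKeys.length];
        for (int oldOrd = 0; oldOrd < oldKeys.length; oldOrd++) {
            long newKey = (oldKeys[oldOrd] / coarserInterval) * coarserInterval;  // "round" the old key
            Long existing = newKeyToOrd.get(newKey);
            long newOrd = existing == null ? newKeyToOrd.size() : existing;  // reuse the ordinal if the key was already added
            newKeyToOrd.putIfAbsent(newKey, newOrd);
            mergeMap[oldOrd] = newOrd;
        }
        // Old buckets 0 and 1 collapse into new bucket 0; old buckets 2 and 3 into new bucket 1.
        System.out.println(Arrays.toString(mergeMap));  // prints: [0, 0, 1, 1]
    }
}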
/** + * Initially it uses the most fine grained rounding configuration possible + * but as more data arrives it rebuckets the data until it "fits" in the + * aggregation rounding. Similar to {@link FromMany} this checks both the + * bucket count and range of the aggregation, but unlike + * {@linkplain FromMany} it keeps an accurate count of the buckets and it + * doesn't delay rebucketing. + *
<p>
+ * Rebucketing is roughly {@code O(number_of_hits_collected_so_far)} but we + * rebucket roughly {@code O(log number_of_hits_collected_so_far)} times because + * the "shape" of the roundings is roughly + * logarithmically increasing. */ + private static class FromSingle extends AutoDateHistogramAggregator { + private int roundingIdx; + private Rounding.Prepared preparedRounding; + /** + * Map from value to bucket ordinals. + *
<p>
+ * It is important that this is the exact subtype of + * {@link LongKeyedBucketOrds} so that the JVM can make a monomorphic + * call to {@link LongKeyedBucketOrds#add(long, long)} in the tight + * inner loop of {@link LeafBucketCollector#collect(int, long)}. You'd + * think that it wouldn't matter, but its seriously 7%-15% performance + * difference for the aggregation. Yikes. + */ + private LongKeyedBucketOrds.FromSingle bucketOrds; + private long min = Long.MAX_VALUE; + private long max = Long.MIN_VALUE; + + FromSingle( + String name, + AggregatorFactories factories, + int targetBuckets, + RoundingInfo[] roundingInfos, + Function roundingPreparer, + ValuesSourceConfig valuesSourceConfig, + SearchContext aggregationContext, + Aggregator parent, + Map metadata + ) throws IOException { + super( + name, + factories, + targetBuckets, + roundingInfos, + roundingPreparer, + valuesSourceConfig, + aggregationContext, + parent, + metadata + ); + + preparedRounding = prepareRounding(0); + bucketOrds = new LongKeyedBucketOrds.FromSingle(context.bigArrays()); + } + + @Override + protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException { + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long owningBucketOrd) throws IOException { + assert owningBucketOrd == 0; + if (false == values.advanceExact(doc)) { + return; + } + int valuesCount = values.docValueCount(); long previousRounded = Long.MIN_VALUE; for (int i = 0; i < valuesCount; ++i) { @@ -129,74 +291,355 @@ public void collect(int doc, long bucket) throws IOException { if (rounded == previousRounded) { continue; } - long bucketOrd = bucketOrds.add(rounded); - if (bucketOrd < 0) { // already seen - bucketOrd = -1 - bucketOrd; - collectExistingBucket(sub, doc, bucketOrd); - } else { - collectBucket(sub, doc, bucketOrd); - while (roundingIdx < roundingInfos.length - 1 - && bucketOrds.size() > (targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval())) { - increaseRounding(); + collectValue(doc, rounded); + previousRounded = rounded; + } + } + + private void collectValue(int doc, long rounded) throws IOException { + long bucketOrd = bucketOrds.add(0, rounded); + if (bucketOrd < 0) { // already seen + bucketOrd = -1 - bucketOrd; + collectExistingBucket(sub, doc, bucketOrd); + return; + } + collectBucket(sub, doc, bucketOrd); + increaseRoundingIfNeeded(rounded); + } + + private void increaseRoundingIfNeeded(long rounded) { + if (roundingIdx >= roundingInfos.length - 1) { + return; + } + min = Math.min(min, rounded); + max = Math.max(max, rounded); + if (bucketOrds.size() <= targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval() + && max - min <= targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis()) { + return; + } + do { + try (LongKeyedBucketOrds oldOrds = bucketOrds) { + preparedRounding = prepareRounding(++roundingIdx); + long[] mergeMap = new long[Math.toIntExact(oldOrds.size())]; + bucketOrds = new LongKeyedBucketOrds.FromSingle(context.bigArrays()); + LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = oldOrds.ordsEnum(0); + while (ordsEnum.next()) { + long oldKey = ordsEnum.value(); + long newKey = preparedRounding.round(oldKey); + long newBucketOrd = bucketOrds.add(0, newKey); + mergeMap[(int) ordsEnum.ord()] = newBucketOrd >= 0 ? 
newBucketOrd : -1 - newBucketOrd; } + merge(mergeMap, bucketOrds.size()); + } + } while (roundingIdx < roundingInfos.length - 1 + && (bucketOrds.size() > targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval() + || max - min > targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis())); + } + }; + } + + @Override + public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + return buildAggregations(bucketOrds, l -> roundingIdx, owningBucketOrds); + } + + @Override + public void collectDebugInfo(BiConsumer add) { + super.collectDebugInfo(add); + add.accept("surviving_buckets", bucketOrds.size()); + } + + @Override + protected void doClose() { + Releasables.close(bucketOrds); + } + } + + /** + * Initially it uses the most fine grained rounding configuration possible but + * as more data arrives it uses two heuristics to shift to coarser and coarser + * rounding. The first heuristic is the number of buckets, specifically, + * when there are more buckets than can "fit" in the current rounding it shifts + * to the next rounding. Instead of redoing the rounding, it estimates the + * number of buckets that will "survive" at the new rounding and uses + * that as the initial value for the bucket count that it + * increments in order to trigger another promotion to another coarser + * rounding. This works fairly well at containing the number of buckets, but + * the estimate of the number of buckets will be wrong if the buckets are + * quite a spread out compared to the rounding. + *
<p>
+ * The second heuristic it uses to trigger promotion to a coarser rounding is + * the distance between the min and max bucket. When that distance is greater + * than what the current rounding supports it promotes. This heuristic + * isn't good at limiting the number of buckets but is great when the buckets + * are spread out compared to the rounding. So it should complement the first + * heuristic. + *
<p>
+ * When promoting a rounding we keep the old buckets around because it is + * expensive to call {@link MergingBucketsDeferringCollector#mergeBuckets}. + * In particular it is {@code O(number_of_hits_collected_so_far)}. So if we + * called it frequently we'd end up in {@code O(n^2)} territory. Bad news for + * aggregations! Instead, we keep a "budget" of buckets that we're ok + * "wasting". When we promote the rounding and our estimate of the number of + * "dead" buckets that have data but have yet to be merged into the buckets + * that are valid for the current rounding exceeds the budget then we rebucket + * the entire aggregation and double the budget. + *
<p>
+ * Once we're done collecting and we know exactly which buckets we'll be + * returning we finally perform a "real", "perfect bucketing", + * rounding all of the keys for {@code owningBucketOrd} that we're going to + * collect and picking the rounding based on a real, accurate count and the + * min and max. + */ + private static class FromMany extends AutoDateHistogramAggregator { + /** + * An array of prepared roundings in the same order as + * {@link #roundingInfos}. The 0th entry is prepared initially, + * and other entries are null until first needed. + */ + private final Rounding.Prepared[] preparedRoundings; + /** + * Map from value to bucket ordinals. + *
<p>
+ * It is important that this is the exact subtype of + * {@link LongKeyedBucketOrds} so that the JVM can make a monomorphic + * call to {@link LongKeyedBucketOrds#add(long, long)} in the tight + * inner loop of {@link LeafBucketCollector#collect(int, long)}. + */ + private LongKeyedBucketOrds.FromMany bucketOrds; + /** + * The index of the rounding that each {@code owningBucketOrd} is + * currently using. + *
<p>
+ * During collection we use overestimates for how much buckets are save + * by bumping to the next rounding index. So we end up bumping less + * aggressively than a "perfect" algorithm. That is fine because we + * correct the error when we merge the buckets together all the way + * up in {@link InternalAutoDateHistogram#reduceBucket}. In particular, + * on final reduce we bump the rounding until it we appropriately + * cover the date range across all of the results returned by all of + * the {@link AutoDateHistogramAggregator}s. + */ + private ByteArray roundingIndices; + /** + * The minimum key per {@code owningBucketOrd}. + */ + private LongArray mins; + /** + * The max key per {@code owningBucketOrd}. + */ + private LongArray maxes; + + /** + * An underestimate of the number of buckets that are "live" in the + * current rounding for each {@code owningBucketOrdinal}. + */ + private IntArray liveBucketCountUnderestimate; + /** + * An over estimate of the number of wasted buckets. When this gets + * too high we {@link #rebucket} which sets it to 0. + */ + private long wastedBucketsOverestimate = 0; + /** + * The next {@link #wastedBucketsOverestimate} that will trigger a + * {@link #rebucket() rebucketing}. + */ + private long nextRebucketAt = 1000; // TODO this could almost certainly start higher when asMultiBucketAggregator is gone + /** + * The number of times the aggregator had to {@link #rebucket()} the + * results. We keep this just to report to the profiler. + */ + private int rebucketCount = 0; + + FromMany( + String name, + AggregatorFactories factories, + int targetBuckets, + RoundingInfo[] roundingInfos, + Function roundingPreparer, + ValuesSourceConfig valuesSourceConfig, + SearchContext aggregationContext, + Aggregator parent, + Map metadata + ) throws IOException { + + super( + name, + factories, + targetBuckets, + roundingInfos, + roundingPreparer, + valuesSourceConfig, + aggregationContext, + parent, + metadata + ); + assert roundingInfos.length < 127 : "Rounding must fit in a signed byte"; + roundingIndices = context.bigArrays().newByteArray(1, true); + mins = context.bigArrays().newLongArray(1, false); + mins.set(0, Long.MAX_VALUE); + maxes = context.bigArrays().newLongArray(1, false); + maxes.set(0, Long.MIN_VALUE); + preparedRoundings = new Rounding.Prepared[roundingInfos.length]; + // Prepare the first rounding because we know we'll need it. 
+ preparedRoundings[0] = roundingPreparer.apply(roundingInfos[0].rounding); + bucketOrds = new LongKeyedBucketOrds.FromMany(context.bigArrays()); + liveBucketCountUnderestimate = context.bigArrays().newIntArray(1, true); + } + + @Override + protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException { + return new LeafBucketCollectorBase(sub, values) { + @Override + public void collect(int doc, long owningBucketOrd) throws IOException { + if (false == values.advanceExact(doc)) { + return; + } + int valuesCount = values.docValueCount(); + + long previousRounded = Long.MIN_VALUE; + int roundingIdx = roundingIndexFor(owningBucketOrd); + for (int i = 0; i < valuesCount; ++i) { + long value = values.nextValue(); + long rounded = preparedRoundings[roundingIdx].round(value); + assert rounded >= previousRounded; + if (rounded == previousRounded) { + continue; } + roundingIdx = collectValue(owningBucketOrd, roundingIdx, doc, rounded); previousRounded = rounded; } } - } - private void increaseRounding() { - try (LongHash oldBucketOrds = bucketOrds) { - LongHash newBucketOrds = new LongHash(1, context.bigArrays()); - long[] mergeMap = new long[(int) oldBucketOrds.size()]; - preparedRounding = roundingPreparer.apply(roundingInfos[++roundingIdx].rounding); - for (int i = 0; i < oldBucketOrds.size(); i++) { - long oldKey = oldBucketOrds.get(i); - long newKey = preparedRounding.round(oldKey); - long newBucketOrd = newBucketOrds.add(newKey); - if (newBucketOrd >= 0) { - mergeMap[i] = newBucketOrd; - } else { - mergeMap[i] = -1 - newBucketOrd; - } + private int collectValue(long owningBucketOrd, int roundingIdx, int doc, long rounded) throws IOException { + long bucketOrd = bucketOrds.add(owningBucketOrd, rounded); + if (bucketOrd < 0) { // already seen + bucketOrd = -1 - bucketOrd; + collectExistingBucket(sub, doc, bucketOrd); + return roundingIdx; + } + collectBucket(sub, doc, bucketOrd); + liveBucketCountUnderestimate = context.bigArrays().grow(liveBucketCountUnderestimate, owningBucketOrd + 1); + int estimatedBucketCount = liveBucketCountUnderestimate.increment(owningBucketOrd, 1); + return increaseRoundingIfNeeded(owningBucketOrd, estimatedBucketCount, rounded, roundingIdx); + } + + /** + * Increase the rounding of {@code owningBucketOrd} using + * estimated, bucket counts, {@link #rebucket() rebucketing} the all + * buckets if the estimated number of wasted buckets is too high. 
+ */ + private int increaseRoundingIfNeeded(long owningBucketOrd, int oldEstimatedBucketCount, long newKey, int oldRounding) { + if (oldRounding >= roundingInfos.length - 1) { + return oldRounding; + } + if (mins.size() < owningBucketOrd + 1) { + long oldSize = mins.size(); + mins = context.bigArrays().grow(mins, owningBucketOrd + 1); + mins.fill(oldSize, mins.size(), Long.MAX_VALUE); } - mergeBuckets(mergeMap, newBucketOrds.size()); - if (deferringCollector != null) { - deferringCollector.mergeBuckets(mergeMap); + if (maxes.size() < owningBucketOrd + 1) { + long oldSize = maxes.size(); + maxes = context.bigArrays().grow(maxes, owningBucketOrd + 1); + maxes.fill(oldSize, maxes.size(), Long.MIN_VALUE); } - bucketOrds = newBucketOrds; + + long min = Math.min(mins.get(owningBucketOrd), newKey); + mins.set(owningBucketOrd, min); + long max = Math.max(maxes.get(owningBucketOrd), newKey); + maxes.set(owningBucketOrd, max); + if (oldEstimatedBucketCount <= targetBuckets * roundingInfos[oldRounding].getMaximumInnerInterval() + && max - min <= targetBuckets * roundingInfos[oldRounding].getMaximumRoughEstimateDurationMillis()) { + return oldRounding; + } + long oldRoughDuration = roundingInfos[oldRounding].roughEstimateDurationMillis; + int newRounding = oldRounding; + int newEstimatedBucketCount; + do { + newRounding++; + double ratio = (double) oldRoughDuration / (double) roundingInfos[newRounding].getRoughEstimateDurationMillis(); + newEstimatedBucketCount = (int) Math.ceil(oldEstimatedBucketCount * ratio); + } while (newRounding < roundingInfos.length - 1 + && (newEstimatedBucketCount > targetBuckets * roundingInfos[newRounding].getMaximumInnerInterval() + || max - min > targetBuckets * roundingInfos[newRounding].getMaximumRoughEstimateDurationMillis())); + setRounding(owningBucketOrd, newRounding); + mins.set(owningBucketOrd, preparedRoundings[newRounding].round(mins.get(owningBucketOrd))); + maxes.set(owningBucketOrd, preparedRoundings[newRounding].round(maxes.get(owningBucketOrd))); + wastedBucketsOverestimate += oldEstimatedBucketCount - newEstimatedBucketCount; + if (wastedBucketsOverestimate > nextRebucketAt) { + rebucket(); + // Bump the threshold for the next rebucketing + wastedBucketsOverestimate = 0; + nextRebucketAt *= 2; + } else { + liveBucketCountUnderestimate.set(owningBucketOrd, newEstimatedBucketCount); + } + return newRounding; } + }; + } + + private void rebucket() { + rebucketCount++; + try (LongKeyedBucketOrds oldOrds = bucketOrds) { + long[] mergeMap = new long[Math.toIntExact(oldOrds.size())]; + bucketOrds = new LongKeyedBucketOrds.FromMany(context.bigArrays()); + for (long owningBucketOrd = 0; owningBucketOrd <= oldOrds.maxOwningBucketOrd(); owningBucketOrd++) { + LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = oldOrds.ordsEnum(owningBucketOrd); + Rounding.Prepared preparedRounding = preparedRoundings[roundingIndexFor(owningBucketOrd)]; + while (ordsEnum.next()) { + long oldKey = ordsEnum.value(); + long newKey = preparedRounding.round(oldKey); + long newBucketOrd = bucketOrds.add(owningBucketOrd, newKey); + mergeMap[(int) ordsEnum.ord()] = newBucketOrd >= 0 ? 
newBucketOrd : -1 - newBucketOrd; + } + liveBucketCountUnderestimate = context.bigArrays().grow(liveBucketCountUnderestimate, owningBucketOrd + 1); + liveBucketCountUnderestimate.set(owningBucketOrd, Math.toIntExact(bucketOrds.bucketsInOrd(owningBucketOrd))); + } + merge(mergeMap, bucketOrds.size()); } - }; - } + } - @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { - return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, - (bucketValue, docCount, subAggregationResults) -> - new InternalAutoDateHistogram.Bucket(bucketValue, docCount, formatter, subAggregationResults), - buckets -> { - // the contract of the histogram aggregation is that shards must return - // buckets ordered by key in ascending order - CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); - - // value source will be null for unmapped fields - InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundingInfos, - roundingIdx, buildEmptySubAggregations()); - - return new InternalAutoDateHistogram(name, buckets, targetBuckets, emptyBucketInfo, formatter, metadata(), 1); - }); - } + @Override + public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + /* + * Rebucket before building the aggregation to build as small as result + * as possible. + * + * TODO it'd be faster if we could apply the merging on the fly as we + * replay the hits and build the buckets. How much faster is not clear, + * but it does have the advantage of only touching the buckets that we + * want to collect. + */ + rebucket(); + return buildAggregations(bucketOrds, this::roundingIndexFor, owningBucketOrds); + } - @Override - public InternalAggregation buildEmptyAggregation() { - InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundingInfos, roundingIdx, - buildEmptySubAggregations()); - return new InternalAutoDateHistogram(name, Collections.emptyList(), targetBuckets, emptyBucketInfo, formatter, metadata(), 1); - } + @Override + public void collectDebugInfo(BiConsumer add) { + super.collectDebugInfo(add); + add.accept("surviving_buckets", bucketOrds.size()); + add.accept("wasted_buckets_overestimate", wastedBucketsOverestimate); + add.accept("next_rebucket_at", nextRebucketAt); + add.accept("rebucket_count", rebucketCount); + } - @Override - public void doClose() { - Releasables.close(bucketOrds); + private void setRounding(long owningBucketOrd, int newRounding) { + roundingIndices = context.bigArrays().grow(roundingIndices, owningBucketOrd + 1); + roundingIndices.set(owningBucketOrd, (byte) newRounding); + if (preparedRoundings[newRounding] == null) { + preparedRoundings[newRounding] = prepareRounding(newRounding); + } + } + + private int roundingIndexFor(long owningBucketOrd) { + return owningBucketOrd < roundingIndices.size() ? 
roundingIndices.get(owningBucketOrd) : 0; + } + + @Override + public void doClose() { + Releasables.close(bucketOrds, roundingIndices, mins, maxes, liveBucketCountUnderestimate); + } } + } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorFactory.java index 2c64a44fd146f..d4022845a77d2 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorFactory.java @@ -43,7 +43,7 @@ public final class AutoDateHistogramAggregatorFactory extends ValuesSourceAggreg public static void registerAggregators(ValuesSourceRegistry.Builder builder) { builder.register(AutoDateHistogramAggregationBuilder.NAME, List.of(CoreValuesSourceType.DATE, CoreValuesSourceType.NUMERIC, CoreValuesSourceType.BOOLEAN), - (AutoDateHistogramAggregatorSupplier) AutoDateHistogramAggregator::new); + (AutoDateHistogramAggregatorSupplier) AutoDateHistogramAggregator::build); } private final int numBuckets; @@ -67,9 +67,6 @@ protected Aggregator doCreateInternal(SearchContext searchContext, Aggregator parent, boolean collectsFromSingleBucket, Map metadata) throws IOException { - if (collectsFromSingleBucket == false) { - return asMultiBucketAggregator(this, searchContext, parent); - } AggregatorSupplier aggregatorSupplier = queryShardContext.getValuesSourceRegistry().getAggregator(config, AutoDateHistogramAggregationBuilder.NAME); if (aggregatorSupplier instanceof AutoDateHistogramAggregatorSupplier == false) { @@ -79,14 +76,24 @@ protected Aggregator doCreateInternal(SearchContext searchContext, Function roundingPreparer = config.getValuesSource().roundingPreparer(searchContext.getQueryShardContext().getIndexReader()); return ((AutoDateHistogramAggregatorSupplier) aggregatorSupplier).build(name, factories, numBuckets, roundingInfos, - roundingPreparer, config, searchContext, parent, metadata); + roundingPreparer, config, searchContext, parent, collectsFromSingleBucket, metadata); } @Override protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map metadata) throws IOException { - return new AutoDateHistogramAggregator(name, factories, numBuckets, roundingInfos, Rounding::prepareForUnknown, - config, searchContext, parent, metadata); + return AutoDateHistogramAggregator.build( + name, + factories, + numBuckets, + roundingInfos, + Rounding::prepareForUnknown, + config, + searchContext, + parent, + false, + metadata + ); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorSupplier.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorSupplier.java index 8ecf709469659..049c452840d52 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorSupplier.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorSupplier.java @@ -43,6 +43,7 @@ Aggregator build( ValuesSourceConfig valuesSourceConfig, SearchContext aggregationContext, Aggregator parent, + boolean collectsFromSingleBucket, Map metadata ) throws IOException; } diff --git 
a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 3e3f640bb90d3..7f244285aefca 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -144,7 +144,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, (bucketValue, docCount, subAggregationResults) -> { return new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults); - }, buckets -> { + }, (owningBucketOrd, buckets) -> { // the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateRangeHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateRangeHistogramAggregator.java index 238fc75ce18af..2e771d64384ae 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateRangeHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateRangeHistogramAggregator.java @@ -171,7 +171,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, (bucketValue, docCount, subAggregationResults) -> new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults), - buckets -> { + (owningBucketOrd, buckets) -> { // the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrds.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrds.java index aacf102eb9f2d..79057c38359f3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrds.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrds.java @@ -47,21 +47,29 @@ private LongKeyedBucketOrds() {} public abstract long add(long owningBucketOrd, long value); /** - * Find the {@code owningBucketOrd, value} pair. Return the ord for - * their bucket if they have been added or {@code -1} if they haven't. + * Count the buckets in {@code owningBucketOrd}. + *
<p>
+ * Some aggregations expect this to be fast but most wouldn't + * mind particularly if it weren't. */ - public abstract long find(long owningBucketOrd, long value); + public abstract long bucketsInOrd(long owningBucketOrd); /** - * Count the buckets in {@code owningBucketOrd}. + * Find the {@code owningBucketOrd, value} pair. Return the ord for + * their bucket if they have been added or {@code -1} if they haven't. */ - public abstract long bucketsInOrd(long owningBucketOrd); + public abstract long find(long owningBucketOrd, long value); /** * The number of collected buckets. */ public abstract long size(); + /** + * The maximum possible used {@code owningBucketOrd}. + */ + public abstract long maxOwningBucketOrd(); + /** * Build an iterator for buckets inside {@code owningBucketOrd} in order * of increasing ord. @@ -105,10 +113,10 @@ public interface BucketOrdsEnum { /** * Implementation that only works if it is collecting from a single bucket. */ - private static class FromSingle extends LongKeyedBucketOrds { + public static class FromSingle extends LongKeyedBucketOrds { private final LongHash ords; - FromSingle(BigArrays bigArrays) { + public FromSingle(BigArrays bigArrays) { ords = new LongHash(1, bigArrays); } @@ -135,6 +143,11 @@ public long size() { return ords.size(); } + @Override + public long maxOwningBucketOrd() { + return 0; + } + @Override public BucketOrdsEnum ordsEnum(long owningBucketOrd) { assert owningBucketOrd == 0; @@ -173,7 +186,7 @@ public void close() { /** * Implementation that works properly when collecting from many buckets. */ - private static class FromMany extends LongKeyedBucketOrds { + public static class FromMany extends LongKeyedBucketOrds { // TODO we can almost certainly do better here by building something fit for purpose rather than trying to lego together stuff private static class Buckets implements Releasable { private final LongHash valueToThisBucketOrd; @@ -193,7 +206,7 @@ public void close() { private ObjectArray owningOrdToBuckets; private long lastGlobalOrd = -1; - FromMany(BigArrays bigArrays) { + public FromMany(BigArrays bigArrays) { this.bigArrays = bigArrays; owningOrdToBuckets = bigArrays.newObjectArray(1); } @@ -261,6 +274,11 @@ public long size() { return lastGlobalOrd + 1; } + @Override + public long maxOwningBucketOrd() { + return owningOrdToBuckets.size() - 1; + } + @Override public BucketOrdsEnum ordsEnum(long owningBucketOrd) { if (owningBucketOrd >= owningOrdToBuckets.size()) { diff --git a/server/src/main/java/org/elasticsearch/search/profile/aggregation/InternalAggregationProfileTree.java b/server/src/main/java/org/elasticsearch/search/profile/aggregation/InternalAggregationProfileTree.java index 4670d73cdc724..38b9fec2f4704 100644 --- a/server/src/main/java/org/elasticsearch/search/profile/aggregation/InternalAggregationProfileTree.java +++ b/server/src/main/java/org/elasticsearch/search/profile/aggregation/InternalAggregationProfileTree.java @@ -41,6 +41,10 @@ protected String getTypeFromElement(Aggregator element) { if (element instanceof MultiBucketAggregatorWrapper) { return ((MultiBucketAggregatorWrapper) element).getWrappedClass().getSimpleName(); } + Class enclosing = element.getClass().getEnclosingClass(); + if (enclosing != null) { + return enclosing.getSimpleName() + "." 
+ element.getClass().getSimpleName(); + } return element.getClass().getSimpleName(); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java index 406bca2cadcfa..477df9e59163c 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java @@ -22,34 +22,41 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.RandomIndexWriter; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; -import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.MultiBucketConsumerService; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.InternalMax; import org.elasticsearch.search.aggregations.metrics.InternalStats; +import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper; import org.hamcrest.Matchers; import java.io.IOException; +import java.time.Instant; import java.time.LocalDate; import java.time.YearMonth; import java.time.ZoneOffset; @@ -58,14 +65,20 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.stream.Collectors; +import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasSize; -public class AutoDateHistogramAggregatorTests extends AggregatorTestCase { +public class AutoDateHistogramAggregatorTests extends 
DateHistogramAggregatorTestCase { private static final String DATE_FIELD = "date"; private static final String INSTANT_FIELD = "instant"; private static final String NUMERIC_FIELD = "numeric"; @@ -95,19 +108,22 @@ public void testMatchNoDocs() throws IOException { } public void testMatchAllDocs() throws IOException { + Map expectedDocCount = new TreeMap<>(); + expectedDocCount.put("2010-01-01T00:00:00.000Z", 2); + expectedDocCount.put("2012-01-01T00:00:00.000Z", 1); + expectedDocCount.put("2013-01-01T00:00:00.000Z", 2); + expectedDocCount.put("2015-01-01T00:00:00.000Z", 3); + expectedDocCount.put("2016-01-01T00:00:00.000Z", 1); + expectedDocCount.put("2017-01-01T00:00:00.000Z", 1); testSearchCase(DEFAULT_QUERY, DATES_WITH_TIME, - aggregation -> aggregation.setNumBuckets(6).field(DATE_FIELD), - histogram -> { - assertEquals(10, histogram.getBuckets().size()); - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - } + aggregation -> aggregation.setNumBuckets(8).field(DATE_FIELD), + result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount)) ); + expectedDocCount.put("2011-01-01T00:00:00.000Z", 0); + expectedDocCount.put("2014-01-01T00:00:00.000Z", 0); testSearchAndReduceCase(DEFAULT_QUERY, DATES_WITH_TIME, aggregation -> aggregation.setNumBuckets(8).field(DATE_FIELD), - histogram -> { - assertEquals(8, histogram.getBuckets().size()); - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - } + result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount)) ); } @@ -194,6 +210,177 @@ public void testSubAggregations() throws IOException { }); } + public void testAsSubAgg() throws IOException { + AggregationBuilder builder = new TermsAggregationBuilder("k1").field("k1").subAggregation( + new AutoDateHistogramAggregationBuilder("dh").field(AGGREGABLE_DATE).setNumBuckets(3).subAggregation( + new MaxAggregationBuilder("max").field("n"))); + asSubAggTestCase(builder, (StringTerms terms) -> { + StringTerms.Bucket a = terms.getBucketByKey("a"); + InternalAutoDateHistogram adh = a.getAggregations().get("dh"); + Map expectedDocCount = new TreeMap<>(); + expectedDocCount.put("2020-01-01T00:00:00.000Z", 2); + expectedDocCount.put("2021-01-01T00:00:00.000Z", 2); + assertThat(bucketCountsAsMap(adh), equalTo(expectedDocCount)); + Map expectedMax = new TreeMap<>(); + expectedMax.put("2020-01-01T00:00:00.000Z", 2.0); + expectedMax.put("2021-01-01T00:00:00.000Z", 4.0); + assertThat(maxAsMap(adh), equalTo(expectedMax)); + + StringTerms.Bucket b = terms.getBucketByKey("b"); + InternalAutoDateHistogram bdh = b.getAggregations().get("dh"); + expectedDocCount.clear(); + expectedDocCount.put("2020-02-01T00:00:00.000Z", 1); + assertThat(bucketCountsAsMap(bdh), equalTo(expectedDocCount)); + expectedMax.clear(); + expectedMax.put("2020-02-01T00:00:00.000Z", 5.0); + assertThat(maxAsMap(bdh), equalTo(expectedMax)); + }); + builder = new TermsAggregationBuilder("k2").field("k2").subAggregation(builder); + asSubAggTestCase(builder, (StringTerms terms) -> { + StringTerms.Bucket a = terms.getBucketByKey("a"); + StringTerms ak1 = a.getAggregations().get("k1"); + StringTerms.Bucket ak1a = ak1.getBucketByKey("a"); + InternalAutoDateHistogram ak1adh = ak1a.getAggregations().get("dh"); + Map expectedDocCount = new TreeMap<>(); + expectedDocCount.put("2020-01-01T00:00:00.000Z", 2); + expectedDocCount.put("2021-01-01T00:00:00.000Z", 1); + assertThat(bucketCountsAsMap(ak1adh), equalTo(expectedDocCount)); + Map expectedMax = new TreeMap<>(); + expectedMax.put("2020-01-01T00:00:00.000Z", 
2.0); + expectedMax.put("2021-01-01T00:00:00.000Z", 3.0); + assertThat(maxAsMap(ak1adh), equalTo(expectedMax)); + + StringTerms.Bucket b = terms.getBucketByKey("b"); + StringTerms bk1 = b.getAggregations().get("k1"); + StringTerms.Bucket bk1a = bk1.getBucketByKey("a"); + InternalAutoDateHistogram bk1adh = bk1a.getAggregations().get("dh"); + expectedDocCount.clear(); + expectedDocCount.put("2021-03-01T00:00:00.000Z", 1); + assertThat(bucketCountsAsMap(bk1adh), equalTo(expectedDocCount)); + expectedMax.clear(); + expectedMax.put("2021-03-01T00:00:00.000Z", 4.0); + assertThat(maxAsMap(bk1adh), equalTo(expectedMax)); + StringTerms.Bucket bk1b = bk1.getBucketByKey("b"); + InternalAutoDateHistogram bk1bdh = bk1b.getAggregations().get("dh"); + expectedDocCount.clear(); + expectedDocCount.put("2020-02-01T00:00:00.000Z", 1); + assertThat(bucketCountsAsMap(bk1bdh), equalTo(expectedDocCount)); + expectedMax.clear(); + expectedMax.put("2020-02-01T00:00:00.000Z", 5.0); + assertThat(maxAsMap(bk1bdh), equalTo(expectedMax)); + }); + } + + public void testAsSubAggWithIncreasedRounding() throws IOException { + CheckedBiConsumer buildIndex = (iw, dft) -> { + long start = dft.parse("2020-01-01T00:00:00Z"); + long end = dft.parse("2021-01-01T00:00:00Z"); + long useC = dft.parse("2020-07-01T00:00Z"); + long anHour = dft.resolution().convert(Instant.ofEpochSecond(TimeUnit.HOURS.toSeconds(1))); + List> docs = new ArrayList<>(); + BytesRef aBytes = new BytesRef("a"); + BytesRef bBytes = new BytesRef("b"); + BytesRef cBytes = new BytesRef("c"); + int n = 0; + for (long d = start; d < end; d += anHour) { + docs.add(List.of( + new SortedNumericDocValuesField(AGGREGABLE_DATE, d), + new SortedSetDocValuesField("k1", aBytes), + new SortedSetDocValuesField("k1", d < useC ? bBytes : cBytes), + new SortedNumericDocValuesField("n", n++) + )); + } + /* + * Intentionally add all documents at once to put them on the + * same shard to make the reduce behavior consistent. 
+ */ + iw.addDocuments(docs); + }; + AggregationBuilder builder = new TermsAggregationBuilder("k1").field("k1").subAggregation( + new AutoDateHistogramAggregationBuilder("dh").field(AGGREGABLE_DATE).setNumBuckets(4).subAggregation( + new MaxAggregationBuilder("max").field("n"))); + asSubAggTestCase(builder, buildIndex, (StringTerms terms) -> { + StringTerms.Bucket a = terms.getBucketByKey("a"); + InternalAutoDateHistogram adh = a.getAggregations().get("dh"); + Map expectedDocCount = new TreeMap<>(); + expectedDocCount.put("2020-01-01T00:00:00.000Z", 2184); + expectedDocCount.put("2020-04-01T00:00:00.000Z", 2184); + expectedDocCount.put("2020-07-01T00:00:00.000Z", 2208); + expectedDocCount.put("2020-10-01T00:00:00.000Z", 2208); + assertThat(bucketCountsAsMap(adh), equalTo(expectedDocCount)); + Map expectedMax = new TreeMap<>(); + expectedMax.put("2020-01-01T00:00:00.000Z", 2183.0); + expectedMax.put("2020-04-01T00:00:00.000Z", 4367.0); + expectedMax.put("2020-07-01T00:00:00.000Z", 6575.0); + expectedMax.put("2020-10-01T00:00:00.000Z", 8783.0); + assertThat(maxAsMap(adh), equalTo(expectedMax)); + + StringTerms.Bucket b = terms.getBucketByKey("b"); + InternalAutoDateHistogram bdh = b.getAggregations().get("dh"); + expectedDocCount.clear(); + expectedDocCount.put("2020-01-01T00:00:00.000Z", 2184); + expectedDocCount.put("2020-04-01T00:00:00.000Z", 2184); + assertThat(bucketCountsAsMap(bdh), equalTo(expectedDocCount)); + expectedMax.clear(); + expectedMax.put("2020-01-01T00:00:00.000Z", 2183.0); + expectedMax.put("2020-04-01T00:00:00.000Z", 4367.0); + assertThat(maxAsMap(bdh), equalTo(expectedMax)); + + StringTerms.Bucket c = terms.getBucketByKey("c"); + InternalAutoDateHistogram cdh = c.getAggregations().get("dh"); + expectedDocCount.clear(); + expectedDocCount.put("2020-07-01T00:00:00.000Z", 2208); + expectedDocCount.put("2020-10-01T00:00:00.000Z", 2208); + assertThat(bucketCountsAsMap(cdh), equalTo(expectedDocCount)); + expectedMax.clear(); + expectedMax.put("2020-07-01T00:00:00.000Z", 6575.0); + expectedMax.put("2020-10-01T00:00:00.000Z", 8783.0); + assertThat(maxAsMap(cdh), equalTo(expectedMax)); + }); + } + + public void testAsSubAggInManyBuckets() throws IOException { + CheckedBiConsumer buildIndex = (iw, dft) -> { + long start = dft.parse("2020-01-01T00:00:00Z"); + long end = dft.parse("2021-01-01T00:00:00Z"); + long anHour = dft.resolution().convert(Instant.ofEpochSecond(TimeUnit.HOURS.toSeconds(1))); + List> docs = new ArrayList<>(); + int n = 0; + for (long d = start; d < end; d += anHour) { + docs.add(List.of( + new SortedNumericDocValuesField(AGGREGABLE_DATE, d), + new SortedNumericDocValuesField("n", n % 100) + )); + n++; + } + /* + * Intentionally add all documents at once to put them on the + * same shard to make the reduce behavior consistent. 
+ */ + iw.addDocuments(docs); + }; + AggregationBuilder builder = new HistogramAggregationBuilder("n").field("n").interval(1).subAggregation( + new AutoDateHistogramAggregationBuilder("dh").field(AGGREGABLE_DATE).setNumBuckets(4).subAggregation( + new MaxAggregationBuilder("max").field("n"))); + asSubAggTestCase(builder, buildIndex, (InternalHistogram histo) -> { + assertThat(histo.getBuckets(), hasSize(100)); + for (int n = 0; n < 100; n ++) { + InternalHistogram.Bucket b = histo.getBuckets().get(n); + InternalAutoDateHistogram dh = b.getAggregations().get("dh"); + assertThat(bucketCountsAsMap(dh), hasEntry(equalTo("2020-01-01T00:00:00.000Z"), either(equalTo(21)).or(equalTo(22)))); + assertThat(bucketCountsAsMap(dh), hasEntry(equalTo("2020-04-01T00:00:00.000Z"), either(equalTo(21)).or(equalTo(22)))); + assertThat(bucketCountsAsMap(dh), hasEntry(equalTo("2020-07-01T00:00:00.000Z"), either(equalTo(22)).or(equalTo(23)))); + assertThat(bucketCountsAsMap(dh), hasEntry(equalTo("2020-10-01T00:00:00.000Z"), either(equalTo(22)).or(equalTo(23)))); + Map expectedMax = new TreeMap<>(); + expectedMax.put("2020-01-01T00:00:00.000Z", (double) n); + expectedMax.put("2020-04-01T00:00:00.000Z", (double) n); + expectedMax.put("2020-07-01T00:00:00.000Z", (double) n); + expectedMax.put("2020-10-01T00:00:00.000Z", (double) n); + assertThat(maxAsMap(dh), equalTo(expectedMax)); + } + }); + } + public void testNoDocs() throws IOException { final List dates = Collections.emptyList(); final Consumer aggregation = agg -> agg.setNumBuckets(10).field(DATE_FIELD); @@ -244,20 +431,7 @@ public void testIntervalYear() throws IOException { final long start = LocalDate.of(2015, 1, 1).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli(); final long end = LocalDate.of(2017, 12, 31).atStartOfDay(ZoneOffset.UTC).toInstant().toEpochMilli(); final Query rangeQuery = LongPoint.newRangeQuery(INSTANT_FIELD, start, end); - testSearchCase(rangeQuery, DATES_WITH_TIME, - aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD), - histogram -> { - final List buckets = histogram.getBuckets(); - assertEquals(5, buckets.size()); - for (int i = 0; i < buckets.size(); i++) { - final Histogram.Bucket bucket = buckets.get(i); - assertEquals(DATES_WITH_TIME.get(5 + i), bucket.getKey()); - assertEquals(1, bucket.getDocCount()); - } - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - } - ); - testSearchAndReduceCase(rangeQuery, DATES_WITH_TIME, + testBothCases(rangeQuery, DATES_WITH_TIME, aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD), histogram -> { final ZonedDateTime startDate = ZonedDateTime.of(2015, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); @@ -282,29 +456,13 @@ public void testIntervalMonth() throws IOException { ZonedDateTime.of(2017, 3, 4, 0, 0, 0, 0, ZoneOffset.UTC), ZonedDateTime.of(2017, 3, 5, 0, 0, 0, 0, ZoneOffset.UTC), ZonedDateTime.of(2017, 3, 6, 0, 0, 0, 0, ZoneOffset.UTC)); - testSearchCase(DEFAULT_QUERY, datesForMonthInterval, - aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD), histogram -> { - final List buckets = histogram.getBuckets(); - assertEquals(datesForMonthInterval.size(), buckets.size()); - for (int i = 0; i < buckets.size(); i++) { - final Histogram.Bucket bucket = buckets.get(i); - assertEquals(datesForMonthInterval.get(i), bucket.getKey()); - assertEquals(1, bucket.getDocCount()); - } - }); - testSearchAndReduceCase(DEFAULT_QUERY, datesForMonthInterval, + Map expectedDocCount = new TreeMap<>(); + expectedDocCount.put("2017-01-01T00:00:00.000Z", 1); + 
expectedDocCount.put("2017-02-01T00:00:00.000Z", 2); + expectedDocCount.put("2017-03-01T00:00:00.000Z", 3); + testBothCases(DEFAULT_QUERY, datesForMonthInterval, aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD), - histogram -> { - final Map expectedDocCount = new HashMap<>(); - expectedDocCount.put(datesForMonthInterval.get(0).withDayOfMonth(1), 1); - expectedDocCount.put(datesForMonthInterval.get(1).withDayOfMonth(1), 2); - expectedDocCount.put(datesForMonthInterval.get(3).withDayOfMonth(1), 3); - final List buckets = histogram.getBuckets(); - assertEquals(expectedDocCount.size(), buckets.size()); - buckets.forEach(bucket -> - assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount())); - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - } + result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount)) ); } @@ -327,28 +485,19 @@ public void testIntervalDay() throws IOException { ZonedDateTime.of(2017, 2, 3, 0, 0, 0, 0, ZoneOffset.UTC), ZonedDateTime.of(2017, 2, 3, 0, 0, 0, 0, ZoneOffset.UTC), ZonedDateTime.of(2017, 2, 5, 0, 0, 0, 0, ZoneOffset.UTC)); - final Map expectedDocCount = new HashMap<>(); - expectedDocCount.put(datesForDayInterval.get(0), 1); - expectedDocCount.put(datesForDayInterval.get(1), 2); - expectedDocCount.put(datesForDayInterval.get(3), 3); - expectedDocCount.put(datesForDayInterval.get(6), 1); - + Map expectedDocCount = new TreeMap<>(); + expectedDocCount.put("2017-02-01T00:00:00.000Z", 1); + expectedDocCount.put("2017-02-02T00:00:00.000Z", 2); + expectedDocCount.put("2017-02-03T00:00:00.000Z", 3); + expectedDocCount.put("2017-02-05T00:00:00.000Z", 1); testSearchCase(DEFAULT_QUERY, datesForDayInterval, - aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD), histogram -> { - final List buckets = histogram.getBuckets(); - assertEquals(expectedDocCount.size(), buckets.size()); - buckets.forEach(bucket -> - assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount())); - }); + aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD), + result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount)) + ); + expectedDocCount.put("2017-02-04T00:00:00.000Z", 0); testSearchAndReduceCase(DEFAULT_QUERY, datesForDayInterval, aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD), - histogram -> { - final List buckets = histogram.getBuckets(); - assertEquals(5, buckets.size()); - buckets.forEach(bucket -> - assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount())); - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - } + result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount)) ); } @@ -361,32 +510,20 @@ public void testIntervalDayWithTZ() throws IOException { ZonedDateTime.of(2017, 2, 3, 0, 0, 0, 0, ZoneOffset.UTC), ZonedDateTime.of(2017, 2, 3, 0, 0, 0, 0, ZoneOffset.UTC), ZonedDateTime.of(2017, 2, 5, 0, 0, 0, 0, ZoneOffset.UTC)); + Map expectedDocCount = new TreeMap<>(); + expectedDocCount.put("2017-01-31T00:00:00.000-01:00", 1); + expectedDocCount.put("2017-02-01T00:00:00.000-01:00", 2); + expectedDocCount.put("2017-02-02T00:00:00.000-01:00", 3); + expectedDocCount.put("2017-02-04T00:00:00.000-01:00", 1); testSearchCase(DEFAULT_QUERY, datesForDayInterval, - aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD).timeZone(ZoneOffset.ofHours(-1)), histogram -> { - final Map expectedDocCount = new HashMap<>(); - 
expectedDocCount.put("2017-01-31T23:00:00.000-01:00", 1); - expectedDocCount.put("2017-02-01T23:00:00.000-01:00", 2); - expectedDocCount.put("2017-02-02T23:00:00.000-01:00", 3); - expectedDocCount.put("2017-02-04T23:00:00.000-01:00", 1); - final List buckets = histogram.getBuckets(); - assertEquals(expectedDocCount.size(), buckets.size()); - buckets.forEach(bucket -> - assertEquals(expectedDocCount.getOrDefault(bucket.getKeyAsString(), 0).longValue(), bucket.getDocCount())); - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - }); + aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD).timeZone(ZoneOffset.ofHours(-1)), + result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount)) + ); + expectedDocCount.put("2017-02-03T00:00:00.000-01:00", 0); testSearchAndReduceCase(DEFAULT_QUERY, datesForDayInterval, - aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD).timeZone(ZoneOffset.ofHours(-1)), histogram -> { - final Map expectedDocCount = new HashMap<>(); - expectedDocCount.put("2017-01-31T00:00:00.000-01:00", 1); - expectedDocCount.put("2017-02-01T00:00:00.000-01:00", 2); - expectedDocCount.put("2017-02-02T00:00:00.000-01:00", 3); - expectedDocCount.put("2017-02-04T00:00:00.000-01:00", 1); - final List buckets = histogram.getBuckets(); - assertEquals(5, buckets.size()); - buckets.forEach(bucket -> - assertEquals(expectedDocCount.getOrDefault(bucket.getKeyAsString(), 0).longValue(), bucket.getDocCount())); - assertTrue(AggregationInspectionHelper.hasValue(histogram)); - }); + aggregation -> aggregation.setNumBuckets(5).field(DATE_FIELD).timeZone(ZoneOffset.ofHours(-1)), + result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount)) + ); } public void testIntervalHour() throws IOException { @@ -401,51 +538,36 @@ public void testIntervalHour() throws IOException { ZonedDateTime.of(2017, 2, 1, 16, 6, 0, 0, ZoneOffset.UTC), ZonedDateTime.of(2017, 2, 1, 16, 48, 0, 0, ZoneOffset.UTC), ZonedDateTime.of(2017, 2, 1, 16, 59, 0, 0, ZoneOffset.UTC)); + Map expectedDocCount = new TreeMap<>(); + expectedDocCount.put("2017-02-01T09:00:00.000Z", 2); + expectedDocCount.put("2017-02-01T10:00:00.000Z", 1); + expectedDocCount.put("2017-02-01T13:00:00.000Z", 1); + expectedDocCount.put("2017-02-01T14:00:00.000Z", 2); + expectedDocCount.put("2017-02-01T15:00:00.000Z", 1); + expectedDocCount.put("2017-02-01T15:00:00.000Z", 1); + expectedDocCount.put("2017-02-01T16:00:00.000Z", 3); testSearchCase(DEFAULT_QUERY, datesForHourInterval, aggregation -> aggregation.setNumBuckets(8).field(DATE_FIELD), - histogram -> { - final List buckets = histogram.getBuckets(); - assertEquals(datesForHourInterval.size(), buckets.size()); - for (int i = 0; i < buckets.size(); i++) { - final Histogram.Bucket bucket = buckets.get(i); - assertEquals(datesForHourInterval.get(i), bucket.getKey()); - assertEquals(1, bucket.getDocCount()); - } - } + result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount)) ); + expectedDocCount.put("2017-02-01T11:00:00.000Z", 0); + expectedDocCount.put("2017-02-01T12:00:00.000Z", 0); testSearchAndReduceCase(DEFAULT_QUERY, datesForHourInterval, aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD), - histogram -> { - final Map expectedDocCount = new HashMap<>(); - expectedDocCount.put(datesForHourInterval.get(0).withMinute(0), 2); - expectedDocCount.put(datesForHourInterval.get(2).withMinute(0), 1); - expectedDocCount.put(datesForHourInterval.get(3).withMinute(0), 1); - 
expectedDocCount.put(datesForHourInterval.get(4).withMinute(0), 2); - expectedDocCount.put(datesForHourInterval.get(6).withMinute(0), 1); - expectedDocCount.put(datesForHourInterval.get(7).withMinute(0), 3); - final List buckets = histogram.getBuckets(); - assertEquals(8, buckets.size()); - buckets.forEach(bucket -> - assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount())); - } + result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount)) ); + expectedDocCount.clear(); + expectedDocCount.put("2017-02-01T09:00:00.000Z", 3); + expectedDocCount.put("2017-02-01T12:00:00.000Z", 3); + expectedDocCount.put("2017-02-01T15:00:00.000Z", 4); testSearchAndReduceCase(DEFAULT_QUERY, datesForHourInterval, aggregation -> aggregation.setNumBuckets(6).field(DATE_FIELD), - histogram -> { - final Map expectedDocCount = new HashMap<>(); - expectedDocCount.put(datesForHourInterval.get(0).withMinute(0), 3); - expectedDocCount.put(datesForHourInterval.get(0).plusHours(3).withMinute(0), 3); - expectedDocCount.put(datesForHourInterval.get(0).plusHours(6).withMinute(0), 4); - final List buckets = histogram.getBuckets(); - assertEquals(expectedDocCount.size(), buckets.size()); - buckets.forEach(bucket -> - assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount())); - } + result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount)) ); } public void testIntervalHourWithTZ() throws IOException { - final List datesForHourInterval = Arrays.asList( + List datesForHourInterval = Arrays.asList( ZonedDateTime.of(2017, 2, 1, 9, 2, 0, 0, ZoneOffset.UTC), ZonedDateTime.of(2017, 2, 1, 9, 35, 0, 0, ZoneOffset.UTC), ZonedDateTime.of(2017, 2, 1, 10, 15, 0, 0, ZoneOffset.UTC), @@ -456,36 +578,22 @@ public void testIntervalHourWithTZ() throws IOException { ZonedDateTime.of(2017, 2, 1, 16, 6, 0, 0, ZoneOffset.UTC), ZonedDateTime.of(2017, 2, 1, 16, 48, 0, 0, ZoneOffset.UTC), ZonedDateTime.of(2017, 2, 1, 16, 59, 0, 0, ZoneOffset.UTC)); + Map expectedDocCount = new TreeMap<>(); + expectedDocCount.put("2017-02-01T08:00:00.000-01:00", 2); + expectedDocCount.put("2017-02-01T09:00:00.000-01:00", 1); + expectedDocCount.put("2017-02-01T12:00:00.000-01:00", 1); + expectedDocCount.put("2017-02-01T13:00:00.000-01:00", 2); + expectedDocCount.put("2017-02-01T14:00:00.000-01:00", 1); + expectedDocCount.put("2017-02-01T15:00:00.000-01:00", 3); testSearchCase(DEFAULT_QUERY, datesForHourInterval, aggregation -> aggregation.setNumBuckets(8).field(DATE_FIELD).timeZone(ZoneOffset.ofHours(-1)), - histogram -> { - final List dateStrings = datesForHourInterval.stream() - .map(dateTime -> DateFormatter.forPattern("strict_date_time") - .format(dateTime.withZoneSameInstant(ZoneOffset.ofHours(-1)))).collect(Collectors.toList()); - final List buckets = histogram.getBuckets(); - assertEquals(datesForHourInterval.size(), buckets.size()); - for (int i = 0; i < buckets.size(); i++) { - final Histogram.Bucket bucket = buckets.get(i); - assertEquals(dateStrings.get(i), bucket.getKeyAsString()); - assertEquals(1, bucket.getDocCount()); - } - } + result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount)) ); + expectedDocCount.put("2017-02-01T10:00:00.000-01:00", 0); + expectedDocCount.put("2017-02-01T11:00:00.000-01:00", 0); testSearchAndReduceCase(DEFAULT_QUERY, datesForHourInterval, aggregation -> aggregation.setNumBuckets(10).field(DATE_FIELD).timeZone(ZoneOffset.ofHours(-1)), - histogram -> { - final Map expectedDocCount = new HashMap<>(); - 
expectedDocCount.put("2017-02-01T08:00:00.000-01:00", 2); - expectedDocCount.put("2017-02-01T09:00:00.000-01:00", 1); - expectedDocCount.put("2017-02-01T12:00:00.000-01:00", 1); - expectedDocCount.put("2017-02-01T13:00:00.000-01:00", 2); - expectedDocCount.put("2017-02-01T14:00:00.000-01:00", 1); - expectedDocCount.put("2017-02-01T15:00:00.000-01:00", 3); - final List buckets = histogram.getBuckets(); - assertEquals(8, buckets.size()); - buckets.forEach(bucket -> - assertEquals(expectedDocCount.getOrDefault(bucket.getKeyAsString(), 0).longValue(), bucket.getDocCount())); - } + result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount)) ); } @@ -685,31 +793,35 @@ public void testIntervalMinute() throws IOException { ZonedDateTime.of(2017, 2, 1, 9, 15, 37, 0, ZoneOffset.UTC), ZonedDateTime.of(2017, 2, 1, 9, 16, 4, 0, ZoneOffset.UTC), ZonedDateTime.of(2017, 2, 1, 9, 16, 42, 0, ZoneOffset.UTC)); - + Map skeletonDocCount = new TreeMap<>(); + skeletonDocCount.put("2017-02-01T09:02:00.000Z", 2); + skeletonDocCount.put("2017-02-01T09:15:00.000Z", 1); + skeletonDocCount.put("2017-02-01T09:16:00.000Z", 2); testSearchCase(DEFAULT_QUERY, datesForMinuteInterval, aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD), - histogram -> { - final List buckets = histogram.getBuckets(); - assertEquals(datesForMinuteInterval.size(), buckets.size()); - for (int i = 0; i < buckets.size(); i++) { - final Histogram.Bucket bucket = buckets.get(i); - assertEquals(datesForMinuteInterval.get(i), bucket.getKey()); - assertEquals(1, bucket.getDocCount()); - } - } + result -> assertThat(bucketCountsAsMap(result), equalTo(skeletonDocCount)) ); + Map fullDocCount = new TreeMap<>(); + fullDocCount.put("2017-02-01T09:02:00.000Z", 2); + fullDocCount.put("2017-02-01T09:07:00.000Z", 0); + fullDocCount.put("2017-02-01T09:12:00.000Z", 3); testSearchAndReduceCase(DEFAULT_QUERY, datesForMinuteInterval, + aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD), + result -> assertThat(bucketCountsAsMap(result), equalTo(fullDocCount)) + ); + + testSearchCase(DEFAULT_QUERY, datesForMinuteInterval, aggregation -> aggregation.setNumBuckets(15).field(DATE_FIELD), - histogram -> { - final Map expectedDocCount = new HashMap<>(); - expectedDocCount.put(datesForMinuteInterval.get(0).withSecond(0), 2); - expectedDocCount.put(datesForMinuteInterval.get(2).withSecond(0), 1); - expectedDocCount.put(datesForMinuteInterval.get(3).withSecond(0), 2); - final List buckets = histogram.getBuckets(); - assertEquals(15, buckets.size()); - buckets.forEach(bucket -> - assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount())); - } + result -> assertThat(bucketCountsAsMap(result), equalTo(skeletonDocCount)) + ); + fullDocCount.clear(); + fullDocCount.putAll(skeletonDocCount); + for (int minute = 3; minute < 15; minute++) { + fullDocCount.put(String.format(Locale.ROOT, "2017-02-01T09:%02d:00.000Z", minute), 0); + } + testSearchAndReduceCase(DEFAULT_QUERY, datesForMinuteInterval, + aggregation -> aggregation.setNumBuckets(15).field(DATE_FIELD), + result -> assertThat(bucketCountsAsMap(result), equalTo(fullDocCount)) ); } @@ -721,27 +833,21 @@ public void testIntervalSecond() throws IOException { ZonedDateTime.of(2017, 2, 1, 0, 0, 11, 688, ZoneOffset.UTC), ZonedDateTime.of(2017, 2, 1, 0, 0, 11, 210, ZoneOffset.UTC), ZonedDateTime.of(2017, 2, 1, 0, 0, 11, 380, ZoneOffset.UTC)); - final ZonedDateTime startDate = datesForSecondInterval.get(0).withNano(0); - final Map expectedDocCount = new 
HashMap<>(); - expectedDocCount.put(startDate, 1); - expectedDocCount.put(startDate.plusSeconds(2), 2); - expectedDocCount.put(startDate.plusSeconds(6), 3); - + Map expectedDocCount = new TreeMap<>(); + expectedDocCount.put("2017-02-01T00:00:05.000Z", 1); + expectedDocCount.put("2017-02-01T00:00:07.000Z", 2); + expectedDocCount.put("2017-02-01T00:00:11.000Z", 3); testSearchCase(DEFAULT_QUERY, datesForSecondInterval, - aggregation -> aggregation.setNumBuckets(7).field(DATE_FIELD), histogram -> { - final List buckets = histogram.getBuckets(); - assertEquals(expectedDocCount.size(), buckets.size()); - buckets.forEach(bucket -> - assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount())); - }); + aggregation -> aggregation.setNumBuckets(7).field(DATE_FIELD), + result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount)) + ); + expectedDocCount.put("2017-02-01T00:00:06.000Z", 0); + expectedDocCount.put("2017-02-01T00:00:08.000Z", 0); + expectedDocCount.put("2017-02-01T00:00:09.000Z", 0); + expectedDocCount.put("2017-02-01T00:00:10.000Z", 0); testSearchAndReduceCase(DEFAULT_QUERY, datesForSecondInterval, aggregation -> aggregation.setNumBuckets(7).field(DATE_FIELD), - histogram -> { - final List buckets = histogram.getBuckets(); - assertEquals(7, buckets.size()); - buckets.forEach(bucket -> - assertEquals(expectedDocCount.getOrDefault(bucket.getKey(), 0).longValue(), bucket.getDocCount())); - } + result -> assertThat(bucketCountsAsMap(result), equalTo(expectedDocCount)) ); } @@ -859,6 +965,25 @@ private void indexSampleData(List dataset, RandomIndexWriter inde } } + private Map bucketCountsAsMap(InternalAutoDateHistogram result) { + LinkedHashMap map = new LinkedHashMap<>(result.getBuckets().size()); + result.getBuckets().stream().forEach(b -> { + Object old = map.put(b.getKeyAsString(), Math.toIntExact(b.getDocCount())); + assertNull(old); + }); + return map; + } + + private Map maxAsMap(InternalAutoDateHistogram result) { + LinkedHashMap map = new LinkedHashMap<>(result.getBuckets().size()); + result.getBuckets().stream().forEach(b -> { + InternalMax max = b.getAggregations().get("max"); + Object old = map.put(b.getKeyAsString(), max.getValue()); + assertNull(old); + }); + return map; + } + @Override public void doAssertReducedMultiBucketConsumer(Aggregation agg, MultiBucketConsumerService.MultiBucketConsumer bucketConsumer) { /* diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTestCase.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTestCase.java new file mode 100644 index 0000000000000..3f8db1c465e3a --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTestCase.java @@ -0,0 +1,105 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.search.aggregations.bucket.histogram;
+
+import org.apache.lucene.document.SortedNumericDocValuesField;
+import org.apache.lucene.document.SortedSetDocValuesField;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.CheckedBiConsumer;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.KeywordFieldMapper;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+public abstract class DateHistogramAggregatorTestCase extends AggregatorTestCase {
+    /**
+     * A date that is always "aggregable" because it has doc values but may or
+     * may not have a search index. If it doesn't then we can't use our fancy
+     * date rounding mechanism that needs to know the minimum and maximum dates
+     * it is going to round because it reads *that* out of the search index.
+ */ + protected static final String AGGREGABLE_DATE = "aggregable_date"; + + protected final void asSubAggTestCase(AggregationBuilder builder, Consumer verify) + throws IOException { + CheckedBiConsumer buildIndex = (iw, dft) -> { + iw.addDocument(List.of( + new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2020-02-01T00:00:00Z")), + new SortedSetDocValuesField("k1", new BytesRef("a")), + new SortedSetDocValuesField("k2", new BytesRef("a")), + new SortedNumericDocValuesField("n", 1) + )); + iw.addDocument(List.of( + new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2020-03-01T00:00:00Z")), + new SortedSetDocValuesField("k1", new BytesRef("a")), + new SortedSetDocValuesField("k2", new BytesRef("a")), + new SortedNumericDocValuesField("n", 2) + )); + iw.addDocument(List.of( + new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2021-02-01T00:00:00Z")), + new SortedSetDocValuesField("k1", new BytesRef("a")), + new SortedSetDocValuesField("k2", new BytesRef("a")), + new SortedNumericDocValuesField("n", 3) + )); + iw.addDocument(List.of( + new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2021-03-01T00:00:00Z")), + new SortedSetDocValuesField("k1", new BytesRef("a")), + new SortedSetDocValuesField("k2", new BytesRef("b")), + new SortedNumericDocValuesField("n", 4) + )); + iw.addDocument(List.of( + new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2020-02-01T00:00:00Z")), + new SortedSetDocValuesField("k1", new BytesRef("b")), + new SortedSetDocValuesField("k2", new BytesRef("b")), + new SortedNumericDocValuesField("n", 5) + )); + }; + asSubAggTestCase(builder, buildIndex, verify); + } + + protected final void asSubAggTestCase( + AggregationBuilder builder, + CheckedBiConsumer buildIndex, + Consumer verify + ) throws IOException { + KeywordFieldMapper.KeywordFieldType k1ft = new KeywordFieldMapper.KeywordFieldType("k1"); + KeywordFieldMapper.KeywordFieldType k2ft = new KeywordFieldMapper.KeywordFieldType("k2"); + NumberFieldMapper.NumberFieldType nft = new NumberFieldMapper.NumberFieldType("n", NumberType.LONG); + DateFieldMapper.DateFieldType dft = aggregableDateFieldType(false, randomBoolean()); + testCase(builder, new MatchAllDocsQuery(), iw -> buildIndex.accept(iw, dft), verify, k1ft, k2ft, nft, dft); + } + + protected final DateFieldMapper.DateFieldType aggregableDateFieldType(boolean useNanosecondResolution, boolean isSearchable) { + return new DateFieldMapper.DateFieldType(AGGREGABLE_DATE, isSearchable, true, + DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER, + useNanosecondResolution ? 
DateFieldMapper.Resolution.NANOSECONDS : DateFieldMapper.Resolution.MILLISECONDS, + Collections.emptyMap()); + } +} diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java index 19ced2aa074ca..da7d898b3b9ba 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java @@ -22,7 +22,6 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.SortedNumericDocValuesField; -import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.RandomIndexWriter; @@ -31,13 +30,9 @@ import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.store.Directory; -import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.time.DateFormatters; import org.elasticsearch.index.mapper.DateFieldMapper; -import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.search.aggregations.AggregationBuilder; -import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; @@ -53,15 +48,7 @@ import static java.util.stream.Collectors.toList; import static org.hamcrest.Matchers.equalTo; -public class DateHistogramAggregatorTests extends AggregatorTestCase { - - /** - * A date that is always "aggregable" because it has doc values but may or - * may not have a search index. If it doesn't then we can't use our fancy - * date rounding mechanism that needs to know the minimum and maximum dates - * it is going to round because it ready *that* out of the search index. - */ - private static final String AGGREGABLE_DATE = "aggregable_date"; +public class DateHistogramAggregatorTests extends DateHistogramAggregatorTestCase { /** * A date that is always "searchable" because it is indexed. 
*/ @@ -169,39 +156,9 @@ public void testMatchAllDocs() throws IOException { } public void testAsSubAgg() throws IOException { - KeywordFieldMapper.KeywordFieldType k1ft = new KeywordFieldMapper.KeywordFieldType("k1"); - KeywordFieldMapper.KeywordFieldType k2ft = new KeywordFieldMapper.KeywordFieldType("k2"); - DateFieldMapper.DateFieldType dft = aggregableDateFieldType(false, randomBoolean()); - CheckedConsumer buildIndex = iw -> { - iw.addDocument(List.of( - new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2020-02-01T00:00:00Z")), - new SortedSetDocValuesField("k1", new BytesRef("a")), - new SortedSetDocValuesField("k2", new BytesRef("a")) - )); - iw.addDocument(List.of( - new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2020-03-01T00:00:00Z")), - new SortedSetDocValuesField("k1", new BytesRef("a")), - new SortedSetDocValuesField("k2", new BytesRef("a")) - )); - iw.addDocument(List.of( - new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2021-02-01T00:00:00Z")), - new SortedSetDocValuesField("k1", new BytesRef("a")), - new SortedSetDocValuesField("k2", new BytesRef("a")) - )); - iw.addDocument(List.of( - new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2021-03-01T00:00:00Z")), - new SortedSetDocValuesField("k1", new BytesRef("a")), - new SortedSetDocValuesField("k2", new BytesRef("b")) - )); - iw.addDocument(List.of( - new SortedNumericDocValuesField(AGGREGABLE_DATE, dft.parse("2020-02-01T00:00:00Z")), - new SortedSetDocValuesField("k1", new BytesRef("b")), - new SortedSetDocValuesField("k2", new BytesRef("b")) - )); - }; AggregationBuilder builder = new TermsAggregationBuilder("k1").field("k1").subAggregation( - new DateHistogramAggregationBuilder("dh").field(AGGREGABLE_DATE).calendarInterval(DateHistogramInterval.YEAR)); - testCase(builder, new MatchAllDocsQuery(), buildIndex, (StringTerms terms) -> { + new DateHistogramAggregationBuilder("dh").field(AGGREGABLE_DATE).calendarInterval(DateHistogramInterval.YEAR)); + asSubAggTestCase(builder, (StringTerms terms) -> { StringTerms.Bucket a = terms.getBucketByKey("a"); InternalDateHistogram adh = a.getAggregations().get("dh"); assertThat(adh.getBuckets().stream().map(bucket -> bucket.getKey().toString()).collect(toList()), equalTo(List.of( @@ -213,9 +170,9 @@ public void testAsSubAgg() throws IOException { assertThat(bdh.getBuckets().stream().map(bucket -> bucket.getKey().toString()).collect(toList()), equalTo(List.of( "2020-01-01T00:00Z" ))); - }, k1ft, dft); + }); builder = new TermsAggregationBuilder("k2").field("k2").subAggregation(builder); - testCase(builder, new MatchAllDocsQuery(), buildIndex, (StringTerms terms) -> { + asSubAggTestCase(builder, (StringTerms terms) -> { StringTerms.Bucket a = terms.getBucketByKey("a"); StringTerms ak1 = a.getAggregations().get("k1"); StringTerms.Bucket ak1a = ak1.getBucketByKey("a"); @@ -236,7 +193,7 @@ public void testAsSubAgg() throws IOException { assertThat(bk1bdh.getBuckets().stream().map(bucket -> bucket.getKey().toString()).collect(toList()), equalTo(List.of( "2020-01-01T00:00Z" ))); - }, k1ft, k2ft, dft); + }); } public void testNoDocsDeprecatedInterval() throws IOException { @@ -1274,13 +1231,6 @@ private void executeTestCase(boolean reduced, } } - private DateFieldMapper.DateFieldType aggregableDateFieldType(boolean useNanosecondResolution, boolean isSearchable) { - return new DateFieldMapper.DateFieldType(AGGREGABLE_DATE, isSearchable, true, - DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER, - useNanosecondResolution ? 
DateFieldMapper.Resolution.NANOSECONDS : DateFieldMapper.Resolution.MILLISECONDS, - Collections.emptyMap()); - } - private static long asLong(String dateTime) { return DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(dateTime)).toInstant().toEpochMilli(); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrdsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrdsTests.java index 3863689c4f582..3aa8cca9d74d6 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrdsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/LongKeyedBucketOrdsTests.java @@ -30,6 +30,7 @@ import java.util.Set; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class LongKeyedBucketOrdsTests extends ESTestCase { private final MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); @@ -96,6 +97,8 @@ private void collectsFromSingleBucketCase(LongKeyedBucketOrds ords) { assertThat(ordsEnum.value(), equalTo(values[i])); } assertFalse(ordsEnum.next()); + + assertThat(ords.maxOwningBucketOrd(), equalTo(0L)); } finally { ords.close(); } @@ -164,6 +167,8 @@ public void testCollectsFromManyBuckets() { } assertFalse(ords.ordsEnum(randomLongBetween(maxOwningBucketOrd + 1, Long.MAX_VALUE)).next()); assertThat(ords.bucketsInOrd(randomLongBetween(maxOwningBucketOrd + 1, Long.MAX_VALUE)), equalTo(0L)); + + assertThat(ords.maxOwningBucketOrd(), greaterThanOrEqualTo(maxOwningBucketOrd)); } }
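
The refactored assertions above replace per-bucket loops with a single map-equality check: each test builds a TreeMap of expected "bucket key as string" to doc count and compares it against a map built from the actual buckets (bucketCountsAsMap, and maxAsMap for metric values). Below is a minimal, self-contained sketch of that pattern outside the Elasticsearch test harness. The Bucket record, the class name, and the sample data are invented for illustration; only the JUnit and hamcrest calls are real APIs.

// A generic sketch of the "bucket counts as a map" assertion pattern, assuming a
// hypothetical Bucket(keyAsString, docCount) shape instead of real aggregation buckets.
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertNull;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BucketCountsAsMapSketch {
    // Hypothetical stand-in for an aggregation bucket: a formatted key and a doc count.
    record Bucket(String keyAsString, long docCount) {}

    // Collapse buckets into an insertion-ordered map, failing if a key appears twice.
    static Map<String, Integer> bucketCountsAsMap(List<Bucket> buckets) {
        LinkedHashMap<String, Integer> map = new LinkedHashMap<>(buckets.size());
        for (Bucket b : buckets) {
            Integer old = map.put(b.keyAsString(), Math.toIntExact(b.docCount()));
            assertNull("duplicate bucket key: " + b.keyAsString(), old);
        }
        return map;
    }

    public static void main(String[] args) {
        List<Bucket> buckets = List.of(
            new Bucket("2017-02-01T00:00:00.000Z", 1),
            new Bucket("2017-02-02T00:00:00.000Z", 2),
            new Bucket("2017-02-03T00:00:00.000Z", 3));

        // Keeping the expected entries in a TreeMap sorts them by key, which makes
        // the mismatch output easier to scan when the assertion fails.
        Map<String, Integer> expected = new TreeMap<>();
        expected.put("2017-02-01T00:00:00.000Z", 1);
        expected.put("2017-02-02T00:00:00.000Z", 2);
        expected.put("2017-02-03T00:00:00.000Z", 3);

        // Map.equals compares entry sets, so TreeMap vs LinkedHashMap is order-insensitive.
        assertThat(bucketCountsAsMap(buckets), equalTo(expected));
        System.out.println("bucket counts match");
    }
}

The same shape carries over to metric assertions: maxAsMap in the diff maps each bucket's key string to the max sub-aggregation's value instead of the doc count, and empty buckets are expressed simply as entries with a count of 0 in the expected map.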