diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java
index 9c4cf46e094eb..e316fa4e96a73 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java
@@ -19,11 +19,13 @@
 package org.elasticsearch.search.aggregations.bucket.terms;
 
+import org.apache.lucene.index.IndexReader;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.BucketOrder;
-import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
+import org.elasticsearch.search.internal.ContextIndexSearcher;
 import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
@@ -42,10 +44,17 @@ abstract class AbstractStringTermsAggregator extends TermsAggregator {
         this.showTermDocCountError = showTermDocCountError;
     }
 
-    @Override
-    public InternalAggregation buildEmptyAggregation() {
+    protected StringTerms buildEmptyTermsAggregation() {
         return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
             metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError, 0, emptyList(), 0);
     }
 
+    protected SignificantStringTerms buildEmptySignificantTermsAggregation(SignificanceHeuristic significanceHeuristic) {
+        // We need to account for the significance of a miss in our global stats - provide corpus size as context
+        ContextIndexSearcher searcher = context.searcher();
+        IndexReader topReader = searcher.getIndexReader();
+        int supersetSize = topReader.numDocs();
+        return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
+            metadata(), format, 0, supersetSize, significanceHeuristic, emptyList());
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsSignificantTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsSignificantTermsAggregator.java
deleted file mode 100644
index d2ffbaab42d26..0000000000000
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsSignificantTermsAggregator.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.search.aggregations.bucket.terms;
-
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.search.DocValueFormat;
-import org.elasticsearch.search.aggregations.Aggregator;
-import org.elasticsearch.search.aggregations.AggregatorFactories;
-import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.LeafBucketCollector;
-import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
-import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
-import org.elasticsearch.search.aggregations.support.ValuesSource;
-import org.elasticsearch.search.internal.ContextIndexSearcher;
-import org.elasticsearch.search.internal.SearchContext;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Map;
-
-import static java.util.Collections.emptyList;
-
-/**
- * An global ordinal based implementation of significant terms, based on {@link SignificantStringTermsAggregator}.
- */
-public class GlobalOrdinalsSignificantTermsAggregator extends GlobalOrdinalsStringTermsAggregator {
-
-    protected long numCollectedDocs;
-    protected final SignificantTermsAggregatorFactory termsAggFactory;
-    private final SignificanceHeuristic significanceHeuristic;
-
-    public GlobalOrdinalsSignificantTermsAggregator(String name,
-                                                    AggregatorFactories factories,
-                                                    ValuesSource.Bytes.WithOrdinals.FieldData valuesSource,
-                                                    DocValueFormat format,
-                                                    BucketCountThresholds bucketCountThresholds,
-                                                    IncludeExclude.OrdinalsFilter includeExclude,
-                                                    SearchContext context,
-                                                    Aggregator parent,
-                                                    boolean forceRemapGlobalOrds,
-                                                    SignificanceHeuristic significanceHeuristic,
-                                                    SignificantTermsAggregatorFactory termsAggFactory,
-                                                    Map<String, Object> metadata) throws IOException {
-        super(name, factories, valuesSource, null, format, bucketCountThresholds, includeExclude, context, parent,
-            forceRemapGlobalOrds, SubAggCollectionMode.BREADTH_FIRST, false, metadata);
-        this.significanceHeuristic = significanceHeuristic;
-        this.termsAggFactory = termsAggFactory;
-        this.numCollectedDocs = 0;
-    }
-
-    @Override
-    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
-        return new LeafBucketCollectorBase(super.getLeafCollector(ctx, sub), null) {
-            @Override
-            public void collect(int doc, long bucket) throws IOException {
-                super.collect(doc, bucket);
-                numCollectedDocs++;
-            }
-        };
-    }
-
-    @Override
-    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
-        assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
-        if (valueCount == 0) { // no context in this reader
-            return new InternalAggregation[] {buildEmptyAggregation()};
-        }
-
-        final int size;
-        if (bucketCountThresholds.getMinDocCount() == 0) {
-            // if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns
-            size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
-        } else {
-            size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
-        }
-        long supersetSize = termsAggFactory.getSupersetNumDocs();
-        long subsetSize = numCollectedDocs;
-
-        BucketSignificancePriorityQueue<SignificantStringTerms.Bucket> ordered = new BucketSignificancePriorityQueue<>(size);
-        collectionStrategy.forEach(new BucketInfoConsumer() {
-            SignificantStringTerms.Bucket spare = null;
-
-            @Override
-            public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException {
-                if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
-                    if (spare == null) {
-                        spare = new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
-                    }
-                    spare.bucketOrd = bucketOrd;
-                    copy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
-                    spare.subsetDf = docCount;
-                    spare.subsetSize = subsetSize;
-                    spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
-                    spare.supersetSize = supersetSize;
-                    // During shard-local down-selection we use subset/superset stats
-                    // that are for this shard only
-                    // Back at the central reducer these properties will be updated with
-                    // global stats
-                    spare.updateScore(significanceHeuristic);
-                    spare = ordered.insertWithOverflow(spare);
-                    if (spare == null) {
-                        consumeBucketsAndMaybeBreak(1);
-                    }
-                }
-            }
-        });
-
-        final SignificantStringTerms.Bucket[] list = new SignificantStringTerms.Bucket[ordered.size()];
-        for (int i = ordered.size() - 1; i >= 0; i--) {
-            list[i] = ordered.pop();
-            /*
-             * The terms are owned by the BytesRefHash which will close after
-             * we're finished building the aggregation so we need to pull a copy.
-             */
-            list[i].termBytes = BytesRef.deepCopyOf(list[i].termBytes);
-        }
-        buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
-
-        return new InternalAggregation[] {
-            new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
-                metadata(), format, subsetSize, supersetSize, significanceHeuristic, Arrays.asList(list))
-        };
-    }
-
-    @Override
-    public SignificantStringTerms buildEmptyAggregation() {
-        // We need to account for the significance of a miss in our global stats - provide corpus size as context
-        ContextIndexSearcher searcher = context.searcher();
-        IndexReader topReader = searcher.getIndexReader();
-        int supersetSize = topReader.numDocs();
-        return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
-            metadata(), format, numCollectedDocs, supersetSize, significanceHeuristic, emptyList());
-    }
-
-    @Override
-    protected void doClose() {
-        super.doClose();
-        Releasables.close(termsAggFactory);
-    }
-}
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java
index af8257f1c44c6..83126c3385f53 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java
@@ -26,6 +26,7 @@
 import org.apache.lucene.index.SortedSetDocValues;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.PriorityQueue;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
@@ -38,8 +39,10 @@
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.BucketOrder;
 import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
 import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
+import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.internal.SearchContext;
 
@@ -47,6 +50,7 @@
 import java.util.Arrays;
 import java.util.Map;
 import java.util.function.BiConsumer;
+import java.util.function.Function;
 import java.util.function.LongPredicate;
 import java.util.function.LongUnaryOperator;
 
@@ -56,7 +60,7 @@
  * An aggregator of string values that relies on global ordinals in order to build buckets.
  */
 public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggregator {
-
+    protected final ResultStrategy<?, ?, ?> resultStrategy;
     protected final ValuesSource.Bytes.WithOrdinals valuesSource;
 
     // TODO: cache the acceptedglobalValues per aggregation definition.
@@ -65,28 +69,33 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
     // first defined one.
     // So currently for each instance of this aggregator the acceptedglobalValues will be computed, this is unnecessary
     // especially if this agg is on a second layer or deeper.
-    protected final LongPredicate acceptedGlobalOrdinals;
-    protected final long valueCount;
-    protected final GlobalOrdLookupFunction lookupGlobalOrd;
+    private final LongPredicate acceptedGlobalOrdinals;
+    private final long valueCount;
+    private final GlobalOrdLookupFunction lookupGlobalOrd;
     protected final CollectionStrategy collectionStrategy;
 
     public interface GlobalOrdLookupFunction {
         BytesRef apply(long ord) throws IOException;
    }
 
-    public GlobalOrdinalsStringTermsAggregator(String name, AggregatorFactories factories,
-                                               ValuesSource.Bytes.WithOrdinals valuesSource,
-                                               BucketOrder order,
-                                               DocValueFormat format,
-                                               BucketCountThresholds bucketCountThresholds,
-                                               IncludeExclude.OrdinalsFilter includeExclude,
-                                               SearchContext context,
-                                               Aggregator parent,
-                                               boolean remapGlobalOrds,
-                                               SubAggCollectionMode collectionMode,
-                                               boolean showTermDocCountError,
-                                               Map<String, Object> metadata) throws IOException {
+    public GlobalOrdinalsStringTermsAggregator(
+        String name,
+        AggregatorFactories factories,
+        Function<GlobalOrdinalsStringTermsAggregator, ResultStrategy<?, ?, ?>> resultStrategy,
+        ValuesSource.Bytes.WithOrdinals valuesSource,
+        BucketOrder order,
+        DocValueFormat format,
+        BucketCountThresholds bucketCountThresholds,
+        IncludeExclude.OrdinalsFilter includeExclude,
+        SearchContext context,
+        Aggregator parent,
+        boolean remapGlobalOrds,
+        SubAggCollectionMode collectionMode,
+        boolean showTermDocCountError,
+        Map<String, Object> metadata
+    ) throws IOException {
         super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
+        this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
         this.valuesSource = valuesSource;
         final IndexReader reader = context.searcher().getIndexReader();
         final SortedSetDocValues values = reader.leaves().size() > 0 ?
@@ -107,105 +116,50 @@ private SortedSetDocValues getGlobalOrds(LeafReaderContext ctx) throws IOExcepti
     }
 
     @Override
-    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
         SortedSetDocValues globalOrds = getGlobalOrds(ctx);
         collectionStrategy.globalOrdsReady(globalOrds);
         SortedDocValues singleValues = DocValues.unwrapSingleton(globalOrds);
         if (singleValues != null) {
-            return new LeafBucketCollectorBase(sub, globalOrds) {
+            return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, globalOrds) {
                 @Override
-                public void collect(int doc, long bucket) throws IOException {
-                    assert bucket == 0;
+                public void collect(int doc, long owningBucketOrd) throws IOException {
+                    assert owningBucketOrd == 0;
                     if (singleValues.advanceExact(doc)) {
                         int ord = singleValues.ordValue();
                         collectionStrategy.collectGlobalOrd(doc, ord, sub);
                     }
                 }
-            };
+            });
         }
-        return new LeafBucketCollectorBase(sub, globalOrds) {
+        return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, globalOrds) {
             @Override
-            public void collect(int doc, long bucket) throws IOException {
-                assert bucket == 0;
+            public void collect(int doc, long owningBucketOrd) throws IOException {
+                assert owningBucketOrd == 0;
                 if (globalOrds.advanceExact(doc)) {
                     for (long globalOrd = globalOrds.nextOrd(); globalOrd != NO_MORE_ORDS; globalOrd = globalOrds.nextOrd()) {
                         collectionStrategy.collectGlobalOrd(doc, globalOrd, sub);
                    }
                }
            }
-        };
-    }
-
-    protected static void copy(BytesRef from, BytesRef to) {
-        if (to.bytes.length < from.length) {
-            to.bytes = new byte[ArrayUtil.oversize(from.length, 1)];
-        }
-        to.offset = 0;
-        to.length = from.length;
-        System.arraycopy(from.bytes, from.offset, to.bytes, 0, from.length);
+        });
     }
 
     @Override
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
-        assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
-        if (valueCount == 0) { // no context in this reader
-            return new InternalAggregation[] {buildEmptyAggregation()};
-        }
-
-        final int size;
-        if (bucketCountThresholds.getMinDocCount() == 0) {
-            // if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns
-            size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
-        } else {
-            size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
-        }
-        long[] otherDocCount = new long[1];
-        BucketPriorityQueue<OrdBucket> ordered = new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
-        collectionStrategy.forEach(new BucketInfoConsumer() {
-            OrdBucket spare = null;
-
-            @Override
-            public void accept(long globalOrd, long bucketOrd, long docCount) {
-                otherDocCount[0] += docCount;
-                if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
-                    if (spare == null) {
-                        spare = new OrdBucket(showTermDocCountError, format);
-                    }
-                    spare.globalOrd = globalOrd;
-                    spare.bucketOrd = bucketOrd;
-                    spare.docCount = docCount;
-                    spare = ordered.insertWithOverflow(spare);
-                    if (spare == null) {
-                        consumeBucketsAndMaybeBreak(1);
-                    }
-                }
-            }
-        });
+        return resultStrategy.buildAggregations(owningBucketOrds);
+    }
 
-        // Get the top buckets
-        final StringTerms.Bucket[] list = new StringTerms.Bucket[ordered.size()];
-        for (int i = ordered.size() - 1; i >= 0; --i) {
-            final OrdBucket bucket = ordered.pop();
-            BytesRef scratch = new BytesRef();
-            copy(lookupGlobalOrd.apply(bucket.globalOrd), scratch);
-            list[i] = new StringTerms.Bucket(scratch, bucket.docCount, null, showTermDocCountError, 0, format);
-            list[i].bucketOrd = bucket.bucketOrd;
-            otherDocCount[0] -= list[i].docCount;
-            list[i].docCountError = 0;
-        }
-        buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
-
-        return new InternalAggregation[] {
-            new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
-                metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
-                otherDocCount[0], Arrays.asList(list), 0)
-        };
+    @Override
+    public InternalAggregation buildEmptyAggregation() {
+        return resultStrategy.buildEmptyResult();
     }
 
     @Override
     public void collectDebugInfo(BiConsumer<String, Object> add) {
         super.collectDebugInfo(add);
         add.accept("collection_strategy", collectionStrategy.describe());
+        add.accept("result_strategy", resultStrategy.describe());
     }
 
     /**
@@ -251,40 +205,46 @@ protected final XContentBuilder keyToXContent(XContentBuilder builder) throws IO
     @Override
     protected void doClose() {
-        Releasables.close(collectionStrategy);
+        Releasables.close(resultStrategy, collectionStrategy);
    }
 
    /**
-     * Variant of {@link GlobalOrdinalsStringTermsAggregator} that resolves global ordinals post segment collection
-     * instead of on the fly for each match.This is beneficial for low cardinality fields, because it can reduce
-     * the amount of look-ups significantly.
+     * Variant of {@link GlobalOrdinalsStringTermsAggregator} that
+     * resolves global ordinals post segment collection instead of on the fly
+     * for each match. This is beneficial for low cardinality fields, because
+     * it can reduce the amount of look-ups significantly.
+     * <p>
+     * This is only supported for the standard {@code terms} aggregation and
+     * doesn't support {@code significant_terms} so this forces
+     * {@link StandardTermsResults}.
      */
     static class LowCardinality extends GlobalOrdinalsStringTermsAggregator {
 
         private LongUnaryOperator mapping;
         private IntArray segmentDocCounts;
 
-        LowCardinality(String name,
-                       AggregatorFactories factories,
-                       ValuesSource.Bytes.WithOrdinals valuesSource,
-                       BucketOrder order,
-                       DocValueFormat format,
-                       BucketCountThresholds bucketCountThresholds,
-                       SearchContext context,
-                       Aggregator parent,
-                       boolean forceDenseMode,
-                       SubAggCollectionMode collectionMode,
-                       boolean showTermDocCountError,
-                       Map<String, Object> metadata) throws IOException {
-            super(name, factories, valuesSource, order, format, bucketCountThresholds, null,
+        LowCardinality(
+            String name,
+            AggregatorFactories factories,
+            ValuesSource.Bytes.WithOrdinals valuesSource,
+            BucketOrder order,
+            DocValueFormat format,
+            BucketCountThresholds bucketCountThresholds,
+            SearchContext context,
+            Aggregator parent,
+            boolean forceDenseMode,
+            SubAggCollectionMode collectionMode,
+            boolean showTermDocCountError,
+            Map<String, Object> metadata
+        ) throws IOException {
+            super(name, factories, a -> a.new StandardTermsResults(), valuesSource, order, format, bucketCountThresholds, null,
                 context, parent, forceDenseMode, collectionMode, showTermDocCountError, metadata);
             assert factories == null || factories.countAggregators() == 0;
             this.segmentDocCounts = context.bigArrays().newIntArray(1, true);
         }
 
         @Override
-        public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
-                                                    final LeafBucketCollector sub) throws IOException {
+        public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
             if (mapping != null) {
                 mapSegmentCountsToGlobalCounts(mapping);
             }
@@ -294,29 +254,28 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
             final SortedDocValues singleValues = DocValues.unwrapSingleton(segmentOrds);
             mapping = valuesSource.globalOrdinalsMapping(ctx);
             if (singleValues != null) {
-                return new LeafBucketCollectorBase(sub, segmentOrds) {
+                return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, segmentOrds) {
                     @Override
-                    public void collect(int doc, long bucket) throws IOException {
-                        assert bucket == 0;
+                    public void collect(int doc, long owningBucketOrd) throws IOException {
+                        assert owningBucketOrd == 0;
                         if (singleValues.advanceExact(doc)) {
                             final int ord = singleValues.ordValue();
                             segmentDocCounts.increment(ord + 1, 1);
                        }
                    }
-                };
-            } else {
-                return new LeafBucketCollectorBase(sub, segmentOrds) {
-                    @Override
-                    public void collect(int doc, long bucket) throws IOException {
-                        assert bucket == 0;
-                        if (segmentOrds.advanceExact(doc)) {
-                            for (long segmentOrd = segmentOrds.nextOrd(); segmentOrd != NO_MORE_ORDS; segmentOrd = segmentOrds.nextOrd()) {
-                                segmentDocCounts.increment(segmentOrd + 1, 1);
-                            }
+                });
+            }
+            return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, segmentOrds) {
+                @Override
+                public void collect(int doc, long owningBucketOrd) throws IOException {
+                    assert owningBucketOrd == 0;
+                    if (segmentOrds.advanceExact(doc)) {
+                        for (long segmentOrd = segmentOrds.nextOrd(); segmentOrd != NO_MORE_ORDS; segmentOrd = segmentOrds.nextOrd()) {
+                            segmentDocCounts.increment(segmentOrd + 1, 1);
                        }
                    }
-                };
-            }
+                }
+            });
         }
 
         @Override
@@ -329,7 +288,7 @@ protected void doPostCollection() throws IOException {
 
         @Override
         protected void doClose() {
-            Releasables.close(segmentDocCounts);
+            Releasables.close(resultStrategy, segmentDocCounts, collectionStrategy);
         }
 
         private void mapSegmentCountsToGlobalCounts(LongUnaryOperator mapping) throws IOException {
@@ -396,7 +355,7 @@ public boolean advanceExact(int target) throws IOException {
     /**
      * Strategy for collecting global ordinals.
      * <p>
-     * The {@link GlobalOrdinalsSignificantTermsAggregator} uses one of these
+     * The {@link GlobalOrdinalsStringTermsAggregator} uses one of these
      * to collect the global ordinals by calling
      * {@link CollectionStrategy#collectGlobalOrd(int, long, LeafBucketCollector)}
      * for each global ordinal that it hits and then calling
@@ -545,4 +504,300 @@ public void close() {
             bucketOrds.close();
         }
     }
+
+    /**
+     * Strategy for building results.
+     */
+    abstract class ResultStrategy<
+        R extends InternalAggregation,
+        B extends InternalMultiBucketAggregation.InternalBucket,
+        TB extends InternalMultiBucketAggregation.InternalBucket> implements Releasable {
+
+        private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
+            assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
+            if (valueCount == 0) { // no context in this reader
+                return new InternalAggregation[] {buildEmptyAggregation()};
+            }
+
+            final int size;
+            if (bucketCountThresholds.getMinDocCount() == 0) {
+                // if minDocCount == 0 then we can end up with more buckets than maxBucketOrd() returns
+                size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
+            } else {
+                size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
+            }
+            long[] otherDocCount = new long[1];
+            PriorityQueue<TB> ordered = buildPriorityQueue(size);
+            collectionStrategy.forEach(new BucketInfoConsumer() {
+                TB spare = null;
+
+                @Override
+                public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException {
+                    otherDocCount[0] += docCount;
+                    if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
+                        if (spare == null) {
+                            spare = buildEmptyTemporaryBucket();
+                        }
+                        updateBucket(spare, globalOrd, bucketOrd, docCount);
+                        spare = ordered.insertWithOverflow(spare);
+                        if (spare == null) {
+                            consumeBucketsAndMaybeBreak(1);
+                        }
+                    }
+                }
+            });
+
+            // Get the top buckets
+            B[] topBuckets = buildBuckets(ordered.size());
+            for (int i = ordered.size() - 1; i >= 0; --i) {
+                topBuckets[i] = convertTempBucketToRealBucket(ordered.pop());
+            }
+            buildSubAggs(topBuckets);
+
+            return new InternalAggregation[] {
+                buildResult(topBuckets)
+            };
+        }
+
+        /**
+         * Short description of the collection mechanism added to the profile
+         * output to help with debugging.
+         */
+        abstract String describe();
+
+        /**
+         * Wrap the "standard" terms collector to collect any more
+         * information that this result type may need.
+         */
+        abstract LeafBucketCollector wrapCollector(LeafBucketCollector primary);
+
+        /**
+         * Build an empty temporary bucket.
+         */
+        abstract TB buildEmptyTemporaryBucket();
+
+        /**
+         * Update fields in {@code spare} to reflect information collected for
+         * this bucket ordinal.
+         */
+        abstract void updateBucket(TB spare, long globalOrd, long bucketOrd, long docCount) throws IOException;
+
+        /**
+         * Build a {@link PriorityQueue} to sort the buckets. After we've
+         * collected all of the buckets we'll collect all entries in the queue.
+         */
+        abstract PriorityQueue<TB> buildPriorityQueue(int size);
+
+        /**
+         * Build an array of buckets for a particular ordinal to collect the
+         * results. The populated list is passed to {@link #buildResult}.
+         */
+        abstract B[] buildBuckets(int size);
+
+        /**
+         * Convert a temporary bucket into a real bucket.
+         */
+        abstract B convertTempBucketToRealBucket(TB temp) throws IOException;
+
+        /**
+         * Build the sub-aggregations into the buckets. This will usually
+         * delegate to {@link #buildSubAggsForAllBuckets}.
+         */
+        abstract void buildSubAggs(B[] topBuckets) throws IOException;
+
+        /**
+         * Turn the buckets into an aggregation result.
+         */
+        abstract R buildResult(B[] topBuckets);
+
+        /**
+         * Build an "empty" result. Only called if there isn't any data on this
+         * shard.
+         */
+        abstract R buildEmptyResult();
+    }
+
+    /**
+     * Builds results for the standard {@code terms} aggregation.
+     */
+    class StandardTermsResults extends ResultStrategy<StringTerms, StringTerms.Bucket, OrdBucket> {
+        private long otherDocCount;
+
+        @Override
+        String describe() {
+            return "terms";
+        }
+
+        @Override
+        LeafBucketCollector wrapCollector(LeafBucketCollector primary) {
+            return primary;
+        }
+
+        @Override
+        StringTerms.Bucket[] buildBuckets(int size) {
+            return new StringTerms.Bucket[size];
+        }
+
+        @Override
+        OrdBucket buildEmptyTemporaryBucket() {
+            return new OrdBucket(showTermDocCountError, format);
+        }
+
+        @Override
+        void updateBucket(OrdBucket spare, long globalOrd, long bucketOrd, long docCount) throws IOException {
+            spare.globalOrd = globalOrd;
+            spare.bucketOrd = bucketOrd;
+            spare.docCount = docCount;
+            otherDocCount += docCount;
+        }
+
+        @Override
+        PriorityQueue<OrdBucket> buildPriorityQueue(int size) {
+            return new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator);
+        }
+
+        @Override
+        StringTerms.Bucket convertTempBucketToRealBucket(OrdBucket temp) throws IOException {
+            BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd));
+            StringTerms.Bucket result = new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format);
+            result.bucketOrd = temp.bucketOrd;
+            otherDocCount -= temp.docCount;
+            result.docCountError = 0;
+            return result;
+        }
+
+        @Override
+        void buildSubAggs(StringTerms.Bucket[] topBuckets) throws IOException {
+            buildSubAggsForBuckets(topBuckets, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
+        }
+
+        @Override
+        StringTerms buildResult(StringTerms.Bucket[] topBuckets) {
+            return new StringTerms(name, order, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
+                metadata(), format, bucketCountThresholds.getShardSize(), showTermDocCountError,
+                otherDocCount, Arrays.asList(topBuckets), 0);
+        }
+
+        @Override
+        StringTerms buildEmptyResult() {
+            return buildEmptyTermsAggregation();
+        }
+
+        @Override
+        public void close() {}
+    }
+
+    /**
+     * Builds results for the {@code significant_terms} aggregation.
+     */
+    class SignificantTermsResults extends ResultStrategy<
+        SignificantStringTerms,
+        SignificantStringTerms.Bucket,
+        SignificantStringTerms.Bucket> {
+
+        // TODO a reference to the factory is weird - probably should be a reference to what we need from it.
+        private final SignificantTermsAggregatorFactory termsAggFactory;
+        private final SignificanceHeuristic significanceHeuristic;
+
+        private long subsetSize = 0;
+
+        SignificantTermsResults(SignificantTermsAggregatorFactory termsAggFactory, SignificanceHeuristic significanceHeuristic) {
+            this.termsAggFactory = termsAggFactory;
+            this.significanceHeuristic = significanceHeuristic;
+        }
+
+        @Override
+        String describe() {
+            return "significant_terms";
+        }
+
+        @Override
+        LeafBucketCollector wrapCollector(LeafBucketCollector primary) {
+            return new LeafBucketCollectorBase(primary, null) {
+                @Override
+                public void collect(int doc, long owningBucketOrd) throws IOException {
+                    super.collect(doc, owningBucketOrd);
+                    subsetSize++;
+                }
+            };
+        }
+
+        @Override
+        SignificantStringTerms.Bucket[] buildBuckets(int size) {
+            return new SignificantStringTerms.Bucket[size];
+        }
+
+        @Override
+        SignificantStringTerms.Bucket buildEmptyTemporaryBucket() {
+            return new SignificantStringTerms.Bucket(new BytesRef(), 0, 0, 0, 0, null, format, 0);
+        }
+
+        @Override
+        void updateBucket(SignificantStringTerms.Bucket spare, long globalOrd, long bucketOrd, long docCount) throws IOException {
+            spare.bucketOrd = bucketOrd;
+            oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
+            spare.subsetDf = docCount;
+            spare.subsetSize = subsetSize;
+            spare.supersetDf = termsAggFactory.getBackgroundFrequency(spare.termBytes);
+            spare.supersetSize = termsAggFactory.getSupersetNumDocs();
+            /*
+             * During shard-local down-selection we use subset/superset stats
+             * that are for this shard only. Back at the central reducer these
+             * properties will be updated with global stats.
+             */
+            spare.updateScore(significanceHeuristic);
+        }
+
+        @Override
+        PriorityQueue<SignificantStringTerms.Bucket> buildPriorityQueue(int size) {
+            return new BucketSignificancePriorityQueue<>(size);
+        }
+
+        @Override
+        SignificantStringTerms.Bucket convertTempBucketToRealBucket(SignificantStringTerms.Bucket temp) throws IOException {
+            return temp;
+        }
+
+        @Override
+        void buildSubAggs(SignificantStringTerms.Bucket[] topBuckets) throws IOException {
+            buildSubAggsForBuckets(topBuckets, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
+        }
+
+        @Override
+        SignificantStringTerms buildResult(SignificantStringTerms.Bucket[] topBuckets) {
+            return new SignificantStringTerms(name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(),
+                metadata(), format, subsetSize, termsAggFactory.getSupersetNumDocs(), significanceHeuristic, Arrays.asList(topBuckets));
+        }
+
+        @Override
+        SignificantStringTerms buildEmptyResult() {
+            return buildEmptySignificantTermsAggregation(significanceHeuristic);
+        }
+
+        @Override
+        public void close() {
+            termsAggFactory.close();
+        }
+
+        /**
+         * Copies the bytes from {@code from} into {@code to}, oversizing
+         * the destination array if the bytes won't fit into the array.
+         * <p>
+         * This is fairly similar in spirit to
+         * {@link BytesRef#deepCopyOf(BytesRef)} in that it is a way to read
+         * bytes from a mutable {@link BytesRef} into
+         * something that won't mutate out from under you.
+         * Unlike {@linkplain BytesRef#deepCopyOf(BytesRef)} it's designed to
+         * be run over and over again into the same destination. In particular,
+         * oversizing the destination bytes helps to keep from allocating
+         * a bunch of little arrays over and over and over again.
+         */
+        private void oversizedCopy(BytesRef from, BytesRef to) {
+            if (to.bytes.length < from.length) {
+                to.bytes = new byte[ArrayUtil.oversize(from.length, 1)];
+            }
+            to.offset = 0;
+            to.length = from.length;
+            System.arraycopy(from.bytes, from.offset, to.bytes, 0, from.length);
+        }
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java
index 17b536941790a..76a4d0d2e16d4 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java
@@ -78,7 +78,7 @@ public NumericTermsAggregator(
     ) throws IOException {
         super(name, factories, aggregationContext, parent, bucketCountThresholds, order, format, subAggCollectMode, metadata);
-        this.resultStrategy = resultStrategy.apply(this);
+        this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
         this.valuesSource = valuesSource;
         this.longFilter = longFilter;
         bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java
index 129ffa2c91b00..15182218fed37 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java
@@ -389,10 +389,23 @@ Aggregator create(String name,
                  **/
                 remapGlobalOrd = false;
             }
-            return new GlobalOrdinalsSignificantTermsAggregator(name, factories,
-                (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource, format, bucketCountThresholds, filter,
-                aggregationContext, parent, remapGlobalOrd, significanceHeuristic, termsAggregatorFactory, metadata);
-
+
+            return new GlobalOrdinalsStringTermsAggregator(
+                name,
+                factories,
+                a -> a.new SignificantTermsResults(termsAggregatorFactory, significanceHeuristic),
+                (ValuesSource.Bytes.WithOrdinals.FieldData) valuesSource,
+                null,
+                format,
+                bucketCountThresholds,
+                filter,
+                aggregationContext,
+                parent,
+                remapGlobalOrd,
+                SubAggCollectionMode.BREADTH_FIRST,
+                false,
+                metadata
+            );
         }
     };
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java
index 84ada76cff83a..3b64a6979977b 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregator.java
@@ -171,6 +171,11 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
         };
     }
 
+    @Override
+    public InternalAggregation buildEmptyAggregation() {
+        return buildEmptyTermsAggregation();
+    }
+
     @Override
     public void doClose() {
         Releasables.close(bucketOrds);
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
index 0b6308efd8f31..ba3dea93fdf3d 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java
@@ -391,9 +391,22 @@ Aggregator create(String name,
                     remapGlobalOrds = false;
                 }
             }
-            return new GlobalOrdinalsStringTermsAggregator(name, factories, ordinalsValuesSource, order,
-                format, bucketCountThresholds, filter, context, parent, remapGlobalOrds, subAggCollectMode, showTermDocCountError,
-                metadata);
+            return new GlobalOrdinalsStringTermsAggregator(
+                name,
+                factories,
+                a -> a.new StandardTermsResults(),
+                ordinalsValuesSource,
+                order,
+                format,
+                bucketCountThresholds,
+                filter,
+                context,
+                parent,
+                remapGlobalOrds,
+                subAggCollectMode,
+                showTermDocCountError,
+                metadata
+            );
         }
     };
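The whole change hinges on one idiom: instead of subclassing the aggregator per result type, each factory passes a Function<Aggregator, ResultStrategy> into the constructor, and a lambda like "a -> a.new StandardTermsResults()" uses Java's qualified inner-class creation so the strategy is bound to the aggregator instance whose collected state it reads. What follows is a minimal, self-contained sketch of that idiom; every name in it is hypothetical and merely stands in for the Elasticsearch classes above, it is not the actual API.

    import java.util.function.Function;

    class SketchAggregator {
        private final ResultStrategy resultStrategy;
        private long docCount; // state collected by the aggregator, read by the strategy

        SketchAggregator(Function<SketchAggregator, ResultStrategy> resultStrategy) {
            // An inner class needs its enclosing instance, so the factory
            // function defers creating the strategy until "this" exists.
            this.resultStrategy = resultStrategy.apply(this);
        }

        void collect() {
            docCount++;
        }

        String buildResult() {
            return resultStrategy.buildResult(); // delegate, as buildAggregations() does above
        }

        // Strategy for turning collected state into a result.
        abstract class ResultStrategy {
            abstract String describe();

            String buildResult() {
                // Inner classes read the enclosing aggregator's state directly.
                return describe() + ": " + docCount;
            }
        }

        class StandardResults extends ResultStrategy {
            @Override
            String describe() {
                return "terms";
            }
        }

        class SignificantResults extends ResultStrategy {
            @Override
            String describe() {
                return "significant_terms";
            }
        }

        public static void main(String[] args) {
            // "a.new StandardResults()" is qualified inner-class creation: it
            // instantiates the inner class with "a" as its enclosing instance.
            SketchAggregator terms = new SketchAggregator(a -> a.new StandardResults());
            SketchAggregator sig = new SketchAggregator(a -> a.new SignificantResults());
            terms.collect();
            sig.collect();
            System.out.println(terms.buildResult()); // prints "terms: 1"
            System.out.println(sig.buildResult());   // prints "significant_terms: 1"
        }
    }

One consequence of this shape is visible in the diff: because a strategy may hold releasable resources (SignificantTermsResults closes its SignificantTermsAggregatorFactory), doClose() must release resultStrategy alongside collectionStrategy.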