diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenAggregatorFactory.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenAggregatorFactory.java
index 9c38fa2eae6b9..1f466f1020d18 100644
--- a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenAggregatorFactory.java
+++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenAggregatorFactory.java
@@ -35,39 +35,49 @@
 import java.util.List;
 import java.util.Map;
 
-public class ChildrenAggregatorFactory
-        extends ValuesSourceAggregatorFactory<WithOrdinals, ChildrenAggregatorFactory> {
+public class ChildrenAggregatorFactory extends ValuesSourceAggregatorFactory<WithOrdinals, ChildrenAggregatorFactory> {
 
     private final Query parentFilter;
     private final Query childFilter;
 
-    public ChildrenAggregatorFactory(String name, ValuesSourceConfig<WithOrdinals> config,
-            Query childFilter, Query parentFilter, SearchContext context, AggregatorFactory<?> parent,
-            AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
+    public ChildrenAggregatorFactory(String name,
+                                     ValuesSourceConfig<WithOrdinals> config,
+                                     Query childFilter,
+                                     Query parentFilter,
+                                     SearchContext context,
+                                     AggregatorFactory<?> parent,
+                                     AggregatorFactories.Builder subFactoriesBuilder,
+                                     Map<String, Object> metaData) throws IOException {
         super(name, config, context, parent, subFactoriesBuilder, metaData);
+        this.childFilter = childFilter;
         this.parentFilter = parentFilter;
     }
 
     @Override
-    protected Aggregator createUnmapped(Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
-            throws IOException {
+    protected Aggregator createUnmapped(Aggregator parent,
+            List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
         return new NonCollectingAggregator(name, context, parent, pipelineAggregators, metaData) {
-
             @Override
             public InternalAggregation buildEmptyAggregation() {
                 return new InternalChildren(name, 0, buildEmptySubAggregations(), pipelineAggregators(), metaData());
             }
-
         };
     }
 
     @Override
-    protected Aggregator doCreateInternal(WithOrdinals valuesSource, Aggregator parent,
-            boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
-            throws IOException {
+    protected Aggregator doCreateInternal(WithOrdinals valuesSource,
+                                          Aggregator parent,
+                                          boolean collectsFromSingleBucket,
+                                          List<PipelineAggregator> pipelineAggregators,
+                                          Map<String, Object> metaData) throws IOException {
+
         long maxOrd = valuesSource.globalMaxOrd(context.searcher());
-        return new ParentToChildrenAggregator(name, factories, context, parent, childFilter,
-            parentFilter, valuesSource, maxOrd, pipelineAggregators, metaData);
+        if (collectsFromSingleBucket) {
+            return new ParentToChildrenAggregator(name, factories, context, parent, childFilter,
+                parentFilter, valuesSource, maxOrd, pipelineAggregators, metaData);
+        } else {
+            return asMultiBucketAggregator(this, context, parent);
+        }
     }
 }
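Reviewer note on the factory change: `asMultiBucketAggregator` is the existing fallback that wraps a single-bucket aggregator so it can be nested under a multi-bucket parent, at the cost of one wrapped instance per owning bucket. Below is a minimal sketch of that dispatch pattern in plain Java; `SingleBucketCounter`, `PerBucketAdapter`, and `create` are hypothetical stand-ins for illustration, not the Elasticsearch classes.

```java
// Hypothetical sketch of the collectsFromSingleBucket dispatch; these types
// are illustrative stand-ins, not the real Elasticsearch aggregator classes.
import java.util.HashMap;
import java.util.Map;

public class BucketDispatchSketch {
    interface Collector {
        void collect(int doc, long owningBucket);
    }

    /** Cheap path: only ever asked to collect into bucket 0. */
    static class SingleBucketCounter implements Collector {
        long count;
        @Override
        public void collect(int doc, long owningBucket) {
            assert owningBucket == 0 : "single-bucket collector only supports the root bucket";
            count++;
        }
    }

    /** Fallback: allocate one single-bucket collector per owning bucket. */
    static class PerBucketAdapter implements Collector {
        final Map<Long, SingleBucketCounter> delegates = new HashMap<>();
        @Override
        public void collect(int doc, long owningBucket) {
            delegates.computeIfAbsent(owningBucket, b -> new SingleBucketCounter()).collect(doc, 0);
        }
    }

    static Collector create(boolean collectsFromSingleBucket) {
        // Mirrors doCreateInternal above: take the cheap path when the parent
        // guarantees a single bucket, otherwise isolate each bucket in a delegate.
        return collectsFromSingleBucket ? new SingleBucketCounter() : new PerBucketAdapter();
    }

    public static void main(String[] args) {
        Collector c = create(false);
        c.collect(3, 0);
        c.collect(7, 2); // safe: the adapter keeps bucket 2 in its own delegate
    }
}
```

The `collectsFromSingleBucket` flag is what lets the factory pick the cheap path; the wrapped path trades memory for correctness under multi-bucket parents.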
diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentJoinAggregator.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentJoinAggregator.java
new file mode 100644
index 0000000000000..46e358319a28a
--- /dev/null
+++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentJoinAggregator.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.join.aggregations;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.SortedSetDocValues;
+import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.Scorable;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Weight;
+import org.apache.lucene.util.Bits;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.util.BitArray;
+import org.elasticsearch.common.util.LongHash;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
+import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An aggregator that joins documents based on global ordinals.
+ * Global ordinals that match the main query and the inFilter query are replayed
+ * with documents matching the outFilter query.
+ */
+public abstract class ParentJoinAggregator extends BucketsAggregator implements SingleBucketAggregator {
+    private final Weight inFilter;
+    private final Weight outFilter;
+    private final ValuesSource.Bytes.WithOrdinals valuesSource;
+    private final boolean singleAggregator;
+
+    /**
+     * If this aggregator is nested under another aggregator we allocate a long hash per bucket.
+     */
+    private final LongHash ordsHash;
+    /**
+     * Otherwise we use a dense bit array to record the global ordinals.
+     */
+    private final BitArray ordsBit;
+
+    public ParentJoinAggregator(String name,
+                                AggregatorFactories factories,
+                                SearchContext context,
+                                Aggregator parent,
+                                Query inFilter,
+                                Query outFilter,
+                                ValuesSource.Bytes.WithOrdinals valuesSource,
+                                long maxOrd,
+                                List<PipelineAggregator> pipelineAggregators,
+                                Map<String, Object> metaData) throws IOException {
+        super(name, factories, context, parent, pipelineAggregators, metaData);
+
+        if (maxOrd > Integer.MAX_VALUE) {
+            throw new IllegalStateException("the number of parent [" + maxOrd + "] is greater than the allowed limit " +
+                "for this aggregation: " + Integer.MAX_VALUE);
+        }
+
+        // these two filters are cached in the parser
+        this.inFilter = context.searcher().createWeight(context.searcher().rewrite(inFilter), ScoreMode.COMPLETE_NO_SCORES, 1f);
+        this.outFilter = context.searcher().createWeight(context.searcher().rewrite(outFilter), ScoreMode.COMPLETE_NO_SCORES, 1f);
+        this.valuesSource = valuesSource;
+        this.singleAggregator = parent == null;
+        this.ordsBit = singleAggregator ? new BitArray((int) maxOrd, context.bigArrays()) : null;
+        this.ordsHash = singleAggregator ? null : new LongHash(1, context.bigArrays());
+    }
+
+    private void addGlobalOrdinal(int globalOrdinal) {
+        if (singleAggregator) {
+            ordsBit.set(globalOrdinal);
+        } else {
+            ordsHash.add(globalOrdinal);
+        }
+    }
+
+    private boolean existsGlobalOrdinal(int globalOrdinal) {
+        return singleAggregator ? ordsBit.get(globalOrdinal) : ordsHash.find(globalOrdinal) >= 0;
+    }
+
+    @Override
+    public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
+                                                      final LeafBucketCollector sub) throws IOException {
+        if (valuesSource == null) {
+            return LeafBucketCollector.NO_OP_COLLECTOR;
+        }
+        final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
+        final Bits parentDocs = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), inFilter.scorerSupplier(ctx));
+        return new LeafBucketCollector() {
+            @Override
+            public void collect(int docId, long bucket) throws IOException {
+                assert bucket == 0;
+                if (parentDocs.get(docId) && globalOrdinals.advanceExact(docId)) {
+                    int globalOrdinal = (int) globalOrdinals.nextOrd();
+                    assert globalOrdinal != -1 && globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
+                    addGlobalOrdinal(globalOrdinal);
+                }
+            }
+        };
+    }
+
+    @Override
+    protected final void doPostCollection() throws IOException {
+        IndexReader indexReader = context().searcher().getIndexReader();
+        for (LeafReaderContext ctx : indexReader.leaves()) {
+            Scorer childDocsScorer = outFilter.scorer(ctx);
+            if (childDocsScorer == null) {
+                continue;
+            }
+            DocIdSetIterator childDocsIter = childDocsScorer.iterator();
+
+            final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx);
+
+            final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
+            // Set the scorer, since we now replay only the child docIds
+            sub.setScorer(new Scorable() {
+                @Override
+                public float score() {
+                    return 1f;
+                }
+
+                @Override
+                public int docID() {
+                    return childDocsIter.docID();
+                }
+            });
+
+            final Bits liveDocs = ctx.reader().getLiveDocs();
+            for (int docId = childDocsIter.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = childDocsIter.nextDoc()) {
+                if (liveDocs != null && liveDocs.get(docId) == false) {
+                    continue;
+                }
+                if (globalOrdinals.advanceExact(docId)) {
+                    int globalOrdinal = (int) globalOrdinals.nextOrd();
+                    assert globalOrdinal != -1 && globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
+                    if (existsGlobalOrdinal(globalOrdinal)) {
+                        collectBucket(sub, docId, 0);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    protected void doClose() {
+        Releasables.close(ordsBit, ordsHash);
+    }
+}
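This new class captures the heart of the refactoring as a two-phase join: at collect time it records the join-field global ordinal of every document matching the main query plus `inFilter`, and `doPostCollection` replays documents matching `outFilter`, emitting only those whose ordinal was recorded. A self-contained sketch of the same two-phase idea, with plain ints standing in for global ordinals (the names and data below are made up for illustration):

```java
// Hypothetical two-phase ordinal join, mirroring the collect/replay split
// in ParentJoinAggregator. Plain arrays stand in for doc values and filters.
import java.util.BitSet;

public class OrdinalJoinSketch {
    public static void main(String[] args) {
        // doc -> join-field ordinal; a parent and child sharing an ordinal
        // belong to the same join family (data is invented for the example).
        int[] joinOrd = {0, 1, 0, 1, 2};
        boolean[] matchesParentFilter = {true, true, false, false, false}; // main query + inFilter
        boolean[] matchesChildFilter = {false, false, true, true, true};   // outFilter

        // Phase 1 (getLeafCollector): record ordinals of matching parents.
        BitSet recorded = new BitSet();
        for (int doc = 0; doc < joinOrd.length; doc++) {
            if (matchesParentFilter[doc]) {
                recorded.set(joinOrd[doc]);
            }
        }

        // Phase 2 (doPostCollection): replay children and keep only those
        // whose join ordinal was recorded in phase 1.
        for (int doc = 0; doc < joinOrd.length; doc++) {
            if (matchesChildFilter[doc] && recorded.get(joinOrd[doc])) {
                System.out.println("collect child doc " + doc); // docs 2 and 3; doc 4's ordinal was never recorded
            }
        }
    }
}
```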
diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java
index 064d1d1e5977c..3990e8697ef63 100644
--- a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java
+++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java
@@ -18,73 +18,28 @@
  */
 package org.elasticsearch.join.aggregations;
 
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.index.SortedSetDocValues;
-import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.Query;
-import org.apache.lucene.search.Scorable;
-import org.apache.lucene.search.ScoreMode;
-import org.apache.lucene.search.Scorer;
-import org.apache.lucene.search.Weight;
-import org.apache.lucene.util.Bits;
 import org.elasticsearch.common.ParseField;
-import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.common.lucene.Lucene;
-import org.elasticsearch.common.util.LongArray;
-import org.elasticsearch.common.util.LongObjectPagedHashMap;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.InternalAggregation;
-import org.elasticsearch.search.aggregations.LeafBucketCollector;
-import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
-import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
-// The RecordingPerReaderBucketCollector assumes per segment recording which isn't the case for this
-// aggregation, for this reason that collector can't be used
-public class ParentToChildrenAggregator extends BucketsAggregator implements SingleBucketAggregator {
+public class ParentToChildrenAggregator extends ParentJoinAggregator {
 
     static final ParseField TYPE_FIELD = new ParseField("type");
 
-    private final Weight childFilter;
-    private final Weight parentFilter;
-    private final ValuesSource.Bytes.WithOrdinals valuesSource;
-
-    // Maybe use PagedGrowableWriter? This will be less wasteful than LongArray,
-    // but then we don't have the reuse feature of BigArrays.
-    // Also if we know the highest possible value that a parent agg will create
-    // then we store multiple values into one slot
-    private final LongArray parentOrdToBuckets;
-
-    // Only pay the extra storage price if the a parentOrd has multiple buckets
-    // Most of the times a parent doesn't have multiple buckets, since there is
-    // only one document per parent ord,
-    // only in the case of terms agg if a parent doc has multiple terms per
-    // field this is needed:
-    private final LongObjectPagedHashMap<long[]> parentOrdToOtherBuckets;
-    private boolean multipleBucketsPerParentOrd = false;
-
     public ParentToChildrenAggregator(String name, AggregatorFactories factories,
             SearchContext context, Aggregator parent, Query childFilter,
             Query parentFilter, ValuesSource.Bytes.WithOrdinals valuesSource,
-            long maxOrd, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
-            throws IOException {
-        super(name, factories, context, parent, pipelineAggregators, metaData);
-        // these two filters are cached in the parser
-        this.childFilter = context.searcher().createWeight(context.searcher().rewrite(childFilter), ScoreMode.COMPLETE_NO_SCORES, 1f);
-        this.parentFilter = context.searcher().createWeight(context.searcher().rewrite(parentFilter), ScoreMode.COMPLETE_NO_SCORES, 1f);
-        this.parentOrdToBuckets = context.bigArrays().newLongArray(maxOrd, false);
-        this.parentOrdToBuckets.fill(0, maxOrd, -1);
-        this.parentOrdToOtherBuckets = new LongObjectPagedHashMap<>(context.bigArrays());
-        this.valuesSource = valuesSource;
+            long maxOrd, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
+        super(name, factories, context, parent, parentFilter, childFilter, valuesSource, maxOrd, pipelineAggregators, metaData);
     }
 
     @Override
@@ -99,96 +54,4 @@ public InternalAggregation buildEmptyAggregation() {
                 metaData());
     }
 
-    @Override
-    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
-            final LeafBucketCollector sub) throws IOException {
-        if (valuesSource == null) {
-            return LeafBucketCollector.NO_OP_COLLECTOR;
-        }
-        final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
-        final Bits parentDocs = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), parentFilter.scorerSupplier(ctx));
-        return new LeafBucketCollector() {
-
-            @Override
-            public void collect(int docId, long bucket) throws IOException {
-                if (parentDocs.get(docId) && globalOrdinals.advanceExact(docId)) {
-                    long globalOrdinal = globalOrdinals.nextOrd();
-                    assert globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
-                    if (globalOrdinal != -1) {
-                        if (parentOrdToBuckets.get(globalOrdinal) == -1) {
-                            parentOrdToBuckets.set(globalOrdinal, bucket);
-                        } else {
-                            long[] bucketOrds = parentOrdToOtherBuckets.get(globalOrdinal);
-                            if (bucketOrds != null) {
-                                bucketOrds = Arrays.copyOf(bucketOrds, bucketOrds.length + 1);
-                                bucketOrds[bucketOrds.length - 1] = bucket;
-                                parentOrdToOtherBuckets.put(globalOrdinal, bucketOrds);
-                            } else {
-                                parentOrdToOtherBuckets.put(globalOrdinal, new long[] { bucket });
-                            }
-                            multipleBucketsPerParentOrd = true;
-                        }
-                    }
-                }
-            }
-        };
-    }
-
-    @Override
-    protected void doPostCollection() throws IOException {
-        IndexReader indexReader = context().searcher().getIndexReader();
-        for (LeafReaderContext ctx : indexReader.leaves()) {
-            Scorer childDocsScorer = childFilter.scorer(ctx);
-            if (childDocsScorer == null) {
-                continue;
-            }
-            DocIdSetIterator childDocsIter = childDocsScorer.iterator();
-
-            final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx);
-
-            final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
-            // Set the scorer, since we now replay only the child docIds
-            sub.setScorer(new Scorable() {
-                @Override
-                public float score() {
-                    return 1f;
-                }
-
-                @Override
-                public int docID() {
-                    return childDocsIter.docID();
-                }
-            });
-
-            final Bits liveDocs = ctx.reader().getLiveDocs();
-            for (int docId = childDocsIter
-                    .nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = childDocsIter
-                            .nextDoc()) {
-                if (liveDocs != null && liveDocs.get(docId) == false) {
-                    continue;
-                }
-                if (globalOrdinals.advanceExact(docId)) {
-                    long globalOrdinal = globalOrdinals.nextOrd();
-                    assert globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
-                    long bucketOrd = parentOrdToBuckets.get(globalOrdinal);
-                    if (bucketOrd != -1) {
-                        collectBucket(sub, docId, bucketOrd);
-                        if (multipleBucketsPerParentOrd) {
-                            long[] otherBucketOrds = parentOrdToOtherBuckets.get(globalOrdinal);
-                            if (otherBucketOrds != null) {
-                                for (long otherBucketOrd : otherBucketOrds) {
-                                    collectBucket(sub, docId, otherBucketOrd);
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    @Override
-    protected void doClose() {
-        Releasables.close(parentOrdToBuckets, parentOrdToOtherBuckets);
-    }
 }
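The bookkeeping deleted above and the new `ordsBit`/`ordsHash` fields are two answers to the same storage question: a dense structure costs one slot per possible parent ordinal but gives the cheapest membership test, while a sparse one pays only per recorded ordinal and wins when each aggregator instance sees few matches. A plain-Java illustration of that trade-off, with `java.util.BitSet` and `HashSet` standing in for the BigArrays-backed `BitArray` and `LongHash` (the types here are illustrative, not the ES classes):

```java
// Hypothetical dense-vs-sparse ordinal set, mirroring the singleAggregator choice.
import java.util.BitSet;
import java.util.HashSet;
import java.util.Set;

public class DenseVsSparseSketch {
    interface OrdSet {
        void add(int ord);
        boolean contains(int ord);
    }

    /** Dense: one bit per possible ordinal; best when most ordinals may be hit. */
    static final class DenseOrds implements OrdSet {
        private final BitSet bits;
        DenseOrds(int maxOrd) { bits = new BitSet(maxOrd); }
        public void add(int ord) { bits.set(ord); }
        public boolean contains(int ord) { return bits.get(ord); }
    }

    /** Sparse: pay per recorded ordinal; best for few hits out of a huge range. */
    static final class SparseOrds implements OrdSet {
        private final Set<Long> ords = new HashSet<>();
        public void add(int ord) { ords.add((long) ord); }
        public boolean contains(int ord) { return ords.contains((long) ord); }
    }

    public static void main(String[] args) {
        int maxOrd = 1_000_000;
        // Mirrors the ParentJoinAggregator constructor: dense when the aggregator
        // is the sole consumer, sparse when many wrapped instances each record a few.
        boolean singleAggregator = true;
        OrdSet ords = singleAggregator ? new DenseOrds(maxOrd) : new SparseOrds();
        ords.add(42);
        System.out.println(ords.contains(42)); // true
        System.out.println(ords.contains(7));  // false
    }
}
```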
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BitArray.java b/server/src/main/java/org/elasticsearch/common/util/BitArray.java
similarity index 92%
rename from server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BitArray.java
rename to server/src/main/java/org/elasticsearch/common/util/BitArray.java
index 6b35d7d2e2e0a..54fa4a669de29 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BitArray.java
+++ b/server/src/main/java/org/elasticsearch/common/util/BitArray.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.elasticsearch.search.aggregations.bucket.composite;
+package org.elasticsearch.common.util;
 
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
@@ -30,11 +30,11 @@
  * The underlying long array grows lazily based on the biggest index
  * that needs to be set.
  */
-final class BitArray implements Releasable {
+public final class BitArray implements Releasable {
     private final BigArrays bigArrays;
     private LongArray bits;
 
-    BitArray(BigArrays bigArrays, int initialSize) {
+    public BitArray(int initialSize, BigArrays bigArrays) {
         this.bigArrays = bigArrays;
         this.bits = bigArrays.newLongArray(initialSize, true);
     }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java
index d243b0e75924e..633d919f140cc 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java
@@ -25,6 +25,7 @@
 import org.elasticsearch.common.CheckedFunction;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.BitArray;
 import org.elasticsearch.common.util.DoubleArray;
 import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
 import org.elasticsearch.index.mapper.MappedFieldType;
@@ -48,7 +49,7 @@ class DoubleValuesSource extends SingleDimensionValuesSource<Double> {
                       DocValueFormat format, boolean missingBucket, int size, int reverseMul) {
         super(bigArrays, format, fieldType, missingBucket, size, reverseMul);
         this.docValuesFunc = docValuesFunc;
-        this.bits = missingBucket ? new BitArray(bigArrays, 100) : null;
+        this.bits = missingBucket ? new BitArray(100, bigArrays) : null;
         this.values = bigArrays.newDoubleArray(Math.min(size, 100), false);
     }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java
index 6d5e9f7d6e251..e5ecbd6d00e20 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java
@@ -30,6 +30,7 @@
 import org.elasticsearch.common.CheckedFunction;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.BitArray;
 import org.elasticsearch.common.util.LongArray;
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
@@ -61,7 +62,7 @@ class LongValuesSource extends SingleDimensionValuesSource<Long> {
         this.bigArrays = bigArrays;
         this.docValuesFunc = docValuesFunc;
         this.rounding = rounding;
-        this.bits = missingBucket ? new BitArray(bigArrays, Math.min(size, 100)) : null;
+        this.bits = missingBucket ? new BitArray(Math.min(size, 100), bigArrays) : null;
         this.values = bigArrays.newLongArray(Math.min(size, 100), false);
     }
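With `BitArray` now public in `org.elasticsearch.common.util`, code outside the composite package can use it directly. A minimal usage sketch against the new argument order, mirroring the relocated test below; this assumes the ES server artifact on the classpath, and the array must be closed because it is backed by `BigArrays`:

```java
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;

public class BitArrayUsageSketch {
    public static void main(String[] args) {
        // New argument order: initial size first, then the BigArrays instance.
        // Per the class javadoc, the backing long array grows lazily when a
        // larger index is set, so the initial size is only a starting point.
        try (BitArray bits = new BitArray(1, BigArrays.NON_RECYCLING_INSTANCE)) {
            bits.set(3);
            bits.set(100_000); // well past the initial size; triggers lazy growth
            System.out.println(bits.get(3));       // true
            System.out.println(bits.get(4));       // false
            System.out.println(bits.get(100_000)); // true
        } // close() releases the pages borrowed from BigArrays
    }
}
```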
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/BitArrayTests.java b/server/src/test/java/org/elasticsearch/common/util/BitArrayTests.java
similarity index 90%
rename from server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/BitArrayTests.java
rename to server/src/test/java/org/elasticsearch/common/util/BitArrayTests.java
index 1806080260f28..518bbc08f4cf9 100644
--- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/BitArrayTests.java
+++ b/server/src/test/java/org/elasticsearch/common/util/BitArrayTests.java
@@ -17,9 +17,8 @@
  * under the License.
  */
 
-package org.elasticsearch.search.aggregations.bucket.composite;
+package org.elasticsearch.common.util;
 
-import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.test.ESTestCase;
 
 import java.util.ArrayList;
@@ -28,7 +27,7 @@
 
 public class BitArrayTests extends ESTestCase {
     public void testRandom() {
-        try (BitArray bitArray = new BitArray(BigArrays.NON_RECYCLING_INSTANCE, 1)) {
+        try (BitArray bitArray = new BitArray(1, BigArrays.NON_RECYCLING_INSTANCE)) {
             int numBits = randomIntBetween(1000, 10000);
             for (int step = 0; step < 3; step++) {
                 boolean[] bits = new boolean[numBits];