Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.search.aggregations.bucket.nested;

import com.carrotsearch.hppc.LongArrayList;
import org.apache.lucene.index.IndexReaderContext;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.ReaderUtil;
Expand Down Expand Up @@ -51,14 +52,19 @@ class NestedAggregator extends BucketsAggregator implements SingleBucketAggregat

private final BitSetProducer parentFilter;
private final Query childFilter;
private final boolean collectsFromSingleBucket;

private BufferingNestedLeafBucketCollector bufferingNestedLeafBucketCollector;

/**
 * Creates a nested aggregator that maps parent documents to their nested (child) documents.
 *
 * @param name                     the aggregation name
 * @param factories                factories for sub-aggregators
 * @param parentObjectMapper       mapper of the enclosing object, or {@code null} for top-level docs
 * @param childObjectMapper        mapper of the nested object whose docs are collected (must be non-null)
 * @param context                  the current search context (supplies the bitset filter cache)
 * @param parentAggregator         the aggregator this one is nested under
 * @param pipelineAggregators      pipeline aggregators to apply
 * @param metaData                 user-supplied metadata for the aggregation
 * @param collectsFromSingleBucket whether this aggregator only ever receives bucket ordinal 0;
 *                                 when {@code false} a buffering collector is used instead
 * @throws IOException if building the parent bitset producer fails
 */
NestedAggregator(String name, AggregatorFactories factories, ObjectMapper parentObjectMapper, ObjectMapper childObjectMapper,
                 SearchContext context, Aggregator parentAggregator,
                 List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData,
                 boolean collectsFromSingleBucket) throws IOException {
    super(name, factories, context, parentAggregator, pipelineAggregators, metaData);
    // When there is no parent object mapper, parents are all non-nested docs.
    Query parentFilter = parentObjectMapper != null ? parentObjectMapper.nestedTypeFilter() : Queries.newNonNestedFilter();
    this.parentFilter = context.bitsetFilterCache().getBitSetProducer(parentFilter);
    this.childFilter = childObjectMapper.nestedTypeFilter();
    this.collectsFromSingleBucket = collectsFromSingleBucket;
}

@Override
Expand All @@ -71,26 +77,38 @@ public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final L

final BitSet parentDocs = parentFilter.getBitSet(ctx);
final DocIdSetIterator childDocs = childDocsScorer != null ? childDocsScorer.iterator() : null;
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
return;
}
if (collectsFromSingleBucket) {
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
return;
}

final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}

for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
collectBucket(sub, childDocId, bucket);
for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
collectBucket(sub, childDocId, bucket);
}
}
}
};
};
} else {
doPostCollection();
return bufferingNestedLeafBucketCollector = new BufferingNestedLeafBucketCollector(sub, parentDocs, childDocs);
}
}

@Override
protected void doPostCollection() throws IOException {
    // Flush any buckets still buffered for the last parent document seen.
    final BufferingNestedLeafBucketCollector buffered = bufferingNestedLeafBucketCollector;
    if (buffered == null) {
        return;
    }
    buffered.postCollect();
}

@Override
Expand All @@ -104,4 +122,63 @@ public InternalAggregation buildEmptyAggregation() {
return new InternalNested(name, 0, buildEmptySubAggregations(), pipelineAggregators(), metaData());
}

/**
 * Leaf collector used when this aggregator may be asked to collect the same parent document
 * into several bucket ordinals. Bucket ordinals for the current parent doc are buffered and
 * replayed over the matching child docs only once the parent doc changes (or at post-collect),
 * so the forward-only {@code childDocs} iterator is consumed a single time per parent document
 * regardless of how many buckets that parent belongs to.
 *
 * NOTE(review): this relies on all {@code collect(parentDoc, bucket)} calls for a given
 * parentDoc arriving consecutively before the next parentDoc — confirm against the collection
 * contract of the enclosing framework.
 */
class BufferingNestedLeafBucketCollector extends LeafBucketCollectorBase {

    // Bitset marking parent documents in this segment; may be null if the segment has none.
    final BitSet parentDocs;
    // Sub-collector the matched child docs are forwarded to.
    final LeafBucketCollector sub;
    // Forward-only iterator over child (nested) documents; may be null if none match.
    final DocIdSetIterator childDocs;
    // Bucket ordinals collected so far for currentParentDoc, awaiting replay.
    final LongArrayList bucketBuffer = new LongArrayList();

    // Parent doc id whose buckets are currently buffered; -1 before the first collect.
    int currentParentDoc = -1;

    BufferingNestedLeafBucketCollector(LeafBucketCollector sub, BitSet parentDocs, DocIdSetIterator childDocs) {
        super(sub, null);
        this.sub = sub;
        this.parentDocs = parentDocs;
        this.childDocs = childDocs;
    }

    @Override
    public void collect(int parentDoc, long bucket) throws IOException {
        // if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
        // doc), so we can skip:
        if (parentDoc == 0 || parentDocs == null || childDocs == null) {
            return;
        }

        // A new parent doc: replay the buckets buffered for the previous one first.
        if (currentParentDoc != parentDoc) {
            processChildBuckets(currentParentDoc, bucketBuffer);
            currentParentDoc = parentDoc;
        }
        bucketBuffer.add(bucket);
    }

    /**
     * Walks the child docs belonging to {@code parentDoc} (those between the previous parent
     * and this one) and collects each child into every buffered bucket, then clears the buffer.
     */
    void processChildBuckets(int parentDoc, LongArrayList buckets) throws IOException {
        if (bucketBuffer.isEmpty()) {
            return;
        }


        // Child docs of parentDoc are exactly those after the previous set parent bit.
        final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
        int childDocId = childDocs.docID();
        if (childDocId <= prevParentDoc) {
            childDocId = childDocs.advance(prevParentDoc + 1);
        }

        for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
            // Each matching child doc is collected once per buffered bucket ordinal.
            final long[] buffer = buckets.buffer;
            final int size = buckets.size();
            for (int i = 0; i < size; i++) {
                collectBucket(sub, childDocId, buffer[i]);
            }
        }
        bucketBuffer.clear();
    }

    /** Flushes the buckets buffered for the last parent doc seen in this segment. */
    void postCollect() throws IOException {
        processChildBuckets(currentParentDoc, bucketBuffer);
    }

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,11 @@ class NestedAggregatorFactory extends AggregatorFactory<NestedAggregatorFactory>
/**
 * Builds the nested aggregator for this factory.
 *
 * The {@code collectsFromSingleBucket} flag is passed straight through to
 * {@link NestedAggregator} (which handles the multi-bucket case itself with a buffering
 * collector) instead of wrapping the aggregator via {@code asMultiBucketAggregator}.
 *
 * @param parent                   the parent aggregator, or {@code null} at the top level
 * @param collectsFromSingleBucket whether only bucket ordinal 0 will ever be collected
 * @param pipelineAggregators      pipeline aggregators to apply
 * @param metaData                 user-supplied metadata for the aggregation
 * @return an {@link Unmapped} no-op aggregator when the nested path is unmapped,
 *         otherwise a {@link NestedAggregator}
 * @throws IOException if aggregator construction fails
 */
@Override
public Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators,
        Map<String, Object> metaData) throws IOException {
    if (childObjectMapper == null) {
        // The nested path does not exist in the mappings: collect nothing.
        return new Unmapped(name, context, parent, pipelineAggregators, metaData);
    }
    return new NestedAggregator(name, factories, parentObjectMapper, childObjectMapper, context, parent,
        pipelineAggregators, metaData, collectsFromSingleBucket);
}

private static final class Unmapped extends NonCollectingAggregator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,6 @@ public boolean needsScores() {
public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {

for (LongObjectPagedHashMap.Cursor<TopDocsAndLeafCollector> cursor : topDocsCollectors) {
cursor.value.leafCollector = cursor.value.topLevelCollector.getLeafCollector(ctx);
}

return new LeafBucketCollectorBase(sub, null) {

Scorer scorer;
Expand All @@ -103,6 +99,11 @@ public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx,
public void setScorer(Scorer scorer) throws IOException {
this.scorer = scorer;
for (LongObjectPagedHashMap.Cursor<TopDocsAndLeafCollector> cursor : topDocsCollectors) {
// Instantiate the leaf collector not in the getLeafCollector(...) method or in the constructor of this
// anonymous class. Otherwise in the case this leaf bucket collector gets invoked with post collection
// then we already have moved on to the next reader and then we may encounter assertion errors or
// incorrect results.
cursor.value.leafCollector = cursor.value.topLevelCollector.getLeafCollector(ctx);
cursor.value.leafCollector.setScorer(scorer);
}
super.setScorer(scorer);
Expand Down
Loading