Skip to content

Commit 872004a

Browse files
committed
aggs: Allow aggregation sorting via nested aggregation.
The nested aggregator now buffers all bucket ords per parent document and emits all bucket ords for a parent document's nested documents at once. This way the nested documents' DocIdSetIterator gets used once per bucket instead of wrapping the nested aggregator inside a multi bucket aggregator, which was the solution up to now. This allows sorting by buckets under a nested bucket. Closes #16838
1 parent 00eec59 commit 872004a

File tree

4 files changed

+321
-27
lines changed

4 files changed

+321
-27
lines changed

core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java

Lines changed: 96 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.search.aggregations.bucket.nested;
2020

21+
import com.carrotsearch.hppc.LongArrayList;
2122
import org.apache.lucene.index.IndexReaderContext;
2223
import org.apache.lucene.index.LeafReaderContext;
2324
import org.apache.lucene.index.ReaderUtil;
@@ -51,14 +52,19 @@ class NestedAggregator extends BucketsAggregator implements SingleBucketAggregat
5152

5253
private final BitSetProducer parentFilter;
5354
private final Query childFilter;
55+
private final boolean collectsFromSingleBucket;
56+
57+
private BufferingNestedLeafBucketCollector bufferingNestedLeafBucketCollector;
5458

5559
NestedAggregator(String name, AggregatorFactories factories, ObjectMapper parentObjectMapper, ObjectMapper childObjectMapper,
56-
SearchContext context, Aggregator parentAggregator,
57-
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
60+
SearchContext context, Aggregator parentAggregator,
61+
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData,
62+
boolean collectsFromSingleBucket) throws IOException {
5863
super(name, factories, context, parentAggregator, pipelineAggregators, metaData);
5964
Query parentFilter = parentObjectMapper != null ? parentObjectMapper.nestedTypeFilter() : Queries.newNonNestedFilter();
6065
this.parentFilter = context.bitsetFilterCache().getBitSetProducer(parentFilter);
6166
this.childFilter = childObjectMapper.nestedTypeFilter();
67+
this.collectsFromSingleBucket = collectsFromSingleBucket;
6268
}
6369

6470
@Override
@@ -71,26 +77,38 @@ public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final L
7177

7278
final BitSet parentDocs = parentFilter.getBitSet(ctx);
7379
final DocIdSetIterator childDocs = childDocsScorer != null ? childDocsScorer.iterator() : null;
74-
return new LeafBucketCollectorBase(sub, null) {
75-
@Override
76-
public void collect(int parentDoc, long bucket) throws IOException {
77-
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
78-
// doc), so we can skip:
79-
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
80-
return;
81-
}
80+
if (collectsFromSingleBucket) {
81+
return new LeafBucketCollectorBase(sub, null) {
82+
@Override
83+
public void collect(int parentDoc, long bucket) throws IOException {
84+
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
85+
// doc), so we can skip:
86+
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
87+
return;
88+
}
8289

83-
final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
84-
int childDocId = childDocs.docID();
85-
if (childDocId <= prevParentDoc) {
86-
childDocId = childDocs.advance(prevParentDoc + 1);
87-
}
90+
final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
91+
int childDocId = childDocs.docID();
92+
if (childDocId <= prevParentDoc) {
93+
childDocId = childDocs.advance(prevParentDoc + 1);
94+
}
8895

89-
for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
90-
collectBucket(sub, childDocId, bucket);
96+
for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
97+
collectBucket(sub, childDocId, bucket);
98+
}
9199
}
92-
}
93-
};
100+
};
101+
} else {
102+
doPostCollection();
103+
return bufferingNestedLeafBucketCollector = new BufferingNestedLeafBucketCollector(sub, parentDocs, childDocs);
104+
}
105+
}
106+
107+
@Override
108+
protected void doPostCollection() throws IOException {
109+
if (bufferingNestedLeafBucketCollector != null) {
110+
bufferingNestedLeafBucketCollector.postCollect();
111+
}
94112
}
95113

96114
@Override
@@ -104,4 +122,63 @@ public InternalAggregation buildEmptyAggregation() {
104122
return new InternalNested(name, 0, buildEmptySubAggregations(), pipelineAggregators(), metaData());
105123
}
106124

125+
class BufferingNestedLeafBucketCollector extends LeafBucketCollectorBase {
126+
127+
final BitSet parentDocs;
128+
final LeafBucketCollector sub;
129+
final DocIdSetIterator childDocs;
130+
final LongArrayList bucketBuffer = new LongArrayList();
131+
132+
int currentParentDoc = -1;
133+
134+
BufferingNestedLeafBucketCollector(LeafBucketCollector sub, BitSet parentDocs, DocIdSetIterator childDocs) {
135+
super(sub, null);
136+
this.sub = sub;
137+
this.parentDocs = parentDocs;
138+
this.childDocs = childDocs;
139+
}
140+
141+
@Override
142+
public void collect(int parentDoc, long bucket) throws IOException {
143+
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
144+
// doc), so we can skip:
145+
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
146+
return;
147+
}
148+
149+
if (currentParentDoc != parentDoc) {
150+
processChildBuckets(currentParentDoc, bucketBuffer);
151+
currentParentDoc = parentDoc;
152+
}
153+
bucketBuffer.add(bucket);
154+
}
155+
156+
void processChildBuckets(int parentDoc, LongArrayList buckets) throws IOException {
157+
if (bucketBuffer.isEmpty()) {
158+
return;
159+
}
160+
161+
162+
final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
163+
int childDocId = childDocs.docID();
164+
if (childDocId <= prevParentDoc) {
165+
childDocId = childDocs.advance(prevParentDoc + 1);
166+
}
167+
168+
for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
169+
final long[] buffer = buckets.buffer;
170+
final int size = buckets.size();
171+
for (int i = 0; i < size; i++) {
172+
collectBucket(sub, childDocId, buffer[i]);
173+
}
174+
}
175+
bucketBuffer.clear();
176+
}
177+
178+
void postCollect() throws IOException {
179+
processChildBuckets(currentParentDoc, bucketBuffer);
180+
}
181+
182+
}
183+
107184
}

core/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregatorFactory.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,11 @@ class NestedAggregatorFactory extends AggregatorFactory<NestedAggregatorFactory>
4848
@Override
4949
public Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators,
5050
Map<String, Object> metaData) throws IOException {
51-
if (collectsFromSingleBucket == false) {
52-
return asMultiBucketAggregator(this, context, parent);
53-
}
5451
if (childObjectMapper == null) {
5552
return new Unmapped(name, context, parent, pipelineAggregators, metaData);
5653
}
57-
return new NestedAggregator(name, factories, parentObjectMapper, childObjectMapper, context, parent, pipelineAggregators, metaData);
54+
return new NestedAggregator(name, factories, parentObjectMapper, childObjectMapper, context, parent,
55+
pipelineAggregators, metaData, collectsFromSingleBucket);
5856
}
5957

6058
private static final class Unmapped extends NonCollectingAggregator {

core/src/main/java/org/elasticsearch/search/aggregations/metrics/tophits/TopHitsAggregator.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,6 @@ public boolean needsScores() {
9191
public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx,
9292
final LeafBucketCollector sub) throws IOException {
9393

94-
for (LongObjectPagedHashMap.Cursor<TopDocsAndLeafCollector> cursor : topDocsCollectors) {
95-
cursor.value.leafCollector = cursor.value.topLevelCollector.getLeafCollector(ctx);
96-
}
97-
9894
return new LeafBucketCollectorBase(sub, null) {
9995

10096
Scorer scorer;
@@ -103,6 +99,11 @@ public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx,
10399
public void setScorer(Scorer scorer) throws IOException {
104100
this.scorer = scorer;
105101
for (LongObjectPagedHashMap.Cursor<TopDocsAndLeafCollector> cursor : topDocsCollectors) {
102+
// Instantiate the leaf collector here rather than in the getLeafCollector(...) method or in the constructor of
103+
// this anonymous class. Otherwise, if this leaf bucket collector gets invoked during post collection,
104+
// we have already moved on to the next reader and may then encounter assertion errors or
105+
// incorrect results.
106+
cursor.value.leafCollector = cursor.value.topLevelCollector.getLeafCollector(ctx);
106107
cursor.value.leafCollector.setScorer(scorer);
107108
}
108109
super.setScorer(scorer);

0 commit comments

Comments
 (0)