Skip to content

Commit 429dc7d

Browse files
jimczikcm
authored and committed
Refactor children aggregator into a generic ParentJoinAggregator (#34845)
This commit adds a new ParentJoinAggregator that implements a join using global ordinals in a way that can be reused by the `children` aggregation and the upcoming `parent` aggregation. This new aggregator is a refactoring of the existing ParentToChildrenAggregator with two main changes:
* It uses a dense bit array instead of a long array when the aggregation does not have any parent.
* It uses a single aggregator per bucket if it is nested under another aggregation.
For the latter case we use a `MultiBucketAggregatorWrapper` in the factory to ensure that each instance of the aggregator handles a single bucket. This is more in line with the strategy we use for other aggregations, such as the `terms` aggregation, since the number of buckets to handle should be low (thanks to the breadth_first strategy). This change is also required for #34210, which adds the `parent` aggregation to the parent-join module. Relates #34508
1 parent 7716310 commit 429dc7d

File tree

7 files changed

+209
-162
lines changed

7 files changed

+209
-162
lines changed

modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenAggregatorFactory.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,39 +35,49 @@
3535
import java.util.List;
3636
import java.util.Map;
3737

38-
public class ChildrenAggregatorFactory
39-
extends ValuesSourceAggregatorFactory<WithOrdinals, ChildrenAggregatorFactory> {
38+
public class ChildrenAggregatorFactory extends ValuesSourceAggregatorFactory<WithOrdinals, ChildrenAggregatorFactory> {
4039

4140
private final Query parentFilter;
4241
private final Query childFilter;
4342

44-
public ChildrenAggregatorFactory(String name, ValuesSourceConfig<WithOrdinals> config,
45-
Query childFilter, Query parentFilter, SearchContext context, AggregatorFactory<?> parent,
46-
AggregatorFactories.Builder subFactoriesBuilder, Map<String, Object> metaData) throws IOException {
43+
public ChildrenAggregatorFactory(String name,
44+
ValuesSourceConfig<WithOrdinals> config,
45+
Query childFilter,
46+
Query parentFilter,
47+
SearchContext context,
48+
AggregatorFactory<?> parent,
49+
AggregatorFactories.Builder subFactoriesBuilder,
50+
Map<String, Object> metaData) throws IOException {
4751
super(name, config, context, parent, subFactoriesBuilder, metaData);
52+
4853
this.childFilter = childFilter;
4954
this.parentFilter = parentFilter;
5055
}
5156

5257
@Override
53-
protected Aggregator createUnmapped(Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
54-
throws IOException {
58+
protected Aggregator createUnmapped(Aggregator parent,
59+
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
5560
return new NonCollectingAggregator(name, context, parent, pipelineAggregators, metaData) {
56-
5761
@Override
5862
public InternalAggregation buildEmptyAggregation() {
5963
return new InternalChildren(name, 0, buildEmptySubAggregations(), pipelineAggregators(), metaData());
6064
}
61-
6265
};
6366
}
6467

6568
@Override
66-
protected Aggregator doCreateInternal(WithOrdinals valuesSource, Aggregator parent,
67-
boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
68-
throws IOException {
69+
protected Aggregator doCreateInternal(WithOrdinals valuesSource,
70+
Aggregator parent,
71+
boolean collectsFromSingleBucket,
72+
List<PipelineAggregator> pipelineAggregators,
73+
Map<String, Object> metaData) throws IOException {
74+
6975
long maxOrd = valuesSource.globalMaxOrd(context.searcher());
70-
return new ParentToChildrenAggregator(name, factories, context, parent, childFilter,
71-
parentFilter, valuesSource, maxOrd, pipelineAggregators, metaData);
76+
if (collectsFromSingleBucket) {
77+
return new ParentToChildrenAggregator(name, factories, context, parent, childFilter,
78+
parentFilter, valuesSource, maxOrd, pipelineAggregators, metaData);
79+
} else {
80+
return asMultiBucketAggregator(this, context, parent);
81+
}
7282
}
7383
}
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.join.aggregations;
20+
21+
import org.apache.lucene.index.IndexReader;
22+
import org.apache.lucene.index.LeafReaderContext;
23+
import org.apache.lucene.index.SortedSetDocValues;
24+
import org.apache.lucene.search.DocIdSetIterator;
25+
import org.apache.lucene.search.Query;
26+
import org.apache.lucene.search.Scorable;
27+
import org.apache.lucene.search.Scorer;
28+
import org.apache.lucene.search.ScoreMode;
29+
import org.apache.lucene.search.Weight;
30+
import org.apache.lucene.util.Bits;
31+
import org.elasticsearch.common.lease.Releasables;
32+
import org.elasticsearch.common.lucene.Lucene;
33+
import org.elasticsearch.common.util.BitArray;
34+
import org.elasticsearch.common.util.LongHash;
35+
import org.elasticsearch.search.aggregations.Aggregator;
36+
import org.elasticsearch.search.aggregations.AggregatorFactories;
37+
import org.elasticsearch.search.aggregations.LeafBucketCollector;
38+
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
39+
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregator;
40+
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
41+
import org.elasticsearch.search.aggregations.support.ValuesSource;
42+
import org.elasticsearch.search.internal.SearchContext;
43+
44+
import java.io.IOException;
45+
import java.util.List;
46+
import java.util.Map;
47+
48+
/**
49+
* An aggregator that joins documents based on global ordinals.
50+
* Global ordinals that match the main query and the <code>inFilter</code> query are replayed
51+
* with documents matching the <code>outFilter</code> query.
52+
*/
53+
public abstract class ParentJoinAggregator extends BucketsAggregator implements SingleBucketAggregator {
54+
private final Weight inFilter;
55+
private final Weight outFilter;
56+
private final ValuesSource.Bytes.WithOrdinals valuesSource;
57+
private final boolean singleAggregator;
58+
59+
/**
60+
* If this aggregator is nested under another aggregator we allocate a long hash per bucket.
61+
*/
62+
private final LongHash ordsHash;
63+
/**
64+
* Otherwise we use a dense bit array to record the global ordinals.
65+
*/
66+
private final BitArray ordsBit;
67+
68+
public ParentJoinAggregator(String name,
69+
AggregatorFactories factories,
70+
SearchContext context,
71+
Aggregator parent,
72+
Query inFilter,
73+
Query outFilter,
74+
ValuesSource.Bytes.WithOrdinals valuesSource,
75+
long maxOrd,
76+
List<PipelineAggregator> pipelineAggregators,
77+
Map<String, Object> metaData) throws IOException {
78+
super(name, factories, context, parent, pipelineAggregators, metaData);
79+
80+
if (maxOrd > Integer.MAX_VALUE) {
81+
throw new IllegalStateException("the number of parent [" + maxOrd + "] + is greater than the allowed limit " +
82+
"for this aggregation: " + Integer.MAX_VALUE);
83+
}
84+
85+
// these two filters are cached in the parser
86+
this.inFilter = context.searcher().createWeight(context.searcher().rewrite(inFilter), ScoreMode.COMPLETE_NO_SCORES, 1f);
87+
this.outFilter = context.searcher().createWeight(context.searcher().rewrite(outFilter), ScoreMode.COMPLETE_NO_SCORES, 1f);
88+
this.valuesSource = valuesSource;
89+
this.singleAggregator = parent == null;
90+
this.ordsBit = singleAggregator ? new BitArray((int) maxOrd, context.bigArrays()) : null;
91+
this.ordsHash = singleAggregator ? null : new LongHash(1, context.bigArrays());
92+
}
93+
94+
private void addGlobalOrdinal(int globalOrdinal) {
95+
if (singleAggregator) {
96+
ordsBit.set(globalOrdinal);
97+
} else {
98+
ordsHash.add(globalOrdinal);
99+
}
100+
}
101+
102+
private boolean existsGlobalOrdinal(int globalOrdinal) {
103+
return singleAggregator ? ordsBit.get(globalOrdinal): ordsHash.find(globalOrdinal) >= 0;
104+
}
105+
106+
@Override
107+
public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
108+
final LeafBucketCollector sub) throws IOException {
109+
if (valuesSource == null) {
110+
return LeafBucketCollector.NO_OP_COLLECTOR;
111+
}
112+
final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
113+
final Bits parentDocs = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), inFilter.scorerSupplier(ctx));
114+
return new LeafBucketCollector() {
115+
@Override
116+
public void collect(int docId, long bucket) throws IOException {
117+
assert bucket == 0;
118+
if (parentDocs.get(docId) && globalOrdinals.advanceExact(docId)) {
119+
int globalOrdinal = (int) globalOrdinals.nextOrd();
120+
assert globalOrdinal != -1 && globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
121+
addGlobalOrdinal(globalOrdinal);
122+
}
123+
}
124+
};
125+
}
126+
127+
@Override
128+
protected final void doPostCollection() throws IOException {
129+
IndexReader indexReader = context().searcher().getIndexReader();
130+
for (LeafReaderContext ctx : indexReader.leaves()) {
131+
Scorer childDocsScorer = outFilter.scorer(ctx);
132+
if (childDocsScorer == null) {
133+
continue;
134+
}
135+
DocIdSetIterator childDocsIter = childDocsScorer.iterator();
136+
137+
final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx);
138+
139+
final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
140+
// Set the scorer, since we now replay only the child docIds
141+
sub.setScorer(new Scorable() {
142+
@Override
143+
public float score() {
144+
return 1f;
145+
}
146+
147+
@Override
148+
public int docID() {
149+
return childDocsIter.docID();
150+
}
151+
});
152+
153+
final Bits liveDocs = ctx.reader().getLiveDocs();
154+
for (int docId = childDocsIter.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = childDocsIter.nextDoc()) {
155+
if (liveDocs != null && liveDocs.get(docId) == false) {
156+
continue;
157+
}
158+
if (globalOrdinals.advanceExact(docId)) {
159+
int globalOrdinal = (int) globalOrdinals.nextOrd();
160+
assert globalOrdinal != -1 && globalOrdinals.nextOrd() == SortedSetDocValues.NO_MORE_ORDS;
161+
if (existsGlobalOrdinal(globalOrdinal)) {
162+
collectBucket(sub, docId, 0);
163+
}
164+
}
165+
}
166+
}
167+
}
168+
169+
@Override
170+
protected void doClose() {
171+
Releasables.close(ordsBit, ordsHash);
172+
}
173+
}

0 commit comments

Comments
 (0)