Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ protected void prepareSubAggs(long[] ordsToCollect) throws IOException {
continue;
}
DocIdSetIterator childDocsIter = childDocsScorer.iterator();

final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx);
final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(new AggregationExecutionContext(ctx, null, null));

final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
// Set the scorer, since we now replay only the child docIds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.search.aggregations;

import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.MultiCollector;
import org.apache.lucene.search.Scorable;
Expand Down Expand Up @@ -127,7 +126,7 @@ private MultiBucketCollector(boolean terminateIfNoop, BucketCollector... collect
this.terminateIfNoop = terminateIfNoop;
this.collectors = collectors;
int numNeedsScores = 0;
for (Collector collector : collectors) {
for (BucketCollector collector : collectors) {
if (collector.scoreMode().needsScores()) {
numNeedsScores += 1;
}
Expand All @@ -138,7 +137,7 @@ private MultiBucketCollector(boolean terminateIfNoop, BucketCollector... collect
@Override
public ScoreMode scoreMode() {
ScoreMode scoreMode = null;
for (Collector collector : collectors) {
for (BucketCollector collector : collectors) {
if (scoreMode == null) {
scoreMode = collector.scoreMode();
} else if (scoreMode != collector.scoreMode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.search.aggregations.bucket;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
Expand Down Expand Up @@ -42,12 +41,12 @@
*/
public class BestBucketsDeferringCollector extends DeferringBucketCollector {
static class Entry {
final LeafReaderContext context;
final AggregationExecutionContext aggCtx;
final PackedLongValues docDeltas;
final PackedLongValues buckets;

Entry(LeafReaderContext context, PackedLongValues docDeltas, PackedLongValues buckets) {
this.context = Objects.requireNonNull(context);
Entry(AggregationExecutionContext aggCtx, PackedLongValues docDeltas, PackedLongValues buckets) {
this.aggCtx = Objects.requireNonNull(aggCtx);
this.docDeltas = Objects.requireNonNull(docDeltas);
this.buckets = Objects.requireNonNull(buckets);
}
Expand All @@ -59,7 +58,7 @@ static class Entry {

private List<Entry> entries = new ArrayList<>();
private BucketCollector collector;
private LeafReaderContext context;
private AggregationExecutionContext aggCtx;
private PackedLongValues.Builder docDeltasBuilder;
private PackedLongValues.Builder bucketsBuilder;
private LongHash selectedBuckets;
Expand Down Expand Up @@ -93,10 +92,10 @@ public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
* Button up the builders for the current leaf.
*/
private void finishLeaf() {
if (context != null) {
if (aggCtx != null) {
assert docDeltasBuilder != null && bucketsBuilder != null;
assert docDeltasBuilder.size() > 0;
entries.add(new Entry(context, docDeltasBuilder.build(), bucketsBuilder.build()));
entries.add(new Entry(aggCtx, docDeltasBuilder.build(), bucketsBuilder.build()));
clearLeaf();
}
}
Expand All @@ -105,22 +104,22 @@ private void finishLeaf() {
* Clear the status for the current leaf.
*/
private void clearLeaf() {
context = null;
aggCtx = null;
docDeltasBuilder = null;
bucketsBuilder = null;
}

@Override
public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) throws IOException {
public LeafBucketCollector getLeafCollector(AggregationExecutionContext context) throws IOException {
finishLeaf();

return new LeafBucketCollector() {
int lastDoc = 0;

@Override
public void collect(int doc, long bucket) throws IOException {
if (context == null) {
context = aggCtx.getLeafReaderContext();
public void collect(int doc, long bucket) {
if (aggCtx == null) {
aggCtx = context;
docDeltasBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
bucketsBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
}
Expand Down Expand Up @@ -169,10 +168,10 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
for (Entry entry : entries) {
assert entry.docDeltas.size() > 0 : "segment should have at least one document to replay, got 0";
try {
final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.aggCtx);
DocIdSetIterator scoreIt = null;
if (needsScores) {
Scorer scorer = weight.scorer(entry.context);
Scorer scorer = weight.scorer(entry.aggCtx.getLeafReaderContext());
// We don't need to check if the scorer is null
// since we are sure that there are documents to replay (entry.docDeltas it not empty).
scoreIt = scorer.iterator();
Expand Down Expand Up @@ -266,7 +265,7 @@ public void rewriteBuckets(LongUnaryOperator howToRewrite) {
// Only create an entry if this segment has buckets after merging
if (newBuckets.size() > 0) {
assert newDocDeltas.size() > 0 : "docDeltas was empty but we had buckets";
newEntries.add(new Entry(sourceEntry.context, newDocDeltas.build(), newBuckets.build()));
newEntries.add(new Entry(sourceEntry.aggCtx, newDocDeltas.build(), newBuckets.build()));
}
}
entries = newEntries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ public abstract class BucketsAggregator extends AggregatorBase {
public BucketsAggregator(
String name,
AggregatorFactories factories,
AggregationContext context,
AggregationContext aggCtx,
Aggregator parent,
CardinalityUpperBound bucketCardinality,
Map<String, Object> metadata
) throws IOException {
super(name, factories, context, parent, bucketCardinality, metadata);
multiBucketConsumer = context.multiBucketConsumer();
super(name, factories, aggCtx, parent, bucketCardinality, metadata);
multiBucketConsumer = aggCtx.multiBucketConsumer();
docCounts = bigArrays().newLongArray(1, true);
docCountProvider = new DocCountProvider();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public final class CompositeAggregator extends BucketsAggregator implements Size
private final DateHistogramValuesSource[] innerSizedBucketAggregators;

private final List<Entry> entries = new ArrayList<>();
private LeafReaderContext currentLeaf;
private AggregationExecutionContext currentAggCtx;
private RoaringDocIdSet.Builder docIdSetBuilder;
private BucketCollector deferredCollectors;

Expand All @@ -87,22 +87,22 @@ public final class CompositeAggregator extends BucketsAggregator implements Size
CompositeAggregator(
String name,
AggregatorFactories factories,
AggregationContext context,
AggregationContext aggCtx,
Aggregator parent,
Map<String, Object> metadata,
int size,
CompositeValuesSourceConfig[] sourceConfigs,
CompositeKey rawAfterKey
) throws IOException {
super(name, factories, context, parent, CardinalityUpperBound.MANY, metadata);
super(name, factories, aggCtx, parent, CardinalityUpperBound.MANY, metadata);
this.size = size;
this.sourceNames = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::name).toList();
this.reverseMuls = Arrays.stream(sourceConfigs).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray();
this.missingOrders = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::missingOrder).toArray(MissingOrder[]::new);
this.formats = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::format).toList();
this.sources = new SingleDimensionValuesSource<?>[sourceConfigs.length];
// check that the provided size is not greater than the search.max_buckets setting
int bucketLimit = context.multiBucketConsumer().getLimit();
int bucketLimit = aggCtx.multiBucketConsumer().getLimit();
if (size > bucketLimit) {
throw new MultiBucketConsumerService.TooManyBucketsException(
"Trying to create too many buckets. Must be less than or equal"
Expand All @@ -120,8 +120,8 @@ public final class CompositeAggregator extends BucketsAggregator implements Size
List<DateHistogramValuesSource> dateHistogramValuesSources = new ArrayList<>();
for (int i = 0; i < sourceConfigs.length; i++) {
this.sources[i] = sourceConfigs[i].createValuesSource(
context.bigArrays(),
context.searcher().getIndexReader(),
aggCtx.bigArrays(),
aggCtx.searcher().getIndexReader(),
size,
this::addRequestCircuitBreakerBytes
);
Expand All @@ -130,7 +130,7 @@ public final class CompositeAggregator extends BucketsAggregator implements Size
}
}
this.innerSizedBucketAggregators = dateHistogramValuesSources.toArray(new DateHistogramValuesSource[0]);
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size);
this.queue = new CompositeValuesCollectorQueue(aggCtx.bigArrays(), sources, size);
if (rawAfterKey != null) {
try {
this.queue.setAfterKey(rawAfterKey);
Expand Down Expand Up @@ -230,10 +230,10 @@ public InternalAggregation buildEmptyAggregation() {
}

private void finishLeaf() {
if (currentLeaf != null) {
if (currentAggCtx != null) {
DocIdSet docIdSet = docIdSetBuilder.build();
entries.add(new Entry(currentLeaf, docIdSet));
currentLeaf = null;
entries.add(new Entry(currentAggCtx, docIdSet));
currentAggCtx = null;
docIdSetBuilder = null;
}
}
Expand Down Expand Up @@ -454,7 +454,7 @@ protected LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCt
// in the queue.
DocIdSet docIdSet = sortedDocsProducer.processLeaf(topLevelQuery(), queue, aggCtx.getLeafReaderContext(), fillDocIdSet);
if (fillDocIdSet) {
entries.add(new Entry(aggCtx.getLeafReaderContext(), docIdSet));
entries.add(new Entry(aggCtx, docIdSet));
}
// We can bypass search entirely for this segment, the processing is done in the previous call.
// Throwing this exception will terminate the execution of the search for this root aggregation,
Expand All @@ -463,7 +463,7 @@ protected LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCt
return LeafBucketCollector.NO_OP_COLLECTOR;
} else {
if (fillDocIdSet) {
currentLeaf = aggCtx.getLeafReaderContext();
currentAggCtx = aggCtx;
docIdSetBuilder = new RoaringDocIdSet.Builder(aggCtx.getLeafReaderContext().reader().maxDoc());
}
if (rawAfterKey != null && sortPrefixLen > 0) {
Expand Down Expand Up @@ -538,11 +538,14 @@ private void runDeferredCollections() throws IOException {
if (docIdSetIterator == null) {
continue;
}
final LeafBucketCollector subCollector = deferredCollectors.getLeafCollector(entry.context);
final LeafBucketCollector collector = queue.getLeafCollector(entry.context, getSecondPassCollector(subCollector));
final LeafBucketCollector subCollector = deferredCollectors.getLeafCollector(entry.aggCtx);
final LeafBucketCollector collector = queue.getLeafCollector(
entry.aggCtx.getLeafReaderContext(),
getSecondPassCollector(subCollector)
);
DocIdSetIterator scorerIt = null;
if (needsScores) {
Scorer scorer = weight.scorer(entry.context);
Scorer scorer = weight.scorer(entry.aggCtx.getLeafReaderContext());
if (scorer != null) {
scorerIt = scorer.iterator();
subCollector.setScorer(scorer);
Expand Down Expand Up @@ -605,11 +608,11 @@ public double bucketSize(Rounding.DateTimeUnit unit) {
}

private static class Entry {
final LeafReaderContext context;
final AggregationExecutionContext aggCtx;
final DocIdSet docIdSet;

Entry(LeafReaderContext context, DocIdSet docIdSet) {
this.context = context;
Entry(AggregationExecutionContext aggCtx, DocIdSet docIdSet) {
this.aggCtx = aggCtx;
this.docIdSet = docIdSet;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public abstract static class AdapterBuilder<T> {
private final String name;
private final List<QueryToFilterAdapter> filters = new ArrayList<>();
private final boolean keyed;
private final AggregationContext context;
private final AggregationContext aggCtx;
private final Aggregator parent;
private final CardinalityUpperBound cardinality;
private final Map<String, Object> metadata;
Expand All @@ -58,18 +58,18 @@ public AdapterBuilder(
String name,
boolean keyed,
String otherBucketKey,
AggregationContext context,
AggregationContext aggCtx,
Aggregator parent,
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {
this.name = name;
this.keyed = keyed;
this.context = context;
this.aggCtx = aggCtx;
this.parent = parent;
this.cardinality = cardinality;
this.metadata = metadata;
this.rewrittenTopLevelQuery = context.searcher().rewrite(context.query());
this.rewrittenTopLevelQuery = aggCtx.searcher().rewrite(aggCtx.query());
this.valid = parent == null && otherBucketKey == null;
}

Expand All @@ -93,7 +93,7 @@ public final void add(String key, Query query) throws IOException {
valid = false;
return;
}
add(QueryToFilterAdapter.build(context.searcher(), key, query));
add(QueryToFilterAdapter.build(aggCtx.searcher(), key, query));
}

final void add(QueryToFilterAdapter filter) throws IOException {
Expand All @@ -120,7 +120,7 @@ final void add(QueryToFilterAdapter filter) throws IOException {
* fields are expensive to decode and the overhead of iterating per
* filter causes us to decode doc counts over and over again.
*/
if (context.hasDocCountField()) {
if (aggCtx.hasDocCountField()) {
valid = false;
return;
}
Expand All @@ -140,7 +140,7 @@ class AdapterBuild implements CheckedFunction<AggregatorFactories, FilterByFilte

@Override
public FilterByFilterAggregator apply(AggregatorFactories subAggregators) throws IOException {
agg = new FilterByFilterAggregator(name, subAggregators, filters, keyed, context, parent, cardinality, metadata);
agg = new FilterByFilterAggregator(name, subAggregators, filters, keyed, aggCtx, parent, cardinality, metadata);
return agg;
}
}
Expand Down Expand Up @@ -202,12 +202,12 @@ private FilterByFilterAggregator(
AggregatorFactories factories,
List<QueryToFilterAdapter> filters,
boolean keyed,
AggregationContext context,
AggregationContext aggCtx,
Aggregator parent,
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {
super(name, factories, filters, keyed, null, context, parent, cardinality, metadata);
super(name, factories, filters, keyed, null, aggCtx, parent, cardinality, metadata);
}

/**
Expand Down Expand Up @@ -237,7 +237,7 @@ protected LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCt
collectCount(aggCtx.getLeafReaderContext(), live);
} else {
segmentsCollected++;
collectSubs(aggCtx.getLeafReaderContext(), live, sub);
collectSubs(aggCtx, live, sub);
}
return LeafBucketCollector.NO_OP_COLLECTOR;
}
Expand Down Expand Up @@ -273,7 +273,7 @@ private void collectCount(LeafReaderContext ctx, Bits live) throws IOException {
* less memory because there isn't a need to buffer a block of matches.
* And its a hell of a lot less code.
*/
private void collectSubs(LeafReaderContext ctx, Bits live, LeafBucketCollector sub) throws IOException {
private void collectSubs(AggregationExecutionContext aggCtx, Bits live, LeafBucketCollector sub) throws IOException {
class MatchCollector implements LeafCollector {
LeafBucketCollector subCollector = sub;
int filterOrd;
Expand All @@ -287,11 +287,11 @@ public void collect(int docId) throws IOException {
public void setScorer(Scorable scorer) throws IOException {}
}
MatchCollector collector = new MatchCollector();
filters().get(0).collect(ctx, collector, live);
filters().get(0).collect(aggCtx.getLeafReaderContext(), collector, live);
for (int filterOrd = 1; filterOrd < filters().size(); filterOrd++) {
collector.subCollector = collectableSubAggregators.getLeafCollector(ctx);
collector.subCollector = collectableSubAggregators.getLeafCollector(aggCtx);
collector.filterOrd = filterOrd;
filters().get(filterOrd).collect(ctx, collector, live);
filters().get(filterOrd).collect(aggCtx.getLeafReaderContext(), collector, live);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,12 @@ protected FilterByFilterAggregator adapt(
List<QueryToFilterAdapter> filters,
boolean keyed,
String otherBucketKey,
AggregationContext context,
AggregationContext aggCtx,
Aggregator parent,
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {
super(name, factories, context, parent, cardinality.multiply(filters.size() + (otherBucketKey == null ? 0 : 1)), metadata);
super(name, factories, aggCtx, parent, cardinality.multiply(filters.size() + (otherBucketKey == null ? 0 : 1)), metadata);
this.filters = List.copyOf(filters);
this.keyed = keyed;
this.otherBucketKey = otherBucketKey;
Expand Down
Loading