Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/reference/search/profile.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ And the response:
]
},
{
"name": "BucketCollector: [[my_scoped_agg, my_global_agg]]",
"name": "MultiBucketCollector: [[my_scoped_agg, my_global_agg]]",
"reason": "aggregation",
"time_in_nanos": 8273
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void preProcess(SearchContext context) {
}
context.aggregations().aggregators(aggregators);
if (!collectors.isEmpty()) {
Collector collector = BucketCollector.wrap(collectors);
Collector collector = MultiBucketCollector.wrap(collectors);
((BucketCollector)collector).preCollection();
if (context.getProfilers() != null) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_AGGREGATION,
Expand Down Expand Up @@ -97,7 +97,7 @@ public void execute(SearchContext context) {

// optimize the global collector based execution
if (!globals.isEmpty()) {
BucketCollector globalsCollector = BucketCollector.wrap(globals);
BucketCollector globalsCollector = MultiBucketCollector.wrap(globals);
Query query = context.buildFilteredQuery(Queries.newMatchAllQuery());

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ protected void doPreCollection() throws IOException {
@Override
public final void preCollection() throws IOException {
List<BucketCollector> collectors = Arrays.asList(subAggregators);
collectableSubAggregators = BucketCollector.wrap(collectors);
collectableSubAggregators = MultiBucketCollector.wrap(collectors);
doPreCollection();
collectableSubAggregators.preCollection();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@
import org.apache.lucene.search.Collector;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.StreamSupport;

/**
* A Collector that can collect data in separate buckets.
Expand All @@ -54,61 +50,6 @@ public boolean needsScores() {
}
};

/**
* Wrap the given collectors into a single instance.
* <p>
* The iterable is first materialized into an array. An empty input yields
* {@link #NO_OP_COLLECTOR}, a single element is returned as-is, and anything
* larger is wrapped in an anonymous {@link BucketCollector} that forwards every
* call to each wrapped collector in iteration order.
* <p>
* Note that this implementation does not filter out {@link #NO_OP_COLLECTOR}
* entries that appear in the input.
*
* @param collectorList the collectors to combine
* @return a collector that delegates to all of the given collectors
*/
public static BucketCollector wrap(Iterable<? extends BucketCollector> collectorList) {
// Copy the iterable into an array so it can be sized and iterated repeatedly.
final BucketCollector[] collectors =
StreamSupport.stream(collectorList.spliterator(), false).toArray(size -> new BucketCollector[size]);
switch (collectors.length) {
case 0:
// Nothing to collect.
return NO_OP_COLLECTOR;
case 1:
// No wrapper needed for a single collector.
return collectors[0];
default:
return new BucketCollector() {

// Builds one leaf collector per wrapped collector and combines them.
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
List<LeafBucketCollector> leafCollectors = new ArrayList<>(collectors.length);
for (BucketCollector c : collectors) {
leafCollectors.add(c.getLeafCollector(ctx));
}
return LeafBucketCollector.wrap(leafCollectors);
}

// Forwards the pre-collection hook to every wrapped collector.
@Override
public void preCollection() throws IOException {
for (BucketCollector collector : collectors) {
collector.preCollection();
}
}

// Forwards the post-collection hook to every wrapped collector.
@Override
public void postCollection() throws IOException {
for (BucketCollector collector : collectors) {
collector.postCollection();
}
}

// Scores are needed as soon as any wrapped collector needs them.
@Override
public boolean needsScores() {
for (BucketCollector collector : collectors) {
if (collector.needsScores()) {
return true;
}
}
return false;
}

// Rendered as the array of wrapped collectors, e.g. "[a, b]".
@Override
public String toString() {
return Arrays.toString(collectors);
}
};
}
}

@Override
public abstract LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.MultiCollector;
import org.apache.lucene.search.ScoreCachingWrappingScorer;
import org.apache.lucene.search.Scorer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * A {@link BucketCollector} which allows running a bucket collection with several
 * {@link BucketCollector}s. It is similar to the {@link MultiCollector} except that the
 * {@link #wrap} method filters out the {@link BucketCollector#NO_OP_COLLECTOR}s and not
 * the null ones.
 */
public class MultiBucketCollector extends BucketCollector {

    /** See {@link #wrap(Iterable)}. */
    public static BucketCollector wrap(BucketCollector... collectors) {
        return wrap(Arrays.asList(collectors));
    }

    /**
     * Wraps a list of {@link BucketCollector}s with a {@link MultiBucketCollector}. This
     * method works as follows:
     * <ul>
     * <li>Filters out the {@link BucketCollector#NO_OP_COLLECTOR}s collectors, so they are not used
     * during search time.
     * <li>If the input contains no real collector, {@link BucketCollector#NO_OP_COLLECTOR} is returned.
     * <li>If the input contains 1 real collector, it is returned.
     * <li>Otherwise the method returns a {@link MultiBucketCollector} which wraps the
     * non-{@link BucketCollector#NO_OP_COLLECTOR} collectors.
     * </ul>
     * {@code null} elements are not filtered out.
     *
     * @param collectors the collectors to combine, possibly containing
     *                   {@link BucketCollector#NO_OP_COLLECTOR} entries
     * @return a single collector equivalent to running all real collectors
     */
    public static BucketCollector wrap(Iterable<? extends BucketCollector> collectors) {
        // For the user's convenience, we allow NO_OP collectors to be passed.
        // However, to improve performance, these NO_OP collectors are found
        // and dropped from the array we save for actual collection time.
        // The same predicate must be used for counting and for selecting: the
        // previous version counted with "!= NO_OP_COLLECTOR" but selected with
        // "!= null", which could return NO_OP_COLLECTOR instead of the single
        // real collector, or overflow the target array.
        final List<BucketCollector> realCollectors = new ArrayList<>();
        for (BucketCollector c : collectors) {
            if (c != NO_OP_COLLECTOR) {
                realCollectors.add(c);
            }
        }

        switch (realCollectors.size()) {
            case 0:
                return NO_OP_COLLECTOR;
            case 1:
                // only 1 Collector - return it.
                return realCollectors.get(0);
            default:
                return new MultiBucketCollector(realCollectors.toArray(new BucketCollector[realCollectors.size()]));
        }
    }

    /** Whether the scorer handed to leaf collectors is wrapped in a score cache. */
    private final boolean cacheScores;
    /** The wrapped collectors, in the order they were passed to {@link #wrap}. */
    private final BucketCollector[] collectors;

    /**
     * Creates a collector delegating to the given collectors. Score caching is
     * enabled only when at least two of them report {@link #needsScores()}.
     */
    private MultiBucketCollector(BucketCollector... collectors) {
        this.collectors = collectors;
        int numNeedsScores = 0;
        for (BucketCollector collector : collectors) {
            if (collector.needsScores()) {
                numNeedsScores += 1;
            }
        }
        this.cacheScores = numNeedsScores >= 2;
    }

    /** Forwards the pre-collection hook to every wrapped collector. */
    @Override
    public void preCollection() throws IOException {
        for (BucketCollector collector : collectors) {
            collector.preCollection();
        }
    }

    /** Forwards the post-collection hook to every wrapped collector. */
    @Override
    public void postCollection() throws IOException {
        for (BucketCollector collector : collectors) {
            collector.postCollection();
        }
    }

    /** Returns {@code true} as soon as any wrapped collector needs scores. */
    @Override
    public boolean needsScores() {
        for (BucketCollector collector : collectors) {
            if (collector.needsScores()) {
                return true;
            }
        }
        return false;
    }

    @Override
    public String toString() {
        return Arrays.toString(collectors);
    }

    /**
     * Builds the leaf collector for a segment. Wrapped collectors that throw
     * {@link CollectionTerminatedException} for this segment are left out.
     *
     * @throws CollectionTerminatedException if no wrapped collector wants this segment
     */
    @Override
    public LeafBucketCollector getLeafCollector(LeafReaderContext context) throws IOException {
        final List<LeafBucketCollector> leafCollectors = new ArrayList<>();
        for (BucketCollector collector : collectors) {
            final LeafBucketCollector leafCollector;
            try {
                leafCollector = collector.getLeafCollector(context);
            } catch (CollectionTerminatedException e) {
                // this leaf collector does not need this segment
                continue;
            }
            leafCollectors.add(leafCollector);
        }
        switch (leafCollectors.size()) {
            case 0:
                throw new CollectionTerminatedException();
            case 1:
                // A single survivor needs no wrapper.
                return leafCollectors.get(0);
            default:
                return new MultiLeafBucketCollector(leafCollectors, cacheScores);
        }
    }

    /**
     * A {@link LeafBucketCollector} that fans out to several leaf collectors and
     * drops each one as soon as it signals early termination.
     */
    private static class MultiLeafBucketCollector extends LeafBucketCollector {

        private final boolean cacheScores;
        /** Active collectors occupy indices {@code [0, numCollectors)}; the rest are {@code null}. */
        private final LeafBucketCollector[] collectors;
        private int numCollectors;

        private MultiLeafBucketCollector(List<LeafBucketCollector> collectors, boolean cacheScores) {
            this.collectors = collectors.toArray(new LeafBucketCollector[collectors.size()]);
            this.cacheScores = cacheScores;
            this.numCollectors = this.collectors.length;
        }

        /** Passes the scorer, wrapped in a score cache when enabled, to every active collector. */
        @Override
        public void setScorer(Scorer scorer) throws IOException {
            if (cacheScores) {
                scorer = new ScoreCachingWrappingScorer(scorer);
            }
            for (int i = 0; i < numCollectors; ++i) {
                final LeafCollector c = collectors[i];
                c.setScorer(scorer);
            }
        }

        /** Removes the collector at index {@code i}, shifting later ones left by one slot. */
        private void removeCollector(int i) {
            System.arraycopy(collectors, i + 1, collectors, i, numCollectors - i - 1);
            --numCollectors;
            collectors[numCollectors] = null;
        }

        /**
         * Collects the document into every active collector. A collector that throws
         * {@link CollectionTerminatedException} is removed; once none remain the
         * exception is propagated to the caller.
         */
        @Override
        public void collect(int doc, long bucket) throws IOException {
            final LeafBucketCollector[] collectors = this.collectors;
            int numCollectors = this.numCollectors;
            for (int i = 0; i < numCollectors; ) {
                final LeafBucketCollector collector = collectors[i];
                try {
                    collector.collect(doc, bucket);
                    ++i;
                } catch (CollectionTerminatedException e) {
                    // Do not advance i: the next collector has been shifted into this slot.
                    removeCollector(i);
                    numCollectors = this.numCollectors;
                    if (numCollectors == 0) {
                        throw new CollectionTerminatedException();
                    }
                }
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.MultiBucketCollector;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
Expand Down Expand Up @@ -90,7 +91,7 @@ public boolean needsScores() {
/** Set the deferred collectors. */
@Override
public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
this.collector = BucketCollector.wrap(deferredCollectors);
this.collector = MultiBucketCollector.wrap(deferredCollectors);
}

private void finishLeaf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.MultiBucketCollector;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.internal.SearchContext;
Expand Down Expand Up @@ -59,7 +60,7 @@ protected void doPreCollection() throws IOException {
recordingWrapper.setDeferredCollector(deferredCollectors);
collectors.add(recordingWrapper);
}
collectableSubAggregators = BucketCollector.wrap(collectors);
collectableSubAggregators = MultiBucketCollector.wrap(collectors);
}

public static boolean descendsFromGlobalAggregator(Aggregator parent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.MultiBucketCollector;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
Expand Down Expand Up @@ -61,7 +62,7 @@ public MergingBucketsDeferringCollector(SearchContext context) {

@Override
public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
this.collector = BucketCollector.wrap(deferredCollectors);
this.collector = MultiBucketCollector.wrap(deferredCollectors);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.MultiBucketCollector;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
Expand Down Expand Up @@ -93,7 +94,7 @@ protected void doClose() {
@Override
protected void doPreCollection() throws IOException {
List<BucketCollector> collectors = Arrays.asList(subAggregators);
deferredCollectors = BucketCollector.wrap(collectors);
deferredCollectors = MultiBucketCollector.wrap(collectors);
collectableSubAggregators = BucketCollector.NO_OP_COLLECTOR;
}

Expand Down
Loading