BucketsAggregator.java
@@ -45,6 +45,7 @@
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.LongUnaryOperator;
import java.util.function.ToLongFunction;

public abstract class BucketsAggregator extends AggregatorBase {
@@ -105,17 +106,35 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do
* ordinals and doc ID deltas.
*
* Refer to that method for documentation about the merge map.
*
* @deprecated use {@link #mergeBuckets(long, LongUnaryOperator)}
*/
@Deprecated
public final void mergeBuckets(long[] mergeMap, long newNumBuckets) {
mergeBuckets(newNumBuckets, bucket -> mergeMap[Math.toIntExact(bucket)]);
}

/**
*
* @param mergeMap a unary operator which maps a bucket's ordinal to the ordinal it should be merged with.
* If a bucket's ordinal is mapped to -1 then the bucket is removed entirely.
*
* This only tidies up doc counts. Call {@link MergingBucketsDeferringCollector#mergeBuckets(LongUnaryOperator)} to
* merge the actual ordinals and doc ID deltas.
*/
public final void mergeBuckets(long newNumBuckets, LongUnaryOperator mergeMap){
try (IntArray oldDocCounts = docCounts) {
docCounts = bigArrays.newIntArray(newNumBuckets, true);
docCounts.fill(0, newNumBuckets, 0);
for (int i = 0; i < oldDocCounts.size(); i++) {
for (long i = 0; i < oldDocCounts.size(); i++) {
int docCount = oldDocCounts.get(i);

if(docCount == 0) continue;

// Skip any in the map which have been "removed", signified with -1
if (docCount != 0 && mergeMap[i] != -1) {
docCounts.increment(mergeMap[i], docCount);
long destinationOrdinal = mergeMap.applyAsLong(i);
if (destinationOrdinal != -1) {
docCounts.increment(destinationOrdinal, docCount);
}
}
}
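
As a rough, standalone sketch of the merge-map contract documented above (not code from this PR; the class name MergeMapSketch and the oldCounts/newCounts variables are invented for illustration): every old bucket ordinal is mapped to a destination ordinal, -1 drops the bucket, and non-zero doc counts are folded into the destination.

import java.util.Arrays;
import java.util.function.LongUnaryOperator;

public class MergeMapSketch {
    public static void main(String[] args) {
        long[] oldCounts = {3, 5, 0, 7, 2};   // doc counts per old bucket ordinal
        long newNumBuckets = 3;

        // Fold ordinals 0..2 into bucket 0, move ordinal 3 to bucket 1, drop ordinal 4.
        LongUnaryOperator mergeMap = ordinal -> {
            if (ordinal <= 2) {
                return 0;
            }
            if (ordinal == 3) {
                return 1;
            }
            return -1;   // -1 removes the bucket entirely
        };

        long[] newCounts = new long[(int) newNumBuckets];
        for (int i = 0; i < oldCounts.length; i++) {
            if (oldCounts[i] == 0) {
                continue;   // nothing to move for empty buckets
            }
            long destination = mergeMap.applyAsLong(i);
            if (destination != -1) {
                newCounts[(int) destination] += oldCounts[i];
            }
        }
        System.out.println(Arrays.toString(newCounts));   // [8, 7, 0]
    }
}

The deprecated long[] overload expresses the same mapping as an array lookup, which is why it can simply delegate with bucket -> mergeMap[Math.toIntExact(bucket)].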

MergingBucketsDeferringCollector.java
@@ -25,6 +25,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.function.LongUnaryOperator;

/**
* A specialization of {@link BestBucketsDeferringCollector} that collects all
@@ -51,8 +52,24 @@ public MergingBucketsDeferringCollector(SearchContext context, boolean isGlobal)
*
* This process rebuilds the ordinals and docDeltas according to the mergeMap, so it should
* not be called unless there are actually changes to be made, to avoid unnecessary work.
*
* @deprecated use {@link #mergeBuckets(LongUnaryOperator)}
*/
@Deprecated
public void mergeBuckets(long[] mergeMap) {
mergeBuckets(bucket -> mergeMap[Math.toIntExact(bucket)]);
}

/**
* Merges/prunes the existing bucket ordinals and docDeltas according to the provided mergeMap.
*
* @param mergeMap a unary operator which maps a bucket's ordinal to the ordinal it should be merged with.
* If a bucket's ordinal is mapped to -1 then the bucket is removed entirely.
*
* This process rebuilds the ordinals and docDeltas according to the mergeMap, so it should
* not be called unless there are actually changes to be made, to avoid unnecessary work.
*/
public void mergeBuckets(LongUnaryOperator mergeMap){
List<Entry> newEntries = new ArrayList<>(entries.size());
for (Entry sourceEntry : entries) {
PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
@@ -66,7 +83,7 @@ public void mergeBuckets(long[] mergeMap) {
long delta = docDeltasItr.next();

// Only merge in the ordinal if it hasn't been "removed", signified with -1
long ordinal = mergeMap[Math.toIntExact(bucket)];
long ordinal = mergeMap.applyAsLong(bucket);

if (ordinal != -1) {
newBuckets.add(ordinal);
@@ -102,7 +119,7 @@ public void mergeBuckets(long[] mergeMap) {
long bucket = itr.next();
assert docDeltasItr.hasNext();
long delta = docDeltasItr.next();
long ordinal = mergeMap[Math.toIntExact(bucket)];
long ordinal = mergeMap.applyAsLong(bucket);

// Only merge in the ordinal if it hasn't been "removed", signified with -1
if (ordinal != -1) {
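
The hunk above is cut off before the docDeltas handling, so the following is only a rough sketch, not code from this PR: it rewrites a parallel stream of bucket ordinals and doc-ID deltas under a merge map, uses plain lists in place of Lucene's PackedLongValues, and assumes that the delta of a removed doc is carried into the next retained doc so that absolute doc IDs stay correct. The class and variable names are invented for the example.

import java.util.ArrayList;
import java.util.List;
import java.util.function.LongUnaryOperator;

public class DeferredMergeSketch {
    public static void main(String[] args) {
        // Collected stream: doc-ID deltas and the bucket ordinal each doc was collected into.
        long[] docDeltas = {2, 3, 1, 4};   // absolute doc IDs: 2, 5, 6, 10
        long[] buckets   = {0, 1, 2, 1};

        // Merge bucket 2 into bucket 0, drop bucket 1 entirely.
        LongUnaryOperator mergeMap = b -> b == 1 ? -1 : 0;

        List<Long> newDeltas = new ArrayList<>();
        List<Long> newBuckets = new ArrayList<>();
        long skippedDelta = 0;   // deltas of removed docs, carried to the next retained doc
        for (int i = 0; i < buckets.length; i++) {
            long ordinal = mergeMap.applyAsLong(buckets[i]);
            if (ordinal == -1) {
                skippedDelta += docDeltas[i];   // doc is dropped; keep its delta
                continue;
            }
            newBuckets.add(ordinal);
            newDeltas.add(docDeltas[i] + skippedDelta);
            skippedDelta = 0;
        }
        System.out.println(newDeltas);    // [2, 4] -> absolute doc IDs 2 and 6
        System.out.println(newBuckets);   // [0, 0]
    }
}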

VariableWidthHistogramAggregator.java
@@ -51,6 +51,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongUnaryOperator;

public class VariableWidthHistogramAggregator extends DeferableBucketAggregator {

@@ -353,22 +354,23 @@ private void moveLastCluster(int index){
clusterSizes.set(index, holdSize);

// Move the underlying buckets
long[] mergeMap = new long[numClusters];
for (int i = 0; i < index; i++) {
// The clusters in range {0 ... idx - 1} don't move
mergeMap[i] = i;
}
for (int i = index; i < numClusters - 1; i++) {
// The clusters in range {index ... numClusters - 1} shift up
mergeMap[i] = i + 1;
}
// Finally, the new cluster moves to index
mergeMap[numClusters - 1] = index;
LongUnaryOperator mergeMap = new LongUnaryOperator() {
@Override
public long applyAsLong(long i) {
if(i < index) {
// The clusters in range {0 ... idx - 1} don't move
return i;
}
if(i == numClusters - 1) {
// The new cluster moves to index
return (long)index;
}
// The clusters in range {index ... numClusters - 1} shift forward
return i + 1;
}
};

Member:
I'd have tried to write this:

mergeBuckets(numClusters, bucket -> {
  if (bucket < index) {
    // The clusters in range {0 ... idx - 1} don't move
    return bucket;
  }
  if (bucket == numClusters - 1) {
    // The new cluster moves to index
    return index;
  }
  // The clusters in range {index ... numClusters - 1} shift forward
  return bucket + 1;
});

I like the "inline function declaration" form of this because it makes it super obvious that it doesn't escape.

I also like early return instead of else if, but that is totally up to you. It's a matter of style and we don't have a standard.

Contributor Author:
I'm not sure if this would work, since this same function is used in the calls to both BucketsAggregator::mergeBuckets and MergingBucketsDeferringCollector::mergeBuckets.

Member:
Ah! Well, what you have is just fine too.


// TODO: Create a moveLastCluster() method in BucketsAggregator which is like BucketsAggregator::mergeBuckets,
// except it doesn't require a merge map. This would be more efficient as there would be no need to create a
// merge map on every call.
mergeBuckets(mergeMap, numClusters);
mergeBuckets(numClusters, mergeMap);
if (deferringCollector != null) {
deferringCollector.mergeBuckets(mergeMap);
}
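
To make the remapping concrete, here is a small standalone example (not part of the PR; the class name MoveLastClusterMapSketch is invented) that applies the same operator with numClusters = 5 and index = 2: the last cluster lands at ordinal 2, and the clusters that used to sit at ordinals 2 and 3 shift forward to 3 and 4.

import java.util.function.LongUnaryOperator;

public class MoveLastClusterMapSketch {
    public static void main(String[] args) {
        int numClusters = 5;
        int index = 2;

        // Same mapping as the anonymous LongUnaryOperator in moveLastCluster().
        LongUnaryOperator mergeMap = i -> {
            if (i < index) {
                return i;                 // clusters 0 ... index-1 stay put
            }
            if (i == numClusters - 1) {
                return index;             // the new (last) cluster moves to index
            }
            return i + 1;                 // clusters index ... numClusters-2 shift forward
        };

        for (long i = 0; i < numClusters; i++) {
            System.out.println(i + " -> " + mergeMap.applyAsLong(i));
        }
        // Prints: 0 -> 0, 1 -> 1, 2 -> 3, 3 -> 4, 4 -> 2
    }
}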

BucketsAggregatorTests.java
@@ -0,0 +1,131 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.bucket;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;

import static org.elasticsearch.search.aggregations.MultiBucketConsumerService.DEFAULT_MAX_BUCKETS;

public class BucketsAggregatorTests extends AggregatorTestCase{

public BucketsAggregator buildMergeAggregator() throws IOException{
try(Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
Document document = new Document();
document.add(new SortedNumericDocValuesField("numeric", 0));
indexWriter.addDocument(document);
}

try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = new IndexSearcher(indexReader);

SearchContext searchContext = createSearchContext(
indexSearcher,
createIndexSettings(),
null,
new MultiBucketConsumerService.MultiBucketConsumer(
DEFAULT_MAX_BUCKETS,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
),
new NumberFieldMapper.NumberFieldType("test", NumberFieldMapper.NumberType.INTEGER)
);

return new BucketsAggregator("test", AggregatorFactories.EMPTY, searchContext, null, null, null) {
@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
return null;
}

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return new InternalAggregation[0];
}

@Override
public InternalAggregation buildEmptyAggregation() {
return null;
}
};
}
}
}

public void testBucketMergeNoDelete() throws IOException{
BucketsAggregator mergeAggregator = buildMergeAggregator();

mergeAggregator.grow(10);
for(int i = 0; i < 10; i++){
mergeAggregator.incrementBucketDocCount(i, i);
}

mergeAggregator.mergeBuckets(10, bucket -> bucket % 5);

for(int i=0; i<5; i++) {
// The i'th bucket should now have all docs whose index % 5 = i
// This is buckets i and i + 5
// i + (i+5) = 2*i + 5
assertEquals(mergeAggregator.getDocCounts().get(i), (2 * i) + 5);
}
for(int i=5; i<10; i++){
assertEquals(mergeAggregator.getDocCounts().get(i), 0);
}
}

public void testBucketMergeAndDelete() throws IOException{
BucketsAggregator mergeAggregator = buildMergeAggregator();

mergeAggregator.grow(10);
int sum = 0;
for(int i = 0; i < 20; i++){
mergeAggregator.incrementBucketDocCount(i, i);
if(5 <= i && i < 15) {
sum += i;
}
}

// Put the buckets in indices 5 ... 14 into bucket 5, and delete the rest of the buckets
mergeAggregator.mergeBuckets(10, bucket -> (5 <= bucket && bucket < 15) ? 5 : -1);

assertEquals(mergeAggregator.getDocCounts().size(), 10); // Confirm that the 10 other buckets were deleted
for(int i=0; i<10; i++){
assertEquals(mergeAggregator.getDocCounts().get(i), i == 5 ? sum : 0);
}
}
}