==== GeoGridAggregator.java ====
@@ -22,13 +22,13 @@
 import org.apache.lucene.index.SortedNumericDocValues;
 import org.apache.lucene.search.ScoreMode;
 import org.elasticsearch.common.lease.Releasables;
-import org.elasticsearch.common.util.LongHash;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
 import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
 import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
+import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
 import org.elasticsearch.search.aggregations.support.ValuesSource;
 import org.elasticsearch.search.internal.SearchContext;
 
@@ -46,17 +46,16 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid> extends Bucke
     protected final int requiredSize;
     protected final int shardSize;
     protected final ValuesSource.Numeric valuesSource;
-    protected final LongHash bucketOrds;
-    protected SortedNumericDocValues values;
+    protected final LongKeyedBucketOrds bucketOrds;
 
     GeoGridAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource,
                       int requiredSize, int shardSize, SearchContext aggregationContext,
-                      Aggregator parent, Map<String, Object> metadata) throws IOException {
+                      Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
         super(name, factories, aggregationContext, parent, metadata);
         this.valuesSource = valuesSource;
         this.requiredSize = requiredSize;
         this.shardSize = shardSize;
-        bucketOrds = new LongHash(1, aggregationContext.bigArrays());
+        bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), collectsFromSingleBucket);
     }
 
     @Override
@@ -70,19 +69,18 @@ public ScoreMode scoreMode() {
     @Override
     public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
             final LeafBucketCollector sub) throws IOException {
-        values = valuesSource.longValues(ctx);
+        SortedNumericDocValues values = valuesSource.longValues(ctx);
         return new LeafBucketCollectorBase(sub, null) {
             @Override
-            public void collect(int doc, long bucket) throws IOException {
-                assert bucket == 0;
+            public void collect(int doc, long owningBucketOrd) throws IOException {
                 if (values.advanceExact(doc)) {
                     final int valuesCount = values.docValueCount();
 
                     long previous = Long.MAX_VALUE;
                     for (int i = 0; i < valuesCount; ++i) {
                         final long val = values.nextValue();
                         if (previous != val || i == 0) {
-                            long bucketOrdinal = bucketOrds.add(val);
+                            long bucketOrdinal = bucketOrds.add(owningBucketOrd, val);
                             if (bucketOrdinal < 0) { // already seen
                                 bucketOrdinal = -1 - bucketOrdinal;
                                 collectExistingBucket(sub, doc, bucketOrdinal);
@@ -108,31 +106,38 @@ public void collect(int doc, long bucket) throws IOException {
 
     @Override
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
-        assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
-        final int size = (int) Math.min(bucketOrds.size(), shardSize);
-        consumeBucketsAndMaybeBreak(size);
-
-        BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
-        InternalGeoGridBucket spare = null;
-        for (long i = 0; i < bucketOrds.size(); i++) {
-            if (spare == null) {
-                spare = newEmptyBucket();
-            }
+        InternalGeoGridBucket[][] topBucketsPerOrd = new InternalGeoGridBucket[owningBucketOrds.length][];
+        for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
+            int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]), shardSize);
+            consumeBucketsAndMaybeBreak(size);
+
+            BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
+            InternalGeoGridBucket spare = null;
+            LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
+            while (ordsEnum.next()) {
+                if (spare == null) {
+                    spare = newEmptyBucket();
+                }
 
-            // need a special function to keep the source bucket
-            // up-to-date so it can get the appropriate key
-            spare.hashAsLong = bucketOrds.get(i);
-            spare.docCount = bucketDocCount(i);
-            spare.bucketOrd = i;
-            spare = ordered.insertWithOverflow(spare);
-        }
+                // need a special function to keep the source bucket
+                // up-to-date so it can get the appropriate key
+                spare.hashAsLong = ordsEnum.value();
+                spare.docCount = bucketDocCount(ordsEnum.ord());
+                spare.bucketOrd = ordsEnum.ord();
+                spare = ordered.insertWithOverflow(spare);
+            }
 
-        final InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()];
-        for (int i = ordered.size() - 1; i >= 0; --i) {
-            list[i] = ordered.pop();
+            topBucketsPerOrd[ordIdx] = new InternalGeoGridBucket[ordered.size()];
+            for (int i = ordered.size() - 1; i >= 0; --i) {
+                topBucketsPerOrd[ordIdx][i] = ordered.pop();
+            }
         }
-        buildSubAggsForBuckets(list, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
-        return new InternalAggregation[] {buildAggregation(name, requiredSize, Arrays.asList(list), metadata())};
+        buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
+        InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
+        for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
+            results[ordIdx] = buildAggregation(name, requiredSize, Arrays.asList(topBucketsPerOrd[ordIdx]), metadata());
+        }
+        return results;
     }
 
     @Override
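Note: the heart of this change is swapping LongHash, which keyed bucket ordinals by the cell hash alone (hence the old "assert bucket == 0" in collect and the owningBucketOrds assertion in buildAggregations), for LongKeyedBucketOrds, which keys ordinals by the (owning bucket ordinal, cell hash) pair. Going only by the calls visible in this diff, a minimal HashMap-backed sketch of that contract looks like the following; it is an illustration, not the Elasticsearch implementation, which is backed by BigArrays and avoids boxing.

import java.util.HashMap;
import java.util.Map;

// Toy model of the LongKeyedBucketOrds contract used above. add() hands out a
// new ordinal for an unseen (owningBucketOrd, value) pair and returns
// -1-existingOrd for a pair it has already seen, which is what the
// "bucketOrdinal < 0" check in collect() relies on. Ordinals are dense across
// all owning buckets, matching how bucketDocCount(ord) is addressed globally.
class ToyLongKeyedBucketOrds {
    private final Map<Long, Map<Long, Long>> ords = new HashMap<>(); // owningOrd -> value -> ord
    private long nextOrd = 0;

    long add(long owningBucketOrd, long value) {
        Map<Long, Long> perOwner = ords.computeIfAbsent(owningBucketOrd, k -> new HashMap<>());
        Long existing = perOwner.get(value);
        if (existing != null) {
            return -1 - existing; // already seen: encode the existing ordinal
        }
        long ord = nextOrd++;
        perOwner.put(value, ord);
        return ord;
    }

    long bucketsInOrd(long owningBucketOrd) {
        return ords.getOrDefault(owningBucketOrd, new HashMap<>()).size();
    }
}

buildAggregations then walks one owning ordinal at a time through ordsEnum(owningBucketOrd) instead of scanning a single flat table, which is what lets it emit one InternalAggregation per owning bucket.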
==== GeoHashGridAggregator.java ====
@@ -35,8 +35,8 @@ public class GeoHashGridAggregator extends GeoGridAggregator<InternalGeoHashGrid
 
     public GeoHashGridAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource,
                                  int requiredSize, int shardSize, SearchContext aggregationContext,
-                                 Aggregator parent, Map<String, Object> metadata) throws IOException {
-        super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, metadata);
+                                 Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
+        super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, collectsFromSingleBucket, metadata);
     }
 
     @Override
==== GeoHashGridAggregatorFactory.java ====
@@ -85,21 +85,18 @@ protected Aggregator doCreateInternal(final ValuesSource valuesSource,
             throw new AggregationExecutionException("Registry miss-match - expected "
                 + GeoGridAggregatorSupplier.class.getName() + ", found [" + aggregatorSupplier.getClass().toString() + "]");
         }
-        if (collectsFromSingleBucket == false) {
-            return asMultiBucketAggregator(this, searchContext, parent);
-        }
         return ((GeoGridAggregatorSupplier) aggregatorSupplier).build(name, factories, valuesSource, precision, geoBoundingBox,
-            requiredSize, shardSize, searchContext, parent, metadata);
+            requiredSize, shardSize, searchContext, parent, collectsFromSingleBucket, metadata);
     }
 
     static void registerAggregators(ValuesSourceRegistry.Builder builder) {
         builder.register(GeoHashGridAggregationBuilder.NAME, CoreValuesSourceType.GEOPOINT,
             (GeoGridAggregatorSupplier) (name, factories, valuesSource, precision, geoBoundingBox, requiredSize, shardSize,
-                aggregationContext, parent, metadata) -> {
+                aggregationContext, parent, collectsFromSingleBucket, metadata) -> {
                 CellIdSource cellIdSource = new CellIdSource((ValuesSource.GeoPoint) valuesSource, precision, geoBoundingBox,
                     Geohash::longEncode);
                 return new GeoHashGridAggregator(name, factories, cellIdSource, requiredSize, shardSize, aggregationContext,
-                    parent, metadata);
+                    parent, collectsFromSingleBucket, metadata);
             });
     }
 }
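Note: the deleted collectsFromSingleBucket branch is the point of the refactor. Previously, a grid that was not the top-level aggregation fell back to asMultiBucketAggregator, which ran a separate wrapped aggregator instance per owning bucket, each with its own hash table. With ordinals keyed by owning bucket, one instance serves every parent bucket. A rough, hypothetical sketch of the memory difference, using toy types rather than Elasticsearch code:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Contrast of the two layouts: one hash table per parent bucket (old fallback)
// versus one shared table whose key includes the parent ordinal (new layout).
public class OrdsLayoutSketch {
    public static void main(String[] args) {
        int parentBuckets = 1000;

        // Old fallback: a near-empty table (plus a whole aggregator) per parent bucket.
        List<Map<Long, Long>> perParent = new ArrayList<>();
        for (int i = 0; i < parentBuckets; i++) {
            perParent.add(new HashMap<>());
        }

        // New layout: a single table; "parentOrd:cellHash" stands in for the long pair.
        Map<String, Long> shared = new HashMap<>();
        shared.put(42L + ":" + 9000L, 0L);

        System.out.println(perParent.size() + " tables versus 1 shared table");
    }
}

The same branch is deleted from the geotile factory below.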
==== GeoTileGridAggregator.java ====
@@ -36,8 +36,8 @@ public class GeoTileGridAggregator extends GeoGridAggregator<InternalGeoTileGrid
 
     public GeoTileGridAggregator(String name, AggregatorFactories factories, ValuesSource.Numeric valuesSource,
                                  int requiredSize, int shardSize, SearchContext aggregationContext,
-                                 Aggregator parent, Map<String, Object> metadata) throws IOException {
-        super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, metadata);
+                                 Aggregator parent, boolean collectsFromSingleBucket, Map<String, Object> metadata) throws IOException {
+        super(name, factories, valuesSource, requiredSize, shardSize, aggregationContext, parent, collectsFromSingleBucket, metadata);
     }
 
     @Override
==== GeoTileGridAggregatorFactory.java ====
@@ -83,21 +83,18 @@ protected Aggregator doCreateInternal(final ValuesSource valuesSource,
             throw new AggregationExecutionException("Registry miss-match - expected "
                 + GeoGridAggregatorSupplier.class.getName() + ", found [" + aggregatorSupplier.getClass().toString() + "]");
         }
-        if (collectsFromSingleBucket == false) {
-            return asMultiBucketAggregator(this, searchContext, parent);
-        }
         return ((GeoGridAggregatorSupplier) aggregatorSupplier).build(name, factories, valuesSource, precision, geoBoundingBox,
-            requiredSize, shardSize, searchContext, parent, metadata);
+            requiredSize, shardSize, searchContext, parent, collectsFromSingleBucket, metadata);
     }
 
     static void registerAggregators(ValuesSourceRegistry.Builder builder) {
         builder.register(GeoTileGridAggregationBuilder.NAME, CoreValuesSourceType.GEOPOINT,
             (GeoGridAggregatorSupplier) (name, factories, valuesSource, precision, geoBoundingBox, requiredSize, shardSize,
-                aggregationContext, parent, metadata) -> {
+                aggregationContext, parent, collectsFromSingleBucket, metadata) -> {
                 CellIdSource cellIdSource = new CellIdSource((ValuesSource.GeoPoint) valuesSource, precision, geoBoundingBox,
                     GeoTileUtils::longEncode);
                 return new GeoTileGridAggregator(name, factories, cellIdSource, requiredSize, shardSize, aggregationContext,
-                    parent, metadata);
+                    parent, collectsFromSingleBucket, metadata);
             });
     }
 }
==== GeoGridAggregatorSupplier.java ====
@@ -33,7 +33,17 @@
 @FunctionalInterface
 public interface GeoGridAggregatorSupplier extends AggregatorSupplier {
 
-    GeoGridAggregator build(String name, AggregatorFactories factories, ValuesSource valuesSource,
-                            int precision, GeoBoundingBox geoBoundingBox, int requiredSize, int shardSize,
-                            SearchContext aggregationContext, Aggregator parent, Map<String, Object> metadata) throws IOException;
+    GeoGridAggregator build(
+        String name,
+        AggregatorFactories factories,
+        ValuesSource valuesSource,
+        int precision,
+        GeoBoundingBox geoBoundingBox,
+        int requiredSize,
+        int shardSize,
+        SearchContext aggregationContext,
+        Aggregator parent,
+        boolean collectsFromSingleBucket,
+        Map<String, Object> metadata
+    ) throws IOException;
 }
==== GeoGridAggregatorTestCase.java ====
@@ -19,14 +19,17 @@
 package org.elasticsearch.search.aggregations.bucket.geogrid;
 
 import org.apache.lucene.document.LatLonDocValuesField;
+import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.geo.GeoEncodingUtils;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.RandomIndexWriter;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.geo.GeoBoundingBox;
 import org.elasticsearch.common.geo.GeoBoundingBoxTests;
@@ -36,6 +39,8 @@
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.aggregations.Aggregator;
 import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.support.AggregationInspectionHelper;
 import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
 import org.elasticsearch.search.aggregations.support.ValuesSourceType;
@@ -48,6 +53,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -122,18 +128,9 @@ public void testWithSeveralDocs() throws IOException {
         List<LatLonDocValuesField> points = new ArrayList<>();
         Set<String> distinctHashesPerDoc = new HashSet<>();
         for (int pointId = 0; pointId < numPoints; pointId++) {
-            double lat = (180d * randomDouble()) - 90d;
-            double lng = (360d * randomDouble()) - 180d;
-
-            // Precision-adjust longitude/latitude to avoid wrong bucket placement
-            // Internally, lat/lng get converted to 32 bit integers, loosing some precision.
-            // This does not affect geohashing because geohash uses the same algorithm,
-            // but it does affect other bucketing algos, thus we need to do the same steps here.
-            lng = GeoEncodingUtils.decodeLongitude(GeoEncodingUtils.encodeLongitude(lng));
-            lat = GeoEncodingUtils.decodeLatitude(GeoEncodingUtils.encodeLatitude(lat));
-
-            points.add(new LatLonDocValuesField(FIELD_NAME, lat, lng));
-            String hash = hashAsString(lng, lat, precision);
+            double[] latLng = randomLatLng();
+            points.add(new LatLonDocValuesField(FIELD_NAME, latLng[0], latLng[1]));
+            String hash = hashAsString(latLng[1], latLng[0], precision);
             if (distinctHashesPerDoc.contains(hash) == false) {
                 expectedCountPerGeoHash.put(hash, expectedCountPerGeoHash.getOrDefault(hash, 0) + 1);
             }
@@ -150,6 +147,60 @@
         });
     }
 
+    public void testAsSubAgg() throws IOException {
+        int precision = randomPrecision();
+        Map<String, Map<String, Long>> expectedCountPerTPerGeoHash = new TreeMap<>();
+        List<List<IndexableField>> docs = new ArrayList<>();
+        for (int i = 0; i < 30; i++) {
+            String t = randomAlphaOfLength(1);
+            double[] latLng = randomLatLng();
+
+            List<IndexableField> doc = new ArrayList<>();
+            docs.add(doc);
+            doc.add(new LatLonDocValuesField(FIELD_NAME, latLng[0], latLng[1]));
+            doc.add(new SortedSetDocValuesField("t", new BytesRef(t)));
+
+            String hash = hashAsString(latLng[1], latLng[0], precision);
+            Map<String, Long> expectedCountPerGeoHash = expectedCountPerTPerGeoHash.get(t);
+            if (expectedCountPerGeoHash == null) {
+                expectedCountPerGeoHash = new TreeMap<>();
+                expectedCountPerTPerGeoHash.put(t, expectedCountPerGeoHash);
+            }
+            expectedCountPerGeoHash.put(hash, expectedCountPerGeoHash.getOrDefault(hash, 0L) + 1);
+        }
+        CheckedConsumer<RandomIndexWriter, IOException> buildIndex = iw -> iw.addDocuments(docs);
+        TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("t").field("t")
+            .size(expectedCountPerTPerGeoHash.size())
+            .subAggregation(createBuilder("gg").field(FIELD_NAME).precision(precision));
+        Consumer<StringTerms> verify = (terms) -> {
+            Map<String, Map<String, Long>> actual = new TreeMap<>();
+            for (StringTerms.Bucket tb: terms.getBuckets()) {
+                InternalGeoGrid<?> gg = tb.getAggregations().get("gg");
+                Map<String, Long> sub = new TreeMap<>();
+                for (InternalGeoGridBucket<?> ggb : gg.getBuckets()) {
+                    sub.put(ggb.getKeyAsString(), ggb.getDocCount());
+                }
+                actual.put(tb.getKeyAsString(), sub);
+            }
+            assertThat(actual, equalTo(expectedCountPerTPerGeoHash));
+        };
+        testCase(aggregationBuilder, new MatchAllDocsQuery(), buildIndex, verify, keywordField("t"), geoPointField(FIELD_NAME));
+    }
+
+    private double[] randomLatLng() {
+        double lat = (180d * randomDouble()) - 90d;
+        double lng = (360d * randomDouble()) - 180d;
+
+        // Precision-adjust longitude/latitude to avoid wrong bucket placement
+        // Internally, lat/lng get converted to 32 bit integers, loosing some precision.
+        // This does not affect geohashing because geohash uses the same algorithm,
+        // but it does affect other bucketing algos, thus we need to do the same steps here.
+        lng = GeoEncodingUtils.decodeLongitude(GeoEncodingUtils.encodeLongitude(lng));
+        lat = GeoEncodingUtils.decodeLatitude(GeoEncodingUtils.encodeLatitude(lat));
+
+        return new double[] {lat, lng};
+    }
+
     public void testBounds() throws IOException {
         final int numDocs = randomIntBetween(64, 256);
         final GeoGridAggregationBuilder builder = createBuilder("_name");
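Note: randomLatLng, extracted above, snaps random coordinates onto Lucene's 32-bit grid before indexing so that the test's expected counts and the aggregator bucket exactly the same points. A standalone illustration of that quantization, using the same Lucene utility (the sample coordinate is arbitrary):

import org.apache.lucene.geo.GeoEncodingUtils;

// Doc values store latitude/longitude as 32-bit ints, so a raw double and its
// indexed form can land in different grid cells at high precision. The encoder
// round trip below shows the small but nonzero loss.
public class QuantizationDemo {
    public static void main(String[] args) {
        double lat = 52.37478349;
        int encoded = GeoEncodingUtils.encodeLatitude(lat);
        double decoded = GeoEncodingUtils.decodeLatitude(encoded);
        // decoded differs from lat by up to about 4.2e-8 degrees (180 / 2^32)
        System.out.println(lat + " -> " + decoded + " (delta " + Math.abs(lat - decoded) + ")");
    }
}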
==== histogram aggregator tests ====
@@ -300,13 +300,11 @@ public void testIncorrectFieldType() throws Exception {
         HistogramAggregationBuilder aggBuilder = new HistogramAggregationBuilder("my_agg")
             .field("field")
             .interval(5);
-        MappedFieldType fieldType = keywordField("field");
-        fieldType.setHasDocValues(true);
         try (IndexReader reader = w.getReader()) {
             IndexSearcher searcher = new IndexSearcher(reader);
 
             expectThrows(IllegalArgumentException.class, () -> {
-                search(searcher, new MatchAllDocsQuery(), aggBuilder, fieldType);
+                search(searcher, new MatchAllDocsQuery(), aggBuilder, keywordField("field"));
             });
         }
     }
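Taken together, these changes let a geohash or geotile grid run natively under another bucketing aggregation instead of through the per-bucket wrapper, which is what testAsSubAgg exercises. A sketch of that request shape using the standard builder API; the field names ("category", "location") are made up for illustration:

import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;

// A geohash grid nested under a terms aggregation, the shape this change makes
// memory-friendly. Any keyword field and geo_point field would do.
public class GridAsSubAggExample {
    public static TermsAggregationBuilder gridUnderTerms() {
        return AggregationBuilders.terms("t")
            .field("category")
            .subAggregation(
                AggregationBuilders.geohashGrid("gg")
                    .field("location")
                    .precision(4));
    }
}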