@@ -73,8 +73,10 @@
import org.elasticsearch.search.aggregations.bucket.geogrid.ParsedGeoHashGrid;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.global.ParsedGlobal;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedAutoDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedHistogram;
import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder;
@@ -683,6 +685,7 @@ static List<NamedXContentRegistry.Entry> getDefaultNamedXContents() {
map.put(GeoCentroidAggregationBuilder.NAME, (p, c) -> ParsedGeoCentroid.fromXContent(p, (String) c));
map.put(HistogramAggregationBuilder.NAME, (p, c) -> ParsedHistogram.fromXContent(p, (String) c));
map.put(DateHistogramAggregationBuilder.NAME, (p, c) -> ParsedDateHistogram.fromXContent(p, (String) c));
map.put(AutoDateHistogramAggregationBuilder.NAME, (p, c) -> ParsedAutoDateHistogram.fromXContent(p, (String) c));
map.put(StringTerms.NAME, (p, c) -> ParsedStringTerms.fromXContent(p, (String) c));
map.put(LongTerms.NAME, (p, c) -> ParsedLongTerms.fromXContent(p, (String) c));
map.put(DoubleTerms.NAME, (p, c) -> ParsedDoubleTerms.fromXContent(p, (String) c));
@@ -0,0 +1,230 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.bucket;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.PackedLongValues;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

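/**
 * A deferring bucket collector that records, per segment, the doc ids and
 * bucket ordinals of every collected document so that the deferred
 * sub-collectors can be replayed later on a selection of buckets. Unlike a
 * plain deferring collector, bucket ordinals may also be merged while
 * collecting, via {@link #mergeBuckets(long[])}.
 */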
public class MergingBucketsDeferringCollector extends DeferringBucketCollector {

List<Entry> entries = new ArrayList<>();
BucketCollector collector;
final SearchContext searchContext;
LeafReaderContext context;
PackedLongValues.Builder docDeltas;
PackedLongValues.Builder buckets;
long maxBucket = -1;
boolean finished = false;
LongHash selectedBuckets;

public MergingBucketsDeferringCollector(SearchContext context) {
this.searchContext = context;
}

@Override
public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
this.collector = BucketCollector.wrap(deferredCollectors);
}

@Override
public boolean needsScores() {
if (collector == null) {
            throw new IllegalStateException("no deferred collectors set");
}
return collector.needsScores();
}

@Override
public void preCollection() throws IOException {
collector.preCollection();
}

private void finishLeaf() {
if (context != null) {
entries.add(new Entry(context, docDeltas.build(), buckets.build()));
}
context = null;
docDeltas = null;
buckets = null;
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
finishLeaf();

context = ctx;
docDeltas = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
buckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);

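        // Record each collected doc as a delta from the previously collected
        // doc id, along with its bucket ordinal; small deltas keep the packed
        // builders compact.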
return new LeafBucketCollector() {
int lastDoc = 0;

@Override
public void collect(int doc, long bucket) throws IOException {
docDeltas.add(doc - lastDoc);
buckets.add(bucket);
lastDoc = doc;
maxBucket = Math.max(maxBucket, bucket);
}
};
}

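    /**
     * Remaps the ordinals of all buckets recorded so far. {@code mergeMap[i]}
     * holds the new ordinal for the bucket whose old ordinal is {@code i}.
     * Expected to be called while collecting, since any buckets pending in the
     * current segment are remapped as well.
     */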
    public void mergeBuckets(long[] mergeMap) {
List<Entry> newEntries = new ArrayList<>();
for (Entry sourceEntry : entries) {
PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
for (PackedLongValues.Iterator itr = sourceEntry.buckets.iterator(); itr.hasNext();) {
long bucket = itr.next();
newBuckets.add(mergeMap[(int) bucket]);
}
newEntries.add(new Entry(sourceEntry.context, sourceEntry.docDeltas, newBuckets.build()));
}
entries = newEntries;

// if there are buckets that have been collected in the current segment
// we need to update the bucket ordinals there too
if (buckets.size() > 0) {
PackedLongValues currentBuckets = buckets.build();
PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
for (PackedLongValues.Iterator itr = currentBuckets.iterator(); itr.hasNext();) {
long bucket = itr.next();
newBuckets.add(mergeMap[(int) bucket]);
}
buckets = newBuckets;
}
}

@Override
public void postCollection() throws IOException {
finishLeaf();
finished = true;
}

/**
* Replay the wrapped collector, but only on a selection of buckets.
*/
@Override
public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
if (!finished) {
throw new IllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called");
}
if (this.selectedBuckets != null) {
throw new IllegalStateException("Already been replayed");
}

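        // LongHash assigns dense ids in insertion order, so find() below maps
        // an original bucket ordinal to its rebased ordinal in the selection.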
final LongHash hash = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE);
for (long bucket : selectedBuckets) {
hash.add(bucket);
}
this.selectedBuckets = hash;

boolean needsScores = collector.needsScores();
Weight weight = null;
if (needsScores) {
weight = searchContext.searcher().createNormalizedWeight(searchContext.query(), true);
}
for (Entry entry : entries) {
final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
DocIdSetIterator docIt = null;
if (needsScores && entry.docDeltas.size() > 0) {
Scorer scorer = weight.scorer(entry.context);
// We don't need to check if the scorer is null
// since we are sure that there are documents to replay
                // (entry.docDeltas is not empty).
docIt = scorer.iterator();
leafCollector.setScorer(scorer);
}
final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator();
final PackedLongValues.Iterator buckets = entry.buckets.iterator();
int doc = 0;
for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
doc += docDeltaIterator.next();
final long bucket = buckets.next();
final long rebasedBucket = hash.find(bucket);
if (rebasedBucket != -1) {
if (needsScores) {
if (docIt.docID() < doc) {
docIt.advance(doc);
}
// aggregations should only be replayed on matching
// documents
assert docIt.docID() == doc;
}
leafCollector.collect(doc, rebasedBucket);
}
}
}

collector.postCollection();
}

/**
* Wrap the provided aggregator so that it behaves (almost) as if it had
* been collected directly.
*/
@Override
    public Aggregator wrap(final Aggregator in) {
        return new WrappedAggregator(in) {
@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
if (selectedBuckets == null) {
throw new IllegalStateException("Collection has not been replayed yet.");
}
final long rebasedBucket = selectedBuckets.find(bucket);
if (rebasedBucket == -1) {
throw new IllegalStateException("Cannot build for a bucket which has not been collected [" + bucket + "]");
}
return in.buildAggregation(rebasedBucket);
}

};
}

private static class Entry {
final LeafReaderContext context;
final PackedLongValues docDeltas;
final PackedLongValues buckets;

Entry(LeafReaderContext context, PackedLongValues docDeltas, PackedLongValues buckets) {
this.context = context;
this.docDeltas = docDeltas;
this.buckets = buckets;
}
}

}
@@ -0,0 +1,137 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.bucket.histogram;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.rounding.DateTimeUnit;
import org.elasticsearch.common.rounding.Rounding;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSource.Numeric;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSourceParserHelper;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Objects;

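/**
 * Builder for the {@code auto_date_histogram} aggregation, a date histogram
 * that chooses its rounding interval automatically so that the number of
 * buckets returned stays at or below the requested target count.
 */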
public class AutoDateHistogramAggregationBuilder
extends ValuesSourceAggregationBuilder<ValuesSource.Numeric, AutoDateHistogramAggregationBuilder> {

public static final String NAME = "auto_date_histogram";

public static final ParseField NUM_BUCKETS_FIELD = new ParseField("buckets");

private static final ObjectParser<AutoDateHistogramAggregationBuilder, Void> PARSER;
static {
PARSER = new ObjectParser<>(AutoDateHistogramAggregationBuilder.NAME);
ValuesSourceParserHelper.declareNumericFields(PARSER, true, true, true);

PARSER.declareInt(AutoDateHistogramAggregationBuilder::setNumBuckets, NUM_BUCKETS_FIELD);
}

public static AutoDateHistogramAggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException {
return PARSER.parse(parser, new AutoDateHistogramAggregationBuilder(aggregationName), null);
}

private int numBuckets = 10;

/** Create a new builder with the given name. */
public AutoDateHistogramAggregationBuilder(String name) {
super(name, ValuesSourceType.NUMERIC, ValueType.DATE);
}

/** Read from a stream, for internal use only. */
public AutoDateHistogramAggregationBuilder(StreamInput in) throws IOException {
super(in, ValuesSourceType.NUMERIC, ValueType.DATE);
numBuckets = in.readVInt();
}

@Override
protected void innerWriteTo(StreamOutput out) throws IOException {
out.writeVInt(numBuckets);
}

@Override
public String getType() {
return NAME;
}

public AutoDateHistogramAggregationBuilder setNumBuckets(int numBuckets) {
if (numBuckets <= 0) {
throw new IllegalArgumentException(NUM_BUCKETS_FIELD.getPreferredName() + " must be greater than 0 for [" + name + "]");
}
this.numBuckets = numBuckets;
return this;
Contributor comment: it should probably be > 1 actually, no? A single bucket is not useful for a histogram.

Contributor comment: maybe add a soft limit for the number of buckets too, something like 1000?
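A minimal sketch of how both review suggestions could be applied to this setter; the hard lower bound of 1 and the cap of 1000 are taken from the comments above and are illustrative assumptions, not part of this change:

    // Sketch only: enforce the suggested hard lower bound and treat the
    // suggested "soft" limit of 1000 as a hard cap for illustration.
    public AutoDateHistogramAggregationBuilder setNumBuckets(int numBuckets) {
        if (numBuckets <= 1) {
            throw new IllegalArgumentException(
                    NUM_BUCKETS_FIELD.getPreferredName() + " must be greater than 1 for [" + name + "]");
        }
        if (numBuckets > 1000) {
            throw new IllegalArgumentException(
                    NUM_BUCKETS_FIELD.getPreferredName() + " must be at most 1000 for [" + name + "]");
        }
        this.numBuckets = numBuckets;
        return this;
    }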
}

public int getNumBuckets() {
return numBuckets;
}

@Override
protected ValuesSourceAggregatorFactory<Numeric, ?> innerBuild(SearchContext context, ValuesSourceConfig<Numeric> config,
AggregatorFactory<?> parent, Builder subFactoriesBuilder) throws IOException {
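        // Candidate roundings, ordered from finest to coarsest; the aggregator
        // moves to a coarser rounding whenever the current one would produce
        // more than the requested number of buckets.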
Rounding[] roundings = new Rounding[6];
roundings[0] = createRounding(DateTimeUnit.SECOND_OF_MINUTE);
roundings[1] = createRounding(DateTimeUnit.MINUTES_OF_HOUR);
roundings[2] = createRounding(DateTimeUnit.HOUR_OF_DAY);
roundings[3] = createRounding(DateTimeUnit.DAY_OF_MONTH);
roundings[4] = createRounding(DateTimeUnit.MONTH_OF_YEAR);
roundings[5] = createRounding(DateTimeUnit.YEAR_OF_CENTURY);
return new AutoDateHistogramAggregatorFactory(name, config, numBuckets, roundings, context, parent, subFactoriesBuilder, metaData);
}

    private Rounding createRounding(DateTimeUnit interval) {
        Rounding.Builder tzRoundingBuilder = Rounding.builder(interval);
        if (timeZone() != null) {
            tzRoundingBuilder.timeZone(timeZone());
        }
        return tzRoundingBuilder.build();
    }

@Override
protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(NUM_BUCKETS_FIELD.getPreferredName(), numBuckets);
return builder;
}

@Override
protected int innerHashCode() {
return Objects.hash(numBuckets);
}

@Override
protected boolean innerEquals(Object obj) {
AutoDateHistogramAggregationBuilder other = (AutoDateHistogramAggregationBuilder) obj;
return Objects.equals(numBuckets, other.numBuckets);
}
}