Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ default List<QuerySpec<?>> getQueries() {
/**
 * The new {@link AggregationSpec}s added by this plugin.
 *
 * @return the aggregation specs to register; empty by default so plugins
 *         that add no aggregations need not override this method
 */
default List<AggregationSpec> getAggregations() {
return emptyList();
}
/**
 * Allows plugins to register new aggregations using aggregation names that are already defined
 * in Core, as long as the new aggregations target different ValuesSourceTypes.
 *
 * <p>Each returned {@link Consumer} is invoked with the node's {@link ValuesSourceRegistry}
 * after the core aggregations have been registered, so the plugin can link additional
 * ValuesSourceTypes to existing aggregation names.
 *
 * @return registrar callbacks to run against the {@link ValuesSourceRegistry};
 *         empty by default
 */
default List<Consumer<ValuesSourceRegistry>> getBareAggregatorRegistrar() {
return emptyList();
}
/**
* The new {@link PipelineAggregator}s added by this plugin.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,9 @@ private void registerAggregations(List<SearchPlugin> plugins) {
registerAggregation((new AggregationSpec(CompositeAggregationBuilder.NAME, CompositeAggregationBuilder::new,
CompositeAggregationBuilder.PARSER).addResultReader(InternalComposite::new)));
registerFromPlugin(plugins, SearchPlugin::getAggregations, this::registerAggregation);

// after aggs have been registered, see if there are any new VSTypes that need to be linked to core fields
registerFromPlugin(plugins, SearchPlugin::getBareAggregatorRegistrar, this::registerBareAggregatorRegistrar);
}

private void registerAggregation(AggregationSpec spec) {
Expand All @@ -478,6 +481,12 @@ private void registerAggregation(AggregationSpec spec) {
}
}

/**
 * Runs a plugin-supplied registrar callback against this module's
 * {@code valuesSourceRegistry}, allowing the plugin to link additional
 * ValuesSourceTypes to already-registered aggregation names.
 * A {@code null} registrar is silently ignored.
 */
private void registerBareAggregatorRegistrar(Consumer<ValuesSourceRegistry> registrar) {
    if (registrar == null) {
        return;
    }
    registrar.accept(this.valuesSourceRegistry);
}

private void registerPipelineAggregations(List<SearchPlugin> plugins) {
registerPipelineAggregation(new PipelineAggregationSpec(
DerivativePipelineAggregationBuilder.NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.elasticsearch.common.util.ArrayUtils;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.index.fielddata.HistogramValue;
import org.elasticsearch.index.fielddata.HistogramValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
Expand Down Expand Up @@ -78,18 +76,8 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
return LeafBucketCollector.NO_OP_COLLECTOR;
}
final BigArrays bigArrays = context.bigArrays();
if (valuesSource instanceof ValuesSource.Histogram) {
final HistogramValues values = ((ValuesSource.Histogram)valuesSource).getHistogramValues(ctx);
return collectHistogramValues(values, bigArrays, sub);
} else {
final SortedNumericDoubleValues values = ((ValuesSource.Numeric)valuesSource).doubleValues(ctx);
return collectNumeric(values, bigArrays, sub);
}

}

private LeafBucketCollector collectNumeric(final SortedNumericDoubleValues values,
final BigArrays bigArrays, final LeafBucketCollector sub) {
final SortedNumericDoubleValues values = ((ValuesSource.Numeric)valuesSource).doubleValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
Expand All @@ -102,35 +90,21 @@ public void collect(int doc, long bucket) throws IOException {
}
}
};
}

private LeafBucketCollector collectHistogramValues(final HistogramValues values,
final BigArrays bigArrays, final LeafBucketCollector sub) {
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
DoubleHistogram state = getExistingOrNewHistogram(bigArrays, bucket);
if (values.advanceExact(doc)) {
final HistogramValue sketch = values.histogram();
while (sketch.next()) {
state.recordValueWithCount(sketch.value(), sketch.count());
}
}
}
};
}

private DoubleHistogram getExistingOrNewHistogram(final BigArrays bigArrays, long bucket) {
states = bigArrays.grow(states, bucket + 1);
DoubleHistogram state = states.get(bucket);
if (state == null) {
state = new DoubleHistogram(numberOfSignificantValueDigits);
// Set the histogram to autosize so it can resize itself as
// the data range increases. Resize operations should be
// rare as the histogram buckets are exponential (on the top
// level). In the future we could expose the range as an
// option on the request so the histogram can be fixed at
// initialisation and doesn't need resizing.
/* Set the histogram to autosize so it can resize itself as
the data range increases. Resize operations should be
rare as the histogram buckets are exponential (on the top
level). In the future we could expose the range as an
option on the request so the histogram can be fixed at
initialisation and doesn't need resizing.
*/
state.setAutoResize(true);
states.set(bucket, state);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.elasticsearch.common.util.ArrayUtils;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.index.fielddata.HistogramValue;
import org.elasticsearch.index.fielddata.HistogramValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
Expand Down Expand Up @@ -77,18 +75,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
return LeafBucketCollector.NO_OP_COLLECTOR;
}
final BigArrays bigArrays = context.bigArrays();
if (valuesSource instanceof ValuesSource.Histogram) {
final HistogramValues values = ((ValuesSource.Histogram)valuesSource).getHistogramValues(ctx);
return collectHistogramValues(values, bigArrays, sub);
} else {
final SortedNumericDoubleValues values = ((ValuesSource.Numeric)valuesSource).doubleValues(ctx);
return collectNumeric(values, bigArrays, sub);
}

}

private LeafBucketCollector collectNumeric(final SortedNumericDoubleValues values,
final BigArrays bigArrays, final LeafBucketCollector sub) {
final SortedNumericDoubleValues values = ((ValuesSource.Numeric)valuesSource).doubleValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
Expand All @@ -103,22 +90,6 @@ public void collect(int doc, long bucket) throws IOException {
};
}

private LeafBucketCollector collectHistogramValues(final HistogramValues values,
final BigArrays bigArrays, final LeafBucketCollector sub) {
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
TDigestState state = getExistingOrNewHistogram(bigArrays, bucket);
if (values.advanceExact(doc)) {
final HistogramValue sketch = values.histogram();
while(sketch.next()) {
state.add(sketch.value(), sketch.count());
}
}
}
};
}

private TDigestState getExistingOrNewHistogram(final BigArrays bigArrays, long bucket) {
states = bigArrays.grow(states, bucket + 1);
TDigestState state = states.get(bucket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
public class InternalHDRPercentileRanks extends AbstractInternalHDRPercentiles implements PercentileRanks {
public static final String NAME = "hdr_percentile_ranks";

InternalHDRPercentileRanks(String name, double[] cdfValues, DoubleHistogram state, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
public InternalHDRPercentileRanks(String name, double[] cdfValues, DoubleHistogram state, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, cdfValues, state, keyed, formatter, pipelineAggregators, metaData);
}

Expand Down Expand Up @@ -74,7 +74,7 @@ protected AbstractInternalHDRPercentiles createReduced(String name, double[] key
return new InternalHDRPercentileRanks(name, keys, merged, keyed, format, pipelineAggregators, metaData);
}

static double percentileRank(DoubleHistogram state, double value) {
public static double percentileRank(DoubleHistogram state, double value) {
if (state.getTotalCount() == 0) {
return Double.NaN;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
public class InternalHDRPercentiles extends AbstractInternalHDRPercentiles implements Percentiles {
public static final String NAME = "hdr_percentiles";

InternalHDRPercentiles(String name, double[] percents, DoubleHistogram state, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
public InternalHDRPercentiles(String name, double[] percents, DoubleHistogram state, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, percents, state, keyed, formatter, pipelineAggregators, metaData);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
public class InternalTDigestPercentileRanks extends AbstractInternalTDigestPercentiles implements PercentileRanks {
public static final String NAME = "tdigest_percentile_ranks";

InternalTDigestPercentileRanks(String name, double[] cdfValues, TDigestState state, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
public InternalTDigestPercentileRanks(String name, double[] cdfValues, TDigestState state, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, cdfValues, state, keyed, formatter, pipelineAggregators, metaData);
}

Expand Down Expand Up @@ -73,7 +73,7 @@ protected AbstractInternalTDigestPercentiles createReduced(String name, double[]
return new InternalTDigestPercentileRanks(name, keys, merged, keyed, format, pipelineAggregators, metaData);
}

static double percentileRank(TDigestState state, double value) {
public static double percentileRank(TDigestState state, double value) {
double percentileRank = state.cdf(value);
if (percentileRank < 0) {
percentileRank = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
public class InternalTDigestPercentiles extends AbstractInternalTDigestPercentiles implements Percentiles {
public static final String NAME = "tdigest_percentiles";

InternalTDigestPercentiles(String name, double[] percents, TDigestState state, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
public InternalTDigestPercentiles(String name, double[] percents, TDigestState state, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, percents, state, keyed, formatter, pipelineAggregators, metaData);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class PercentileRanksAggregatorFactory extends ValuesSourceAggregatorFactory {

static void registerAggregators(ValuesSourceRegistry valuesSourceRegistry) {
valuesSourceRegistry.register(PercentileRanksAggregationBuilder.NAME,
List.of(CoreValuesSourceType.NUMERIC, CoreValuesSourceType.HISTOGRAM, CoreValuesSourceType.DATE, CoreValuesSourceType.BOOLEAN),
List.of(CoreValuesSourceType.NUMERIC, CoreValuesSourceType.DATE, CoreValuesSourceType.BOOLEAN),
new PercentilesAggregatorSupplier() {
@Override
public Aggregator build(String name, ValuesSource valuesSource, SearchContext context, Aggregator parent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class PercentilesAggregatorFactory extends ValuesSourceAggregatorFactory {

static void registerAggregators(ValuesSourceRegistry valuesSourceRegistry) {
valuesSourceRegistry.register(PercentilesAggregationBuilder.NAME,
List.of(CoreValuesSourceType.NUMERIC, CoreValuesSourceType.HISTOGRAM, CoreValuesSourceType.DATE, CoreValuesSourceType.BOOLEAN),
List.of(CoreValuesSourceType.NUMERIC, CoreValuesSourceType.DATE, CoreValuesSourceType.BOOLEAN),
new PercentilesAggregatorSupplier() {
@Override
public Aggregator build(String name, ValuesSource valuesSource, SearchContext context, Aggregator parent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ public PercentilesMethod getMethod() {
return method;
}

abstract Aggregator createPercentilesAggregator(String name, ValuesSource valuesSource, SearchContext context, Aggregator parent,
double[] values, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException;
public abstract Aggregator createPercentilesAggregator(String name, ValuesSource valuesSource, SearchContext context, Aggregator parent,
double[] values, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException;

abstract Aggregator createPercentileRanksAggregator(String name, ValuesSource valuesSource, SearchContext context,
Aggregator parent, double[] values, boolean keyed,
Expand Down Expand Up @@ -102,11 +102,11 @@ public static class TDigest extends PercentilesConfig {
static final double DEFAULT_COMPRESSION = 100.0;
private double compression;

TDigest() {
public TDigest() {
this(DEFAULT_COMPRESSION);
}

TDigest(double compression) {
public TDigest(double compression) {
super(PercentilesMethod.TDIGEST);
setCompression(compression);
}
Expand All @@ -128,10 +128,10 @@ public double getCompression() {
}

@Override
Aggregator createPercentilesAggregator(String name, ValuesSource valuesSource, SearchContext context, Aggregator parent,
double[] values, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
public Aggregator createPercentilesAggregator(String name, ValuesSource valuesSource, SearchContext context, Aggregator parent,
double[] values, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
return new TDigestPercentilesAggregator(name, valuesSource, context, parent, values, compression, keyed, formatter,
pipelineAggregators, metaData);
}
Expand Down Expand Up @@ -179,11 +179,11 @@ public static class Hdr extends PercentilesConfig {
static final int DEFAULT_NUMBER_SIG_FIGS = 3;
private int numberOfSignificantValueDigits;

Hdr() {
public Hdr() {
this(DEFAULT_NUMBER_SIG_FIGS);
}

Hdr(int numberOfSignificantValueDigits) {
public Hdr(int numberOfSignificantValueDigits) {
super(PercentilesMethod.HDR);
setNumberOfSignificantValueDigits(numberOfSignificantValueDigits);
}
Expand All @@ -204,10 +204,10 @@ public int getNumberOfSignificantValueDigits() {
}

@Override
Aggregator createPercentilesAggregator(String name, ValuesSource valuesSource, SearchContext context, Aggregator parent,
double[] values, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
public Aggregator createPercentilesAggregator(String name, ValuesSource valuesSource, SearchContext context, Aggregator parent,
double[] values, boolean keyed, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
return new HDRPercentilesAggregator(name, valuesSource, context, parent, values, numberOfSignificantValueDigits, keyed,
formatter, pipelineAggregators, metaData);
}
Expand Down
Loading