diff --git a/docs/reference/aggregations/metrics.asciidoc b/docs/reference/aggregations/metrics.asciidoc
index ea3195567ce67..cb961d735123b 100644
--- a/docs/reference/aggregations/metrics.asciidoc
+++ b/docs/reference/aggregations/metrics.asciidoc
@@ -23,6 +23,8 @@ include::metrics/geobounds-aggregation.asciidoc[]
include::metrics/geocentroid-aggregation.asciidoc[]
+include::metrics/geoline-aggregation.asciidoc[]
+
include::metrics/matrix-stats-aggregation.asciidoc[]
include::metrics/max-aggregation.asciidoc[]
diff --git a/docs/reference/aggregations/metrics/geoline-aggregation.asciidoc b/docs/reference/aggregations/metrics/geoline-aggregation.asciidoc
new file mode 100644
index 0000000000000..4d02ba6346cd1
--- /dev/null
+++ b/docs/reference/aggregations/metrics/geoline-aggregation.asciidoc
@@ -0,0 +1,143 @@
+[role="xpack"]
+[testenv="gold"]
+[[search-aggregations-metrics-geo-line]]
+=== Geo-Line Aggregation
+++++
+Geo-Line
+++++
+
+The `geo_line` aggregation aggregates all `geo_point` values within a bucket into a LineString ordered
+by the chosen `sort` field. This `sort` can be a date field, for example. The bucket returned is a valid
+https://tools.ietf.org/html/rfc7946#section-3.2[GeoJSON Feature] representing the line geometry.
+
+[source,console,id=search-aggregations-metrics-geo-line-simple]
+----
+PUT test
+{
+ "mappings": {
+ "dynamic": "strict",
+ "_source": {
+ "enabled": false
+ },
+ "properties": {
+ "my_location": {
+ "type": "geo_point"
+ },
+ "group": {
+ "type": "keyword"
+ },
+ "@timestamp": {
+ "type": "date"
+ }
+ }
+ }
+}
+
+POST /test/_bulk?refresh
+{"index": {}}
+{"my_location": {"lat":37.3450570, "lon": -122.0499820}, "@timestamp": "2013-09-06T16:00:36"}
+{"index": {}}
+{"my_location": {"lat": 37.3451320, "lon": -122.0499820}, "@timestamp": "2013-09-06T16:00:37Z"}
+{"index": {}}
+{"my_location": {"lat": 37.349283, "lon": -122.0505010}, "@timestamp": "2013-09-06T16:00:37Z"}
+
+POST /test/_search?filter_path=aggregations
+{
+ "aggs": {
+ "line": {
+ "geo_line": {
+ "point": {"field": "my_location"},
+ "sort": {"field": "@timestamp"}
+ }
+ }
+ }
+}
+----
+
+Which returns:
+
+[source,js]
+----
+{
+ "aggregations": {
+ "line": {
+ "type" : "Feature",
+ "geometry" : {
+ "type" : "LineString",
+ "coordinates" : [
+ [
+ -122.049982,
+ 37.345057
+ ],
+ [
+ -122.050501,
+ 37.349283
+ ],
+ [
+ -122.049982,
+ 37.345132
+ ]
+ ]
+ },
+ "properties" : {
+ "complete" : true
+ }
+ }
+ }
+}
+----
+// TESTRESPONSE
+
+[[search-aggregations-metrics-geo-line-options]]
+==== Options
+
+`point`::
+(Required)
+
+This option specifies the name of the `geo_point` field
+
+Example usage configuring `my_location` as the point field:
+
+[source,js]
+----
+"point": {
+ "field": "my_location"
+}
+----
+// NOTCONSOLE
+
+`sort`::
+(Required)
+
+This option specifies the name of the numeric field to use as the sort key
+for ordering the points
+
+Example usage configuring `@timestamp` as the sort key:
+
+[source,js]
+----
+"point": {
+ "field": "@timestamp"
+}
+----
+// NOTCONSOLE
+
+`include_sort`::
+(Optional, boolean, default: `false`)
+
+This option includes, when true, an additional array of the sort values in the
+feature properties.
+
+`sort_order`::
+(Optional, string, default: `"ASC"`)
+
+This option accepts one of two values: "ASC", "DESC".
+
+The line is sorted in ascending order by the sort key when set to "ASC", and in descending
+with "DESC".
+
+`size`::
+(Optional, integer, default: `10000`)
+
+The maximum length of the line represented in the aggregation. Valid sizes are
+between one and 10000.
diff --git a/server/src/main/java/org/elasticsearch/search/sort/BucketedSort.java b/server/src/main/java/org/elasticsearch/search/sort/BucketedSort.java
index 79c015791641a..ac696dc30f678 100644
--- a/server/src/main/java/org/elasticsearch/search/sort/BucketedSort.java
+++ b/server/src/main/java/org/elasticsearch/search/sort/BucketedSort.java
@@ -66,7 +66,7 @@
* worst case. Critically, it is a very fast {@code O(1)} to check if a value
* is competitive at all which, so long as buckets aren't hit in reverse
* order, they mostly won't be. Extracting results in sorted order is still
- * {@code O(n * log n)}.
+ * {@code O(n * log n)}.
*
*
* When we first collect a bucket we make sure that we've allocated enough
@@ -90,7 +90,7 @@ public interface ExtraData {
*
* Both parameters will have previously been loaded by
* {@link Loader#loadFromDoc(long, int)} so the implementer shouldn't
- * need to grow the underlying storage to implement this.
+ * need to grow the underlying storage to implement this.
*
*/
void swap(long lhs, long rhs);
@@ -128,7 +128,7 @@ public Loader loader(LeafReaderContext ctx) throws IOException {
private final SortOrder order;
private final DocValueFormat format;
private final int bucketSize;
- private final ExtraData extra;
+ protected final ExtraData extra;
/**
* {@code true} if the bucket is in heap mode, {@code false} if
* it is still gathering.
@@ -206,9 +206,9 @@ public final List getValues(long bucket) throws IOException {
}
/**
- * Is this bucket a min heap {@code true} or in gathering mode {@code false}?
+ * Is this bucket a min heap {@code true} or in gathering mode {@code false}?
*/
- private boolean inHeapMode(long bucket) {
+ public boolean inHeapMode(long bucket) {
return heapMode.get(bucket);
}
@@ -254,7 +254,7 @@ private boolean inHeapMode(long bucket) {
/**
* {@code true} if the entry at index {@code lhs} is "better" than
* the entry at {@code rhs}. "Better" in this means "lower" for
- * {@link SortOrder#ASC} and "higher" for {@link SortOrder#DESC}.
+ * {@link SortOrder#ASC} and "higher" for {@link SortOrder#DESC}.
*/
protected abstract boolean betterThan(long lhs, long rhs);
@@ -283,7 +283,7 @@ protected final String debugFormat() {
/**
* Initialize the gather offsets after setting up values. Subclasses
- * should call this once, after setting up their {@link #values()}.
+ * should call this once, after setting up their {@link #values()}.
*/
protected final void initGatherOffsets() {
setNextGatherOffsets(0);
@@ -325,12 +325,12 @@ private void setNextGatherOffsets(long startingAt) {
* case.
*
*
- * @param rootIndex the index the start of the bucket
+ * @param rootIndex the index the start of the bucket
*/
private void heapify(long rootIndex) {
int maxParent = bucketSize / 2 - 1;
@@ -344,7 +344,7 @@ private void heapify(long rootIndex) {
* runs in {@code O(log n)} time.
* @param rootIndex index of the start of the bucket
* @param parent Index within the bucket of the parent to check.
- * For example, 0 is the "root".
+ * For example, 0 is the "root".
*/
private void downHeap(long rootIndex, int parent) {
while (true) {
@@ -443,7 +443,7 @@ public final void collect(int doc, long bucket) throws IOException {
/**
* {@code true} if the sort value for the doc is "better" than the
* entry at {@code index}. "Better" in means is "lower" for
- * {@link SortOrder#ASC} and "higher" for {@link SortOrder#DESC}.
+ * {@link SortOrder#ASC} and "higher" for {@link SortOrder#DESC}.
*/
protected abstract boolean docBetterThan(long index);
@@ -545,7 +545,7 @@ public abstract static class ForFloats extends BucketedSort {
* The maximum size of buckets this can store. This is because we
* store the next offset to write to in a float and floats only have
* {@code 23} bits of mantissa so they can't accurate store values
- * higher than {@code 2 ^ 24}.
+ * higher than {@code 2 ^ 24}.
*/
public static final int MAX_BUCKET_SIZE = (int) Math.pow(2, 24);
diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregator.java
index 2df02aff67312..4f38d1053d7ca 100644
--- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregator.java
+++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregator.java
@@ -32,6 +32,7 @@
import org.elasticsearch.search.sort.BucketedSort;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortValue;
+import org.elasticsearch.xpack.core.common.search.aggregations.MissingHelper;
import org.elasticsearch.xpack.analytics.topmetrics.InternalTopMetrics.MetricValue;
import java.io.IOException;
@@ -495,62 +496,4 @@ public Loader loader(LeafReaderContext ctx) throws IOException {
public void close() {}
}
- /**
- * Helps {@link LongMetricValues} track "empty" slots. It attempts to have
- * very low CPU overhead and no memory overhead when there *aren't* empty
- * values.
- */
- private static class MissingHelper implements Releasable {
- private final BigArrays bigArrays;
- private BitArray tracker;
-
- MissingHelper(BigArrays bigArrays) {
- this.bigArrays = bigArrays;
- }
-
- void markMissing(long index) {
- if (tracker == null) {
- tracker = new BitArray(index, bigArrays);
- }
- tracker.set(index);
- }
-
- void markNotMissing(long index) {
- if (tracker == null) {
- return;
- }
- tracker.clear(index);
- }
-
- void swap(long lhs, long rhs) {
- if (tracker == null) {
- return;
- }
- boolean backup = tracker.get(lhs);
- if (tracker.get(rhs)) {
- tracker.set(lhs);
- } else {
- tracker.clear(lhs);
- }
- if (backup) {
- tracker.set(rhs);
- } else {
- tracker.clear(rhs);
- }
- }
-
- boolean isEmpty(long index) {
- if (tracker == null) {
- return false;
- }
- return tracker.get(index);
- }
-
- @Override
- public void close() {
- if (tracker != null) {
- tracker.close();
- }
- }
- }
}
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java
index 7927c575e3703..8099bcfc8b7f0 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java
@@ -98,6 +98,8 @@ public enum Feature {
SPATIAL_GEO_GRID(OperationMode.GOLD, true),
+ SPATIAL_GEO_LINE(OperationMode.GOLD, true),
+
ANALYTICS(OperationMode.MISSING, true),
AGGREGATE_METRIC(OperationMode.MISSING, true),
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/search/aggregations/MissingHelper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/search/aggregations/MissingHelper.java
new file mode 100644
index 0000000000000..6120d11b06809
--- /dev/null
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/search/aggregations/MissingHelper.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.common.search.aggregations;
+
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.BitArray;
+
+/**
+ * Helps long-valued {@link org.elasticsearch.search.sort.BucketedSort.ExtraData} track "empty" slots. It attempts to have
+ * very low CPU overhead and no memory overhead when there *aren't* empty
+ * values.
+ */
+public class MissingHelper implements Releasable {
+ private final BigArrays bigArrays;
+ private BitArray tracker;
+
+ public MissingHelper(BigArrays bigArrays) {
+ this.bigArrays = bigArrays;
+ }
+
+ public void markMissing(long index) {
+ if (tracker == null) {
+ tracker = new BitArray(index, bigArrays);
+ }
+ tracker.set(index);
+ }
+
+ public void markNotMissing(long index) {
+ if (tracker == null) {
+ return;
+ }
+ tracker.clear(index);
+ }
+
+ public void swap(long lhs, long rhs) {
+ if (tracker == null) {
+ return;
+ }
+ boolean backup = tracker.get(lhs);
+ if (tracker.get(rhs)) {
+ tracker.set(lhs);
+ } else {
+ tracker.clear(lhs);
+ }
+ if (backup) {
+ tracker.set(rhs);
+ } else {
+ tracker.clear(rhs);
+ }
+ }
+
+ public boolean isEmpty(long index) {
+ if (tracker == null) {
+ return false;
+ }
+ return tracker.get(index);
+ }
+
+ @Override
+ public void close() {
+ if (tracker != null) {
+ tracker.close();
+ }
+ }
+}
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/spatial/action/SpatialStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/spatial/action/SpatialStatsAction.java
index eb50c6df53f76..eecf442a2e18c 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/spatial/action/SpatialStatsAction.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/spatial/action/SpatialStatsAction.java
@@ -38,6 +38,7 @@ private SpatialStatsAction() {
* Items to track. Serialized by ordinals. Append only, don't remove or change order of items in this list.
*/
public enum Item {
+ GEOLINE
}
public static class Request extends BaseNodesRequest implements ToXContentObject {
@@ -115,9 +116,15 @@ public EnumCounters- getStats() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
EnumCounters
- stats = getStats();
- builder.startObject("stats");
- for (Item item : Item.values()) {
- builder.field(item.name().toLowerCase(Locale.ROOT) + "_usage", stats.get(item));
+ builder.startObject();
+ {
+ builder.startObject("stats");
+ {
+ for (Item item : Item.values()) {
+ builder.field(item.name().toLowerCase(Locale.ROOT) + "_usage", stats.get(item));
+ }
+ }
+ builder.endObject();
}
builder.endObject();
return builder;
diff --git a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/SpatialPlugin.java b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/SpatialPlugin.java
index b3330e092948f..30fe89b6fcfde 100644
--- a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/SpatialPlugin.java
+++ b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/SpatialPlugin.java
@@ -8,6 +8,7 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.inject.Module;
+import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.geo.GeoPlugin;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.ingest.Processor;
@@ -34,6 +35,8 @@
import org.elasticsearch.xpack.spatial.index.mapper.ShapeFieldMapper;
import org.elasticsearch.xpack.spatial.index.query.ShapeQueryBuilder;
import org.elasticsearch.xpack.spatial.ingest.CircleProcessor;
+import org.elasticsearch.xpack.spatial.search.aggregations.GeoLineAggregationBuilder;
+import org.elasticsearch.xpack.spatial.search.aggregations.InternalGeoLine;
import org.elasticsearch.xpack.spatial.search.aggregations.bucket.geogrid.BoundedGeoHashGridTiler;
import org.elasticsearch.xpack.spatial.search.aggregations.bucket.geogrid.BoundedGeoTileGridTiler;
import org.elasticsearch.xpack.spatial.search.aggregations.bucket.geogrid.GeoGridTiler;
@@ -57,6 +60,7 @@
import static java.util.Collections.singletonList;
public class SpatialPlugin extends GeoPlugin implements MapperPlugin, ActionPlugin, SearchPlugin, IngestPlugin {
+ private final SpatialUsage usage = new SpatialUsage();
public Collection createGuiceModules() {
return Collections.singletonList(b -> {
@@ -99,6 +103,18 @@ public List> getAggregationExtentions() {
);
}
+ @Override
+ public List getAggregations() {
+ return singletonList(
+ new AggregationSpec(
+ GeoLineAggregationBuilder.NAME,
+ GeoLineAggregationBuilder::new,
+ usage.track(SpatialStatsAction.Item.GEOLINE,
+ checkLicense(GeoLineAggregationBuilder.PARSER, XPackLicenseState.Feature.SPATIAL_GEO_LINE)))
+ .addResultReader(InternalGeoLine::new)
+ .setAggregatorRegistrar(GeoLineAggregationBuilder::registerUsage));
+ }
+
@Override
public Map getProcessors(Processor.Parameters parameters) {
return Collections.singletonMap(CircleProcessor.TYPE, new CircleProcessor.Factory());
@@ -179,4 +195,13 @@ private static void registerValueCountAggregator(ValuesSourceRegistry.Builder bu
private static void registerCardinalityAggregator(ValuesSourceRegistry.Builder builder) {
builder.register(CardinalityAggregationBuilder.REGISTRY_KEY, GeoShapeValuesSourceType.instance(), CardinalityAggregator::new, true);
}
+
+ private ContextParser checkLicense(ContextParser realParser, XPackLicenseState.Feature feature) {
+ return (parser, name) -> {
+ if (getLicenseState().checkFeature(feature) == false) {
+ throw LicenseUtils.newComplianceException(feature.name());
+ }
+ return realParser.parse(parser, name);
+ };
+ }
}
diff --git a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregationBuilder.java b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregationBuilder.java
new file mode 100644
index 0000000000000..30e87ea5f6b05
--- /dev/null
+++ b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregationBuilder.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
+import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregationBuilder;
+import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregatorFactory;
+import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig;
+import org.elasticsearch.search.aggregations.support.MultiValuesSourceParseHelper;
+import org.elasticsearch.search.aggregations.support.ValueType;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
+import org.elasticsearch.search.aggregations.support.ValuesSourceType;
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+public class GeoLineAggregationBuilder
+ extends MultiValuesSourceAggregationBuilder.LeafOnly {
+
+ static final ParseField POINT_FIELD = new ParseField("point");
+ static final ParseField SORT_FIELD = new ParseField("sort");
+ static final ParseField ORDER_FIELD = new ParseField("sort_order");
+ static final ParseField INCLUDE_SORT_FIELD = new ParseField("include_sort");
+ static final ParseField SIZE_FIELD = new ParseField("size");
+
+ public static final String NAME = "geo_line";
+
+ public static final ObjectParser PARSER =
+ ObjectParser.fromBuilder(NAME, GeoLineAggregationBuilder::new);
+ static {
+ MultiValuesSourceParseHelper.declareCommon(PARSER, true, ValueType.NUMERIC);
+ MultiValuesSourceParseHelper.declareField(POINT_FIELD.getPreferredName(), PARSER, true, false, false);
+ MultiValuesSourceParseHelper.declareField(SORT_FIELD.getPreferredName(), PARSER, true, false, false);
+ PARSER.declareString((builder, order) -> builder.sortOrder(SortOrder.fromString(order)), ORDER_FIELD);
+ PARSER.declareBoolean(GeoLineAggregationBuilder::includeSort, INCLUDE_SORT_FIELD);
+ PARSER.declareInt(GeoLineAggregationBuilder::size, SIZE_FIELD);
+ }
+
+ private boolean includeSort;
+ private SortOrder sortOrder = SortOrder.ASC;
+ private int size = MAX_PATH_SIZE;
+ static final int MAX_PATH_SIZE = 10000;
+
+ public static void registerUsage(ValuesSourceRegistry.Builder builder) {
+ builder.registerUsage(NAME, CoreValuesSourceType.GEOPOINT);
+ }
+
+ public GeoLineAggregationBuilder(String name) {
+ super(name);
+ }
+
+ private GeoLineAggregationBuilder(GeoLineAggregationBuilder clone,
+ AggregatorFactories.Builder factoriesBuilder, Map metaData) {
+ super(clone, factoriesBuilder, metaData);
+ }
+
+ /**
+ * Read from a stream.
+ */
+ public GeoLineAggregationBuilder(StreamInput in) throws IOException {
+ super(in);
+ sortOrder = SortOrder.readFromStream(in);
+ includeSort = in.readBoolean();
+ size = in.readVInt();
+ }
+
+ public GeoLineAggregationBuilder includeSort(boolean includeSort) {
+ this.includeSort = includeSort;
+ return this;
+ }
+
+ public GeoLineAggregationBuilder sortOrder(SortOrder sortOrder) {
+ this.sortOrder = sortOrder;
+ return this;
+ }
+
+ public GeoLineAggregationBuilder size(int size) {
+ if (size <= 0 || size > MAX_PATH_SIZE) {
+ throw new IllegalArgumentException("invalid [size] value [" + size + "] must be a positive integer <= "
+ + MAX_PATH_SIZE);
+ }
+ this.size = size;
+ return this;
+ }
+
+ @Override
+ protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map metaData) {
+ return new GeoLineAggregationBuilder(this, factoriesBuilder, metaData);
+ }
+
+ @Override
+ public BucketCardinality bucketCardinality() {
+ return BucketCardinality.NONE;
+ }
+
+ @Override
+ protected void innerWriteTo(StreamOutput out) throws IOException {
+ sortOrder.writeTo(out);
+ out.writeBoolean(includeSort);
+ out.writeVInt(size);
+ }
+
+ @Override
+ protected ValuesSourceType defaultValueSourceType() {
+ return CoreValuesSourceType.NUMERIC;
+ }
+
+ @Override
+ protected MultiValuesSourceAggregatorFactory innerBuild(AggregationContext aggregationContext,
+ Map configs,
+ Map filters,
+ DocValueFormat format,
+ AggregatorFactory parent,
+ AggregatorFactories.Builder subFactoriesBuilder) throws IOException {
+ return new GeoLineAggregatorFactory(name, configs, format, aggregationContext, parent, subFactoriesBuilder, metadata,
+ includeSort, sortOrder, size);
+ }
+
+ public GeoLineAggregationBuilder point(MultiValuesSourceFieldConfig pointConfig) {
+ pointConfig = Objects.requireNonNull(pointConfig, "Configuration for field [" + POINT_FIELD + "] cannot be null");
+ field(POINT_FIELD.getPreferredName(), pointConfig);
+ return this;
+ }
+
+ public GeoLineAggregationBuilder sort(MultiValuesSourceFieldConfig sortConfig) {
+ sortConfig = Objects.requireNonNull(sortConfig, "Configuration for field [" + SORT_FIELD + "] cannot be null");
+ field(SORT_FIELD.getPreferredName(), sortConfig);
+ return this;
+ }
+
+ @Override
+ public XContentBuilder doXContentBody(XContentBuilder builder, ToXContent.Params params) {
+ return builder;
+ }
+
+ @Override
+ public String getType() {
+ return NAME;
+ }
+}
diff --git a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregator.java b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregator.java
new file mode 100644
index 0000000000000..3417883f48465
--- /dev/null
+++ b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregator.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.ScoreMode;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.LeafBucketCollector;
+import org.elasticsearch.search.aggregations.metrics.MetricsAggregator;
+import org.elasticsearch.xpack.spatial.search.aggregations.support.GeoLineMultiValuesSource;
+import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.sort.BucketedSort;
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Metric Aggregation for joining sorted geo_point values into a single path
+ **/
+final class GeoLineAggregator extends MetricsAggregator {
+ /** Multiple ValuesSource with field names */
+ private final GeoLineMultiValuesSource valuesSources;
+
+ private final GeoLineBucketedSort sort;
+ private final GeoLineBucketedSort.Extra extra;
+ private final boolean includeSorts;
+ private final SortOrder sortOrder;
+ private final int size;
+
+ GeoLineAggregator(String name, GeoLineMultiValuesSource valuesSources, SearchContext context,
+ Aggregator parent, Map metaData, boolean includeSorts, SortOrder sortOrder,
+ int size) throws IOException {
+ super(name, context, parent, metaData);
+ this.valuesSources = valuesSources;
+ if (valuesSources != null) {
+ this.extra = new GeoLineBucketedSort.Extra(context.bigArrays(), valuesSources);
+ this.sort = new GeoLineBucketedSort(context.bigArrays(), sortOrder, null, size, valuesSources, extra);
+ } else {
+ this.extra = null;
+ this.sort = null;
+ }
+ this.includeSorts = includeSorts;
+ this.sortOrder = sortOrder;
+ this.size = size;
+ }
+
+ @Override
+ public ScoreMode scoreMode() {
+ if (valuesSources != null && valuesSources.needsScores()) {
+ return ScoreMode.COMPLETE;
+ }
+ return super.scoreMode();
+ }
+
+ @Override
+ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
+ final LeafBucketCollector sub) throws IOException {
+ if (valuesSources == null) {
+ return LeafBucketCollector.NO_OP_COLLECTOR;
+ }
+ BucketedSort.Leaf leafSort = sort.forLeaf(ctx);
+
+ return new LeafBucketCollector(){
+ @Override
+ public void collect(int doc, long bucket) throws IOException {
+ leafSort.collect(doc, bucket);
+ }
+ };
+ }
+
+ @Override
+ public InternalAggregation buildAggregation(long bucket) {
+ if (valuesSources == null) {
+ return buildEmptyAggregation();
+ }
+ boolean complete = sort.inHeapMode(bucket) == false;
+ addRequestCircuitBreakerBytes((Double.SIZE + Long.SIZE) * sort.sizeOf(bucket));
+ double[] sortVals = sort.getSortValues(bucket);
+ long[] bucketLine = sort.getPoints(bucket);
+ new PathArraySorter(bucketLine, sortVals, sortOrder).sort();
+ return new InternalGeoLine(name, bucketLine, sortVals, metadata(), complete, includeSorts, sortOrder, size);
+ }
+
+ @Override
+ public InternalAggregation buildEmptyAggregation() {
+ return new InternalGeoLine(name, null, null, metadata(), true, includeSorts, sortOrder, size);
+ }
+
+ @Override
+ public void doClose() {
+ Releasables.close(sort, extra);
+ }
+}
diff --git a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorFactory.java b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorFactory.java
new file mode 100644
index 0000000000000..a2778a97f9a8c
--- /dev/null
+++ b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.Aggregator;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.AggregatorFactory;
+import org.elasticsearch.search.aggregations.CardinalityUpperBound;
+import org.elasticsearch.search.aggregations.support.AggregationContext;
+import org.elasticsearch.xpack.spatial.search.aggregations.support.GeoLineMultiValuesSource;
+import org.elasticsearch.search.aggregations.support.MultiValuesSourceAggregatorFactory;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.io.IOException;
+import java.util.Map;
+
+final class GeoLineAggregatorFactory extends MultiValuesSourceAggregatorFactory {
+
+ private boolean includeSort;
+ private SortOrder sortOrder;
+ private int size;
+
+ GeoLineAggregatorFactory(String name,
+ Map configs,
+ DocValueFormat format, AggregationContext aggregationContext, AggregatorFactory parent,
+ AggregatorFactories.Builder subFactoriesBuilder,
+ Map metaData, boolean includeSort, SortOrder sortOrder, int size) throws IOException {
+ super(name, configs, format, aggregationContext, parent, subFactoriesBuilder, metaData);
+ this.includeSort = includeSort;
+ this.sortOrder = sortOrder;
+ this.size = size;
+ }
+
+ @Override
+ protected Aggregator createUnmapped(SearchContext searchContext,
+ Aggregator parent,
+ Map metaData) throws IOException {
+ return new GeoLineAggregator(name, null, searchContext, parent, metaData, includeSort, sortOrder, size);
+ }
+
+ @Override
+ protected Aggregator doCreateInternal(SearchContext searchContext,
+ Map configs,
+ DocValueFormat format,
+ Aggregator parent,
+ CardinalityUpperBound cardinality,
+ Map metaData) throws IOException {
+ GeoLineMultiValuesSource valuesSources =
+ new GeoLineMultiValuesSource(configs, searchContext.getQueryShardContext());
+ return new GeoLineAggregator(name, valuesSources, searchContext, parent, metaData, includeSort, sortOrder, size);
+ }
+
+ @Override
+ public String getStatsSubtype() {
+ return configs.get(GeoLineAggregationBuilder.POINT_FIELD.getPreferredName()).valueSourceType().typeName();
+ }
+}
diff --git a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineBucketedSort.java b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineBucketedSort.java
new file mode 100644
index 0000000000000..b804b43a86ac2
--- /dev/null
+++ b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineBucketedSort.java
@@ -0,0 +1,195 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.apache.lucene.geo.GeoEncodingUtils;
+import org.apache.lucene.index.LeafReaderContext;
+import org.elasticsearch.common.geo.GeoPoint;
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.DoubleArray;
+import org.elasticsearch.common.util.LongArray;
+import org.elasticsearch.index.fielddata.MultiGeoPointValues;
+import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.AggregationExecutionException;
+import org.elasticsearch.xpack.spatial.search.aggregations.support.GeoLineMultiValuesSource;
+import org.elasticsearch.search.sort.BucketedSort;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.xpack.core.common.search.aggregations.MissingHelper;
+
+import java.io.IOException;
+
+import static org.elasticsearch.xpack.spatial.search.aggregations.GeoLineAggregationBuilder.SORT_FIELD;
+
+/**
+ * A bigArrays sorter of both a geo_line's sort-values and points.
+ *
+ * This class accumulates geo_points within buckets and heapifies the
+ * bucket based on whether there are too many items in the bucket that
+ * need to be dropped based on their sort value.
+ */
+public class GeoLineBucketedSort extends BucketedSort.ForDoubles {
+ private final GeoLineMultiValuesSource valuesSources;
+
+ public GeoLineBucketedSort(BigArrays bigArrays, SortOrder sortOrder, DocValueFormat format, int bucketSize,
+ GeoLineMultiValuesSource valuesSources, GeoLineBucketedSort.Extra extra) {
+ super(bigArrays, sortOrder, format, bucketSize, extra);
+ this.valuesSources = valuesSources;
+ }
+
+ public long sizeOf(long bucket) {
+ int bucketSize = getBucketSize();
+ long rootIndex = bucket * bucketSize;
+ if (rootIndex >= values().size()) {
+ // We've never seen this bucket.
+ return 0;
+ }
+ long start = inHeapMode(bucket) ? rootIndex : (rootIndex + getNextGatherOffset(rootIndex) + 1);
+ long end = rootIndex + bucketSize;
+ return end - start;
+ }
+
+ /**
+ * @param bucket the bucket ordinal
+ * @return the array of sort-values for the specific bucket. This array may not necessarily be heapified already, so no ordering is
+ * guaranteed.
+ */
+ public double[] getSortValues(long bucket) {
+ int bucketSize = getBucketSize();
+ long rootIndex = bucket * bucketSize;
+ if (rootIndex >= values().size()) {
+ // We've never seen this bucket.
+ return new double[]{};
+ }
+ long start = inHeapMode(bucket) ? rootIndex : (rootIndex + getNextGatherOffset(rootIndex) + 1);
+ long end = rootIndex + bucketSize;
+ double[] result = new double[(int)(end - start)];
+ int i = 0;
+ for (long index = start; index < end; index++) {
+ double timestampValue = ((DoubleArray)values()).get(index);
+ result[i++] = timestampValue;
+ }
+ return result;
+ }
+
+ /**
+ * @param bucket the bucket ordinal
+ * @return the array of points, ordered by the their respective sort-value for the specific bucket.
+ */
+ public long[] getPoints(long bucket) {
+ int bucketSize = getBucketSize();
+ long rootIndex = bucket * bucketSize;
+ if (rootIndex >= values().size()) {
+ // We've never seen this bucket.
+ return new long[]{};
+ }
+ long start = inHeapMode(bucket) ? rootIndex : (rootIndex + getNextGatherOffset(rootIndex) + 1);
+ long end = rootIndex + bucketSize;
+ long[] result = new long[(int)(end - start)];
+ int i = 0;
+ for (long index = start; index < end; index++) {
+ long geoPointValue = ((Extra) extra).values.get(index);
+ result[i++] = geoPointValue;
+ }
+ return result;
+ }
+
+ @Override
+ public BucketedSort.Leaf forLeaf(LeafReaderContext ctx) throws IOException {
+ return new BucketedSort.ForDoubles.Leaf(ctx) {
+ private final SortedNumericDoubleValues docSortValues = valuesSources.getNumericField(SORT_FIELD.getPreferredName(), ctx);
+ private double docValue;
+
+ @Override
+ protected boolean advanceExact(int doc) throws IOException {
+ if (docSortValues.advanceExact(doc)) {
+ if (docSortValues.docValueCount() > 1) {
+ throw new AggregationExecutionException("Encountered more than one sort value for a " +
+ "single document. Use a script to combine multiple sort-values-per-doc into a single value.");
+ }
+
+ // There should always be one weight if advanceExact lands us here, either
+ // a real weight or a `missing` weight
+ assert docSortValues.docValueCount() == 1;
+ docValue = docSortValues.nextValue();
+ return true;
+ } else {
+ docValue = Long.MIN_VALUE;
+ }
+ return false;
+ }
+
+ @Override
+ protected double docValue() {
+ return docValue;
+ }
+ };
+ }
+
+ /**
+ * An {@link BucketedSort.ExtraData} representing the geo-point for a document
+ * within a bucket.
+ */
+ static class Extra implements BucketedSort.ExtraData, Releasable {
+
+ private final BigArrays bigArrays;
+ private final GeoLineMultiValuesSource valuesSources;
+ private LongArray values;
+ private final MissingHelper empty;
+
+ Extra(BigArrays bigArrays, GeoLineMultiValuesSource valuesSources) {
+ this.bigArrays = bigArrays;
+ this.valuesSources = valuesSources;
+ this.values = bigArrays.newLongArray(1, false);
+ this.empty = new MissingHelper(bigArrays);
+ }
+
+ @Override
+ public void swap(long lhs, long rhs) {
+ long tmp = values.get(lhs);
+ values.set(lhs, values.get(rhs));
+ values.set(rhs, tmp);
+ empty.swap(lhs, rhs);
+ }
+
+ @Override
+ public Loader loader(LeafReaderContext ctx) throws IOException {
+ final MultiGeoPointValues docGeoPointValues = valuesSources
+ .getGeoPointField(GeoLineAggregationBuilder.POINT_FIELD.getPreferredName(), ctx);
+ return (index, doc) -> {
+ if (false == docGeoPointValues.advanceExact(doc)) {
+ empty.markMissing(index);
+ return;
+ }
+
+ if (docGeoPointValues.docValueCount() > 1) {
+ throw new AggregationExecutionException("Encountered more than one geo_point value for a " +
+ "single document. Use a script to combine multiple geo_point-values-per-doc into a single value.");
+ }
+
+ if (index > values.size()) {
+ values = bigArrays.grow(values, index + 1);
+ }
+
+ final GeoPoint point = docGeoPointValues.nextValue();
+ int encodedLat = GeoEncodingUtils.encodeLatitude(point.lat());
+ int encodedLon = GeoEncodingUtils.encodeLongitude(point.lon());
+ long lonLat = (((long) encodedLon) << 32) | encodedLat & 0xffffffffL;
+
+ values.set(index, lonLat);
+ empty.markNotMissing(index);
+ };
+ }
+
+ @Override
+ public void close() {
+ Releasables.close(values, empty);
+ }
+ }
+}
diff --git a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/InternalGeoLine.java b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/InternalGeoLine.java
new file mode 100644
index 0000000000000..2c150bfd54b6c
--- /dev/null
+++ b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/InternalGeoLine.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.apache.lucene.geo.GeoEncodingUtils;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A single line string representing a sorted sequence of geo-points
+ */
+public class InternalGeoLine extends InternalAggregation {
+ private static final double SCALE = Math.pow(10, 6);
+
+ private long[] line;
+ private double[] sortVals;
+ private boolean complete;
+ private boolean includeSorts;
+ private SortOrder sortOrder;
+ private int size;
+
+ /**
+ * A geo_line representing the bucket for a {@link GeoLineAggregationBuilder}. The values of
line and sortVals
+ * are expected to be sorted using sortOrder.
+ *
+ * @param name the name of the aggregation
+ * @param line the ordered geo-points representing the line
+ * @param sortVals the ordered sort-values associated with the points in the line (e.g. timestamp)
+ * @param metadata the aggregation's metadata
+ * @param complete true iff the line is representative of all the points that fall within the bucket. False otherwise.
+ * @param includeSorts true iff the sort-values should be rendered in xContent as properties of the line-string. False otherwise.
+ * @param sortOrder the {@link SortOrder} for the line. Whether the points are to be plotted in asc or desc order
+ * @param size the max length of the line-string.
+ */
+ InternalGeoLine(String name, long[] line, double[] sortVals, Map metadata, boolean complete,
+ boolean includeSorts, SortOrder sortOrder, int size) {
+ super(name, metadata);
+ this.line = line;
+ this.sortVals = sortVals;
+ this.complete = complete;
+ this.includeSorts = includeSorts;
+ this.sortOrder = sortOrder;
+ this.size = size;
+ }
+
+ /**
+ * Read from a stream.
+ */
+ public InternalGeoLine(StreamInput in) throws IOException {
+ super(in);
+ this.line = in.readLongArray();
+ this.sortVals = in.readDoubleArray();
+ this.complete = in.readBoolean();
+ this.includeSorts = in.readBoolean();
+ this.sortOrder = SortOrder.readFromStream(in);
+ this.size = in.readVInt();
+ }
+
+ @Override
+ protected void doWriteTo(StreamOutput out) throws IOException {
+ out.writeLongArray(line);
+ out.writeDoubleArray(sortVals);
+ out.writeBoolean(complete);
+ out.writeBoolean(includeSorts);
+ sortOrder.writeTo(out);
+ out.writeVInt(size);
+ }
+
+ @Override
+ public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) {
+ int mergedSize = 0;
+ boolean complete = true;
+ boolean includeSorts = true;
+ List internalGeoLines = new ArrayList<>(aggregations.size());
+ for (InternalAggregation aggregation : aggregations) {
+ InternalGeoLine geoLine = (InternalGeoLine) aggregation;
+ internalGeoLines.add(geoLine);
+ mergedSize += geoLine.line.length;
+ complete &= geoLine.complete;
+ includeSorts &= geoLine.includeSorts;
+ }
+ complete &= mergedSize <= size;
+ int finalSize = Math.min(mergedSize, size);
+
+ MergedGeoLines mergedGeoLines = new MergedGeoLines(internalGeoLines, finalSize, sortOrder);
+ mergedGeoLines.merge();
+ // the final reduce should always be in ascending order
+ if (reduceContext.isFinalReduce() && SortOrder.DESC.equals(sortOrder)) {
+ new PathArraySorter(mergedGeoLines.getFinalPoints(), mergedGeoLines.getFinalSortValues(), SortOrder.ASC).sort();
+ }
+ return new InternalGeoLine(name, mergedGeoLines.getFinalPoints(), mergedGeoLines.getFinalSortValues(), getMetadata(), complete,
+ includeSorts, sortOrder, size);
+ }
+
+ @Override
+ protected boolean mustReduceOnSingleInternalAgg() {
+ return true;
+ }
+
+ @Override
+ public String getWriteableName() {
+ return GeoLineAggregationBuilder.NAME;
+ }
+
+ public long[] line() {
+ return line;
+ }
+
+ public double[] sortVals() {
+ return sortVals;
+ }
+
+ public int length() {
+ return line.length;
+ }
+
+ public boolean isComplete() {
+ return complete;
+ }
+
+ public boolean includeSorts() {
+ return includeSorts;
+ }
+
+ public SortOrder sortOrder() {
+ return sortOrder;
+ }
+
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
+ final List coordinates = new ArrayList<>();
+ for (int i = 0; i < line.length; i++) {
+ int x = (int) (line[i] >> 32);
+ int y = (int) line[i];
+ coordinates.add(new double[] {
+ roundDegrees(GeoEncodingUtils.decodeLongitude(x)),
+ roundDegrees(GeoEncodingUtils.decodeLatitude(y))
+ });
+ }
+ builder
+ .field("type", "Feature")
+ .startObject("geometry")
+ .field("type", "LineString")
+ .array("coordinates", coordinates.toArray())
+ .endObject()
+ .startObject("properties")
+ .field("complete", isComplete());
+ if (includeSorts) {
+ builder.field("sort_values", sortVals);
+ }
+ builder.endObject();
+ return builder;
+ }
+
+ private double roundDegrees(double degree) {
+ return Math.round(degree * SCALE) / SCALE;
+ }
+
+ @Override
+ public Object getProperty(List path) {
+ if (path.isEmpty()) {
+ return this;
+ } else if (path.size() == 1 && "value".equals(path.get(0))) {
+ return line;
+ } else {
+ throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return Strings.toString(this);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), Arrays.hashCode(line), Arrays.hashCode(sortVals), complete, includeSorts, sortOrder, size);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null || getClass() != obj.getClass()) return false;
+ if (super.equals(obj) == false) return false;
+
+ InternalGeoLine that = (InternalGeoLine) obj;
+ return super.equals(obj)
+ && Arrays.equals(line, that.line)
+ && Arrays.equals(sortVals, that.sortVals)
+ && Objects.equals(complete, that.complete)
+ && Objects.equals(includeSorts, that.includeSorts)
+ && Objects.equals(sortOrder, that.sortOrder)
+ && Objects.equals(size, that.size);
+
+ }
+}
diff --git a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/MergedGeoLines.java b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/MergedGeoLines.java
new file mode 100644
index 0000000000000..8b8c4aeee1c02
--- /dev/null
+++ b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/MergedGeoLines.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.elasticsearch.search.sort.SortOrder;
+
+import java.util.List;
+
+/**
+ * Class to merge an arbitrary list of {@link InternalGeoLine} lines into a new line
+ * with the appropriate max length. The final point and sort values can be found in
+ * finalPoints and finalSortValues after merge is called.
+ */
+final class MergedGeoLines {
+
+ private final List geoLines;
+ private final int capacity;
+ private final SortOrder sortOrder;
+ private final int[] lineIndices; // index of which geoLine item represents
+ private final int[] idxsWithinLine; // index within the geoLine for the item
+ private int size;
+ private final long[] finalPoints; // the final sorted list of points, sorted by their respective sort-values. valid after merge
+ private final double[] finalSortValues; // the final sorted list of sort-values. valid after merge.
+
+ MergedGeoLines(List geoLines, int finalLength, SortOrder sortOrder) {
+ this.geoLines = geoLines;
+ this.capacity = geoLines.size();
+ this.sortOrder = sortOrder;
+ this.lineIndices = new int[capacity];
+ this.idxsWithinLine = new int[capacity];
+ this.size = 0;
+ this.finalPoints = new long[finalLength];
+ this.finalSortValues = new double[finalLength];
+ }
+
+ public long[] getFinalPoints() {
+ return finalPoints;
+ }
+
+ public double[] getFinalSortValues() {
+ return finalSortValues;
+ }
+
+ /**
+ * merges geoLines into one sorted list of values representing the combined line.
+ */
+ public void merge() {
+ // 1. add first element of each sub line to heap
+ for (int i = 0; i < geoLines.size(); i++) {
+ if (geoLines.size() > 0) {
+ add(i, 0);
+ }
+ }
+
+ // 2. take lowest/greatest value from heap and re-insert the next value from the same sub-line that specific value was chosen from.
+
+ int i = 0;
+ while (i < finalPoints.length && size > 0) {
+ // take top from heap and place in finalLists
+ int lineIdx = lineIndices[0];
+ int idxInLine = idxsWithinLine[0];
+ finalPoints[i] = getTopPoint();
+ finalSortValues[i] = getTopSortValue();
+ removeTop();
+ InternalGeoLine lineChosen = geoLines.get(lineIdx);
+ if (idxInLine + 1 < lineChosen.line().length) {
+ add(lineIdx, idxInLine + 1);
+ }
+ i++;
+ }
+ }
+
+ private long getTopPoint() {
+ InternalGeoLine line = geoLines.get(lineIndices[0]);
+ return line.line()[idxsWithinLine[0]];
+ }
+
+ private double getTopSortValue() {
+ InternalGeoLine line = geoLines.get(lineIndices[0]);
+ return line.sortVals()[idxsWithinLine[0]];
+ }
+
+ private void removeTop() {
+ if (size == 0) {
+ throw new IllegalStateException();
+ }
+ lineIndices[0] = lineIndices[size - 1];
+ idxsWithinLine[0] = idxsWithinLine[size - 1];
+ size--;
+ heapifyDown();
+ }
+
+ private void add(int lineIndex, int idxWithinLine) {
+ if (size >= capacity) {
+ throw new IllegalStateException();
+ }
+ lineIndices[size] = lineIndex;
+ idxsWithinLine[size] = idxWithinLine;
+ size++;
+ heapifyUp();
+ }
+
+ private boolean correctOrdering(int i, int j) {
+ InternalGeoLine lineI = geoLines.get(lineIndices[i]);
+ InternalGeoLine lineJ = geoLines.get(lineIndices[j]);
+ double valI = lineI.sortVals()[idxsWithinLine[i]];
+ double valJ = lineJ.sortVals()[idxsWithinLine[j]];
+ if (SortOrder.ASC.equals(sortOrder)) {
+ return valI > valJ;
+ }
+ return valI < valJ;
+ }
+
+ private int getParentIndex(int i) {
+ return (i - 1) / 2;
+ }
+
+ private int getLeftChildIndex(int i) {
+ return 2 * i + 1;
+ }
+
+ private int getRightChildIndex(int i) {
+ return 2 * i + 2;
+ }
+
+ private boolean hasParent(int i) {
+ return i > 0;
+ }
+
+ private boolean hasLeftChild(int i) {
+ return getLeftChildIndex(i) < size;
+ }
+
+ private boolean hasRightChild(int i) {
+ return getRightChildIndex(i) < size;
+ }
+
+ private void heapifyUp() {
+ int i = size - 1;
+ while (hasParent(i) && correctOrdering(getParentIndex(i), i)) {
+ int parentIndex = getParentIndex(i);
+ swap(parentIndex, i);
+ i = parentIndex;
+ }
+ }
+
+ private void heapifyDown() {
+ int i = 0;
+ while (hasLeftChild(i)) {
+ int childIndex = getLeftChildIndex(i);
+ if (hasRightChild(i) && correctOrdering(getRightChildIndex(i), childIndex) == false) {
+ childIndex = getRightChildIndex(i);
+ }
+ if (correctOrdering(childIndex, i)) {
+ break;
+ } else {
+ swap(childIndex, i);
+ i = childIndex;
+ }
+ }
+ }
+
+ private void swap(int i, int j) {
+ int tmpLineIndex = lineIndices[i];
+ int tmpIdxWithinLine = idxsWithinLine[i];
+ lineIndices[i] = lineIndices[j];
+ idxsWithinLine[i] = idxsWithinLine[j];
+ lineIndices[j] = tmpLineIndex;
+ idxsWithinLine[j] = tmpIdxWithinLine;
+ }
+}
diff --git a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/PathArraySorter.java b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/PathArraySorter.java
new file mode 100644
index 0000000000000..8d67757b132f9
--- /dev/null
+++ b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/PathArraySorter.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.apache.lucene.util.IntroSorter;
+import org.elasticsearch.search.sort.SortOrder;
+
+/**
+ * An {@link IntroSorter} that sorts points and sortValues using the
+ */
+final class PathArraySorter extends IntroSorter {
+
+ private final long[] points;
+ private final double[] sortValues;
+ private double sortValuePivot;
+ private final SortOrder sortOrder;
+
+ PathArraySorter(long[] points, double[] sortValues, SortOrder sortOrder) {
+ assert points.length == sortValues.length;
+ this.points = points;
+ this.sortValues = sortValues;
+ this.sortValuePivot = 0;
+ this.sortOrder = sortOrder;
+ }
+
+ public void sort() {
+ sort(0, points.length);
+ }
+
+ @Override
+ protected void swap(int i, int j) {
+ final long tmpPoint = points[i];
+ points[i] = points[j];
+ points[j] = tmpPoint;
+ final double tmpSortValue = sortValues[i];
+ sortValues[i] = sortValues[j];
+ sortValues[j] = tmpSortValue;
+ }
+
+ @Override
+ protected void setPivot(int i) {
+ sortValuePivot = sortValues[i];
+ }
+
+ @Override
+ protected int comparePivot(int j) {
+ if (SortOrder.ASC.equals(sortOrder)) {
+ return Double.compare(sortValuePivot, sortValues[j]);
+ }
+ return Double.compare(sortValues[j], sortValuePivot);
+ }
+}
diff --git a/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/support/GeoLineMultiValuesSource.java b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/support/GeoLineMultiValuesSource.java
new file mode 100644
index 0000000000000..6371d6f95f87e
--- /dev/null
+++ b/x-pack/plugin/spatial/src/main/java/org/elasticsearch/xpack/spatial/search/aggregations/support/GeoLineMultiValuesSource.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.spatial.search.aggregations.support;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.elasticsearch.index.fielddata.MultiGeoPointValues;
+import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
+import org.elasticsearch.index.query.QueryShardContext;
+import org.elasticsearch.search.aggregations.AggregationExecutionException;
+import org.elasticsearch.search.aggregations.support.MultiValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSource;
+import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class GeoLineMultiValuesSource extends MultiValuesSource {
+ public GeoLineMultiValuesSource(Map valuesSourceConfigs, QueryShardContext context) {
+ values = new HashMap<>(valuesSourceConfigs.size());
+ for (Map.Entry entry : valuesSourceConfigs.entrySet()) {
+ final ValuesSource valuesSource = entry.getValue().getValuesSource();
+ if (valuesSource instanceof ValuesSource.Numeric == false
+ && valuesSource instanceof ValuesSource.GeoPoint == false) {
+ throw new AggregationExecutionException("ValuesSource type " + valuesSource.toString() +
+ "is not supported for multi-valued aggregation");
+ }
+ values.put(entry.getKey(), valuesSource);
+ }
+ }
+
+ private ValuesSource getField(String fieldName) {
+ ValuesSource valuesSource = values.get(fieldName);
+ if (valuesSource == null) {
+ throw new IllegalArgumentException("Could not find field name [" + fieldName + "] in multiValuesSource");
+ }
+ return valuesSource;
+ }
+
+ public SortedNumericDoubleValues getNumericField(String fieldName, LeafReaderContext ctx) throws IOException {
+ ValuesSource valuesSource = getField(fieldName);
+ if (valuesSource instanceof ValuesSource.Numeric) {
+ return ((ValuesSource.Numeric) valuesSource).doubleValues(ctx);
+ }
+ throw new IllegalArgumentException("field [" + fieldName + "] is not a numeric type");
+ }
+
+ public MultiGeoPointValues getGeoPointField(String fieldName, LeafReaderContext ctx) {
+ ValuesSource valuesSource = getField(fieldName);
+ if (valuesSource instanceof ValuesSource.GeoPoint) {
+ return ((ValuesSource.GeoPoint) valuesSource).geoPointValues(ctx);
+ }
+ throw new IllegalArgumentException("field [" + fieldName + "] is not a geo_point type");
+ }
+
+}
diff --git a/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregationBuilderTests.java b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregationBuilderTests.java
new file mode 100644
index 0000000000000..25aba6129501a
--- /dev/null
+++ b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregationBuilderTests.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.test.AbstractSerializingTestCase;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class GeoLineAggregationBuilderTests extends AbstractSerializingTestCase {
+
+ @Override
+ protected GeoLineAggregationBuilder doParseInstance(XContentParser parser) throws IOException {
+ assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_OBJECT));
+ assertThat(parser.nextToken(), equalTo(XContentParser.Token.FIELD_NAME));
+ String name = parser.currentName();
+ assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_OBJECT));
+ assertThat(parser.nextToken(), equalTo(XContentParser.Token.FIELD_NAME));
+ assertThat(parser.currentName(), equalTo(GeoLineAggregationBuilder.NAME));
+ GeoLineAggregationBuilder parsed = GeoLineAggregationBuilder.PARSER.apply(parser, name);
+ assertThat(parser.nextToken(), equalTo(XContentParser.Token.END_OBJECT));
+ assertThat(parser.nextToken(), equalTo(XContentParser.Token.END_OBJECT));
+ return parsed;
+ }
+
+ @Override
+ protected Writeable.Reader instanceReader() {
+ return GeoLineAggregationBuilder::new;
+ }
+
+ @Override
+ protected GeoLineAggregationBuilder createTestInstance() {
+ MultiValuesSourceFieldConfig pointConfig = new MultiValuesSourceFieldConfig.Builder()
+ .setFieldName(randomAlphaOfLength(5))
+ .build();
+ MultiValuesSourceFieldConfig sortConfig = new MultiValuesSourceFieldConfig.Builder()
+ .setFieldName(randomAlphaOfLength(6)).build();
+ GeoLineAggregationBuilder lineAggregationBuilder = new GeoLineAggregationBuilder("_name")
+ .point(pointConfig)
+ .sort(sortConfig);
+ if (randomBoolean()) {
+ SortOrder sortOrder = randomFrom(SortOrder.values());
+ lineAggregationBuilder.sortOrder(sortOrder);
+ }
+ if (randomBoolean()) {
+ lineAggregationBuilder.size(randomIntBetween(1, GeoLineAggregationBuilder.MAX_PATH_SIZE));
+ }
+ if (randomBoolean()) {
+ lineAggregationBuilder.includeSort(randomBoolean());
+ }
+ return lineAggregationBuilder;
+ }
+
+ public void testInvalidSize() {
+ MultiValuesSourceFieldConfig pointConfig = new MultiValuesSourceFieldConfig.Builder()
+ .setFieldName(randomAlphaOfLength(5))
+ .build();
+ MultiValuesSourceFieldConfig sortConfig = new MultiValuesSourceFieldConfig.Builder()
+ .setFieldName(randomAlphaOfLength(6)).build();
+ GeoLineAggregationBuilder lineAggregationBuilder = new GeoLineAggregationBuilder("_name")
+ .point(pointConfig)
+ .sort(sortConfig);
+ expectThrows(IllegalArgumentException.class, () -> lineAggregationBuilder.size(0));
+ expectThrows(IllegalArgumentException.class,
+ () -> lineAggregationBuilder.size(GeoLineAggregationBuilder.MAX_PATH_SIZE + randomIntBetween(1, 10)));
+ }
+}
diff --git a/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorTests.java b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorTests.java
new file mode 100644
index 0000000000000..b3c43259558dd
--- /dev/null
+++ b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/GeoLineAggregatorTests.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.apache.lucene.document.LatLonDocValuesField;
+import org.apache.lucene.document.SortedDocValuesField;
+import org.apache.lucene.document.SortedNumericDocValuesField;
+import org.apache.lucene.geo.GeoEncodingUtils;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.NumericUtils;
+import org.elasticsearch.common.CheckedConsumer;
+import org.elasticsearch.geo.GeometryTestUtils;
+import org.elasticsearch.geometry.Point;
+import org.elasticsearch.index.mapper.GeoPointFieldMapper;
+import org.elasticsearch.index.mapper.KeywordFieldMapper;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.plugins.SearchPlugin;
+import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.support.MultiValuesSourceFieldConfig;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.xpack.spatial.SpatialPlugin;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+public class GeoLineAggregatorTests extends AggregatorTestCase {
+
+ @Override
+ protected List getSearchPlugins() {
+ return Collections.singletonList(new SpatialPlugin());
+ }
+
+ // test that missing values are ignored
+ public void testMissingValues() throws IOException {
+ MultiValuesSourceFieldConfig valueConfig = new MultiValuesSourceFieldConfig.Builder()
+ .setFieldName("value_field")
+ .build();
+ MultiValuesSourceFieldConfig sortConfig = new MultiValuesSourceFieldConfig.Builder().setFieldName("sort_field").build();
+ GeoLineAggregationBuilder lineAggregationBuilder = new GeoLineAggregationBuilder("_name")
+ .point(valueConfig)
+ .sortOrder(SortOrder.ASC)
+ .sort(sortConfig)
+ .size(10);
+
+ TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("_name")
+ .field("group_id")
+ .subAggregation(lineAggregationBuilder);
+
+ long lonLat = (((long) GeoEncodingUtils.encodeLongitude(90.0)) << 32) | GeoEncodingUtils.encodeLatitude(45.0) & 0xffffffffL;
+ //input
+ long[] points = new long[] {lonLat, 0, lonLat, 0,lonLat, lonLat, lonLat};
+ double[] sortValues = new double[]{1, 0, 2, 0, 3, 4, 5};
+ //expected
+ long[] expectedAggPoints = new long[] {lonLat, lonLat, lonLat, lonLat, lonLat};
+ double[] expectedAggSortValues = new double[]{
+ NumericUtils.doubleToSortableLong(1),
+ NumericUtils.doubleToSortableLong(2),
+ NumericUtils.doubleToSortableLong(3),
+ NumericUtils.doubleToSortableLong(4),
+ NumericUtils.doubleToSortableLong(5)
+ };
+
+ testCase(new MatchAllDocsQuery(), aggregationBuilder, iw -> {
+
+ for (int i = 0; i < points.length; i++) {
+ if (points[i] == 0) {
+ // do not index value
+ iw.addDocument(Collections.singletonList(new SortedDocValuesField("group_id", new BytesRef("group"))));
+ } else {
+ iw.addDocument(Arrays.asList(new LatLonDocValuesField("value_field", 45.0, 90.0),
+ new SortedNumericDocValuesField("sort_field", NumericUtils.doubleToSortableLong(sortValues[i])),
+ new SortedDocValuesField("group_id", new BytesRef("group"))));
+ }
+ }
+ }, terms -> {
+ assertThat(terms.getBuckets().size(), equalTo(1));
+ InternalGeoLine geoLine = terms.getBuckets().get(0).getAggregations().get("_name");
+ assertThat(geoLine.length(), equalTo(5));
+ assertTrue(geoLine.isComplete());
+ assertArrayEquals(expectedAggPoints, geoLine.line());
+ assertArrayEquals(expectedAggSortValues, geoLine.sortVals(), 0d);
+ });
+ }
+
+ public void testAscending() throws IOException {
+ testAggregator(SortOrder.ASC);
+ }
+
+ public void testDescending() throws IOException {
+ testAggregator(SortOrder.DESC);
+ }
+
+ private void testAggregator(SortOrder sortOrder) throws IOException {
+ int size = randomIntBetween(1, GeoLineAggregationBuilder.MAX_PATH_SIZE);
+ MultiValuesSourceFieldConfig valueConfig = new MultiValuesSourceFieldConfig.Builder()
+ .setFieldName("value_field")
+ .build();
+ MultiValuesSourceFieldConfig sortConfig = new MultiValuesSourceFieldConfig.Builder().setFieldName("sort_field").build();
+ GeoLineAggregationBuilder lineAggregationBuilder = new GeoLineAggregationBuilder("_name")
+ .point(valueConfig)
+ .sortOrder(sortOrder)
+ .sort(sortConfig)
+ .size(size);
+ TermsAggregationBuilder aggregationBuilder = new TermsAggregationBuilder("_name")
+ .field("group_id")
+ .subAggregation(lineAggregationBuilder);
+
+ int numGroups = randomIntBetween(1, 2);
+ Map lines = new HashMap<>(numGroups);
+ Map indexedPoints = new HashMap<>(numGroups);
+ Map indexedSortValues = new HashMap<>(numGroups);
+ for (int groupOrd = 0; groupOrd < numGroups; groupOrd++) {
+ int numPoints = randomIntBetween(2, 2 * size);
+ boolean complete = numPoints <= size;
+ long[] points = new long[numPoints];
+ double[] sortValues = new double[numPoints];
+ for (int i = 0; i < numPoints; i++) {
+ Point point = GeometryTestUtils.randomPoint(false);
+ int encodedLat = GeoEncodingUtils.encodeLatitude(point.getLat());
+ int encodedLon = GeoEncodingUtils.encodeLongitude(point.getLon());
+ long lonLat = (((long) encodedLon) << 32) | encodedLat & 0xffffffffL;
+ points[i] = lonLat;
+ sortValues[i] = SortOrder.ASC.equals(sortOrder) ? i : numPoints - i;
+ }
+ int lineSize = Math.min(numPoints, size);
+ // re-sort line to be ascending
+ long[] linePoints = Arrays.copyOf(points, lineSize);
+ double[] lineSorts = Arrays.copyOf(sortValues, lineSize);
+ new PathArraySorter(linePoints, lineSorts, SortOrder.ASC).sort();
+
+ lines.put(String.valueOf(groupOrd), new InternalGeoLine("_name",
+ linePoints, lineSorts, null, complete, true, sortOrder, size));
+
+ for (int i = 0; i < randomIntBetween(1, numPoints); i++) {
+ int idx1 = randomIntBetween(0, numPoints - 1);
+ int idx2 = randomIntBetween(0, numPoints - 1);
+ final long tmpPoint = points[idx1];
+ points[idx1] = points[idx2];
+ points[idx2] = tmpPoint;
+ final double tmpSortValue = sortValues[idx1];
+ sortValues[idx1] = sortValues[idx2];
+ sortValues[idx2] = tmpSortValue;
+ }
+ indexedPoints.put(groupOrd, points);
+ indexedSortValues.put(groupOrd, sortValues);
+ }
+
+
+ testCase(new MatchAllDocsQuery(), aggregationBuilder, iw -> {
+ for (int group = 0; group < numGroups; group++) {
+ long[] points = indexedPoints.get(group);
+ double[] sortValues = indexedSortValues.get(group);
+ for (int i = 0; i < points.length; i++) {
+ int x = (int) (points[i] >> 32);
+ int y = (int) points[i];
+ iw.addDocument(Arrays.asList(new LatLonDocValuesField("value_field",
+ GeoEncodingUtils.decodeLatitude(y),
+ GeoEncodingUtils.decodeLongitude(x)),
+ new SortedNumericDocValuesField("sort_field", NumericUtils.doubleToSortableLong(sortValues[i])),
+ new SortedDocValuesField("group_id", new BytesRef(String.valueOf(group)))));
+ }
+ }
+ }, terms -> {
+ for (Terms.Bucket bucket : terms.getBuckets()) {
+ InternalGeoLine expectedGeoLine = lines.get(bucket.getKeyAsString());
+ InternalGeoLine geoLine = bucket.getAggregations().get("_name");
+ assertThat(geoLine.length(), equalTo(expectedGeoLine.length()));
+ assertThat(geoLine.isComplete(), equalTo(expectedGeoLine.isComplete()));
+ for (int i = 0; i < geoLine.sortVals().length; i++) {
+ geoLine.sortVals()[i] = NumericUtils.sortableLongToDouble((long) geoLine.sortVals()[i]);
+ }
+ assertArrayEquals(expectedGeoLine.sortVals(), geoLine.sortVals(), 0d);
+ assertArrayEquals(expectedGeoLine.line(), geoLine.line());
+ }
+ });
+ }
+
+ private void testCase(Query query, TermsAggregationBuilder aggregationBuilder,
+ CheckedConsumer buildIndex,
+ Consumer verify) throws IOException {
+ testCase(query, aggregationBuilder, buildIndex, verify, NumberFieldMapper.NumberType.LONG);
+ }
+
+ private void testCase(Query query, TermsAggregationBuilder aggregationBuilder,
+ CheckedConsumer buildIndex,
+ Consumer verify,
+ NumberFieldMapper.NumberType fieldNumberType) throws IOException {
+
+ Directory directory = newDirectory();
+ RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
+ buildIndex.accept(indexWriter);
+ indexWriter.close();
+ IndexReader indexReader = DirectoryReader.open(directory);
+ IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
+
+ try {
+ MappedFieldType fieldType = new GeoPointFieldMapper.GeoPointFieldType("value_field");
+ MappedFieldType groupFieldType = new KeywordFieldMapper.KeywordFieldType("group_id");
+ MappedFieldType fieldType2 = new NumberFieldMapper.NumberFieldType("sort_field", fieldNumberType);
+
+ Terms terms = searchAndReduce(indexSearcher, new MatchAllDocsQuery(), aggregationBuilder,
+ fieldType, fieldType2, groupFieldType);
+ verify.accept(terms);
+ } finally {
+ indexReader.close();
+ directory.close();
+ }
+ }
+}
diff --git a/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/InternalGeoLineTests.java b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/InternalGeoLineTests.java
new file mode 100644
index 0000000000000..230b8a92c76d0
--- /dev/null
+++ b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/InternalGeoLineTests.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.plugins.SearchPlugin;
+import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.ParsedAggregation;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.test.InternalAggregationTestCase;
+import org.elasticsearch.xpack.spatial.SpatialPlugin;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class InternalGeoLineTests extends InternalAggregationTestCase {
+
+ @Override
+ protected SearchPlugin registerPlugin() {
+ return new SpatialPlugin();
+ }
+
+ static InternalGeoLine randomInstance(String name, Map metadata, int size, SortOrder sortOrder, double magicDecimal) {
+ int length = randomIntBetween(2, size);
+ long[] points = new long[length];
+ double[] sortVals = new double[length];
+ for (int i = 0; i < length; i++) {
+ points[i] = randomNonNegativeLong();
+ sortVals[i] = i + magicDecimal;
+ }
+ Arrays.sort(sortVals);
+ if (SortOrder.DESC.equals(sortOrder)) {
+ // reverse the list
+ for (int i = 0, j = sortVals.length - 1; i < j; i++, j--) {
+ double tmp = sortVals[i];
+ sortVals[i] = sortVals[j];
+ sortVals[j] = tmp;
+ }
+ }
+ boolean complete = length <= size;
+ return new InternalGeoLine(name, points, sortVals, metadata, complete, randomBoolean(), sortOrder, size);
+ }
+
+ @Override
+ protected InternalGeoLine createTestInstance(String name, Map metadata) {
+ int size = randomIntBetween(10, GeoLineAggregationBuilder.MAX_PATH_SIZE);
+ return randomInstance(name, metadata, size, randomFrom(SortOrder.values()), randomDoubleBetween(0, 1, false));
+ }
+
+ @Override
+ protected InternalGeoLine mutateInstance(InternalGeoLine instance) {
+ String name = instance.getName();
+ long[] line = Arrays.copyOf(instance.line(), instance.line().length);
+ double[] sortVals = Arrays.copyOf(instance.sortVals(), instance.sortVals().length);
+ Map metadata = instance.getMetadata();
+ boolean complete = instance.isComplete();
+ boolean includeSorts = instance.includeSorts();
+ SortOrder sortOrder = instance.sortOrder();
+ int size = instance.size();
+ switch (randomIntBetween(0, 7)) {
+ case 0:
+ name += randomAlphaOfLength(5);
+ break;
+ case 1:
+ line[0] = line[0] + 1000000L;
+ break;
+ case 2:
+ sortVals[0] = sortVals[0] + 10000;
+ break;
+ case 3:
+ if (metadata == null) {
+ metadata = new HashMap<>(1);
+ } else {
+ metadata = new HashMap<>(instance.getMetadata());
+ }
+ metadata.put(randomAlphaOfLength(15), randomInt());
+ break;
+ case 4:
+ complete = !complete;
+ break;
+ case 5:
+ includeSorts = !includeSorts;
+ break;
+ case 6:
+ sortOrder = SortOrder.ASC.equals(sortOrder) ? SortOrder.DESC : SortOrder.ASC;
+ break;
+ case 7:
+ size = size + 1;
+ break;
+ default:
+ throw new AssertionError("Illegal randomisation branch");
+ }
+ return new InternalGeoLine(name, line, sortVals, metadata, complete, includeSorts, sortOrder, size);
+ }
+
+ @Override
+ protected List randomResultsToReduce(String name, int size) {
+ SortOrder sortOrder = randomFrom(SortOrder.values());
+ int maxLineLength = randomIntBetween(10, GeoLineAggregationBuilder.MAX_PATH_SIZE);
+ List instances = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ // use the magicDecimal to have absolute ordering between heap-sort and testing array sorting
+ instances.add(randomInstance(name, null, maxLineLength, sortOrder, ((double) i) / size));
+ }
+ return instances;
+ }
+
+ @Override
+ protected void assertReduced(InternalGeoLine reduced, List inputs) {
+ int mergedLength = 0;
+ for (InternalGeoLine subLine : inputs) {
+ mergedLength += subLine.length();
+ }
+ boolean complete = mergedLength <= reduced.size();
+ int expectedReducedLength = Math.min(mergedLength, reduced.size());
+ assertThat(reduced.length(), equalTo(expectedReducedLength));
+ assertThat(complete, equalTo(reduced.isComplete()));
+
+ // check arrays
+ long[] finalList = new long[mergedLength];
+ double[] finalSortVals = new double[mergedLength];
+ int idx = 0;
+ for (InternalGeoLine geoLine : inputs) {
+ for (int i = 0; i < geoLine.line().length; i++) {
+ finalSortVals[idx] = geoLine.sortVals()[i];
+ finalList[idx] = geoLine.line()[i];
+ idx += 1;
+ }
+ }
+
+ new PathArraySorter(finalList, finalSortVals, reduced.sortOrder()).sort();
+
+ // cap to max length
+ long[] finalCappedPoints = Arrays.copyOf(finalList, Math.min(reduced.size(), mergedLength));
+ double[] finalCappedSortVals = Arrays.copyOf(finalSortVals, Math.min(reduced.size(), mergedLength));
+
+ if (SortOrder.DESC.equals(reduced.sortOrder())) {
+ new PathArraySorter(finalCappedPoints, finalCappedSortVals, SortOrder.ASC).sort();
+ }
+
+ assertArrayEquals(finalCappedSortVals, reduced.sortVals(), 0d);
+ assertArrayEquals(finalCappedPoints, reduced.line());
+ }
+
+ @Override
+ protected void assertFromXContent(InternalGeoLine aggregation, ParsedAggregation parsedAggregation) throws IOException {
+ // There is no ParsedGeoLine yet so we cannot test it here
+ }
+
+ @Override
+ protected List getNamedXContents() {
+ List extendedNamedXContents = new ArrayList<>(super.getNamedXContents());
+ extendedNamedXContents.add(new NamedXContentRegistry.Entry(Aggregation.class,
+ new ParseField(GeoLineAggregationBuilder.NAME),
+ (p, c) -> {
+ assumeTrue("There is no ParsedGeoLine yet", false);
+ return null;
+ }
+ ));
+ return extendedNamedXContents;
+ }
+}
diff --git a/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/MergedGeoLinesTests.java b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/MergedGeoLinesTests.java
new file mode 100644
index 0000000000000..9fae79fe60b41
--- /dev/null
+++ b/x-pack/plugin/spatial/src/test/java/org/elasticsearch/xpack/spatial/search/aggregations/MergedGeoLinesTests.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.spatial.search.aggregations;
+
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class MergedGeoLinesTests extends ESTestCase {
+
+ public InternalGeoLine randomLine(SortOrder sortOrder, int maxLength, double magicDecimal) {
+ String name = randomAlphaOfLength(5);
+ int length = randomBoolean() ? maxLength : randomIntBetween(1, maxLength);
+ boolean complete = length < maxLength;
+ long[] points = new long[length];
+ double[] sortValues = new double[length];
+ for (int i = 0; i < length; i++) {
+ points[i] = randomIntBetween(1, 100);
+ sortValues[i] = i + magicDecimal;
+ }
+ return new InternalGeoLine(name, points, sortValues, Collections.emptyMap(), complete, randomBoolean(), sortOrder, maxLength);
+ }
+
+ public void testSimpleMerge() {
+ int numLines = 10;
+ int maxLength = 100;
+ int finalLength = 0;
+ SortOrder sortOrder = SortOrder.ASC;
+ List geoLines = new ArrayList<>();
+ for (int i = 0; i < numLines; i++) {
+ geoLines.add(randomLine(sortOrder, maxLength, ((double) i) / numLines));
+ finalLength += geoLines.get(i).length();
+ }
+ finalLength = Math.min(maxLength, finalLength);
+ MergedGeoLines mergedGeoLines = new MergedGeoLines(geoLines, finalLength, sortOrder);
+ mergedGeoLines.merge();
+
+ // assert that the mergedGeoLines are sorted (does not necessarily validate correctness, but it is a good heuristic)
+ long[] sortedPoints = Arrays.copyOf(mergedGeoLines.getFinalPoints(), mergedGeoLines.getFinalPoints().length);
+ double[] sortedValues = Arrays.copyOf(mergedGeoLines.getFinalSortValues(), mergedGeoLines.getFinalSortValues().length);
+ new PathArraySorter(sortedPoints, sortedValues, sortOrder).sort();
+ assertArrayEquals(sortedValues, mergedGeoLines.getFinalSortValues(), 0d);
+ assertArrayEquals(sortedPoints, mergedGeoLines.getFinalPoints());
+ }
+}
diff --git a/x-pack/plugin/spatial/src/test/resources/rest-api-spec/test/50_geoline.yml b/x-pack/plugin/spatial/src/test/resources/rest-api-spec/test/50_geoline.yml
new file mode 100644
index 0000000000000..b2593f92290d4
--- /dev/null
+++ b/x-pack/plugin/spatial/src/test/resources/rest-api-spec/test/50_geoline.yml
@@ -0,0 +1,59 @@
+---
+"Test geoline agg":
+ - do:
+ indices.create:
+ index: locations
+ body:
+ mappings:
+ properties:
+ location:
+ type: geo_point
+ rank:
+ type: double
+
+ - do:
+ bulk:
+ refresh: true
+ body:
+ - index:
+ _index: locations
+ _id: 1
+ - '{"location": [13.37139831, 47.82930284], "rank": 2.0 }'
+ - index:
+ _index: locations
+ _id: 2
+ - '{"location": [13.3784208402, 47.88832084022], "rank": 0.0 }'
+ - index:
+ _index: locations
+ _id: 3
+ - '{"location": [13.371830148701, 48.2084200148], "rank": 1.2 }'
+
+ - do:
+ search:
+ rest_total_hits_as_int: true
+ index: locations
+ size: 0
+ body:
+ aggs:
+ path:
+ geo_line:
+ include_sort: true
+ geo_point:
+ field: location
+ sort:
+ field: rank
+ - match: { hits.total: 3 }
+ - match: { aggregations.path.type: "Feature" }
+ - match: { aggregations.path.geometry.type: "LineString" }
+ - length: { aggregations.path.geometry.coordinates: 3 }
+ - match: { aggregations.path.geometry.coordinates.0.0: 13.378421 }
+ - match: { aggregations.path.geometry.coordinates.0.1: 47.888321 }
+ - match: { aggregations.path.geometry.coordinates.1.0: 13.37183 }
+ - match: { aggregations.path.geometry.coordinates.1.1: 48.20842 }
+ - match: { aggregations.path.geometry.coordinates.2.0: 13.371398 }
+ - match: { aggregations.path.geometry.coordinates.2.1: 47.829303 }
+ - is_true: aggregations.path.properties.complete
+ - length: { aggregations.path.properties.sort_values: 3 }
+ - match: { aggregations.path.properties.sort_values.0: 0.0 }
+ - match: { aggregations.path.properties.sort_values.1: 1.2 }
+ - match: { aggregations.path.properties.sort_values.2: 2.0 }
diff --git a/x-pack/plugin/spatial/src/yamlRestTest/resources/rest-api-spec/test/60_geo_line.yml b/x-pack/plugin/spatial/src/yamlRestTest/resources/rest-api-spec/test/60_geo_line.yml
new file mode 100644
index 0000000000000..f156e4db9586f
--- /dev/null
+++ b/x-pack/plugin/spatial/src/yamlRestTest/resources/rest-api-spec/test/60_geo_line.yml
@@ -0,0 +1,51 @@
+---
+"Test geo_line aggregation on geo points":
+ - do:
+ indices.create:
+ index: races
+ body:
+ mappings:
+ properties:
+ race_id:
+ type: keyword
+ position:
+ type: geo_point
+
+ - do:
+ bulk:
+ refresh: true
+ body:
+ - index:
+ _index: races
+ _id: 1
+ - '{"position": "POINT(4.912350 52.374081)", "race_id": "Amsterdam", "timestamp": 4}'
+ - index:
+ _index: races
+ _id: 2
+ - '{"position": "POINT(4.901618 52.369219)", "race_id": "Amsterdam", "timestamp": 3}'
+ - index:
+ _index: races
+ _id: 3
+ - '{"position": "POINT(4.914722 52.371667)", "race_id": "Amsterdam", "timestamp": 10}'
+
+ - do:
+ search:
+ rest_total_hits_as_int: true
+ index: races
+ size: 0
+ body:
+ aggs:
+ trace:
+ geo_line:
+ point:
+ field: position
+ sort:
+ field: timestamp
+ - match: { hits.total: 3 }
+ - match: { aggregations.trace.type: "Feature" }
+ - match: { aggregations.trace.geometry.type: "LineString" }
+ - length: { aggregations.trace.geometry.coordinates: 3 }
+ - match: { aggregations.trace.geometry.coordinates.0: [4.901618, 52.369219] }
+ - match: { aggregations.trace.geometry.coordinates.1: [4.91235, 52.374081] }
+ - match: { aggregations.trace.geometry.coordinates.2: [4.914722, 52.371667] }
+ - is_true: aggregations.trace.properties.complete