From 00c70b8ae4357796b3720882b99f7f72a99ec264 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Tue, 19 Mar 2019 13:24:40 -0500 Subject: [PATCH 1/4] [ML] adds support for non-numeric mapped types and mapping overrides --- .../transforms/DataFrameTransformConfig.java | 26 +++- .../client/DataFrameTransformIT.java | 14 +- .../DataFrameTransformConfigTests.java | 16 +- .../DataFrameTransformDocumentationIT.java | 17 ++- .../dataframe/put_data_frame.asciidoc | 14 ++ .../core/dataframe/DataFrameMessages.java | 2 +- .../transforms/DataFrameTransformConfig.java | 26 +++- ...wDataFrameTransformActionRequestTests.java | 13 +- .../DataFrameTransformConfigTests.java | 25 +++- .../integration/DataFramePivotRestIT.java | 93 ++++++++++++ ...nsportPreviewDataFrameTransformAction.java | 42 ++++-- .../TransportPutDataFrameTransformAction.java | 5 +- .../transforms/DataFrameIndexer.java | 6 +- .../transforms/DataFrameTransformTask.java | 28 ++++ .../pivot/AggregationResultUtils.java | 28 ++-- .../dataframe/transforms/pivot/Pivot.java | 17 ++- .../transforms/pivot/SchemaUtil.java | 95 +++++++++--- .../pivot/AggregationResultUtilsTests.java | 138 +++++++++++++++++- .../transforms/pivot/PivotTests.java | 25 +++- .../test/data_frame/transforms_crud.yml | 43 ++++++ 20 files changed, 587 insertions(+), 86 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java index b1bf4fd3f2cd5..e0df3754b9db3 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java @@ -28,6 +28,8 @@ import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; +import java.util.Collections; +import java.util.Map; import java.util.Objects; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; @@ -39,6 +41,7 @@ public class DataFrameTransformConfig implements ToXContentObject { public static final ParseField SOURCE = new ParseField("source"); public static final ParseField DEST = new ParseField("dest"); public static final ParseField QUERY = new ParseField("query"); + public static final ParseField MAPPING_OVERRIDE = new ParseField("mapping_override"); // types of transforms public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot"); @@ -47,6 +50,7 @@ public class DataFrameTransformConfig implements ToXContentObject { private final String dest; private final QueryConfig queryConfig; private final PivotConfig pivotConfig; + private final Map mappingOverride; public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_frame_transform", true, @@ -56,7 +60,9 @@ public class DataFrameTransformConfig implements ToXContentObject { String dest = (String) args[2]; QueryConfig queryConfig = (QueryConfig) args[3]; PivotConfig pivotConfig = (PivotConfig) args[4]; - return new DataFrameTransformConfig(id, source, dest, queryConfig, pivotConfig); + @SuppressWarnings("unchecked") + Map mappingOverrides = (Map) args[5]; + return new DataFrameTransformConfig(id, source, dest, queryConfig, pivotConfig, mappingOverrides); }); static { @@ -65,23 +71,25 @@ public class DataFrameTransformConfig implements ToXContentObject { PARSER.declareString(constructorArg(), DEST); PARSER.declareObject(optionalConstructorArg(), (p, c) -> QueryConfig.fromXContent(p), QUERY); PARSER.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p), PIVOT_TRANSFORM); + PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), MAPPING_OVERRIDE); } public static DataFrameTransformConfig fromXContent(final XContentParser parser) { return PARSER.apply(parser, null); } - public DataFrameTransformConfig(final String id, final String source, final String dest, final QueryConfig queryConfig, - final PivotConfig pivotConfig) { + final PivotConfig pivotConfig, + final Map mappingOverride) { this.id = Objects.requireNonNull(id); this.source = Objects.requireNonNull(source); this.dest = Objects.requireNonNull(dest); this.queryConfig = queryConfig; this.pivotConfig = pivotConfig; + this.mappingOverride = mappingOverride == null ? null : Collections.unmodifiableMap(mappingOverride); } public String getId() { @@ -104,6 +112,10 @@ public QueryConfig getQueryConfig() { return queryConfig; } + public Map getMappingOverrides() { + return mappingOverride; + } + public boolean isValid() { if (queryConfig != null && queryConfig.isValid() == false) { return false; @@ -128,6 +140,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa if (pivotConfig != null) { builder.field(PIVOT_TRANSFORM.getPreferredName(), pivotConfig); } + if (mappingOverride != null) { + builder.field(MAPPING_OVERRIDE.getPreferredName(), mappingOverride); + } builder.endObject(); return builder; } @@ -148,12 +163,13 @@ public boolean equals(Object other) { && Objects.equals(this.source, that.source) && Objects.equals(this.dest, that.dest) && Objects.equals(this.queryConfig, that.queryConfig) - && Objects.equals(this.pivotConfig, that.pivotConfig); + && Objects.equals(this.pivotConfig, that.pivotConfig) + && Objects.equals(this.mappingOverride, that.mappingOverride); } @Override public int hashCode() { - return Objects.hash(id, source, dest, queryConfig, pivotConfig); + return Objects.hash(id, source, dest, queryConfig, pivotConfig, mappingOverride); } @Override diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java index 9c5bb7c175480..54a7a14a81e95 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java @@ -84,7 +84,12 @@ public void testCreateDelete() throws IOException { PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig); String id = "test-crud"; - DataFrameTransformConfig transform = new DataFrameTransformConfig(id, sourceIndex, "pivot-dest", queryConfig, pivotConfig); + DataFrameTransformConfig transform = new DataFrameTransformConfig(id, + sourceIndex, + "pivot-dest", + queryConfig, + pivotConfig, + Collections.emptyMap()); DataFrameClient client = highLevelClient().dataFrame(); AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform, @@ -114,7 +119,12 @@ public void testStartStop() throws IOException { PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig); String id = "test-stop-start"; - DataFrameTransformConfig transform = new DataFrameTransformConfig(id, sourceIndex, "pivot-dest", queryConfig, pivotConfig); + DataFrameTransformConfig transform = new DataFrameTransformConfig(id, + sourceIndex, + "pivot-dest", + queryConfig, + pivotConfig, + Collections.emptyMap()); DataFrameClient client = highLevelClient().dataFrame(); AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform, diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java index af90c15c3d901..c0c8c24c06530 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java @@ -28,15 +28,29 @@ import java.io.IOException; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.function.Predicate; public class DataFrameTransformConfigTests extends AbstractXContentTestCase { public static DataFrameTransformConfig randomDataFrameTransformConfig() { return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), - randomAlphaOfLengthBetween(1, 10), QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); + randomAlphaOfLengthBetween(1, 10), QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig(), + randomNullableStringMap()); } + public static Map randomNullableStringMap() { + Map stringStringMap = null; + if (randomBoolean()) { + stringStringMap = new HashMap<>(); + int kvCount = randomInt(10); + for (int i = 0; i < kvCount; i++) { + stringStringMap.put(randomAlphaOfLength(10), randomAlphaOfLength(10)); + } + } + return stringStringMap; + } @Override protected DataFrameTransformConfig createTestInstance() { return randomDataFrameTransformConfig(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java index 75fcf573f5a93..a2df24ea654dc 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java @@ -49,7 +49,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -115,6 +117,10 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException AggregationBuilders.avg("avg_rating").field("stars")); // <1> AggregationConfig aggConfig = new AggregationConfig(aggBuilder); // end::put-data-frame-transform-agg-config + // tag::put-data-frame-transform-mapping-override + Map mappingOverride = new HashMap<>(); + mappingOverride.put("avg_rating", "keyword"); // <1> + // end::put-data-frame-transform-mapping-override // tag::put-data-frame-transform-pivot-config PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig); // end::put-data-frame-transform-pivot-config @@ -124,7 +130,8 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException "source-index", // <2> "pivot-destination", // <3> queryConfig, // <4> - pivotConfig); // <5> + pivotConfig, // <5> + mappingOverride); // <6> // end::put-data-frame-transform-config { @@ -144,7 +151,7 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException { DataFrameTransformConfig configWithDifferentId = new DataFrameTransformConfig("reviewer-avg-rating2", transformConfig.getSource(), transformConfig.getDestination(), transformConfig.getQueryConfig(), - transformConfig.getPivotConfig()); + transformConfig.getPivotConfig(), null); PutDataFrameTransformRequest request = new PutDataFrameTransformRequest(configWithDifferentId); // tag::put-data-frame-transform-execute-listener @@ -189,7 +196,7 @@ public void testStartStop() throws IOException, InterruptedException { PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig); DataFrameTransformConfig transformConfig = new DataFrameTransformConfig("mega-transform", - "source-data", "pivot-dest", queryConfig, pivotConfig); + "source-data", "pivot-dest", queryConfig, pivotConfig, null); client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig), RequestOptions.DEFAULT); transformsToClean.add(transformConfig.getId()); @@ -306,9 +313,9 @@ public void testDeleteDataFrameTransform() throws IOException, InterruptedExcept PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig); DataFrameTransformConfig transformConfig1 = new DataFrameTransformConfig("mega-transform", - "source-data", "pivot-dest", queryConfig, pivotConfig); + "source-data", "pivot-dest", queryConfig, pivotConfig, null); DataFrameTransformConfig transformConfig2 = new DataFrameTransformConfig("mega-transform2", - "source-data", "pivot-dest2", queryConfig, pivotConfig); + "source-data", "pivot-dest2", queryConfig, pivotConfig, null); client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig1), RequestOptions.DEFAULT); client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig2), RequestOptions.DEFAULT); diff --git a/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc b/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc index 7f8b1b0e9fbf1..20467b334e1d2 100644 --- a/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc +++ b/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc @@ -36,6 +36,7 @@ include-tagged::{doc-tests-file}[{api}-config] <3> The destination index <4> Optionally a QueryConfig <5> The PivotConfig +<6> The provided `Map` values to override deduced destination index mappings. [id="{upid}-{api}-query-config"] ==== QueryConfig @@ -84,6 +85,19 @@ include-tagged::{doc-tests-file}[{api}-agg-config] -------------------------------------------------- <1> Aggregate the average star rating +==== Overriding Deduced Destination Mapping Field Types + +When creating the destination index for the {dataframe-transform}, a best +effort mapping is created. This option allows overriding specific mapped +field types. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests-file}[{api}-mapping-override] +-------------------------------------------------- +<1> Override the deduced mapping for field `avg_rating` to be a type of +`keyword` + include::../execution.asciidoc[] [id="{upid}-{api}-response"] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java index 728505f3de05d..719e89ec5bb80 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java @@ -45,7 +45,7 @@ public class DataFrameMessages { "Failed to create composite aggregation from pivot function"; public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID = "Data frame transform configuration [{0}] has invalid elements"; - + public static final String DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS = "Failed to gather field mappings for index [{0}]"; public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_QUERY = "Failed to parse query for data frame transform"; public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_GROUP_BY = diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java index 6172bb2de1f6a..8ee91422ae932 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java @@ -39,6 +39,7 @@ public class DataFrameTransformConfig extends AbstractDiffable mappingOverrides; private static ConstructingObjectParser createParser(boolean lenient) { ConstructingObjectParser parser = new ConstructingObjectParser<>(NAME, lenient, @@ -93,7 +95,10 @@ private static ConstructingObjectParser create } PivotConfig pivotConfig = (PivotConfig) args[6]; - return new DataFrameTransformConfig(id, source, dest, headers, queryConfig, pivotConfig); + + @SuppressWarnings("unchecked") + Map mappingOverrides = (Map) args[7]; + return new DataFrameTransformConfig(id, source, dest, headers, queryConfig, pivotConfig, mappingOverrides); }); parser.declareString(optionalConstructorArg(), DataFrameField.ID); @@ -104,6 +109,7 @@ private static ConstructingObjectParser create parser.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), HEADERS); parser.declareObject(optionalConstructorArg(), (p, c) -> QueryConfig.fromXContent(p, lenient), QUERY); parser.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p, lenient), PIVOT_TRANSFORM); + parser.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), MAPPING_OVERRIDES); return parser; } @@ -117,13 +123,15 @@ public DataFrameTransformConfig(final String id, final String dest, final Map headers, final QueryConfig queryConfig, - final PivotConfig pivotConfig) { + final PivotConfig pivotConfig, + final Map mappingOverrides) { this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName()); this.source = ExceptionsHelper.requireNonNull(source, DataFrameField.SOURCE.getPreferredName()); this.dest = ExceptionsHelper.requireNonNull(dest, DataFrameField.DESTINATION.getPreferredName()); this.queryConfig = ExceptionsHelper.requireNonNull(queryConfig, QUERY.getPreferredName()); this.setHeaders(headers == null ? Collections.emptyMap() : headers); this.pivotConfig = pivotConfig; + this.mappingOverrides = mappingOverrides == null ? Collections.emptyMap() : Collections.unmodifiableMap(mappingOverrides); // at least one function must be defined if (this.pivotConfig == null) { @@ -138,6 +146,7 @@ public DataFrameTransformConfig(final StreamInput in) throws IOException { setHeaders(in.readMap(StreamInput::readString, StreamInput::readString)); queryConfig = in.readOptionalWriteable(QueryConfig::new); pivotConfig = in.readOptionalWriteable(PivotConfig::new); + mappingOverrides = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); } public String getId() { @@ -168,6 +177,10 @@ public QueryConfig getQueryConfig() { return queryConfig; } + public Map getMappingOverrides() { + return mappingOverrides; + } + public boolean isValid() { // collect validation results from all child objects if (queryConfig != null && queryConfig.isValid() == false) { @@ -189,6 +202,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); out.writeOptionalWriteable(queryConfig); out.writeOptionalWriteable(pivotConfig); + out.writeMap(mappingOverrides, StreamOutput::writeString, StreamOutput::writeString); } @Override @@ -209,6 +223,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa if (headers.isEmpty() == false && params.paramAsBoolean(DataFrameField.FOR_INTERNAL_STORAGE, false) == true) { builder.field(HEADERS.getPreferredName(), headers); } + if (mappingOverrides.isEmpty() == false) { + builder.field(MAPPING_OVERRIDES.getPreferredName(), mappingOverrides); + } builder.endObject(); return builder; @@ -231,12 +248,13 @@ public boolean equals(Object other) { && Objects.equals(this.dest, that.dest) && Objects.equals(this.headers, that.headers) && Objects.equals(this.queryConfig, that.queryConfig) - && Objects.equals(this.pivotConfig, that.pivotConfig); + && Objects.equals(this.pivotConfig, that.pivotConfig) + && Objects.equals(this.mappingOverrides, that.mappingOverrides); } @Override public int hashCode() { - return Objects.hash(id, source, dest, headers, queryConfig, pivotConfig); + return Objects.hash(id, source, dest, headers, queryConfig, pivotConfig, mappingOverrides); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java index 5da0f8bef083d..7b114272a34db 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java @@ -19,6 +19,8 @@ import org.junit.Before; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import static java.util.Collections.emptyList; @@ -62,8 +64,17 @@ protected boolean supportsUnknownFields() { @Override protected Request createTestInstance() { + Map mappingOverrides = null; + if (randomBoolean()) { + mappingOverrides = new HashMap<>(); + int kvCount = randomInt(10); + for (int i = 0; i < kvCount; i++) { + mappingOverrides.put(randomAlphaOfLength(10), randomAlphaOfLength(10)); + } + } DataFrameTransformConfig config = new DataFrameTransformConfig("transform-preview", randomAlphaOfLength(10), - "unused-transform-preview-index", null, QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); + "unused-transform-preview-index", null, QueryConfigTests.randomQueryConfig(), + PivotConfigTests.randomPivotConfig(), mappingOverrides); return new Request(config); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java index 76db5a1266d6b..99c71d1ac4a56 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java @@ -36,34 +36,47 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHeaders() { return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), - randomAlphaOfLengthBetween(1, 10), null, QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); + randomAlphaOfLengthBetween(1, 10), null, QueryConfigTests.randomQueryConfig(), + PivotConfigTests.randomPivotConfig(), randomNullableStringMap()); } public static DataFrameTransformConfig randomDataFrameTransformConfig() { return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomHeaders(), QueryConfigTests.randomQueryConfig(), - PivotConfigTests.randomPivotConfig()); + PivotConfigTests.randomPivotConfig(), randomNullableStringMap()); } public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHeaders(String id) { return new DataFrameTransformConfig(id, randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), null, - QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); + QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig(), randomNullableStringMap()); } public static DataFrameTransformConfig randomDataFrameTransformConfig(String id) { return new DataFrameTransformConfig(id, randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomHeaders(), - QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); + QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig(), randomNullableStringMap()); } public static DataFrameTransformConfig randomInvalidDataFrameTransformConfig() { if (randomBoolean()) { return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomHeaders(), QueryConfigTests.randomInvalidQueryConfig(), - PivotConfigTests.randomPivotConfig()); + PivotConfigTests.randomPivotConfig(), randomNullableStringMap()); } // else return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomHeaders(), QueryConfigTests.randomQueryConfig(), - PivotConfigTests.randomInvalidPivotConfig()); + PivotConfigTests.randomInvalidPivotConfig(), randomNullableStringMap()); + } + + public static Map randomNullableStringMap() { + Map stringStringMap = null; + if (randomBoolean()) { + stringStringMap = new HashMap<>(); + int kvCount = randomInt(10); + for (int i = 0; i < kvCount; i++) { + stringStringMap.put(randomAlphaOfLength(10), randomAlphaOfLength(10)); + } + } + return stringStringMap; } @Before diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index 17e918625a22b..df81740970454 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -18,6 +18,7 @@ import java.util.Set; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; public class DataFramePivotRestIT extends DataFrameRestTestCase { @@ -267,6 +268,98 @@ public void testPreviewTransform() throws Exception { }); } + public void testPivotWithMaxOnDateField() throws Exception { + String transformId = "simpleDateHistogramPivotWithMaxTime"; + String dataFrameIndex = "pivot_reviews_via_date_histogram_with_max_time"; + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex); + + final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId, + BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); + + String config = "{" + + " \"source\": \"" + REVIEWS_INDEX_NAME + "\"," + + " \"dest\": \"" + dataFrameIndex + "\","; + + config +=" \"pivot\": { \n" + + " \"group_by\": {\n" + + " \"by_day\": {\"date_histogram\": {\n" + + " \"interval\": \"1d\",\"field\":\"timestamp\",\"format\":\"yyyy-MM-DD\"\n" + + " }}\n" + + " },\n" + + " \n" + + " \"aggs\" :{\n" + + " \"avg_rating\": {\n" + + " \"avg\": {\"field\": \"stars\"}\n" + + " },\n" + + " \"timestamp\": {\n" + + " \"max\": {\"field\": \"timestamp\"}\n" + + " }\n" + + " }}" + + "}"; + + createDataframeTransformRequest.setJsonEntity(config); + Map createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest)); + assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + assertTrue(indexExists(dataFrameIndex)); + + startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); + + // we expect 21 documents as there shall be 21 days worth of docs + Map indexStats = getAsMap(dataFrameIndex + "/_stats"); + assertEquals(21, XContentMapValues.extractValue("_all.total.docs.count", indexStats)); + assertOnePivotValue(dataFrameIndex + "/_search?q=by_day:2017-01-15", 3.82); + Map searchResult = getAsMap(dataFrameIndex + "/_search?q=by_day:2017-01-15"); + String actual = (String) ((List) XContentMapValues.extractValue("hits.hits._source.timestamp", searchResult)).get(0); + // Do `containsString` as actual ending timestamp is indeterminate due to how data is generated + assertThat(actual, containsString("2017-01-15T20:")); + } + + public void testPivotWithMappingOverride() throws Exception { + String transformId = "simpleDateHistogramPivotWithMappingOverride"; + String dataFrameIndex = "pivot_reviews_via_date_histogram_and_mapping_override"; + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex); + + final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId, + BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); + + String config = "{" + + " \"source\": \"" + REVIEWS_INDEX_NAME + "\"," + + " \"dest\": \"" + dataFrameIndex + "\","; + + config += " \"pivot\": { \n" + + " \"group_by\": {\n" + + " \"by_day\": {\"date_histogram\": {\n" + + " \"interval\": \"1d\",\"field\":\"timestamp\",\"format\":\"yyyy-MM-DD\"\n" + + " }}\n" + + " },\n" + + " \"aggs\" :{\n" + + " \"avg_rating\": {\n" + + " \"avg\": {\"field\": \"stars\"}\n" + + " },\n" + + " \"timestamp\": {\n" + + " \"max\": {\"field\": \"timestamp\"}\n" + + " }\n" + + " }},\n" + + " \"mapping_override\": {\"avg_rating\": \"keyword\"}}"; + + createDataframeTransformRequest.setJsonEntity(config); + Map createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest)); + assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + assertTrue(indexExists(dataFrameIndex)); + + startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); + + // we expect 21 documents as there shall be 21 days worth of docs + Map indexStats = getAsMap(dataFrameIndex + "/_stats"); + assertEquals(21, XContentMapValues.extractValue("_all.total.docs.count", indexStats)); + Map searchResult = getAsMap(dataFrameIndex + "/_search?q=by_day:2017-01-15"); + String actualAvg = (String) ((List) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0); + assertThat(actualAvg, equalTo("3.82")); + String actualTime = (String) ((List) XContentMapValues.extractValue("hits.hits._source.timestamp", searchResult)).get(0); + // Do `containsString` as actual ending timestamp is indeterminate due to how data is generated + assertThat(actualTime, containsString("2017-01-15T20")); + } + private void assertOnePivotValue(String query, double expected) throws IOException { Map searchResult = getAsMap(query); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java index 21a87fa743a45..72a75227e780f 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; import java.util.List; @@ -57,9 +58,12 @@ protected void doExecute(Task task, return; } - Pivot pivot = new Pivot(request.getConfig().getSource(), - request.getConfig().getQueryConfig().getQuery(), - request.getConfig().getPivotConfig()); + final DataFrameTransformConfig config = request.getConfig(); + + Pivot pivot = new Pivot(config.getSource(), + config.getQueryConfig().getQuery(), + config.getPivotConfig(), + config.getMappingOverrides()); getPreview(pivot, ActionListener.wrap( previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)), @@ -68,18 +72,24 @@ protected void doExecute(Task task, } private void getPreview(Pivot pivot, ActionListener>> listener) { - ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(), - ClientHelper.DATA_FRAME_ORIGIN, - client, - SearchAction.INSTANCE, - pivot.buildSearchRequest(null), - ActionListener.wrap( - r -> { - final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME); - DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats(); - listener.onResponse(pivot.extractResults(agg, stats).collect(Collectors.toList())); - }, - listener::onFailure - )); + pivot.deduceMappings(client, ActionListener.wrap( + deducedMappings -> { + ClientHelper.executeWithHeadersAsync(threadPool.getThreadContext().getHeaders(), + ClientHelper.DATA_FRAME_ORIGIN, + client, + SearchAction.INSTANCE, + pivot.buildSearchRequest(null), + ActionListener.wrap( + r -> { + final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME); + DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats(); + listener.onResponse(pivot.extractResults(agg, deducedMappings, stats).collect(Collectors.toList())); + }, + listener::onFailure + )); + }, + listener::onFailure + )); + } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java index b696243cc5d0d..de417bb4d20aa 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java @@ -196,7 +196,10 @@ private void handlePrivsResponse(String username, private void putDataFrame(DataFrameTransformConfig config, ActionListener listener) { - final Pivot pivot = new Pivot(config.getSource(), config.getQueryConfig().getQuery(), config.getPivotConfig()); + final Pivot pivot = new Pivot(config.getSource(), + config.getQueryConfig().getQuery(), + config.getPivotConfig(), + config.getMappingOverrides()); // <5> Return the listener, or clean up destination index on failure. diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java index 8fa5a36b3d850..e0f536d4639e1 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java @@ -44,11 +44,13 @@ public DataFrameIndexer(Executor executor, AtomicReference initial protected abstract DataFrameTransformConfig getConfig(); + protected abstract Map getFieldMappings(); + @Override protected void onStartJob(long now) { QueryBuilder queryBuilder = getConfig().getQueryConfig().getQuery(); - pivot = new Pivot(getConfig().getSource(), queryBuilder, getConfig().getPivotConfig()); + pivot = new Pivot(getConfig().getSource(), queryBuilder, getConfig().getPivotConfig(), getConfig().getMappingOverrides()); } @Override @@ -70,7 +72,7 @@ private Stream processBucketsToIndexRequests(CompositeAggregation final DataFrameTransformConfig transformConfig = getConfig(); String indexName = transformConfig.getDestination(); - return pivot.extractResults(agg, getStats()).map(document -> { + return pivot.extractResults(agg, getFieldMappings(), getStats()).map(document -> { XContentBuilder builder; try { builder = jsonBuilder(); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index e4446c65abe34..c591cbe3a4f71 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -35,6 +35,7 @@ import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; +import org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -227,6 +228,7 @@ protected class ClientDataFrameIndexer extends DataFrameIndexer { private final Client client; private final DataFrameTransformsConfigManager transformsConfigManager; private final String transformId; + private Map fieldMappings = null; private DataFrameTransformConfig transformConfig = null; @@ -243,6 +245,11 @@ protected DataFrameTransformConfig getConfig() { return transformConfig; } + @Override + protected Map getFieldMappings() { + return fieldMappings; + } + @Override protected String getJobId() { return transformId; @@ -274,6 +281,27 @@ public synchronized boolean maybeTriggerAsyncJob(long now) { DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID, transformId)); } + if (fieldMappings == null) { + CountDownLatch latch = new CountDownLatch(1); + SchemaUtil.getDestinationFieldMappings(client, transformConfig.getDestination(), new LatchedActionListener<>( + ActionListener.wrap( + destinationMappings -> fieldMappings = destinationMappings, + e -> { + throw new RuntimeException( + DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS, + transformConfig.getDestination()), + e); + }), latch)); + try { + latch.await(LOAD_TRANSFORM_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException( + DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS, + transformConfig.getDestination()), + e); + } + } + return super.maybeTriggerAsyncJob(now); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java index 496f1e0ac13ce..fa7536497c4f0 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtils.java @@ -21,6 +21,8 @@ import java.util.Map; import java.util.stream.Stream; +import static org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil.isNumericType; + final class AggregationResultUtils { private static final Logger logger = LogManager.getLogger(AggregationResultUtils.class); @@ -30,30 +32,38 @@ final class AggregationResultUtils { * @param agg The aggregation result * @param groups The original groupings used for querying * @param aggregationBuilders the aggregation used for querying - * @param dataFrameIndexerTransformStats stats collector + * @param fieldTypeMap A Map containing "field-name": "type" entries to determine the appropriate type for the aggregation results. + * @param stats stats collector * @return a map containing the results of the aggregation in a consumable way */ public static Stream> extractCompositeAggregationResults(CompositeAggregation agg, - GroupConfig groups, - Collection aggregationBuilders, - DataFrameIndexerTransformStats dataFrameIndexerTransformStats) { + GroupConfig groups, + Collection aggregationBuilders, + Map fieldTypeMap, + DataFrameIndexerTransformStats stats) { return agg.getBuckets().stream().map(bucket -> { - dataFrameIndexerTransformStats.incrementNumDocuments(bucket.getDocCount()); + stats.incrementNumDocuments(bucket.getDocCount()); Map document = new HashMap<>(); - groups.getGroups().keySet().forEach(destinationFieldName -> { - document.put(destinationFieldName, bucket.getKey().get(destinationFieldName)); - }); + groups.getGroups().keySet().forEach(destinationFieldName -> + document.put(destinationFieldName, bucket.getKey().get(destinationFieldName))); for (AggregationBuilder aggregationBuilder : aggregationBuilders) { String aggName = aggregationBuilder.getName(); + final String fieldType = fieldTypeMap.get(aggName); // TODO: support other aggregation types Aggregation aggResult = bucket.getAggregations().get(aggName); if (aggResult instanceof NumericMetricsAggregation.SingleValue) { NumericMetricsAggregation.SingleValue aggResultSingleValue = (SingleValue) aggResult; - document.put(aggName, aggResultSingleValue.value()); + // If the type is numeric, simply gather the `value` type, otherwise utilize `getValueAsString` so we don't lose + // formatted outputs. + if (isNumericType(fieldType)) { + document.put(aggName, aggResultSingleValue.value()); + } else { + document.put(aggName, aggResultSingleValue.getValueAsString()); + } } else { // Execution should never reach this point! // Creating transforms with unsupported aggregations shall not be possible diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java index 2d23444e872ce..9cb940c67a0d8 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java @@ -43,12 +43,14 @@ public class Pivot { // objects for re-using private final CompositeAggregationBuilder cachedCompositeAggregation; private final SearchRequest cachedSearchRequest; + private final Map mappingOverrides; - public Pivot(String source, QueryBuilder query, PivotConfig config) { + public Pivot(String source, QueryBuilder query, PivotConfig config, Map mappingOverrides) { this.source = source; this.config = config; this.cachedCompositeAggregation = createCompositeAggregation(config); this.cachedSearchRequest = createSearchRequest(source, query, cachedCompositeAggregation); + this.mappingOverrides = mappingOverrides; } public void validate(Client client, final ActionListener listener) { @@ -65,7 +67,7 @@ public void validate(Client client, final ActionListener listener) { } public void deduceMappings(Client client, final ActionListener> listener) { - SchemaUtil.deduceMappings(client, config, source, listener); + SchemaUtil.deduceMappings(client, config, source, mappingOverrides, listener); } public SearchRequest buildSearchRequest(Map position) { @@ -77,12 +79,17 @@ public SearchRequest buildSearchRequest(Map position) { } public Stream> extractResults(CompositeAggregation agg, - DataFrameIndexerTransformStats dataFrameIndexerTransformStats) { + Map fieldTypeMap, + DataFrameIndexerTransformStats dataFrameIndexerTransformStats) { GroupConfig groups = config.getGroupConfig(); Collection aggregationBuilders = config.getAggregationConfig().getAggregatorFactories(); - return AggregationResultUtils.extractCompositeAggregationResults(agg, groups, aggregationBuilders, dataFrameIndexerTransformStats); + return AggregationResultUtils.extractCompositeAggregationResults(agg, + groups, + aggregationBuilders, + fieldTypeMap, + dataFrameIndexerTransformStats); } private void runTestQuery(Client client, final ActionListener listener) { @@ -99,7 +106,7 @@ private void runTestQuery(Client client, final ActionListener listener) } listener.onResponse(true); }, e->{ - listener.onFailure(new RuntimeException("Failed to test query",e)); + listener.onFailure(new RuntimeException("Failed to test query", e)); })); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/SchemaUtil.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/SchemaUtil.java index a8be175cf5377..0dfd5c040d639 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/SchemaUtil.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/SchemaUtil.java @@ -13,20 +13,53 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetaData; import org.elasticsearch.client.Client; +import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig; import java.util.HashMap; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; -public class SchemaUtil { +public final class SchemaUtil { private static final Logger logger = LogManager.getLogger(SchemaUtil.class); + // Full collection of numeric field type strings + private static final Set NUMERIC_FIELD_MAPPER_TYPES; + static { + Set types = Stream.of(NumberFieldMapper.NumberType.values()) + .map(NumberFieldMapper.NumberType::typeName) + .collect(Collectors.toSet()); + types.add("scaled_float"); // have to add manually since scaled_float is in a module + NUMERIC_FIELD_MAPPER_TYPES = types; + } + private SchemaUtil() { } - public static void deduceMappings(final Client client, final PivotConfig config, final String source, + public static boolean isNumericType(String type) { + return type != null && NUMERIC_FIELD_MAPPER_TYPES.contains(type); + } + + /** + * Deduce the mappings for the destination index given the source index + * + * The Listener is alerted with a {@code Map} that is a "field-name":"type" mapping + * + * @param client Client from which to make requests against the cluster + * @param config The PivotConfig for which to deduce destination mapping + * @param source Source index that contains the data to pivot + * @param userProvidedTypes A map of "field-name":"mapping-type" provided by the user to override deduced mapping types + * @param listener Listener to alert on success or failure. + */ + public static void deduceMappings(final Client client, + final PivotConfig config, + final String source, + final Map userProvidedTypes, final ActionListener> listener) { // collects the fieldnames used as source for aggregations Map aggregationSourceFieldNames = new HashMap<>(); @@ -56,18 +89,44 @@ public static void deduceMappings(final Client client, final PivotConfig config, allFieldNames.putAll(fieldNamesForGrouping); getSourceFieldMappings(client, source, allFieldNames.values().toArray(new String[0]), - ActionListener.wrap(sourceMappings -> { - Map targetMapping = resolveMappings(aggregationSourceFieldNames, aggregationTypes, - fieldNamesForGrouping, sourceMappings); - - listener.onResponse(targetMapping); - }, e -> { - listener.onFailure(e); - })); + ActionListener.wrap( + sourceMappings -> listener.onResponse(resolveMappings(aggregationSourceFieldNames, + aggregationTypes, + fieldNamesForGrouping, + sourceMappings, + userProvidedTypes)), + listener::onFailure)); + } + + /** + * Gathers the field mappings for the "destination" index. Listener will receive an error, or a {@code Map} of + * "field-name":"type". + * + * @param client Client used to execute the request + * @param index The index, or index pattern, from which to gather all the field mappings + * @param listener The listener to be alerted on success or failure. + */ + public static void getDestinationFieldMappings(final Client client, + final String index, + final ActionListener> listener) { + GetFieldMappingsRequest fieldMappingRequest = new GetFieldMappingsRequest(); + fieldMappingRequest.indices(index); + fieldMappingRequest.fields("*"); + ClientHelper.executeAsyncWithOrigin(client, + ClientHelper.DATA_FRAME_ORIGIN, + GetFieldMappingsAction.INSTANCE, + fieldMappingRequest, + ActionListener.wrap( + r -> listener.onResponse(extractFieldMappings(r.mappings())), + listener::onFailure + )); } private static Map resolveMappings(Map aggregationSourceFieldNames, - Map aggregationTypes, Map fieldNamesForGrouping, Map sourceMappings) { + Map aggregationTypes, + Map fieldNamesForGrouping, + Map sourceMappings, + Map providedOverrides) { Map targetMapping = new HashMap<>(); aggregationTypes.forEach((targetFieldName, aggregationName) -> { @@ -95,6 +154,10 @@ private static Map resolveMappings(Map aggregati targetMapping.put(targetFieldName, "keyword"); } }); + providedOverrides.forEach((targetFieldName, userProvidedType) -> { + logger.debug("Using user provided mapping type [" + userProvidedType + "] for field [" + targetFieldName + "]"); + targetMapping.put(targetFieldName, userProvidedType); + }); return targetMapping; } @@ -107,14 +170,12 @@ private static void getSourceFieldMappings(Client client, String index, String[] fieldMappingRequest.indices(index); fieldMappingRequest.fields(fields); - client.execute(GetFieldMappingsAction.INSTANCE, fieldMappingRequest, ActionListener.wrap(response -> { - listener.onResponse(extractSourceFieldMappings(response.mappings())); - }, e -> { - listener.onFailure(e); - })); + client.execute(GetFieldMappingsAction.INSTANCE, fieldMappingRequest, ActionListener.wrap( + response -> listener.onResponse(extractFieldMappings(response.mappings())), + listener::onFailure)); } - private static Map extractSourceFieldMappings(Map>> mappings) { + private static Map extractFieldMappings(Map>> mappings) { Map extractedTypes = new HashMap<>(); mappings.forEach((indexName, docTypeToMapping) -> { diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java index 30b710ac13c4f..287f327d0f664 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/AggregationResultUtilsTests.java @@ -140,8 +140,11 @@ aggTypedName, asMap( aggName, 12.55 ) ); - - executeTest(groupBy, aggregationBuilders, input, expected, 20); + Map fieldTypeMap = asStringMap( + targetField, "keyword", + aggName, "double" + ); + executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 20); } public void testExtractCompositeAggregationResultsMultiSources() throws IOException { @@ -222,7 +225,12 @@ aggTypedName, asMap( aggName, 12.55 ) ); - executeTest(groupBy, aggregationBuilders, input, expected, 10); + Map fieldTypeMap = asStringMap( + aggName, "double", + targetField, "keyword", + targetField2, "keyword" + ); + executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 10); } public void testExtractCompositeAggregationResultsMultiAggregations() throws IOException { @@ -287,11 +295,119 @@ aggTypedName2, asMap( aggName2, -2.44 ) ); - executeTest(groupBy, aggregationBuilders, input, expected, 200); + Map fieldTypeMap = asStringMap( + targetField, "keyword", + aggName, "double", + aggName2, "double" + ); + executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 200); + } + + public void testExtractCompositeAggregationResultsMultiAggregationsAndTypes() throws IOException { + String targetField = randomAlphaOfLengthBetween(5, 10); + String targetField2 = randomAlphaOfLengthBetween(5, 10) + "_2"; + + GroupConfig groupBy = parseGroupConfig("{" + + "\"" + targetField + "\" : {" + + " \"terms\" : {" + + " \"field\" : \"doesn't_matter_for_this_test\"" + + " } }," + + "\"" + targetField2 + "\" : {" + + " \"terms\" : {" + + " \"field\" : \"doesn't_matter_for_this_test\"" + + " } }" + + "}"); + + String aggName = randomAlphaOfLengthBetween(5, 10); + String aggTypedName = "avg#" + aggName; + + String aggName2 = randomAlphaOfLengthBetween(5, 10) + "_2"; + String aggTypedName2 = "max#" + aggName2; + + Collection aggregationBuilders = asList(AggregationBuilders.avg(aggName), AggregationBuilders.max(aggName2)); + + Map input = asMap( + "buckets", + asList( + asMap( + KEY, asMap( + targetField, "ID1", + targetField2, "ID1_2" + ), + aggTypedName, asMap( + "value", 42.33), + aggTypedName2, asMap( + "value", 9.9), + DOC_COUNT, 1), + asMap( + KEY, asMap( + targetField, "ID1", + targetField2, "ID2_2" + ), + aggTypedName, asMap( + "value", 8.4), + aggTypedName2, asMap( + "value", 222.33), + DOC_COUNT, 2), + asMap( + KEY, asMap( + targetField, "ID2", + targetField2, "ID1_2" + ), + aggTypedName, asMap( + "value", 28.99), + aggTypedName2, asMap( + "value", -2.44), + DOC_COUNT, 3), + asMap( + KEY, asMap( + targetField, "ID3", + targetField2, "ID2_2" + ), + aggTypedName, asMap( + "value", 12.55), + aggTypedName2, asMap( + "value", -100.44), + DOC_COUNT, 4) + )); + + List> expected = asList( + asMap( + targetField, "ID1", + targetField2, "ID1_2", + aggName, 42.33, + aggName2, "9.9" + ), + asMap( + targetField, "ID1", + targetField2, "ID2_2", + aggName, 8.4, + aggName2, "222.33" + ), + asMap( + targetField, "ID2", + targetField2, "ID1_2", + aggName, 28.99, + aggName2, "-2.44" + ), + asMap( + targetField, "ID3", + targetField2, "ID2_2", + aggName, 12.55, + aggName2, "-100.44" + ) + ); + Map fieldTypeMap = asStringMap( + aggName, "double", + aggName2, "keyword", // If the second aggregation was some non-numeric mapped field + targetField, "keyword", + targetField2, "keyword" + ); + executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 10); } private void executeTest(GroupConfig groups, Collection aggregationBuilders, Map input, - List> expected, long expectedDocCounts) throws IOException { + Map fieldTypeMap, List> expected, long expectedDocCounts) throws IOException { DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats(); XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values())); builder.map(input); @@ -299,7 +415,7 @@ private void executeTest(GroupConfig groups, Collection aggr try (XContentParser parser = createParser(builder)) { CompositeAggregation agg = ParsedComposite.fromXContent(parser, "my_feature"); List> result = AggregationResultUtils - .extractCompositeAggregationResults(agg, groups, aggregationBuilders, stats).collect(Collectors.toList()); + .extractCompositeAggregationResults(agg, groups, aggregationBuilders, fieldTypeMap, stats).collect(Collectors.toList()); assertEquals(expected, result); assertEquals(expectedDocCounts, stats.getNumDocuments()); @@ -321,4 +437,14 @@ static Map asMap(Object... fields) { } return map; } + + static Map asStringMap(String... strings) { + assert strings.length % 2 == 0; + final Map map = new HashMap<>(); + for (int i = 0; i < strings.length; i += 2) { + String field = strings[i]; + map.put(field, strings[i + 1]); + } + return map; + } } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java index 1621c7148059b..d268b9edec54e 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java @@ -37,7 +37,9 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -83,13 +85,13 @@ protected NamedXContentRegistry xContentRegistry() { } public void testValidateExistingIndex() throws Exception { - Pivot pivot = new Pivot("existing_source_index", new MatchAllQueryBuilder(), getValidPivotConfig()); + Pivot pivot = new Pivot("existing_source_index", new MatchAllQueryBuilder(), getValidPivotConfig(), Collections.emptyMap()); assertValidTransform(client, pivot); } public void testValidateNonExistingIndex() throws Exception { - Pivot pivot = new Pivot("non_existing_source_index", new MatchAllQueryBuilder(), getValidPivotConfig()); + Pivot pivot = new Pivot("non_existing_source_index", new MatchAllQueryBuilder(), getValidPivotConfig(), Collections.emptyMap()); assertInvalidTransform(client, pivot); } @@ -97,7 +99,10 @@ public void testValidateNonExistingIndex() throws Exception { public void testSearchFailure() throws Exception { // test a failure during the search operation, transform creation fails if // search has failures although they might just be temporary - Pivot pivot = new Pivot("existing_source_index_with_failing_shards", new MatchAllQueryBuilder(), getValidPivotConfig()); + Pivot pivot = new Pivot("existing_source_index_with_failing_shards", + new MatchAllQueryBuilder(), + getValidPivotConfig(), + Collections.emptyMap()); assertInvalidTransform(client, pivot); } @@ -106,7 +111,10 @@ public void testValidateAllSupportedAggregations() throws Exception { for (String agg : supportedAggregations) { AggregationConfig aggregationConfig = getAggregationConfig(agg); - Pivot pivot = new Pivot("existing_source", new MatchAllQueryBuilder(), getValidPivotConfig(aggregationConfig)); + Pivot pivot = new Pivot("existing_source", + new MatchAllQueryBuilder(), + getValidPivotConfig(aggregationConfig), + Collections.emptyMap()); assertValidTransform(client, pivot); } @@ -116,7 +124,10 @@ public void testValidateAllUnsupportedAggregations() throws Exception { for (String agg : unsupportedAggregations) { AggregationConfig aggregationConfig = getAggregationConfig(agg); - Pivot pivot = new Pivot("existing_source", new MatchAllQueryBuilder(), getValidPivotConfig(aggregationConfig)); + Pivot pivot = new Pivot("existing_source", + new MatchAllQueryBuilder(), + getValidPivotConfig(aggregationConfig), + Collections.emptyMap()); assertInvalidTransform(client, pivot); } @@ -178,6 +189,10 @@ private AggregationConfig getAggregationConfig(String agg) throws IOException { + " }\n" + " }" + "}"); } + private Map getFieldMappings() { + return Collections.singletonMap("values", "double"); + } + private AggregationConfig parseAggregations(String json) throws IOException { final XContentParser parser = XContentType.JSON.xContent().createParser(xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml index c8e7bc0a6ba6b..1177ee848bd8d 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml @@ -105,3 +105,46 @@ setup: - match: { count: 2 } - match: { transforms.0.id: "airline-transform" } - match: { transforms.1.id: "airline-transform-dos" } +--- +"Verify put transform creates destination index with appropriate mapping": + - do: + data_frame.put_data_frame_transform: + transform_id: "airline-transform" + body: > + { + "source": "airline-data", + "dest": "airline-data-by-airline", + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}, "time": {"max": {"field": "time"}}} + } + } + - match: { acknowledged: true } + - do: + indices.get_mapping: + index: airline-data-by-airline + - match: { airline-data-by-airline.mappings.properties.airline.type: keyword } + - match: { airline-data-by-airline.mappings.properties.avg_response.type: double } + - match: { airline-data-by-airline.mappings.properties.time.type: date } + + - do: + data_frame.put_data_frame_transform: + transform_id: "airline-transform-with-string-avg" + body: > + { + "source": "airline-data", + "dest": "airline-data-by-airline-string-avg", + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}, "time": {"max": {"field": "time"}}} + }, + "mapping_override": { "avg_response": "keyword" } + } + - match: { acknowledged: true } + - do: + indices.get_mapping: + index: airline-data-by-airline-string-avg + - match: { airline-data-by-airline-string-avg.mappings.properties.airline.type: keyword } + - match: { airline-data-by-airline-string-avg.mappings.properties.avg_response.type: keyword } + - match: { airline-data-by-airline-string-avg.mappings.properties.time.type: date } + From af30be6ea44b693588cd44a72631226a6b49dbbe Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 20 Mar 2019 13:42:09 -0500 Subject: [PATCH 2/4] correcting hlrc compilation issues after merge --- .../org/elasticsearch/client/DataFrameTransformIT.java | 7 ++++++- .../dataframe/PreviewDataFrameTransformRequestTests.java | 4 ++-- .../dataframe/PutDataFrameTransformRequestTests.java | 2 +- .../documentation/DataFrameTransformDocumentationIT.java | 3 ++- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java index 8150131f353e9..812417df7a174 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java @@ -229,7 +229,12 @@ public void testPreview() throws IOException { AggregationConfig aggConfig = new AggregationConfig(aggBuilder); PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig); - DataFrameTransformConfig transform = new DataFrameTransformConfig("test-preview", sourceIndex, null, queryConfig, pivotConfig); + DataFrameTransformConfig transform = new DataFrameTransformConfig("test-preview", + sourceIndex, + null, + queryConfig, + pivotConfig, + Collections.emptyMap()); DataFrameClient client = highLevelClient().dataFrame(); PreviewDataFrameTransformResponse preview = execute(new PreviewDataFrameTransformRequest(transform), diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PreviewDataFrameTransformRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PreviewDataFrameTransformRequestTests.java index 1dd7a5436314f..b3d3e48bace1f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PreviewDataFrameTransformRequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PreviewDataFrameTransformRequestTests.java @@ -66,13 +66,13 @@ public void testValidate() { // null id and destination is valid DataFrameTransformConfig config = new DataFrameTransformConfig(null, "source", null, - QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); + QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig(), Collections.emptyMap()); assertFalse(new PreviewDataFrameTransformRequest(config).validate().isPresent()); // null source is not valid config = new DataFrameTransformConfig(null, null, null, - QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); + QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig(), Collections.emptyMap()); Optional error = new PreviewDataFrameTransformRequest(config).validate(); assertTrue(error.isPresent()); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PutDataFrameTransformRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PutDataFrameTransformRequestTests.java index ee73ccd8b7d42..b38f8e24ba6cd 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PutDataFrameTransformRequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PutDataFrameTransformRequestTests.java @@ -42,7 +42,7 @@ public void testValidate() { assertFalse(createTestInstance().validate().isPresent()); DataFrameTransformConfig config = new DataFrameTransformConfig(null, null, null, - QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); + QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig(), Collections.emptyMap()); Optional error = new PutDataFrameTransformRequest(config).validate(); assertTrue(error.isPresent()); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java index d9940c5a56c05..e6e16fb8d1cd9 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java @@ -385,7 +385,8 @@ public void testPreview() throws IOException, InterruptedException { "source-data", null, // <2> queryConfig, - pivotConfig); + pivotConfig, + Collections.emptyMap()); PreviewDataFrameTransformRequest request = new PreviewDataFrameTransformRequest(transformConfig); // <3> From ff5699ac485932b8610f38a4f2d4663de04b8f81 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 21 Mar 2019 09:19:17 -0500 Subject: [PATCH 3/4] removing mapping_override option --- .../transforms/DataFrameTransformConfig.java | 24 ++-------- .../client/DataFrameTransformIT.java | 9 ++-- ...PreviewDataFrameTransformRequestTests.java | 4 +- .../PutDataFrameTransformRequestTests.java | 2 +- .../DataFrameTransformConfigTests.java | 16 +------ .../DataFrameTransformDocumentationIT.java | 20 +++----- .../transforms/DataFrameTransformConfig.java | 25 ++-------- ...wDataFrameTransformActionRequestTests.java | 12 +---- .../DataFrameTransformConfigTests.java | 24 +++------- .../integration/DataFramePivotRestIT.java | 46 ------------------- ...nsportPreviewDataFrameTransformAction.java | 3 +- .../TransportPutDataFrameTransformAction.java | 3 +- .../transforms/DataFrameIndexer.java | 2 +- .../dataframe/transforms/pivot/Pivot.java | 6 +-- .../transforms/pivot/SchemaUtil.java | 12 +---- .../transforms/pivot/PivotTests.java | 13 ++---- .../test/data_frame/transforms_crud.yml | 21 --------- 17 files changed, 40 insertions(+), 202 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java index efd7e051e9992..296eb61148dbe 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java @@ -28,7 +28,6 @@ import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; -import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -41,7 +40,6 @@ public class DataFrameTransformConfig implements ToXContentObject { public static final ParseField SOURCE = new ParseField("source"); public static final ParseField DEST = new ParseField("dest"); public static final ParseField QUERY = new ParseField("query"); - public static final ParseField MAPPING_OVERRIDE = new ParseField("mapping_override"); // types of transforms public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot"); @@ -50,7 +48,6 @@ public class DataFrameTransformConfig implements ToXContentObject { private final String dest; private final QueryConfig queryConfig; private final PivotConfig pivotConfig; - private final Map mappingOverride; public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_frame_transform", true, @@ -60,9 +57,7 @@ public class DataFrameTransformConfig implements ToXContentObject { String dest = (String) args[2]; QueryConfig queryConfig = (QueryConfig) args[3]; PivotConfig pivotConfig = (PivotConfig) args[4]; - @SuppressWarnings("unchecked") - Map mappingOverrides = (Map) args[5]; - return new DataFrameTransformConfig(id, source, dest, queryConfig, pivotConfig, mappingOverrides); + return new DataFrameTransformConfig(id, source, dest, queryConfig, pivotConfig); }); static { @@ -71,7 +66,6 @@ public class DataFrameTransformConfig implements ToXContentObject { PARSER.declareString(constructorArg(), DEST); PARSER.declareObject(optionalConstructorArg(), (p, c) -> QueryConfig.fromXContent(p), QUERY); PARSER.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p), PIVOT_TRANSFORM); - PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), MAPPING_OVERRIDE); } public static DataFrameTransformConfig fromXContent(final XContentParser parser) { @@ -82,14 +76,12 @@ public DataFrameTransformConfig(final String id, final String source, final String dest, final QueryConfig queryConfig, - final PivotConfig pivotConfig, - final Map mappingOverride) { + final PivotConfig pivotConfig) { this.id = id; this.source = source; this.dest = dest; this.queryConfig = queryConfig; this.pivotConfig = pivotConfig; - this.mappingOverride = mappingOverride == null ? null : Collections.unmodifiableMap(mappingOverride); } public String getId() { @@ -112,10 +104,6 @@ public QueryConfig getQueryConfig() { return queryConfig; } - public Map getMappingOverride() { - return mappingOverride; - } - @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject(); @@ -132,9 +120,6 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa if (pivotConfig != null) { builder.field(PIVOT_TRANSFORM.getPreferredName(), pivotConfig); } - if (mappingOverride != null) { - builder.field(MAPPING_OVERRIDE.getPreferredName(), mappingOverride); - } builder.endObject(); return builder; } @@ -155,13 +140,12 @@ public boolean equals(Object other) { && Objects.equals(this.source, that.source) && Objects.equals(this.dest, that.dest) && Objects.equals(this.queryConfig, that.queryConfig) - && Objects.equals(this.pivotConfig, that.pivotConfig) - && Objects.equals(this.mappingOverride, that.mappingOverride); + && Objects.equals(this.pivotConfig, that.pivotConfig); } @Override public int hashCode() { - return Objects.hash(id, source, dest, queryConfig, pivotConfig, mappingOverride); + return Objects.hash(id, source, dest, queryConfig, pivotConfig); } @Override diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java index 812417df7a174..c8f7160333882 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java @@ -156,8 +156,7 @@ public void testCreateDelete() throws IOException { sourceIndex, "pivot-dest", queryConfig, - pivotConfig, - Collections.emptyMap()); + pivotConfig); DataFrameClient client = highLevelClient().dataFrame(); AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform, @@ -191,8 +190,7 @@ public void testStartStop() throws IOException { sourceIndex, "pivot-dest", queryConfig, - pivotConfig, - Collections.emptyMap()); + pivotConfig); DataFrameClient client = highLevelClient().dataFrame(); AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform, @@ -233,8 +231,7 @@ public void testPreview() throws IOException { sourceIndex, null, queryConfig, - pivotConfig, - Collections.emptyMap()); + pivotConfig); DataFrameClient client = highLevelClient().dataFrame(); PreviewDataFrameTransformResponse preview = execute(new PreviewDataFrameTransformRequest(transform), diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PreviewDataFrameTransformRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PreviewDataFrameTransformRequestTests.java index b3d3e48bace1f..1dd7a5436314f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PreviewDataFrameTransformRequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PreviewDataFrameTransformRequestTests.java @@ -66,13 +66,13 @@ public void testValidate() { // null id and destination is valid DataFrameTransformConfig config = new DataFrameTransformConfig(null, "source", null, - QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig(), Collections.emptyMap()); + QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); assertFalse(new PreviewDataFrameTransformRequest(config).validate().isPresent()); // null source is not valid config = new DataFrameTransformConfig(null, null, null, - QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig(), Collections.emptyMap()); + QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); Optional error = new PreviewDataFrameTransformRequest(config).validate(); assertTrue(error.isPresent()); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PutDataFrameTransformRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PutDataFrameTransformRequestTests.java index b38f8e24ba6cd..ee73ccd8b7d42 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PutDataFrameTransformRequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/PutDataFrameTransformRequestTests.java @@ -42,7 +42,7 @@ public void testValidate() { assertFalse(createTestInstance().validate().isPresent()); DataFrameTransformConfig config = new DataFrameTransformConfig(null, null, null, - QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig(), Collections.emptyMap()); + QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); Optional error = new PutDataFrameTransformRequest(config).validate(); assertTrue(error.isPresent()); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java index c0c8c24c06530..7a4fe436d846b 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java @@ -28,29 +28,15 @@ import java.io.IOException; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import java.util.function.Predicate; public class DataFrameTransformConfigTests extends AbstractXContentTestCase { public static DataFrameTransformConfig randomDataFrameTransformConfig() { return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), - randomAlphaOfLengthBetween(1, 10), QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig(), - randomNullableStringMap()); + randomAlphaOfLengthBetween(1, 10), QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); } - public static Map randomNullableStringMap() { - Map stringStringMap = null; - if (randomBoolean()) { - stringStringMap = new HashMap<>(); - int kvCount = randomInt(10); - for (int i = 0; i < kvCount; i++) { - stringStringMap.put(randomAlphaOfLength(10), randomAlphaOfLength(10)); - } - } - return stringStringMap; - } @Override protected DataFrameTransformConfig createTestInstance() { return randomDataFrameTransformConfig(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java index e6e16fb8d1cd9..1e69d74cecb14 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java @@ -51,9 +51,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -119,10 +117,6 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException AggregationBuilders.avg("avg_rating").field("stars")); // <1> AggregationConfig aggConfig = new AggregationConfig(aggBuilder); // end::put-data-frame-transform-agg-config - // tag::put-data-frame-transform-mapping-override - Map mappingOverride = new HashMap<>(); - mappingOverride.put("avg_rating", "keyword"); // <1> - // end::put-data-frame-transform-mapping-override // tag::put-data-frame-transform-pivot-config PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig); // end::put-data-frame-transform-pivot-config @@ -132,8 +126,7 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException "source-index", // <2> "pivot-destination", // <3> queryConfig, // <4> - pivotConfig, // <5> - mappingOverride); // <6> + pivotConfig); // <5> // end::put-data-frame-transform-config { @@ -153,7 +146,7 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException { DataFrameTransformConfig configWithDifferentId = new DataFrameTransformConfig("reviewer-avg-rating2", transformConfig.getSource(), transformConfig.getDestination(), transformConfig.getQueryConfig(), - transformConfig.getPivotConfig(), null); + transformConfig.getPivotConfig()); PutDataFrameTransformRequest request = new PutDataFrameTransformRequest(configWithDifferentId); // tag::put-data-frame-transform-execute-listener @@ -198,7 +191,7 @@ public void testStartStop() throws IOException, InterruptedException { PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig); DataFrameTransformConfig transformConfig = new DataFrameTransformConfig("mega-transform", - "source-data", "pivot-dest", queryConfig, pivotConfig, null); + "source-data", "pivot-dest", queryConfig, pivotConfig); client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig), RequestOptions.DEFAULT); transformsToClean.add(transformConfig.getId()); @@ -315,9 +308,9 @@ public void testDeleteDataFrameTransform() throws IOException, InterruptedExcept PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig); DataFrameTransformConfig transformConfig1 = new DataFrameTransformConfig("mega-transform", - "source-data", "pivot-dest", queryConfig, pivotConfig, null); + "source-data", "pivot-dest", queryConfig, pivotConfig); DataFrameTransformConfig transformConfig2 = new DataFrameTransformConfig("mega-transform2", - "source-data", "pivot-dest2", queryConfig, pivotConfig, null); + "source-data", "pivot-dest2", queryConfig, pivotConfig); client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig1), RequestOptions.DEFAULT); client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig2), RequestOptions.DEFAULT); @@ -385,8 +378,7 @@ public void testPreview() throws IOException, InterruptedException { "source-data", null, // <2> queryConfig, - pivotConfig, - Collections.emptyMap()); + pivotConfig); PreviewDataFrameTransformRequest request = new PreviewDataFrameTransformRequest(transformConfig); // <3> diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java index 8ee91422ae932..bbf8be9b6aa99 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java @@ -39,7 +39,6 @@ public class DataFrameTransformConfig extends AbstractDiffable mappingOverrides; private static ConstructingObjectParser createParser(boolean lenient) { ConstructingObjectParser parser = new ConstructingObjectParser<>(NAME, lenient, @@ -96,9 +94,7 @@ private static ConstructingObjectParser create PivotConfig pivotConfig = (PivotConfig) args[6]; - @SuppressWarnings("unchecked") - Map mappingOverrides = (Map) args[7]; - return new DataFrameTransformConfig(id, source, dest, headers, queryConfig, pivotConfig, mappingOverrides); + return new DataFrameTransformConfig(id, source, dest, headers, queryConfig, pivotConfig); }); parser.declareString(optionalConstructorArg(), DataFrameField.ID); @@ -109,7 +105,6 @@ private static ConstructingObjectParser create parser.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), HEADERS); parser.declareObject(optionalConstructorArg(), (p, c) -> QueryConfig.fromXContent(p, lenient), QUERY); parser.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p, lenient), PIVOT_TRANSFORM); - parser.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), MAPPING_OVERRIDES); return parser; } @@ -123,15 +118,13 @@ public DataFrameTransformConfig(final String id, final String dest, final Map headers, final QueryConfig queryConfig, - final PivotConfig pivotConfig, - final Map mappingOverrides) { + final PivotConfig pivotConfig) { this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName()); this.source = ExceptionsHelper.requireNonNull(source, DataFrameField.SOURCE.getPreferredName()); this.dest = ExceptionsHelper.requireNonNull(dest, DataFrameField.DESTINATION.getPreferredName()); this.queryConfig = ExceptionsHelper.requireNonNull(queryConfig, QUERY.getPreferredName()); this.setHeaders(headers == null ? Collections.emptyMap() : headers); this.pivotConfig = pivotConfig; - this.mappingOverrides = mappingOverrides == null ? Collections.emptyMap() : Collections.unmodifiableMap(mappingOverrides); // at least one function must be defined if (this.pivotConfig == null) { @@ -146,7 +139,6 @@ public DataFrameTransformConfig(final StreamInput in) throws IOException { setHeaders(in.readMap(StreamInput::readString, StreamInput::readString)); queryConfig = in.readOptionalWriteable(QueryConfig::new); pivotConfig = in.readOptionalWriteable(PivotConfig::new); - mappingOverrides = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); } public String getId() { @@ -177,10 +169,6 @@ public QueryConfig getQueryConfig() { return queryConfig; } - public Map getMappingOverrides() { - return mappingOverrides; - } - public boolean isValid() { // collect validation results from all child objects if (queryConfig != null && queryConfig.isValid() == false) { @@ -202,7 +190,6 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); out.writeOptionalWriteable(queryConfig); out.writeOptionalWriteable(pivotConfig); - out.writeMap(mappingOverrides, StreamOutput::writeString, StreamOutput::writeString); } @Override @@ -223,9 +210,6 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa if (headers.isEmpty() == false && params.paramAsBoolean(DataFrameField.FOR_INTERNAL_STORAGE, false) == true) { builder.field(HEADERS.getPreferredName(), headers); } - if (mappingOverrides.isEmpty() == false) { - builder.field(MAPPING_OVERRIDES.getPreferredName(), mappingOverrides); - } builder.endObject(); return builder; @@ -248,13 +232,12 @@ public boolean equals(Object other) { && Objects.equals(this.dest, that.dest) && Objects.equals(this.headers, that.headers) && Objects.equals(this.queryConfig, that.queryConfig) - && Objects.equals(this.pivotConfig, that.pivotConfig) - && Objects.equals(this.mappingOverrides, that.mappingOverrides); + && Objects.equals(this.pivotConfig, that.pivotConfig); } @Override public int hashCode() { - return Objects.hash(id, source, dest, headers, queryConfig, pivotConfig, mappingOverrides); + return Objects.hash(id, source, dest, headers, queryConfig, pivotConfig); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java index 4846fcec0940f..f2df2200ac30e 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java @@ -22,8 +22,6 @@ import org.junit.Before; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; import static java.util.Collections.emptyList; @@ -67,17 +65,9 @@ protected boolean supportsUnknownFields() { @Override protected Request createTestInstance() { - Map mappingOverrides = null; - if (randomBoolean()) { - mappingOverrides = new HashMap<>(); - int kvCount = randomInt(10); - for (int i = 0; i < kvCount; i++) { - mappingOverrides.put(randomAlphaOfLength(10), randomAlphaOfLength(10)); - } - } DataFrameTransformConfig config = new DataFrameTransformConfig("transform-preview", randomAlphaOfLength(10), "unused-transform-preview-index", null, QueryConfigTests.randomQueryConfig(), - PivotConfigTests.randomPivotConfig(), mappingOverrides); + PivotConfigTests.randomPivotConfig()); return new Request(config); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java index 99c71d1ac4a56..1ed57fbc582f3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java @@ -37,46 +37,34 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHeaders() { return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), null, QueryConfigTests.randomQueryConfig(), - PivotConfigTests.randomPivotConfig(), randomNullableStringMap()); + PivotConfigTests.randomPivotConfig()); } public static DataFrameTransformConfig randomDataFrameTransformConfig() { return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomHeaders(), QueryConfigTests.randomQueryConfig(), - PivotConfigTests.randomPivotConfig(), randomNullableStringMap()); + PivotConfigTests.randomPivotConfig()); } public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHeaders(String id) { return new DataFrameTransformConfig(id, randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), null, - QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig(), randomNullableStringMap()); + QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); } public static DataFrameTransformConfig randomDataFrameTransformConfig(String id) { return new DataFrameTransformConfig(id, randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomHeaders(), - QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig(), randomNullableStringMap()); + QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); } public static DataFrameTransformConfig randomInvalidDataFrameTransformConfig() { if (randomBoolean()) { return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomHeaders(), QueryConfigTests.randomInvalidQueryConfig(), - PivotConfigTests.randomPivotConfig(), randomNullableStringMap()); + PivotConfigTests.randomPivotConfig()); } // else return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), randomHeaders(), QueryConfigTests.randomQueryConfig(), - PivotConfigTests.randomInvalidPivotConfig(), randomNullableStringMap()); - } - - public static Map randomNullableStringMap() { - Map stringStringMap = null; - if (randomBoolean()) { - stringStringMap = new HashMap<>(); - int kvCount = randomInt(10); - for (int i = 0; i < kvCount; i++) { - stringStringMap.put(randomAlphaOfLength(10), randomAlphaOfLength(10)); - } - } - return stringStringMap; + PivotConfigTests.randomInvalidPivotConfig()); } @Before diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index df81740970454..635038e2a486f 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -314,52 +314,6 @@ public void testPivotWithMaxOnDateField() throws Exception { assertThat(actual, containsString("2017-01-15T20:")); } - public void testPivotWithMappingOverride() throws Exception { - String transformId = "simpleDateHistogramPivotWithMappingOverride"; - String dataFrameIndex = "pivot_reviews_via_date_histogram_and_mapping_override"; - setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex); - - final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId, - BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); - - String config = "{" - + " \"source\": \"" + REVIEWS_INDEX_NAME + "\"," - + " \"dest\": \"" + dataFrameIndex + "\","; - - config += " \"pivot\": { \n" + - " \"group_by\": {\n" + - " \"by_day\": {\"date_histogram\": {\n" + - " \"interval\": \"1d\",\"field\":\"timestamp\",\"format\":\"yyyy-MM-DD\"\n" + - " }}\n" + - " },\n" + - " \"aggs\" :{\n" + - " \"avg_rating\": {\n" + - " \"avg\": {\"field\": \"stars\"}\n" + - " },\n" + - " \"timestamp\": {\n" + - " \"max\": {\"field\": \"timestamp\"}\n" + - " }\n" + - " }},\n" + - " \"mapping_override\": {\"avg_rating\": \"keyword\"}}"; - - createDataframeTransformRequest.setJsonEntity(config); - Map createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest)); - assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); - assertTrue(indexExists(dataFrameIndex)); - - startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS); - - // we expect 21 documents as there shall be 21 days worth of docs - Map indexStats = getAsMap(dataFrameIndex + "/_stats"); - assertEquals(21, XContentMapValues.extractValue("_all.total.docs.count", indexStats)); - Map searchResult = getAsMap(dataFrameIndex + "/_search?q=by_day:2017-01-15"); - String actualAvg = (String) ((List) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0); - assertThat(actualAvg, equalTo("3.82")); - String actualTime = (String) ((List) XContentMapValues.extractValue("hits.hits._source.timestamp", searchResult)).get(0); - // Do `containsString` as actual ending timestamp is indeterminate due to how data is generated - assertThat(actualTime, containsString("2017-01-15T20")); - } - private void assertOnePivotValue(String query, double expected) throws IOException { Map searchResult = getAsMap(query); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java index 72a75227e780f..8cafb33cc62b6 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java @@ -62,8 +62,7 @@ protected void doExecute(Task task, Pivot pivot = new Pivot(config.getSource(), config.getQueryConfig().getQuery(), - config.getPivotConfig(), - config.getMappingOverrides()); + config.getPivotConfig()); getPreview(pivot, ActionListener.wrap( previewResponse -> listener.onResponse(new PreviewDataFrameTransformAction.Response(previewResponse)), diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java index de417bb4d20aa..c1aaced1c23c7 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java @@ -198,8 +198,7 @@ private void putDataFrame(DataFrameTransformConfig config, ActionListener Return the listener, or clean up destination index on failure. diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java index e0f536d4639e1..f32e908694023 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java @@ -50,7 +50,7 @@ public DataFrameIndexer(Executor executor, AtomicReference initial protected void onStartJob(long now) { QueryBuilder queryBuilder = getConfig().getQueryConfig().getQuery(); - pivot = new Pivot(getConfig().getSource(), queryBuilder, getConfig().getPivotConfig(), getConfig().getMappingOverrides()); + pivot = new Pivot(getConfig().getSource(), queryBuilder, getConfig().getPivotConfig()); } @Override diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java index 9cb940c67a0d8..26ac7d93bf3ca 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java @@ -43,14 +43,12 @@ public class Pivot { // objects for re-using private final CompositeAggregationBuilder cachedCompositeAggregation; private final SearchRequest cachedSearchRequest; - private final Map mappingOverrides; - public Pivot(String source, QueryBuilder query, PivotConfig config, Map mappingOverrides) { + public Pivot(String source, QueryBuilder query, PivotConfig config) { this.source = source; this.config = config; this.cachedCompositeAggregation = createCompositeAggregation(config); this.cachedSearchRequest = createSearchRequest(source, query, cachedCompositeAggregation); - this.mappingOverrides = mappingOverrides; } public void validate(Client client, final ActionListener listener) { @@ -67,7 +65,7 @@ public void validate(Client client, final ActionListener listener) { } public void deduceMappings(Client client, final ActionListener> listener) { - SchemaUtil.deduceMappings(client, config, source, mappingOverrides, listener); + SchemaUtil.deduceMappings(client, config, source, listener); } public SearchRequest buildSearchRequest(Map position) { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/SchemaUtil.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/SchemaUtil.java index 0dfd5c040d639..175be3ea30e17 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/SchemaUtil.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/SchemaUtil.java @@ -53,13 +53,11 @@ public static boolean isNumericType(String type) { * @param client Client from which to make requests against the cluster * @param config The PivotConfig for which to deduce destination mapping * @param source Source index that contains the data to pivot - * @param userProvidedTypes A map of "field-name":"mapping-type" provided by the user to override deduced mapping types * @param listener Listener to alert on success or failure. */ public static void deduceMappings(final Client client, final PivotConfig config, final String source, - final Map userProvidedTypes, final ActionListener> listener) { // collects the fieldnames used as source for aggregations Map aggregationSourceFieldNames = new HashMap<>(); @@ -93,8 +91,7 @@ public static void deduceMappings(final Client client, sourceMappings -> listener.onResponse(resolveMappings(aggregationSourceFieldNames, aggregationTypes, fieldNamesForGrouping, - sourceMappings, - userProvidedTypes)), + sourceMappings)), listener::onFailure)); } @@ -125,8 +122,7 @@ public static void getDestinationFieldMappings(final Client client, private static Map resolveMappings(Map aggregationSourceFieldNames, Map aggregationTypes, Map fieldNamesForGrouping, - Map sourceMappings, - Map providedOverrides) { + Map sourceMappings) { Map targetMapping = new HashMap<>(); aggregationTypes.forEach((targetFieldName, aggregationName) -> { @@ -154,10 +150,6 @@ private static Map resolveMappings(Map aggregati targetMapping.put(targetFieldName, "keyword"); } }); - providedOverrides.forEach((targetFieldName, userProvidedType) -> { - logger.debug("Using user provided mapping type [" + userProvidedType + "] for field [" + targetFieldName + "]"); - targetMapping.put(targetFieldName, userProvidedType); - }); return targetMapping; } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java index d268b9edec54e..f253efad0e943 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java @@ -85,13 +85,13 @@ protected NamedXContentRegistry xContentRegistry() { } public void testValidateExistingIndex() throws Exception { - Pivot pivot = new Pivot("existing_source_index", new MatchAllQueryBuilder(), getValidPivotConfig(), Collections.emptyMap()); + Pivot pivot = new Pivot("existing_source_index", new MatchAllQueryBuilder(), getValidPivotConfig()); assertValidTransform(client, pivot); } public void testValidateNonExistingIndex() throws Exception { - Pivot pivot = new Pivot("non_existing_source_index", new MatchAllQueryBuilder(), getValidPivotConfig(), Collections.emptyMap()); + Pivot pivot = new Pivot("non_existing_source_index", new MatchAllQueryBuilder(), getValidPivotConfig()); assertInvalidTransform(client, pivot); } @@ -101,8 +101,7 @@ public void testSearchFailure() throws Exception { // search has failures although they might just be temporary Pivot pivot = new Pivot("existing_source_index_with_failing_shards", new MatchAllQueryBuilder(), - getValidPivotConfig(), - Collections.emptyMap()); + getValidPivotConfig()); assertInvalidTransform(client, pivot); } @@ -113,8 +112,7 @@ public void testValidateAllSupportedAggregations() throws Exception { Pivot pivot = new Pivot("existing_source", new MatchAllQueryBuilder(), - getValidPivotConfig(aggregationConfig), - Collections.emptyMap()); + getValidPivotConfig(aggregationConfig)); assertValidTransform(client, pivot); } @@ -126,8 +124,7 @@ public void testValidateAllUnsupportedAggregations() throws Exception { Pivot pivot = new Pivot("existing_source", new MatchAllQueryBuilder(), - getValidPivotConfig(aggregationConfig), - Collections.emptyMap()); + getValidPivotConfig(aggregationConfig)); assertInvalidTransform(client, pivot); } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml index 4a0cb01e3d0ee..013731116f3ed 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml @@ -165,24 +165,3 @@ setup: - match: { airline-data-by-airline.mappings.properties.airline.type: keyword } - match: { airline-data-by-airline.mappings.properties.avg_response.type: double } - match: { airline-data-by-airline.mappings.properties.time.type: date } - - - do: - data_frame.put_data_frame_transform: - transform_id: "airline-transform-with-string-avg" - body: > - { - "source": "airline-data", - "dest": "airline-data-by-airline-string-avg", - "pivot": { - "group_by": { "airline": {"terms": {"field": "airline"}}}, - "aggs": {"avg_response": {"avg": {"field": "responsetime"}}, "time": {"max": {"field": "time"}}} - }, - "mapping_override": { "avg_response": "keyword" } - } - - match: { acknowledged: true } - - do: - indices.get_mapping: - index: airline-data-by-airline-string-avg - - match: { airline-data-by-airline-string-avg.mappings.properties.airline.type: keyword } - - match: { airline-data-by-airline-string-avg.mappings.properties.avg_response.type: keyword } - - match: { airline-data-by-airline-string-avg.mappings.properties.time.type: date } From d69d61bd237a0c99b6561939597463ebe83ac968 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 21 Mar 2019 20:08:29 -0500 Subject: [PATCH 4/4] clearing up unnecessary changes --- .../transforms/DataFrameTransformConfig.java | 2 +- .../client/DataFrameTransformIT.java | 18 +++--------------- .../DataFrameTransformConfigTests.java | 2 +- .../DataFrameTransformDocumentationIT.java | 2 +- .../dataframe/put_data_frame.asciidoc | 14 -------------- .../transforms/DataFrameTransformConfig.java | 1 - ...ewDataFrameTransformActionRequestTests.java | 3 +-- .../DataFrameTransformConfigTests.java | 3 +-- .../TransportPutDataFrameTransformAction.java | 4 +--- 9 files changed, 9 insertions(+), 40 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java index 296eb61148dbe..08ef3a7be84a2 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java @@ -28,7 +28,6 @@ import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; -import java.util.Map; import java.util.Objects; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; @@ -72,6 +71,7 @@ public static DataFrameTransformConfig fromXContent(final XContentParser parser) return PARSER.apply(parser, null); } + public DataFrameTransformConfig(final String id, final String source, final String dest, diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java index c8f7160333882..736c3e373caae 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java @@ -152,11 +152,7 @@ public void testCreateDelete() throws IOException { PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig); String id = "test-crud"; - DataFrameTransformConfig transform = new DataFrameTransformConfig(id, - sourceIndex, - "pivot-dest", - queryConfig, - pivotConfig); + DataFrameTransformConfig transform = new DataFrameTransformConfig(id, sourceIndex, "pivot-dest", queryConfig, pivotConfig); DataFrameClient client = highLevelClient().dataFrame(); AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform, @@ -186,11 +182,7 @@ public void testStartStop() throws IOException { PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig); String id = "test-stop-start"; - DataFrameTransformConfig transform = new DataFrameTransformConfig(id, - sourceIndex, - "pivot-dest", - queryConfig, - pivotConfig); + DataFrameTransformConfig transform = new DataFrameTransformConfig(id, sourceIndex, "pivot-dest", queryConfig, pivotConfig); DataFrameClient client = highLevelClient().dataFrame(); AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform, @@ -227,11 +219,7 @@ public void testPreview() throws IOException { AggregationConfig aggConfig = new AggregationConfig(aggBuilder); PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig); - DataFrameTransformConfig transform = new DataFrameTransformConfig("test-preview", - sourceIndex, - null, - queryConfig, - pivotConfig); + DataFrameTransformConfig transform = new DataFrameTransformConfig("test-preview", sourceIndex, null, queryConfig, pivotConfig); DataFrameClient client = highLevelClient().dataFrame(); PreviewDataFrameTransformResponse preview = execute(new PreviewDataFrameTransformRequest(transform), diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java index 7a4fe436d846b..af90c15c3d901 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java @@ -34,7 +34,7 @@ public class DataFrameTransformConfigTests extends AbstractXContentTestCase "pivot-destination", // <3> queryConfig, // <4> - pivotConfig); // <5> + pivotConfig); // <5> // end::put-data-frame-transform-config { diff --git a/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc b/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc index 20467b334e1d2..7f8b1b0e9fbf1 100644 --- a/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc +++ b/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc @@ -36,7 +36,6 @@ include-tagged::{doc-tests-file}[{api}-config] <3> The destination index <4> Optionally a QueryConfig <5> The PivotConfig -<6> The provided `Map` values to override deduced destination index mappings. [id="{upid}-{api}-query-config"] ==== QueryConfig @@ -85,19 +84,6 @@ include-tagged::{doc-tests-file}[{api}-agg-config] -------------------------------------------------- <1> Aggregate the average star rating -==== Overriding Deduced Destination Mapping Field Types - -When creating the destination index for the {dataframe-transform}, a best -effort mapping is created. This option allows overriding specific mapped -field types. - -["source","java",subs="attributes,callouts,macros"] --------------------------------------------------- -include-tagged::{doc-tests-file}[{api}-mapping-override] --------------------------------------------------- -<1> Override the deduced mapping for field `avg_rating` to be a type of -`keyword` - include::../execution.asciidoc[] [id="{upid}-{api}-response"] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java index bbf8be9b6aa99..6172bb2de1f6a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java @@ -93,7 +93,6 @@ private static ConstructingObjectParser create } PivotConfig pivotConfig = (PivotConfig) args[6]; - return new DataFrameTransformConfig(id, source, dest, headers, queryConfig, pivotConfig); }); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java index f2df2200ac30e..4593efe489834 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java @@ -66,8 +66,7 @@ protected boolean supportsUnknownFields() { @Override protected Request createTestInstance() { DataFrameTransformConfig config = new DataFrameTransformConfig("transform-preview", randomAlphaOfLength(10), - "unused-transform-preview-index", null, QueryConfigTests.randomQueryConfig(), - PivotConfigTests.randomPivotConfig()); + "unused-transform-preview-index", null, QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); return new Request(config); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java index 1ed57fbc582f3..76db5a1266d6b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java @@ -36,8 +36,7 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHeaders() { return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10), - randomAlphaOfLengthBetween(1, 10), null, QueryConfigTests.randomQueryConfig(), - PivotConfigTests.randomPivotConfig()); + randomAlphaOfLengthBetween(1, 10), null, QueryConfigTests.randomQueryConfig(), PivotConfigTests.randomPivotConfig()); } public static DataFrameTransformConfig randomDataFrameTransformConfig() { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java index c1aaced1c23c7..b696243cc5d0d 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java @@ -196,9 +196,7 @@ private void handlePrivsResponse(String username, private void putDataFrame(DataFrameTransformConfig config, ActionListener listener) { - final Pivot pivot = new Pivot(config.getSource(), - config.getQueryConfig().getQuery(), - config.getPivotConfig()); + final Pivot pivot = new Pivot(config.getSource(), config.getQueryConfig().getQuery(), config.getPivotConfig()); // <5> Return the listener, or clean up destination index on failure.