diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameClient.java index 8d7d3bc2ed497..f5104c1602205 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameClient.java @@ -33,6 +33,8 @@ import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse; import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest; import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse; +import org.elasticsearch.client.dataframe.UpdateDataFrameTransformRequest; +import org.elasticsearch.client.dataframe.UpdateDataFrameTransformResponse; import java.io.IOException; import java.util.Collections; @@ -88,6 +90,51 @@ public void putDataFrameTransformAsync(PutDataFrameTransformRequest request, Req Collections.emptySet()); } + /** + * Updates an existing Data Frame Transform + *

+ * For additional info + * see + * Create data frame transform documentation + * + * @param request The UpdateDataFrameTransformRequest containing the + * {@link org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdate}. + * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return An UpdateDataFrameTransformResponse object containing the updated configuration + * @throws IOException when there is a serialization issue sending the request or receiving the response + */ + public UpdateDataFrameTransformResponse updateDataFrameTransform(UpdateDataFrameTransformRequest request, + RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity(request, + DataFrameRequestConverters::updateDataFrameTransform, + options, + UpdateDataFrameTransformResponse::fromXContent, + Collections.emptySet()); + } + + /** + * Updates an existing Data Frame Transform asynchronously and notifies listener on completion + *

+ * For additional info + * see + * Create data frame transform documentation + * + * @param request The UpdateDataFrameTransformRequest containing the + * {@link org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdate}. + * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener Listener to be notified upon request completion + */ + public void updateDataFrameTransformAsync(UpdateDataFrameTransformRequest request, + RequestOptions options, + ActionListener listener) { + restHighLevelClient.performRequestAsyncAndParseEntity(request, + DataFrameRequestConverters::updateDataFrameTransform, + options, + UpdateDataFrameTransformResponse::fromXContent, + listener, + Collections.emptySet()); + } + /** * Get the running statistics of a Data Frame Transform *

diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java index f45f2a6c8ec40..375c0a7c3afd7 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java @@ -31,6 +31,7 @@ import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest; import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest; import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest; +import org.elasticsearch.client.dataframe.UpdateDataFrameTransformRequest; import org.elasticsearch.common.Strings; import java.io.IOException; @@ -58,6 +59,20 @@ static Request putDataFrameTransform(PutDataFrameTransformRequest putRequest) th return request; } + static Request updateDataFrameTransform(UpdateDataFrameTransformRequest updateDataFrameTransformRequest) throws IOException { + String endpoint = new RequestConverters.EndpointBuilder() + .addPathPartAsIs("_data_frame", "transforms") + .addPathPart(updateDataFrameTransformRequest.getId()) + .addPathPart("_update") + .build(); + Request request = new Request(HttpPost.METHOD_NAME, endpoint); + request.setEntity(createEntity(updateDataFrameTransformRequest, REQUEST_BODY_CONTENT_TYPE)); + if (updateDataFrameTransformRequest.getDeferValidation() != null) { + request.addParameter(DEFER_VALIDATION, Boolean.toString(updateDataFrameTransformRequest.getDeferValidation())); + } + return request; + } + static Request getDataFrameTransform(GetDataFrameTransformRequest getRequest) { String endpoint = new RequestConverters.EndpointBuilder() .addPathPartAsIs("_data_frame", "transforms") diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/UpdateDataFrameTransformRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/UpdateDataFrameTransformRequest.java new file mode 100644 index 0000000000000..f5fdb9edbb29b --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/UpdateDataFrameTransformRequest.java @@ -0,0 +1,103 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.dataframe; + +import org.elasticsearch.client.Validatable; +import org.elasticsearch.client.ValidationException; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdate; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; +import java.util.Optional; + +public class UpdateDataFrameTransformRequest implements ToXContentObject, Validatable { + + private final DataFrameTransformConfigUpdate update; + private final String id; + private Boolean deferValidation; + + public UpdateDataFrameTransformRequest(DataFrameTransformConfigUpdate update, String id) { + this.update = update; + this.id = id; + } + + public DataFrameTransformConfigUpdate getUpdate() { + return update; + } + + public Boolean getDeferValidation() { + return deferValidation; + } + + public String getId() { + return id; + } + + /** + * Indicates if deferrable validations should be skipped until the transform starts + * + * @param deferValidation {@code true} will cause validations to be deferred + */ + public void setDeferValidation(boolean deferValidation) { + this.deferValidation = deferValidation; + } + + @Override + public Optional validate() { + ValidationException validationException = new ValidationException(); + if (update == null) { + validationException.addValidationError("put requires a non-null data frame config update object"); + } + if (id == null) { + validationException.addValidationError("data frame transform id cannot be null"); + } + if (validationException.validationErrors().isEmpty()) { + return Optional.empty(); + } else { + return Optional.of(validationException); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return update.toXContent(builder, params); + } + + @Override + public int hashCode() { + return Objects.hash(update, deferValidation, id); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + UpdateDataFrameTransformRequest other = (UpdateDataFrameTransformRequest) obj; + return Objects.equals(update, other.update) + && Objects.equals(id, other.id) + && Objects.equals(deferValidation, other.deferValidation); + } +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/UpdateDataFrameTransformResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/UpdateDataFrameTransformResponse.java new file mode 100644 index 0000000000000..30b47c27d5921 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/UpdateDataFrameTransformResponse.java @@ -0,0 +1,61 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.dataframe; + +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.util.Objects; + +public class UpdateDataFrameTransformResponse { + + public static UpdateDataFrameTransformResponse fromXContent(final XContentParser parser) { + return new UpdateDataFrameTransformResponse(DataFrameTransformConfig.PARSER.apply(parser, null)); + } + + private DataFrameTransformConfig transformConfiguration; + + public UpdateDataFrameTransformResponse(DataFrameTransformConfig transformConfiguration) { + this.transformConfiguration = transformConfiguration; + } + + public DataFrameTransformConfig getTransformConfiguration() { + return transformConfiguration; + } + + @Override + public int hashCode() { + return Objects.hash(transformConfiguration); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + final UpdateDataFrameTransformResponse that = (UpdateDataFrameTransformResponse) other; + return Objects.equals(this.transformConfiguration, that.transformConfiguration); + } +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigUpdate.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigUpdate.java new file mode 100644 index 0000000000000..afca172b564f3 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigUpdate.java @@ -0,0 +1,208 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.dataframe.transforms; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParserUtils; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + +/** + * This class holds the mutable configuration items for a data frame transform + */ +public class DataFrameTransformConfigUpdate implements ToXContentObject { + + public static final String NAME = "data_frame_transform_config_update"; + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, + false, + (args) -> { + SourceConfig source = (SourceConfig) args[0]; + DestConfig dest = (DestConfig) args[1]; + TimeValue frequency = args[2] == null ? + null : + TimeValue.parseTimeValue((String) args[2], DataFrameTransformConfig.FREQUENCY.getPreferredName()); + SyncConfig syncConfig = (SyncConfig) args[3]; + String description = (String) args[4]; + return new DataFrameTransformConfigUpdate(source, dest, frequency, syncConfig, description); + }); + + static { + PARSER.declareObject(optionalConstructorArg(), (p, c) -> SourceConfig.PARSER.apply(p, null), DataFrameTransformConfig.SOURCE); + PARSER.declareObject(optionalConstructorArg(), (p, c) -> DestConfig.PARSER.apply(p, null), DataFrameTransformConfig.DEST); + PARSER.declareString(optionalConstructorArg(), DataFrameTransformConfig.FREQUENCY); + PARSER.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p), DataFrameTransformConfig.SYNC); + PARSER.declareString(optionalConstructorArg(), DataFrameTransformConfig.DESCRIPTION); + } + + private static SyncConfig parseSyncConfig(XContentParser parser) throws IOException { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser::getTokenLocation); + SyncConfig syncConfig = parser.namedObject(SyncConfig.class, parser.currentName(), false); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser::getTokenLocation); + return syncConfig; + } + + private final SourceConfig source; + private final DestConfig dest; + private final TimeValue frequency; + private final SyncConfig syncConfig; + private final String description; + + public DataFrameTransformConfigUpdate(final SourceConfig source, + final DestConfig dest, + final TimeValue frequency, + final SyncConfig syncConfig, + final String description){ + this.source = source; + this.dest = dest; + this.frequency = frequency; + this.syncConfig = syncConfig; + this.description = description; + } + + public SourceConfig getSource() { + return source; + } + + public DestConfig getDestination() { + return dest; + } + + public TimeValue getFrequency() { + return frequency; + } + + public SyncConfig getSyncConfig() { + return syncConfig; + } + + @Nullable + public String getDescription() { + return description; + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + if (source != null) { + builder.field(DataFrameTransformConfig.SOURCE.getPreferredName(), source); + } + if (dest != null) { + builder.field(DataFrameTransformConfig.DEST.getPreferredName(), dest); + } + if (frequency != null) { + builder.field(DataFrameTransformConfig.FREQUENCY.getPreferredName(), frequency.getStringRep()); + } + if (syncConfig != null) { + builder.startObject(DataFrameTransformConfig.SYNC.getPreferredName()); + builder.field(syncConfig.getName(), syncConfig); + builder.endObject(); + } + if (description != null) { + builder.field(DataFrameTransformConfig.DESCRIPTION.getPreferredName(), description); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + final DataFrameTransformConfigUpdate that = (DataFrameTransformConfigUpdate) other; + + return Objects.equals(this.source, that.source) + && Objects.equals(this.dest, that.dest) + && Objects.equals(this.frequency, that.frequency) + && Objects.equals(this.syncConfig, that.syncConfig) + && Objects.equals(this.description, that.description); + } + + @Override + public int hashCode(){ + return Objects.hash(source, dest, frequency, syncConfig, description); + } + + @Override + public String toString() { + return Strings.toString(this, true, true); + } + + public static Builder builder() { + return new Builder(); + } + + public static DataFrameTransformConfigUpdate fromXContent(final XContentParser parser) { + return PARSER.apply(parser, null); + } + + public static class Builder { + + private SourceConfig source; + private DestConfig dest; + private TimeValue frequency; + private SyncConfig syncConfig; + private String description; + + public Builder setSource(SourceConfig source) { + this.source = source; + return this; + } + + public Builder setDest(DestConfig dest) { + this.dest = dest; + return this; + } + + public Builder setFrequency(TimeValue frequency) { + this.frequency = frequency; + return this; + } + + public Builder setSyncConfig(SyncConfig syncConfig) { + this.syncConfig = syncConfig; + return this; + } + + public Builder setDescription(String description) { + this.description = description; + return this; + } + + public DataFrameTransformConfigUpdate build() { + return new DataFrameTransformConfigUpdate(source, dest, frequency, syncConfig, description); + } + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java index 85ff77fd6fb0a..529b1ea6235ec 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java @@ -32,8 +32,11 @@ import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest; import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest; import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest; +import org.elasticsearch.client.dataframe.UpdateDataFrameTransformRequest; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigTests; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdate; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdateTests; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -81,6 +84,26 @@ public void testPutDataFrameTransform() throws IOException { assertThat(request.getParameters(), hasEntry("defer_validation", Boolean.toString(putRequest.getDeferValidation()))); } + public void testUpdateDataFrameTransform() throws IOException { + String transformId = randomAlphaOfLength(10); + UpdateDataFrameTransformRequest updateDataFrameTransformRequest = new UpdateDataFrameTransformRequest( + DataFrameTransformConfigUpdateTests.randomDataFrameTransformConfigUpdate(), + transformId); + Request request = DataFrameRequestConverters.updateDataFrameTransform(updateDataFrameTransformRequest); + assertThat(request.getParameters(), not(hasKey("defer_validation"))); + assertEquals(HttpPost.METHOD_NAME, request.getMethod()); + assertThat(request.getEndpoint(), equalTo("/_data_frame/transforms/" + transformId + "/_update")); + + try (XContentParser parser = createParser(JsonXContent.jsonXContent, request.getEntity().getContent())) { + DataFrameTransformConfigUpdate parsedConfig = DataFrameTransformConfigUpdate.fromXContent(parser); + assertThat(parsedConfig, equalTo(updateDataFrameTransformRequest.getUpdate())); + } + updateDataFrameTransformRequest.setDeferValidation(true); + request = DataFrameRequestConverters.updateDataFrameTransform(updateDataFrameTransformRequest); + assertThat(request.getParameters(), + hasEntry("defer_validation", Boolean.toString(updateDataFrameTransformRequest.getDeferValidation()))); + } + public void testDeleteDataFrameTransform() { DeleteDataFrameTransformRequest deleteRequest = new DeleteDataFrameTransformRequest("foo"); Request request = DataFrameRequestConverters.deleteDataFrameTransform(deleteRequest); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java index a5c192c979963..86690d841b6e3 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java @@ -38,17 +38,22 @@ import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse; import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest; import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse; +import org.elasticsearch.client.dataframe.UpdateDataFrameTransformRequest; +import org.elasticsearch.client.dataframe.UpdateDataFrameTransformResponse; import org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdate; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStats; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.client.dataframe.transforms.DestConfig; import org.elasticsearch.client.dataframe.transforms.SourceConfig; +import org.elasticsearch.client.dataframe.transforms.TimeSyncConfig; import org.elasticsearch.client.dataframe.transforms.pivot.GroupConfig; import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfig; import org.elasticsearch.client.dataframe.transforms.pivot.TermsGroupSource; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.MatchAllQueryBuilder; @@ -178,6 +183,33 @@ public void testCreateDelete() throws IOException { assertThat(deleteError.getMessage(), containsString("Transform with id [test-crud] could not be found")); } + public void testUpdate() throws IOException { + String sourceIndex = "update-transform-source"; + createIndex(sourceIndex); + + String id = "test-update"; + DataFrameTransformConfig transform = validDataFrameTransformConfigBuilder(id, sourceIndex, "pivot-dest") + .setSyncConfig(new TimeSyncConfig("timefield", TimeValue.timeValueSeconds(60))) + .build(); + + DataFrameClient client = highLevelClient().dataFrame(); + AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform, + client::putDataFrameTransformAsync); + assertTrue(ack.isAcknowledged()); + + String updatedDescription = "my new description"; + DataFrameTransformConfigUpdate update = DataFrameTransformConfigUpdate.builder().setDescription(updatedDescription).build(); + UpdateDataFrameTransformResponse response = execute( + new UpdateDataFrameTransformRequest(update, id), client::updateDataFrameTransform, + client::updateDataFrameTransformAsync); + assertThat(response.getTransformConfiguration().getDescription(), equalTo(updatedDescription)); + + ElasticsearchStatusException updateError = expectThrows(ElasticsearchStatusException.class, + () -> execute(new UpdateDataFrameTransformRequest(update, "missing-transform"), client::updateDataFrameTransform, + client::updateDataFrameTransformAsync)); + assertThat(updateError.getMessage(), containsString("Transform with id [missing-transform] could not be found")); + } + public void testCreateDeleteWithDefer() throws IOException { String sourceIndex = "missing-source-index"; @@ -323,6 +355,10 @@ public void testPreview() throws IOException { } private DataFrameTransformConfig validDataFrameTransformConfig(String id, String source, String destination) { + return validDataFrameTransformConfigBuilder(id, source, destination).build(); + } + + private DataFrameTransformConfig.Builder validDataFrameTransformConfigBuilder(String id, String source, String destination) { GroupConfig groupConfig = GroupConfig.builder().groupBy("reviewer", TermsGroupSource.builder().setField("user_id").build()).build(); AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder(); @@ -336,8 +372,7 @@ private DataFrameTransformConfig validDataFrameTransformConfig(String id, String .setSource(SourceConfig.builder().setIndex(source).setQuery(new MatchAllQueryBuilder()).build()) .setDest(destConfig) .setPivotConfig(pivotConfig) - .setDescription("this is a test transform") - .build(); + .setDescription("this is a test transform"); } // TODO add tests to cover continuous situations diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/UpdateDataFrameTransformRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/UpdateDataFrameTransformRequestTests.java new file mode 100644 index 0000000000000..6994c2bf91b38 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/UpdateDataFrameTransformRequestTests.java @@ -0,0 +1,78 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.dataframe; + +import org.elasticsearch.client.ValidationException; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdate; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdateTests.randomDataFrameTransformConfigUpdate; +import static org.hamcrest.Matchers.containsString; + +public class UpdateDataFrameTransformRequestTests extends AbstractXContentTestCase { + + public void testValidate() { + assertFalse(createTestInstance().validate().isPresent()); + + DataFrameTransformConfigUpdate config = randomDataFrameTransformConfigUpdate(); + + Optional error = new UpdateDataFrameTransformRequest(config, null).validate(); + assertTrue(error.isPresent()); + assertThat(error.get().getMessage(), containsString("data frame transform id cannot be null")); + + error = new UpdateDataFrameTransformRequest(null, "123").validate(); + assertTrue(error.isPresent()); + assertThat(error.get().getMessage(), containsString("put requires a non-null data frame config")); + } + + private final String transformId = randomAlphaOfLength(10); + @Override + protected UpdateDataFrameTransformRequest createTestInstance() { + return new UpdateDataFrameTransformRequest(randomDataFrameTransformConfigUpdate(), transformId); + } + + @Override + protected UpdateDataFrameTransformRequest doParseInstance(XContentParser parser) throws IOException { + return new UpdateDataFrameTransformRequest(DataFrameTransformConfigUpdate.fromXContent(parser), transformId); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList()); + List namedXContents = searchModule.getNamedXContents(); + namedXContents.addAll(new DataFrameNamedXContentProvider().getNamedXContentParsers()); + + return new NamedXContentRegistry(namedXContents); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/UpdateDataFrameTransformResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/UpdateDataFrameTransformResponseTests.java new file mode 100644 index 0000000000000..92d20bdc06981 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/UpdateDataFrameTransformResponseTests.java @@ -0,0 +1,63 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.dataframe; + +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigTests; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester; + +public class UpdateDataFrameTransformResponseTests extends ESTestCase { + + public void testXContentParser() throws IOException { + xContentTester(this::createParser, + UpdateDataFrameTransformResponseTests::createTestInstance, + UpdateDataFrameTransformResponseTests::toXContent, + UpdateDataFrameTransformResponse::fromXContent) + .assertToXContentEquivalence(false) + .supportsUnknownFields(false) + .test(); + } + + private static UpdateDataFrameTransformResponse createTestInstance() { + return new UpdateDataFrameTransformResponse(DataFrameTransformConfigTests.randomDataFrameTransformConfig()); + } + + private static void toXContent(UpdateDataFrameTransformResponse response, XContentBuilder builder) throws IOException { + response.getTransformConfiguration().toXContent(builder, null); + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList()); + List namedXContents = searchModule.getNamedXContents(); + namedXContents.addAll(new DataFrameNamedXContentProvider().getNamedXContentParsers()); + + return new NamedXContentRegistry(namedXContents); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigUpdateTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigUpdateTests.java new file mode 100644 index 0000000000000..50c7326089c33 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigUpdateTests.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.dataframe.transforms; + +import org.elasticsearch.client.dataframe.DataFrameNamedXContentProvider; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.client.dataframe.transforms.DestConfigTests.randomDestConfig; +import static org.elasticsearch.client.dataframe.transforms.SourceConfigTests.randomSourceConfig; + +public class DataFrameTransformConfigUpdateTests extends AbstractXContentTestCase { + + public static DataFrameTransformConfigUpdate randomDataFrameTransformConfigUpdate() { + return new DataFrameTransformConfigUpdate( + randomBoolean() ? null : randomSourceConfig(), + randomBoolean() ? null : randomDestConfig(), + randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)), + randomBoolean() ? null : randomSyncConfig(), + randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)); + } + + public static SyncConfig randomSyncConfig() { + return TimeSyncConfigTests.randomTimeSyncConfig(); + } + + @Override + protected DataFrameTransformConfigUpdate doParseInstance(XContentParser parser) throws IOException { + return DataFrameTransformConfigUpdate.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } + + @Override + protected DataFrameTransformConfigUpdate createTestInstance() { + return randomDataFrameTransformConfigUpdate(); + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList()); + List namedXContents = searchModule.getNamedXContents(); + namedXContents.addAll(new DataFrameNamedXContentProvider().getNamedXContentParsers()); + + return new NamedXContentRegistry(namedXContents); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java index 15af8771aeb92..ed0d0506f2c8f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java @@ -39,8 +39,11 @@ import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse; import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest; import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse; +import org.elasticsearch.client.dataframe.UpdateDataFrameTransformRequest; +import org.elasticsearch.client.dataframe.UpdateDataFrameTransformResponse; import org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdate; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformProgress; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStats; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState; @@ -48,6 +51,7 @@ import org.elasticsearch.client.dataframe.transforms.NodeAttributes; import org.elasticsearch.client.dataframe.transforms.QueryConfig; import org.elasticsearch.client.dataframe.transforms.SourceConfig; +import org.elasticsearch.client.dataframe.transforms.TimeSyncConfig; import org.elasticsearch.client.dataframe.transforms.pivot.AggregationConfig; import org.elasticsearch.client.dataframe.transforms.pivot.GroupConfig; import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfig; @@ -68,6 +72,7 @@ import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTestCase { @@ -216,6 +221,96 @@ public void onFailure(Exception e) { } } + public void testUpdateDataFrameTransform() throws IOException, InterruptedException { + createIndex("source-data"); + + RestHighLevelClient client = highLevelClient(); + QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder()); + GroupConfig groupConfig = GroupConfig.builder().groupBy("reviewer", + TermsGroupSource.builder().setField("user_id").build()).build(); + AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder(); + aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars")); + AggregationConfig aggConfig = new AggregationConfig(aggBuilder); + PivotConfig pivotConfig = PivotConfig.builder().setGroups(groupConfig).setAggregationConfig(aggConfig).build(); + + DataFrameTransformConfig transformConfig = DataFrameTransformConfig.builder() + .setId("my-transform-to-update") + .setSource(SourceConfig.builder().setIndex("source-data").setQueryConfig(queryConfig).build()) + .setDest(DestConfig.builder().setIndex("pivot-dest").build()) + .setPivotConfig(pivotConfig) + .setSyncConfig(new TimeSyncConfig("time-field", TimeValue.timeValueSeconds(120))) + .build(); + + client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig), RequestOptions.DEFAULT); + transformsToClean.add(transformConfig.getId()); + + // tag::update-data-frame-transform-config + DataFrameTransformConfigUpdate update = DataFrameTransformConfigUpdate + .builder() + .setSource(SourceConfig.builder() + .setIndex("source-data") + .build()) // <1> + .setDest(DestConfig.builder() + .setIndex("pivot-dest") + .build()) // <2> + .setFrequency(TimeValue.timeValueSeconds(15)) // <3> + .setSyncConfig(new TimeSyncConfig("time-field", + TimeValue.timeValueSeconds(120))) // <4> + .setDescription("This is my updated transform") // <5> + .build(); + // end::update-data-frame-transform-config + + { + // tag::update-data-frame-transform-request + UpdateDataFrameTransformRequest request = + new UpdateDataFrameTransformRequest( + update, // <1> + "my-transform-to-update"); // <2> + request.setDeferValidation(false); // <3> + // end::update-data-frame-transform-request + + // tag::update-data-frame-transform-execute + UpdateDataFrameTransformResponse response = + client.dataFrame().updateDataFrameTransform(request, + RequestOptions.DEFAULT); + DataFrameTransformConfig updatedConfig = + response.getTransformConfiguration(); + // end::update-data-frame-transform-execute + + assertThat(updatedConfig.getDescription(), equalTo("This is my updated transform")); + } + { + UpdateDataFrameTransformRequest request = new UpdateDataFrameTransformRequest(update, + "my-transform-to-update"); + + // tag::update-data-frame-transform-execute-listener + ActionListener listener = + new ActionListener() { + @Override + public void onResponse(UpdateDataFrameTransformResponse response) { + // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::update-data-frame-transform-execute-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::update-data-frame-transform-execute-async + client.dataFrame().updateDataFrameTransformAsync( + request, RequestOptions.DEFAULT, listener); // <1> + // end::update-data-frame-transform-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + } + public void testStartStop() throws IOException, InterruptedException { createIndex("source-data"); diff --git a/docs/build.gradle b/docs/build.gradle index c80c16952e7cf..dce7921fb76d0 100644 --- a/docs/build.gradle +++ b/docs/build.gradle @@ -1085,7 +1085,6 @@ buildRestTests.setups['reviews'] = ''' {"index": {"_id": "2"}} {"product": "widget-foo", "rating": 5} ''' - buildRestTests.setups['remote_cluster'] = buildRestTests.setups['host'] + ''' - do: cluster.put_settings: @@ -1147,6 +1146,53 @@ buildRestTests.setups['kibana_sample_data_ecommerce'] = ''' number_of_shards: 1 number_of_replicas: 0 ''' +buildRestTests.setups['simple_kibana_continuous_pivot'] = buildRestTests.setups['kibana_sample_data_ecommerce'] + ''' + - do: + raw: + method: PUT + path: _data_frame/transforms/simple-kibana-ecomm-pivot + body: > + { + "source": { + "index": "kibana_sample_data_ecommerce", + "query": { + "term": { + "geoip.continent_name": { + "value": "Asia" + } + } + } + }, + "pivot": { + "group_by": { + "customer_id": { + "terms": { + "field": "customer_id" + } + } + }, + "aggregations": { + "max_price": { + "max": { + "field": "taxful_total_price" + } + } + } + }, + "description": "Maximum priced ecommerce data", + "dest": { + "index": "kibana_sample_data_ecommerce_transform", + "pipeline": "add_timestamp_pipeline" + }, + "frequency": "5m", + "sync": { + "time": { + "field": "order_date", + "delay": "60s" + } + } + } +''' buildRestTests.setups['setup_logdata'] = ''' - do: indices.create: diff --git a/docs/java-rest/high-level/dataframe/update_data_frame.asciidoc b/docs/java-rest/high-level/dataframe/update_data_frame.asciidoc new file mode 100644 index 0000000000000..d7aaefa192c87 --- /dev/null +++ b/docs/java-rest/high-level/dataframe/update_data_frame.asciidoc @@ -0,0 +1,51 @@ +-- +:api: update-data-frame-transform +:request: UpdateDataFrameTransformRequest +:response: UpdateDataFrameTransformResponse +-- +[id="{upid}-{api}"] +=== Update {dataframe-transform} API + +Updates an existing {dataframe-transform}. + +The API accepts a +{request}+ object as a request and returns a +{response}+. + +[id="{upid}-{api}-request"] +==== Update {dataframe-transform} request + +A +{request}+ requires the following argument: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests-file}[{api}-request] +-------------------------------------------------- +<1> The update configuration with which to update the {dataframe-transform}. +<2> The ID of the configuration to update. +<3> Whether or not to wait to run deferrable validations until `_start` is called. +This option should be used with care as the created {dataframe-transform} will run +with the privileges of the user creating it. Meaning, if they do not have privileges, +such an error will not be visible until `_start` is called. + +[id="{upid}-{api}-config"] +==== {dataframe-transform-cap} update configuration + +The `DataFrameTransformConfigUpdate` object contains all the details about updated +{dataframe-transform} configuration and contains the following arguments: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests-file}[{api}-config] +-------------------------------------------------- +<1> The source indices and query from which to gather data. +<2> The destination index and optional pipeline. +<3> How often to check for updates to the source indices. +<4> How to keep the {dataframe-transform} in sync with incoming data. +<5> Optional free text description of the transform. + +include::../execution.asciidoc[] + +[id="{upid}-{api}-response"] +==== Response + +The returned +{response}+ contains the updated {dataframe-transform} configuration +or an error if the update failed or is invalid. diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index 386b09f37992a..aa3c9aff3d8e3 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -578,6 +578,7 @@ APIs: * <<{upid}-get-data-frame-transform>> * <<{upid}-get-data-frame-transform-stats>> * <<{upid}-put-data-frame-transform>> +* <<{upid}-update-data-frame-transform>> * <<{upid}-delete-data-frame-transform>> * <<{upid}-preview-data-frame-transform>> * <<{upid}-start-data-frame-transform>> @@ -586,6 +587,7 @@ APIs: include::dataframe/get_data_frame.asciidoc[] include::dataframe/get_data_frame_stats.asciidoc[] include::dataframe/put_data_frame.asciidoc[] +include::dataframe/update_data_frame.asciidoc[] include::dataframe/delete_data_frame.asciidoc[] include::dataframe/preview_data_frame.asciidoc[] include::dataframe/start_data_frame.asciidoc[] diff --git a/docs/reference/data-frames/apis/index.asciidoc b/docs/reference/data-frames/apis/index.asciidoc index 3a40948bd667b..3abc123f3542b 100644 --- a/docs/reference/data-frames/apis/index.asciidoc +++ b/docs/reference/data-frames/apis/index.asciidoc @@ -6,6 +6,7 @@ See also {stack-ov}/ml-dataframes.html[{dataframe-transforms-cap}]. * <> +* <> * <> * <> * <> @@ -15,6 +16,8 @@ See also {stack-ov}/ml-dataframes.html[{dataframe-transforms-cap}]. //CREATE include::put-transform.asciidoc[] +//UPDATE +include::update-transform.asciidoc[] //DELETE include::delete-transform.asciidoc[] //GET diff --git a/docs/reference/data-frames/apis/update-transform.asciidoc b/docs/reference/data-frames/apis/update-transform.asciidoc new file mode 100644 index 0000000000000..457ede2c4ae91 --- /dev/null +++ b/docs/reference/data-frames/apis/update-transform.asciidoc @@ -0,0 +1,203 @@ +[role="xpack"] +[testenv="basic"] +[[update-data-frame-transform]] +=== Update {dataframe-transforms} API + +[subs="attributes"] +++++ +Update {dataframe-transforms} +++++ + +Updates an existing {dataframe-transform}. + +beta[] + +[[update-data-frame-transform-request]] +==== {api-request-title} + +`POST _data_frame/transforms//_update` + +[[update-data-frame-transform-prereqs]] +==== {api-prereq-title} + +* If the {es} {security-features} are enabled, you must have +`manage_data_frame_transforms` cluster privileges to use this API. The built-in +`data_frame_transforms_admin` role has these privileges. You must also +have `read` and `view_index_metadata` privileges on the source index and `read`, +`create_index`, and `index` privileges on the destination index. For more +information, see {stack-ov}/security-privileges.html[Security privileges] and +{stack-ov}/built-in-roles.html[Built-in roles]. + +[[update-data-frame-transform-desc]] +==== {api-description-title} + +This API updates an existing {dataframe-transform}. All settings except description do not +take effect until after the {dataframe-transform} starts the next checkpoint. This is +so there is consistency with the pivoted data in each checkpoint. + +IMPORTANT: When {es} {security-features} are enabled, your {dataframe-transform} +remembers which roles the user who updated it had at the time of update and +runs with those privileges. + +IMPORTANT: You must use {kib} or this API to update a {dataframe-transform}. + Do not update a {dataframe-transform} directly via + `.data-frame-internal*` indices using the Elasticsearch index API. + If {es} {security-features} are enabled, do not give users any + privileges on `.data-frame-internal*` indices. + +[[update-data-frame-transform-path-parms]] +==== {api-path-parms-title} + +``:: + (Required, string) Identifier for the {dataframe-transform}. This identifier + can contain lowercase alphanumeric characters (a-z and 0-9), hyphens, and + underscores. It must start and end with alphanumeric characters. + +[[update-data-frame-transform-query-parms]] +==== {api-query-parms-title} + +`defer_validation`:: + (Optional, boolean) When `true`, deferrable validations are not run. This + behavior may be desired if the source index does not exist until after the + {dataframe-transform} is updated. + +[[update-data-frame-transform-request-body]] +==== {api-request-body-title} + +`description`:: + (Optional, string) Free text description of the {dataframe-transform}. + +`dest`:: + (Optional, object) The destination configuration, which has the + following properties: + + `index`::: + (Required, string) The _destination index_ for the {dataframe-transform}. + + `pipeline`::: + (Optional, string) The unique identifier for a <>. + +`frequency`:: + (Optional, time units) The interval between checks for changes in the source + indices when the {dataframe-transform} is running continuously. Also determines + the retry interval in the event of transient failures while the {dataframe-transform} is + searching or indexing. The minimum value is `1s` and the maximum is `1h`. The + default value is `1m`. + +`source`:: + (Optional, object) The source configuration, which has the following + properties: + + `index`::: + (Required, string or array) The _source indices_ for the + {dataframe-transform}. It can be a single index, an index pattern (for + example, `"myindex*"`), or an array of indices (for example, + `["index1", "index2"]`). + + `query`::: + (Optional, object) A query clause that retrieves a subset of data from the + source index. See <>. + +`sync`:: + (Optional, object) Defines the properties required to run continuously. + `time`::: + (Required, object) Specifies that the {dataframe-transform} uses a time + field to synchronize the source and destination indices. + `field`:::: + (Required, string) The date field that is used to identify new documents + in the source. ++ +-- +TIP: In general, it’s a good idea to use a field that contains the +<>. If you use a different field, +you might need to set the `delay` such that it accounts for data transmission +delays. + +-- + `delay`:::: + (Optional, time units) The time delay between the current time and the + latest input data time. The default value is `60s`. + +[[update-data-frame-transform-example]] +==== {api-examples-title} + +[source,js] +-------------------------------------------------- +POST _data_frame/transforms/simple-kibana-ecomm-pivot/_update +{ + "source": { + "index": "kibana_sample_data_ecommerce", + "query": { + "term": { + "geoip.continent_name": { + "value": "Asia" + } + } + } + }, + "description": "Maximum priced ecommerce data by customer_id in Asia", + "dest": { + "index": "kibana_sample_data_ecommerce_transform_v2", + "pipeline": "add_timestamp_pipeline" + }, + "frequency": "15m", + "sync": { + "time": { + "field": "order_date", + "delay": "120s" + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[setup:simple_kibana_continuous_pivot] + +When the transform is updated, you receive the updated configuration: +[source,js] +---- +{ + "id": "simple-kibana-ecomm-pivot", + "source": { + "index": ["kibana_sample_data_ecommerce"], + "query": { + "term": { + "geoip.continent_name": { + "value": "Asia" + } + } + } + }, + "pivot": { + "group_by": { + "customer_id": { + "terms": { + "field": "customer_id" + } + } + }, + "aggregations": { + "max_price": { + "max": { + "field": "taxful_total_price" + } + } + } + }, + "description": "Maximum priced ecommerce data by customer_id in Asia", + "dest": { + "index": "kibana_sample_data_ecommerce_transform_v2", + "pipeline": "add_timestamp_pipeline" + }, + "frequency": "15m", + "sync": { + "time": { + "field": "order_date", + "delay": "120s" + } + }, + "version": "8.0.0-alpha1", + "create_time": 1518808660505 +} +---- +// TESTRESPONSE[s/"version": "8.0.0-alpha1"/"version": $body.version/] +// TESTRESPONSE[s/"create_time": 1518808660505/"create_time": $body.create_time/] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java index 4597bfeb2a3d0..c0c209a9b542b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java @@ -25,6 +25,7 @@ public final class DataFrameField { public static final ParseField STATS_FIELD = new ParseField("stats"); public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type"); public static final ParseField SOURCE = new ParseField("source"); + public static final ParseField DESCRIPTION = new ParseField("description"); public static final ParseField DESTINATION = new ParseField("dest"); public static final ParseField FREQUENCY = new ParseField("frequency"); public static final ParseField FORCE = new ParseField("force"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java index 6ea1134299d5a..45e923de231d8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java @@ -26,7 +26,6 @@ public class DataFrameMessages { public static final String REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX = "Destination index [{0}] should refer to a single index"; public static final String REST_PUT_DATA_FRAME_INCONSISTENT_ID = "Inconsistent id; ''{0}'' specified in the body differs from ''{1}'' specified as a URL argument"; - public static final String DATA_FRAME_CONFIG_INVALID = "Data frame transform configuration is invalid [{0}]"; public static final String REST_DATA_FRAME_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]"; public static final String DATA_FRAME_FAILED_TO_PERSIST_STATS = "Failed to persist data frame statistics for transform [{0}]"; @@ -36,6 +35,8 @@ public class DataFrameMessages { "Unable to stop data frame transform [{0}] as it is in a failed state with reason [{1}]." + " Use force stop to stop the data frame transform."; public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]"; + public static final String FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION = + "Failed to reload data frame transform configuration for transform [{0}]"; public static final String FAILED_TO_LOAD_TRANSFORM_CONFIGURATION = "Failed to load data frame transform configuration for transform [{0}]"; public static final String FAILED_TO_PARSE_TRANSFORM_CONFIGURATION = @@ -57,6 +58,8 @@ public class DataFrameMessages { public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID = "Data frame transform configuration [{0}] has invalid elements"; public static final String DATA_FRAME_UNABLE_TO_GATHER_FIELD_MAPPINGS = "Failed to gather field mappings for index [{0}]"; + public static final String DATA_FRAME_UPDATE_CANNOT_CHANGE_SYNC_METHOD = + "Cannot change the current sync configuration of transform [{0}] from [{1}] to [{2}]"; public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_QUERY = "Failed to parse query for data frame transform"; public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_GROUP_BY = diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/UpdateDataFrameTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/UpdateDataFrameTransformAction.java new file mode 100644 index 0000000000000..4136f89ce06b4 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/UpdateDataFrameTransformAction.java @@ -0,0 +1,179 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.dataframe.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.indices.InvalidIndexNameException; +import org.elasticsearch.xpack.core.dataframe.DataFrameField; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigUpdate; + +import java.io.IOException; +import java.util.Locale; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.cluster.metadata.MetaDataCreateIndexService.validateIndexOrAliasName; + +public class UpdateDataFrameTransformAction extends ActionType { + + public static final UpdateDataFrameTransformAction INSTANCE = new UpdateDataFrameTransformAction(); + public static final String NAME = "cluster:admin/data_frame/update"; + + private static final TimeValue MIN_FREQUENCY = TimeValue.timeValueSeconds(1); + private static final TimeValue MAX_FREQUENCY = TimeValue.timeValueHours(1); + + private UpdateDataFrameTransformAction() { + super(NAME, Response::new); + } + + public static class Request extends AcknowledgedRequest { + + private final DataFrameTransformConfigUpdate update; + private final String id; + private final boolean deferValidation; + + public Request(DataFrameTransformConfigUpdate update, String id, boolean deferValidation) { + this.update = update; + this.id = id; + this.deferValidation = deferValidation; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.update = new DataFrameTransformConfigUpdate(in); + this.id = in.readString(); + this.deferValidation = in.readBoolean(); + } + + public static Request fromXContent(final XContentParser parser, final String id, final boolean deferValidation) { + return new Request(DataFrameTransformConfigUpdate.fromXContent(parser), id, deferValidation); + } + + /** + * More complex validations with how {@link DataFrameTransformConfig#getDestination()} and + * {@link DataFrameTransformConfig#getSource()} relate are done in the update transport handler. + */ + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (update.getDestination() != null && update.getDestination().getIndex() != null) { + String destIndex = update.getDestination().getIndex(); + try { + validateIndexOrAliasName(destIndex, InvalidIndexNameException::new); + if (!destIndex.toLowerCase(Locale.ROOT).equals(destIndex)) { + validationException = addValidationError("dest.index [" + destIndex + "] must be lowercase", validationException); + } + } catch (InvalidIndexNameException ex) { + validationException = addValidationError(ex.getMessage(), validationException); + } + } + TimeValue frequency = update.getFrequency(); + if (frequency != null) { + if (frequency.compareTo(MIN_FREQUENCY) < 0) { + validationException = addValidationError( + "minimum permitted [" + DataFrameField.FREQUENCY + "] is [" + MIN_FREQUENCY.getStringRep() + "]", + validationException); + } else if (frequency.compareTo(MAX_FREQUENCY) > 0) { + validationException = addValidationError( + "highest permitted [" + DataFrameField.FREQUENCY + "] is [" + MAX_FREQUENCY.getStringRep() + "]", + validationException); + } + } + + return validationException; + } + + public String getId() { + return id; + } + + public boolean isDeferValidation() { + return deferValidation; + } + + public DataFrameTransformConfigUpdate getUpdate() { + return update; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + this.update.writeTo(out); + out.writeString(id); + out.writeBoolean(deferValidation); + } + + @Override + public int hashCode() { + return Objects.hash(update, id, deferValidation); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Request other = (Request) obj; + return Objects.equals(update, other.update) && + this.deferValidation == other.deferValidation && + this.id.equals(other.id); + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + + private final DataFrameTransformConfig config; + + public Response(DataFrameTransformConfig config) { + this.config = config; + } + + public Response(StreamInput in) throws IOException { + this.config = new DataFrameTransformConfig(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + this.config.writeTo(out); + } + + @Override + public int hashCode() { + return config.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + Response other = (Response) obj; + return Objects.equals(config, other.config); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return config.toXContent(builder, params); + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java index 54d0ff7298383..77ebae93df14f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java @@ -47,12 +47,11 @@ public class DataFrameTransformConfig extends AbstractDiffable STRICT_PARSER = createParser(false); private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); - private static final int MAX_DESCRIPTION_LENGTH = 1_000; + static final int MAX_DESCRIPTION_LENGTH = 1_000; private final String id; private final SourceConfig source; @@ -131,7 +130,7 @@ private static ConstructingObjectParser create parser.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), HEADERS); parser.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p, lenient), PIVOT_TRANSFORM); - parser.declareString(optionalConstructorArg(), DESCRIPTION); + parser.declareString(optionalConstructorArg(), DataFrameField.DESCRIPTION); parser.declareField(optionalConstructorArg(), p -> TimeUtils.parseTimeFieldToInstant(p, CREATE_TIME.getPreferredName()), CREATE_TIME, ObjectParser.ValueType.VALUE); parser.declareString(optionalConstructorArg(), VERSION); @@ -330,7 +329,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa builder.field(HEADERS.getPreferredName(), headers); } if (description != null) { - builder.field(DESCRIPTION.getPreferredName(), description); + builder.field(DataFrameField.DESCRIPTION.getPreferredName(), description); } if (transformVersion != null) { builder.field(VERSION.getPreferredName(), transformVersion); @@ -381,4 +380,113 @@ public static DataFrameTransformConfig fromXContent(final XContentParser parser, return lenient ? LENIENT_PARSER.apply(parser, optionalTransformId) : STRICT_PARSER.apply(parser, optionalTransformId); } + + public static class Builder { + private String id; + private SourceConfig source; + private DestConfig dest; + private TimeValue frequency; + private SyncConfig syncConfig; + private String description; + private Map headers; + private Version transformVersion; + private Instant createTime; + private PivotConfig pivotConfig; + + public Builder() { } + + public Builder(DataFrameTransformConfig config) { + this.id = config.id; + this.source = config.source; + this.dest = config.dest; + this.frequency = config.frequency; + this.syncConfig = config.syncConfig; + this.description = config.description; + this.transformVersion = config.transformVersion; + this.createTime = config.createTime; + this.pivotConfig = config.pivotConfig; + } + + public Builder setId(String id) { + this.id = id; + return this; + } + + public Builder setSource(SourceConfig source) { + this.source = source; + return this; + } + + public Builder setDest(DestConfig dest) { + this.dest = dest; + return this; + } + + public Builder setFrequency(TimeValue frequency) { + this.frequency = frequency; + return this; + } + + public Builder setSyncConfig(SyncConfig syncConfig) { + this.syncConfig = syncConfig; + return this; + } + + public Builder setDescription(String description) { + this.description = description; + return this; + } + + public Builder setHeaders(Map headers) { + this.headers = headers; + return this; + } + + public Builder setPivotConfig(PivotConfig pivotConfig) { + this.pivotConfig = pivotConfig; + return this; + } + + public DataFrameTransformConfig build() { + return new DataFrameTransformConfig(id, + source, + dest, + frequency, + syncConfig, + headers, + pivotConfig, + description, + createTime, + transformVersion == null ? null : transformVersion.toString()); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + final DataFrameTransformConfig.Builder that = (DataFrameTransformConfig.Builder) other; + + return Objects.equals(this.id, that.id) + && Objects.equals(this.source, that.source) + && Objects.equals(this.dest, that.dest) + && Objects.equals(this.frequency, that.frequency) + && Objects.equals(this.syncConfig, that.syncConfig) + && Objects.equals(this.headers, that.headers) + && Objects.equals(this.pivotConfig, that.pivotConfig) + && Objects.equals(this.description, that.description) + && Objects.equals(this.createTime, that.createTime) + && Objects.equals(this.transformVersion, that.transformVersion); + } + + @Override + public int hashCode(){ + return Objects.hash(id, source, dest, frequency, syncConfig, headers, pivotConfig, description, createTime, transformVersion); + } + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigUpdate.java new file mode 100644 index 0000000000000..0b80a468e0f30 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigUpdate.java @@ -0,0 +1,253 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.dataframe.transforms; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParserUtils; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xpack.core.dataframe.DataFrameField; +import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; +import static org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig.MAX_DESCRIPTION_LENGTH; + +/** + * This class holds the mutable configuration items for a data frame transform + */ +public class DataFrameTransformConfigUpdate implements Writeable, ToXContentObject { + + public static final String NAME = "data_frame_transform_config_update"; + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, + false, + (args) -> { + SourceConfig source = (SourceConfig) args[0]; + DestConfig dest = (DestConfig) args[1]; + TimeValue frequency = args[2] == null ? + null : + TimeValue.parseTimeValue((String) args[2], DataFrameField.FREQUENCY.getPreferredName()); + SyncConfig syncConfig = (SyncConfig) args[3]; + String description = (String) args[4]; + return new DataFrameTransformConfigUpdate(source, dest, frequency, syncConfig, description); + }); + + static { + PARSER.declareObject(optionalConstructorArg(), (p, c) -> SourceConfig.fromXContent(p, false), DataFrameField.SOURCE); + PARSER.declareObject(optionalConstructorArg(), (p, c) -> DestConfig.fromXContent(p, false), DataFrameField.DESTINATION); + PARSER.declareString(optionalConstructorArg(), DataFrameField.FREQUENCY); + PARSER.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p), DataFrameField.SYNC); + PARSER.declareString(optionalConstructorArg(), DataFrameField.DESCRIPTION); + } + + private static SyncConfig parseSyncConfig(XContentParser parser) throws IOException { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser::getTokenLocation); + SyncConfig syncConfig = parser.namedObject(SyncConfig.class, parser.currentName(), false); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser::getTokenLocation); + return syncConfig; + } + + private final SourceConfig source; + private final DestConfig dest; + private final TimeValue frequency; + private final SyncConfig syncConfig; + private final String description; + private Map headers; + + public DataFrameTransformConfigUpdate(final SourceConfig source, + final DestConfig dest, + final TimeValue frequency, + final SyncConfig syncConfig, + final String description){ + this.source = source; + this.dest = dest; + this.frequency = frequency; + this.syncConfig = syncConfig; + this.description = description; + if (this.description != null && this.description.length() > MAX_DESCRIPTION_LENGTH) { + throw new IllegalArgumentException("[description] must be less than 1000 characters in length."); + } + } + + public DataFrameTransformConfigUpdate(final StreamInput in) throws IOException { + source = in.readOptionalWriteable(SourceConfig::new); + dest = in.readOptionalWriteable(DestConfig::new); + frequency = in.readOptionalTimeValue(); + description = in.readOptionalString(); + syncConfig = in.readOptionalNamedWriteable(SyncConfig.class); + if (in.readBoolean()) { + setHeaders(in.readMap(StreamInput::readString, StreamInput::readString)); + } + } + + public SourceConfig getSource() { + return source; + } + + public DestConfig getDestination() { + return dest; + } + + public TimeValue getFrequency() { + return frequency; + } + + public SyncConfig getSyncConfig() { + return syncConfig; + } + + @Nullable + public String getDescription() { + return description; + } + + public Map getHeaders() { + return headers; + } + + public void setHeaders(Map headers) { + this.headers = headers; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeOptionalWriteable(source); + out.writeOptionalWriteable(dest); + out.writeOptionalTimeValue(frequency); + out.writeOptionalString(description); + out.writeOptionalNamedWriteable(syncConfig); + if (headers != null) { + out.writeBoolean(true); + out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); + } else { + out.writeBoolean(false); + } + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + if (source != null) { + builder.field(DataFrameField.SOURCE.getPreferredName(), source); + } + if (dest != null) { + builder.field(DataFrameField.DESTINATION.getPreferredName(), dest); + } + if (frequency != null) { + builder.field(DataFrameField.FREQUENCY.getPreferredName(), frequency.getStringRep()); + } + if (syncConfig != null) { + builder.startObject(DataFrameField.SYNC.getPreferredName()); + builder.field(syncConfig.getWriteableName(), syncConfig); + builder.endObject(); + } + if (description != null) { + builder.field(DataFrameField.DESCRIPTION.getPreferredName(), description); + } + if (headers != null) { + builder.field(DataFrameTransformConfig.HEADERS.getPreferredName(), headers); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + final DataFrameTransformConfigUpdate that = (DataFrameTransformConfigUpdate) other; + + return Objects.equals(this.source, that.source) + && Objects.equals(this.dest, that.dest) + && Objects.equals(this.frequency, that.frequency) + && Objects.equals(this.syncConfig, that.syncConfig) + && Objects.equals(this.description, that.description) + && Objects.equals(this.headers, that.headers); + } + + @Override + public int hashCode(){ + return Objects.hash(source, dest, frequency, syncConfig, description, headers); + } + + @Override + public String toString() { + return Strings.toString(this, true, true); + } + + public static DataFrameTransformConfigUpdate fromXContent(final XContentParser parser) { + return PARSER.apply(parser, null); + } + + public boolean isNoop(DataFrameTransformConfig config) { + return isNullOrEqual(source, config.getSource()) + && isNullOrEqual(dest, config.getDestination()) + && isNullOrEqual(frequency, config.getFrequency()) + && isNullOrEqual(syncConfig, config.getSyncConfig()) + && isNullOrEqual(description, config.getDescription()) + && isNullOrEqual(headers, config.getHeaders()); + } + + private boolean isNullOrEqual(Object lft, Object rgt) { + return lft == null || lft.equals(rgt); + } + + public DataFrameTransformConfig apply(DataFrameTransformConfig config) { + if (isNoop(config)) { + return config; + } + DataFrameTransformConfig.Builder builder = new DataFrameTransformConfig.Builder(config); + if (source != null) { + builder.setSource(source); + } + if (dest != null) { + builder.setDest(dest); + } + if (frequency != null) { + builder.setFrequency(frequency); + } + if (syncConfig != null) { + String currentConfigName = config.getSyncConfig() == null ? "null" : config.getSyncConfig().getWriteableName(); + if (syncConfig.getWriteableName().equals(currentConfigName) == false) { + throw new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_UPDATE_CANNOT_CHANGE_SYNC_METHOD, + config.getId(), + currentConfigName, + syncConfig.getWriteableName()), + RestStatus.BAD_REQUEST); + } + builder.setSyncConfig(syncConfig); + } + if (description != null) { + builder.setDescription(description); + } + if (headers != null) { + builder.setHeaders(headers); + } + return builder.build(); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/UpdateDataFrameTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/UpdateDataFrameTransformActionRequestTests.java new file mode 100644 index 0000000000000..3aa34dae067b7 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/UpdateDataFrameTransformActionRequestTests.java @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.dataframe.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xpack.core.dataframe.action.UpdateDataFrameTransformAction.Request; + +import static org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigUpdateTests.randomDataFrameTransformConfigUpdate; + +public class UpdateDataFrameTransformActionRequestTests extends AbstractWireSerializingDataFrameTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + return new Request(randomDataFrameTransformConfigUpdate(), randomAlphaOfLength(10), randomBoolean()); + } + +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/UpdateDataFrameTransformsActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/UpdateDataFrameTransformsActionResponseTests.java new file mode 100644 index 0000000000000..6dffbc8930218 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/UpdateDataFrameTransformsActionResponseTests.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.dataframe.action; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.dataframe.action.UpdateDataFrameTransformAction.Response; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests; + +import java.io.IOException; + +public class UpdateDataFrameTransformsActionResponseTests extends AbstractSerializingDataFrameTestCase { + + @Override + protected Response createTestInstance() { + return new Response(DataFrameTransformConfigTests.randomDataFrameTransformConfigWithoutHeaders()); + } + + @Override + protected Reader instanceReader() { + return Response::new; + } + + @Override + protected Response doParseInstance(XContentParser parser) throws IOException { + return new Response(DataFrameTransformConfig.fromXContent(parser, null, false)); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigUpdateTests.java new file mode 100644 index 0000000000000..85e031e1f89a8 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigUpdateTests.java @@ -0,0 +1,188 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.dataframe.transforms; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfigTests; + +import java.io.IOException; +import java.time.Instant; +import java.util.Collections; +import java.util.Map; + +import static org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests.randomDataFrameTransformConfig; +import static org.elasticsearch.xpack.core.dataframe.transforms.DestConfigTests.randomDestConfig; +import static org.elasticsearch.xpack.core.dataframe.transforms.SourceConfigTests.randomSourceConfig; +import static org.hamcrest.Matchers.equalTo; + +public class DataFrameTransformConfigUpdateTests extends AbstractSerializingDataFrameTestCase { + + public static DataFrameTransformConfigUpdate randomDataFrameTransformConfigUpdate() { + return new DataFrameTransformConfigUpdate( + randomBoolean() ? null : randomSourceConfig(), + randomBoolean() ? null : randomDestConfig(), + randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)), + randomBoolean() ? null : randomSyncConfig(), + randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)); + } + + public static SyncConfig randomSyncConfig() { + return TimeSyncConfigTests.randomTimeSyncConfig(); + } + + @Override + protected DataFrameTransformConfigUpdate doParseInstance(XContentParser parser) throws IOException { + return DataFrameTransformConfigUpdate.fromXContent(parser); + } + + @Override + protected DataFrameTransformConfigUpdate createTestInstance() { + return randomDataFrameTransformConfigUpdate(); + } + + @Override + protected Reader instanceReader() { + return DataFrameTransformConfigUpdate::new; + } + + public void testIsNoop() { + for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) { + DataFrameTransformConfig config = randomDataFrameTransformConfig(); + DataFrameTransformConfigUpdate update = new DataFrameTransformConfigUpdate(null, null, null, null, null); + assertTrue("null update is not noop", update.isNoop(config)); + update = new DataFrameTransformConfigUpdate(config.getSource(), + config.getDestination(), + config.getFrequency(), + config.getSyncConfig(), + config.getDescription()); + assertTrue("equal update is not noop", update.isNoop(config)); + + update = new DataFrameTransformConfigUpdate(config.getSource(), + config.getDestination(), + config.getFrequency(), + config.getSyncConfig(), + "this is a new description"); + assertFalse("true update is noop", update.isNoop(config)); + } + } + + public void testApply() { + DataFrameTransformConfig config = new DataFrameTransformConfig("time-transform", + randomSourceConfig(), + randomDestConfig(), + TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)), + TimeSyncConfigTests.randomTimeSyncConfig(), + Collections.singletonMap("key", "value"), + PivotConfigTests.randomPivotConfig(), + randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), + randomBoolean() ? null : Instant.now(), + randomBoolean() ? null : Version.CURRENT.toString()); + DataFrameTransformConfigUpdate update = new DataFrameTransformConfigUpdate(null, null, null, null, null); + + assertThat(config, equalTo(update.apply(config))); + SourceConfig sourceConfig = new SourceConfig("the_new_index"); + DestConfig destConfig = new DestConfig("the_new_dest", "my_new_pipeline"); + TimeValue frequency = TimeValue.timeValueSeconds(10); + SyncConfig syncConfig = new TimeSyncConfig("time_field", TimeValue.timeValueSeconds(30)); + String newDescription = "new description"; + update = new DataFrameTransformConfigUpdate(sourceConfig, destConfig, frequency, syncConfig, newDescription); + + Map headers = Collections.singletonMap("foo", "bar"); + update.setHeaders(headers); + DataFrameTransformConfig updatedConfig = update.apply(config); + + assertThat(updatedConfig.getSource(), equalTo(sourceConfig)); + assertThat(updatedConfig.getDestination(), equalTo(destConfig)); + assertThat(updatedConfig.getFrequency(), equalTo(frequency)); + assertThat(updatedConfig.getSyncConfig(), equalTo(syncConfig)); + assertThat(updatedConfig.getDescription(), equalTo(newDescription)); + assertThat(updatedConfig.getHeaders(), equalTo(headers)); + } + + public void testApplyWithSyncChange() { + DataFrameTransformConfig batchConfig = new DataFrameTransformConfig("batch-transform", + randomSourceConfig(), + randomDestConfig(), + TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)), + null, + null, + PivotConfigTests.randomPivotConfig(), + randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), + randomBoolean() ? null : Instant.now(), + randomBoolean() ? null : Version.CURRENT.toString()); + + DataFrameTransformConfigUpdate update = new DataFrameTransformConfigUpdate(null, + null, + null, + TimeSyncConfigTests.randomTimeSyncConfig(), + null); + + ElasticsearchStatusException ex = expectThrows(ElasticsearchStatusException.class, () -> update.apply(batchConfig)); + assertThat(ex.getMessage(), + equalTo("Cannot change the current sync configuration of transform [batch-transform] from [null] to [time]")); + + DataFrameTransformConfig timeSyncedConfig = new DataFrameTransformConfig("time-transform", + randomSourceConfig(), + randomDestConfig(), + TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)), + TimeSyncConfigTests.randomTimeSyncConfig(), + null, + PivotConfigTests.randomPivotConfig(), + randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), + randomBoolean() ? null : Instant.now(), + randomBoolean() ? null : Version.CURRENT.toString()); + + DataFrameTransformConfigUpdate fooSyncUpdate = new DataFrameTransformConfigUpdate(null, + null, + null, + new FooSync(), + null); + ex = expectThrows(ElasticsearchStatusException.class, () -> fooSyncUpdate.apply(timeSyncedConfig)); + assertThat(ex.getMessage(), + equalTo("Cannot change the current sync configuration of transform [time-transform] from [time] to [foo]")); + + } + + static class FooSync implements SyncConfig { + + @Override + public boolean isValid() { + return true; + } + + @Override + public QueryBuilder getRangeQuery(DataFrameTransformCheckpoint newCheckpoint) { + return null; + } + + @Override + public QueryBuilder getRangeQuery(DataFrameTransformCheckpoint oldCheckpoint, DataFrameTransformCheckpoint newCheckpoint) { + return null; + } + + @Override + public String getWriteableName() { + return "foo"; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return null; + } + } +} diff --git a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java index 2f96b512d481e..3e2ffd5857e3f 100644 --- a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java +++ b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java @@ -24,7 +24,9 @@ import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse; import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest; import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse; +import org.elasticsearch.client.dataframe.UpdateDataFrameTransformRequest; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdate; import org.elasticsearch.client.dataframe.transforms.DestConfig; import org.elasticsearch.client.dataframe.transforms.QueryConfig; import org.elasticsearch.client.dataframe.transforms.SourceConfig; @@ -231,6 +233,11 @@ protected void bulkIndexDocs(BulkRequest request) throws Exception { assertThat(response.buildFailureMessage(), response.hasFailures(), is(false)); } + protected void updateConfig(String id, DataFrameTransformConfigUpdate update) throws Exception { + RestHighLevelClient restClient = new TestRestHighLevelClient(); + restClient.dataFrame().updateDataFrameTransform(new UpdateDataFrameTransformRequest(update, id), RequestOptions.DEFAULT); + } + protected void createReviewsIndex(String indexName, int numDocs) throws Exception { RestHighLevelClient restClient = new TestRestHighLevelClient(); @@ -341,7 +348,7 @@ protected Settings restClientSettings() { .build(); } - private static class TestRestHighLevelClient extends RestHighLevelClient { + protected static class TestRestHighLevelClient extends RestHighLevelClient { private static final List X_CONTENT_ENTRIES = new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents(); TestRestHighLevelClient() { diff --git a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java index 819448a6317bc..a48777d79b0c4 100644 --- a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java +++ b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformIT.java @@ -7,28 +7,40 @@ package org.elasticsearch.xpack.dataframe.integration; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfigUpdate; import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState; +import org.elasticsearch.client.dataframe.transforms.DestConfig; import org.elasticsearch.client.dataframe.transforms.TimeSyncConfig; import org.elasticsearch.client.dataframe.transforms.pivot.SingleGroupSource; import org.elasticsearch.client.dataframe.transforms.pivot.TermsGroupSource; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.After; import java.io.IOException; import java.time.Instant; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -115,7 +127,115 @@ public void testContinuousDataFrameTransformCrud() throws Exception { // index some more docs long timeStamp = Instant.now().toEpochMilli() - 1_000; long user = 42; - BulkRequest bulk = new BulkRequest(indexName); + indexMoreDocs(timeStamp, user, indexName); + waitUntilCheckpoint(config.getId(), 2L); + + // Assert that we wrote the new docs + assertThat(getDataFrameTransformStats(config.getId()) + .getTransformsStats() + .get(0) + .getIndexerStats() + .getNumDocuments(), greaterThan(docsIndexed)); + + stopDataFrameTransform(config.getId()); + deleteDataFrameTransform(config.getId()); + } + + public void testContinuousDataFrameTransformUpdate() throws Exception { + String indexName = "continuous-reviews-update"; + createReviewsIndex(indexName, 10); + + Map groups = new HashMap<>(); + groups.put("by-user", TermsGroupSource.builder().setField("user_id").build()); + + AggregatorFactories.Builder aggs = AggregatorFactories.builder() + .addAggregator(AggregationBuilders.avg("review_score").field("stars")) + .addAggregator(AggregationBuilders.max("timestamp").field("timestamp")); + + String id = "data-frame-transform-to-update"; + String dest = "reviews-by-user-business-day-to-update"; + DataFrameTransformConfig config = createTransformConfigBuilder(id, + groups, + aggs, + dest, + QueryBuilders.matchAllQuery(), + indexName) + .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))) + .build(); + + assertTrue(putDataFrameTransform(config, RequestOptions.DEFAULT).isAcknowledged()); + assertTrue(startDataFrameTransform(config.getId(), RequestOptions.DEFAULT).isAcknowledged()); + + waitUntilCheckpoint(config.getId(), 1L); + assertThat(getDataFrameTransformStats(config.getId()).getTransformsStats().get(0).getTaskState(), + equalTo(DataFrameTransformTaskState.STARTED)); + + long docsIndexed = getDataFrameTransformStats(config.getId()) + .getTransformsStats() + .get(0) + .getIndexerStats() + .getNumDocuments(); + + DataFrameTransformConfig storedConfig = getDataFrameTransform(config.getId()).getTransformConfigurations().get(0); + assertThat(storedConfig.getVersion(), equalTo(Version.CURRENT)); + Instant now = Instant.now(); + assertTrue("[create_time] is not before current time", storedConfig.getCreateTime().isBefore(now)); + + String pipelineId = "add_forty_two"; + DataFrameTransformConfigUpdate update = DataFrameTransformConfigUpdate.builder() + .setDescription("updated config") + .setDest(DestConfig.builder().setIndex(dest).setPipeline(pipelineId).build()) + .build(); + + RestHighLevelClient hlrc = new TestRestHighLevelClient(); + final XContentBuilder pipelineBuilder = jsonBuilder() + .startObject() + .startArray("processors") + .startObject() + .startObject("set") + .field("field", "static_forty_two") + .field("value", 42) + .endObject() + .endObject() + .endArray() + .endObject(); + hlrc.ingest().putPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(pipelineBuilder), XContentType.JSON), + RequestOptions.DEFAULT); + + updateConfig(id, update); + + // index some more docs + long timeStamp = Instant.now().toEpochMilli() - 1_000; + long user = 42; + indexMoreDocs(timeStamp, user, indexName); + + // Since updates are loaded on checkpoint start, we should see the updated config on this next run + waitUntilCheckpoint(config.getId(), 2L); + long numDocsAfterCp2 = getDataFrameTransformStats(config.getId()) + .getTransformsStats() + .get(0) + .getIndexerStats() + .getNumDocuments(); + assertThat(numDocsAfterCp2, greaterThan(docsIndexed)); + + final SearchRequest searchRequest = new SearchRequest(dest) + .source(new SearchSourceBuilder() + .trackTotalHits(true) + .query(QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery("static_forty_two", 42)))); + // assert that we have the new field and its value is 42 in at least some docs + assertBusy(() -> { + final SearchResponse searchResponse = hlrc.search(searchRequest, RequestOptions.DEFAULT); + assertThat(searchResponse.getHits().getTotalHits().value, greaterThan(0L)); + hlrc.indices().refresh(new RefreshRequest(dest), RequestOptions.DEFAULT); + }, 30, TimeUnit.SECONDS); + + stopDataFrameTransform(config.getId()); + deleteDataFrameTransform(config.getId()); + } + + private void indexMoreDocs(long timestamp, long userId, String index) throws Exception { + BulkRequest bulk = new BulkRequest(index); for (int i = 0; i < 25; i++) { int stars = (i + 20) % 5; long business = (i + 100) % 50; @@ -123,7 +243,7 @@ public void testContinuousDataFrameTransformCrud() throws Exception { StringBuilder sourceBuilder = new StringBuilder(); sourceBuilder.append("{\"user_id\":\"") .append("user_") - .append(user) + .append(userId) .append("\",\"count\":") .append(i) .append(",\"business_id\":\"") @@ -132,23 +252,11 @@ public void testContinuousDataFrameTransformCrud() throws Exception { .append("\",\"stars\":") .append(stars) .append(",\"timestamp\":") - .append(timeStamp) + .append(timestamp) .append("}"); bulk.add(new IndexRequest().source(sourceBuilder.toString(), XContentType.JSON)); } bulk.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); bulkIndexDocs(bulk); - - waitUntilCheckpoint(config.getId(), 2L); - - // Assert that we wrote the new docs - assertThat(getDataFrameTransformStats(config.getId()) - .getTransformsStats() - .get(0) - .getIndexerStats() - .getNumDocuments(), greaterThan(docsIndexed)); - - stopDataFrameTransform(config.getId()); - deleteDataFrameTransform(config.getId()); } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java index 4494ee7201044..77c3d9e4eb136 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java @@ -52,6 +52,7 @@ import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; +import org.elasticsearch.xpack.core.dataframe.action.UpdateDataFrameTransformAction; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.dataframe.action.TransportDeleteDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.action.TransportGetDataFrameTransformsAction; @@ -61,6 +62,7 @@ import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformTaskAction; import org.elasticsearch.xpack.dataframe.action.TransportStopDataFrameTransformAction; +import org.elasticsearch.xpack.dataframe.action.TransportUpdateDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService; import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex; @@ -72,6 +74,7 @@ import org.elasticsearch.xpack.dataframe.rest.action.RestPutDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.rest.action.RestStartDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.rest.action.RestStopDataFrameTransformAction; +import org.elasticsearch.xpack.dataframe.rest.action.RestUpdateDataFrameTransformAction; import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformPersistentTasksExecutor; import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask; @@ -124,7 +127,8 @@ public List getRestHandlers(final Settings settings, final RestCont new RestDeleteDataFrameTransformAction(settings, restController), new RestGetDataFrameTransformsAction(settings, restController), new RestGetDataFrameTransformsStatsAction(settings, restController), - new RestPreviewDataFrameTransformAction(settings, restController) + new RestPreviewDataFrameTransformAction(settings, restController), + new RestUpdateDataFrameTransformAction(settings, restController) ); } @@ -145,6 +149,7 @@ public List getRestHandlers(final Settings settings, final RestCont new ActionHandler<>(GetDataFrameTransformsAction.INSTANCE, TransportGetDataFrameTransformsAction.class), new ActionHandler<>(GetDataFrameTransformsStatsAction.INSTANCE, TransportGetDataFrameTransformsStatsAction.class), new ActionHandler<>(PreviewDataFrameTransformAction.INSTANCE, TransportPreviewDataFrameTransformAction.class), + new ActionHandler<>(UpdateDataFrameTransformAction.INSTANCE, TransportUpdateDataFrameTransformAction.class), usageAction, infoAction); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java index 79ee148636cab..092a2070b6b97 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java @@ -82,6 +82,45 @@ public TransportPutDataFrameTransformAction(Settings settings, TransportService this.auditor = auditor; } + static HasPrivilegesRequest buildPrivilegeCheck(DataFrameTransformConfig config, + IndexNameExpressionResolver indexNameExpressionResolver, + ClusterState clusterState, + String username) { + final String destIndex = config.getDestination().getIndex(); + final String[] concreteDest = indexNameExpressionResolver.concreteIndexNames(clusterState, + IndicesOptions.lenientExpandOpen(), + config.getDestination().getIndex()); + List srcPrivileges = new ArrayList<>(2); + srcPrivileges.add("read"); + + List destPrivileges = new ArrayList<>(3); + destPrivileges.add("read"); + destPrivileges.add("index"); + // If the destination index does not exist, we can assume that we may have to create it on start. + // We should check that the creating user has the privileges to create the index. + if (concreteDest.length == 0) { + destPrivileges.add("create_index"); + // We need to read the source indices mapping to deduce the destination mapping + srcPrivileges.add("view_index_metadata"); + } + RoleDescriptor.IndicesPrivileges destIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder() + .indices(destIndex) + .privileges(destPrivileges) + .build(); + + RoleDescriptor.IndicesPrivileges sourceIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder() + .indices(config.getSource().getIndex()) + .privileges(srcPrivileges) + .build(); + + HasPrivilegesRequest privRequest = new HasPrivilegesRequest(); + privRequest.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]); + privRequest.username(username); + privRequest.clusterPrivileges(Strings.EMPTY_ARRAY); + privRequest.indexPrivileges(sourceIndexPrivileges, destIndexPrivileges); + return privRequest; + } + @Override protected String executor() { return ThreadPool.Names.SAME; @@ -128,39 +167,8 @@ protected void masterOperation(Task task, Request request, ClusterState clusterS // Early check to verify that the user can create the destination index and can read from the source if (licenseState.isAuthAllowed() && request.isDeferValidation() == false) { - final String destIndex = config.getDestination().getIndex(); - final String[] concreteDest = indexNameExpressionResolver.concreteIndexNames(clusterState, - IndicesOptions.lenientExpandOpen(), - config.getDestination().getIndex()); final String username = securityContext.getUser().principal(); - List srcPrivileges = new ArrayList<>(2); - srcPrivileges.add("read"); - - List destPrivileges = new ArrayList<>(3); - destPrivileges.add("read"); - destPrivileges.add("index"); - // If the destination index does not exist, we can assume that we may have to create it on start. - // We should check that the creating user has the privileges to create the index. - if (concreteDest.length == 0) { - destPrivileges.add("create_index"); - // We need to read the source indices mapping to deduce the destination mapping - srcPrivileges.add("view_index_metadata"); - } - RoleDescriptor.IndicesPrivileges destIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder() - .indices(destIndex) - .privileges(destPrivileges) - .build(); - - RoleDescriptor.IndicesPrivileges sourceIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder() - .indices(config.getSource().getIndex()) - .privileges(srcPrivileges) - .build(); - - HasPrivilegesRequest privRequest = new HasPrivilegesRequest(); - privRequest.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]); - privRequest.username(username); - privRequest.clusterPrivileges(Strings.EMPTY_ARRAY); - privRequest.indexPrivileges(sourceIndexPrivileges, destIndexPrivileges); + HasPrivilegesRequest privRequest = buildPrivilegeCheck(config, indexNameExpressionResolver, clusterState, username); ActionListener privResponseListener = ActionListener.wrap( r -> handlePrivsResponse(username, request, r, listener), listener::onFailure); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportUpdateDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportUpdateDataFrameTransformAction.java new file mode 100644 index 0000000000000..6f3433a84b540 --- /dev/null +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportUpdateDataFrameTransformAction.java @@ -0,0 +1,285 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.dataframe.action; + +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.LicenseUtils; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.XPackField; +import org.elasticsearch.xpack.core.XPackPlugin; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; +import org.elasticsearch.xpack.core.dataframe.action.UpdateDataFrameTransformAction; +import org.elasticsearch.xpack.core.dataframe.action.UpdateDataFrameTransformAction.Request; +import org.elasticsearch.xpack.core.dataframe.action.UpdateDataFrameTransformAction.Response; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigUpdate; +import org.elasticsearch.xpack.core.security.SecurityContext; +import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction; +import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest; +import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse; +import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges; +import org.elasticsearch.xpack.core.security.support.Exceptions; +import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; +import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; +import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex; +import org.elasticsearch.xpack.dataframe.transforms.SourceDestValidator; +import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot; + +import java.io.IOException; +import java.time.Clock; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.dataframe.action.TransportPutDataFrameTransformAction.buildPrivilegeCheck; + +public class TransportUpdateDataFrameTransformAction extends TransportMasterNodeAction { + + private final XPackLicenseState licenseState; + private final Client client; + private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager; + private final SecurityContext securityContext; + private final DataFrameAuditor auditor; + + @Inject + public TransportUpdateDataFrameTransformAction(Settings settings, TransportService transportService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + ClusterService clusterService, XPackLicenseState licenseState, + DataFrameTransformsConfigManager dataFrameTransformsConfigManager, Client client, + DataFrameAuditor auditor) { + super(UpdateDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters, + Request::new, indexNameExpressionResolver); + this.licenseState = licenseState; + this.client = client; + this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager; + this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ? + new SecurityContext(settings, threadPool.getThreadContext()) : null; + this.auditor = auditor; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected Response read(StreamInput in) throws IOException { + return new Response(in); + } + + @Override + protected void masterOperation(Task task, Request request, ClusterState clusterState, ActionListener listener) { + + if (!licenseState.isDataFrameAllowed()) { + listener.onFailure(LicenseUtils.newComplianceException(XPackField.DATA_FRAME)); + return; + } + + XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); + + // set headers to run data frame transform as calling user + Map filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream() + .filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + DataFrameTransformConfigUpdate update = request.getUpdate(); + update.setHeaders(filteredHeaders); + + String transformId = request.getId(); + + // GET transform and attempt to update + // We don't want the update to complete if the config changed between GET and INDEX + dataFrameTransformsConfigManager.getTransformConfigurationForUpdate(request.getId(), ActionListener.wrap( + configAndVersion -> { + final DataFrameTransformConfig config = configAndVersion.v1(); + // If it is a noop don't bother even writing the doc, save the cycles, just return here. + if (update.isNoop(config)) { + listener.onResponse(new Response(config)); + return; + } + DataFrameTransformConfig updatedConfig = update.apply(config); + validateAndUpdateDataFrame(request, clusterState, updatedConfig, configAndVersion.v2(), listener); + }, + listener::onFailure + )); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + private void handlePrivsResponse(String username, + Request request, + DataFrameTransformConfig config, + DataFrameTransformsConfigManager.SeqNoPrimaryTermPair seqNoPrimaryTermPair, + ClusterState clusterState, + HasPrivilegesResponse privilegesResponse, + ActionListener listener) { + if (privilegesResponse.isCompleteMatch()) { + updateDataFrame(request, config, seqNoPrimaryTermPair, clusterState, listener); + } else { + List indices = privilegesResponse.getIndexPrivileges() + .stream() + .map(ResourcePrivileges::getResource) + .collect(Collectors.toList()); + + listener.onFailure(Exceptions.authorizationError( + "Cannot update data frame transform [{}] because user {} lacks all the required permissions for indices: {}", + request.getId(), + username, + indices)); + } + } + + private void validateAndUpdateDataFrame(Request request, + ClusterState clusterState, + DataFrameTransformConfig config, + DataFrameTransformsConfigManager.SeqNoPrimaryTermPair seqNoPrimaryTermPair, + ActionListener listener) { + try { + SourceDestValidator.validate(config, clusterState, indexNameExpressionResolver, request.isDeferValidation()); + } catch (ElasticsearchStatusException ex) { + listener.onFailure(ex); + return; + } + + + // Early check to verify that the user can create the destination index and can read from the source + if (licenseState.isAuthAllowed() && request.isDeferValidation() == false) { + final String username = securityContext.getUser().principal(); + HasPrivilegesRequest privRequest = buildPrivilegeCheck(config, indexNameExpressionResolver, clusterState, username); + ActionListener privResponseListener = ActionListener.wrap( + r -> handlePrivsResponse(username, request, config, seqNoPrimaryTermPair, clusterState, r, listener), + listener::onFailure); + + client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener); + } else { // No security enabled, just create the transform + updateDataFrame(request, config, seqNoPrimaryTermPair, clusterState, listener); + } + } + private void updateDataFrame(Request request, + DataFrameTransformConfig config, + DataFrameTransformsConfigManager.SeqNoPrimaryTermPair seqNoPrimaryTermPair, + ClusterState clusterState, + ActionListener listener) { + + final Pivot pivot = new Pivot(config.getPivotConfig()); + + // <3> Return to the listener + ActionListener putTransformConfigurationListener = ActionListener.wrap( + putTransformConfigurationResult -> { + auditor.info(config.getId(), "updated data frame transform."); + listener.onResponse(new Response(config)); + }, + // If we failed to INDEX AND we created the destination index, the destination index will still be around + // This is a similar behavior to _start + listener::onFailure + ); + + // <2> Update our transform + ActionListener createDestinationListener = ActionListener.wrap( + createDestResponse -> dataFrameTransformsConfigManager.updateTransformConfiguration(config, + seqNoPrimaryTermPair, + putTransformConfigurationListener), + listener::onFailure + ); + + // <1> Create destination index if necessary + ActionListener pivotValidationListener = ActionListener.wrap( + validationResult -> { + String[] dest = indexNameExpressionResolver.concreteIndexNames(clusterState, + IndicesOptions.lenientExpandOpen(), + config.getDestination().getIndex()); + String[] src = indexNameExpressionResolver.concreteIndexNames(clusterState, + IndicesOptions.lenientExpandOpen(), + config.getSource().getIndex()); + // If we are running, we should verify that the destination index exists and create it if it does not + if (PersistentTasksCustomMetaData.getTaskWithId(clusterState, request.getId()) != null + && dest.length == 0 + // Verify we have source indices. The user could defer_validations and if the task is already running + // we allow source indices to disappear. If the source and destination indices do not exist, don't do anything + // the transform will just have to dynamically create the destination index without special mapping. + && src.length > 0) { + createDestination(pivot, config, createDestinationListener); + } else { + createDestinationListener.onResponse(null); + } + }, + validationException -> { + if (validationException instanceof ElasticsearchStatusException) { + listener.onFailure(new ElasticsearchStatusException( + DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION, + ((ElasticsearchStatusException)validationException).status(), + validationException)); + } else { + listener.onFailure(new ElasticsearchStatusException( + DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION, + RestStatus.INTERNAL_SERVER_ERROR, + validationException)); + } + } + ); + + try { + pivot.validateConfig(); + } catch (ElasticsearchStatusException e) { + listener.onFailure(new ElasticsearchStatusException( + DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION, + e.status(), + e)); + return; + } catch (Exception e) { + listener.onFailure(new ElasticsearchStatusException( + DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION, RestStatus.INTERNAL_SERVER_ERROR, e)); + return; + } + + // <0> Validate the pivot if necessary + if (request.isDeferValidation()) { + pivotValidationListener.onResponse(true); + } else { + pivot.validateQuery(client, config.getSource(), pivotValidationListener); + } + } + + private void createDestination(Pivot pivot, DataFrameTransformConfig config, ActionListener listener) { + ActionListener> deduceMappingsListener = ActionListener.wrap( + mappings -> DataframeIndex.createDestinationIndex( + client, + Clock.systemUTC(), + config, + mappings, + ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure)), + deduceTargetMappingsException -> listener.onFailure( + new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS, + deduceTargetMappingsException)) + ); + + pivot.deduceMappings(client, config.getSource(), deduceMappingsListener); + } +} diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java index f3aafc2b2b4a4..0f07b3f73fd87 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameInternalIndex.java @@ -16,7 +16,6 @@ import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage; import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats; -import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformProgress; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStoredDoc; @@ -240,7 +239,7 @@ private static XContentBuilder addDataFrameTransformsConfigMappings(XContentBuil .endObject() .endObject() .endObject() - .startObject(DataFrameTransformConfig.DESCRIPTION.getPreferredName()) + .startObject(DataFrameField.DESCRIPTION.getPreferredName()) .field(TYPE, TEXT) .endObject(); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java index f729cf025c89a..2c8281eeab223 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java @@ -106,33 +106,59 @@ public void putTransformCheckpoint(DataFrameTransformCheckpoint checkpoint, Acti * @param listener listener to call after request */ public void putTransformConfiguration(DataFrameTransformConfig transformConfig, ActionListener listener) { + putTransformConfiguration(transformConfig, DocWriteRequest.OpType.CREATE, null, listener); + } + + /** + * Update the transform configuration in the internal index. + * + * Essentially the same as {@link DataFrameTransformsConfigManager#putTransformConfiguration(DataFrameTransformConfig, ActionListener)} + * but is an index operation that will fail with a version conflict + * if the current document seqNo and primaryTerm is not the same as the provided version. + * @param transformConfig the @link{DataFrameTransformConfig} + * @param seqNoPrimaryTermPair an object containing the believed seqNo and primaryTerm for the doc. + * Used for optimistic concurrency control + * @param listener listener to call after request + */ + public void updateTransformConfiguration(DataFrameTransformConfig transformConfig, + SeqNoPrimaryTermPair seqNoPrimaryTermPair, + ActionListener listener) { + putTransformConfiguration(transformConfig, DocWriteRequest.OpType.INDEX, seqNoPrimaryTermPair, listener); + } + + private void putTransformConfiguration(DataFrameTransformConfig transformConfig, + DocWriteRequest.OpType optType, + SeqNoPrimaryTermPair seqNoPrimaryTermPair, + ActionListener listener) { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS)); IndexRequest indexRequest = new IndexRequest(DataFrameInternalIndex.INDEX_NAME) - .opType(DocWriteRequest.OpType.CREATE) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .id(DataFrameTransformConfig.documentId(transformConfig.getId())) - .source(source); - + .opType(optType) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .id(DataFrameTransformConfig.documentId(transformConfig.getId())) + .source(source); + if (seqNoPrimaryTermPair != null) { + indexRequest.setIfSeqNo(seqNoPrimaryTermPair.seqNo).setIfPrimaryTerm(seqNoPrimaryTermPair.primaryTerm); + } executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(r -> { listener.onResponse(true); }, e -> { if (e instanceof VersionConflictEngineException) { // the transform already exists listener.onFailure(new ResourceAlreadyExistsException( - DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_TRANSFORM_EXISTS, - transformConfig.getId()))); + DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_TRANSFORM_EXISTS, + transformConfig.getId()))); } else { listener.onFailure( - new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_PERSIST_TRANSFORM_CONFIGURATION, e)); + new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_PERSIST_TRANSFORM_CONFIGURATION, e)); } })); } catch (IOException e) { // not expected to happen but for the sake of completeness listener.onFailure(new ElasticsearchParseException( - DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_FAILED_TO_SERIALIZE_TRANSFORM, transformConfig.getId()), - e)); + DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_FAILED_TO_SERIALIZE_TRANSFORM, transformConfig.getId()), + e)); } } @@ -187,6 +213,39 @@ public void getTransformConfiguration(String transformId, ActionListener> configAndVersionListener) { + GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId)); + executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> { + + if (getResponse.isExists() == false) { + configAndVersionListener.onFailure(new ResourceNotFoundException( + DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId))); + return; + } + BytesReference source = getResponse.getSourceAsBytesRef(); + parseTransformLenientlyFromSource(source, transformId, ActionListener.wrap( + config -> configAndVersionListener.onResponse(Tuple.tuple(config, + new SeqNoPrimaryTermPair(getResponse.getSeqNo(), getResponse.getPrimaryTerm()))), + configAndVersionListener::onFailure)); + }, e -> { + if (e.getClass() == IndexNotFoundException.class) { + configAndVersionListener.onFailure(new ResourceNotFoundException( + DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId))); + } else { + configAndVersionListener.onFailure(e); + } + })); + } + /** * Given some expression comma delimited string of id expressions, * this queries our internal index for the transform Ids that match the expression. @@ -420,4 +479,22 @@ private QueryBuilder buildQueryFromTokenizedIds(String[] idTokens, String resour } return QueryBuilders.constantScoreQuery(queryBuilder); } + + public static class SeqNoPrimaryTermPair { + private final long seqNo; + private final long primaryTerm; + + public SeqNoPrimaryTermPair(long seqNo, long primaryTerm) { + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; + } + + public long getSeqNo() { + return seqNo; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestUpdateDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestUpdateDataFrameTransformAction.java new file mode 100644 index 0000000000000..7dea1aa60c39d --- /dev/null +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestUpdateDataFrameTransformAction.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.dataframe.rest.action; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.dataframe.DataFrameField; +import org.elasticsearch.xpack.core.dataframe.action.UpdateDataFrameTransformAction; + +import java.io.IOException; + +public class RestUpdateDataFrameTransformAction extends BaseRestHandler { + + public RestUpdateDataFrameTransformAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.POST, DataFrameField.REST_BASE_PATH_TRANSFORMS_BY_ID + "_update", this); + } + + @Override + public String getName() { + return "data_frame_update_transform_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + String id = restRequest.param(DataFrameField.ID.getPreferredName()); + boolean deferValidation = restRequest.paramAsBoolean(DataFrameField.DEFER_VALIDATION.getPreferredName(), false); + XContentParser parser = restRequest.contentParser(); + UpdateDataFrameTransformAction.Request request = UpdateDataFrameTransformAction.Request.fromXContent(parser, id, deferValidation); + + return channel -> client.execute(UpdateDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java index 4b10fe5fd0c5f..126d2bef45c96 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexer.java @@ -73,7 +73,7 @@ private enum RunState { protected final DataFrameAuditor auditor; - protected final DataFrameTransformConfig transformConfig; + protected volatile DataFrameTransformConfig transformConfig; protected volatile DataFrameTransformProgress progress; private final Map fieldMappings; diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 27f71d915c7dd..14edf8774e06f 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.bulk.BulkAction; @@ -29,6 +30,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.dataframe.DataFrameField; +import org.elasticsearch.xpack.core.dataframe.DataFrameMessages; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction; import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerPosition; @@ -360,6 +362,14 @@ void persistStateToClusterState(DataFrameTransformState state, } synchronized void markAsFailed(String reason, ActionListener listener) { + // If the indexer is `STOPPING` this means that `DataFrameTransformTask#stop` was called previously, but something caused + // the indexer to fail. Since `ClientDataFrameIndexer#doSaveState` will persist the state to the index once the indexer stops, + // it is probably best to NOT change the internal state of the task and allow the normal stopping logic to continue. + if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPING) { + logger.info("Attempt to fail transform [" + getTransformId() + "] with reason [" + reason + "] while it was stopping."); + auditor.info(getTransformId(), "Attempted to fail transform with reason [" + reason + "] while in STOPPING state."); + return; + } auditor.error(transform.getId(), reason); // We should not keep retrying. Either the task will be stopped, or started // If it is started again, it is registered again. @@ -603,24 +613,56 @@ protected void onStart(long now, ActionListener listener) { // On each run, we need to get the total number of docs and reset the count of processed docs // Since multiple checkpoints can be executed in the task while it is running on the same node, we need to gather // the progress here, and not in the executor. - if (initialRun()) { - createCheckpoint(ActionListener.wrap(cp -> { - nextCheckpoint = cp; - TransformProgressGatherer.getInitialProgress(this.client, buildFilterQuery(), getConfig(), ActionListener.wrap( - newProgress -> { - logger.trace("[{}] reset the progress from [{}] to [{}]", transformId, progress, newProgress); - progress = newProgress; - super.onStart(now, listener); - }, - failure -> { - progress = null; - logger.warn("Unable to load progress information for task [" + transformId + "]", failure); - super.onStart(now, listener); - } - )); - }, listener::onFailure)); + ActionListener updateConfigListener = ActionListener.wrap( + updateConfigResponse -> { + if (initialRun()) { + createCheckpoint(ActionListener.wrap(cp -> { + nextCheckpoint = cp; + TransformProgressGatherer.getInitialProgress(this.client, buildFilterQuery(), getConfig(), ActionListener.wrap( + newProgress -> { + logger.trace("[{}] reset the progress from [{}] to [{}]", transformId, progress, newProgress); + progress = newProgress; + super.onStart(now, listener); + }, + failure -> { + progress = null; + logger.warn("Unable to load progress information for task [" + transformId + "]", failure); + super.onStart(now, listener); + } + )); + }, listener::onFailure)); + } else { + super.onStart(now, listener); + } + }, + listener::onFailure + ); + + // If we are continuous, we will want to verify we have the latest stored configuration + if (isContinuous()) { + transformsConfigManager.getTransformConfiguration(getJobId(), ActionListener.wrap( + config -> { + transformConfig = config; + logger.debug("[" + getJobId() + "] successfully refreshed data frame transform config from index."); + updateConfigListener.onResponse(null); + }, + failure -> { + String msg = DataFrameMessages.getMessage( + DataFrameMessages.FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION, + getJobId()); + logger.error(msg, failure); + // If the transform config index or the transform config is gone, something serious occurred + // We are in an unknown state and should fail out + if (failure instanceof ResourceNotFoundException) { + updateConfigListener.onFailure(new TransformConfigReloadingException(msg, failure)); + } else { + auditor.warning(getJobId(), msg); + updateConfigListener.onResponse(null); + } + } + )); } else { - super.onStart(now, listener); + updateConfigListener.onResponse(null); } } @@ -893,7 +935,9 @@ public boolean sourceHasChanged() { } private boolean isIrrecoverableFailure(Exception e) { - return e instanceof IndexNotFoundException || e instanceof AggregationResultUtils.AggregationExtractionException; + return e instanceof IndexNotFoundException + || e instanceof AggregationResultUtils.AggregationExtractionException + || e instanceof TransformConfigReloadingException; } synchronized void handleFailure(Exception e) { @@ -927,4 +971,10 @@ private static class BulkIndexingException extends ElasticsearchException { super(msg, args); } } + + private static class TransformConfigReloadingException extends ElasticsearchException { + TransformConfigReloadingException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); + } + } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.update_data_frame_transform.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.update_data_frame_transform.json new file mode 100644 index 0000000000000..91a044ef5711a --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.update_data_frame_transform.json @@ -0,0 +1,28 @@ +{ + "data_frame.update_data_frame_transform": { + "documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/current/update-data-frame-transform.html", + "stability": "beta", + "methods": [ "POST" ], + "url": { + "paths": [ "/_data_frame/transforms/{transform_id}/_update" ], + "parts": { + "transform_id": { + "type": "string", + "required": true, + "description": "The id of the transform." + } + }, + "params": { + "defer_validation": { + "type": "boolean", + "required": false, + "description": "If validations should be deferred until data frame transform starts, defaults to false." + } + } + }, + "body": { + "description" : "The update data frame transform definition", + "required": true + } + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_update.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_update.yml new file mode 100644 index 0000000000000..f764258e5afef --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_update.yml @@ -0,0 +1,307 @@ +setup: + - do: + indices.create: + index: airline-data + body: + mappings: + properties: + time: + type: date + airline: + type: keyword + responsetime: + type: float + event_rate: + type: integer + + - do: + data_frame.put_data_frame_transform: + transform_id: "updating-airline-transform" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-data-by-airline" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + }, + "description": "yaml test transform on airline-data", + "frequency": "60s", + "sync": { + "time": { + "field": "time", + "delay": "90m" + } + } + } +--- +"Test update transform with missing transform": + - do: + catch: /Transform with id \[missing-transform\] could not be found/ + data_frame.update_data_frame_transform: + transform_id: "missing-transform" + body: > + { + "description": "new description" + } +--- +"Test update transform with frequency too low": + - do: + catch: /minimum permitted \[frequency\] is \[1s\]/ + data_frame.update_data_frame_transform: + transform_id: "updating-airline-transform" + body: > + { + "frequency": "999ms" + } +--- +"Test update transform with frequency too high": + - do: + catch: /highest permitted \[frequency\] is \[1h\]/ + data_frame.update_data_frame_transform: + transform_id: "updating-airline-transform" + body: > + { + "frequency": "3600001ms" + } +--- +"Test put transform with invalid source index": + - do: + catch: /Source index \[missing-index\] does not exist/ + data_frame.update_data_frame_transform: + transform_id: "updating-airline-transform" + body: > + { + "source": { "index": "missing-index" } + } + - do: + data_frame.update_data_frame_transform: + transform_id: "updating-airline-transform" + defer_validation: true + body: > + { + "source": { "index": "missing-index" } + } + - match: { id: "updating-airline-transform" } + - match: { source.index.0: "missing-index" } +--- +"Test update transform when it is batch": + - do: + data_frame.put_data_frame_transform: + transform_id: "batch-airline-transform" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-data-by-airline" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - do: + data_frame.update_data_frame_transform: + transform_id: "batch-airline-transform" + body: > + { + "description": "time to update" + } + - match: { description: "time to update" } + + - do: + catch: /Cannot change the current sync configuration of transform \[batch-airline-transform\] from \[null\] to \[time\]/ + data_frame.update_data_frame_transform: + transform_id: "batch-airline-transform" + body: > + { + "sync": { + "time": { + "field": "time", + "delay": "90m" + } + } + } +--- +"Test basic transform crud": + - do: + indices.create: + index: other-airline-data + body: + mappings: + properties: + time: + type: date + airline: + type: keyword + responsetime: + type: float + event_rate: + type: integer + - do: + data_frame.get_data_frame_transform: + transform_id: "updating-airline-transform" + - match: { count: 1 } + - match: { transforms.0.id: "updating-airline-transform" } + - match: { transforms.0.source.index.0: "airline-data" } + - match: { transforms.0.dest.index: "airline-data-by-airline" } + - is_true: transforms.0.source.query.match_all + - is_true: transforms.0.create_time + - is_true: transforms.0.version + - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } + - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" } + - match: { transforms.0.description: "yaml test transform on airline-data" } + - match: { transforms.0.sync.time.field: "time" } + - match: { transforms.0.sync.time.delay: "90m" } + - match: { transforms.0.frequency: "60s" } + + - do: + data_frame.update_data_frame_transform: + transform_id: "updating-airline-transform" + body: > + { + "source": { "index": "other-airline-data" }, + "dest": { "index": "new-destination" }, + "description": "updated description", + "sync": { + "time": { "field": "time", "delay": "120m"} + }, + "frequency": "5s" + } + - match: { id: "updating-airline-transform" } + - match: { source.index.0: "other-airline-data" } + - match: { dest.index: "new-destination" } + - is_true: source.query.match_all + - is_true: create_time + - is_true: version + - match: { pivot.group_by.airline.terms.field: "airline" } + - match: { pivot.aggregations.avg_response.avg.field: "responsetime" } + - match: { description: "updated description" } + - match: { sync.time.field: "time" } + - match: { sync.time.delay: "120m" } + - match: { frequency: "5s" } + + - do: + data_frame.get_data_frame_transform: + transform_id: "updating-airline-transform" + - match: { count: 1 } + - match: { transforms.0.id: "updating-airline-transform" } + - match: { transforms.0.source.index.0: "other-airline-data" } + - match: { transforms.0.dest.index: "new-destination" } + - is_true: transforms.0.source.query.match_all + - is_true: transforms.0.create_time + - is_true: transforms.0.version + - match: { transforms.0.pivot.group_by.airline.terms.field: "airline" } + - match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" } + - match: { transforms.0.description: "updated description" } + - match: { transforms.0.sync.time.field: "time" } + - match: { transforms.0.sync.time.delay: "120m" } + - match: { transforms.0.frequency: "5s" } + +--- +"Test transform where dest is included in source": + - do: + catch: /Destination index \[airline-data-by-airline\] is included in source expression \[airline-data/ + data_frame.update_data_frame_transform: + transform_id: "updating-airline-transform" + body: > + { + "source": { + "index": ["airline-data*"] + }, + "dest": { "index": "airline-data-by-airline" } + } + + - do: + data_frame.update_data_frame_transform: + transform_id: "updating-airline-transform" + defer_validation: true + body: > + { + "source": { + "index": ["airline-data*"] + }, + "dest": { "index": "airline-data-by-airline" } + } +--- +"Test alias scenarios": + - do: + indices.create: + index: created-destination-index + - do: + indices.create: + index: second-created-destination-index + - do: + indices.put_alias: + index: airline-data + name: source-index + - do: + indices.put_alias: + index: created-destination-index + name: dest-index + - do: + data_frame.update_data_frame_transform: + transform_id: "updating-airline-transform" + body: > + { + "source": { + "index": "source-index" + }, + "dest": { "index": "dest-index" } + } + - match: { id: "updating-airline-transform" } + + - do: + indices.put_alias: + index: created-destination-index + name: source2-index + + - do: + catch: /Destination index \[created-destination-index\] is included in source expression \[created-destination-index\]/ + data_frame.update_data_frame_transform: + transform_id: "updating-airline-transform" + body: > + { + "source": { + "index": "source2-index" + }, + "dest": { "index": "dest-index" } + } + + - do: + indices.delete_alias: + index: created-destination-index + name: source2-index + + - do: + indices.put_alias: + index: second-created-destination-index + name: dest2-index + - do: + indices.put_alias: + index: created-destination-index + name: dest2-index + - do: + catch: /Destination index \[dest2-index\] should refer to a single index/ + data_frame.update_data_frame_transform: + transform_id: "updating-airline-transform" + body: > + { + "dest": { "index": "dest2-index" } + } +--- +"Test invalid destination index name": + - do: + catch: /dest\.index \[DeStInAtIoN\] must be lowercase/ + data_frame.update_data_frame_transform: + transform_id: "updating-airline-transform" + body: > + { + "dest": { "index": "DeStInAtIoN" } + } + - do: + catch: /Invalid index name \[destination#dest\], must not contain \'#\'/ + data_frame.update_data_frame_transform: + transform_id: "updating-airline-transform" + body: > + { + "dest": { "index": "destination#dest" } + }