From 3838277123a44a374c6df8707495eb9f66d35cba Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 25 Oct 2021 15:02:43 +0200 Subject: [PATCH 1/2] prevent old beta transforms from starting --- .../core/transform/TransformDeprecations.java | 4 + .../transform/transforms/TransformConfig.java | 2 +- .../integration/TransformOldTransformsIT.java | 155 ++++++++++++++++++ .../TransportValidateTransformAction.java | 17 ++ .../TransformPersistentTasksExecutor.java | 15 ++ 5 files changed, 192 insertions(+), 1 deletion(-) create mode 100644 x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformOldTransformsIT.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformDeprecations.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformDeprecations.java index daae9e28d0b26..b7291e5337146 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformDeprecations.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformDeprecations.java @@ -7,8 +7,12 @@ package org.elasticsearch.xpack.core.transform; +import org.elasticsearch.Version; + public class TransformDeprecations { + public static final Version MIN_TRANSFORM_VERSION = Version.V_7_5_0; + public static final String UPGRADE_TRANSFORM_URL = "https://ela.st/es-8-upgrade-transforms"; // breaking changes base url for the _next_ major release diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java index 0e2c4166b5606..c36b70df6c4b4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java @@ -393,7 +393,7 @@ public List checkForDeprecations(NamedXContentRegistry namedXC List deprecations = new ArrayList<>(); // deprecate beta transforms - if (getVersion() == null || getVersion().before(Version.V_7_5_0)) { + if (getVersion() == null || getVersion().before(TransformDeprecations.MIN_TRANSFORM_VERSION)) { deprecations.add( new DeprecationIssue( Level.CRITICAL, diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformOldTransformsIT.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformOldTransformsIT.java new file mode 100644 index 0000000000000..6db36ef958303 --- /dev/null +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformOldTransformsIT.java @@ -0,0 +1,155 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.integration; + +import org.elasticsearch.Version; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.common.ValidationException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.transform.TransformDeprecations; +import org.elasticsearch.xpack.core.transform.TransformField; +import org.elasticsearch.xpack.core.transform.action.GetTransformAction; +import org.elasticsearch.xpack.core.transform.action.StartTransformAction; +import org.elasticsearch.xpack.core.transform.action.StopTransformAction; +import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate; +import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; +import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase; +import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class TransformOldTransformsIT extends TransformSingleNodeTestCase { + + private static final String OLD_INDEX = TransformInternalIndexConstants.INDEX_PATTERN + "001"; + + @Override + protected Settings nodeSettings() { + // TODO Change this to run with security enabled + // https://github.com/elastic/elasticsearch/issues/75940 + return Settings.builder().put(super.nodeSettings()).put(XPackSettings.SECURITY_ENABLED.getKey(), false).build(); + } + + /** + * Create an old transform and check that it can not be started, but updated and than started + */ + public void testStopThrowsForDeprecatedTransformConfig() throws Exception { + + // The mapping does not need to actually be the "OLD" mapping, we are testing that the old doc gets deleted, and the new one + // created. + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + builder.startObject(); + builder.field(TransformInternalIndex.DYNAMIC, "false"); + builder.startObject("properties"); + builder.startObject(TransformField.INDEX_DOC_TYPE.getPreferredName()).field("type", "keyword").endObject(); + TransformInternalIndex.addTransformsConfigMappings(builder); + builder.endObject(); + builder.endObject(); + client().admin() + .indices() + .create(new CreateIndexRequest(OLD_INDEX).mapping(builder).origin(ClientHelper.TRANSFORM_ORIGIN)) + .actionGet(); + } + String transformIndex = "transform-index"; + createSourceIndex(transformIndex); + String transformId = "transform-throws-for-old-config"; + Version transformVersion = VersionUtils.randomVersionBetween( + random(), + Version.V_7_2_0, + VersionUtils.getPreviousVersion(TransformDeprecations.MIN_TRANSFORM_VERSION) + ); + String config = "{\"dest\": {\"index\":\"bar\"}," + + " \"source\": {\"index\":\"" + + transformIndex + + "\", \"query\": {\"match_all\":{}}}," + + " \"id\": \"" + + transformId + + "\"," + + " \"doc_type\": \"data_frame_transform_config\"," + + " \"pivot\": {" + + " \"group_by\": {" + + " \"reviewer\": {" + + " \"terms\": {" + + " \"field\": \"user_id\"" + + " } } }," + + " \"aggregations\": {" + + " \"avg_rating\": {" + + " \"avg\": {" + + " \"field\": \"stars\"" + + " } } } }," + + "\"frequency\":\"1s\"," + + "\"version\":\"" + + transformVersion + + "\"" + + "}"; + IndexRequest indexRequest = new IndexRequest(OLD_INDEX).id(TransformConfig.documentId(transformId)) + .source(config, XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + IndexResponse indexResponse = client().index(indexRequest).actionGet(); + assertThat(indexResponse.getResult(), is(DocWriteResponse.Result.CREATED)); + + GetTransformAction.Request getTransformRequest = new GetTransformAction.Request(transformId); + GetTransformAction.Response getTransformResponse = client().execute(GetTransformAction.INSTANCE, getTransformRequest).actionGet(); + assertThat(getTransformResponse.getTransformConfigurations().get(0).getId(), equalTo(transformId)); + assertThat(getTransformResponse.getTransformConfigurations().get(0).getVersion(), equalTo(transformVersion)); + + StartTransformAction.Request startTransformRequest = new StartTransformAction.Request( + transformId, + AcknowledgedRequest.DEFAULT_ACK_TIMEOUT + ); + + ValidationException validationException = expectThrows( + ValidationException.class, + () -> client().execute(StartTransformAction.INSTANCE, startTransformRequest).actionGet() + ); + + assertThat(validationException.getMessage(), containsString("Transform configuration is too old")); + + UpdateTransformAction.Request updateTransformActionRequest = new UpdateTransformAction.Request( + new TransformConfigUpdate(null, null, null, null, "updated", null, null, null), + transformId, + false, + AcknowledgedRequest.DEFAULT_ACK_TIMEOUT + ); + UpdateTransformAction.Response updateTransformActionResponse = client().execute( + UpdateTransformAction.INSTANCE, + updateTransformActionRequest + ).actionGet(); + assertThat(updateTransformActionResponse.getConfig().getId(), equalTo(transformId)); + assertThat(updateTransformActionResponse.getConfig().getDescription(), equalTo("updated")); + + StartTransformAction.Response startTransformActionResponse = client().execute(StartTransformAction.INSTANCE, startTransformRequest) + .actionGet(); + + assertTrue(startTransformActionResponse.isAcknowledged()); + + StopTransformAction.Response stopTransformActionResponse = client().execute( + StopTransformAction.INSTANCE, + new StopTransformAction.Request(transformId, true, false, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false, false) + ).actionGet(); + assertTrue(stopTransformActionResponse.isAcknowledged()); + } + + private void createSourceIndex(String index) { + client().admin().indices().create(new CreateIndexRequest(index)).actionGet(); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportValidateTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportValidateTransformAction.java index b0b5fe6935bc4..bc9190c0eed24 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportValidateTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportValidateTransformAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.transform.action; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; @@ -15,6 +16,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.IngestService; @@ -23,6 +25,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.common.validation.SourceDestValidator; +import org.elasticsearch.xpack.core.transform.TransformDeprecations; import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction; import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction.Request; @@ -108,6 +111,20 @@ protected void doExecute(Task task, Request request, ActionListener li final TransformConfig config = request.getConfig(); final Function function = FunctionFactory.create(config); + if (config.getVersion() == null || config.getVersion().before(TransformDeprecations.MIN_TRANSFORM_VERSION)) { + listener.onFailure( + new ValidationException().addValidationError( + new ParameterizedMessage( + "Transform configuration is too old [{}], use the upgrade API to fix your transform. " + + "Minimum required version is [{}]", + config.getVersion(), + TransformDeprecations.MIN_TRANSFORM_VERSION + ).getFormattedMessage() + ) + ); + return; + } + // <5> Final listener ActionListener> deduceMappingsListener = ActionListener.wrap( deducedMappings -> { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index 4776d4c43312f..4b053139015a8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; @@ -32,6 +33,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; +import org.elasticsearch.xpack.core.transform.TransformDeprecations; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.TransformMetadata; @@ -270,6 +272,19 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa // <3> Validate the transform, assigning it to the indexer, and get the previous stats (if they exist) ActionListener getTransformConfigListener = ActionListener.wrap(config -> { + + // fail if a transform is too old, this can only happen on a rolling upgrade + if (config.getVersion() == null || config.getVersion().before(TransformDeprecations.MIN_TRANSFORM_VERSION)) { + String transformTooOldError = new ParameterizedMessage( + "Transform configuration is too old [{}], use the upgrade API to fix your transform. " + + "Minimum required version is [{}]", + config.getVersion(), + TransformDeprecations.MIN_TRANSFORM_VERSION + ).getFormattedMessage(); + auditor.error(transformId, transformTooOldError); + markAsFailed(buildTask, transformTooOldError); + } + ValidationException validationException = config.validate(null); if (validationException == null) { indexerBuilder.setTransformConfig(config); From a1c8aa298c0e6ef263a30b2fab00ea63e8fd96f3 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 25 Oct 2021 16:23:53 +0200 Subject: [PATCH 2/2] Update x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java Co-authored-by: Benjamin Trent --- .../transform/transforms/TransformPersistentTasksExecutor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index 4b053139015a8..b0d0b1d914b1d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -283,6 +283,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa ).getFormattedMessage(); auditor.error(transformId, transformTooOldError); markAsFailed(buildTask, transformTooOldError); + return; } ValidationException validationException = config.validate(null);