Skip to content

Commit f64b184

Browse files
author
Hendrik Muhs
authored
[Transform] prevent old beta transforms from starting (#79712)
Disallow old beta transforms (< 7.5) from starting in >8.0. Transform can still be read and updated/upgraded.
1 parent 35cfbea commit f64b184

File tree

5 files changed

+193
-1
lines changed

5 files changed

+193
-1
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformDeprecations.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,12 @@
77

88
package org.elasticsearch.xpack.core.transform;
99

10+
import org.elasticsearch.Version;
11+
1012
public class TransformDeprecations {
1113

14+
public static final Version MIN_TRANSFORM_VERSION = Version.V_7_5_0;
15+
1216
public static final String UPGRADE_TRANSFORM_URL = "https://ela.st/es-8-upgrade-transforms";
1317

1418
// breaking changes base url for the _next_ major release

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ public List<DeprecationIssue> checkForDeprecations(NamedXContentRegistry namedXC
393393
List<DeprecationIssue> deprecations = new ArrayList<>();
394394

395395
// deprecate beta transforms
396-
if (getVersion() == null || getVersion().before(Version.V_7_5_0)) {
396+
if (getVersion() == null || getVersion().before(TransformDeprecations.MIN_TRANSFORM_VERSION)) {
397397
deprecations.add(
398398
new DeprecationIssue(
399399
Level.CRITICAL,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.transform.integration;
9+
10+
import org.elasticsearch.Version;
11+
import org.elasticsearch.action.DocWriteResponse;
12+
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
13+
import org.elasticsearch.action.index.IndexRequest;
14+
import org.elasticsearch.action.index.IndexResponse;
15+
import org.elasticsearch.action.support.WriteRequest;
16+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
17+
import org.elasticsearch.common.ValidationException;
18+
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.test.VersionUtils;
20+
import org.elasticsearch.xcontent.XContentBuilder;
21+
import org.elasticsearch.xcontent.XContentFactory;
22+
import org.elasticsearch.xcontent.XContentType;
23+
import org.elasticsearch.xpack.core.ClientHelper;
24+
import org.elasticsearch.xpack.core.XPackSettings;
25+
import org.elasticsearch.xpack.core.transform.TransformDeprecations;
26+
import org.elasticsearch.xpack.core.transform.TransformField;
27+
import org.elasticsearch.xpack.core.transform.action.GetTransformAction;
28+
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
29+
import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
30+
import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction;
31+
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
32+
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
33+
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
34+
import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase;
35+
import org.elasticsearch.xpack.transform.persistence.TransformInternalIndex;
36+
37+
import static org.hamcrest.Matchers.containsString;
38+
import static org.hamcrest.Matchers.equalTo;
39+
import static org.hamcrest.Matchers.is;
40+
41+
public class TransformOldTransformsIT extends TransformSingleNodeTestCase {
42+
43+
private static final String OLD_INDEX = TransformInternalIndexConstants.INDEX_PATTERN + "001";
44+
45+
@Override
46+
protected Settings nodeSettings() {
47+
// TODO Change this to run with security enabled
48+
// https://github.com/elastic/elasticsearch/issues/75940
49+
return Settings.builder().put(super.nodeSettings()).put(XPackSettings.SECURITY_ENABLED.getKey(), false).build();
50+
}
51+
52+
/**
53+
* Create an old transform and check that it can not be started, but updated and than started
54+
*/
55+
public void testStopThrowsForDeprecatedTransformConfig() throws Exception {
56+
57+
// The mapping does not need to actually be the "OLD" mapping, we are testing that the old doc gets deleted, and the new one
58+
// created.
59+
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
60+
builder.startObject();
61+
builder.field(TransformInternalIndex.DYNAMIC, "false");
62+
builder.startObject("properties");
63+
builder.startObject(TransformField.INDEX_DOC_TYPE.getPreferredName()).field("type", "keyword").endObject();
64+
TransformInternalIndex.addTransformsConfigMappings(builder);
65+
builder.endObject();
66+
builder.endObject();
67+
client().admin()
68+
.indices()
69+
.create(new CreateIndexRequest(OLD_INDEX).mapping(builder).origin(ClientHelper.TRANSFORM_ORIGIN))
70+
.actionGet();
71+
}
72+
String transformIndex = "transform-index";
73+
createSourceIndex(transformIndex);
74+
String transformId = "transform-throws-for-old-config";
75+
Version transformVersion = VersionUtils.randomVersionBetween(
76+
random(),
77+
Version.V_7_2_0,
78+
VersionUtils.getPreviousVersion(TransformDeprecations.MIN_TRANSFORM_VERSION)
79+
);
80+
String config = "{\"dest\": {\"index\":\"bar\"},"
81+
+ " \"source\": {\"index\":\""
82+
+ transformIndex
83+
+ "\", \"query\": {\"match_all\":{}}},"
84+
+ " \"id\": \""
85+
+ transformId
86+
+ "\","
87+
+ " \"doc_type\": \"data_frame_transform_config\","
88+
+ " \"pivot\": {"
89+
+ " \"group_by\": {"
90+
+ " \"reviewer\": {"
91+
+ " \"terms\": {"
92+
+ " \"field\": \"user_id\""
93+
+ " } } },"
94+
+ " \"aggregations\": {"
95+
+ " \"avg_rating\": {"
96+
+ " \"avg\": {"
97+
+ " \"field\": \"stars\""
98+
+ " } } } },"
99+
+ "\"frequency\":\"1s\","
100+
+ "\"version\":\""
101+
+ transformVersion
102+
+ "\""
103+
+ "}";
104+
IndexRequest indexRequest = new IndexRequest(OLD_INDEX).id(TransformConfig.documentId(transformId))
105+
.source(config, XContentType.JSON)
106+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
107+
IndexResponse indexResponse = client().index(indexRequest).actionGet();
108+
assertThat(indexResponse.getResult(), is(DocWriteResponse.Result.CREATED));
109+
110+
GetTransformAction.Request getTransformRequest = new GetTransformAction.Request(transformId);
111+
GetTransformAction.Response getTransformResponse = client().execute(GetTransformAction.INSTANCE, getTransformRequest).actionGet();
112+
assertThat(getTransformResponse.getTransformConfigurations().get(0).getId(), equalTo(transformId));
113+
assertThat(getTransformResponse.getTransformConfigurations().get(0).getVersion(), equalTo(transformVersion));
114+
115+
StartTransformAction.Request startTransformRequest = new StartTransformAction.Request(
116+
transformId,
117+
AcknowledgedRequest.DEFAULT_ACK_TIMEOUT
118+
);
119+
120+
ValidationException validationException = expectThrows(
121+
ValidationException.class,
122+
() -> client().execute(StartTransformAction.INSTANCE, startTransformRequest).actionGet()
123+
);
124+
125+
assertThat(validationException.getMessage(), containsString("Transform configuration is too old"));
126+
127+
UpdateTransformAction.Request updateTransformActionRequest = new UpdateTransformAction.Request(
128+
new TransformConfigUpdate(null, null, null, null, "updated", null, null, null),
129+
transformId,
130+
false,
131+
AcknowledgedRequest.DEFAULT_ACK_TIMEOUT
132+
);
133+
UpdateTransformAction.Response updateTransformActionResponse = client().execute(
134+
UpdateTransformAction.INSTANCE,
135+
updateTransformActionRequest
136+
).actionGet();
137+
assertThat(updateTransformActionResponse.getConfig().getId(), equalTo(transformId));
138+
assertThat(updateTransformActionResponse.getConfig().getDescription(), equalTo("updated"));
139+
140+
StartTransformAction.Response startTransformActionResponse = client().execute(StartTransformAction.INSTANCE, startTransformRequest)
141+
.actionGet();
142+
143+
assertTrue(startTransformActionResponse.isAcknowledged());
144+
145+
StopTransformAction.Response stopTransformActionResponse = client().execute(
146+
StopTransformAction.INSTANCE,
147+
new StopTransformAction.Request(transformId, true, false, AcknowledgedRequest.DEFAULT_ACK_TIMEOUT, false, false)
148+
).actionGet();
149+
assertTrue(stopTransformActionResponse.isAcknowledged());
150+
}
151+
152+
private void createSourceIndex(String index) {
153+
client().admin().indices().create(new CreateIndexRequest(index)).actionGet();
154+
}
155+
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportValidateTransformAction.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.transform.action;
99

10+
import org.apache.logging.log4j.message.ParameterizedMessage;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.support.ActionFilters;
1213
import org.elasticsearch.action.support.HandledTransportAction;
@@ -15,6 +16,7 @@
1516
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1617
import org.elasticsearch.cluster.node.DiscoveryNode;
1718
import org.elasticsearch.cluster.service.ClusterService;
19+
import org.elasticsearch.common.ValidationException;
1820
import org.elasticsearch.common.inject.Inject;
1921
import org.elasticsearch.common.settings.Settings;
2022
import org.elasticsearch.ingest.IngestService;
@@ -23,6 +25,7 @@
2325
import org.elasticsearch.tasks.Task;
2426
import org.elasticsearch.transport.TransportService;
2527
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator;
28+
import org.elasticsearch.xpack.core.transform.TransformDeprecations;
2629
import org.elasticsearch.xpack.core.transform.TransformMessages;
2730
import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction;
2831
import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction.Request;
@@ -108,6 +111,20 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
108111
final TransformConfig config = request.getConfig();
109112
final Function function = FunctionFactory.create(config);
110113

114+
if (config.getVersion() == null || config.getVersion().before(TransformDeprecations.MIN_TRANSFORM_VERSION)) {
115+
listener.onFailure(
116+
new ValidationException().addValidationError(
117+
new ParameterizedMessage(
118+
"Transform configuration is too old [{}], use the upgrade API to fix your transform. "
119+
+ "Minimum required version is [{}]",
120+
config.getVersion(),
121+
TransformDeprecations.MIN_TRANSFORM_VERSION
122+
).getFormattedMessage()
123+
)
124+
);
125+
return;
126+
}
127+
111128
// <5> Final listener
112129
ActionListener<Map<String, String>> deduceMappingsListener = ActionListener.wrap(
113130
deducedMappings -> {

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
12+
import org.apache.logging.log4j.message.ParameterizedMessage;
1213
import org.apache.lucene.util.SetOnce;
1314
import org.elasticsearch.ExceptionsHelper;
1415
import org.elasticsearch.ResourceNotFoundException;
@@ -32,6 +33,7 @@
3233
import org.elasticsearch.tasks.TaskId;
3334
import org.elasticsearch.threadpool.ThreadPool;
3435
import org.elasticsearch.xpack.core.indexing.IndexerState;
36+
import org.elasticsearch.xpack.core.transform.TransformDeprecations;
3537
import org.elasticsearch.xpack.core.transform.TransformField;
3638
import org.elasticsearch.xpack.core.transform.TransformMessages;
3739
import org.elasticsearch.xpack.core.transform.TransformMetadata;
@@ -270,6 +272,20 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa
270272

271273
// <3> Validate the transform, assigning it to the indexer, and get the previous stats (if they exist)
272274
ActionListener<TransformConfig> getTransformConfigListener = ActionListener.wrap(config -> {
275+
276+
// fail if a transform is too old, this can only happen on a rolling upgrade
277+
if (config.getVersion() == null || config.getVersion().before(TransformDeprecations.MIN_TRANSFORM_VERSION)) {
278+
String transformTooOldError = new ParameterizedMessage(
279+
"Transform configuration is too old [{}], use the upgrade API to fix your transform. "
280+
+ "Minimum required version is [{}]",
281+
config.getVersion(),
282+
TransformDeprecations.MIN_TRANSFORM_VERSION
283+
).getFormattedMessage();
284+
auditor.error(transformId, transformTooOldError);
285+
markAsFailed(buildTask, transformTooOldError);
286+
return;
287+
}
288+
273289
ValidationException validationException = config.validate(null);
274290
if (validationException == null) {
275291
indexerBuilder.setTransformConfig(config);

0 commit comments

Comments
 (0)