Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public class DataFrameMessages {
public static final String DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM =
"Unable to stop data frame transform [{0}] as it is in a failed state with reason [{1}]." +
" Use force stop to stop the data frame transform.";
public static final String DATA_FRAME_CANNOT_START_FAILED_TRANSFORM =
"Unable to start data frame transform [{0}] as it is in a failed state with failure: [{1}]. " +
"Use force start to restart data frame transform once error is resolved.";

public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]";
public static final String FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION =
"Failed to reload data frame transform configuration for transform [{0}]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.elasticsearch.xpack.core.dataframe.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
Expand Down Expand Up @@ -34,24 +35,40 @@ private StartDataFrameTransformTaskAction() {
public static class Request extends BaseTasksRequest<Request> {

private final String id;
private final boolean force;

public Request(String id) {
public Request(String id, boolean force) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
this.force = force;
}

public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
force = in.readBoolean();
} else {
// The behavior before V_7_4_0 was that this flag did not exist,
// assuming previous checks allowed this task to be started.
force = true;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same behavior as previously we only did force checks against the stored cluster state.

}
}

public String getId() {
return id;
}

public boolean isForce() {
return force;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(force);
}
}

@Override
Expand All @@ -66,7 +83,7 @@ public ActionRequestValidationException validate() {

@Override
public int hashCode() {
return Objects.hash(id);
return Objects.hash(id, force);
}

@Override
Expand All @@ -78,7 +95,7 @@ public boolean equals(Object obj) {
return false;
}
Request other = (Request) obj;
return Objects.equals(id, other.id);
return Objects.equals(id, other.id) && force == other.force;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private void assertSingleMessage(String message) {
try {
innerAssertSingleMessage(message);
} catch (Exception e) {
fail(e.getMessage());
fail("message: " + message + " failure: " + e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class StartDataFrameTransformTaskActionRequestTests extends
AbstractWireSerializingTestCase<StartDataFrameTransformTaskAction.Request> {
@Override
protected StartDataFrameTransformTaskAction.Request createTestInstance() {
return new StartDataFrameTransformTaskAction.Request(randomAlphaOfLength(4));
return new StartDataFrameTransformTaskAction.Request(randomAlphaOfLength(4), randomBoolean());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,13 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
@Before
public void setClusterSettings() throws IOException {
// Make sure we never retry on failure to speed up the test
// Set logging level to trace
// see: https://github.com/elastic/elasticsearch/issues/45562
Request addFailureRetrySetting = new Request("PUT", "/_cluster/settings");
addFailureRetrySetting.setJsonEntity(
"{\"persistent\": {\"xpack.data_frame.num_transform_failure_retries\": \"" + 0 + "\"}}");
"{\"transient\": {\"xpack.data_frame.num_transform_failure_retries\": \"" + 0 + "\"," +
"\"logger.org.elasticsearch.action.bulk\": \"info\"," + // reduces bulk failure spam
"\"logger.org.elasticsearch.xpack.dataframe\": \"trace\"}}");
client().performRequest(addFailureRetrySetting);
}

Expand Down Expand Up @@ -84,7 +88,6 @@ public void testForceStopFailedTransform() throws Exception {
assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue()));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/45609")
public void testForceStartFailedTransform() throws Exception {
String transformId = "test-force-start-failed-transform";
createReviewsIndex(REVIEWS_INDEX_NAME, 10);
Expand All @@ -100,13 +103,16 @@ public void testForceStartFailedTransform() throws Exception {
// Verify we have failed for the expected reason
assertThat(XContentMapValues.extractValue("reason", fullState), equalTo(failureReason));

final String expectedFailure = "Unable to start data frame transform [test-force-start-failed-transform] " +
"as it is in a failed state with failure: [" + failureReason +
"]. Use force start to restart data frame transform once error is resolved.";
// Verify that we cannot start the transform when the task is in a failed state
ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false));
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
equalTo("Unable to start data frame transform [test-force-start-failed-transform] as it is in a failed state with failure: [" +
failureReason +
"]. Use force start to restart data frame transform once error is resolved."));
assertBusy(() -> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assertBusy is because we may still only read the version of the ClusterState where the task state is STARTED and return a different error than the one we are expecting. This ensures that we will eventually see the clusterstate update and get the failure message we want.

ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false));
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
equalTo(expectedFailure));
}, 60, TimeUnit.SECONDS);

// Correct the failure by deleting the destination index
deleteIndex(dataFrameIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ protected void masterOperation(Task ignoredTask, StartDataFrameTransformAction.R
ClientHelper.executeAsyncWithOrigin(client,
ClientHelper.DATA_FRAME_ORIGIN,
StartDataFrameTransformTaskAction.INSTANCE,
new StartDataFrameTransformTaskAction.Request(request.getId()),
new StartDataFrameTransformTaskAction.Request(request.getId(), request.isForce()),
ActionListener.wrap(
r -> listener.onResponse(new StartDataFrameTransformAction.Response(true)),
listener::onFailure));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ protected void doExecute(Task task, StartDataFrameTransformTaskAction.Request re
protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
if (transformTask.getTransformId().equals(request.getId())) {
transformTask.start(null, listener);
//TODO fix bug as .start where it was failed could result in a null current checkpoint?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed this potential bug while working through this, I will do investigation in another PR.

transformTask.start(null, request.isForce(), listener);
} else {
listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()
+ "] does not match request's ID [" + request.getId() + "]"));
Expand Down
Loading