Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public class DataFrameMessages {
public static final String DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM =
"Unable to stop data frame transform [{0}] as it is in a failed state with reason [{1}]." +
" Use force stop to stop the data frame transform.";
public static final String DATA_FRAME_CANNOT_START_FAILED_TRANSFORM =
"Unable to start data frame transform [{0}] as it is in a failed state with failure: [{1}]. " +
"Use force start to restart data frame transform once error is resolved.";

public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]";
public static final String FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION =
"Failed to reload data frame transform configuration for transform [{0}]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.elasticsearch.xpack.core.dataframe.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
Expand Down Expand Up @@ -34,24 +35,40 @@ private StartDataFrameTransformTaskAction() {
public static class Request extends BaseTasksRequest<Request> {

private final String id;
private final boolean force;

public Request(String id) {
public Request(String id, boolean force) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
this.force = force;
}

public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
force = in.readBoolean();
} else {
// The behavior before V_7_4_0 was that this flag did not exist,
// assuming previous checks allowed this task to be started.
force = true;
}
}

public String getId() {
return id;
}

public boolean isForce() {
return force;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeBoolean(force);
}
}

@Override
Expand All @@ -66,7 +83,7 @@ public ActionRequestValidationException validate() {

@Override
public int hashCode() {
return Objects.hash(id);
return Objects.hash(id, force);
}

@Override
Expand All @@ -78,7 +95,7 @@ public boolean equals(Object obj) {
return false;
}
Request other = (Request) obj;
return Objects.equals(id, other.id);
return Objects.equals(id, other.id) && force == other.force;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,8 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
} else {
finishAndSetState();
}
}, e -> {
finishAndSetState();
onFailure(e);
}));
},
this::finishWithFailure));
});
logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
return true;
Expand Down Expand Up @@ -250,8 +248,9 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
/**
* Called when a failure occurs in an async job causing the execution to stop.
*
* @param exc
* The exception
* This is called before the internal state changes from the state in which the failure occurred.
*
* @param exc The exception
*/
protected abstract void onFailure(Exception exc);

Expand Down Expand Up @@ -279,12 +278,19 @@ protected void onStop() {

private void finishWithSearchFailure(Exception exc) {
stats.incrementSearchFailures();
doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
onFailure(exc);
doSaveState(finishAndSetState(), position.get(), () -> {});
}

private void finishWithIndexingFailure(Exception exc) {
stats.incrementIndexingFailures();
doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
onFailure(exc);
doSaveState(finishAndSetState(), position.get(), () -> {});
}

private void finishWithFailure(Exception exc) {
onFailure(exc);
finishAndSetState();
}

private IndexerState finishAndSetState() {
Expand Down Expand Up @@ -390,8 +396,7 @@ private void onSearchResponse(SearchResponse searchResponse) {
ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure);
nextSearch(listener);
} catch (Exception e) {
finishAndSetState();
onFailure(e);
finishWithFailure(e);
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private void assertSingleMessage(String message) {
try {
innerAssertSingleMessage(message);
} catch (Exception e) {
fail(e.getMessage());
fail("message: " + message + " failure: " + e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class StartDataFrameTransformTaskActionRequestTests extends
AbstractWireSerializingTestCase<StartDataFrameTransformTaskAction.Request> {
@Override
protected StartDataFrameTransformTaskAction.Request createTestInstance() {
return new StartDataFrameTransformTaskAction.Request(randomAlphaOfLength(4));
return new StartDataFrameTransformTaskAction.Request(randomAlphaOfLength(4), randomBoolean());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,14 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
@Before
public void setClusterSettings() throws IOException {
// Make sure we never retry on failure to speed up the test
// Set logging level to trace
// see: https://github.com/elastic/elasticsearch/issues/45562
Request addFailureRetrySetting = new Request("PUT", "/_cluster/settings");
addFailureRetrySetting.setJsonEntity(
"{\"persistent\": {\"xpack.data_frame.num_transform_failure_retries\": \"" + 0 + "\"}}");
"{\"transient\": {\"xpack.data_frame.num_transform_failure_retries\": \"" + 0 + "\"," +
"\"logger.org.elasticsearch.action.bulk\": \"info\"," + // reduces bulk failure spam
"\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," +
"\"logger.org.elasticsearch.xpack.dataframe\": \"trace\"}}");
client().performRequest(addFailureRetrySetting);
}

Expand Down Expand Up @@ -84,7 +89,6 @@ public void testForceStopFailedTransform() throws Exception {
assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue()));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/45609")
public void testForceStartFailedTransform() throws Exception {
String transformId = "test-force-start-failed-transform";
createReviewsIndex(REVIEWS_INDEX_NAME, 10);
Expand All @@ -100,13 +104,16 @@ public void testForceStartFailedTransform() throws Exception {
// Verify we have failed for the expected reason
assertThat(XContentMapValues.extractValue("reason", fullState), equalTo(failureReason));

final String expectedFailure = "Unable to start data frame transform [test-force-start-failed-transform] " +
"as it is in a failed state with failure: [" + failureReason +
"]. Use force start to restart data frame transform once error is resolved.";
// Verify that we cannot start the transform when the task is in a failed state
ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false));
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
equalTo("Unable to start data frame transform [test-force-start-failed-transform] as it is in a failed state with failure: [" +
failureReason +
"]. Use force start to restart data frame transform once error is resolved."));
assertBusy(() -> {
ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false));
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
equalTo(expectedFailure));
}, 60, TimeUnit.SECONDS);

// Correct the failure by deleting the destination index
deleteIndex(dataFrameIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ protected void masterOperation(StartDataFrameTransformAction.Request request,
ClientHelper.executeAsyncWithOrigin(client,
ClientHelper.DATA_FRAME_ORIGIN,
StartDataFrameTransformTaskAction.INSTANCE,
new StartDataFrameTransformTaskAction.Request(request.getId()),
new StartDataFrameTransformTaskAction.Request(request.getId(), request.isForce()),
ActionListener.wrap(
r -> listener.onResponse(new StartDataFrameTransformAction.Response(true)),
listener::onFailure));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ protected void doExecute(Task task, StartDataFrameTransformTaskAction.Request re
protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask,
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
if (transformTask.getTransformId().equals(request.getId())) {
transformTask.start(null, listener);
//TODO fix bug as .start where it was failed could result in a null current checkpoint?
transformTask.start(null, request.isForce(), listener);
} else {
listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()
+ "] does not match request's ID [" + request.getId() + "]"));
Expand Down
Loading