Skip to content

Commit 88641a0

Browse files
authored
[ML][Data frame] fixing failure state transitions and race condition (#45627) (#45656)
* [ML][Data frame] fixing failure state transitions and race condition (#45627) There is a small window for a race condition while we are flagging a task as failed. Here are the steps where the race condition occurs: 1. A failure occurs 2. Before `AsyncTwoPhaseIndexer` calls the `onFailure` handler it does the following: a. `finishAndSetState()` which sets the IndexerState to STARTED b. `doSaveState(...)` which attempts to save the current state of the indexer 3. Another trigger is fired BEFORE `onFailure` can fire, but AFTER `finishAndSetState()` occurs. The trick here is that we will eventually set the indexer to failed, but possibly not before another trigger had the opportunity to fire. This could obviously cause some weird state interactions. To combat this, I have put in some predicates to verify the state before taking actions. This is so if state is indeed marked failed, the "second trigger" stops ASAP. Additionally, I move the task state checks INTO the `start` and `stop` methods, which will now require a `force` parameter. `start`, `stop`, `trigger` and `markAsFailed` are all `synchronized`. This should gives us some guarantees that one will not switch states out from underneath another. I also flag the task as `failed` BEFORE we successfully write it to cluster state, this is to allow us to make the task fail more quickly. But, this does add the behavior where the task is "failed" but the cluster state does not indicate as much. Adding the checks in `start` and `stop` will handle this "real state vs cluster state" race condition. This has always been a problem for `_stop` as it is not a master node action and doesn’t always have the latest cluster state. closes #45609 Relates to #45562 * [ML][Data Frame] moves failure state transition for MT safety (#45676) * [ML][Data Frame] moves failure state transition for MT safety * removing unused imports
1 parent 0413855 commit 88641a0

File tree

12 files changed

+230
-98
lines changed

12 files changed

+230
-98
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ public class DataFrameMessages {
3434
public static final String DATA_FRAME_CANNOT_STOP_FAILED_TRANSFORM =
3535
"Unable to stop data frame transform [{0}] as it is in a failed state with reason [{1}]." +
3636
" Use force stop to stop the data frame transform.";
37+
public static final String DATA_FRAME_CANNOT_START_FAILED_TRANSFORM =
38+
"Unable to start data frame transform [{0}] as it is in a failed state with failure: [{1}]. " +
39+
"Use force start to restart data frame transform once error is resolved.";
40+
3741
public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]";
3842
public static final String FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION =
3943
"Failed to reload data frame transform configuration for transform [{0}]";

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
package org.elasticsearch.xpack.core.dataframe.action;
88

9+
import org.elasticsearch.Version;
910
import org.elasticsearch.action.ActionRequestValidationException;
1011
import org.elasticsearch.action.ActionType;
1112
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
@@ -34,24 +35,40 @@ private StartDataFrameTransformTaskAction() {
3435
public static class Request extends BaseTasksRequest<Request> {
3536

3637
private final String id;
38+
private final boolean force;
3739

38-
public Request(String id) {
40+
public Request(String id, boolean force) {
3941
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
42+
this.force = force;
4043
}
4144

4245
public Request(StreamInput in) throws IOException {
4346
super(in);
4447
id = in.readString();
48+
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
49+
force = in.readBoolean();
50+
} else {
51+
// The behavior before V_7_4_0 was that this flag did not exist,
52+
// assuming previous checks allowed this task to be started.
53+
force = true;
54+
}
4555
}
4656

4757
public String getId() {
4858
return id;
4959
}
5060

61+
public boolean isForce() {
62+
return force;
63+
}
64+
5165
@Override
5266
public void writeTo(StreamOutput out) throws IOException {
5367
super.writeTo(out);
5468
out.writeString(id);
69+
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
70+
out.writeBoolean(force);
71+
}
5572
}
5673

5774
@Override
@@ -66,7 +83,7 @@ public ActionRequestValidationException validate() {
6683

6784
@Override
6885
public int hashCode() {
69-
return Objects.hash(id);
86+
return Objects.hash(id, force);
7087
}
7188

7289
@Override
@@ -78,7 +95,7 @@ public boolean equals(Object obj) {
7895
return false;
7996
}
8097
Request other = (Request) obj;
81-
return Objects.equals(id, other.id);
98+
return Objects.equals(id, other.id) && force == other.force;
8299
}
83100
}
84101

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,8 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
160160
} else {
161161
finishAndSetState();
162162
}
163-
}, e -> {
164-
finishAndSetState();
165-
onFailure(e);
166-
}));
163+
},
164+
this::finishWithFailure));
167165
});
168166
logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
169167
return true;
@@ -250,8 +248,9 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
250248
/**
251249
* Called when a failure occurs in an async job causing the execution to stop.
252250
*
253-
* @param exc
254-
* The exception
251+
* This is called before the internal state changes from the state in which the failure occurred.
252+
*
253+
* @param exc The exception
255254
*/
256255
protected abstract void onFailure(Exception exc);
257256

@@ -279,12 +278,19 @@ protected void onStop() {
279278

280279
private void finishWithSearchFailure(Exception exc) {
281280
stats.incrementSearchFailures();
282-
doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
281+
onFailure(exc);
282+
doSaveState(finishAndSetState(), position.get(), () -> {});
283283
}
284284

285285
private void finishWithIndexingFailure(Exception exc) {
286286
stats.incrementIndexingFailures();
287-
doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
287+
onFailure(exc);
288+
doSaveState(finishAndSetState(), position.get(), () -> {});
289+
}
290+
291+
private void finishWithFailure(Exception exc) {
292+
onFailure(exc);
293+
finishAndSetState();
288294
}
289295

290296
private IndexerState finishAndSetState() {
@@ -390,8 +396,7 @@ private void onSearchResponse(SearchResponse searchResponse) {
390396
ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure);
391397
nextSearch(listener);
392398
} catch (Exception e) {
393-
finishAndSetState();
394-
onFailure(e);
399+
finishWithFailure(e);
395400
}
396401
}
397402
} catch (Exception e) {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessagesTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ private void assertSingleMessage(String message) {
5151
try {
5252
innerAssertSingleMessage(message);
5353
} catch (Exception e) {
54-
fail(e.getMessage());
54+
fail("message: " + message + " failure: " + e.getMessage());
5555
}
5656
}
5757

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskActionRequestTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public class StartDataFrameTransformTaskActionRequestTests extends
1313
AbstractWireSerializingTestCase<StartDataFrameTransformTaskAction.Request> {
1414
@Override
1515
protected StartDataFrameTransformTaskAction.Request createTestInstance() {
16-
return new StartDataFrameTransformTaskAction.Request(randomAlphaOfLength(4));
16+
return new StartDataFrameTransformTaskAction.Request(randomAlphaOfLength(4), randomBoolean());
1717
}
1818

1919
@Override

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTaskFailedStateIT.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,14 @@ public class DataFrameTaskFailedStateIT extends DataFrameRestTestCase {
3737
@Before
3838
public void setClusterSettings() throws IOException {
3939
// Make sure we never retry on failure to speed up the test
40+
// Set logging level to trace
41+
// see: https://github.com/elastic/elasticsearch/issues/45562
4042
Request addFailureRetrySetting = new Request("PUT", "/_cluster/settings");
4143
addFailureRetrySetting.setJsonEntity(
42-
"{\"persistent\": {\"xpack.data_frame.num_transform_failure_retries\": \"" + 0 + "\"}}");
44+
"{\"transient\": {\"xpack.data_frame.num_transform_failure_retries\": \"" + 0 + "\"," +
45+
"\"logger.org.elasticsearch.action.bulk\": \"info\"," + // reduces bulk failure spam
46+
"\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," +
47+
"\"logger.org.elasticsearch.xpack.dataframe\": \"trace\"}}");
4348
client().performRequest(addFailureRetrySetting);
4449
}
4550

@@ -84,7 +89,6 @@ public void testForceStopFailedTransform() throws Exception {
8489
assertThat(XContentMapValues.extractValue("reason", fullState), is(nullValue()));
8590
}
8691

87-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/45609")
8892
public void testForceStartFailedTransform() throws Exception {
8993
String transformId = "test-force-start-failed-transform";
9094
createReviewsIndex(REVIEWS_INDEX_NAME, 10);
@@ -100,13 +104,16 @@ public void testForceStartFailedTransform() throws Exception {
100104
// Verify we have failed for the expected reason
101105
assertThat(XContentMapValues.extractValue("reason", fullState), equalTo(failureReason));
102106

107+
final String expectedFailure = "Unable to start data frame transform [test-force-start-failed-transform] " +
108+
"as it is in a failed state with failure: [" + failureReason +
109+
"]. Use force start to restart data frame transform once error is resolved.";
103110
// Verify that we cannot start the transform when the task is in a failed state
104-
ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false));
105-
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
106-
assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
107-
equalTo("Unable to start data frame transform [test-force-start-failed-transform] as it is in a failed state with failure: [" +
108-
failureReason +
109-
"]. Use force start to restart data frame transform once error is resolved."));
111+
assertBusy(() -> {
112+
ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId, false));
113+
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
114+
assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
115+
equalTo(expectedFailure));
116+
}, 60, TimeUnit.SECONDS);
110117

111118
// Correct the failure by deleting the destination index
112119
deleteIndex(dataFrameIndex);

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ protected void masterOperation(StartDataFrameTransformAction.Request request,
160160
ClientHelper.executeAsyncWithOrigin(client,
161161
ClientHelper.DATA_FRAME_ORIGIN,
162162
StartDataFrameTransformTaskAction.INSTANCE,
163-
new StartDataFrameTransformTaskAction.Request(request.getId()),
163+
new StartDataFrameTransformTaskAction.Request(request.getId(), request.isForce()),
164164
ActionListener.wrap(
165165
r -> listener.onResponse(new StartDataFrameTransformAction.Response(true)),
166166
listener::onFailure));

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ protected void doExecute(Task task, StartDataFrameTransformTaskAction.Request re
5959
protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask,
6060
ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
6161
if (transformTask.getTransformId().equals(request.getId())) {
62-
transformTask.start(null, listener);
62+
//TODO fix bug as .start where it was failed could result in a null current checkpoint?
63+
transformTask.start(null, request.isForce(), listener);
6364
} else {
6465
listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()
6566
+ "] does not match request's ID [" + request.getId() + "]"));

0 commit comments

Comments
 (0)