[ML][Data frame] fixing failure state transitions and race condition #45627
Conversation
Pinging @elastic/ml-core
} else {
    // The behavior before V_7_4_0 was that this flag did not exist,
    // assuming previous checks allowed this task to be started.
    force = true;
This is the same behavior as before: previously we only did force checks against the stored cluster state.
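As a rough illustration of that BWC fallback, here is a minimal, self-contained sketch; the class and member names (`StartTaskRequest`, `forceFromWire`, `resolveForce`) are hypothetical stand-ins, not the actual Elasticsearch request classes.

```java
// Minimal sketch of the BWC fallback, with hypothetical names.
final class StartTaskRequest {
    // null when the remote node predates the flag (before V_7_4_0 it did not exist on the wire)
    private final Boolean forceFromWire;

    StartTaskRequest(Boolean forceFromWire) {
        this.forceFromWire = forceFromWire;
    }

    boolean resolveForce() {
        if (forceFromWire != null) {
            return forceFromWire;   // a modern node sent the flag explicitly
        }
        // Old behavior: no flag existed, so assume the earlier checks against the
        // stored cluster state already allowed this task to be started.
        return true;
    }
}
```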
| equalTo("Unable to start data frame transform [test-force-start-failed-transform] as it is in a failed state with failure: [" + | ||
| failureReason + | ||
| "]. Use force start to restart data frame transform once error is resolved.")); | ||
| assertBusy(() -> { |
This `assertBusy` is here because we may still read only the version of the ClusterState where the task state is STARTED, and so get a different error than the one we are expecting. It ensures that we eventually see the ClusterState update and get the failure message we want.
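For readers unfamiliar with the pattern, a self-contained sketch of the retry-until-consistent idea behind `assertBusy` follows; `assertEventually` and its signature are illustrative, not the real test helper.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;

// Illustrative stand-in for the assertBusy pattern: retry an assertion until the
// eventually-consistent cluster state catches up or a timeout expires.
final class EventuallyAssert {
    static void assertEventually(BooleanSupplier condition, Duration timeout) throws InterruptedException {
        Instant deadline = Instant.now().plus(timeout);
        while (condition.getAsBoolean() == false) {
            if (Instant.now().isAfter(deadline)) {
                throw new AssertionError("condition not met within " + timeout);
            }
            Thread.sleep(100); // back off briefly before re-reading the state
        }
    }
}
```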
| ActionListener<StartDataFrameTransformTaskAction.Response> listener) { | ||
| if (transformTask.getTransformId().equals(request.getId())) { | ||
| transformTask.start(null, listener); | ||
| //TODO fix bug as .start where it was failed could result in a null current checkpoint? |
I noticed this potential bug while working through this; I will investigate it in another PR.
  * @param listener Started listener
  */
-public synchronized void start(Long startingCheckpoint, ActionListener<Response> listener) {
+public synchronized void start(Long startingCheckpoint, boolean force, ActionListener<Response> listener) {
I opted to move the force check (if we end up getting past our earlier checks against the cluster state) INTO these synchronized methods. This gives us some ordering guarantees that make these state transitions easier to reason about.
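A minimal sketch of what moving the check inside the synchronized methods buys, using simplified, hypothetical types rather than the real transform task class:

```java
// Simplified sketch: because start/stop/markAsFailed share one monitor, a caller can
// never observe a half-finished transition, and a failed task only restarts with force.
enum TaskState { STARTED, FAILED, STOPPED }

final class TransformTaskSketch {
    private TaskState state = TaskState.STOPPED;

    synchronized void start(boolean force) {
        if (state == TaskState.FAILED && force == false) {
            throw new IllegalStateException("task is in a failed state; use force to restart it");
        }
        state = TaskState.STARTED;
    }

    synchronized void stop(boolean force) {
        if (state == TaskState.FAILED && force == false) {
            throw new IllegalStateException("task is in a failed state; use force to stop it");
        }
        state = TaskState.STOPPED;
    }

    synchronized void markAsFailed(String reason) {
        state = TaskState.FAILED;
    }
}
```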
// We just don't want it to be failed if it is failed
// Either we are running, and the STATE is already started or failed
// doSaveState should transfer the state to STOPPED when it needs to.
taskState.set(DataFrameTransformTaskState.STARTED);
Since `force` could be true, we should just make the task no longer failed so that the stopping logic can take the correct actions.
@Override
protected void onStart(long now, ActionListener<Boolean> listener) {
    if (transformTask.taskState.get() == DataFrameTransformTaskState.FAILED) {
All of these failure-state checks in the method callbacks cover the situation where another trigger sneaks in after we fail the first one. We want that second trigger to fail ASAP to prevent undesired state interactions.
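To show the shape of those guards, here is a small, self-contained sketch; the listener style is simplified with `Consumer`s instead of the real `ActionListener`, and all names are illustrative.

```java
import java.util.function.Consumer;

// Illustrative sketch: the onStart-style callback re-checks the task's failed flag and
// fails fast, so a trigger that raced in before the failure was fully recorded does no work.
final class OnStartGuardSketch {
    enum TaskState { STARTED, FAILED }

    private volatile TaskState taskState = TaskState.STARTED;

    void onStart(Consumer<Boolean> onResponse, Consumer<Exception> onFailure) {
        if (taskState == TaskState.FAILED) {
            onFailure.accept(new IllegalStateException("task is failed; aborting indexing iteration"));
            return;
        }
        onResponse.accept(true);    // safe to begin this iteration
    }

    void markFailed() {
        taskState = TaskState.FAILED;
    }
}
```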
// If all the remaining tasks are flagged as failed, do not wait for another ClusterState update.
// Return to the caller as soon as possible
return persistentTasksCustomMetaData.tasks().stream().allMatch(p -> exceptions.containsKey(p.getId()));
Not having this check caused periodic timeouts when I was running locally. If the ClusterState is not updated for 30s, this predicate times out because, without the check, it never sees that all the tasks have been flagged as failed.
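A stripped-down sketch of the short-circuit, with plain collection types standing in for the persistent-tasks metadata (names are hypothetical):

```java
import java.util.Collection;
import java.util.Map;

// Sketch: the wait predicate completes as soon as every remaining task id already has a
// recorded failure, rather than waiting up to the timeout for a cluster state update
// that may never reflect it.
final class WaitPredicateSketch {
    static boolean allRemainingTasksFailed(Collection<String> remainingTaskIds, Map<String, Exception> exceptions) {
        return remainingTaskIds.stream().allMatch(exceptions::containsKey);
    }
}
```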
davidkyle left a comment
LGTM. The error messages are much better.
r -> {
    // for auto stop shutdown the task
    if (state.getTaskState().equals(DataFrameTransformTaskState.STOPPED)) {
        onStop();
Do we not need this?? I know all it does is log & audit a message.
@davidkyle `onStop` is called in the async indexer when transitioning from STOPPING -> STOPPED, and we call it directly if stop transitions the indexer directly to STOPPED. This resulted in there always being two log entries for stopping, which confused me at first (I thought somebody had called stop twice).
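A rough sketch of the two paths that both end in `onStop`, with heavily simplified state handling rather than the real indexer (all names illustrative):

```java
// Illustrative only: stop() either transitions straight to STOPPED and notifies here,
// or asks a running indexer to stop, in which case the indexer calls onStop() itself
// after its STOPPING -> STOPPED transition. Hence two "stopped" log lines were possible.
enum IndexerState { STARTED, STOPPING, STOPPED }

final class StopPathsSketch {
    private IndexerState indexerState = IndexerState.STARTED;
    private boolean iterationInFlight = false;

    void stop() {
        if (iterationInFlight == false) {
            indexerState = IndexerState.STOPPED;
            onStop();                             // direct transition: notify immediately
        } else {
            indexerState = IndexerState.STOPPING; // the running iteration finishes, then the
                                                  // indexer moves to STOPPED and calls onStop()
        }
    }

    void onStop() {
        // log + audit "data frame transform has stopped"
    }
}
```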
if (getIndexer() != null && getIndexer().getState() == IndexerState.STOPPING) {
    logger.info("Attempt to fail transform [" + getTransformId() + "] with reason [" + reason + "] while it was stopping.");
    auditor.info(getTransformId(), "Attempted to fail transform with reason [" + reason + "] while in STOPPING state.");
    listener.onResponse(null);
Oops. Good spot.
run elasticsearch-ci/1
…/dataframe/action/TransportStopDataFrameTransformAction.java Co-Authored-By: David Kyle <[email protected]>
Waiting to open the backport of this until after this has had a chance to bump around in CI.
Merged: [ML][Data frame] fixing failure state transitions and race condition (elastic#45627). Closes elastic#45609. Relates to elastic#45562.
Backported in #45656: [ML][Data frame] fixing failure state transitions and race condition (#45627) and [ML][Data Frame] moves failure state transition for MT safety (#45676); also removes unused imports.
There is a small window for a race condition while we are flagging a task as failed.

Here are the steps where the race condition occurs:

1. A failure occurs.
2. Before `AsyncTwoPhaseIndexer` calls the `onFailure` handler it does the following:
   a. `finishAndSetState()`, which sets the IndexerState to STARTED
   b. `doSaveState(...)`, which attempts to save the current state of the indexer
3. Another trigger is fired BEFORE `onFailure` can fire, but AFTER `finishAndSetState()` occurs.

The trick here is that we will eventually set the indexer to failed, but possibly not before another trigger has had the opportunity to fire. This could obviously cause some weird state interactions. To combat this, I have put in some predicates to verify the state before taking actions, so that if the state is indeed marked failed, the "second trigger" stops ASAP (see the sketch below).
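The sketch referenced above, a minimal self-contained illustration with hypothetical names rather than the real `AsyncTwoPhaseIndexer` or task classes, shows how the in-memory flag closes that window:

```java
// Simplified sketch of the window this PR closes. markAsFailed() flips an in-memory flag
// under the same monitor used by trigger(), so a trigger firing after finishAndSetState()
// but before onFailure() completes is rejected instead of starting another iteration.
final class FailureRaceSketch {
    private boolean failed = false;     // in-memory "real" state, set before any persistence

    synchronized boolean trigger() {
        if (failed) {
            return false;               // the racing "second trigger" bails out here
        }
        // ... start the next indexing iteration ...
        return true;
    }

    synchronized void markAsFailed(String reason) {
        failed = true;                  // flag the task as failed immediately ...
        // ... then persist the failed state (e.g. to cluster state) asynchronously
    }
}
```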
Additionally, I move the task state checks INTO the `start` and `stop` methods, which now require a `force` parameter. `start`, `stop`, `trigger`, and `markAsFailed` are all `synchronized`. This should give us some guarantees that one will not switch states out from underneath another.

I also flag the task as `failed` BEFORE we successfully write it to cluster state; this allows the task to fail more quickly. But it does add the behavior where the task is "failed" while the cluster state does not indicate as much. Adding the checks in `start` and `stop` handles this "real state vs cluster state" race condition. This has always been a problem for `_stop`, as it is not a master node action and doesn't always have the latest cluster state.

closes #45609
Relates to #45562