From 15302ff8800b9a7c05faf73999ccdeffd3706b49 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Tue, 20 Aug 2019 07:52:10 -0500 Subject: [PATCH] [ML][Data Frame] fixing _start?force=true bug (#45660) * [ML][Data Frame] fixing _start?force=true bug * removing unused import * removing old TODO --- ...portStartDataFrameTransformTaskAction.java | 1 - ...FrameTransformPersistentTasksExecutor.java | 18 +++++----- .../transforms/DataFrameTransformTask.java | 36 ++++++++++--------- 3 files changed, 28 insertions(+), 27 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java index 9587aa1ebbf92..17df98e0c2b33 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java @@ -59,7 +59,6 @@ protected void doExecute(Task task, StartDataFrameTransformTaskAction.Request re protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask, ActionListener listener) { if (transformTask.getTransformId().equals(request.getId())) { - //TODO fix bug as .start where it was failed could result in a null current checkpoint? transformTask.start(null, request.isForce(), listener); } else { listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId() diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index 78650c5baace1..dc37e937ea135 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -35,7 +35,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStoredDoc; -import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; import org.elasticsearch.xpack.dataframe.DataFrame; @@ -120,14 +119,14 @@ static List verifyIndicesPrimaryShardsAreActive(ClusterState clusterStat protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTransform params, PersistentTaskState state) { final String transformId = params.getId(); final DataFrameTransformTask buildTask = (DataFrameTransformTask) task; - final DataFrameTransformState transformPTaskState = (DataFrameTransformState) state; - - // If the transform is failed then the Persistent Task Service will - // try to restart it on a node restart. Exiting here leaves the - // transform in the failed state and it must be force closed. - if (transformPTaskState != null && transformPTaskState.getTaskState() == DataFrameTransformTaskState.FAILED) { - return; - } + // NOTE: DataFrameTransformPersistentTasksExecutor#createTask pulls in the stored task state from the ClusterState when the object + // is created. DataFrameTransformTask#ctor takes into account setting the task as failed if that is passed in with the + // persisted state. + // DataFrameTransformPersistentTasksExecutor#startTask will fail as DataFrameTransformTask#start, when force == false, will return + // a failure indicating that a failed task cannot be started. + // + // We want the rest of the state to be populated in the task when it is loaded on the node so that users can force start it again + // later if they want. final DataFrameTransformTask.ClientDataFrameIndexerBuilder indexerBuilder = new DataFrameTransformTask.ClientDataFrameIndexerBuilder(transformId) @@ -299,6 +298,7 @@ private void startTask(DataFrameTransformTask buildTask, Long previousCheckpoint, ActionListener listener) { buildTask.initializeIndexer(indexerBuilder); + // DataFrameTransformTask#start will fail if the task state is FAILED buildTask.setNumFailureRetries(numFailureRetries).start(previousCheckpoint, false, listener); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index fab87569fc5e7..641e3a0d1d777 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -235,7 +235,7 @@ public long getInProgressCheckpoint() { } } - public void setTaskStateStopped() { + public synchronized void setTaskStateStopped() { taskState.set(DataFrameTransformTaskState.STOPPED); } @@ -256,8 +256,16 @@ public synchronized void start(Long startingCheckpoint, boolean force, ActionLis return; } if (getIndexer() == null) { - listener.onFailure(new ElasticsearchException("Task for transform [{}] not fully initialized. Try again later", - getTransformId())); + // If our state is failed AND the indexer is null, the user needs to _stop?force=true so that the indexer gets + // fully initialized. + // If we are NOT failed, then we can assume that `start` was just called early in the process. + String msg = taskState.get() == DataFrameTransformTaskState.FAILED ? + "It failed during the initialization process; force stop to allow reinitialization." : + "Try again later."; + listener.onFailure(new ElasticsearchStatusException("Task for transform [{}] not fully initialized. {}", + RestStatus.CONFLICT, + getTransformId(), + msg)); return; } final IndexerState newState = getIndexer().start(); @@ -409,6 +417,13 @@ void persistStateToClusterState(DataFrameTransformState state, } synchronized void markAsFailed(String reason, ActionListener listener) { + // If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to + // flag the previously triggered indexer as failed. Exit early as we are already flagged as failed. + if (taskState.get() == DataFrameTransformTaskState.FAILED) { + logger.warn("[{}] is already failed but encountered new failure; reason [{}] ", getTransformId(), reason); + listener.onResponse(null); + return; + } // If the indexer is `STOPPING` this means that `DataFrameTransformTask#stop` was called previously, but something caused // the indexer to fail. Since `ClientDataFrameIndexer#doSaveState` will persist the state to the index once the indexer stops, // it is probably best to NOT change the internal state of the task and allow the normal stopping logic to continue. @@ -425,26 +440,13 @@ synchronized void markAsFailed(String reason, ActionListener listener) { listener.onResponse(null); return; } - // If we are already flagged as failed, this probably means that a second trigger started firing while we were attempting to - // flag the previously triggered indexer as failed. Exit early as we are already flagged as failed. - if (taskState.get() == DataFrameTransformTaskState.FAILED) { - logger.warn("[{}] is already failed but encountered new failure; reason [{}] ", getTransformId(), reason); - listener.onResponse(null); - return; - } auditor.error(transform.getId(), reason); // We should not keep retrying. Either the task will be stopped, or started // If it is started again, it is registered again. deregisterSchedulerJob(); - DataFrameTransformState newState = new DataFrameTransformState( - DataFrameTransformTaskState.FAILED, - getIndexer() == null ? initialIndexerState : getIndexer().getState(), - getIndexer() == null ? initialPosition : getIndexer().getPosition(), - currentCheckpoint.get(), - reason, - getIndexer() == null ? null : getIndexer().getProgress()); taskState.set(DataFrameTransformTaskState.FAILED); stateReason.set(reason); + DataFrameTransformState newState = getState(); // Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate // This keeps track of STARTED, FAILED, STOPPED // This is because a FAILED state could occur because we failed to read the config from the internal index, which would imply that