diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java index f9bbf890fe6ce..efe57f44e89dc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java @@ -284,6 +284,8 @@ private IndexerState finishAndSetState() { AtomicBoolean callOnStop = new AtomicBoolean(false); AtomicBoolean callOnAbort = new AtomicBoolean(false); IndexerState updatedState = state.updateAndGet(prev -> { + callOnAbort.set(false); + callOnStop.set(false); switch (prev) { case INDEXING: // ready for another job diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java index df2d09a875d19..d814714ab6653 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java @@ -26,7 +26,10 @@ import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Request; import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointingInfo; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats; +import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState; +import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService; import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask; @@ -136,7 +139,21 @@ private void collectStatsForTransformsWithoutTasks(Request request, ActionListener> searchStatsListener = ActionListener.wrap( stats -> { List allStateAndStats = response.getTransformsStateAndStats(); - allStateAndStats.addAll(stats); + // If the persistent task does NOT exist, it is STOPPED + // There is a potential race condition where the saved document does not actually have a STOPPED state + // as the task is cancelled before we persist state. + stats.forEach(stat -> + allStateAndStats.add(new DataFrameTransformStateAndStats( + stat.getId(), + new DataFrameTransformState(DataFrameTransformTaskState.STOPPED, + IndexerState.STOPPED, + stat.getTransformState().getPosition(), + stat.getTransformState().getCheckpoint(), + stat.getTransformState().getReason(), + stat.getTransformState().getProgress()), + stat.getTransformStats(), + stat.getCheckpointingInfo())) + ); transformsWithoutTasks.removeAll( stats.stream().map(DataFrameTransformStateAndStats::getId).collect(Collectors.toSet())); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 575cd4c15bd67..25fe3d52e7205 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -236,6 +236,10 @@ public synchronized void stop() { return; } + if (getIndexer().getState() == IndexerState.STOPPED) { + return; + } + IndexerState state = getIndexer().stop(); if (state == IndexerState.STOPPED) { getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStop()); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml index 2686c57fd06ac..d156344b5ad6f 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml @@ -90,9 +90,6 @@ teardown: - match: { airline-data-by-airline-start-stop.mappings: {} } --- "Test start/stop/start transform": - - skip: - reason: "https://github.com/elastic/elasticsearch/issues/42650" - version: "all" - do: data_frame.start_data_frame_transform: transform_id: "airline-transform-start-stop"