Skip to content

Commit 283c012

Browse files
Refresh dest before detected extracted fields
1 parent b197ed8 commit 283c012

File tree

2 files changed

+21
-10
lines changed

2 files changed

+21
-10
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
1616
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
1717
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
18+
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
19+
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
20+
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
1821
import org.elasticsearch.action.support.ContextPreservingActionListener;
1922
import org.elasticsearch.client.node.NodeClient;
2023
import org.elasticsearch.cluster.ClusterState;
@@ -42,6 +45,7 @@
4245
import java.util.function.Supplier;
4346

4447
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
48+
import static org.elasticsearch.xpack.core.ClientHelper.executeWithHeadersAsync;
4549

4650
public class DataFrameAnalyticsManager {
4751

@@ -225,9 +229,6 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
225229
}
226230

227231
private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) {
228-
// Ensure we mark reindexing is finished for the case we are recovering a task that had finished reindexing
229-
task.setReindexingFinished();
230-
231232
// Update state to ANALYZING and start process
232233
ActionListener<DataFrameDataExtractorFactory> dataExtractorFactoryListener = ActionListener.wrap(
233234
dataExtractorFactory -> {
@@ -247,10 +248,23 @@ private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfi
247248
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
248249
);
249250

250-
// TODO This could fail with errors. In that case we get stuck with the copied index.
251-
// We could delete the index in case of failure or we could try building the factory before reindexing
252-
// to catch the error early on.
253-
DataFrameDataExtractorFactory.createForDestinationIndex(client, config, dataExtractorFactoryListener);
251+
ActionListener<RefreshResponse> refreshListener = ActionListener.wrap(
252+
refreshResponse -> {
253+
// Ensure we mark reindexing is finished for the case we are recovering a task that had finished reindexing
254+
task.setReindexingFinished();
255+
256+
// TODO This could fail with errors. In that case we get stuck with the copied index.
257+
// We could delete the index in case of failure or we could try building the factory before reindexing
258+
// to catch the error early on.
259+
DataFrameDataExtractorFactory.createForDestinationIndex(client, config, dataExtractorFactoryListener);
260+
},
261+
dataExtractorFactoryListener::onFailure
262+
);
263+
264+
// First we need to refresh the dest index to ensure data is searchable in case the job
265+
// was stopped after reindexing was complete but before the index was refreshed.
266+
executeWithHeadersAsync(config.getHeaders(), ML_ORIGIN, client, RefreshAction.INSTANCE,
267+
new RefreshRequest(config.getDest().getIndex()), refreshListener);
254268
}
255269

256270
public void stop(DataFrameAnalyticsTask task) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,6 @@ public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config,
109109
}
110110
}
111111

112-
// Refresh the dest index to ensure data is searchable
113-
refreshDest(config);
114-
115112
// Fetch existing model state (if any)
116113
BytesReference state = getModelState(config);
117114

0 commit comments

Comments
 (0)