From b8e864c1f2edf32a4a12496daa7cec708f0132dc Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 18 Dec 2019 15:50:21 +0000 Subject: [PATCH] [ML] Refresh state index before completing data frame analytics job In order to ensure any persisted model state is searchable by the moment the job reports itself as `stopped`, we need to refresh the state index before completing. This should fix the occasional failures we see in #50168 and #50313 where the model state appears missing. Closes #50168 Closes #50313 --- .../dataframe/process/AnalyticsProcessManager.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index ce981ad17a98a..30ecb71f4c058 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshAction; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -159,6 +160,7 @@ private void processData(DataFrameAnalyticsTask task, ProcessContext processCont processContext.setFailureReason(resultProcessor.getFailure()); refreshDest(config); + refreshStateIndex(config.getId()); LOGGER.info("[{}] Result processor has completed", config.getId()); } catch (Exception e) { if (task.isStopping()) { @@ -288,6 +290,17 @@ private void refreshDest(DataFrameAnalyticsConfig config) { () -> client.execute(RefreshAction.INSTANCE, new RefreshRequest(config.getDest().getIndex())).actionGet()); } + private void refreshStateIndex(String jobId) { + String indexName = AnomalyDetectorsIndex.jobStateIndexPattern(); + LOGGER.debug("[{}] Refresh index {}", jobId, indexName); + + RefreshRequest refreshRequest = new RefreshRequest(indexName); + refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashWithOrigin(ML_ORIGIN)) { + client.admin().indices().refresh(refreshRequest).actionGet(); + } + } + private void closeProcess(DataFrameAnalyticsTask task) { String configId = task.getParams().getId(); LOGGER.info("[{}] Closing process", configId);