Skip to content

Commit 9abf537

Browse files
[7.x][ML] Improve DF analytics audits and logging (#53179) (#53218)
Adds audits for when the job starts reindexing, loading data, analyzing, writing results. Also adds some info logging. Backport of #53179
1 parent 7ddbda4 commit 9abf537

File tree

8 files changed

+72
-2
lines changed

8 files changed

+72
-2
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,14 @@ public final class Messages {
6565
public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE = "Estimated memory usage for this analytics to be [{0}]";
6666
public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX = "Creating destination index [{0}]";
6767
public static final String DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX = "Using existing destination index [{0}]";
68-
public static final String DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING = "Finished reindexing to destination index [{0}]";
68+
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_REINDEXING = "Started reindexing to destination index [{0}]";
69+
public static final String DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING =
70+
"Finished reindexing to destination index [{0}], took [{1}]";
6971
public static final String DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS = "Finished analysis";
7072
public static final String DATA_FRAME_ANALYTICS_AUDIT_RESTORING_STATE = "Restoring from previous model state";
73+
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_LOADING_DATA = "Started loading data";
74+
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_ANALYZING = "Started analyzing";
75+
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_WRITING_RESULTS = "Started writing results";
7176

7277
public static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs {1}";
7378
public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed";

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,11 @@ public void testSingleNumericFeatureAndMixedTrainingAndNonTrainingRows() throws
120120
"Starting analytics on node",
121121
"Started analytics",
122122
expectedDestIndexAuditMessage(),
123+
"Started reindexing to destination index [" + destIndex + "]",
123124
"Finished reindexing to destination index [" + destIndex + "]",
125+
"Started loading data",
126+
"Started analyzing",
127+
"Started writing results",
124128
"Finished analysis");
125129
assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField);
126130
}
@@ -161,7 +165,11 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsHundred() throws Excepti
161165
"Starting analytics on node",
162166
"Started analytics",
163167
expectedDestIndexAuditMessage(),
168+
"Started reindexing to destination index [" + destIndex + "]",
164169
"Finished reindexing to destination index [" + destIndex + "]",
170+
"Started loading data",
171+
"Started analyzing",
172+
"Started writing results",
165173
"Finished analysis");
166174
assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField);
167175
}
@@ -224,7 +232,11 @@ public <T> void testWithOnlyTrainingRowsAndTrainingPercentIsFifty(String jobId,
224232
"Starting analytics on node",
225233
"Started analytics",
226234
expectedDestIndexAuditMessage(),
235+
"Started reindexing to destination index [" + destIndex + "]",
227236
"Finished reindexing to destination index [" + destIndex + "]",
237+
"Started loading data",
238+
"Started analyzing",
239+
"Started writing results",
228240
"Finished analysis");
229241
assertEvaluation(dependentVariable, dependentVariableValues, "ml." + predictedClassField);
230242
}

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ protected static void assertThatAuditMessagesMatch(String configId, String... ex
233233
// Since calls to write the AbstractAuditor are sent and forgot (async) we could have returned from the start,
234234
// finished the job (as this is a very short analytics job), all without the audit being fully written.
235235
assertBusy(() -> assertTrue(indexExists(NotificationsIndex.NOTIFICATIONS_INDEX)));
236+
236237
@SuppressWarnings("unchecked")
237238
Matcher<String>[] itemMatchers = Arrays.stream(expectedAuditMessagePrefixes).map(Matchers::startsWith).toArray(Matcher[]::new);
238239
assertBusy(() -> {
@@ -252,6 +253,7 @@ private static List<String> fetchAllAuditMessages(String dataFrameAnalyticsId) {
252253
.setIndices(NotificationsIndex.NOTIFICATIONS_INDEX)
253254
.addSort("timestamp", SortOrder.ASC)
254255
.setQuery(QueryBuilders.termQuery("job_id", dataFrameAnalyticsId))
256+
.setSize(100)
255257
.request();
256258
SearchResponse searchResponse = client().execute(SearchAction.INSTANCE, searchRequest).actionGet();
257259

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,11 @@ public void testSingleNumericFeatureAndMixedTrainingAndNonTrainingRows() throws
105105
"Starting analytics on node",
106106
"Started analytics",
107107
"Creating destination index [" + destIndex + "]",
108+
"Started reindexing to destination index [" + destIndex + "]",
108109
"Finished reindexing to destination index [" + destIndex + "]",
110+
"Started loading data",
111+
"Started analyzing",
112+
"Started writing results",
109113
"Finished analysis");
110114
}
111115

@@ -144,7 +148,11 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsHundred() throws Excepti
144148
"Starting analytics on node",
145149
"Started analytics",
146150
"Creating destination index [" + destIndex + "]",
151+
"Started reindexing to destination index [" + destIndex + "]",
147152
"Finished reindexing to destination index [" + destIndex + "]",
153+
"Started loading data",
154+
"Started analyzing",
155+
"Started writing results",
148156
"Finished analysis");
149157
}
150158

@@ -198,7 +206,11 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty() throws Exception
198206
"Starting analytics on node",
199207
"Started analytics",
200208
"Creating destination index [" + destIndex + "]",
209+
"Started reindexing to destination index [" + destIndex + "]",
201210
"Finished reindexing to destination index [" + destIndex + "]",
211+
"Started loading data",
212+
"Started analyzing",
213+
"Started writing results",
202214
"Finished analysis");
203215
}
204216

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RunDataFrameAnalyticsIT.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,11 @@ public void testOutlierDetectionWithFewDocuments() throws Exception {
126126
"Starting analytics on node",
127127
"Started analytics",
128128
"Creating destination index [test-outlier-detection-with-few-docs-results]",
129+
"Started reindexing to destination index [test-outlier-detection-with-few-docs-results]",
129130
"Finished reindexing to destination index [test-outlier-detection-with-few-docs-results]",
131+
"Started loading data",
132+
"Started analyzing",
133+
"Started writing results",
130134
"Finished analysis");
131135
}
132136

@@ -181,7 +185,11 @@ public void testOutlierDetectionWithEnoughDocumentsToScroll() throws Exception {
181185
"Starting analytics on node",
182186
"Started analytics",
183187
"Creating destination index [test-outlier-detection-with-enough-docs-to-scroll-results]",
188+
"Started reindexing to destination index [test-outlier-detection-with-enough-docs-to-scroll-results]",
184189
"Finished reindexing to destination index [test-outlier-detection-with-enough-docs-to-scroll-results]",
190+
"Started loading data",
191+
"Started analyzing",
192+
"Started writing results",
185193
"Finished analysis");
186194
}
187195

@@ -262,7 +270,11 @@ public void testOutlierDetectionWithMoreFieldsThanDocValueFieldLimit() throws Ex
262270
"Starting analytics on node",
263271
"Started analytics",
264272
"Creating destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]",
273+
"Started reindexing to destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]",
265274
"Finished reindexing to destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]",
275+
"Started loading data",
276+
"Started analyzing",
277+
"Started writing results",
266278
"Finished analysis");
267279
}
268280

@@ -387,7 +399,11 @@ public void testOutlierDetectionWithMultipleSourceIndices() throws Exception {
387399
"Starting analytics on node",
388400
"Started analytics",
389401
"Creating destination index [test-outlier-detection-with-multiple-source-indices-results]",
402+
"Started reindexing to destination index [test-outlier-detection-with-multiple-source-indices-results]",
390403
"Finished reindexing to destination index [test-outlier-detection-with-multiple-source-indices-results]",
404+
"Started loading data",
405+
"Started analyzing",
406+
"Started writing results",
391407
"Finished analysis");
392408
}
393409

@@ -445,7 +461,11 @@ public void testOutlierDetectionWithPreExistingDestIndex() throws Exception {
445461
"Starting analytics on node",
446462
"Started analytics",
447463
"Using existing destination index [test-outlier-detection-with-pre-existing-dest-index-results]",
464+
"Started reindexing to destination index [test-outlier-detection-with-pre-existing-dest-index-results]",
448465
"Finished reindexing to destination index [test-outlier-detection-with-pre-existing-dest-index-results]",
466+
"Started loading data",
467+
"Started analyzing",
468+
"Started writing results",
449469
"Finished analysis");
450470
}
451471

@@ -699,7 +719,11 @@ public void testOutlierDetectionWithCustomParams() throws Exception {
699719
"Starting analytics on node",
700720
"Started analytics",
701721
"Creating destination index [test-outlier-detection-with-custom-params-results]",
722+
"Started reindexing to destination index [test-outlier-detection-with-custom-params-results]",
702723
"Finished reindexing to destination index [test-outlier-detection-with-custom-params-results]",
724+
"Started loading data",
725+
"Started analyzing",
726+
"Started writing results",
703727
"Finished analysis");
704728
}
705729
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,8 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
213213
task.setReindexingFinished();
214214
auditor.info(
215215
config.getId(),
216-
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex()));
216+
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex(),
217+
reindexResponse.getTook()));
217218
startAnalytics(task, config);
218219
},
219220
error -> task.setFailed(ExceptionsHelper.unwrapCause(error).getMessage())
@@ -233,9 +234,12 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
233234
final ThreadContext threadContext = client.threadPool().getThreadContext();
234235
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
235236
try (ThreadContext.StoredContext ignore = threadContext.stashWithOrigin(ML_ORIGIN)) {
237+
LOGGER.info("[{}] Started reindexing", config.getId());
236238
Task reindexTask = client.executeLocally(ReindexAction.INSTANCE, reindexRequest,
237239
new ContextPreservingActionListener<>(supplier, reindexCompletedListener));
238240
task.setReindexingTaskId(reindexTask.getId());
241+
auditor.info(config.getId(),
242+
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_REINDEXING, config.getDest().getIndex()));
239243
}
240244
},
241245
reindexCompletedListener::onFailure

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ private BytesReference getModelState(DataFrameAnalyticsConfig config) {
147147
}
148148

149149
private void processData(DataFrameAnalyticsTask task, ProcessContext processContext, BytesReference state) {
150+
LOGGER.info("[{}] Started loading data", processContext.config.getId());
151+
auditor.info(processContext.config.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_LOADING_DATA));
152+
150153
DataFrameAnalyticsConfig config = processContext.config;
151154
DataFrameDataExtractor dataExtractor = processContext.dataExtractor.get();
152155
AnalyticsProcess<AnalyticsResult> process = processContext.process.get();
@@ -159,6 +162,9 @@ private void processData(DataFrameAnalyticsTask task, ProcessContext processCont
159162

160163
restoreState(task, config, state, process);
161164

165+
LOGGER.info("[{}] Started analyzing", processContext.config.getId());
166+
auditor.info(processContext.config.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_ANALYZING));
167+
162168
LOGGER.info("[{}] Waiting for result processor to complete", config.getId());
163169
resultProcessor.awaitForCompletion();
164170
processContext.setFailureReason(resultProcessor.getFailure());

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
2727
import org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinition;
2828
import org.elasticsearch.xpack.core.ml.inference.TrainedModelInput;
29+
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
2930
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
3031
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
3132
import org.elasticsearch.xpack.core.security.user.XPackUser;
@@ -120,6 +121,10 @@ public void process(AnalyticsProcess<AnalyticsResult> process) {
120121
AnalyticsResult result = iterator.next();
121122
processResult(result, resultsJoiner);
122123
if (result.getRowResults() != null) {
124+
if (processedRows == 0) {
125+
LOGGER.info("[{}] Started writing results", analytics.getId());
126+
auditor.info(analytics.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_WRITING_RESULTS));
127+
}
123128
processedRows++;
124129
updateResultsProgress(processedRows >= totalRows ? 100 : (int) (processedRows * 100.0 / totalRows));
125130
}

0 commit comments

Comments
 (0)