Skip to content

Commit 856d9bf

Browse files
authored
[ML] fixing data frame analysis test when two jobs are started in succession quickly (#53192) (#53332)
A previous change (#53029) is causing analysis jobs to wait for certain indices to be made available. While it is good for jobs to wait, they could fail early on _start. This change will cause the persistent task to continually retry node assignment when the failure is due to shards not being available. If the shards are not available by the time `timeout` is reached by the predicate, it is treated as a _start failure and the task is canceled. For tasks seeking a new assignment after a node failure, that behavior is unchanged. closes #53188
1 parent 5912895 commit 856d9bf

File tree

3 files changed

+38
-9
lines changed

3 files changed

+38
-9
lines changed

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ClassificationIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,6 @@ public void testDependentVariableIsAliasToNested() throws Exception {
417417
assertEvaluation(ALIAS_TO_NESTED_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField);
418418
}
419419

420-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/53188")
421420
public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exception {
422421
String sourceIndex = "classification_two_jobs_with_same_randomize_seed_source";
423422
String dependentVariable = KEYWORD_FIELD;

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,6 @@ public void testStopAndRestart() throws Exception {
271271
assertMlResultsFieldMappings(destIndex, predictedClassField, "double");
272272
}
273273

274-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/53188")
275274
public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exception {
276275
String sourceIndex = "regression_two_jobs_with_same_randomize_seed_source";
277276
indexData(sourceIndex, 100, 0);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
10+
import org.apache.logging.log4j.message.ParameterizedMessage;
1011
import org.elasticsearch.ElasticsearchException;
1112
import org.elasticsearch.ElasticsearchStatusException;
1213
import org.elasticsearch.ResourceAlreadyExistsException;
@@ -96,6 +97,7 @@ public class TransportStartDataFrameAnalyticsAction
9697
extends TransportMasterNodeAction<StartDataFrameAnalyticsAction.Request, AcknowledgedResponse> {
9798

9899
private static final Logger logger = LogManager.getLogger(TransportStartDataFrameAnalyticsAction.class);
100+
private static final String PRIMARY_SHARDS_INACTIVE = "not all primary shards are active";
99101

100102
private final XPackLicenseState licenseState;
101103
private final Client client;
@@ -409,8 +411,26 @@ public void onFailure(Exception e) {
409411

410412
@Override
411413
public void onTimeout(TimeValue timeout) {
412-
listener.onFailure(new ElasticsearchException(
413-
"Starting data frame analytics [" + task.getParams().getId() + "] timed out after [" + timeout + "]"));
414+
logger.error(
415+
() -> new ParameterizedMessage("[{}] timed out when starting task after [{}]. Assignment explanation [{}]",
416+
task.getParams().getId(),
417+
timeout,
418+
predicate.assignmentExplanation));
419+
if (predicate.assignmentExplanation != null) {
420+
cancelAnalyticsStart(task,
421+
new ElasticsearchStatusException(
422+
"Could not start data frame analytics task, timed out after [{}] waiting for task assignment. "
423+
+ "Assignment explanation [{}]",
424+
RestStatus.TOO_MANY_REQUESTS,
425+
timeout,
426+
predicate.assignmentExplanation),
427+
listener);
428+
} else {
429+
listener.onFailure(new ElasticsearchException(
430+
"Starting data frame analytics [{}] timed out after [{}]",
431+
task.getParams().getId(),
432+
timeout));
433+
}
414434
}
415435
});
416436
}
@@ -435,6 +455,7 @@ private StartContext(DataFrameAnalyticsConfig config, List<PhaseProgress> progre
435455
private static class AnalyticsPredicate implements Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> {
436456

437457
private volatile Exception exception;
458+
private volatile String assignmentExplanation;
438459

439460
@Override
440461
public boolean test(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
@@ -449,9 +470,15 @@ public boolean test(PersistentTasksCustomMetaData.PersistentTask<?> persistentTa
449470
return true;
450471
}
451472

452-
if (assignment != null && assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false &&
453-
assignment.isAssigned() == false) {
454-
// Assignment has failed despite passing our "fast fail" validation
473+
if (assignment != null
474+
&& assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false
475+
&& assignment.isAssigned() == false) {
476+
assignmentExplanation = assignment.getExplanation();
477+
// Assignment failed due to primary shard check.
478+
// This is hopefully intermittent and we should allow another assignment attempt.
479+
if (assignmentExplanation.contains(PRIMARY_SHARDS_INACTIVE)) {
480+
return false;
481+
}
455482
exception = new ElasticsearchStatusException("Could not start data frame analytics task, allocation explanation [" +
456483
assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS);
457484
return true;
@@ -582,8 +609,12 @@ public PersistentTasksCustomMetaData.Assignment getAssignment(StartDataFrameAnal
582609
MlStatsIndex.indexPattern(),
583610
AnomalyDetectorsIndex.jobStateIndexPattern());
584611
if (unavailableIndices.size() != 0) {
585-
String reason = "Not opening data frame analytics job [" + id +
586-
"], because not all primary shards are active for the following indices [" + String.join(",", unavailableIndices) + "]";
612+
String reason = "Not opening data frame analytics job ["
613+
+ id
614+
+ "], because "
615+
+ PRIMARY_SHARDS_INACTIVE
616+
+ " for the following indices ["
617+
+ String.join(",", unavailableIndices) + "]";
587618
logger.debug(reason);
588619
return new PersistentTasksCustomMetaData.Assignment(null, reason);
589620
}

0 commit comments

Comments
 (0)