Skip to content

Commit 858dbfc

Browse files
authored
[ML][Data Frame] treat bulk index failures as an indexing failure (#44351) (#44427)
* [ML][Data Frame] treat bulk index failures as an indexing failure * removing redundant public modifier * changing to an ElasticsearchException * fixing redundant public modifier
1 parent a2b4687 commit 858dbfc

File tree

3 files changed

+72
-9
lines changed

3 files changed

+72
-9
lines changed

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@
66

77
package org.elasticsearch.xpack.dataframe.integration;
88

9+
import org.apache.http.entity.ContentType;
10+
import org.apache.http.entity.StringEntity;
911
import org.elasticsearch.client.Request;
12+
import org.elasticsearch.client.dataframe.transforms.DataFrameTransformTaskState;
13+
import org.elasticsearch.common.Strings;
14+
import org.elasticsearch.common.xcontent.XContentBuilder;
1015
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1116
import org.junit.Before;
1217

@@ -17,7 +22,9 @@
1722
import java.util.List;
1823
import java.util.Map;
1924
import java.util.Set;
25+
import java.util.concurrent.TimeUnit;
2026

27+
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
2128
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
2229
import static org.hamcrest.Matchers.containsString;
2330
import static org.hamcrest.Matchers.equalTo;
@@ -723,6 +730,44 @@ public void testPivotWithWeightedAvgAgg() throws Exception {
723730
assertEquals(4.47169811, actual.doubleValue(), 0.000001);
724731
}
725732

733+
public void testBulkIndexFailuresCauseTaskToFail() throws Exception {
734+
String transformId = "bulk-failure-pivot";
735+
String dataFrameIndex = "pivot-failure-index";
736+
createPivotReviewsTransform(transformId, dataFrameIndex, null, null, null);
737+
738+
try (XContentBuilder builder = jsonBuilder()) {
739+
builder.startObject();
740+
{
741+
builder.startObject("mappings")
742+
.startObject("properties")
743+
.startObject("reviewer")
744+
// This type should cause mapping coercion type conflict on bulk index
745+
.field("type", "long")
746+
.endObject()
747+
.endObject()
748+
.endObject();
749+
}
750+
builder.endObject();
751+
final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
752+
Request req = new Request("PUT", dataFrameIndex);
753+
req.setEntity(entity);
754+
client().performRequest(req);
755+
}
756+
startDataframeTransform(transformId, false, null);
757+
758+
assertBusy(() -> assertEquals(DataFrameTransformTaskState.FAILED.value(), getDataFrameTaskState(transformId)),
759+
120,
760+
TimeUnit.SECONDS);
761+
762+
Map<?, ?> state = getDataFrameState(transformId);
763+
assertThat((String) XContentMapValues.extractValue("state.reason", state),
764+
containsString("task encountered more than 10 failures; latest failure: Bulk index experienced failures."));
765+
766+
// Force stop the transform as bulk indexing caused it to go into a failed state
767+
stopDataFrameTransform(transformId, true);
768+
deleteIndex(dataFrameIndex);
769+
}
770+
726771
private void assertOnePivotValue(String query, double expected) throws IOException {
727772
Map<String, Object> searchResult = getAsMap(query);
728773

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,8 @@ protected void createPivotReviewsTransform(String transformId, String dataFrameI
211211
+ " \"avg_rating\": {"
212212
+ " \"avg\": {"
213213
+ " \"field\": \"stars\""
214-
+ " } } } }"
214+
+ " } } } },"
215+
+ "\"frequency\":\"1s\""
215216
+ "}";
216217

217218
createDataframeTransformRequest.setJsonEntity(config);

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -591,21 +591,31 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
591591
BulkAction.INSTANCE,
592592
request,
593593
ActionListener.wrap(bulkResponse -> {
594-
if (bulkResponse.hasFailures() && auditBulkFailures) {
594+
if (bulkResponse.hasFailures()) {
595595
int failureCount = 0;
596596
for(BulkItemResponse item : bulkResponse.getItems()) {
597597
if (item.isFailed()) {
598598
failureCount++;
599599
}
600+
// TODO gather information on irrecoverable failures and update isIrrecoverableFailure
600601
}
601-
auditor.warning(transformId,
602-
"Experienced at least [" +
603-
failureCount +
604-
"] bulk index failures. See the logs of the node running the transform for details. " +
605-
bulkResponse.buildFailureMessage());
606-
auditBulkFailures = false;
602+
if (auditBulkFailures) {
603+
auditor.warning(transformId,
604+
"Experienced at least [" +
605+
failureCount +
606+
"] bulk index failures. See the logs of the node running the transform for details. " +
607+
bulkResponse.buildFailureMessage());
608+
auditBulkFailures = false;
609+
}
610+
// This calls AsyncTwoPhaseIndexer#finishWithIndexingFailure
611+
// It increments the indexing failure, and then calls the `onFailure` logic
612+
nextPhase.onFailure(
613+
new BulkIndexingException("Bulk index experienced failures. " +
614+
"See the logs of the node running the transform for details."));
615+
} else {
616+
auditBulkFailures = true;
617+
nextPhase.onResponse(bulkResponse);
607618
}
608-
nextPhase.onResponse(bulkResponse);
609619
}, nextPhase::onFailure));
610620
}
611621

@@ -817,4 +827,11 @@ protected void failIndexer(String failureMessage) {
817827
}, e -> {}));
818828
}
819829
}
830+
831+
// Considered a recoverable indexing failure
832+
private static class BulkIndexingException extends ElasticsearchException {
833+
BulkIndexingException(String msg, Object... args) {
834+
super(msg, args);
835+
}
836+
}
820837
}

0 commit comments

Comments
 (0)