Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public class TransformMessages {
public static final String LOG_TRANSFORM_PIVOT_LOW_PAGE_SIZE_FAILURE =
"Insufficient memory for search after repeated page size reductions to [{0}], unable to continue pivot, "
+ "please simplify job or increase heap size on data nodes.";
public static final String LOG_TRANSFORM_PIVOT_SCRIPT_ERROR =
"Failed to execute script with error: [{0}], stack trace: {1}";

public static final String FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS =
"Failed to parse transform checkpoints for [{0}]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,28 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.matchesRegex;

public class TransformTaskFailedStateIT extends TransformRestTestCase {

private final List<String> failureTransforms = new ArrayList<>();

@Before
public void setClusterSettings() throws IOException {
// Make sure we never retry on failure to speed up the test
// Set logging level to trace
// see: https://github.com/elastic/elasticsearch/issues/45562
Request addFailureRetrySetting = new Request("PUT", "/_cluster/settings");
addFailureRetrySetting.setJsonEntity(
"{\"transient\": {\"xpack.transform.num_transform_failure_retries\": \"" + 0 + "\"," +
"\"logger.org.elasticsearch.action.bulk\": \"info\"," + // reduces bulk failure spam
"\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," +
"\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}");
addFailureRetrySetting
.setJsonEntity(
"{\"transient\": {\"xpack.transform.num_transform_failure_retries\": \""
+ 0
+ "\","
+ "\"logger.org.elasticsearch.action.bulk\": \"info\","
+ // reduces bulk failure spam
"\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\","
+ "\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}"
);
client().performRequest(addFailureRetrySetting);
}

Expand All @@ -66,18 +73,22 @@ public void testForceStopFailedTransform() throws Exception {
startDataframeTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);
Map<?, ?> fullState = getDataFrameState(transformId);
final String failureReason = "task encountered more than 0 failures; latest failure: " +
"Bulk index experienced failures. See the logs of the node running the transform for details.";
final String failureReason = "task encountered more than 0 failures; latest failure: "
+ ".*BulkIndexingException: Bulk index experienced failures. See the logs of the node running the transform for details.";
// Verify we have failed for the expected reason
assertThat(XContentMapValues.extractValue("reason", fullState), equalTo(failureReason));
assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason));

// verify that we cannot stop a failed transform
ResponseException ex = expectThrows(ResponseException.class, () -> stopTransform(transformId, false));
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
equalTo("Unable to stop transform [test-force-stop-failed-transform] as it is in a failed state with reason [" +
failureReason +
"]. Use force stop to stop the transform."));
assertThat(
(String) XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
matchesRegex(
"Unable to stop transform \\[test-force-stop-failed-transform\\] as it is in a failed state with reason \\["
+ failureReason
+ "\\]. Use force stop to stop the transform."
)
);

// Verify that we can force stop a failed transform
stopTransform(transformId, true);
Expand All @@ -97,20 +108,23 @@ public void testStartFailedTransform() throws Exception {
startDataframeTransform(transformId);
awaitState(transformId, TransformStats.State.FAILED);
Map<?, ?> fullState = getDataFrameState(transformId);
final String failureReason = "task encountered more than 0 failures; latest failure: " +
"Bulk index experienced failures. See the logs of the node running the transform for details.";
final String failureReason = "task encountered more than 0 failures; latest failure: "
+ ".*BulkIndexingException: Bulk index experienced failures. See the logs of the node running the transform for details.";
// Verify we have failed for the expected reason
assertThat(XContentMapValues.extractValue("reason", fullState), equalTo(failureReason));
assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason));

final String expectedFailure = "Unable to start transform [test-force-start-failed-transform] " +
"as it is in a failed state with failure: [" + failureReason +
"]. Use force stop and then restart the transform once error is resolved.";
final String expectedFailure = "Unable to start transform \\[test-force-start-failed-transform\\] "
+ "as it is in a failed state with failure: \\["
+ failureReason
+ "\\]. Use force stop and then restart the transform once error is resolved.";
// Verify that we cannot start the transform when the task is in a failed state
assertBusy(() -> {
ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId));
assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus()));
assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
equalTo(expectedFailure));
assertThat(
(String) XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())),
matchesRegex(expectedFailure)
);
}, 60, TimeUnit.SECONDS);

stopTransform(transformId, true);
Expand All @@ -128,7 +142,8 @@ private void createDestinationIndexWithBadMapping(String indexName) throws IOExc
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
{
builder.startObject("mappings")
builder
.startObject("mappings")
.startObject("properties")
.startObject("reviewer")
.field("type", "long")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.script.ScriptException;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
Expand All @@ -45,6 +44,7 @@
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.transforms.pivot.AggregationResultUtils;
import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand All @@ -66,7 +66,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
* which query filters to run and which index requests to send
*/
private enum RunState {
// do a complete query/index, this is used for batch data frames and for bootstraping (1st run)
// do a complete query/index, this is used for batch transforms and for bootstrapping (1st run)
FULL_RUN,

// Partial run modes in 2 stages:
Expand Down Expand Up @@ -422,7 +422,7 @@ protected IterationResult<TransformIndexerPosition> doProcess(SearchResponse sea
default:
// Any other state is a bug, should not happen
logger.warn("[{}] Encountered unexpected run state [{}]", getJobId(), runState);
throw new IllegalStateException("DataFrame indexer job encountered an illegal state [" + runState + "]");
throw new IllegalStateException("Transform indexer job encountered an illegal state [" + runState + "]");
}
}

Expand Down Expand Up @@ -468,25 +468,36 @@ protected void onAbort() {

synchronized void handleFailure(Exception e) {
logger.warn(new ParameterizedMessage("[{}] transform encountered an exception: ", getJobId()), e);
if (handleCircuitBreakingException(e)) {
return;
}

if (isIrrecoverableFailure(e) || context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
String failureMessage = isIrrecoverableFailure(e)
? "task encountered irrecoverable failure: " + e.getMessage()
: "task encountered more than " + context.getNumFailureRetries() + " failures; latest failure: " + e.getMessage();
failIndexer(failureMessage);
Throwable unwrappedException = ExceptionRootCauseFinder.getRootCauseException(e);

if (unwrappedException instanceof CircuitBreakingException) {
handleCircuitBreakingException((CircuitBreakingException) unwrappedException);
} else if (unwrappedException instanceof ScriptException) {
handleScriptException((ScriptException) unwrappedException);
// irrecoverable error without special handling
} else if (unwrappedException instanceof IndexNotFoundException
|| unwrappedException instanceof AggregationResultUtils.AggregationExtractionException
|| unwrappedException instanceof TransformConfigReloadingException) {
failIndexer("task encountered irrecoverable failure: " + e.getMessage());
} else if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
failIndexer(
"task encountered more than "
+ context.getNumFailureRetries()
+ " failures; latest failure: "
+ ExceptionRootCauseFinder.getDetailedMessage(unwrappedException)
);
} else {
// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
if (e.getMessage().equals(lastAuditedExceptionMessage) == false) {
String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException);

auditor
.warning(
getJobId(),
"Transform encountered an exception: " + e.getMessage() + " Will attempt again at next scheduled trigger."
"Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger."
);
lastAuditedExceptionMessage = e.getMessage();
lastAuditedExceptionMessage = message;
}
}
}
Expand All @@ -510,12 +521,6 @@ private void sourceHasChanged(ActionListener<Boolean> hasChangedListener) {
}));
}

private boolean isIrrecoverableFailure(Exception e) {
return e instanceof IndexNotFoundException
|| e instanceof AggregationResultUtils.AggregationExtractionException
|| e instanceof TransformConfigReloadingException;
}

private IterationResult<TransformIndexerPosition> processBuckets(final CompositeAggregation agg) {
// we reached the end
if (agg.getBuckets().isEmpty()) {
Expand All @@ -536,7 +541,7 @@ private IterationResult<TransformIndexerPosition> processBuckets(final Composite
agg.getBuckets().isEmpty()
);

// NOTE: progress is also mutated in ClientDataFrameIndexer#onFinished
// NOTE: progress is also mutated in onFinish
if (progress != null) {
progress.incrementDocsProcessed(getStats().getNumDocuments() - docsBeforeProcess);
progress.incrementDocsIndexed(result.getToIndex().size());
Expand Down Expand Up @@ -671,7 +676,7 @@ protected SearchRequest buildSearchRequest() {
default:
// Any other state is a bug, should not happen
logger.warn("Encountered unexpected run state [" + runState + "]");
throw new IllegalStateException("DataFrame indexer job encountered an illegal state [" + runState + "]");
throw new IllegalStateException("Transform indexer job encountered an illegal state [" + runState + "]");
}

searchRequest.source(sourceBuilder);
Expand Down Expand Up @@ -756,16 +761,9 @@ private SearchSourceBuilder buildPartialUpdateQuery(SearchSourceBuilder sourceBu
* Implementation details: We take the values from the circuit breaker as a hint, but
* note that it breaks early, that's why we also reduce using
*
* @param e Exception thrown, only {@link CircuitBreakingException} are handled
* @return true if exception was handled, false if not
* @param circuitBreakingException CircuitBreakingException thrown
*/
protected boolean handleCircuitBreakingException(Exception e) {
CircuitBreakingException circuitBreakingException = getCircuitBreakingException(e);

if (circuitBreakingException == null) {
return false;
}

private void handleCircuitBreakingException(CircuitBreakingException circuitBreakingException) {
double reducingFactor = Math
.min(
(double) circuitBreakingException.getByteLimit() / circuitBreakingException.getBytesWanted(),
Expand All @@ -777,15 +775,29 @@ protected boolean handleCircuitBreakingException(Exception e) {
if (newPageSize < MINIMUM_PAGE_SIZE) {
String message = TransformMessages.getMessage(TransformMessages.LOG_TRANSFORM_PIVOT_LOW_PAGE_SIZE_FAILURE, pageSize);
failIndexer(message);
return true;
return;
}

String message = TransformMessages.getMessage(TransformMessages.LOG_TRANSFORM_PIVOT_REDUCE_PAGE_SIZE, pageSize, newPageSize);
auditor.info(getJobId(), message);
logger.info("Data frame transform [" + getJobId() + "]:" + message);

logger.info("[{}] {}", getJobId(), message);
pageSize = newPageSize;
return true;
return;
}

/**
* Handle script exception case. This is error is irrecoverable.
*
* @param scriptException ScriptException thrown
*/
private void handleScriptException(ScriptException scriptException) {
String message = TransformMessages
.getMessage(
TransformMessages.LOG_TRANSFORM_PIVOT_SCRIPT_ERROR,
scriptException.getDetailedMessage(),
scriptException.getScriptStack()
);
failIndexer(message);
}

protected void failIndexer(String failureMessage) {
Expand Down Expand Up @@ -818,7 +830,7 @@ protected boolean shouldAuditOnFinish(long completedCheckpoint) {
}

private RunState determineRunStateAtStart() {
// either 1st run or not a continuous data frame
// either 1st run or not a continuous transform
if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) {
return RunState.FULL_RUN;
}
Expand All @@ -832,32 +844,6 @@ private RunState determineRunStateAtStart() {
return RunState.PARTIAL_RUN_IDENTIFY_CHANGES;
}

/**
* Inspect exception for circuit breaking exception and return the first one it can find.
*
* @param e Exception
* @return CircuitBreakingException instance if found, null otherwise
*/
private static CircuitBreakingException getCircuitBreakingException(Exception e) {
// circuit breaking exceptions are at the bottom
Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(e);

if (unwrappedThrowable instanceof CircuitBreakingException) {
return (CircuitBreakingException) unwrappedThrowable;
} else if (unwrappedThrowable instanceof SearchPhaseExecutionException) {
SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException) e;
for (ShardSearchFailure shardFailure : searchPhaseException.shardFailures()) {
Throwable unwrappedShardFailure = org.elasticsearch.ExceptionsHelper.unwrapCause(shardFailure.getCause());

if (unwrappedShardFailure instanceof CircuitBreakingException) {
return (CircuitBreakingException) unwrappedShardFailure;
}
}
}

return null;
}

static class TransformConfigReloadingException extends ElasticsearchException {
TransformConfigReloadingException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
Expand Down
Loading