diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java index ac028974e82d4..9708cd301e437 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java @@ -77,6 +77,8 @@ public class TransformMessages { public static final String LOG_TRANSFORM_PIVOT_LOW_PAGE_SIZE_FAILURE = "Insufficient memory for search after repeated page size reductions to [{0}], unable to continue pivot, " + "please simplify job or increase heap size on data nodes."; + public static final String LOG_TRANSFORM_PIVOT_SCRIPT_ERROR = + "Failed to execute script with error: [{0}], stack trace: {1}"; public static final String FAILED_TO_PARSE_TRANSFORM_CHECKPOINTS = "Failed to parse transform checkpoints for [{0}]"; diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java index a9f8a9bc963cc..033cf57ba84fa 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformTaskFailedStateIT.java @@ -28,21 +28,28 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.matchesRegex; public class TransformTaskFailedStateIT extends TransformRestTestCase { private final List failureTransforms = new ArrayList<>(); + @Before public void setClusterSettings() throws IOException { // Make sure we never retry on failure to speed up the test // Set logging level to trace // see: https://github.com/elastic/elasticsearch/issues/45562 Request addFailureRetrySetting = new Request("PUT", "/_cluster/settings"); - addFailureRetrySetting.setJsonEntity( - "{\"transient\": {\"xpack.transform.num_transform_failure_retries\": \"" + 0 + "\"," + - "\"logger.org.elasticsearch.action.bulk\": \"info\"," + // reduces bulk failure spam - "\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," + - "\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}"); + addFailureRetrySetting + .setJsonEntity( + "{\"transient\": {\"xpack.transform.num_transform_failure_retries\": \"" + + 0 + + "\"," + + "\"logger.org.elasticsearch.action.bulk\": \"info\"," + + // reduces bulk failure spam + "\"logger.org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer\": \"trace\"," + + "\"logger.org.elasticsearch.xpack.transform\": \"trace\"}}" + ); client().performRequest(addFailureRetrySetting); } @@ -66,18 +73,22 @@ public void testForceStopFailedTransform() throws Exception { startDataframeTransform(transformId); awaitState(transformId, TransformStats.State.FAILED); Map fullState = getDataFrameState(transformId); - final String failureReason = "task encountered more than 0 failures; latest failure: " + - "Bulk index experienced failures. See the logs of the node running the transform for details."; + final String failureReason = "task encountered more than 0 failures; latest failure: " + + ".*BulkIndexingException: Bulk index experienced failures. See the logs of the node running the transform for details."; // Verify we have failed for the expected reason - assertThat(XContentMapValues.extractValue("reason", fullState), equalTo(failureReason)); + assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason)); // verify that we cannot stop a failed transform ResponseException ex = expectThrows(ResponseException.class, () -> stopTransform(transformId, false)); assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus())); - assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), - equalTo("Unable to stop transform [test-force-stop-failed-transform] as it is in a failed state with reason [" + - failureReason + - "]. Use force stop to stop the transform.")); + assertThat( + (String) XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), + matchesRegex( + "Unable to stop transform \\[test-force-stop-failed-transform\\] as it is in a failed state with reason \\[" + + failureReason + + "\\]. Use force stop to stop the transform." + ) + ); // Verify that we can force stop a failed transform stopTransform(transformId, true); @@ -97,20 +108,23 @@ public void testStartFailedTransform() throws Exception { startDataframeTransform(transformId); awaitState(transformId, TransformStats.State.FAILED); Map fullState = getDataFrameState(transformId); - final String failureReason = "task encountered more than 0 failures; latest failure: " + - "Bulk index experienced failures. See the logs of the node running the transform for details."; + final String failureReason = "task encountered more than 0 failures; latest failure: " + + ".*BulkIndexingException: Bulk index experienced failures. See the logs of the node running the transform for details."; // Verify we have failed for the expected reason - assertThat(XContentMapValues.extractValue("reason", fullState), equalTo(failureReason)); + assertThat((String) XContentMapValues.extractValue("reason", fullState), matchesRegex(failureReason)); - final String expectedFailure = "Unable to start transform [test-force-start-failed-transform] " + - "as it is in a failed state with failure: [" + failureReason + - "]. Use force stop and then restart the transform once error is resolved."; + final String expectedFailure = "Unable to start transform \\[test-force-start-failed-transform\\] " + + "as it is in a failed state with failure: \\[" + + failureReason + + "\\]. Use force stop and then restart the transform once error is resolved."; // Verify that we cannot start the transform when the task is in a failed state assertBusy(() -> { ResponseException ex = expectThrows(ResponseException.class, () -> startDataframeTransform(transformId)); assertThat(ex.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.CONFLICT.getStatus())); - assertThat(XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), - equalTo(expectedFailure)); + assertThat( + (String) XContentMapValues.extractValue("error.reason", entityAsMap(ex.getResponse())), + matchesRegex(expectedFailure) + ); }, 60, TimeUnit.SECONDS); stopTransform(transformId, true); @@ -128,7 +142,8 @@ private void createDestinationIndexWithBadMapping(String indexName) throws IOExc try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); { - builder.startObject("mappings") + builder + .startObject("mappings") .startObject("properties") .startObject("reviewer") .field("type", "long") diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 877afeb155418..94d220e29c1b3 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -13,10 +13,8 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreakingException; @@ -24,6 +22,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.script.ScriptException; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; @@ -45,6 +44,7 @@ import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import org.elasticsearch.xpack.transform.transforms.pivot.AggregationResultUtils; import org.elasticsearch.xpack.transform.transforms.pivot.Pivot; +import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder; import java.io.IOException; import java.io.UncheckedIOException; @@ -66,7 +66,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer doProcess(SearchResponse sea default: // Any other state is a bug, should not happen logger.warn("[{}] Encountered unexpected run state [{}]", getJobId(), runState); - throw new IllegalStateException("DataFrame indexer job encountered an illegal state [" + runState + "]"); + throw new IllegalStateException("Transform indexer job encountered an illegal state [" + runState + "]"); } } @@ -468,25 +468,36 @@ protected void onAbort() { synchronized void handleFailure(Exception e) { logger.warn(new ParameterizedMessage("[{}] transform encountered an exception: ", getJobId()), e); - if (handleCircuitBreakingException(e)) { - return; - } - - if (isIrrecoverableFailure(e) || context.getAndIncrementFailureCount() > context.getNumFailureRetries()) { - String failureMessage = isIrrecoverableFailure(e) - ? "task encountered irrecoverable failure: " + e.getMessage() - : "task encountered more than " + context.getNumFailureRetries() + " failures; latest failure: " + e.getMessage(); - failIndexer(failureMessage); + Throwable unwrappedException = ExceptionRootCauseFinder.getRootCauseException(e); + + if (unwrappedException instanceof CircuitBreakingException) { + handleCircuitBreakingException((CircuitBreakingException) unwrappedException); + } else if (unwrappedException instanceof ScriptException) { + handleScriptException((ScriptException) unwrappedException); + // irrecoverable error without special handling + } else if (unwrappedException instanceof IndexNotFoundException + || unwrappedException instanceof AggregationResultUtils.AggregationExtractionException + || unwrappedException instanceof TransformConfigReloadingException) { + failIndexer("task encountered irrecoverable failure: " + e.getMessage()); + } else if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) { + failIndexer( + "task encountered more than " + + context.getNumFailureRetries() + + " failures; latest failure: " + + ExceptionRootCauseFinder.getDetailedMessage(unwrappedException) + ); } else { // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one if (e.getMessage().equals(lastAuditedExceptionMessage) == false) { + String message = ExceptionRootCauseFinder.getDetailedMessage(unwrappedException); + auditor .warning( getJobId(), - "Transform encountered an exception: " + e.getMessage() + " Will attempt again at next scheduled trigger." + "Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger." ); - lastAuditedExceptionMessage = e.getMessage(); + lastAuditedExceptionMessage = message; } } } @@ -510,12 +521,6 @@ private void sourceHasChanged(ActionListener hasChangedListener) { })); } - private boolean isIrrecoverableFailure(Exception e) { - return e instanceof IndexNotFoundException - || e instanceof AggregationResultUtils.AggregationExtractionException - || e instanceof TransformConfigReloadingException; - } - private IterationResult processBuckets(final CompositeAggregation agg) { // we reached the end if (agg.getBuckets().isEmpty()) { @@ -536,7 +541,7 @@ private IterationResult processBuckets(final Composite agg.getBuckets().isEmpty() ); - // NOTE: progress is also mutated in ClientDataFrameIndexer#onFinished + // NOTE: progress is also mutated in onFinish if (progress != null) { progress.incrementDocsProcessed(getStats().getNumDocuments() - docsBeforeProcess); progress.incrementDocsIndexed(result.getToIndex().size()); @@ -671,7 +676,7 @@ protected SearchRequest buildSearchRequest() { default: // Any other state is a bug, should not happen logger.warn("Encountered unexpected run state [" + runState + "]"); - throw new IllegalStateException("DataFrame indexer job encountered an illegal state [" + runState + "]"); + throw new IllegalStateException("Transform indexer job encountered an illegal state [" + runState + "]"); } searchRequest.source(sourceBuilder); @@ -756,16 +761,9 @@ private SearchSourceBuilder buildPartialUpdateQuery(SearchSourceBuilder sourceBu * Implementation details: We take the values from the circuit breaker as a hint, but * note that it breaks early, that's why we also reduce using * - * @param e Exception thrown, only {@link CircuitBreakingException} are handled - * @return true if exception was handled, false if not + * @param circuitBreakingException CircuitBreakingException thrown */ - protected boolean handleCircuitBreakingException(Exception e) { - CircuitBreakingException circuitBreakingException = getCircuitBreakingException(e); - - if (circuitBreakingException == null) { - return false; - } - + private void handleCircuitBreakingException(CircuitBreakingException circuitBreakingException) { double reducingFactor = Math .min( (double) circuitBreakingException.getByteLimit() / circuitBreakingException.getBytesWanted(), @@ -777,15 +775,29 @@ protected boolean handleCircuitBreakingException(Exception e) { if (newPageSize < MINIMUM_PAGE_SIZE) { String message = TransformMessages.getMessage(TransformMessages.LOG_TRANSFORM_PIVOT_LOW_PAGE_SIZE_FAILURE, pageSize); failIndexer(message); - return true; + return; } String message = TransformMessages.getMessage(TransformMessages.LOG_TRANSFORM_PIVOT_REDUCE_PAGE_SIZE, pageSize, newPageSize); auditor.info(getJobId(), message); - logger.info("Data frame transform [" + getJobId() + "]:" + message); - + logger.info("[{}] {}", getJobId(), message); pageSize = newPageSize; - return true; + return; + } + + /** + * Handle script exception case. This is error is irrecoverable. + * + * @param scriptException ScriptException thrown + */ + private void handleScriptException(ScriptException scriptException) { + String message = TransformMessages + .getMessage( + TransformMessages.LOG_TRANSFORM_PIVOT_SCRIPT_ERROR, + scriptException.getDetailedMessage(), + scriptException.getScriptStack() + ); + failIndexer(message); } protected void failIndexer(String failureMessage) { @@ -818,7 +830,7 @@ protected boolean shouldAuditOnFinish(long completedCheckpoint) { } private RunState determineRunStateAtStart() { - // either 1st run or not a continuous data frame + // either 1st run or not a continuous transform if (nextCheckpoint.getCheckpoint() == 1 || isContinuous() == false) { return RunState.FULL_RUN; } @@ -832,32 +844,6 @@ private RunState determineRunStateAtStart() { return RunState.PARTIAL_RUN_IDENTIFY_CHANGES; } - /** - * Inspect exception for circuit breaking exception and return the first one it can find. - * - * @param e Exception - * @return CircuitBreakingException instance if found, null otherwise - */ - private static CircuitBreakingException getCircuitBreakingException(Exception e) { - // circuit breaking exceptions are at the bottom - Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(e); - - if (unwrappedThrowable instanceof CircuitBreakingException) { - return (CircuitBreakingException) unwrappedThrowable; - } else if (unwrappedThrowable instanceof SearchPhaseExecutionException) { - SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException) e; - for (ShardSearchFailure shardFailure : searchPhaseException.shardFailures()) { - Throwable unwrappedShardFailure = org.elasticsearch.ExceptionsHelper.unwrapCause(shardFailure.getCause()); - - if (unwrappedShardFailure instanceof CircuitBreakingException) { - return (CircuitBreakingException) unwrappedShardFailure; - } - } - } - - return null; - } - static class TransformConfigReloadingException extends ElasticsearchException { TransformConfigReloadingException(String msg, Throwable cause, Object... args) { super(msg, cause, args); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java new file mode 100644 index 0000000000000..f3a7e7cf0f7b9 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.transform.utils; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.ShardSearchFailure; + +/** + * Set of static utils to find the cause of a search exception. + */ +public final class ExceptionRootCauseFinder { + + /** + * Unwrap the exception stack and return the most likely cause. + * + * @param t raw Throwable + * @return unwrapped throwable if possible + */ + public static Throwable getRootCauseException(Throwable t) { + // circuit breaking exceptions are at the bottom + Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(t); + + if (unwrappedThrowable instanceof SearchPhaseExecutionException) { + SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException) t; + for (ShardSearchFailure shardFailure : searchPhaseException.shardFailures()) { + Throwable unwrappedShardFailure = org.elasticsearch.ExceptionsHelper.unwrapCause(shardFailure.getCause()); + + if (unwrappedShardFailure instanceof ElasticsearchException) { + return unwrappedShardFailure; + } + } + } + + return t; + } + + /** + * Return the best error message possible given a already unwrapped exception. + * + * @param t the throwable + * @return the message string of the given throwable + */ + public static String getDetailedMessage(Throwable t) { + if (t instanceof ElasticsearchException) { + return ((ElasticsearchException) t).getDetailedMessage(); + } + + return t.getMessage(); + } + + private ExceptionRootCauseFinder() {} + +} diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java index c1e07c4632e85..208b9a93929c8 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.breaker.CircuitBreaker.Durability; import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.script.ScriptException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; @@ -36,6 +37,7 @@ import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfigTests; import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; +import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import org.elasticsearch.xpack.transform.transforms.pivot.Pivot; @@ -51,10 +53,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; +import static java.util.Collections.singletonList; import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig; import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig; import static org.hamcrest.CoreMatchers.is; @@ -62,6 +66,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.matchesRegex; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -75,7 +80,7 @@ class MockedTransformIndexer extends TransformIndexer { private final Function searchFunction; private final Function bulkFunction; - private final Consumer failureConsumer; + private final Consumer failureConsumer; // used for synchronizing with the test private CountDownLatch latch; @@ -94,7 +99,7 @@ class MockedTransformIndexer extends TransformIndexer { TransformContext context, Function searchFunction, Function bulkFunction, - Consumer failureConsumer + Consumer failureConsumer ) { super( executor, @@ -174,14 +179,12 @@ protected void doSaveState(IndexerState state, TransformIndexerPosition position @Override protected void onFailure(Exception exc) { try { - // mimic same behavior as {@link TransformTask} - if (handleCircuitBreakingException(exc)) { - return; - } - - failureConsumer.accept(exc); + super.onFailure(exc); } catch (Exception e) { - fail("Internal error: " + e.getMessage()); + final StringWriter sw = new StringWriter(); + final PrintWriter pw = new PrintWriter(sw, true); + e.printStackTrace(pw); + fail("Unexpected failure: " + e.getMessage() + " Trace: " + sw.getBuffer().toString()); } } @@ -198,7 +201,12 @@ protected void onAbort() { @Override protected void failIndexer(String message) { - fail("failIndexer should not be called, received error: " + message); + if (failureConsumer != null) { + failureConsumer.accept(message); + super.failIndexer(message); + } else { + fail("failIndexer should not be called, received error: " + message); + } } } @@ -238,33 +246,20 @@ public void testPageSizeAdapt() throws Exception { Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); - Consumer failureConsumer = e -> { - final StringWriter sw = new StringWriter(); - final PrintWriter pw = new PrintWriter(sw, true); - e.printStackTrace(pw); - fail("expected circuit breaker exception to be handled, got:" + e + " Trace: " + sw.getBuffer().toString()); - }; - final ExecutorService executor = Executors.newFixedThreadPool(1); try { TransformAuditor auditor = new TransformAuditor(client, "node_1"); TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class)); - MockedTransformIndexer indexer = new MockedTransformIndexer( - executor, - mock(TransformConfigManager.class), - mock(CheckpointProvider.class), - new TransformProgressGatherer(client), + MockedTransformIndexer indexer = createMockIndexer( config, - Collections.emptyMap(), - auditor, state, - null, - new TransformIndexerStats(), - context, searchFunction, bulkFunction, - failureConsumer + null, + executor, + auditor, + context ); final CountDownLatch latch = indexer.newLatch(1); indexer.start(); @@ -333,33 +328,20 @@ public void testDoProcessAggNullCheck() { Function searchFunction = searchRequest -> searchResponse; Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); - Consumer failureConsumer = e -> { - final StringWriter sw = new StringWriter(); - final PrintWriter pw = new PrintWriter(sw, true); - e.printStackTrace(pw); - fail(e.getMessage()); - }; - final ExecutorService executor = Executors.newFixedThreadPool(1); try { TransformAuditor auditor = mock(TransformAuditor.class); TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class)); - MockedTransformIndexer indexer = new MockedTransformIndexer( - executor, - mock(TransformConfigManager.class), - mock(CheckpointProvider.class), - new TransformProgressGatherer(client), + MockedTransformIndexer indexer = createMockIndexer( config, - Collections.emptyMap(), - auditor, state, - null, - new TransformIndexerStats(), - context, searchFunction, bulkFunction, - failureConsumer + null, + executor, + auditor, + context ); IterationResult newPosition = indexer.doProcess(searchResponse); @@ -372,4 +354,117 @@ public void testDoProcessAggNullCheck() { } } + public void testScriptError() throws Exception { + Integer pageSize = randomBoolean() ? null : randomIntBetween(500, 10_000); + String transformId = randomAlphaOfLength(10); + TransformConfig config = new TransformConfig( + transformId, + randomSourceConfig(), + randomDestConfig(), + null, + null, + null, + new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig(), pageSize), + randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000) + ); + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + Function searchFunction = searchRequest -> { + throw new SearchPhaseExecutionException( + "query", + "Partial shards failure", + new ShardSearchFailure[] { + new ShardSearchFailure( + new ScriptException( + "runtime error", + new ArithmeticException("/ by zero"), + singletonList("stack"), + "test", + "painless" + ) + ) } + + ); + }; + + Function bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100); + + final AtomicBoolean failIndexerCalled = new AtomicBoolean(false); + final AtomicReference failureMessage = new AtomicReference<>(); + Consumer failureConsumer = message -> { + failIndexerCalled.compareAndSet(false, true); + failureMessage.compareAndSet(null, message); + }; + + final ExecutorService executor = Executors.newFixedThreadPool(1); + try { + MockTransformAuditor auditor = new MockTransformAuditor(); + TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class)); + + MockedTransformIndexer indexer = createMockIndexer( + config, + state, + searchFunction, + bulkFunction, + failureConsumer, + executor, + auditor, + context + ); + + final CountDownLatch latch = indexer.newLatch(1); + auditor + .addExpectation( + new MockTransformAuditor.SeenAuditExpectation( + "fail indexer due to script error", + org.elasticsearch.xpack.core.common.notifications.Level.ERROR, + transformId, + "Failed to execute script with error: [*ArithmeticException: / by zero], stack trace: [stack]" + ) + ); + indexer.start(); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); + assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); + assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + + latch.countDown(); + assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED)), 10, TimeUnit.SECONDS); + assertTrue(failIndexerCalled.get()); + assertThat( + failureMessage.get(), + matchesRegex("Failed to execute script with error: \\[.*ArithmeticException: / by zero\\], stack trace: \\[stack\\]") + ); + auditor.assertAllExpectationsMatched(); + } finally { + executor.shutdownNow(); + } + } + + private MockedTransformIndexer createMockIndexer( + TransformConfig config, + AtomicReference state, + Function searchFunction, + Function bulkFunction, + Consumer failureConsumer, + final ExecutorService executor, + TransformAuditor auditor, + TransformContext context + ) { + return new MockedTransformIndexer( + executor, + mock(TransformConfigManager.class), + mock(CheckpointProvider.class), + new TransformProgressGatherer(client), + config, + Collections.emptyMap(), + auditor, + state, + null, + new TransformIndexerStats(), + context, + searchFunction, + bulkFunction, + failureConsumer + ); + } + }