1313import org .elasticsearch .ResourceNotFoundException ;
1414import org .elasticsearch .action .ActionListener ;
1515import org .elasticsearch .action .index .IndexRequest ;
16- import org .elasticsearch .action .search .SearchPhaseExecutionException ;
1716import org .elasticsearch .action .search .SearchRequest ;
1817import org .elasticsearch .action .search .SearchResponse ;
19- import org .elasticsearch .action .search .ShardSearchFailure ;
2018import org .elasticsearch .action .support .IndicesOptions ;
2119import org .elasticsearch .common .Strings ;
2220import org .elasticsearch .common .breaker .CircuitBreakingException ;
2321import org .elasticsearch .common .xcontent .XContentBuilder ;
2422import org .elasticsearch .index .IndexNotFoundException ;
2523import org .elasticsearch .index .query .BoolQueryBuilder ;
2624import org .elasticsearch .index .query .QueryBuilder ;
25+ import org .elasticsearch .script .ScriptException ;
2726import org .elasticsearch .search .aggregations .Aggregations ;
2827import org .elasticsearch .search .aggregations .bucket .composite .CompositeAggregation ;
2928import org .elasticsearch .search .aggregations .bucket .composite .CompositeAggregationBuilder ;
4544import org .elasticsearch .xpack .transform .persistence .TransformConfigManager ;
4645import org .elasticsearch .xpack .transform .transforms .pivot .AggregationResultUtils ;
4746import org .elasticsearch .xpack .transform .transforms .pivot .Pivot ;
47+ import org .elasticsearch .xpack .transform .utils .ExceptionRootCauseFinder ;
4848
4949import java .io .IOException ;
5050import java .io .UncheckedIOException ;
@@ -66,7 +66,7 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
6666 * which query filters to run and which index requests to send
6767 */
6868 private enum RunState {
69- // do a complete query/index, this is used for batch data frames and for bootstraping (1st run)
69+ // do a complete query/index, this is used for batch transforms and for bootstrapping (1st run)
7070 FULL_RUN ,
7171
7272 // Partial run modes in 2 stages:
@@ -422,7 +422,7 @@ protected IterationResult<TransformIndexerPosition> doProcess(SearchResponse sea
422422 default :
423423 // Any other state is a bug, should not happen
424424 logger .warn ("[{}] Encountered unexpected run state [{}]" , getJobId (), runState );
425- throw new IllegalStateException ("DataFrame indexer job encountered an illegal state [" + runState + "]" );
425+ throw new IllegalStateException ("Transform indexer job encountered an illegal state [" + runState + "]" );
426426 }
427427 }
428428
@@ -468,25 +468,36 @@ protected void onAbort() {
468468
469469 synchronized void handleFailure (Exception e ) {
470470 logger .warn (new ParameterizedMessage ("[{}] transform encountered an exception: " , getJobId ()), e );
471- if (handleCircuitBreakingException (e )) {
472- return ;
473- }
474-
475- if (isIrrecoverableFailure (e ) || context .getAndIncrementFailureCount () > context .getNumFailureRetries ()) {
476- String failureMessage = isIrrecoverableFailure (e )
477- ? "task encountered irrecoverable failure: " + e .getMessage ()
478- : "task encountered more than " + context .getNumFailureRetries () + " failures; latest failure: " + e .getMessage ();
479- failIndexer (failureMessage );
471+ Throwable unwrappedException = ExceptionRootCauseFinder .getRootCauseException (e );
472+
473+ if (unwrappedException instanceof CircuitBreakingException ) {
474+ handleCircuitBreakingException ((CircuitBreakingException ) unwrappedException );
475+ } else if (unwrappedException instanceof ScriptException ) {
476+ handleScriptException ((ScriptException ) unwrappedException );
477+ // irrecoverable error without special handling
478+ } else if (unwrappedException instanceof IndexNotFoundException
479+ || unwrappedException instanceof AggregationResultUtils .AggregationExtractionException
480+ || unwrappedException instanceof TransformConfigReloadingException ) {
481+ failIndexer ("task encountered irrecoverable failure: " + e .getMessage ());
482+ } else if (context .getAndIncrementFailureCount () > context .getNumFailureRetries ()) {
483+ failIndexer (
484+ "task encountered more than "
485+ + context .getNumFailureRetries ()
486+ + " failures; latest failure: "
487+ + ExceptionRootCauseFinder .getDetailedMessage (unwrappedException )
488+ );
480489 } else {
481490 // Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
482491 // times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
483492 if (e .getMessage ().equals (lastAuditedExceptionMessage ) == false ) {
493+ String message = ExceptionRootCauseFinder .getDetailedMessage (unwrappedException );
494+
484495 auditor
485496 .warning (
486497 getJobId (),
487- "Transform encountered an exception: " + e . getMessage () + " Will attempt again at next scheduled trigger."
498+ "Transform encountered an exception: " + message + " Will attempt again at next scheduled trigger."
488499 );
489- lastAuditedExceptionMessage = e . getMessage () ;
500+ lastAuditedExceptionMessage = message ;
490501 }
491502 }
492503 }
@@ -510,12 +521,6 @@ private void sourceHasChanged(ActionListener<Boolean> hasChangedListener) {
510521 }));
511522 }
512523
513- private boolean isIrrecoverableFailure (Exception e ) {
514- return e instanceof IndexNotFoundException
515- || e instanceof AggregationResultUtils .AggregationExtractionException
516- || e instanceof TransformConfigReloadingException ;
517- }
518-
519524 private IterationResult <TransformIndexerPosition > processBuckets (final CompositeAggregation agg ) {
520525 // we reached the end
521526 if (agg .getBuckets ().isEmpty ()) {
@@ -536,7 +541,7 @@ private IterationResult<TransformIndexerPosition> processBuckets(final Composite
536541 agg .getBuckets ().isEmpty ()
537542 );
538543
539- // NOTE: progress is also mutated in ClientDataFrameIndexer#onFinished
544+ // NOTE: progress is also mutated in onFinish
540545 if (progress != null ) {
541546 progress .incrementDocsProcessed (getStats ().getNumDocuments () - docsBeforeProcess );
542547 progress .incrementDocsIndexed (result .getToIndex ().size ());
@@ -671,7 +676,7 @@ protected SearchRequest buildSearchRequest() {
671676 default :
672677 // Any other state is a bug, should not happen
673678 logger .warn ("Encountered unexpected run state [" + runState + "]" );
674- throw new IllegalStateException ("DataFrame indexer job encountered an illegal state [" + runState + "]" );
679+ throw new IllegalStateException ("Transform indexer job encountered an illegal state [" + runState + "]" );
675680 }
676681
677682 searchRequest .source (sourceBuilder );
@@ -756,16 +761,9 @@ private SearchSourceBuilder buildPartialUpdateQuery(SearchSourceBuilder sourceBu
756761 * Implementation details: We take the values from the circuit breaker as a hint, but
757762 * note that it breaks early, that's why we also reduce using
758763 *
759- * @param e Exception thrown, only {@link CircuitBreakingException} are handled
760- * @return true if exception was handled, false if not
764+ * @param circuitBreakingException CircuitBreakingException thrown
761765 */
762- protected boolean handleCircuitBreakingException (Exception e ) {
763- CircuitBreakingException circuitBreakingException = getCircuitBreakingException (e );
764-
765- if (circuitBreakingException == null ) {
766- return false ;
767- }
768-
766+ private void handleCircuitBreakingException (CircuitBreakingException circuitBreakingException ) {
769767 double reducingFactor = Math
770768 .min (
771769 (double ) circuitBreakingException .getByteLimit () / circuitBreakingException .getBytesWanted (),
@@ -777,15 +775,29 @@ protected boolean handleCircuitBreakingException(Exception e) {
777775 if (newPageSize < MINIMUM_PAGE_SIZE ) {
778776 String message = TransformMessages .getMessage (TransformMessages .LOG_TRANSFORM_PIVOT_LOW_PAGE_SIZE_FAILURE , pageSize );
779777 failIndexer (message );
780- return true ;
778+ return ;
781779 }
782780
783781 String message = TransformMessages .getMessage (TransformMessages .LOG_TRANSFORM_PIVOT_REDUCE_PAGE_SIZE , pageSize , newPageSize );
784782 auditor .info (getJobId (), message );
785- logger .info ("Data frame transform [" + getJobId () + "]:" + message );
786-
783+ logger .info ("[{}] {}" , getJobId (), message );
787784 pageSize = newPageSize ;
788- return true ;
785+ return ;
786+ }
787+
788+ /**
789+ * Handle script exception case. This is error is irrecoverable.
790+ *
791+ * @param scriptException ScriptException thrown
792+ */
793+ private void handleScriptException (ScriptException scriptException ) {
794+ String message = TransformMessages
795+ .getMessage (
796+ TransformMessages .LOG_TRANSFORM_PIVOT_SCRIPT_ERROR ,
797+ scriptException .getDetailedMessage (),
798+ scriptException .getScriptStack ()
799+ );
800+ failIndexer (message );
789801 }
790802
791803 protected void failIndexer (String failureMessage ) {
@@ -818,7 +830,7 @@ protected boolean shouldAuditOnFinish(long completedCheckpoint) {
818830 }
819831
820832 private RunState determineRunStateAtStart () {
821- // either 1st run or not a continuous data frame
833+ // either 1st run or not a continuous transform
822834 if (nextCheckpoint .getCheckpoint () == 1 || isContinuous () == false ) {
823835 return RunState .FULL_RUN ;
824836 }
@@ -832,32 +844,6 @@ private RunState determineRunStateAtStart() {
832844 return RunState .PARTIAL_RUN_IDENTIFY_CHANGES ;
833845 }
834846
835- /**
836- * Inspect exception for circuit breaking exception and return the first one it can find.
837- *
838- * @param e Exception
839- * @return CircuitBreakingException instance if found, null otherwise
840- */
841- private static CircuitBreakingException getCircuitBreakingException (Exception e ) {
842- // circuit breaking exceptions are at the bottom
843- Throwable unwrappedThrowable = org .elasticsearch .ExceptionsHelper .unwrapCause (e );
844-
845- if (unwrappedThrowable instanceof CircuitBreakingException ) {
846- return (CircuitBreakingException ) unwrappedThrowable ;
847- } else if (unwrappedThrowable instanceof SearchPhaseExecutionException ) {
848- SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException ) e ;
849- for (ShardSearchFailure shardFailure : searchPhaseException .shardFailures ()) {
850- Throwable unwrappedShardFailure = org .elasticsearch .ExceptionsHelper .unwrapCause (shardFailure .getCause ());
851-
852- if (unwrappedShardFailure instanceof CircuitBreakingException ) {
853- return (CircuitBreakingException ) unwrappedShardFailure ;
854- }
855- }
856- }
857-
858- return null ;
859- }
860-
861847 static class TransformConfigReloadingException extends ElasticsearchException {
862848 TransformConfigReloadingException (String msg , Throwable cause , Object ... args ) {
863849 super (msg , cause , args );
0 commit comments