Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.rest.RestStatus;
Expand Down Expand Up @@ -112,7 +113,36 @@ public static <T> T requireNonNull(T obj, ParseField paramName) {
return requireNonNull(obj, paramName.getPreferredName());
}

/**
 * Unwraps the given throwable by delegating to the core Elasticsearch helper.
 *
 * @param t the throwable to unwrap
 * @return the result of {@link org.elasticsearch.ExceptionsHelper#unwrapCause(Throwable)} for {@code t}
 * @see org.elasticsearch.ExceptionsHelper#unwrapCause(Throwable)
 */
public static Throwable unwrapCause(Throwable t) {
    // Single delegating return; a second identical return after it would be unreachable code.
    return org.elasticsearch.ExceptionsHelper.unwrapCause(t);
}

/**
 * Digs through the exception stack for the most likely cause.
 * <p>
 * The throwable is first unwrapped via {@link #unwrapCause(Throwable)}. When the result is a
 * {@link SearchPhaseExecutionException}, its shard failures are scanned in order and the
 * unwrapped cause of the first one that is an {@link ElasticsearchException} is returned.
 * In every other case the unwrapped throwable itself is returned.
 *
 * @param t raw Throwable
 * @return the shard failure cause described above, otherwise the unwrapped throwable
 */
public static Throwable findSearchExceptionRootCause(Throwable t) {
    // circuit breaking exceptions are at the bottom
    Throwable unwrapped = unwrapCause(t);

    // Anything other than a search phase failure needs no further inspection.
    if ((unwrapped instanceof SearchPhaseExecutionException) == false) {
        return unwrapped;
    }

    for (ShardSearchFailure failure : ((SearchPhaseExecutionException) unwrapped).shardFailures()) {
        Throwable shardCause = unwrapCause(failure.getCause());
        if (shardCause instanceof ElasticsearchException) {
            return shardCause;
        }
    }

    // No shard failure carried an ElasticsearchException; fall back to the search phase exception.
    return unwrapped;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,11 @@ public static <T> T requireNonNull(T obj, String paramName) {
}
return obj;
}

/**
* @see org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper#findSearchExceptionRootCause(Throwable)
*/
public static Throwable findSearchExceptionRootCause(Throwable t) {
return org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper.findSearchExceptionRootCause(t);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this cross-package dependency ok?
I'd consider introducing ExceptionsHelper class under org.elasticsearch.xpack.core.common instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed this with Hendrik. It's not great but harmless otherwise. Adding another ExceptionsHelper class is marginally worse. There is much to improve in our exceptions handling framework. I am going to raise an issue that, once fixed, should make this method unnecessary.

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.ml.utils;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.indices.IndexCreationException;
import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.sameInstance;

/**
 * Tests for {@code ExceptionsHelper.findSearchExceptionRootCause(Throwable)}.
 */
public class ExceptionsHelperTests extends ESTestCase {

    /** A search phase failure nested inside another exception resolves to the shard failure's cause. */
    public void testFindSearchExceptionRootCause_GivenWrappedSearchPhaseException() {
        ShardSearchFailure[] shardFailures = new ShardSearchFailure[] {
            new ShardSearchFailure(new ElasticsearchException("for the cause!")) };
        SearchPhaseExecutionException searchFailure =
            new SearchPhaseExecutionException("test-phase", "partial shards failure", shardFailures);
        IndexCreationException outer = new IndexCreationException("test-index", searchFailure);

        Throwable rootCause = ExceptionsHelper.findSearchExceptionRootCause(outer);

        assertThat(rootCause.getMessage(), equalTo("for the cause!"));
    }

    /** A plain runtime exception comes back as the very same instance. */
    public void testFindSearchExceptionRootCause_GivenRuntimeException() {
        RuntimeException plain = new RuntimeException("nothing to unwrap here");

        Throwable rootCause = ExceptionsHelper.findSearchExceptionRootCause(plain);

        assertThat(rootCause, sameInstance(plain));
    }

    /** An exception nested inside an IndexCreationException is the one whose message is reported. */
    public void testFindSearchExceptionRootCause_GivenWrapperException() {
        RuntimeException inner = new RuntimeException("cause");
        IndexCreationException outer = new IndexCreationException("test-index", inner);

        Throwable rootCause = ExceptionsHelper.findSearchExceptionRootCause(outer);

        assertThat(rootCause.getMessage(), equalTo("cause"));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ElasticsearchWrapperException;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
Expand Down Expand Up @@ -318,7 +321,7 @@ private void run(long start, long end, FlushJobAction.Request flushRequest) thro
try {
extractedData = dataExtractor.next();
} catch (Exception e) {
LOGGER.debug("[" + jobId + "] error while extracting data", e);
LOGGER.error(new ParameterizedMessage("[{}] error while extracting data", jobId), e);
// When extraction problems are encountered, we do not want to advance time.
// Instead, it is preferable to retry the given interval next time an extraction
// is triggered.
Expand Down Expand Up @@ -350,7 +353,7 @@ private void run(long start, long end, FlushJobAction.Request flushRequest) thro
if (isIsolated) {
return;
}
LOGGER.debug("[" + jobId + "] error while posting data", e);
LOGGER.error(new ParameterizedMessage("[{}] error while posting data", jobId), e);

// a conflict exception means the job state is not open any more.
// we should therefore stop the datafeed.
Expand Down Expand Up @@ -469,7 +472,7 @@ Long lastEndTimeMs() {
return lastEndTimeMs;
}

static class AnalysisProblemException extends RuntimeException {
static class AnalysisProblemException extends ElasticsearchException implements ElasticsearchWrapperException {

final boolean shouldStop;
final long nextDelayInMsSinceEpoch;
Expand All @@ -481,7 +484,7 @@ static class AnalysisProblemException extends RuntimeException {
}
}

static class ExtractionProblemException extends RuntimeException {
static class ExtractionProblemException extends ElasticsearchException implements ElasticsearchWrapperException {

final long nextDelayInMsSinceEpoch;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,12 @@ protected void doRun() {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
holder.problemTracker.reportExtractionProblem(e);
} catch (DatafeedJob.AnalysisProblemException e) {
if (endTime == null) {
next = e.nextDelayInMsSinceEpoch;
}
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
holder.problemTracker.reportAnalysisProblem(e);
if (e.shouldStop) {
holder.stop("lookback_analysis_error", TimeValue.timeValueSeconds(20), e);
return;
Expand Down Expand Up @@ -241,10 +241,10 @@ protected void doRun() {
holder.problemTracker.reportNonEmptyDataCount();
} catch (DatafeedJob.ExtractionProblemException e) {
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
holder.problemTracker.reportExtractionProblem(e);
} catch (DatafeedJob.AnalysisProblemException e) {
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage());
holder.problemTracker.reportAnalysisProblem(e);
if (e.shouldStop) {
holder.stop("realtime_analysis_error", TimeValue.timeValueSeconds(20), e);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.datafeed;

import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;

import java.util.Objects;
Expand Down Expand Up @@ -42,19 +43,19 @@ class ProblemTracker {
/**
* Reports as analysis problem if it is different than the last seen problem
*
* @param problemMessage the problem message
* @param error the exception
*/
public void reportAnalysisProblem(String problemMessage) {
reportProblem(Messages.JOB_AUDIT_DATAFEED_DATA_ANALYSIS_ERROR, problemMessage);
public void reportAnalysisProblem(DatafeedJob.AnalysisProblemException error) {
reportProblem(Messages.JOB_AUDIT_DATAFEED_DATA_ANALYSIS_ERROR, ExceptionsHelper.unwrapCause(error).getMessage());
}

/**
* Reports as extraction problem if it is different than the last seen problem
*
* @param problemMessage the problem message
* @param error the exception
*/
public void reportExtractionProblem(String problemMessage) {
reportProblem(Messages.JOB_AUDIT_DATAFEED_DATA_EXTRACTION_ERROR, problemMessage);
public void reportExtractionProblem(DatafeedJob.ExtractionProblemException error) {
reportProblem(Messages.JOB_AUDIT_DATAFEED_DATA_EXTRACTION_ERROR, ExceptionsHelper.findSearchExceptionRootCause(error).getMessage());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
*/
package org.elasticsearch.xpack.ml.datafeed;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchWrapperException;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.junit.Before;
Expand All @@ -27,33 +31,43 @@ public void setUpTests() {
}

/** The audited extraction error carries the message of the innermost cause, not the wrapper's. */
public void testReportExtractionProblem() {
    problemTracker.reportExtractionProblem(createExtractionProblem("top level", "cause"));

    verify(auditor).error("foo", "Datafeed is encountering errors extracting data: cause");
    assertTrue(problemTracker.hasProblems());
}

/** For a search phase failure, the audited message is the one from the shard failure's cause. */
public void testReportExtractionProblem_GivenSearchPhaseExecutionException() {
    ShardSearchFailure[] shardFailures = new ShardSearchFailure[] {
        new ShardSearchFailure(new ElasticsearchException("for the cause!")) };
    SearchPhaseExecutionException searchFailure =
        new SearchPhaseExecutionException("test-phase", "partial shards failure", shardFailures);
    DatafeedJob.ExtractionProblemException problem = new DatafeedJob.ExtractionProblemException(0L, searchFailure);

    problemTracker.reportExtractionProblem(problem);

    verify(auditor).error("foo", "Datafeed is encountering errors extracting data: for the cause!");
    assertTrue(problemTracker.hasProblems());
}

/** The audited analysis error carries the message of the innermost cause, not the wrapper's. */
public void testReportAnalysisProblem() {
    problemTracker.reportAnalysisProblem(createAnalysisProblem("top level", "cause"));

    verify(auditor).error("foo", "Datafeed is encountering errors submitting data for analysis: cause");
    assertTrue(problemTracker.hasProblems());
}

/** Two consecutive problems with the same cause message result in a single extraction audit. */
public void testReportProblem_GivenSameProblemTwice() {
    problemTracker.reportExtractionProblem(createExtractionProblem("top level", "cause"));
    problemTracker.reportAnalysisProblem(createAnalysisProblem("top level", "cause"));

    verify(auditor, times(1)).error("foo", "Datafeed is encountering errors extracting data: cause");
    assertTrue(problemTracker.hasProblems());
}

/** Reporting the same problem again after finishReport() does not produce a second audit. */
public void testReportProblem_GivenSameProblemAfterFinishReport() {
    problemTracker.reportExtractionProblem(createExtractionProblem("top level", "cause"));
    problemTracker.finishReport();
    problemTracker.reportExtractionProblem(createExtractionProblem("top level", "cause"));

    verify(auditor, times(1)).error("foo", "Datafeed is encountering errors extracting data: cause");
    assertTrue(problemTracker.hasProblems());
}

Expand Down Expand Up @@ -108,12 +122,31 @@ public void testFinishReport_GivenNoProblems() {
}

/** After a reported problem, two finishReport() calls lead to a recovery audit and a clean tracker. */
public void testFinishReport_GivenRecovery() {
    problemTracker.reportExtractionProblem(createExtractionProblem("top level", "bar"));
    // finishReport() is deliberately called twice.
    // NOTE(review): presumably the first call closes the cycle that saw the problem and the
    // second, problem-free cycle triggers the recovery message; confirm against ProblemTracker.
    problemTracker.finishReport();
    problemTracker.finishReport();

    verify(auditor).error("foo", "Datafeed is encountering errors extracting data: bar");
    verify(auditor).info("foo", "Datafeed has recovered data extraction and analysis");
    assertFalse(problemTracker.hasProblems());
}

/**
 * Builds an extraction problem whose cause chain is
 * {@code TestWrappedException(error) -> RuntimeException(cause)}.
 *
 * @param error message of the wrapping exception
 * @param cause message of the innermost exception
 */
private static DatafeedJob.ExtractionProblemException createExtractionProblem(String error, String cause) {
    Exception wrapper = new TestWrappedException(error, new RuntimeException(cause));
    return new DatafeedJob.ExtractionProblemException(0L, wrapper);
}

/**
 * Builds an analysis problem whose cause chain is
 * {@code TestWrappedException(error) -> RuntimeException(cause)}.
 *
 * @param error message of the wrapping exception
 * @param cause message of the innermost exception
 */
private static DatafeedJob.AnalysisProblemException createAnalysisProblem(String error, String cause) {
    Exception wrapper = new TestWrappedException(error, new RuntimeException(cause));
    return new DatafeedJob.AnalysisProblemException(0L, false, wrapper);
}

/**
 * Test-only runtime exception that implements {@link ElasticsearchWrapperException};
 * the helper factories in this test use it as the outer layer around a plain cause.
 */
private static class TestWrappedException extends RuntimeException implements ElasticsearchWrapperException {

/**
 * @param message message of this wrapping exception
 * @param cause the exception being wrapped
 */
TestWrappedException(String message, Throwable cause) {
super(message, cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ synchronized void stopAndSaveState() {

synchronized void handleFailure(Exception e) {
logger.warn(new ParameterizedMessage("[{}] transform encountered an exception: ", getJobId()), e);
Throwable unwrappedException = ExceptionRootCauseFinder.getRootCauseException(e);
Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e);

if (unwrappedException instanceof CircuitBreakingException) {
handleCircuitBreakingException((CircuitBreakingException) unwrappedException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.rest.RestStatus;

import java.util.Arrays;
Expand Down Expand Up @@ -38,30 +36,6 @@ public final class ExceptionRootCauseFinder {
)
);

/**
 * Unwrap the exception stack and return the most likely cause.
 * <p>
 * The throwable is unwrapped first. When the result is a
 * {@link SearchPhaseExecutionException}, the unwrapped cause of the first shard
 * failure that is an {@link ElasticsearchException} is returned; otherwise the
 * unwrapped throwable is returned.
 *
 * @param t raw Throwable
 * @return unwrapped throwable if possible
 */
public static Throwable getRootCauseException(Throwable t) {
    // circuit breaking exceptions are at the bottom
    Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(t);

    if (unwrappedThrowable instanceof SearchPhaseExecutionException) {
        SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException) unwrappedThrowable;
        for (ShardSearchFailure shardFailure : searchPhaseException.shardFailures()) {
            Throwable unwrappedShardFailure = org.elasticsearch.ExceptionsHelper.unwrapCause(shardFailure.getCause());

            if (unwrappedShardFailure instanceof ElasticsearchException) {
                return unwrappedShardFailure;
            }
        }
    }

    // Return the unwrapped throwable rather than the raw argument, so that callers doing
    // instanceof checks on the result see through wrapper exceptions.
    return unwrappedThrowable;
}

/**
* Return the best error message possible given a already unwrapped exception.
*
Expand Down
Loading