diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
index 39dbba574f3..511515bc35c 100644
--- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
+++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
@@ -498,7 +498,6 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
       return;
     }
 
-    lastJobFailed = false;
     if (jobEnd.jobResult() instanceof JobFailed) {
       JobFailed jobFailed = (JobFailed) jobEnd.jobResult();
       Exception exception = jobFailed.exception();
@@ -510,9 +509,19 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
       jobSpan.setErrorMessage(errorMessage);
       jobSpan.setTag(DDTags.ERROR_STACK, errorStackTrace);
       jobSpan.setTag(DDTags.ERROR_TYPE, "Spark Job Failed");
-      lastJobFailed = true;
-      lastJobFailedMessage = errorMessage;
-      lastJobFailedStackTrace = errorStackTrace;
+
+      // Only propagate the error to the application if it is not a cancellation. A failure
+      // without a message cannot be identified as a cancellation, so it is still propagated.
+      boolean cancelled =
+          errorMessage != null
+              && errorMessage.toLowerCase(java.util.Locale.ROOT).contains("cancelled");
+      if (!cancelled) {
+        lastJobFailed = true;
+        lastJobFailedMessage = errorMessage;
+        lastJobFailedStackTrace = errorStackTrace;
+      }
+    } else {
+      lastJobFailed = false;
     }
 
     SparkAggregatedTaskMetrics metrics = jobMetrics.remove(jobEnd.jobId());
diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy
index 3b9a1c32185..2e910eafe09 100644
--- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy
+++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy
@@ -121,6 +121,16 @@ abstract class AbstractSparkListenerTest extends AgentTestRunner {
     return new SparkListenerJobEnd(jobId, time, JobSucceeded$.MODULE$)
   }
 
+  /**
+   * Builds a job-end event whose result is a {@code JobFailed} wrapping a
+   * {@code RuntimeException} created with the given error message.
+   */
+  protected jobFailedEvent(Integer jobId, Long time, String errorMessage) {
+    def exception = new RuntimeException(errorMessage)
+    def jobFailed = new org.apache.spark.scheduler.JobFailed(exception)
+    return new SparkListenerJobEnd(jobId, time, jobFailed)
+  }
+
   protected stageSubmittedEvent(Integer stageId, Long time) {
     def stageInfo = createStageInfo(stageId)
     stageInfo.submissionTime = Option.apply(time)
@@ -457,6 +467,37 @@ abstract class AbstractSparkListenerTest extends AgentTestRunner {
     }
   }
 
+  def "test lastJobFailed is not set when job is cancelled"() {
+    setup:
+    def listener = getTestDatadogSparkListener()
+    listener.onApplicationStart(applicationStartEvent(1000L))
+    listener.onJobStart(jobStartEvent(1, 1900L, [1]))
+    // The failure message mentions a cancellation, so only the job span should be in error
+    listener.onJobEnd(jobFailedEvent(1, 2200L, "Job was cancelled by user"))
+    listener.onApplicationEnd(new SparkListenerApplicationEnd(2300L))
+
+    expect:
+    assertTraces(1) {
+      trace(2) {
+        span {
+          operationName "spark.application"
+          resourceName "spark.application"
+          spanType "spark"
+          // The cancellation is not propagated to the application span
+          errored false
+          parent()
+        }
+        span {
+          operationName "spark.job"
+          spanType "spark"
+          // The cancelled job itself is still reported as an error
+          errored true
+          childOf(span(0))
+        }
+      }
+    }
+  }
+
   protected validateRelativeError(double value, double expected, double relativeAccuracy) {
     double relativeError = Math.abs(value - expected) / expected
     assert relativeError < relativeAccuracy