@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import scala.util.control.NonFatal
+
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.internal.Logging
@@ -27,6 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
@@ -107,7 +110,13 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e
             throw new SparkException("Writing job failed.", cause)
         }
         logError(s"Data source writer $writer aborted.")
-        throw new SparkException("Writing job aborted.", cause)
+        cause match {
+          // Do not wrap interruption exceptions, which streaming handles specially.
+          case _ if StreamExecution.isInterruptionException(cause) => throw cause
+          // Only wrap non-fatal exceptions.
+          case NonFatal(e) => throw new SparkException("Writing job aborted.", e)
+          case _ => throw cause
+        }
     }
 
     sparkContext.emptyRDD
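The new abort path above distinguishes three classes of failures. Below is a minimal, self-contained sketch of the same rethrow policy (the object and parameter names are invented for illustration and are not part of the patch). It shows why the guard case is needed: `scala.util.control.NonFatal` deliberately rejects fatal errors such as `OutOfMemoryError`, but it does match interruption-flavored I/O exceptions like `InterruptedIOException`, which streaming must receive unwrapped to recognize a `stop()`.

```scala
import scala.util.control.NonFatal

// Illustrative sketch of the abort-path rethrow policy; names are hypothetical.
object RethrowPolicySketch {
  // Decide what to rethrow: interruptions and fatal errors pass through
  // untouched, everything else is wrapped with a job-level message.
  def toRethrow(cause: Throwable, isInterruption: Throwable => Boolean): Throwable =
    cause match {
      case _ if isInterruption(cause) => cause // streaming handles these specially
      case NonFatal(e) => new RuntimeException("Writing job aborted.", e)
      case _ => cause // fatal errors are never wrapped
    }

  def main(args: Array[String]): Unit = {
    val isInterruption = (t: Throwable) => t.isInstanceOf[InterruptedException]
    val interrupted = new InterruptedException
    val ordinary = new IllegalStateException("task failed")
    val fatal = new OutOfMemoryError
    assert(toRethrow(interrupted, isInterruption) eq interrupted) // passes through unwrapped
    assert(toRethrow(ordinary, isInterruption).getCause eq ordinary) // wrapped
    assert(toRethrow(fatal, isInterruption) eq fatal) // NonFatal rejects Errors
  }
}
```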
@@ -356,25 +356,7 @@ abstract class StreamExecution(
 
   private def isInterruptedByStop(e: Throwable): Boolean = {
     if (state.get == TERMINATED) {
-      e match {
-        // InterruptedIOException - thrown when an I/O operation is interrupted
-        // ClosedByInterruptException - thrown when an I/O operation upon a channel is interrupted
-        case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException =>
-          true
-        // The cause of the following exceptions may be one of the above exceptions:
-        //
-        // UncheckedIOException - thrown by codes that cannot throw a checked IOException, such as
-        //                        BiFunction.apply
-        // ExecutionException - thrown by codes running in a thread pool and these codes throw an
-        //                      exception
-        // UncheckedExecutionException - thrown by codes that cannot throw a checked
-        //                               ExecutionException, such as BiFunction.apply
-        case e2 @ (_: UncheckedIOException | _: ExecutionException | _: UncheckedExecutionException)
-            if e2.getCause != null =>
-          isInterruptedByStop(e2.getCause)
-        case _ =>
-          false
-      }
+      StreamExecution.isInterruptionException(e)
     } else {
       false
     }
@@ -565,6 +547,26 @@
 
 object StreamExecution {
   val QUERY_ID_KEY = "sql.streaming.queryId"
+
+  def isInterruptionException(e: Throwable): Boolean = e match {
+    // InterruptedIOException - thrown when an I/O operation is interrupted
+    // ClosedByInterruptException - thrown when an I/O operation upon a channel is interrupted
+    case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException =>
+      true
+    // The cause of the following exceptions may be one of the above exceptions:
+    //
+    // UncheckedIOException - thrown by code that cannot throw a checked IOException, such as
+    //                        BiFunction.apply
+    // ExecutionException - thrown by code running in a thread pool when that code throws an
+    //                      exception
+    // UncheckedExecutionException - thrown by code that cannot throw a checked
+    //                               ExecutionException, such as BiFunction.apply
+    case e2 @ (_: UncheckedIOException | _: ExecutionException | _: UncheckedExecutionException)
+        if e2.getCause != null =>
+      isInterruptionException(e2.getCause)
+    case _ =>
+      false
+  }
 }
 
 /**
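The recursive case in the predicate above is what lets a `stop()` interruption be recognized even after thread pools or `java.util.function` wrappers have re-wrapped it. The following dependency-free sketch re-creates the idea for illustration only; it drops the Guava `UncheckedExecutionException` and `ClosedByInterruptException` arms, and the object name is invented (the real predicate lives on the `StreamExecution` companion object).

```scala
import java.io.{IOException, InterruptedIOException, UncheckedIOException}
import java.util.concurrent.ExecutionException

// Hypothetical standalone re-creation of the interruption check, runnable as-is.
object InterruptionCheckSketch {
  def isInterruptionException(e: Throwable): Boolean = e match {
    // Direct interruption signals.
    case _: InterruptedException | _: InterruptedIOException => true
    // Wrappers: recurse into the cause to find a buried interruption.
    case e2 @ (_: UncheckedIOException | _: ExecutionException) if e2.getCause != null =>
      isInterruptionException(e2.getCause)
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    // A thread pool wraps the worker's InterruptedException in an
    // ExecutionException; the recursive case unwraps it.
    assert(isInterruptionException(new ExecutionException(new InterruptedException)))
    // An UncheckedIOException whose root cause is an interrupted I/O call also
    // qualifies (its constructor requires an IOException cause).
    assert(isInterruptionException(
      new UncheckedIOException(new InterruptedIOException("read interrupted"))))
    // A plain failure does not.
    assert(!isInterruptionException(new ExecutionException(new IOException("disk full"))))
  }
}
```

Note that the caller, `isInterruptedByStop`, still consults this predicate only when the query state is `TERMINATED`, so an interruption raised for any other reason is not misclassified as a graceful stop.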