6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -1045,6 +1045,12 @@
],
"sqlState" : "42710"
},
"DATA_SOURCE_EXTERNAL_ERROR" : {
"message" : [
"Encountered error when saving to external data source."
],
"sqlState" : "KD00F"
},
"DATA_SOURCE_NOT_EXIST" : {
"message" : [
"Data source '<provider>' not found. Please make sure the data source is registered."
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-states.json
@@ -7417,6 +7417,12 @@
"standard": "N",
"usedBy": ["Databricks"]
},
"KD00F": {
"description": "external data source failure",
"origin": "Databricks",
"standard": "N",
"usedBy": ["Databricks"]
},
"P0000": {
"description": "procedural logic error",
"origin": "PostgreSQL",
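Taken together, the two registry entries above define the new condition end to end: `error-conditions.json` supplies the user-facing message and `error-states.json` reserves the `KD00F` SQLSTATE. A minimal sketch of what a caller observes, mirroring the constructor call used by the new helper below (illustrative only; the `AnalysisException` constructor may not be accessible outside Spark's own packages):

import org.apache.spark.sql.AnalysisException

// Build the error the same way the new helper does (sketch only).
val e = new AnalysisException(
  errorClass = "DATA_SOURCE_EXTERNAL_ERROR",
  messageParameters = Map.empty,
  cause = Some(new NullPointerException("simulated connector failure")))

// Message text and SQLSTATE are resolved from the two JSON registries above.
assert(e.getMessage.contains("Encountered error when saving to external data source."))
assert(e.getSqlState == "KD00F")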
@@ -3959,6 +3959,14 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with CompilationErrors
messageParameters = Map("provider" -> name))
}

def externalDataSourceException(cause: Throwable): Throwable = {
new AnalysisException(
errorClass = "DATA_SOURCE_EXTERNAL_ERROR",
messageParameters = Map(),
cause = Some(cause)
)
}

def foundMultipleDataSources(provider: String): Throwable = {
new AnalysisException(
errorClass = "FOUND_MULTIPLE_DATA_SOURCES",
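The helper takes no message parameters; its job is just to attach the standard condition while keeping the original failure reachable. A brief usage sketch (hypothetical caller, not part of this diff; assumes `AnalysisException` propagates `cause = Some(...)` to `Throwable.getCause`):

// Wrap a connector failure; the original exception stays reachable as the cause.
val cause = new NullPointerException("thrown by a third-party connector")
val wrapped = QueryCompilationErrors.externalDataSourceException(cause)
assert(wrapped.getCause eq cause)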
@@ -19,12 +19,14 @@ package org.apache.spark.sql.execution.datasources

import scala.util.control.NonFatal

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.sources.CreatableRelationProvider
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}

/**
* Saves the results of `query` into a data source.
@@ -44,8 +46,26 @@ case class SaveIntoDataSourceCommand(
override def innerChildren: Seq[QueryPlan[_]] = Seq(query)

override def run(sparkSession: SparkSession): Seq[Row] = {
-val relation = dataSource.createRelation(
-  sparkSession.sqlContext, mode, options, Dataset.ofRows(sparkSession, query))
+var relation: BaseRelation = null

try {
relation = dataSource.createRelation(
sparkSession.sqlContext, mode, options, Dataset.ofRows(sparkSession, query))
} catch {
case e: SparkThrowable =>
// We should avoid wrapping `SparkThrowable` exceptions into another `AnalysisException`.
throw e
case e @ (_: NullPointerException | _: MatchError | _: ArrayIndexOutOfBoundsException) =>
// These are some of the exceptions thrown by the data source API. We catch them
// here and rethrow them as QueryCompilationErrors.externalDataSourceException to
// give the user a friendlier error message. This list is not exhaustive.
throw QueryCompilationErrors.externalDataSourceException(e)
case e: Throwable =>
// For any other exception, rethrow it as-is, since we don't have enough
// information to provide a better error message for the user at the moment.
// The error handling here may be improved further in the future.
throw e
}

try {
val logicalRelation = LogicalRelation(relation, toAttributes(relation.schema), None, false)
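To see the user-visible effect of the new catch block, consider a hypothetical provider whose `createRelation` throws one of the listed exceptions. Everything here (the class name, the setup, and the sample outcome) is illustrative, not part of this PR:

import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}

// A deliberately broken sink: createRelation NPEs, as a buggy connector might.
class FaultySink extends CreatableRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    throw new NullPointerException("simulated connector bug")
  }
}

// spark.range(3).write.format(classOf[FaultySink].getName).mode("overwrite").save()
// Before this change: a bare NullPointerException reached the user.
// After: an AnalysisException with condition DATA_SOURCE_EXTERNAL_ERROR and
// SQLSTATE KD00F, with the NullPointerException preserved as the cause.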
@@ -24,6 +24,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.api.java.{UDF1, UDF2, UDF23Test}
import org.apache.spark.sql.catalyst.expressions.{Coalesce, Literal, UnsafeRow}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
import org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter
import org.apache.spark.sql.expressions.SparkUserDefinedFunction
import org.apache.spark.sql.functions._
@@ -926,6 +927,25 @@ class QueryCompilationErrorsSuite
})
}
}

test("Catch and log errors when failing to write to external data source") {
val password = "MyPassWord"
val token = "MyToken"
val value = "value"
val options = Map("password" -> password, "token" -> token, "key" -> value)
val query = spark.range(10).logicalPlan
val cmd = SaveIntoDataSourceCommand(query, null, options, SaveMode.Overwrite)

checkError(
exception = intercept[AnalysisException] {
cmd.run(spark)
},
condition = "DATA_SOURCE_EXTERNAL_ERROR",
sqlState = "KD00F",
parameters = Map.empty
)
}

}

class MyCastToString extends SparkUserDefinedFunction(
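Worth noting: the new test ("Catch and log errors when failing to write to external data source") triggers the wrapping without any real connector. Passing `null` as the `dataSource` makes `run()` hit `dataSource.createRelation(...)` and throw a `NullPointerException`, which the new catch block rewraps. A condensed sketch of that failure path (assuming the suite's `spark` session and ScalaTest's `intercept`):

val cmd = SaveIntoDataSourceCommand(
  spark.range(1).logicalPlan, null, Map.empty, SaveMode.Overwrite)
val e = intercept[AnalysisException](cmd.run(spark))
// The original NPE is preserved as the cause of the wrapped error.
assert(e.getCause.isInstanceOf[NullPointerException])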