diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index c78e87d0b8463..88ba00a8a1aea 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -83,14 +83,16 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
 
   def testCreateTableWithProperty(tbl: String): Unit = {}
 
-  def checkErrorFailedLoadTable(e: AnalysisException, tbl: String): Unit = {
-    checkError(
+  private def checkErrorFailedJDBC(
+      e: AnalysisException,
+      errorClass: String,
+      tbl: String): Unit = {
+    checkErrorMatchPVals(
       exception = e,
-      errorClass = "FAILED_JDBC.UNCLASSIFIED",
+      errorClass = errorClass,
       parameters = Map(
-        "url" -> "jdbc:",
-        "message" -> s"Failed to load table: $tbl"
-      )
+        "url" -> "jdbc:.*",
+        "tableName" -> s"`$tbl`")
     )
   }
 
@@ -132,7 +134,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     val e = intercept[AnalysisException] {
       sql(s"ALTER TABLE $catalogName.not_existing_table ADD COLUMNS (C4 STRING)")
     }
-    checkErrorFailedLoadTable(e, "not_existing_table")
+    checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
   }
 
   test("SPARK-33034: ALTER TABLE ... drop column") {
@@ -154,7 +156,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     val e = intercept[AnalysisException] {
       sql(s"ALTER TABLE $catalogName.not_existing_table DROP COLUMN C1")
     }
-    checkErrorFailedLoadTable(e, "not_existing_table")
+    checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
   }
 
   test("SPARK-33034: ALTER TABLE ... update column type") {
@@ -170,7 +172,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     val e = intercept[AnalysisException] {
       sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN id TYPE DOUBLE")
     }
-    checkErrorFailedLoadTable(e, "not_existing_table")
+    checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
  }
 
   test("SPARK-33034: ALTER TABLE ... rename column") {
@@ -198,7 +200,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     val e = intercept[AnalysisException] {
       sql(s"ALTER TABLE $catalogName.not_existing_table RENAME COLUMN ID TO C")
     }
-    checkErrorFailedLoadTable(e, "not_existing_table")
+    checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
   }
 
   test("SPARK-33034: ALTER TABLE ... update column nullability") {
@@ -209,7 +211,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     val e = intercept[AnalysisException] {
       sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN ID DROP NOT NULL")
     }
-    checkErrorFailedLoadTable(e, "not_existing_table")
+    checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
   }
 
   test("CREATE TABLE with table comment") {
@@ -231,7 +233,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
       val e = intercept[AnalysisException] {
         sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')")
       }
-      assert(e.getErrorClass == "FAILED_JDBC.UNCLASSIFIED")
+      checkErrorFailedJDBC(e, "FAILED_JDBC.CREATE_TABLE", "new_table")
       testCreateTableWithProperty(s"$catalogName.new_table")
     }
   }
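Note on the helper change above: unlike `checkError`, the `checkErrorMatchPVals` test helper treats each expected parameter value as a regular expression that must fully match the actual value, which is why `"url" -> "jdbc:.*"` now passes for every docker-backed dialect regardless of the concrete JDBC URL. A minimal sketch of that matching semantics (illustrative only, not the actual Spark test-framework code):

```scala
// Sketch: expected values are regexes that must fully match the actual values.
def matchPVals(actual: Map[String, String], expected: Map[String, String]): Unit = {
  expected.foreach { case (key, regex) =>
    assert(actual.get(key).exists(_.matches(regex)),
      s"parameter $key = ${actual.get(key)} did not match /$regex/")
  }
}

// "jdbc:h2:mem:testdb0".matches("jdbc:.*") is true, so the URL expectation
// holds for any dialect under test (the sample values here are made up).
matchPVals(
  actual = Map("url" -> "jdbc:h2:mem:testdb0", "tableName" -> "`test`.`new_table`"),
  expected = Map("url" -> "jdbc:.*", "tableName" -> "`test`.`new_table`"))
```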
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
index 8f537aacebe5f..5e79dbbb4d72e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.types.{DataType, MetadataBuilder}
  *
  * @param dialects List of dialects.
  */
-private class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
+private class AggregatedDialect(dialects: List[JdbcDialect])
+  extends JdbcDialect with NoLegacyJDBCError {
 
   require(dialects.nonEmpty)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index 934ccdb51aa39..8ccf94166a70e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.connector.expressions.Expression
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
 import org.apache.spark.sql.types._
 
-private case class DB2Dialect() extends JdbcDialect with SQLConfHelper {
+private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError {
 
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:db2")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
index 54b8c2622827e..af77f8575dd86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
 import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
 import org.apache.spark.sql.types._
 
-private case class DatabricksDialect() extends JdbcDialect {
+private case class DatabricksDialect() extends JdbcDialect with NoLegacyJDBCError {
 
   override def canHandle(url: String): Boolean = {
     url.startsWith("jdbc:databricks")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
index 23da4dbb60a5f..7b65a01b5e702 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors
 import org.apache.spark.sql.types._
 
-private case class DerbyDialect() extends JdbcDialect {
+private case class DerbyDialect() extends JdbcDialect with NoLegacyJDBCError {
 
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:derby")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 949455b248ffd..3ece44ece9e6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, N
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, MetadataBuilder, ShortType, StringType, TimestampType}
 
-private[sql] case class H2Dialect() extends JdbcDialect {
+private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError {
 
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:h2")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 4ebe73292f11e..290665020f883 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -841,6 +841,23 @@ abstract class JdbcDialect extends Serializable with Logging {
     metadata: MetadataBuilder): Unit = {}
 }
 
+/**
+ * Make the `classifyException` method propagate the original exception as the error cause
+ */
+trait NoLegacyJDBCError extends JdbcDialect {
+
+  override def classifyException(
+      e: Throwable,
+      errorClass: String,
+      messageParameters: Map[String, String],
+      description: String): AnalysisException = {
+    new AnalysisException(
+      errorClass = errorClass,
+      messageParameters = messageParameters,
+      cause = Some(e))
+  }
+}
+
 /**
  * :: DeveloperApi ::
  * Registry of dialects that apply to every new jdbc `org.apache.spark.sql.DataFrame`.
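The trait above is the crux of the change: the base `JdbcDialect.classifyException` falls back to a legacy path that wraps every failure as `FAILED_JDBC.UNCLASSIFIED` (the expectation removed from the tests above), whereas `NoLegacyJDBCError` overrides it to build the classified `AnalysisException` directly, keeping the driver exception attached as the cause. Since the trait is public, an external dialect can opt in the same way the built-in dialects below do; a hypothetical example (the dialect name and `jdbc:mydb` URL prefix are made up):

```scala
import java.util.Locale

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, NoLegacyJDBCError}

// Hypothetical third-party dialect: mixing in NoLegacyJDBCError is all that is
// needed to surface classified FAILED_JDBC errors instead of the legacy one.
case class MyDatabaseDialect() extends JdbcDialect with NoLegacyJDBCError {
  override def canHandle(url: String): Boolean =
    url.toLowerCase(Locale.ROOT).startsWith("jdbc:mydb")
}

// Register it with the existing DeveloperApi registry so it is selected
// for matching JDBC URLs.
JdbcDialects.registerDialect(MyDatabaseDialect())
```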
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index e341bf3720f46..d03602b0338c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.jdbc.MsSqlServerDialect.{GEOGRAPHY, GEOMETRY}
 import org.apache.spark.sql.types._
 
-private case class MsSqlServerDialect() extends JdbcDialect {
+private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCError {
 
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index d2034812cdd3a..0f1bccbb01d51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.types._
 
-private case class MySQLDialect() extends JdbcDialect with SQLConfHelper {
+private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError {
 
   override def canHandle(url : String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:mysql")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index 38fee89bf726a..627007e275599 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.jdbc.OracleDialect._
 import org.apache.spark.sql.types._
 
-private case class OracleDialect() extends JdbcDialect with SQLConfHelper {
+private case class OracleDialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError {
 
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:oracle")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 93052a0c37b59..03fefd82802ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -37,7 +37,8 @@ import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
 import org.apache.spark.sql.types._
 
-private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
+private case class PostgresDialect()
+  extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError {
 
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:postgresql")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala
index 276364d5d89ed..a443a798db7c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala
@@ -22,7 +22,7 @@ import java.util.Locale
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.types.{BooleanType, DataType}
 
-private case class SnowflakeDialect() extends JdbcDialect {
+private case class SnowflakeDialect() extends JdbcDialect with NoLegacyJDBCError {
 
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:snowflake")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
index 95a9f60b64ed8..322b259485f56 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.types._
 
-private case class TeradataDialect() extends JdbcDialect {
+private case class TeradataDialect() extends JdbcDialect with NoLegacyJDBCError {
 
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:teradata")
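With every built-in dialect now mixing in the trait, the user-visible effect is uniform across connectors; roughly as follows (a sketch assuming a `spark` session with an `h2` catalog configured as in the suite below):

```scala
import org.apache.spark.sql.AnalysisException

// Sketch: the failure now carries a classified error condition such as
// FAILED_JDBC.LOAD_TABLE rather than FAILED_JDBC.UNCLASSIFIED, and the
// original driver exception stays attached as the cause.
try {
  spark.sql("ALTER TABLE h2.test.not_existing_table ADD COLUMNS (c STRING)")
} catch {
  case e: AnalysisException =>
    assert(e.getErrorClass == "FAILED_JDBC.LOAD_TABLE")
    assert(e.getCause != null)
}
```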
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index daf5d8507ecc1..d2ff33e104772 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -619,15 +619,15 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
 
   test("CREATE TABLE with table property") {
     withTable("h2.test.new_table") {
-      checkError(
+      checkErrorMatchPVals(
         exception = intercept[AnalysisException] {
           sql("CREATE TABLE h2.test.new_table(i INT, j STRING)" +
             " TBLPROPERTIES('ENGINE'='tableEngineName')")
         },
-        errorClass = "FAILED_JDBC.UNCLASSIFIED",
+        errorClass = "FAILED_JDBC.CREATE_TABLE",
         parameters = Map(
-          "url" -> "jdbc:",
-          "message" -> "Failed table creation: test.new_table"))
+          "url" -> "jdbc:.*",
+          "tableName" -> "`test`.`new_table`"))
     }
   }
 
@@ -639,14 +639,14 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
   }
 
   test("SPARK-42904: CREATE TABLE with char/varchar with invalid char length") {
-    checkError(
+    checkErrorMatchPVals(
       exception = intercept[AnalysisException]{
         sql("CREATE TABLE h2.test.new_table(c CHAR(1000000001))")
       },
-      errorClass = "FAILED_JDBC.UNCLASSIFIED",
+      errorClass = "FAILED_JDBC.CREATE_TABLE",
       parameters = Map(
-        "url" -> "jdbc:",
-        "message" -> "Failed table creation: test.new_table"))
+        "url" -> "jdbc:.*",
+        "tableName" -> "`test`.`new_table`"))
   }
 
   test("SPARK-42955: Skip classifyException and wrap AnalysisException for SparkThrowable") {
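For completeness, the `h2` catalog these suite tests run against is a plain V2 JDBC table catalog; a configuration sketch of how such a catalog is wired up (the URL, user, and password values are assumptions, not taken from the suite):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: registering a catalog named "h2" so identifiers like
// h2.test.new_table resolve through the JDBC table catalog implementation.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.h2",
    "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
  .config("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb0;user=testUser;password=testPass")
  .config("spark.sql.catalog.h2.driver", "org.h2.Driver")
  .getOrCreate()
```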