From a4b01b005b7830fa3e86f007e443c99cc41ac9a7 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Tue, 11 Jun 2024 17:29:21 +0800 Subject: [PATCH 1/6] test first --- .../apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 78 +++++++++++++------ .../datasources/jdbc/JdbcUtils.scala | 5 +- .../datasources/v2/jdbc/JDBCTable.scala | 6 +- .../v2/jdbc/JDBCTableCatalog.scala | 37 +++------ .../apache/spark/sql/jdbc/DB2Dialect.scala | 7 +- .../org/apache/spark/sql/jdbc/H2Dialect.scala | 5 +- .../apache/spark/sql/jdbc/JdbcDialects.scala | 9 ++- .../spark/sql/jdbc/MsSqlServerDialect.scala | 7 +- .../apache/spark/sql/jdbc/MySQLDialect.scala | 7 +- .../spark/sql/jdbc/PostgresDialect.scala | 9 +-- 10 files changed, 90 insertions(+), 80 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index c78e87d0b8463..b91d040bb55f9 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -129,10 +129,15 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu ) } // Add a column to not existing table - val e = intercept[AnalysisException] { - sql(s"ALTER TABLE $catalogName.not_existing_table ADD COLUMNS (C4 STRING)") - } - checkErrorFailedLoadTable(e, "not_existing_table") + checkErrorMatchPVals( + exception = intercept[AnalysisException] { + sql(s"ALTER TABLE $catalogName.not_existing_table ADD COLUMNS (C4 STRING)") + }, + errorClass = "FAILED_JDBC.LOAD_TABLE", + parameters = Map( + "url" -> "jdbc:.*", + "tableName" -> "`not_existing_table`") + ) } test("SPARK-33034: ALTER TABLE ... drop column") { @@ -151,10 +156,15 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu assert(msg.contains(s"Missing field bad_column in table $catalogName.alt_table")) } // Drop a column from a not existing table - val e = intercept[AnalysisException] { - sql(s"ALTER TABLE $catalogName.not_existing_table DROP COLUMN C1") - } - checkErrorFailedLoadTable(e, "not_existing_table") + checkErrorMatchPVals( + exception = intercept[AnalysisException] { + sql(s"ALTER TABLE $catalogName.not_existing_table DROP COLUMN C1") + }, + errorClass = "FAILED_JDBC.LOAD_TABLE", + parameters = Map( + "url" -> "jdbc:.*", + "tableName" -> "`not_existing_table`") + ) } test("SPARK-33034: ALTER TABLE ... update column type") { @@ -167,10 +177,15 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu assert(msg2.contains("Missing field bad_column")) } // Update column type in not existing table - val e = intercept[AnalysisException] { - sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN id TYPE DOUBLE") - } - checkErrorFailedLoadTable(e, "not_existing_table") + checkErrorMatchPVals( + exception = intercept[AnalysisException] { + sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN id TYPE DOUBLE") + }, + errorClass = "FAILED_JDBC.LOAD_TABLE", + parameters = Map( + "url" -> "jdbc:.*", + "tableName" -> "`not_existing_table`") + ) } test("SPARK-33034: ALTER TABLE ... 
rename column") { @@ -195,10 +210,15 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu ) } // Rename a column in a not existing table - val e = intercept[AnalysisException] { - sql(s"ALTER TABLE $catalogName.not_existing_table RENAME COLUMN ID TO C") - } - checkErrorFailedLoadTable(e, "not_existing_table") + checkErrorMatchPVals( + exception = intercept[AnalysisException] { + sql(s"ALTER TABLE $catalogName.not_existing_table RENAME COLUMN ID TO C") + }, + errorClass = "FAILED_JDBC.LOAD_TABLE", + parameters = Map( + "url" -> "jdbc:.*", + "tableName" -> "`not_existing_table`") + ) } test("SPARK-33034: ALTER TABLE ... update column nullability") { @@ -206,10 +226,15 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu testUpdateColumnNullability(s"$catalogName.alt_table") } // Update column nullability in not existing table - val e = intercept[AnalysisException] { - sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN ID DROP NOT NULL") - } - checkErrorFailedLoadTable(e, "not_existing_table") + checkErrorMatchPVals( + exception = intercept[AnalysisException] { + sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN ID DROP NOT NULL") + }, + errorClass = "FAILED_JDBC.LOAD_TABLE", + parameters = Map( + "url" -> "jdbc:.*", + "tableName" -> "`not_existing_table`") + ) } test("CREATE TABLE with table comment") { @@ -228,10 +253,15 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu test("CREATE TABLE with table property") { withTable(s"$catalogName.new_table") { - val e = intercept[AnalysisException] { - sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')") - } - assert(e.getErrorClass == "FAILED_JDBC.UNCLASSIFIED") + checkErrorMatchPVals( + exception = intercept[AnalysisException] { + sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')") + }, + errorClass = "FAILED_JDBC.CREATE_TABLE", + parameters = Map( + "url" -> "jdbc:.*", + "tableName" -> "`new_table`") + ) testCreateTableWithProperty(s"$catalogName.new_table") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index f7d2d61eab653..106033a92399e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -1261,14 +1261,13 @@ object JdbcUtils extends Logging with SQLConfHelper { def classifyException[T]( errorClass: String, messageParameters: Map[String, String], - dialect: JdbcDialect, - description: String)(f: => T): T = { + dialect: JdbcDialect)(f: => T): T = { try { f } catch { case e: SparkThrowable with Throwable => throw e case e: Throwable => - throw dialect.classifyException(e, errorClass, messageParameters, description) + throw dialect.classifyException(e, errorClass, messageParameters) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala index 6828bb0f0c4d8..120a68075a8fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala @@ -69,8 +69,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, 
jdbcOptions: JDBCOpt "url" -> jdbcOptions.getRedactUrl(), "indexName" -> toSQLId(indexName), "tableName" -> toSQLId(name)), - dialect = JdbcDialects.get(jdbcOptions.url), - description = s"Failed to create index $indexName in ${name()}") { + dialect = JdbcDialects.get(jdbcOptions.url)) { JdbcUtils.createIndex( conn, indexName, ident, columns, columnsProperties, properties, jdbcOptions) } @@ -91,8 +90,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt "url" -> jdbcOptions.getRedactUrl(), "indexName" -> toSQLId(indexName), "tableName" -> toSQLId(name)), - dialect = JdbcDialects.get(jdbcOptions.url), - description = s"Failed to drop index $indexName in ${name()}") { + dialect = JdbcDialects.get(jdbcOptions.url)) { JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index e7a3fe0f8aa7b..21f07d51aa1c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala @@ -73,8 +73,7 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(namespace.toSeq)), - dialect, - description = s"Failed get tables from: ${namespace.mkString(".")}") { + dialect) { conn.getMetaData.getTables(null, schemaPattern, "%", Array("TABLE")) } new Iterator[Identifier] { @@ -93,8 +92,7 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "tableName" -> toSQLId(ident)), - dialect, - description = s"Failed table existence check: $ident") { + dialect) { JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions)) } } @@ -120,8 +118,7 @@ class JDBCTableCatalog extends TableCatalog "url" -> options.getRedactUrl(), "oldName" -> toSQLId(oldIdent), "newName" -> toSQLId(newIdent)), - dialect, - description = s"Failed table renaming from $oldIdent to $newIdent") { + dialect) { JdbcUtils.renameTable(conn, oldIdent, newIdent, options) } } @@ -136,9 +133,7 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "tableName" -> toSQLId(ident)), - dialect, - description = s"Failed to load table: $ident" - ) { + dialect) { val schema = JDBCRDD.resolveTable(optionsWithTableName) JDBCTable(ident, schema, optionsWithTableName) } @@ -200,8 +195,7 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "tableName" -> toSQLId(ident)), - dialect, - description = s"Failed table creation: $ident") { + dialect) { JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions) } } @@ -217,8 +211,7 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "tableName" -> toSQLId(ident)), - dialect, - description = s"Failed table altering: $ident") { + dialect) { JdbcUtils.alterTable(conn, getTableName(ident), changes, options) } loadTable(ident) @@ -233,8 +226,7 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(namespace.toSeq)), - dialect, - description = s"Failed namespace exists: ${namespace.mkString}") { + dialect) { JdbcUtils.schemaExists(conn, options, db) } } @@ -246,8 
+238,7 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.classifyException( errorClass = "FAILED_JDBC.LIST_NAMESPACES", messageParameters = Map("url" -> options.getRedactUrl()), - dialect, - description = s"Failed list namespaces") { + dialect) { JdbcUtils.listSchemas(conn, options) } } @@ -300,8 +291,7 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(db)), - dialect, - description = s"Failed create name space: $db") { + dialect) { JdbcUtils.createSchema(conn, options, db, comment) } } @@ -325,8 +315,7 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(db)), - dialect, - description = s"Failed create comment on name space: $db") { + dialect) { JdbcUtils.alterSchemaComment(conn, options, db, set.value) } } @@ -342,8 +331,7 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(db)), - dialect, - description = s"Failed remove comment on name space: $db") { + dialect) { JdbcUtils.removeSchemaComment(conn, options, db) } } @@ -370,8 +358,7 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(db)), - dialect, - description = s"Failed drop name space: $db") { + dialect) { JdbcUtils.dropSchema(conn, options, db, cascade) true } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index 934ccdb51aa39..4a5d9a3e5c54f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -156,8 +156,7 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper { override def classifyException( e: Throwable, errorClass: String, - messageParameters: Map[String, String], - description: String): AnalysisException = { + messageParameters: Map[String, String]): AnalysisException = { e match { case sqlException: SQLException => sqlException.getSQLState match { @@ -167,9 +166,9 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper { namespace = messageParameters.get("namespace").toArray, details = sqlException.getMessage, cause = Some(e)) - case _ => super.classifyException(e, errorClass, messageParameters, description) + case _ => super.classifyException(e, errorClass, messageParameters) } - case _ => super.classifyException(e, errorClass, messageParameters, description) + case _ => super.classifyException(e, errorClass, messageParameters) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index 949455b248ffd..4045321beac54 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -199,8 +199,7 @@ private[sql] case class H2Dialect() extends JdbcDialect { override def classifyException( e: Throwable, errorClass: String, - messageParameters: Map[String, String], - description: String): AnalysisException = { + messageParameters: Map[String, String]): AnalysisException = { e match { case exception: SQLException => // Error codes are from https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html @@ -244,7 +243,7 @@ private[sql] case class H2Dialect() extends JdbcDialect { } case _ => // do nothing } - 
super.classifyException(e, errorClass, messageParameters, description) + super.classifyException(e, errorClass, messageParameters) } override def compileExpression(expr: Expression): Option[String] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 4ebe73292f11e..84c435d25d1df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -740,15 +740,16 @@ abstract class JdbcDialect extends Serializable with Logging { * @param e The dialect specific exception. * @param errorClass The error class assigned in the case of an unclassified `e` * @param messageParameters The message parameters of `errorClass` - * @param description The error description * @return `AnalysisException` or its sub-class. */ def classifyException( e: Throwable, errorClass: String, - messageParameters: Map[String, String], - description: String): AnalysisException = { - classifyException(description, e) + messageParameters: Map[String, String]): AnalysisException = { + new AnalysisException( + errorClass = errorClass, + messageParameters = messageParameters, + cause = Some(e)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index e341bf3720f46..cc53fd5e9694f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -206,8 +206,7 @@ private case class MsSqlServerDialect() extends JdbcDialect { override def classifyException( e: Throwable, errorClass: String, - messageParameters: Map[String, String], - description: String): AnalysisException = { + messageParameters: Map[String, String]): AnalysisException = { e match { case sqlException: SQLException => sqlException.getErrorCode match { @@ -216,9 +215,9 @@ private case class MsSqlServerDialect() extends JdbcDialect { namespace = messageParameters.get("namespace").toArray, details = sqlException.getMessage, cause = Some(e)) - case _ => super.classifyException(e, errorClass, messageParameters, description) + case _ => super.classifyException(e, errorClass, messageParameters) } - case _ => super.classifyException(e, errorClass, messageParameters, description) + case _ => super.classifyException(e, errorClass, messageParameters) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index d2034812cdd3a..af8999e41d96a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -331,8 +331,7 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper { override def classifyException( e: Throwable, errorClass: String, - messageParameters: Map[String, String], - description: String): AnalysisException = { + messageParameters: Map[String, String]): AnalysisException = { e match { case sqlException: SQLException => sqlException.getErrorCode match { @@ -345,10 +344,10 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper { val indexName = messageParameters("indexName") val tableName = messageParameters("tableName") throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) - case _ => 
super.classifyException(e, errorClass, messageParameters, description) + case _ => super.classifyException(e, errorClass, messageParameters) } case unsupported: UnsupportedOperationException => throw unsupported - case _ => super.classifyException(e, errorClass, messageParameters, description) + case _ => super.classifyException(e, errorClass, messageParameters) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 93052a0c37b59..951278cf0cf81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -258,8 +258,7 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper { override def classifyException( e: Throwable, errorClass: String, - messageParameters: Map[String, String], - description: String): AnalysisException = { + messageParameters: Map[String, String]): AnalysisException = { e match { case sqlException: SQLException => sqlException.getSQLState match { @@ -278,7 +277,7 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper { if (tblRegexp.nonEmpty) { throw QueryCompilationErrors.tableAlreadyExistsError(tblRegexp.get.group(1)) } else { - super.classifyException(e, errorClass, messageParameters, description) + super.classifyException(e, errorClass, messageParameters) } } case "42704" if errorClass == "FAILED_JDBC.DROP_INDEX" => @@ -290,10 +289,10 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper { namespace = messageParameters.get("namespace").toArray, details = sqlException.getMessage, cause = Some(e)) - case _ => super.classifyException(e, errorClass, messageParameters, description) + case _ => super.classifyException(e, errorClass, messageParameters) } case unsupported: UnsupportedOperationException => throw unsupported - case _ => super.classifyException(e, errorClass, messageParameters, description) + case _ => super.classifyException(e, errorClass, messageParameters) } } From 0f046cbe9cc78d8f793920d0c617596d16d14360 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Tue, 11 Jun 2024 19:10:17 +0800 Subject: [PATCH 2/6] [SPARK-48585][SQL] Make `JdbcDialect.classifyException` throw out the original exception --- .../apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 92 +++++++------------ .../v2/jdbc/JDBCTableCatalogSuite.scala | 16 ++-- 2 files changed, 40 insertions(+), 68 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index b91d040bb55f9..88ba00a8a1aea 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -83,14 +83,16 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu def testCreateTableWithProperty(tbl: String): Unit = {} - def checkErrorFailedLoadTable(e: AnalysisException, tbl: String): Unit = { - checkError( + private def checkErrorFailedJDBC( + e: AnalysisException, + errorClass: String, + tbl: String): Unit = { + checkErrorMatchPVals( exception = e, - errorClass = "FAILED_JDBC.UNCLASSIFIED", + errorClass = errorClass, parameters = Map( - "url" -> "jdbc:", - "message" -> s"Failed to load table: $tbl" - ) + "url" -> "jdbc:.*", + 
"tableName" -> s"`$tbl`") ) } @@ -129,15 +131,10 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu ) } // Add a column to not existing table - checkErrorMatchPVals( - exception = intercept[AnalysisException] { - sql(s"ALTER TABLE $catalogName.not_existing_table ADD COLUMNS (C4 STRING)") - }, - errorClass = "FAILED_JDBC.LOAD_TABLE", - parameters = Map( - "url" -> "jdbc:.*", - "tableName" -> "`not_existing_table`") - ) + val e = intercept[AnalysisException] { + sql(s"ALTER TABLE $catalogName.not_existing_table ADD COLUMNS (C4 STRING)") + } + checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table") } test("SPARK-33034: ALTER TABLE ... drop column") { @@ -156,15 +153,10 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu assert(msg.contains(s"Missing field bad_column in table $catalogName.alt_table")) } // Drop a column from a not existing table - checkErrorMatchPVals( - exception = intercept[AnalysisException] { - sql(s"ALTER TABLE $catalogName.not_existing_table DROP COLUMN C1") - }, - errorClass = "FAILED_JDBC.LOAD_TABLE", - parameters = Map( - "url" -> "jdbc:.*", - "tableName" -> "`not_existing_table`") - ) + val e = intercept[AnalysisException] { + sql(s"ALTER TABLE $catalogName.not_existing_table DROP COLUMN C1") + } + checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table") } test("SPARK-33034: ALTER TABLE ... update column type") { @@ -177,15 +169,10 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu assert(msg2.contains("Missing field bad_column")) } // Update column type in not existing table - checkErrorMatchPVals( - exception = intercept[AnalysisException] { - sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN id TYPE DOUBLE") - }, - errorClass = "FAILED_JDBC.LOAD_TABLE", - parameters = Map( - "url" -> "jdbc:.*", - "tableName" -> "`not_existing_table`") - ) + val e = intercept[AnalysisException] { + sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN id TYPE DOUBLE") + } + checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table") } test("SPARK-33034: ALTER TABLE ... rename column") { @@ -210,15 +197,10 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu ) } // Rename a column in a not existing table - checkErrorMatchPVals( - exception = intercept[AnalysisException] { - sql(s"ALTER TABLE $catalogName.not_existing_table RENAME COLUMN ID TO C") - }, - errorClass = "FAILED_JDBC.LOAD_TABLE", - parameters = Map( - "url" -> "jdbc:.*", - "tableName" -> "`not_existing_table`") - ) + val e = intercept[AnalysisException] { + sql(s"ALTER TABLE $catalogName.not_existing_table RENAME COLUMN ID TO C") + } + checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table") } test("SPARK-33034: ALTER TABLE ... 
update column nullability") { @@ -226,15 +208,10 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu testUpdateColumnNullability(s"$catalogName.alt_table") } // Update column nullability in not existing table - checkErrorMatchPVals( - exception = intercept[AnalysisException] { - sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN ID DROP NOT NULL") - }, - errorClass = "FAILED_JDBC.LOAD_TABLE", - parameters = Map( - "url" -> "jdbc:.*", - "tableName" -> "`not_existing_table`") - ) + val e = intercept[AnalysisException] { + sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN ID DROP NOT NULL") + } + checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table") } test("CREATE TABLE with table comment") { @@ -253,15 +230,10 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu test("CREATE TABLE with table property") { withTable(s"$catalogName.new_table") { - checkErrorMatchPVals( - exception = intercept[AnalysisException] { - sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')") - }, - errorClass = "FAILED_JDBC.CREATE_TABLE", - parameters = Map( - "url" -> "jdbc:.*", - "tableName" -> "`new_table`") - ) + val e = intercept[AnalysisException] { + sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')") + } + checkErrorFailedJDBC(e, "FAILED_JDBC.CREATE_TABLE", "new_table") testCreateTableWithProperty(s"$catalogName.new_table") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index daf5d8507ecc1..d2ff33e104772 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -619,15 +619,15 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { test("CREATE TABLE with table property") { withTable("h2.test.new_table") { - checkError( + checkErrorMatchPVals( exception = intercept[AnalysisException] { sql("CREATE TABLE h2.test.new_table(i INT, j STRING)" + " TBLPROPERTIES('ENGINE'='tableEngineName')") }, - errorClass = "FAILED_JDBC.UNCLASSIFIED", + errorClass = "FAILED_JDBC.CREATE_TABLE", parameters = Map( - "url" -> "jdbc:", - "message" -> "Failed table creation: test.new_table")) + "url" -> "jdbc:.*", + "tableName" -> "`test`.`new_table`")) } } @@ -639,14 +639,14 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { } test("SPARK-42904: CREATE TABLE with char/varchar with invalid char length") { - checkError( + checkErrorMatchPVals( exception = intercept[AnalysisException]{ sql("CREATE TABLE h2.test.new_table(c CHAR(1000000001))") }, - errorClass = "FAILED_JDBC.UNCLASSIFIED", + errorClass = "FAILED_JDBC.CREATE_TABLE", parameters = Map( - "url" -> "jdbc:", - "message" -> "Failed table creation: test.new_table")) + "url" -> "jdbc:.*", + "tableName" -> "`test`.`new_table`")) } test("SPARK-42955: Skip classifyException and wrap AnalysisException for SparkThrowable") { From 49f23df30806647cf8fc4693dbe36432494238df Mon Sep 17 00:00:00 2001 From: panbingkun Date: Wed, 12 Jun 2024 19:30:23 +0800 Subject: [PATCH 3/6] update --- .../datasources/jdbc/JdbcUtils.scala | 5 ++- .../datasources/v2/jdbc/JDBCTable.scala | 6 ++- .../v2/jdbc/JDBCTableCatalog.scala | 37 +++++++++++++------ 
.../spark/sql/jdbc/AggregatedDialect.scala | 8 ++++ .../apache/spark/sql/jdbc/DB2Dialect.scala | 3 +- .../spark/sql/jdbc/DatabricksDialect.scala | 8 ++++ .../apache/spark/sql/jdbc/DerbyDialect.scala | 8 ++++ .../org/apache/spark/sql/jdbc/H2Dialect.scala | 3 +- .../apache/spark/sql/jdbc/JdbcDialects.scala | 18 ++++++++- .../spark/sql/jdbc/MsSqlServerDialect.scala | 3 +- .../apache/spark/sql/jdbc/MySQLDialect.scala | 3 +- .../apache/spark/sql/jdbc/OracleDialect.scala | 8 ++++ .../spark/sql/jdbc/PostgresDialect.scala | 3 +- .../spark/sql/jdbc/SnowflakeDialect.scala | 8 ++++ .../spark/sql/jdbc/TeradataDialect.scala | 8 ++++ 15 files changed, 107 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 106033a92399e..f7d2d61eab653 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -1261,13 +1261,14 @@ object JdbcUtils extends Logging with SQLConfHelper { def classifyException[T]( errorClass: String, messageParameters: Map[String, String], - dialect: JdbcDialect)(f: => T): T = { + dialect: JdbcDialect, + description: String)(f: => T): T = { try { f } catch { case e: SparkThrowable with Throwable => throw e case e: Throwable => - throw dialect.classifyException(e, errorClass, messageParameters) + throw dialect.classifyException(e, errorClass, messageParameters, description) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala index 120a68075a8fe..6828bb0f0c4d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala @@ -69,7 +69,8 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt "url" -> jdbcOptions.getRedactUrl(), "indexName" -> toSQLId(indexName), "tableName" -> toSQLId(name)), - dialect = JdbcDialects.get(jdbcOptions.url)) { + dialect = JdbcDialects.get(jdbcOptions.url), + description = s"Failed to create index $indexName in ${name()}") { JdbcUtils.createIndex( conn, indexName, ident, columns, columnsProperties, properties, jdbcOptions) } @@ -90,7 +91,8 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt "url" -> jdbcOptions.getRedactUrl(), "indexName" -> toSQLId(indexName), "tableName" -> toSQLId(name)), - dialect = JdbcDialects.get(jdbcOptions.url)) { + dialect = JdbcDialects.get(jdbcOptions.url), + description = s"Failed to drop index $indexName in ${name()}") { JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index 21f07d51aa1c9..e7a3fe0f8aa7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala @@ -73,7 +73,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> 
toSQLId(namespace.toSeq)), - dialect) { + dialect, + description = s"Failed get tables from: ${namespace.mkString(".")}") { conn.getMetaData.getTables(null, schemaPattern, "%", Array("TABLE")) } new Iterator[Identifier] { @@ -92,7 +93,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "tableName" -> toSQLId(ident)), - dialect) { + dialect, + description = s"Failed table existence check: $ident") { JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions)) } } @@ -118,7 +120,8 @@ class JDBCTableCatalog extends TableCatalog "url" -> options.getRedactUrl(), "oldName" -> toSQLId(oldIdent), "newName" -> toSQLId(newIdent)), - dialect) { + dialect, + description = s"Failed table renaming from $oldIdent to $newIdent") { JdbcUtils.renameTable(conn, oldIdent, newIdent, options) } } @@ -133,7 +136,9 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "tableName" -> toSQLId(ident)), - dialect) { + dialect, + description = s"Failed to load table: $ident" + ) { val schema = JDBCRDD.resolveTable(optionsWithTableName) JDBCTable(ident, schema, optionsWithTableName) } @@ -195,7 +200,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "tableName" -> toSQLId(ident)), - dialect) { + dialect, + description = s"Failed table creation: $ident") { JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions) } } @@ -211,7 +217,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "tableName" -> toSQLId(ident)), - dialect) { + dialect, + description = s"Failed table altering: $ident") { JdbcUtils.alterTable(conn, getTableName(ident), changes, options) } loadTable(ident) @@ -226,7 +233,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(namespace.toSeq)), - dialect) { + dialect, + description = s"Failed namespace exists: ${namespace.mkString}") { JdbcUtils.schemaExists(conn, options, db) } } @@ -238,7 +246,8 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.classifyException( errorClass = "FAILED_JDBC.LIST_NAMESPACES", messageParameters = Map("url" -> options.getRedactUrl()), - dialect) { + dialect, + description = s"Failed list namespaces") { JdbcUtils.listSchemas(conn, options) } } @@ -291,7 +300,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(db)), - dialect) { + dialect, + description = s"Failed create name space: $db") { JdbcUtils.createSchema(conn, options, db, comment) } } @@ -315,7 +325,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(db)), - dialect) { + dialect, + description = s"Failed create comment on name space: $db") { JdbcUtils.alterSchemaComment(conn, options, db, set.value) } } @@ -331,7 +342,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(db)), - dialect) { + dialect, + description = s"Failed remove comment on name space: $db") { JdbcUtils.removeSchemaComment(conn, options, db) } } @@ -358,7 +370,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(db)), - dialect) { + dialect, + description = s"Failed drop name space: $db") { 
JdbcUtils.dropSchema(conn, options, db, cascade) true } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala index 8f537aacebe5f..7c672c0cf82b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.jdbc +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.types.{DataType, MetadataBuilder} /** @@ -76,4 +77,11 @@ private class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect cascade: Option[Boolean] = isCascadingTruncateTable()): String = { dialects.head.getTruncateQuery(table, cascade) } + + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + description: String): AnalysisException = + super.classifyException(e, errorClass, messageParameters) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index 4a5d9a3e5c54f..ea9082184f366 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -156,7 +156,8 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper { override def classifyException( e: Throwable, errorClass: String, - messageParameters: Map[String, String]): AnalysisException = { + messageParameters: Map[String, String], + description: String): AnalysisException = { e match { case sqlException: SQLException => sqlException.getSQLState match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala index 54b8c2622827e..9871db51c92ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala @@ -21,6 +21,7 @@ import java.sql.Connection import scala.collection.mutable.ArrayBuilder +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo import org.apache.spark.sql.types._ @@ -87,4 +88,11 @@ private case class DatabricksDialect() extends JdbcDialect { } schemaBuilder.result() } + + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + description: String): AnalysisException = + super.classifyException(e, errorClass, messageParameters) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala index 23da4dbb60a5f..024c7c1ac7731 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.jdbc import java.sql.Types import java.util.Locale +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.types._ @@ -94,4 +95,11 @@ private case class DerbyDialect() extends JdbcDialect { override def getLimitClause(limit: Integer): String = { "" } 
+ + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + description: String): AnalysisException = + super.classifyException(e, errorClass, messageParameters) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index 4045321beac54..9e6fc31ea7277 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -199,7 +199,8 @@ private[sql] case class H2Dialect() extends JdbcDialect { override def classifyException( e: Throwable, errorClass: String, - messageParameters: Map[String, String]): AnalysisException = { + messageParameters: Map[String, String], + description: String): AnalysisException = { e match { case exception: SQLException => // Error codes are from https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 84c435d25d1df..207efc8f2ac71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -742,7 +742,7 @@ abstract class JdbcDialect extends Serializable with Logging { * @param messageParameters The message parameters of `errorClass` * @return `AnalysisException` or its sub-class. */ - def classifyException( + final def classifyException( e: Throwable, errorClass: String, messageParameters: Map[String, String]): AnalysisException = { @@ -752,6 +752,22 @@ abstract class JdbcDialect extends Serializable with Logging { cause = Some(e)) } + /** + * Gets a dialect exception, classifies it and wraps it by `AnalysisException`. + * @param e The dialect specific exception. + * @param errorClass The error class assigned in the case of an unclassified `e` + * @param messageParameters The message parameters of `errorClass` + * @param description The error description + * @return `AnalysisException` or its sub-class. + */ + def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + description: String): AnalysisException = { + classifyException(description, e) + } + /** * Gets a dialect exception, classifies it and wraps it by `AnalysisException`. * @param message The error message to be placed to the returned exception. 
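With this commit, the four-argument `classifyException` overload added above becomes the extension point for dialect authors, while the three-argument form is `final` and does the default error-class wrapping; the default four-argument body still delegates to `classifyException(description, e)`. A minimal sketch of a custom dialect override — the dialect name, URL prefix, and SQLSTATE below are invented for illustration and are not part of this patch:

import java.sql.SQLException

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.errors.QueryCompilationErrors

// Hypothetical sketch: classify one vendor-specific SQLSTATE and defer
// everything else to the default implementation in JdbcDialect.
private case class MyVendorDialect() extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:myvendor")

  override def classifyException(
      e: Throwable,
      errorClass: String,
      messageParameters: Map[String, String],
      description: String): AnalysisException = {
    e match {
      // "42P07" (duplicate table) is an assumed vendor code for illustration.
      case sqlException: SQLException if sqlException.getSQLState == "42P07" =>
        throw QueryCompilationErrors.tableAlreadyExistsError(
          messageParameters("tableName"))
      case _ => super.classifyException(e, errorClass, messageParameters, description)
    }
  }
}

Since `JdbcUtils.classifyException` rethrows any `SparkThrowable` as-is, exceptions classified this way pass through the JDBC catalog paths unchanged.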
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index cc53fd5e9694f..430d65c194808 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -206,7 +206,8 @@ private case class MsSqlServerDialect() extends JdbcDialect { override def classifyException( e: Throwable, errorClass: String, - messageParameters: Map[String, String]): AnalysisException = { + messageParameters: Map[String, String], + description: String): AnalysisException = { e match { case sqlException: SQLException => sqlException.getErrorCode match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index af8999e41d96a..fdcadb860c6b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -331,7 +331,8 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper { override def classifyException( e: Throwable, errorClass: String, - messageParameters: Map[String, String]): AnalysisException = { + messageParameters: Map[String, String], + description: String): AnalysisException = { e match { case sqlException: SQLException => sqlException.getErrorCode match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala index 38fee89bf726a..ea8cce74d6a53 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala @@ -23,6 +23,7 @@ import java.util.Locale import scala.util.control.NonFatal import org.apache.spark.SparkUnsupportedOperationException +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.connector.expressions.Expression import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions @@ -229,6 +230,13 @@ private case class OracleDialect() extends JdbcDialect with SQLConfHelper { override def supportsLimit: Boolean = true override def supportsOffset: Boolean = true + + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + description: String): AnalysisException = + super.classifyException(e, errorClass, messageParameters) } private[jdbc] object OracleDialect { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 951278cf0cf81..d1db52185fabc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -258,7 +258,8 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper { override def classifyException( e: Throwable, errorClass: String, - messageParameters: Map[String, String]): AnalysisException = { + messageParameters: Map[String, String], + description: String): AnalysisException = { e match { case sqlException: SQLException => sqlException.getSQLState match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala 
index 276364d5d89ed..45b0c453c0d9f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc import java.util.Locale +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils import org.apache.spark.sql.types.{BooleanType, DataType} @@ -33,4 +34,11 @@ private case class SnowflakeDialect() extends JdbcDialect { Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN)) case _ => JdbcUtils.getCommonJDBCType(dt) } + + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + description: String): AnalysisException = + super.classifyException(e, errorClass, messageParameters) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala index 95a9f60b64ed8..7489e6bacecfb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.jdbc import java.sql.Types import java.util.Locale +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.types._ @@ -100,4 +101,11 @@ private case class TeradataDialect() extends JdbcDialect { case _ => None } } + + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + description: String): AnalysisException = + super.classifyException(e, errorClass, messageParameters) } From 812d5da093014dc35ec8d1e987c19a5d5ae847a2 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Thu, 13 Jun 2024 08:36:07 +0800 Subject: [PATCH 4/6] add JdbcDialectHelper --- .../spark/sql/jdbc/AggregatedDialect.scala | 11 ++--------- .../org/apache/spark/sql/jdbc/DB2Dialect.scala | 6 +++--- .../spark/sql/jdbc/DatabricksDialect.scala | 10 +--------- .../apache/spark/sql/jdbc/DerbyDialect.scala | 10 +--------- .../org/apache/spark/sql/jdbc/H2Dialect.scala | 4 ++-- .../apache/spark/sql/jdbc/JdbcDialects.scala | 17 +++++++++++++++++ .../spark/sql/jdbc/MsSqlServerDialect.scala | 6 +++--- .../apache/spark/sql/jdbc/MySQLDialect.scala | 6 +++--- .../apache/spark/sql/jdbc/OracleDialect.scala | 10 +--------- .../apache/spark/sql/jdbc/PostgresDialect.scala | 9 +++++---- .../spark/sql/jdbc/SnowflakeDialect.scala | 10 +--------- .../apache/spark/sql/jdbc/TeradataDialect.scala | 10 +--------- 12 files changed, 40 insertions(+), 69 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala index 7c672c0cf82b9..ceb6dc388e2ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.jdbc -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.types.{DataType, MetadataBuilder} /** @@ -27,7 +26,8 @@ import org.apache.spark.sql.types.{DataType, MetadataBuilder} * * @param dialects List of dialects. 
*/ -private class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect { +private class AggregatedDialect(dialects: List[JdbcDialect]) + extends JdbcDialect with JdbcDialectHelper { require(dialects.nonEmpty) @@ -77,11 +77,4 @@ private class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect cascade: Option[Boolean] = isCascadingTruncateTable()): String = { dialects.head.getTruncateQuery(table, cascade) } - - override def classifyException( - e: Throwable, - errorClass: String, - messageParameters: Map[String, String], - description: String): AnalysisException = - super.classifyException(e, errorClass, messageParameters) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index ea9082184f366..3da6b13de008b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.connector.expressions.Expression import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions import org.apache.spark.sql.types._ -private case class DB2Dialect() extends JdbcDialect with SQLConfHelper { +private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with JdbcDialectHelper { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:db2") @@ -167,9 +167,9 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper { namespace = messageParameters.get("namespace").toArray, details = sqlException.getMessage, cause = Some(e)) - case _ => super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(e, errorClass, messageParameters, description) } - case _ => super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(e, errorClass, messageParameters, description) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala index 9871db51c92ca..c700994b040ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala @@ -21,12 +21,11 @@ import java.sql.Connection import scala.collection.mutable.ArrayBuilder -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo import org.apache.spark.sql.types._ -private case class DatabricksDialect() extends JdbcDialect { +private case class DatabricksDialect() extends JdbcDialect with JdbcDialectHelper { override def canHandle(url: String): Boolean = { url.startsWith("jdbc:databricks") @@ -88,11 +87,4 @@ private case class DatabricksDialect() extends JdbcDialect { } schemaBuilder.result() } - - override def classifyException( - e: Throwable, - errorClass: String, - messageParameters: Map[String, String], - description: String): AnalysisException = - super.classifyException(e, errorClass, messageParameters) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala index 024c7c1ac7731..9b7f6feda1fca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala @@ -20,13 
+20,12 @@ package org.apache.spark.sql.jdbc import java.sql.Types import java.util.Locale -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.types._ -private case class DerbyDialect() extends JdbcDialect { +private case class DerbyDialect() extends JdbcDialect with JdbcDialectHelper { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:derby") @@ -95,11 +94,4 @@ private case class DerbyDialect() extends JdbcDialect { override def getLimitClause(limit: Integer): String = { "" } - - override def classifyException( - e: Throwable, - errorClass: String, - messageParameters: Map[String, String], - description: String): AnalysisException = - super.classifyException(e, errorClass, messageParameters) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index 9e6fc31ea7277..5907b22ff8a62 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, N import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, MetadataBuilder, ShortType, StringType, TimestampType} -private[sql] case class H2Dialect() extends JdbcDialect { +private[sql] case class H2Dialect() extends JdbcDialect with JdbcDialectHelper { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:h2") @@ -244,7 +244,7 @@ private[sql] case class H2Dialect() extends JdbcDialect { } case _ => // do nothing } - super.classifyException(e, errorClass, messageParameters) + super.classifyException(e, errorClass, messageParameters, description) } override def compileExpression(expr: Expression): Option[String] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 207efc8f2ac71..a5e1adba33f86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -858,6 +858,23 @@ abstract class JdbcDialect extends Serializable with Logging { metadata: MetadataBuilder): Unit = {} } +/** + * Make the `classifyException` method throw out the original exception + */ +trait JdbcDialectHelper { + + def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + description: String): AnalysisException = { + new AnalysisException( + errorClass = errorClass, + messageParameters = messageParameters, + cause = Some(e)) + } +} + /** * :: DeveloperApi :: * Registry of dialects that apply to every new jdbc `org.apache.spark.sql.DataFrame`. 
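With `JdbcDialectHelper` in place, a dialect that needs no vendor-specific error mapping simply mixes the trait in, as the Databricks, Derby, Oracle, Snowflake and Teradata changes in this commit do. A minimal sketch with an invented dialect name and URL prefix (not part of this patch):

import java.util.Locale

// Hypothetical sketch: mixing in JdbcDialectHelper gives the dialect a
// classifyException that wraps the original exception into an
// AnalysisException carrying the error class and message parameters,
// keeping `e` as the cause.
private case class MinimalDialect() extends JdbcDialect with JdbcDialectHelper {
  override def canHandle(url: String): Boolean =
    url.toLowerCase(Locale.ROOT).startsWith("jdbc:minimal")
}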
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index 430d65c194808..c082c890bca28 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.jdbc.MsSqlServerDialect.{GEOGRAPHY, GEOMETRY} import org.apache.spark.sql.types._ -private case class MsSqlServerDialect() extends JdbcDialect { +private case class MsSqlServerDialect() extends JdbcDialect with JdbcDialectHelper { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver") @@ -216,9 +216,9 @@ private case class MsSqlServerDialect() extends JdbcDialect { namespace = messageParameters.get("namespace").toArray, details = sqlException.getMessage, cause = Some(e)) - case _ => super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(e, errorClass, messageParameters, description) } - case _ => super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(e, errorClass, messageParameters, description) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index fdcadb860c6b2..bc89b7307c0e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} import org.apache.spark.sql.types._ -private case class MySQLDialect() extends JdbcDialect with SQLConfHelper { +private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with JdbcDialectHelper { override def canHandle(url : String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:mysql") @@ -345,10 +345,10 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper { val indexName = messageParameters("indexName") val tableName = messageParameters("tableName") throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) - case _ => super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(e, errorClass, messageParameters, description) } case unsupported: UnsupportedOperationException => throw unsupported - case _ => super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(e, errorClass, messageParameters, description) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala index ea8cce74d6a53..c7ce678653f8d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala @@ -23,7 +23,6 @@ import java.util.Locale import scala.util.control.NonFatal import org.apache.spark.SparkUnsupportedOperationException -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.connector.expressions.Expression import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions @@ -31,7 +30,7 @@ import org.apache.spark.sql.jdbc.OracleDialect._ import org.apache.spark.sql.types._ -private case class 
OracleDialect() extends JdbcDialect with SQLConfHelper { +private case class OracleDialect() extends JdbcDialect with SQLConfHelper with JdbcDialectHelper { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:oracle") @@ -230,13 +229,6 @@ private case class OracleDialect() extends JdbcDialect with SQLConfHelper { override def supportsLimit: Boolean = true override def supportsOffset: Boolean = true - - override def classifyException( - e: Throwable, - errorClass: String, - messageParameters: Map[String, String], - description: String): AnalysisException = - super.classifyException(e, errorClass, messageParameters) } private[jdbc] object OracleDialect { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index d1db52185fabc..61af515b93851 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -37,7 +37,8 @@ import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo import org.apache.spark.sql.types._ -private case class PostgresDialect() extends JdbcDialect with SQLConfHelper { +private case class PostgresDialect() + extends JdbcDialect with SQLConfHelper with JdbcDialectHelper { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:postgresql") @@ -278,7 +279,7 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper { if (tblRegexp.nonEmpty) { throw QueryCompilationErrors.tableAlreadyExistsError(tblRegexp.get.group(1)) } else { - super.classifyException(e, errorClass, messageParameters) + super.classifyException(e, errorClass, messageParameters, description) } } case "42704" if errorClass == "FAILED_JDBC.DROP_INDEX" => @@ -290,10 +291,10 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper { namespace = messageParameters.get("namespace").toArray, details = sqlException.getMessage, cause = Some(e)) - case _ => super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(e, errorClass, messageParameters, description) } case unsupported: UnsupportedOperationException => throw unsupported - case _ => super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(e, errorClass, messageParameters, description) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala index 45b0c453c0d9f..69219b443afc5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala @@ -19,11 +19,10 @@ package org.apache.spark.sql.jdbc import java.util.Locale -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils import org.apache.spark.sql.types.{BooleanType, DataType} -private case class SnowflakeDialect() extends JdbcDialect { +private case class SnowflakeDialect() extends JdbcDialect with JdbcDialectHelper { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:snowflake") @@ -34,11 +33,4 @@ private case class SnowflakeDialect() extends JdbcDialect { Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN)) case _ => JdbcUtils.getCommonJDBCType(dt) } - - override def classifyException( - e: Throwable, - 
errorClass: String, - messageParameters: Map[String, String], - description: String): AnalysisException = - super.classifyException(e, errorClass, messageParameters) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala index 7489e6bacecfb..3b540a093084b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala @@ -20,12 +20,11 @@ package org.apache.spark.sql.jdbc import java.sql.Types import java.util.Locale -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.types._ -private case class TeradataDialect() extends JdbcDialect { +private case class TeradataDialect() extends JdbcDialect with JdbcDialectHelper { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:teradata") @@ -101,11 +100,4 @@ private case class TeradataDialect() extends JdbcDialect { case _ => None } } - - override def classifyException( - e: Throwable, - errorClass: String, - messageParameters: Map[String, String], - description: String): AnalysisException = - super.classifyException(e, errorClass, messageParameters) } From a05e3f82d72046f41cc5977d1429d9bbee35d535 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Thu, 13 Jun 2024 10:10:31 +0800 Subject: [PATCH 5/6] update --- .../apache/spark/sql/jdbc/JdbcDialects.scala | 21 ++----------------- 1 file changed, 2 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index a5e1adba33f86..961c634b05752 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -735,23 +735,6 @@ abstract class JdbcDialect extends Serializable with Logging { throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3182") } - /** - * Gets a dialect exception, classifies it and wraps it by `AnalysisException`. - * @param e The dialect specific exception. - * @param errorClass The error class assigned in the case of an unclassified `e` - * @param messageParameters The message parameters of `errorClass` - * @return `AnalysisException` or its sub-class. - */ - final def classifyException( - e: Throwable, - errorClass: String, - messageParameters: Map[String, String]): AnalysisException = { - new AnalysisException( - errorClass = errorClass, - messageParameters = messageParameters, - cause = Some(e)) - } - /** * Gets a dialect exception, classifies it and wraps it by `AnalysisException`. * @param e The dialect specific exception. 
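
This hunk deletes the redundant three-argument `classifyException` overload, so the four-argument variant taking `description` becomes the single classification entry point. The next hunk makes the helper trait extend `JdbcDialect` and mark its method as an `override`, which Scala requires: a concrete trait member that clashes with a concrete superclass member can only be mixed in when it legally overrides it, and that in turn requires the trait to extend the superclass. A minimal illustration with hypothetical names:

abstract class Base {
  // Stand-in for JdbcDialect's default, legacy-style classification.
  def classify(e: Throwable): String = "legacy"
}

// A standalone trait with a matching concrete member cannot be mixed in:
//   trait Standalone { def classify(e: Throwable): String = "classified" }
//   class D extends Base with Standalone  // error: inherits conflicting members

// Extending the base class lets the trait declare a legal override instead:
trait Classified extends Base {
  override def classify(e: Throwable): String = "classified"
}

class Dialect extends Base with Classified

object OverrideDemo extends App {
  // Linearization places the trait after Base, so its override wins.
  assert(new Dialect().classify(new Exception) == "classified")
}
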
@@ -861,9 +844,9 @@ abstract class JdbcDialect extends Serializable with Logging { /** * Make the `classifyException` method throw out the original exception */ -trait JdbcDialectHelper { +trait JdbcDialectHelper extends JdbcDialect { - def classifyException( + override def classifyException( e: Throwable, errorClass: String, messageParameters: Map[String, String], From 59d40c486e58522f0de0facc5924013415aecd8a Mon Sep 17 00:00:00 2001 From: panbingkun Date: Mon, 17 Jun 2024 15:35:32 +0800 Subject: [PATCH 6/6] rename JdbcDialectHelper to NoLegacyJDBCError --- .../scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala | 2 +- .../src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala | 2 +- .../scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala | 2 +- .../src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala | 2 +- .../src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala | 2 +- .../src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 2 +- .../scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala | 2 +- .../src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala | 2 +- .../main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala | 2 +- .../main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 2 +- .../main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala | 2 +- .../main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala | 2 +- 12 files changed, 12 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala index ceb6dc388e2ee..5e79dbbb4d72e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.types.{DataType, MetadataBuilder} * @param dialects List of dialects. 
*/ private class AggregatedDialect(dialects: List[JdbcDialect]) - extends JdbcDialect with JdbcDialectHelper { + extends JdbcDialect with NoLegacyJDBCError { require(dialects.nonEmpty) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index 3da6b13de008b..8ccf94166a70e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.connector.expressions.Expression import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions import org.apache.spark.sql.types._ -private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with JdbcDialectHelper { +private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:db2") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala index c700994b040ef..af77f8575dd86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DatabricksDialect.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo import org.apache.spark.sql.types._ -private case class DatabricksDialect() extends JdbcDialect with JdbcDialectHelper { +private case class DatabricksDialect() extends JdbcDialect with NoLegacyJDBCError { override def canHandle(url: String): Boolean = { url.startsWith("jdbc:databricks") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala index 9b7f6feda1fca..7b65a01b5e702 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors import org.apache.spark.sql.types._ -private case class DerbyDialect() extends JdbcDialect with JdbcDialectHelper { +private case class DerbyDialect() extends JdbcDialect with NoLegacyJDBCError { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:derby") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index 5907b22ff8a62..3ece44ece9e6a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, N import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, MetadataBuilder, ShortType, StringType, TimestampType} -private[sql] case class H2Dialect() extends JdbcDialect with JdbcDialectHelper { +private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:h2") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 961c634b05752..290665020f883 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -844,7 +844,7 @@ abstract class JdbcDialect extends Serializable with Logging { /** * Make the `classifyException` method throw out the original exception */ -trait JdbcDialectHelper extends JdbcDialect { +trait NoLegacyJDBCError extends JdbcDialect { override def classifyException( e: Throwable, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index c082c890bca28..d03602b0338c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.jdbc.MsSqlServerDialect.{GEOGRAPHY, GEOMETRY} import org.apache.spark.sql.types._ -private case class MsSqlServerDialect() extends JdbcDialect with JdbcDialectHelper { +private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCError { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index bc89b7307c0e3..0f1bccbb01d51 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} import org.apache.spark.sql.types._ -private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with JdbcDialectHelper { +private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError { override def canHandle(url : String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:mysql") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala index c7ce678653f8d..627007e275599 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.jdbc.OracleDialect._ import org.apache.spark.sql.types._ -private case class OracleDialect() extends JdbcDialect with SQLConfHelper with JdbcDialectHelper { +private case class OracleDialect() extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:oracle") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 61af515b93851..03fefd82802ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.types._ private case class PostgresDialect() - extends JdbcDialect with SQLConfHelper with JdbcDialectHelper { + extends JdbcDialect with SQLConfHelper with NoLegacyJDBCError { override def canHandle(url: String): Boolean = 
url.toLowerCase(Locale.ROOT).startsWith("jdbc:postgresql") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala index 69219b443afc5..a443a798db7c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/SnowflakeDialect.scala @@ -22,7 +22,7 @@ import java.util.Locale import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils import org.apache.spark.sql.types.{BooleanType, DataType} -private case class SnowflakeDialect() extends JdbcDialect with JdbcDialectHelper { +private case class SnowflakeDialect() extends JdbcDialect with NoLegacyJDBCError { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:snowflake") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala index 3b540a093084b..322b259485f56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/TeradataDialect.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.types._ -private case class TeradataDialect() extends JdbcDialect with JdbcDialectHelper { +private case class TeradataDialect() extends JdbcDialect with NoLegacyJDBCError { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:teradata")
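
With the rename done, every built-in dialect mixes in `NoLegacyJDBCError`, so the `super.classifyException(e, errorClass, messageParameters, description)` calls in the H2, MsSqlServer, MySQL, and Postgres hunks above resolve through trait linearization to the classified error rather than to the legacy default in `JdbcDialect`. A self-contained model of that delegation (hypothetical `PgLikeDialect` with simplified two-argument signatures; 42P07 is PostgreSQL's duplicate_table SQLSTATE, matching the Postgres hunk's SQLState checks):

import java.sql.SQLException

abstract class BaseDialect {
  // Legacy fallback: the error class collapses into an opaque message.
  def classifyException(e: Throwable, errorClass: String): Exception =
    new Exception(s"unclassified: $errorClass")
}

trait NoLegacyError extends BaseDialect {
  // Classified fallback: stable error class, original exception as cause.
  override def classifyException(e: Throwable, errorClass: String): Exception =
    new Exception(errorClass, e)
}

class PgLikeDialect extends BaseDialect with NoLegacyError {
  override def classifyException(e: Throwable, errorClass: String): Exception =
    e match {
      case sql: SQLException if sql.getSQLState == "42P07" =>
        new Exception("TABLE_ALREADY_EXISTS", e) // dialect-specific mapping
      case _ =>
        // Resolves to NoLegacyError's override, not BaseDialect's default.
        super.classifyException(e, errorClass)
    }
}

object DelegationDemo extends App {
  val d = new PgLikeDialect
  assert(d.classifyException(new SQLException("dup table", "42P07"), "X")
    .getMessage == "TABLE_ALREADY_EXISTS")
  val unknown = d.classifyException(
    new SQLException("boom", "99999"), "FAILED_JDBC.CREATE_TABLE")
  assert(unknown.getMessage == "FAILED_JDBC.CREATE_TABLE")
  assert(unknown.getCause.isInstanceOf[SQLException])
}

The practical effect is that callers now see a stable `FAILED_JDBC.*` error class and can still reach the driver's `SQLException` through `getCause`, while third-party dialects that do not mix in the trait keep their previous behavior.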