From 07362187dfa374a9e9c8c5eebbbb77f7b67ac70d Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 21 Dec 2023 17:48:49 +0300 Subject: [PATCH 01/15] Restore backward compatibility of JdbcDialect.classifyException --- .../apache/spark/sql/internal/SQLConf.scala | 12 ++++ .../datasources/jdbc/JdbcUtils.scala | 11 +++- .../datasources/v2/jdbc/JDBCTable.scala | 6 +- .../v2/jdbc/JDBCTableCatalog.scala | 33 +++++++---- .../apache/spark/sql/jdbc/DB2Dialect.scala | 16 ++---- .../org/apache/spark/sql/jdbc/H2Dialect.scala | 33 +++++------ .../apache/spark/sql/jdbc/JdbcDialects.scala | 13 ++--- .../spark/sql/jdbc/MsSqlServerDialect.scala | 16 ++---- .../apache/spark/sql/jdbc/MySQLDialect.scala | 28 +++++----- .../spark/sql/jdbc/PostgresDialect.scala | 55 +++++++++---------- 10 files changed, 119 insertions(+), 104 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index d54cb3756638e..644532d06df67 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -4612,6 +4612,16 @@ object SQLConf { .booleanConf .createWithDefault(false) + val CLASSIFY_JDBC_EXCEPTION_IN_DIALECT = + buildConf("spark.sql.legacy.classifyJDBCExceptionInDialect") + .internal() + .doc("When set to true, Spark delegates classification of JDBC exceptions to a dialect. " + + "If it is false, the JDBC table catalog converts exceptions from JDBC dialects " + + "to Spark exceptions.") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + /** * Holds information about keys that have been deprecated. * @@ -5035,6 +5045,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def broadcastHashJoinOutputPartitioningExpandLimit: Int = getConf(BROADCAST_HASH_JOIN_OUTPUT_PARTITIONING_EXPAND_LIMIT) + def classifyJDBCExceptionInDialect: Boolean = getConf(CLASSIFY_JDBC_EXCEPTION_IN_DIALECT) + /** * Returns the [[Resolver]] for the current configuration, which can be used to determine if two * identifiers are equal. 
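Note on the compatibility story behind this flag: third-party dialects that still override the legacy single-argument classifyException(message, e) keep working once the flag is enabled, because the JdbcUtils change in the next hunk routes unclassified exceptions back to the dialect with the legacy message. A minimal sketch of such a legacy dialect; MyDialect and the jdbc:mydb URL prefix are illustrative names, not part of this patch:

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

    // Hypothetical dialect that overrides only the restored legacy signature.
    object MyDialect extends JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")
      override def classifyException(message: String, e: Throwable): AnalysisException =
        new AnalysisException(
          errorClass = "_LEGACY_ERROR_TEMP_3064",
          messageParameters = Map("msg" -> message),
          cause = Some(e))
    }

    JdbcDialects.registerDialect(MyDialect)
    // Given an active SparkSession `spark`, opt back into dialect-side classification:
    spark.conf.set("spark.sql.legacy.classifyJDBCExceptionInDialect", "true")
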
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index e7835514a3842..3ac60e57fd91b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -31,7 +31,7 @@ import scala.util.control.NonFatal import org.apache.spark.{SparkThrowable, TaskContext} import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.{AnalysisException, DataFrame, Row} import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper} import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -43,6 +43,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, TableChange} import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex} import org.apache.spark.sql.connector.expressions.NamedReference import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType, NoopDialect} import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils @@ -1183,12 +1184,16 @@ object JdbcUtils extends Logging with SQLConfHelper { def classifyException[T]( errorClass: String, messageParameters: Map[String, String], - dialect: JdbcDialect)(f: => T): T = { + dialect: JdbcDialect, + legacyMessage: String)(f: => T): T = { try { f } catch { case e: SparkThrowable with Throwable => throw e - case e: Throwable => throw dialect.classifyException(e, errorClass, messageParameters) + case e: Throwable if SQLConf.get.classifyJDBCExceptionInDialect => + throw dialect.classifyException(legacyMessage, e) + case e: Throwable => + throw new AnalysisException(errorClass, messageParameters, cause = Some(e)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala index c251010881f34..96a3d62d101b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala @@ -69,7 +69,8 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt "url" -> jdbcOptions.url, "indexName" -> toSQLId(indexName), "tableName" -> toSQLId(name)), - dialect = JdbcDialects.get(jdbcOptions.url)) { + dialect = JdbcDialects.get(jdbcOptions.url), + legacyMessage = s"Failed to create index $indexName in ${name()}") { JdbcUtils.createIndex( conn, indexName, ident, columns, columnsProperties, properties, jdbcOptions) } @@ -90,7 +91,8 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt "url" -> jdbcOptions.url, "indexName" -> toSQLId(indexName), "tableName" -> toSQLId(name)), - dialect = JdbcDialects.get(jdbcOptions.url)) { + dialect = JdbcDialects.get(jdbcOptions.url), + legacyMessage = s"Failed to drop index $indexName in ${name()}") { JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index 976cd3f6e9aa0..4b05ecec144bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala @@ -73,7 +73,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.url, "namespace" -> toSQLId(namespace.toSeq)), - dialect) { + dialect, + legacyMessage = s"Failed get tables from: ${namespace.mkString(".")}") { conn.getMetaData.getTables(null, schemaPattern, "%", Array("TABLE")) } new Iterator[Identifier] { @@ -92,7 +93,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.url, "tableName" -> toSQLId(ident)), - dialect) { + dialect, + legacyMessage = s"Failed table existence check: $ident") { JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions)) } } @@ -118,7 +120,8 @@ class JDBCTableCatalog extends TableCatalog "url" -> options.url, "oldName" -> toSQLId(oldIdent), "newName" -> toSQLId(newIdent)), - dialect) { + dialect, + legacyMessage = s"Failed table renaming from $oldIdent to $newIdent") { JdbcUtils.renameTable(conn, oldIdent, newIdent, options) } } @@ -183,7 +186,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.url, "tableName" -> toSQLId(ident)), - dialect) { + dialect, + legacyMessage = s"Failed table creation: $ident") { JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions) } } @@ -199,7 +203,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.url, "tableName" -> toSQLId(ident)), - dialect) { + dialect, + legacyMessage = s"Failed table altering: $ident") { JdbcUtils.alterTable(conn, getTableName(ident), changes, options) } loadTable(ident) @@ -214,7 +219,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.url, "namespace" -> toSQLId(namespace.toSeq)), - dialect) { + dialect, + legacyMessage = s"Failed namespace exists: ${namespace.mkString}") { JdbcUtils.schemaExists(conn, options, db) } } @@ -226,7 +232,8 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.classifyException( errorClass = "FAILED_JDBC.LIST_NAMESPACES", messageParameters = Map("url" -> options.url), - dialect) { + dialect, + legacyMessage = s"Failed list namespaces") { JdbcUtils.listSchemas(conn, options) } } @@ -279,7 +286,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.url, "namespace" -> toSQLId(db)), - dialect) { + dialect, + legacyMessage = s"Failed create name space: $db") { JdbcUtils.createSchema(conn, options, db, comment) } } @@ -303,7 +311,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.url, "namespace" -> toSQLId(db)), - dialect) { + dialect, + legacyMessage = s"Failed create comment on name space: $db") { JdbcUtils.alterSchemaComment(conn, options, db, set.value) } } @@ -319,7 +328,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.url, "namespace" -> toSQLId(db)), - dialect) { + dialect, + legacyMessage = s"Failed remove comment on name space: $db") { JdbcUtils.removeSchemaComment(conn, options, db) } } @@ -346,7 +356,8 @@ class JDBCTableCatalog extends TableCatalog messageParameters = Map( "url" -> options.url, "namespace" -> toSQLId(db)), - dialect) { + dialect, + legacyMessage = 
s"Failed drop name space: $db") { JdbcUtils.dropSchema(conn, options, db, cascade) true } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index 4f81ee031d220..d5a132c7dd48a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -144,22 +144,16 @@ private object DB2Dialect extends JdbcDialect { s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS ''" } - override def classifyException( - e: Throwable, - errorClass: String, - messageParameters: Map[String, String]): AnalysisException = { + override def classifyException(message: String, e: Throwable): AnalysisException = { e match { case sqlException: SQLException => sqlException.getSQLState match { // https://www.ibm.com/docs/en/db2/11.5?topic=messages-sqlstate - case "42893" => - throw NonEmptyNamespaceException( - namespace = messageParameters.get("namespace").toArray, - details = sqlException.getMessage, - cause = Some(e)) - case _ => super.classifyException(e, errorClass, messageParameters) + case "42893" => throw NonEmptyNamespaceException( + namespace = Array.empty, details = message, cause = Some(e)) + case _ => super.classifyException(message, e) } - case _ => super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(message, e) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index 3c9bc0ed691bb..d275f9c9cb1b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -28,7 +28,8 @@ import scala.util.control.NonFatal import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.util.quoteNameParts import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.catalog.index.TableIndex @@ -180,10 +181,7 @@ private[sql] object H2Dialect extends JdbcDialect { (ident.namespace() :+ indexName).map(quoteIdentifier).mkString(".") } - override def classifyException( - e: Throwable, - errorClass: String, - messageParameters: Map[String, String]): AnalysisException = { + override def classifyException(message: String, e: Throwable): AnalysisException = { e match { case exception: SQLException => // Error codes are from https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html @@ -194,16 +192,15 @@ private[sql] object H2Dialect extends JdbcDialect { val regex = """"((?:[^"\\]|\\[\\"ntbrf])+)"""".r val name = regex.findFirstMatchIn(e.getMessage).get.group(1) val quotedName = org.apache.spark.sql.catalyst.util.quoteIdentifier(name) - throw new TableAlreadyExistsException( - errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", + throw new TableAlreadyExistsException(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", messageParameters = Map("relationName" -> quotedName), cause = Some(e)) // 
TABLE_OR_VIEW_NOT_FOUND_1 case 42102 => - val relationName = messageParameters.getOrElse("tableName", "") + val quotedName = quoteNameParts(UnresolvedAttribute.parseAttributeName(message)) throw new NoSuchTableException( errorClass = "TABLE_OR_VIEW_NOT_FOUND", - messageParameters = Map("relationName" -> relationName), + messageParameters = Map("relationName" -> quotedName), cause = Some(e)) // SCHEMA_NOT_FOUND_1 case 90079 => @@ -213,21 +210,25 @@ private[sql] object H2Dialect extends JdbcDialect { throw new NoSuchNamespaceException(errorClass = "SCHEMA_NOT_FOUND", messageParameters = Map("schemaName" -> quotedName)) // INDEX_ALREADY_EXISTS_1 - case 42111 if errorClass == "FAILED_JDBC.CREATE_INDEX" => - val indexName = messageParameters("indexName") - val tableName = messageParameters("tableName") + case 42111 => + // The message is: Failed to create index indexName in tableName + val regex = "(?s)Failed to create index (.*) in (.*)".r + val indexName = regex.findFirstMatchIn(message).get.group(1) + val tableName = regex.findFirstMatchIn(message).get.group(2) throw new IndexAlreadyExistsException( indexName = indexName, tableName = tableName, cause = Some(e)) // INDEX_NOT_FOUND_1 - case 42112 if errorClass == "FAILED_JDBC.DROP_INDEX" => - val indexName = messageParameters("indexName") - val tableName = messageParameters("tableName") + case 42112 => + // The message is: Failed to drop index indexName in tableName + val regex = "(?s)Failed to drop index (.*) in (.*)".r + val indexName = regex.findFirstMatchIn(message).get.group(1) + val tableName = regex.findFirstMatchIn(message).get.group(2) throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) case _ => // do nothing } case _ => // do nothing } - super.classifyException(e, errorClass, messageParameters) + super.classifyException(message, e) } override def compileExpression(expr: Expression): Option[String] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 4825568d88eb0..5ba4e39e8ec13 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -630,16 +630,15 @@ abstract class JdbcDialect extends Serializable with Logging { /** * Gets a dialect exception, classifies it and wraps it by `AnalysisException`. + * @param message The error message to be placed to the returned exception. * @param e The dialect specific exception. - * @param errorClass The error class assigned in the case of an unclassified `e` - * @param messageParameters The message parameters of `errorClass` * @return `AnalysisException` or its sub-class. 
*/ - def classifyException( - e: Throwable, - errorClass: String, - messageParameters: Map[String, String]): AnalysisException = { - new AnalysisException(errorClass, messageParameters, cause = Some(e)) + def classifyException(message: String, e: Throwable): AnalysisException = { + new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3064", + messageParameters = Map("msg" -> message), + cause = Some(e)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index f63a1abdce658..9776cff3f7c81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -190,21 +190,15 @@ private object MsSqlServerDialect extends JdbcDialect { if (limit > 0) s"TOP ($limit)" else "" } - override def classifyException( - e: Throwable, - errorClass: String, - messageParameters: Map[String, String]): AnalysisException = { + override def classifyException(message: String, e: Throwable): AnalysisException = { e match { case sqlException: SQLException => sqlException.getErrorCode match { - case 3729 => - throw NonEmptyNamespaceException( - namespace = messageParameters.get("namespace").toArray, - details = sqlException.getMessage, - cause = Some(e)) - case _ => super.classifyException(e, errorClass, messageParameters) + case 3729 => throw NonEmptyNamespaceException( + namespace = Array.empty, details = message, cause = Some(e)) + case _ => super.classifyException(message, e) } - case _ => super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(message, e) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index af50a8e3e3594..dd74c93bc2e19 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -270,26 +270,28 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { indexMap.values.toArray } - override def classifyException( - e: Throwable, - errorClass: String, - messageParameters: Map[String, String]): AnalysisException = { + override def classifyException(message: String, e: Throwable): AnalysisException = { e match { case sqlException: SQLException => sqlException.getErrorCode match { // ER_DUP_KEYNAME - case 1061 if errorClass == "FAILED_JDBC.CREATE_INDEX" => - val indexName = messageParameters("indexName") - val tableName = messageParameters("tableName") - throw new IndexAlreadyExistsException(indexName, tableName, cause = Some(e)) - case 1091 if errorClass == "FAILED_JDBC.DROP_INDEX" => - val indexName = messageParameters("indexName") - val tableName = messageParameters("tableName") + case 1061 => + // The message is: Failed to create index indexName in tableName + val regex = "(?s)Failed to create index (.*) in (.*)".r + val indexName = regex.findFirstMatchIn(message).get.group(1) + val tableName = regex.findFirstMatchIn(message).get.group(2) + throw new IndexAlreadyExistsException( + indexName = indexName, tableName = tableName, cause = Some(e)) + case 1091 => + // The message is: Failed to drop index indexName in tableName + val regex = "(?s)Failed to drop index (.*) in (.*)".r + val indexName = regex.findFirstMatchIn(message).get.group(1) + val tableName = regex.findFirstMatchIn(message).get.group(2) throw new 
NoSuchIndexException(indexName, tableName, cause = Some(e)) - case _ => super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(message, e) } case unsupported: UnsupportedOperationException => throw unsupported - case _ => super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(message, e) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 4637a96039b83..901e66e5efcb9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -225,47 +225,42 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper { s"DROP INDEX ${quoteIdentifier(indexName)}" } - // Message pattern defined by postgres specification - private final val pgAlreadyExistsRegex = """(?:.*)relation "(.*)" already exists""".r - - override def classifyException( - e: Throwable, - errorClass: String, - messageParameters: Map[String, String]): AnalysisException = { + override def classifyException(message: String, e: Throwable): AnalysisException = { e match { case sqlException: SQLException => sqlException.getSQLState match { // https://www.postgresql.org/docs/14/errcodes-appendix.html case "42P07" => - if (errorClass == "FAILED_JDBC.CREATE_INDEX") { - throw new IndexAlreadyExistsException( - indexName = messageParameters("indexName"), - tableName = messageParameters("tableName"), - cause = Some(e)) - } else if (errorClass == "FAILED_JDBC.RENAME_TABLE") { - val newTable = messageParameters("newName") - throw QueryCompilationErrors.tableAlreadyExistsError(newTable) - } else { - val tblRegexp = pgAlreadyExistsRegex.findFirstMatchIn(sqlException.getMessage) - if (tblRegexp.nonEmpty) { - throw QueryCompilationErrors.tableAlreadyExistsError(tblRegexp.get.group(1)) - } else { - super.classifyException(e, errorClass, messageParameters) - } + // Message patterns defined at caller sides of spark + val indexRegex = "(?s)Failed to create index (.*) in (.*)".r + val renameRegex = "(?s)Failed table renaming from (.*) to (.*)".r + // Message pattern defined by postgres specification + val pgRegex = """(?:.*)relation "(.*)" already exists""".r + + message match { + case indexRegex(index, table) => + throw new IndexAlreadyExistsException( + indexName = index, tableName = table, cause = Some(e)) + case renameRegex(_, newTable) => + throw QueryCompilationErrors.tableAlreadyExistsError(newTable) + case _ if pgRegex.findFirstMatchIn(sqlException.getMessage).nonEmpty => + val tableName = pgRegex.findFirstMatchIn(sqlException.getMessage).get.group(1) + throw QueryCompilationErrors.tableAlreadyExistsError(tableName) + case _ => super.classifyException(message, e) } - case "42704" if errorClass == "FAILED_JDBC.DROP_INDEX" => - val indexName = messageParameters("indexName") - val tableName = messageParameters("tableName") + case "42704" => + // The message is: Failed to drop index indexName in tableName + val regex = "(?s)Failed to drop index (.*) in (.*)".r + val indexName = regex.findFirstMatchIn(message).get.group(1) + val tableName = regex.findFirstMatchIn(message).get.group(2) throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) case "2BP01" => throw NonEmptyNamespaceException( - namespace = messageParameters.get("namespace").toArray, - details = sqlException.getMessage, - cause = Some(e)) - case _ => 
super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(message, e) } case unsupported: UnsupportedOperationException => throw unsupported - case _ => super.classifyException(e, errorClass, messageParameters) + case _ => super.classifyException(message, e) } } From a1568c6f24f465e1daab64ca9279c28f60e8de80 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 21 Dec 2023 19:50:19 +0300 Subject: [PATCH 02/15] Add the error sub-class: FAILED_JDBC.UNCLASSIFIED --- common/utils/src/main/resources/error/error-classes.json | 5 +++++ docs/sql-error-conditions-failed-jdbc-error-class.md | 4 ++++ .../main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 2 +- 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index df223f3298ef5..6c8522c416516 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -1183,6 +1183,11 @@ "message" : [ "Check that the table <tableName> exists." ] + }, + "UNCLASSIFIED" : { + "message" : [ + "<msg>" + ] } }, "sqlState" : "HV000" diff --git a/docs/sql-error-conditions-failed-jdbc-error-class.md b/docs/sql-error-conditions-failed-jdbc-error-class.md index 575441e3f347e..3c1dbed05f661 100644 --- a/docs/sql-error-conditions-failed-jdbc-error-class.md +++ b/docs/sql-error-conditions-failed-jdbc-error-class.md @@ -77,4 +77,8 @@ Rename the table `<oldName>` to `<newName>`. Check that the table `<tableName>` exists. +## UNCLASSIFIED + +`<msg>` + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 5ba4e39e8ec13..cb316b76175c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -636,7 +636,7 @@ abstract class JdbcDialect extends Serializable with Logging { */ def classifyException(message: String, e: Throwable): AnalysisException = { new AnalysisException( - errorClass = "_LEGACY_ERROR_TEMP_3064", + errorClass = "FAILED_JDBC.UNCLASSIFIED", messageParameters = Map("msg" -> message), cause = Some(e)) } From 1138304d357beb7d55e438b519b42726da84ec4c Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 21 Dec 2023 20:21:31 +0300 Subject: [PATCH 03/15] Fix JDBCV2Suite --- .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 61 ++++++++++++++----- 1 file changed, 45 insertions(+), 16 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 5e04fca92f4b0..b4e38bd1820fe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -2918,16 +2918,31 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel assert(indexes1.isEmpty) sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)") - checkError( - exception = intercept[IndexAlreadyExistsException] { - sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)") - }, - errorClass = "INDEX_ALREADY_EXISTS", - parameters = Map( - "indexName" -> "`people_index`", - "tableName" -> "`test`.`people`" + withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "true") { + checkError( + exception = intercept[IndexAlreadyExistsException] {
sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)") + }, + errorClass = "INDEX_ALREADY_EXISTS", + parameters = Map( + "indexName" -> "people_index", + "tableName" -> "test.people" + ) + ) + } + withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "false") { + checkError( + exception = intercept[AnalysisException] { + sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)") + }, + errorClass = "FAILED_JDBC.CREATE_INDEX", + parameters = Map( + "url" -> url, + "indexName" -> "`people_index`", + "tableName" -> "`test`.`people`" + ) ) - ) + } assert(jdbcTable.indexExists("people_index")) val indexes2 = jdbcTable.listIndexes() assert(!indexes2.isEmpty) @@ -2936,13 +2951,27 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel assert(tableIndex.indexName() == "people_index") sql(s"DROP INDEX people_index ON TABLE h2.test.people") - checkError( - exception = intercept[NoSuchIndexException] { - sql(s"DROP INDEX people_index ON TABLE h2.test.people") - }, - errorClass = "INDEX_NOT_FOUND", - parameters = Map("indexName" -> "`people_index`", "tableName" -> "`test`.`people`") - ) + withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "true") { + checkError( + exception = intercept[NoSuchIndexException] { + sql(s"DROP INDEX people_index ON TABLE h2.test.people") + }, + errorClass = "INDEX_NOT_FOUND", + parameters = Map("indexName" -> "people_index", "tableName" -> "test.people") + ) + } + withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "false") { + checkError( + exception = intercept[AnalysisException] { + sql(s"DROP INDEX people_index ON TABLE h2.test.people") + }, + errorClass = "FAILED_JDBC.DROP_INDEX", + parameters = Map( + "url" -> url, + "indexName" -> "`people_index`", + "tableName" -> "`test`.`people`") + ) + } assert(jdbcTable.indexExists("people_index") == false) val indexes3 = jdbcTable.listIndexes() assert(indexes3.isEmpty) From 4ddef497465ad1f8363e6239d0c9fc0895da0097 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 21 Dec 2023 20:42:40 +0300 Subject: [PATCH 04/15] Fix JDBCTableCatalogSuite --- .../v2/jdbc/JDBCTableCatalogSuite.scala | 41 +++++++++++++++---- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index 1cd4077b4ec1e..565b497ba3777 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -127,10 +127,23 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { withConnection { conn => conn.prepareStatement("""CREATE TABLE "test"."src_table" (id INTEGER)""").executeUpdate() } - val exp = intercept[TableAlreadyExistsException] { - sql("ALTER TABLE h2.test.src_table RENAME TO test.dst_table") + withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "true") { + val exp = intercept[TableAlreadyExistsException] { + sql("ALTER TABLE h2.test.src_table RENAME TO test.dst_table") + } + checkErrorTableAlreadyExists(exp, "`dst_table`") + } + withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "false") { + checkError( + exception = intercept[AnalysisException] { + sql("ALTER TABLE h2.test.src_table RENAME TO test.dst_table") + }, + errorClass = "FAILED_JDBC.RENAME_TABLE", + parameters = Map( + 
"url" -> url, + "oldName" -> "`test`.`src_table`", + "newName" -> "`test`.`dst_table`")) } - checkErrorTableAlreadyExists(exp, "`dst_table`") } } } @@ -166,12 +179,24 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { } checkErrorTableAlreadyExists(e, "`test`.`new_table`") } - val exp = intercept[NoSuchNamespaceException] { - sql("CREATE TABLE h2.bad_test.new_table(i INT, j STRING)") + withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "true") { + val exp = intercept[NoSuchNamespaceException] { + sql("CREATE TABLE h2.bad_test.new_table(i INT, j STRING)") + } + checkError(exp, + errorClass = "SCHEMA_NOT_FOUND", + parameters = Map("schemaName" -> "`bad_test`")) + } + withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "false") { + val exp = intercept[AnalysisException] { + sql("CREATE TABLE h2.bad_test.new_table(i INT, j STRING)") + } + checkError(exp, + errorClass = "FAILED_JDBC.CREATE_TABLE", + parameters = Map( + "url" -> url, + "tableName" -> "`bad_test`.`new_table`")) } - checkError(exp, - errorClass = "SCHEMA_NOT_FOUND", - parameters = Map("schemaName" -> "`bad_test`")) } test("ALTER TABLE ... add column") { From 1323be57da6330f6ccb8fbc15cc4581221851332 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 21 Dec 2023 22:30:02 +0300 Subject: [PATCH 05/15] AnalysisException -> SparkException --- .../sql/execution/datasources/jdbc/JdbcUtils.scala | 6 +++--- .../datasources/v2/jdbc/JDBCTableCatalogSuite.scala | 12 ++++++------ .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 3ac60e57fd91b..5bbe5bf72f64d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -28,10 +28,10 @@ import scala.jdk.CollectionConverters._ import scala.util.Try import scala.util.control.NonFatal -import org.apache.spark.{SparkThrowable, TaskContext} +import org.apache.spark.{SparkException, SparkThrowable, TaskContext} import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, DataFrame, Row} +import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper} import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -1193,7 +1193,7 @@ object JdbcUtils extends Logging with SQLConfHelper { case e: Throwable if SQLConf.get.classifyJDBCExceptionInDialect => throw dialect.classifyException(legacyMessage, e) case e: Throwable => - throw new AnalysisException(errorClass, messageParameters, cause = Some(e)) + throw new SparkException(errorClass, messageParameters, cause = e) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index 565b497ba3777..e8292461438ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -21,7 +21,7 @@ import java.util.Properties import 
org.apache.logging.log4j.Level -import org.apache.spark.{SparkConf, SparkIllegalArgumentException} +import org.apache.spark.{SparkConf, SparkException, SparkIllegalArgumentException} import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.ParseException @@ -135,7 +135,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { } withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "false") { checkError( - exception = intercept[AnalysisException] { + exception = intercept[SparkException] { sql("ALTER TABLE h2.test.src_table RENAME TO test.dst_table") }, errorClass = "FAILED_JDBC.RENAME_TABLE", @@ -174,7 +174,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { } withTable("h2.test.new_table") { sql("CREATE TABLE h2.test.new_table(i INT, j STRING)") - val e = intercept[AnalysisException] { + val e = intercept[TableAlreadyExistsException] { sql("CREATE TABLE h2.test.new_table(i INT, j STRING)") } checkErrorTableAlreadyExists(e, "`test`.`new_table`") @@ -188,7 +188,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { parameters = Map("schemaName" -> "`bad_test`")) } withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "false") { - val exp = intercept[AnalysisException] { + val exp = intercept[SparkException] { sql("CREATE TABLE h2.bad_test.new_table(i INT, j STRING)") } checkError(exp, @@ -580,7 +580,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { test("CREATE TABLE with table property") { withTable("h2.test.new_table") { checkError( - exception = intercept[AnalysisException] { + exception = intercept[SparkException] { sql("CREATE TABLE h2.test.new_table(i INT, j STRING)" + " TBLPROPERTIES('ENGINE'='tableEngineName')") }, @@ -600,7 +600,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { test("SPARK-42904: CREATE TABLE with char/varchar with invalid char length") { checkError( - exception = intercept[AnalysisException]{ + exception = intercept[SparkException]{ sql("CREATE TABLE h2.test.new_table(c CHAR(1000000001))") }, errorClass = "FAILED_JDBC.CREATE_TABLE", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index b4e38bd1820fe..a59c5f2481aaf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -2932,7 +2932,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel } withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "false") { checkError( - exception = intercept[AnalysisException] { + exception = intercept[SparkException] { sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)") }, errorClass = "FAILED_JDBC.CREATE_INDEX", @@ -2962,7 +2962,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel } withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "false") { checkError( - exception = intercept[AnalysisException] { + exception = intercept[SparkException] { sql(s"DROP INDEX people_index ON TABLE h2.test.people") }, errorClass = "FAILED_JDBC.DROP_INDEX", From dc1d45206699886293711034d805caf86f2b4db9 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 21 Dec 2023 22:42:58 +0300 Subject: [PATCH 06/15] Change the integration 
tests --- .../spark/sql/jdbc/v2/PostgresIntegrationSuite.scala | 5 ++--- .../spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala | 3 ++- .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 11 ++++++----- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index 85e85f8bf3803..88ad7e3ae9f80 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -19,9 +19,8 @@ package org.apache.spark.sql.jdbc.v2 import java.sql.Connection -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog import org.apache.spark.sql.jdbc.DatabaseOnDocker import org.apache.spark.sql.types._ @@ -108,7 +107,7 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT sql(s"CREATE TABLE $t1(c int)") sql(s"CREATE TABLE $t2(c int)") checkError( - exception = intercept[TableAlreadyExistsException](sql(s"ALTER TABLE $t1 RENAME TO t2")), + exception = intercept[SparkException](sql(s"ALTER TABLE $t1 RENAME TO t2")), errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", parameters = Map("relationName" -> "`t2`") ) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala index 4eacfbfbd8804..a100096929620 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala @@ -24,6 +24,7 @@ import scala.jdk.CollectionConverters._ import org.apache.logging.log4j.Level +import org.apache.spark.SparkException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange} @@ -118,7 +119,7 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte assert(catalog.namespaceExists(Array("foo")) === true) catalog.createTable(ident1, schema, Array.empty[Transform], emptyProps) if (supportsDropSchemaRestrict) { - intercept[NonEmptyNamespaceException] { + intercept[SparkException] { catalog.dropNamespace(Array("foo"), cascade = false) } } diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index 76277dbc96b61..bc5cfaa1403f8 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -248,11 +248,12 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu assert(jdbcTable.indexExists("i2") == false) val indexType = "DUMMY" - var m = intercept[UnsupportedOperationException] { - sql(s"CREATE index 
i1 ON $catalogName.new_table USING $indexType (col1)") - }.getMessage - assert(m.contains(s"Index Type $indexType is not supported." + - s" The supported Index Types are:")) + checkError( + exception = intercept[SparkException] { + sql(s"CREATE index i1 ON $catalogName.new_table USING $indexType (col1)") + }, + errorClass = "FAILED_JDBC.CREATE_INDEX", + parameters = Map.empty) sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)") assert(jdbcTable.indexExists("i1")) From aff94798ea32dda143b3c329d7a6d9e924558ac4 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 22 Dec 2023 09:19:24 +0300 Subject: [PATCH 07/15] Fix imports --- .../scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala | 1 - .../src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala index a100096929620..4f9d0a6a0e346 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala @@ -26,7 +26,6 @@ import org.apache.logging.log4j.Level import org.apache.spark.SparkException import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index bc5cfaa1403f8..a4f04a11ae2b6 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc.v2 import org.apache.logging.log4j.Level +import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, DataFrame} import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, UnresolvedAttribute} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Sample, Sort} From d0ea4692d9fd6bc2f495b9ee5abe98176b003d5f Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 22 Dec 2023 10:00:35 +0300 Subject: [PATCH 08/15] Fix tests --- .../scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index a4f04a11ae2b6..83ec09307648d 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -222,7 +222,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu test("CREATE TABLE with table property") { withTable(s"$catalogName.new_table") { - val e = intercept[AnalysisException] 
{ + val e = intercept[SparkException] { sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')") } assert(e.getErrorClass == "FAILED_JDBC.CREATE_TABLE") @@ -254,7 +254,11 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu sql(s"CREATE index i1 ON $catalogName.new_table USING $indexType (col1)") }, errorClass = "FAILED_JDBC.CREATE_INDEX", - parameters = Map.empty) + parameters = Map( + "url" -> ".*", + "indexName" -> "`i1`", + "tableName" -> "`new_table`"), + matchPVals = true) sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)") assert(jdbcTable.indexExists("i1")) From d6c29a6b9151002e5cdfdba94c1a7b64b1bdd5e6 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 22 Dec 2023 10:58:27 +0300 Subject: [PATCH 09/15] Fix integration tests --- .../sql/jdbc/v2/PostgresIntegrationSuite.scala | 7 ++----- .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 14 ++++---------- 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala index 88ad7e3ae9f80..dc246c68c4f6f 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala @@ -106,11 +106,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT withTable(t1, t2) { sql(s"CREATE TABLE $t1(c int)") sql(s"CREATE TABLE $t2(c int)") - checkError( - exception = intercept[SparkException](sql(s"ALTER TABLE $t1 RENAME TO t2")), - errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", - parameters = Map("relationName" -> "`t2`") - ) + val e = intercept[SparkException](sql(s"ALTER TABLE $t1 RENAME TO t2")) + assert(e.getErrorClass == "FAILED_JDBC.RENAME_TABLE") } } } diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index 83ec09307648d..99f5847fa394e 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -249,16 +249,10 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu assert(jdbcTable.indexExists("i2") == false) val indexType = "DUMMY" - checkError( - exception = intercept[SparkException] { - sql(s"CREATE index i1 ON $catalogName.new_table USING $indexType (col1)") - }, - errorClass = "FAILED_JDBC.CREATE_INDEX", - parameters = Map( - "url" -> ".*", - "indexName" -> "`i1`", - "tableName" -> "`new_table`"), - matchPVals = true) + val e = intercept[SparkException] { + sql(s"CREATE index i1 ON $catalogName.new_table USING $indexType (col1)") + } + assert(e.getErrorClass == "FAILED_JDBC.CREATE_INDEX") sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)") assert(jdbcTable.indexExists("i1")) From 11be4fe8040b5f15098d67f8da6aaa9e4f9fc5d6 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Thu, 4 Jan 2024 22:53:02 +0300 Subject: [PATCH 10/15] Implement alternative approach --- .../main/resources/error/error-classes.json | 2 +- .../apache/spark/sql/internal/SQLConf.scala | 12 ---- .../datasources/jdbc/JdbcUtils.scala | 9 +-- 
.../datasources/v2/jdbc/JDBCTable.scala | 4 +- .../v2/jdbc/JDBCTableCatalog.scala | 22 +++---- .../apache/spark/sql/jdbc/JdbcDialects.scala | 20 +++++- .../v2/jdbc/JDBCTableCatalogSuite.scala | 59 ++++++------------ .../apache/spark/sql/jdbc/JDBCV2Suite.scala | 61 +++++-------------- 8 files changed, 69 insertions(+), 120 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 6c8522c416516..a602a2180b4d3 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -1186,7 +1186,7 @@ }, "UNCLASSIFIED" : { "message" : [ - "<msg>" + "<message>" ] } }, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 644532d06df67..d54cb3756638e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -4612,16 +4612,6 @@ object SQLConf { .booleanConf .createWithDefault(false) - val CLASSIFY_JDBC_EXCEPTION_IN_DIALECT = - buildConf("spark.sql.legacy.classifyJDBCExceptionInDialect") - .internal() - .doc("When set to true, Spark delegates classification of JDBC exceptions to a dialect. " + - "If it is false, the JDBC table catalog converts exceptions from JDBC dialects " + - "to Spark exceptions.") - .version("4.0.0") - .booleanConf - .createWithDefault(false) - /** * Holds information about keys that have been deprecated. * @@ -5045,8 +5035,6 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def broadcastHashJoinOutputPartitioningExpandLimit: Int = getConf(BROADCAST_HASH_JOIN_OUTPUT_PARTITIONING_EXPAND_LIMIT) - def classifyJDBCExceptionInDialect: Boolean = getConf(CLASSIFY_JDBC_EXCEPTION_IN_DIALECT) - /** * Returns the [[Resolver]] for the current configuration, which can be used to determine if two * identifiers are equal.
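With this alternative approach no legacy flag is needed: the new four-argument classifyException added to JdbcDialects.scala below delegates by default to the old classifyException(description, e), so dialects that override only the legacy method are still honored, while the built-in dialects can classify against the structured error classes directly. A minimal sketch of a dialect targeting the new overload; MyDialect and jdbc:mydb are illustrative names, and the body simply preserves Spark's structured error:

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.jdbc.JdbcDialect

    // Hypothetical dialect on the new API; overriding the four-argument
    // method bypasses the legacy description-string fallback entirely.
    object MyDialect extends JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")
      override def classifyException(
          e: Throwable,
          errorClass: String,
          messageParameters: Map[String, String],
          description: String): AnalysisException =
        new AnalysisException(errorClass, messageParameters, cause = Some(e))
    }
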
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 5bbe5bf72f64d..467c489a50fd4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -28,7 +28,7 @@ import scala.jdk.CollectionConverters._ import scala.util.Try import scala.util.control.NonFatal -import org.apache.spark.{SparkException, SparkThrowable, TaskContext} +import org.apache.spark.{SparkThrowable, TaskContext} import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Row} @@ -43,7 +43,6 @@ import org.apache.spark.sql.connector.catalog.{Identifier, TableChange} import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex} import org.apache.spark.sql.connector.expressions.NamedReference import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType, NoopDialect} import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils @@ -1185,15 +1184,13 @@ object JdbcUtils extends Logging with SQLConfHelper { errorClass: String, messageParameters: Map[String, String], dialect: JdbcDialect, - legacyMessage: String)(f: => T): T = { + description: String)(f: => T): T = { try { f } catch { case e: SparkThrowable with Throwable => throw e - case e: Throwable if SQLConf.get.classifyJDBCExceptionInDialect => - throw dialect.classifyException(legacyMessage, e) case e: Throwable => - throw new SparkException(errorClass, messageParameters, cause = e) + throw dialect.classifyException(e, errorClass, messageParameters, description) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala index 96a3d62d101b5..4f134ca615ceb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala @@ -70,7 +70,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt "indexName" -> toSQLId(indexName), "tableName" -> toSQLId(name)), dialect = JdbcDialects.get(jdbcOptions.url), - legacyMessage = s"Failed to create index $indexName in ${name()}") { + description = s"Failed to create index $indexName in ${name()}") { JdbcUtils.createIndex( conn, indexName, ident, columns, columnsProperties, properties, jdbcOptions) } @@ -92,7 +92,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt "indexName" -> toSQLId(indexName), "tableName" -> toSQLId(name)), dialect = JdbcDialects.get(jdbcOptions.url), - legacyMessage = s"Failed to drop index $indexName in ${name()}") { + description = s"Failed to drop index $indexName in ${name()}") { JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index 4b05ecec144bf..8375781726b94 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala @@ -74,7 +74,7 @@ class JDBCTableCatalog extends TableCatalog "url" -> options.url, "namespace" -> toSQLId(namespace.toSeq)), dialect, - legacyMessage = s"Failed get tables from: ${namespace.mkString(".")}") { + description = s"Failed get tables from: ${namespace.mkString(".")}") { conn.getMetaData.getTables(null, schemaPattern, "%", Array("TABLE")) } new Iterator[Identifier] { @@ -94,7 +94,7 @@ class JDBCTableCatalog extends TableCatalog "url" -> options.url, "tableName" -> toSQLId(ident)), dialect, - legacyMessage = s"Failed table existence check: $ident") { + description = s"Failed table existence check: $ident") { JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions)) } } @@ -121,7 +121,7 @@ class JDBCTableCatalog extends TableCatalog "oldName" -> toSQLId(oldIdent), "newName" -> toSQLId(newIdent)), dialect, - legacyMessage = s"Failed table renaming from $oldIdent to $newIdent") { + description = s"Failed table renaming from $oldIdent to $newIdent") { JdbcUtils.renameTable(conn, oldIdent, newIdent, options) } } @@ -187,7 +187,7 @@ class JDBCTableCatalog extends TableCatalog "url" -> options.url, "tableName" -> toSQLId(ident)), dialect, - legacyMessage = s"Failed table creation: $ident") { + description = s"Failed table creation: $ident") { JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions) } } @@ -204,7 +204,7 @@ class JDBCTableCatalog extends TableCatalog "url" -> options.url, "tableName" -> toSQLId(ident)), dialect, - legacyMessage = s"Failed table altering: $ident") { + description = s"Failed table altering: $ident") { JdbcUtils.alterTable(conn, getTableName(ident), changes, options) } loadTable(ident) @@ -220,7 +220,7 @@ class JDBCTableCatalog extends TableCatalog "url" -> options.url, "namespace" -> toSQLId(namespace.toSeq)), dialect, - legacyMessage = s"Failed namespace exists: ${namespace.mkString}") { + description = s"Failed namespace exists: ${namespace.mkString}") { JdbcUtils.schemaExists(conn, options, db) } } @@ -233,7 +233,7 @@ class JDBCTableCatalog extends TableCatalog errorClass = "FAILED_JDBC.LIST_NAMESPACES", messageParameters = Map("url" -> options.url), dialect, - legacyMessage = s"Failed list namespaces") { + description = s"Failed list namespaces") { JdbcUtils.listSchemas(conn, options) } } @@ -287,7 +287,7 @@ class JDBCTableCatalog extends TableCatalog "url" -> options.url, "namespace" -> toSQLId(db)), dialect, - legacyMessage = s"Failed create name space: $db") { + description = s"Failed create name space: $db") { JdbcUtils.createSchema(conn, options, db, comment) } } @@ -312,7 +312,7 @@ class JDBCTableCatalog extends TableCatalog "url" -> options.url, "namespace" -> toSQLId(db)), dialect, - legacyMessage = s"Failed create comment on name space: $db") { + description = s"Failed create comment on name space: $db") { JdbcUtils.alterSchemaComment(conn, options, db, set.value) } } @@ -329,7 +329,7 @@ class JDBCTableCatalog extends TableCatalog "url" -> options.url, "namespace" -> toSQLId(db)), dialect, - legacyMessage = s"Failed remove comment on name space: $db") { + description = s"Failed remove comment on name space: $db") { JdbcUtils.removeSchemaComment(conn, options, db) } } @@ -357,7 +357,7 @@ class JDBCTableCatalog extends TableCatalog "url" -> options.url, "namespace" -> toSQLId(db)), dialect, - 
legacyMessage = s"Failed drop name space: $db") { + description = s"Failed drop name space: $db") { JdbcUtils.dropSchema(conn, options, db, cascade) true } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index cb316b76175c2..da8884405eca0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -628,6 +628,22 @@ abstract class JdbcDialect extends Serializable with Logging { throw new UnsupportedOperationException("listIndexes is not supported") } + /** + * Gets a dialect exception, classifies it and wraps it by `AnalysisException`. + * @param e The dialect specific exception. + * @param errorClass The error class assigned in the case of an unclassified `e` + * @param messageParameters The message parameters of `errorClass` + * @param description The error description + * @return `AnalysisException` or its sub-class. + */ + def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + description: String): AnalysisException = { + classifyException(description, e) + } + /** * Gets a dialect exception, classifies it and wraps it by `AnalysisException`. * @param message The error message to be placed to the returned exception. @@ -637,7 +653,9 @@ abstract class JdbcDialect extends Serializable with Logging { def classifyException(message: String, e: Throwable): AnalysisException = { new AnalysisException( errorClass = "FAILED_JDBC.UNCLASSIFIED", - messageParameters = Map("msg" -> message), + messageParameters = Map( + "url" -> "jdbc:", + "message" -> message), cause = Some(e)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index e8292461438ce..5408d434fced1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -21,7 +21,7 @@ import java.util.Properties import org.apache.logging.log4j.Level -import org.apache.spark.{SparkConf, SparkException, SparkIllegalArgumentException} +import org.apache.spark.{SparkConf, SparkIllegalArgumentException} import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.ParseException @@ -127,23 +127,10 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { withConnection { conn => conn.prepareStatement("""CREATE TABLE "test"."src_table" (id INTEGER)""").executeUpdate() } - withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "true") { - val exp = intercept[TableAlreadyExistsException] { - sql("ALTER TABLE h2.test.src_table RENAME TO test.dst_table") - } - checkErrorTableAlreadyExists(exp, "`dst_table`") - } - withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "false") { - checkError( - exception = intercept[SparkException] { - sql("ALTER TABLE h2.test.src_table RENAME TO test.dst_table") - }, - errorClass = "FAILED_JDBC.RENAME_TABLE", - parameters = Map( - "url" -> url, - "oldName" -> "`test`.`src_table`", - "newName" -> "`test`.`dst_table`")) + val exp = 
intercept[TableAlreadyExistsException] { + sql("ALTER TABLE h2.test.src_table RENAME TO test.dst_table") } + checkErrorTableAlreadyExists(exp, "`dst_table`") } } } @@ -179,24 +166,12 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { } checkErrorTableAlreadyExists(e, "`test`.`new_table`") } - withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "true") { - val exp = intercept[NoSuchNamespaceException] { - sql("CREATE TABLE h2.bad_test.new_table(i INT, j STRING)") - } - checkError(exp, - errorClass = "SCHEMA_NOT_FOUND", - parameters = Map("schemaName" -> "`bad_test`")) - } - withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "false") { - val exp = intercept[SparkException] { - sql("CREATE TABLE h2.bad_test.new_table(i INT, j STRING)") - } - checkError(exp, - errorClass = "FAILED_JDBC.CREATE_TABLE", - parameters = Map( - "url" -> url, - "tableName" -> "`bad_test`.`new_table`")) + val exp = intercept[NoSuchNamespaceException] { + sql("CREATE TABLE h2.bad_test.new_table(i INT, j STRING)") } + checkError(exp, + errorClass = "SCHEMA_NOT_FOUND", + parameters = Map("schemaName" -> "`bad_test`")) } test("ALTER TABLE ... add column") { @@ -580,14 +555,14 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { test("CREATE TABLE with table property") { withTable("h2.test.new_table") { checkError( - exception = intercept[SparkException] { + exception = intercept[AnalysisException] { sql("CREATE TABLE h2.test.new_table(i INT, j STRING)" + " TBLPROPERTIES('ENGINE'='tableEngineName')") }, - errorClass = "FAILED_JDBC.CREATE_TABLE", + errorClass = "FAILED_JDBC.UNCLASSIFIED", parameters = Map( - "url" -> url, - "tableName" -> "`test`.`new_table`")) + "url" -> "jdbc:", + "message" -> "Failed table creation: test.new_table")) } } @@ -600,13 +575,13 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { test("SPARK-42904: CREATE TABLE with char/varchar with invalid char length") { checkError( - exception = intercept[SparkException]{ + exception = intercept[AnalysisException]{ sql("CREATE TABLE h2.test.new_table(c CHAR(1000000001))") }, - errorClass = "FAILED_JDBC.CREATE_TABLE", + errorClass = "FAILED_JDBC.UNCLASSIFIED", parameters = Map( - "url" -> url, - "tableName" -> "`test`.`new_table`")) + "url" -> "jdbc:", + "message" -> "Failed table creation: test.new_table")) } test("SPARK-42955: Skip classifyException and wrap AnalysisException for SparkThrowable") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index a59c5f2481aaf..0a66680edd639 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -2918,31 +2918,16 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel assert(indexes1.isEmpty) sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)") - withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "true") { - checkError( - exception = intercept[IndexAlreadyExistsException] { - sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)") - }, - errorClass = "INDEX_ALREADY_EXISTS", - parameters = Map( - "indexName" -> "people_index", - "tableName" -> "test.people" - ) + checkError( + exception = intercept[IndexAlreadyExistsException] { + sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)") + }, + errorClass = "INDEX_ALREADY_EXISTS", + parameters = Map( + "indexName" -> 
"people_index", + "tableName" -> "test.people" ) - } - withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "false") { - checkError( - exception = intercept[SparkException] { - sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)") - }, - errorClass = "FAILED_JDBC.CREATE_INDEX", - parameters = Map( - "url" -> url, - "indexName" -> "`people_index`", - "tableName" -> "`test`.`people`" - ) - ) - } + ) assert(jdbcTable.indexExists("people_index")) val indexes2 = jdbcTable.listIndexes() assert(!indexes2.isEmpty) @@ -2951,27 +2936,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel assert(tableIndex.indexName() == "people_index") sql(s"DROP INDEX people_index ON TABLE h2.test.people") - withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "true") { - checkError( - exception = intercept[NoSuchIndexException] { - sql(s"DROP INDEX people_index ON TABLE h2.test.people") - }, - errorClass = "INDEX_NOT_FOUND", - parameters = Map("indexName" -> "people_index", "tableName" -> "test.people") - ) - } - withSQLConf(SQLConf.CLASSIFY_JDBC_EXCEPTION_IN_DIALECT.key -> "false") { - checkError( - exception = intercept[SparkException] { - sql(s"DROP INDEX people_index ON TABLE h2.test.people") - }, - errorClass = "FAILED_JDBC.DROP_INDEX", - parameters = Map( - "url" -> url, - "indexName" -> "`people_index`", - "tableName" -> "`test`.`people`") - ) - } + checkError( + exception = intercept[NoSuchIndexException] { + sql(s"DROP INDEX people_index ON TABLE h2.test.people") + }, + errorClass = "INDEX_NOT_FOUND", + parameters = Map("indexName" -> "people_index", "tableName" -> "test.people") + ) assert(jdbcTable.indexExists("people_index") == false) val indexes3 = jdbcTable.listIndexes() assert(indexes3.isEmpty) From 38b701f9941456e800151e2dc2a7c2ea9eaaabab Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 5 Jan 2024 09:05:31 +0300 Subject: [PATCH 11/15] Re-gen .md --- docs/sql-error-conditions-failed-jdbc-error-class.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-error-conditions-failed-jdbc-error-class.md b/docs/sql-error-conditions-failed-jdbc-error-class.md index 3c1dbed05f661..1740ad49e7f36 100644 --- a/docs/sql-error-conditions-failed-jdbc-error-class.md +++ b/docs/sql-error-conditions-failed-jdbc-error-class.md @@ -79,6 +79,6 @@ Check that the table `` exists. 
## UNCLASSIFIED

-`<msg>`
+`<message>`

From 814ce3c14272483694a1ded8a2174e28a00aaf72 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Fri, 5 Jan 2024 09:17:47 +0300
Subject: [PATCH 12/15] Fix tests

---
 .../org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala | 4 ++--
 .../test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
index 4f9d0a6a0e346..4eacfbfbd8804 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
@@ -24,8 +24,8 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.logging.log4j.Level
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
 import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
@@ -118,7 +118,7 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte
     assert(catalog.namespaceExists(Array("foo")) === true)
     catalog.createTable(ident1, schema, Array.empty[Transform], emptyProps)
     if (supportsDropSchemaRestrict) {
-      intercept[SparkException] {
+      intercept[NonEmptyNamespaceException] {
         catalog.dropNamespace(Array("foo"), cascade = false)
       }
     }
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index 4a8e3e27da62e..b97e7edfddf0a 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -222,7 +222,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
 
   test("CREATE TABLE with table property") {
     withTable(s"$catalogName.new_table") {
-      val e = intercept[SparkException] {
+      val e = intercept[AnalysisException] {
         sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')")
       }
       assert(e.getErrorClass == "FAILED_JDBC.CREATE_TABLE")

From 0cfce60e0cd05a0c59b04c87d7e151bfbb76323b Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Fri, 5 Jan 2024 10:04:04 +0300
Subject: [PATCH 13/15] Fix tests

---
 .../spark/sql/jdbc/v2/PostgresIntegrationSuite.scala | 10 +++++++---
 .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 10 +++++-----
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
index dc246c68c4f6f..85e85f8bf3803 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.jdbc.v2
 
 import java.sql.Connection
 
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.SparkConf import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog import org.apache.spark.sql.jdbc.DatabaseOnDocker import org.apache.spark.sql.types._ @@ -106,8 +107,11 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT withTable(t1, t2) { sql(s"CREATE TABLE $t1(c int)") sql(s"CREATE TABLE $t2(c int)") - val e = intercept[SparkException](sql(s"ALTER TABLE $t1 RENAME TO t2")) - assert(e.getErrorClass == "FAILED_JDBC.RENAME_TABLE") + checkError( + exception = intercept[TableAlreadyExistsException](sql(s"ALTER TABLE $t1 RENAME TO t2")), + errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", + parameters = Map("relationName" -> "`t2`") + ) } } } diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index b97e7edfddf0a..bae2747882123 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.jdbc.v2 import org.apache.logging.log4j.Level -import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, DataFrame} import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, UnresolvedAttribute} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Sample, Sort} @@ -225,7 +224,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu val e = intercept[AnalysisException] { sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')") } - assert(e.getErrorClass == "FAILED_JDBC.CREATE_TABLE") + assert(e.getErrorClass == "FAILED_JDBC.UNCLASSIFIED") testCreateTableWithProperty(s"$catalogName.new_table") } } @@ -249,10 +248,11 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu assert(jdbcTable.indexExists("i2") == false) val indexType = "DUMMY" - val e = intercept[SparkException] { + var m = intercept[UnsupportedOperationException] { sql(s"CREATE index i1 ON $catalogName.new_table USING $indexType (col1)") - } - assert(e.getErrorClass == "FAILED_JDBC.CREATE_INDEX") + }.getMessage + assert(m.contains(s"Index Type $indexType is not supported." 
+ + s" The supported Index Types are:")) sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)") assert(jdbcTable.indexExists("i1")) From aae40899b67d1f30fb2d021921e98c211c955b0c Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 5 Jan 2024 11:25:16 +0300 Subject: [PATCH 14/15] Fix tests --- .../test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index bae2747882123..d1d247967b4b5 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -279,7 +279,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu sql(s"CREATE index i1 ON $catalogName.new_table (col1)") }, errorClass = "INDEX_ALREADY_EXISTS", - parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`") + parameters = Map("indexName" -> "i1", "tableName" -> "new_table") ) sql(s"DROP index i1 ON $catalogName.new_table") @@ -304,7 +304,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu sql(s"DROP index i1 ON $catalogName.new_table") }, errorClass = "INDEX_NOT_FOUND", - parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`") + parameters = Map("indexName" -> "i1", "tableName" -> "new_table") ) } } From a0a27e1e7b737c9b95edf71c790341a43d527bca Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Fri, 5 Jan 2024 11:28:06 +0300 Subject: [PATCH 15/15] Deprecate old classifyException --- .../src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 9c5a731174e36..888ef4a20be3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -650,6 +650,7 @@ abstract class JdbcDialect extends Serializable with Logging { * @param e The dialect specific exception. * @return `AnalysisException` or its sub-class. */ + @deprecated("Please override the classifyException method with an error class", "4.0.0") def classifyException(message: String, e: Throwable): AnalysisException = { new AnalysisException( errorClass = "FAILED_JDBC.UNCLASSIFIED",