From 921f88faa0cdf3df5851602d997113a47bff9da5 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 6 Oct 2020 10:31:38 +0300
Subject: [PATCH 1/4] Add classifyException()

---
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index cea5a20917532..41bee1d7f4e36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuilder
 import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.annotation.{DeveloperApi, Since}
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
@@ -234,6 +235,17 @@ abstract class JdbcDialect extends Serializable {
     }
     updateClause.result()
   }
+
+  /**
+   * Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
+   * @param e The dialect specific exception.
+   * @return `AnalysisException` or its sub-class.
+   */
+  def classifyException(e: Throwable): AnalysisException = {
+    new AnalysisException(
+      message = "Failed on a JDBC dialect statement",
+      cause = Some(e))
+  }
 }
 
 /**
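
Not part of the patch: a minimal sketch of how a third-party dialect could override the new hook. MyDatabaseDialect and vendor code 1007 are hypothetical; the object sits in org.apache.spark.sql.jdbc because AnalysisException's constructor is protected[sql], the same reason the H2 dialect added later in this series lives there.

package org.apache.spark.sql.jdbc

import java.sql.SQLException

import org.apache.spark.sql.AnalysisException

object MyDatabaseDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

  // Map one vendor-specific error code to a sharper message; everything else
  // falls back to the default wrapper introduced by the patch above.
  override def classifyException(e: Throwable): AnalysisException = e match {
    case s: SQLException if s.getErrorCode == 1007 =>
      new AnalysisException("Object already exists", cause = Some(s))
    case other => super.classifyException(other)
  }
}
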
From b2e7a721a525a29017894d39cd8e85a85d9c7833 Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Tue, 6 Oct 2020 11:20:28 +0300
Subject: [PATCH 2/4] Classify dialect exceptions in JDBC v2 Table Catalog

---
 .../v2/jdbc/JDBCTableCatalog.scala            | 24 +++++++++++++++----
 .../v2/jdbc/JDBCTableCatalogSuite.scala       | 10 ++++----
 2 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index 41f650d1f2ff5..ee514b0cdc578 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -70,7 +70,9 @@ class JDBCTableCatalog extends TableCatalog with Logging {
     checkNamespace(ident.namespace())
     val writeOptions = new JdbcOptionsInWrite(
       options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
-    withConnection(JdbcUtils.tableExists(_, writeOptions))
+    classifyException {
+      withConnection(JdbcUtils.tableExists(_, writeOptions))
+    }
   }
 
   override def dropTable(ident: Identifier): Boolean = {
@@ -88,7 +90,9 @@ class JDBCTableCatalog extends TableCatalog with Logging {
   override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
     checkNamespace(oldIdent.namespace())
     withConnection { conn =>
-      JdbcUtils.renameTable(conn, getTableName(oldIdent), getTableName(newIdent), options)
+      classifyException {
+        JdbcUtils.renameTable(conn, getTableName(oldIdent), getTableName(newIdent), options)
+      }
     }
   }
 
@@ -123,7 +127,9 @@ class JDBCTableCatalog extends TableCatalog with Logging {
       options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
     val caseSensitive = SQLConf.get.caseSensitiveAnalysis
     withConnection { conn =>
-      JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions)
+      classifyException {
+        JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions)
+      }
     }
 
     JDBCTable(ident, schema, writeOptions)
@@ -132,7 +138,9 @@ class JDBCTableCatalog extends TableCatalog with Logging {
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
     checkNamespace(ident.namespace())
     withConnection { conn =>
-      JdbcUtils.alterTable(conn, getTableName(ident), changes, options)
+      classifyException {
+        JdbcUtils.alterTable(conn, getTableName(ident), changes, options)
+      }
       loadTable(ident)
     }
   }
@@ -156,4 +164,12 @@ class JDBCTableCatalog extends TableCatalog with Logging {
   private def getTableName(ident: Identifier): String = {
     (ident.namespace() :+ ident.name()).map(dialect.quoteIdentifier).mkString(".")
   }
+
+  private def classifyException[T](f: => T): T = {
+    try {
+      f
+    } catch {
+      case e: Throwable => throw dialect.classifyException(e)
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index bf71f90779b71..62794f779cb8e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -97,7 +97,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
     }
     // Rename not existing table or namespace
     Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
-      intercept[org.h2.jdbc.JdbcSQLException] {
+      intercept[AnalysisException] {
         sql(s"ALTER TABLE $table RENAME TO test.dst_table")
       }
     }
@@ -110,7 +110,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       withConnection { conn =>
         conn.prepareStatement("""CREATE TABLE "test"."src_table" (id INTEGER)""").executeUpdate()
       }
-      intercept[org.h2.jdbc.JdbcSQLException] {
+      intercept[AnalysisException] {
         sql("ALTER TABLE h2.test.src_table RENAME TO h2.test.dst_table")
       }
     }
@@ -144,7 +144,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
         sql("CREATE TABLE h2.test.new_table(i INT, j STRING) USING _")
       }
     }
-    intercept[org.h2.jdbc.JdbcSQLException] {
+    intercept[AnalysisException] {
       sql("CREATE TABLE h2.bad_test.new_table(i INT, j STRING) USING _")
     }
   }
@@ -265,9 +265,9 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
   test("alter table ... update column comment not supported") {
     withTable("h2.test.alt_table") {
       sql("CREATE TABLE h2.test.alt_table (ID INTEGER) USING _")
-      val thrown = intercept[java.sql.SQLFeatureNotSupportedException] {
+      val thrown = intercept[AnalysisException] {
         sql("ALTER TABLE h2.test.alt_table ALTER COLUMN ID COMMENT 'test'")
-      }
+      }.cause.get
       assert(thrown.getMessage.contains("Unsupported TableChange"))
       // Update comment for not existing column
       intercept[AnalysisException] {
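
The private helper's wrap-and-rethrow shape, as a standalone sketch that runs without Spark on the classpath; a plain RuntimeException subclass stands in for AnalysisException (whose constructor is not public), and all names here are illustrative, not from the patch:

object ClassifyDemo extends App {
  final class ClassifiedException(message: String, cause: Throwable)
    extends RuntimeException(message, cause)

  // Run the by-name block; rethrow anything it throws as a single,
  // catalog-friendly exception type that preserves the original as the cause.
  def classifyException[T](f: => T): T =
    try f catch {
      case e: Throwable =>
        throw new ClassifiedException("Failed on a JDBC dialect statement", e)
    }

  try classifyException { throw new java.sql.SQLException("Table \"t1\" not found") }
  catch {
    case e: ClassifiedException =>
      println(s"${e.getMessage} (cause: ${e.getCause.getMessage})")
  }
}
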
update column comment not supported") { withTable("h2.test.alt_table") { sql("CREATE TABLE h2.test.alt_table (ID INTEGER) USING _") - val thrown = intercept[java.sql.SQLFeatureNotSupportedException] { + val thrown = intercept[AnalysisException] { sql("ALTER TABLE h2.test.alt_table ALTER COLUMN ID COMMENT 'test'") - } + }.cause.get assert(thrown.getMessage.contains("Unsupported TableChange")) // Update comment for not existing column intercept[AnalysisException] { From 90fcaf35f0b2e95a2ed7185248196a6d4a669a6b Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 7 Oct 2020 12:54:25 +0300 Subject: [PATCH 3/4] Add H2Dialect --- .../analysis/AlreadyExistException.scala | 3 +- .../analysis/NoSuchItemException.scala | 6 ++- .../v2/jdbc/JDBCTableCatalog.scala | 12 ++--- .../org/apache/spark/sql/jdbc/H2Dialect.scala | 48 +++++++++++++++++++ .../apache/spark/sql/jdbc/JdbcDialects.scala | 8 ++-- .../v2/jdbc/JDBCTableCatalogSuite.scala | 47 ++++++++++-------- 6 files changed, 91 insertions(+), 33 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala index bfc3b3d0ac966..c50ba623c27b2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala @@ -37,7 +37,8 @@ class NamespaceAlreadyExistsException(message: String) extends AnalysisException } } -class TableAlreadyExistsException(message: String) extends AnalysisException(message) { +class TableAlreadyExistsException(message: String, cause: Option[Throwable] = None) + extends AnalysisException(message, cause = cause) { def this(db: String, table: String) = { this(s"Table or view '$table' already exists in database '$db'") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala index 88be441d808db..8a1913b40b310 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala @@ -32,13 +32,15 @@ import org.apache.spark.sql.types.StructType class NoSuchDatabaseException( val db: String) extends NoSuchNamespaceException(s"Database '$db' not found") -class NoSuchNamespaceException(message: String) extends AnalysisException(message) { +class NoSuchNamespaceException(message: String, cause: Option[Throwable] = None) + extends AnalysisException(message, cause = cause) { def this(namespace: Array[String]) = { this(s"Namespace '${namespace.quoted}' not found") } } -class NoSuchTableException(message: String) extends AnalysisException(message) { +class NoSuchTableException(message: String, cause: Option[Throwable] = None) + extends AnalysisException(message, cause = cause) { def this(db: String, table: String) = { this(s"Table or view '$table' not found in database '$db'") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index ee514b0cdc578..8edc2fe5585e0 100644 --- 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index ee514b0cdc578..8edc2fe5585e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -70,7 +70,7 @@ class JDBCTableCatalog extends TableCatalog with Logging {
     checkNamespace(ident.namespace())
     val writeOptions = new JdbcOptionsInWrite(
       options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
-    classifyException {
+    classifyException(s"Failed table existence check: $ident") {
       withConnection(JdbcUtils.tableExists(_, writeOptions))
     }
   }
@@ -90,7 +90,7 @@ class JDBCTableCatalog extends TableCatalog with Logging {
   override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
     checkNamespace(oldIdent.namespace())
     withConnection { conn =>
-      classifyException {
+      classifyException(s"Failed table renaming from $oldIdent to $newIdent") {
         JdbcUtils.renameTable(conn, getTableName(oldIdent), getTableName(newIdent), options)
       }
     }
@@ -127,7 +127,7 @@ class JDBCTableCatalog extends TableCatalog with Logging {
       options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
     val caseSensitive = SQLConf.get.caseSensitiveAnalysis
     withConnection { conn =>
-      classifyException {
+      classifyException(s"Failed table creation: $ident") {
         JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions)
       }
     }
@@ -138,7 +138,7 @@ class JDBCTableCatalog extends TableCatalog with Logging {
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
     checkNamespace(ident.namespace())
     withConnection { conn =>
-      classifyException {
+      classifyException(s"Failed table altering: $ident") {
         JdbcUtils.alterTable(conn, getTableName(ident), changes, options)
       }
       loadTable(ident)
@@ -165,11 +165,11 @@ class JDBCTableCatalog extends TableCatalog with Logging {
     (ident.namespace() :+ ident.name()).map(dialect.quoteIdentifier).mkString(".")
   }
 
-  private def classifyException[T](f: => T): T = {
+  private def classifyException[T](message: String)(f: => T): T = {
     try {
       f
     } catch {
-      case e: Throwable => throw dialect.classifyException(e)
+      case e: Throwable => throw dialect.classifyException(message, e)
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
new file mode 100644
index 0000000000000..9c727957ffab8
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.SQLException
+import java.util.Locale
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+
+private object H2Dialect extends JdbcDialect {
+  override def canHandle(url: String): Boolean =
+    url.toLowerCase(Locale.ROOT).startsWith("jdbc:h2")
+
+  override def classifyException(message: String, e: Throwable): AnalysisException = {
+    if (e.isInstanceOf[SQLException]) {
+      // Error codes are from https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html
+      e.asInstanceOf[SQLException].getErrorCode match {
+        // TABLE_OR_VIEW_ALREADY_EXISTS_1
+        case 42101 =>
+          throw new TableAlreadyExistsException(message, cause = Some(e))
+        // TABLE_OR_VIEW_NOT_FOUND_1
+        case 42102 =>
+          throw new NoSuchTableException(message, cause = Some(e))
+        // SCHEMA_NOT_FOUND_1
+        case 90079 =>
+          throw new NoSuchNamespaceException(message, cause = Some(e))
+        case _ =>
+      }
+    }
+    super.classifyException(message, e)
+  }
+}
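
A quick standalone check of the mapping above: java.sql.SQLException carries the vendor code as its third constructor argument, which is exactly what getErrorCode returns. This sketch needs neither H2 nor Spark on the classpath; the strings only name the exception types H2Dialect picks for each code.

import java.sql.SQLException

object H2ErrorCodes extends App {
  // Mirror of H2Dialect's match, returning the chosen type's name.
  def classify(e: SQLException): String = e.getErrorCode match {
    case 42101 => "TableAlreadyExistsException" // TABLE_OR_VIEW_ALREADY_EXISTS_1
    case 42102 => "NoSuchTableException"        // TABLE_OR_VIEW_NOT_FOUND_1
    case 90079 => "NoSuchNamespaceException"    // SCHEMA_NOT_FOUND_1
    case _     => "AnalysisException"           // super.classifyException fallback
  }

  Seq(42101, 42102, 90079, 0).foreach { code =>
    println(s"$code -> ${classify(new SQLException("reason", "SQLState", code))}")
  }
}
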
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index d2cbc25c43bc1..5f8d788bc7a22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -257,13 +257,12 @@ abstract class JdbcDialect extends Serializable {
 
   /**
    * Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
+   * @param message The error message to be placed to the returned exception.
    * @param e The dialect specific exception.
    * @return `AnalysisException` or its sub-class.
    */
-  def classifyException(e: Throwable): AnalysisException = {
-    new AnalysisException(
-      message = "Failed on a JDBC dialect statement",
-      cause = Some(e))
+  def classifyException(message: String, e: Throwable): AnalysisException = {
+    new AnalysisException(message, cause = Some(e))
   }
 }
 
@@ -309,6 +308,7 @@ object JdbcDialects {
   registerDialect(DerbyDialect)
   registerDialect(OracleDialect)
   registerDialect(TeradataDialect)
+  registerDialect(H2Dialect)
 
   /**
    * Fetch the JdbcDialect class corresponding to a given database url.
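
Registration order matters for lookup: JdbcDialects.get scans the registered dialects with canHandle, combines multiple matches into an aggregated dialect, and falls back to a no-op dialect when nothing matches. A dependency-free mirror of that lookup (stand-in names; first match instead of aggregation, for brevity):

object RegistryDemo extends App {
  trait Dialect { def canHandle(url: String): Boolean }
  object H2 extends Dialect { def canHandle(url: String) = url.startsWith("jdbc:h2") }
  object Noop extends Dialect { def canHandle(url: String) = true }

  // Newly registered dialects go to the front, so they win over built-ins.
  var dialects = List[Dialect]()
  def registerDialect(d: Dialect): Unit = { dialects = d :: dialects.filterNot(_ == d) }
  def unregisterDialect(d: Dialect): Unit = { dialects = dialects.filterNot(_ == d) }
  def get(url: String): Dialect = dialects.find(_.canHandle(url)).getOrElse(Noop)

  registerDialect(H2)
  println(get("jdbc:h2:mem:testdb")) // H2 wins while registered
  unregisterDialect(H2)
  println(get("jdbc:h2:mem:testdb")) // falls back to Noop
}
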
add column") { @@ -289,15 +295,16 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { test("alter table ... update column comment not supported") { withTable("h2.test.alt_table") { sql("CREATE TABLE h2.test.alt_table (ID INTEGER) USING _") - val msg1 = intercept[java.sql.SQLFeatureNotSupportedException] { + val exp = intercept[AnalysisException] { sql("ALTER TABLE h2.test.alt_table ALTER COLUMN ID COMMENT 'test'") - }.getMessage - assert(msg1.contains("Unsupported TableChange")) + } + assert(exp.getMessage.contains("Failed table altering: test.alt_table")) + assert(exp.cause.get.getMessage.contains("Unsupported TableChange")) // Update comment for not existing column - val msg2 = intercept[AnalysisException] { + val msg = intercept[AnalysisException] { sql("ALTER TABLE h2.test.alt_table ALTER COLUMN bad_column COMMENT 'test'") }.getMessage - assert(msg2.contains("Cannot update missing field bad_column in test.alt_table")) + assert(msg.contains("Cannot update missing field bad_column in test.alt_table")) } // Update column comments in not existing table and namespace Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table => From ac55879e80bbb964c1563466a8171f7a8fb2720d Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Wed, 7 Oct 2020 17:00:14 +0300 Subject: [PATCH 4/4] Temporary unregister H2Dialect in tests --- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 11 ++++-- .../spark/sql/jdbc/JDBCWriteSuite.scala | 37 +++++++++++-------- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 7af55550a7736..f0b19071a969b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -770,9 +770,14 @@ class JDBCSuite extends QueryTest } test("Dialect unregister") { - JdbcDialects.registerDialect(testH2Dialect) - JdbcDialects.unregisterDialect(testH2Dialect) - assert(JdbcDialects.get(urlWithUserAndPass) == NoopDialect) + JdbcDialects.unregisterDialect(H2Dialect) + try { + JdbcDialects.registerDialect(testH2Dialect) + JdbcDialects.unregisterDialect(testH2Dialect) + assert(JdbcDialects.get(urlWithUserAndPass) == NoopDialect) + } finally { + JdbcDialects.registerDialect(H2Dialect) + } } test("Aggregated dialects") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index 3f621e04338a3..fb46c2ff4c0ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -194,24 +194,29 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter { } test("Truncate") { - JdbcDialects.registerDialect(testH2Dialect) - val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) - val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2) - val df3 = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3) - - df.write.jdbc(url1, "TEST.TRUNCATETEST", properties) - df2.write.mode(SaveMode.Overwrite).option("truncate", true) - .jdbc(url1, "TEST.TRUNCATETEST", properties) - assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count()) - assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length) + JdbcDialects.unregisterDialect(H2Dialect) + 
From ac55879e80bbb964c1563466a8171f7a8fb2720d Mon Sep 17 00:00:00 2001
From: Max Gekk
Date: Wed, 7 Oct 2020 17:00:14 +0300
Subject: [PATCH 4/4] Temporarily unregister H2Dialect in tests

---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 11 ++++--
 .../spark/sql/jdbc/JDBCWriteSuite.scala       | 37 +++++++++++--------
 2 files changed, 29 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 7af55550a7736..f0b19071a969b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -770,9 +770,14 @@ class JDBCSuite extends QueryTest
   }
 
   test("Dialect unregister") {
-    JdbcDialects.registerDialect(testH2Dialect)
-    JdbcDialects.unregisterDialect(testH2Dialect)
-    assert(JdbcDialects.get(urlWithUserAndPass) == NoopDialect)
+    JdbcDialects.unregisterDialect(H2Dialect)
+    try {
+      JdbcDialects.registerDialect(testH2Dialect)
+      JdbcDialects.unregisterDialect(testH2Dialect)
+      assert(JdbcDialects.get(urlWithUserAndPass) == NoopDialect)
+    } finally {
+      JdbcDialects.registerDialect(H2Dialect)
+    }
   }
 
   test("Aggregated dialects") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 3f621e04338a3..fb46c2ff4c0ea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -194,24 +194,29 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
   }
 
   test("Truncate") {
-    JdbcDialects.registerDialect(testH2Dialect)
-    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
-    val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
-    val df3 = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
-
-    df.write.jdbc(url1, "TEST.TRUNCATETEST", properties)
-    df2.write.mode(SaveMode.Overwrite).option("truncate", true)
-      .jdbc(url1, "TEST.TRUNCATETEST", properties)
-    assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count())
-    assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
+    JdbcDialects.unregisterDialect(H2Dialect)
+    try {
+      JdbcDialects.registerDialect(testH2Dialect)
+      val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+      val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
+      val df3 = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
 
-    val m = intercept[AnalysisException] {
-      df3.write.mode(SaveMode.Overwrite).option("truncate", true)
+      df.write.jdbc(url1, "TEST.TRUNCATETEST", properties)
+      df2.write.mode(SaveMode.Overwrite).option("truncate", true)
         .jdbc(url1, "TEST.TRUNCATETEST", properties)
-    }.getMessage
-    assert(m.contains("Column \"seq\" not found"))
-    assert(0 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count())
-    JdbcDialects.unregisterDialect(testH2Dialect)
+      assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count())
+      assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
+
+      val m = intercept[AnalysisException] {
+        df3.write.mode(SaveMode.Overwrite).option("truncate", true)
+          .jdbc(url1, "TEST.TRUNCATETEST", properties)
+      }.getMessage
+      assert(m.contains("Column \"seq\" not found"))
+      assert(0 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count())
+    } finally {
+      JdbcDialects.unregisterDialect(testH2Dialect)
+      JdbcDialects.registerDialect(H2Dialect)
+    }
   }
 
   test("createTableOptions") {
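
The unregister/register dance around try/finally appears in both suites. If it spreads further, a loan-pattern helper would keep the global registry consistent even when a test body fails. A sketch under stated assumptions: DialectTestUtils and withSwappedDialect are hypothetical, not part of the patch, and the object is placed in org.apache.spark.sql.jdbc because H2Dialect is private to that package; registerDialect and unregisterDialect are the real @DeveloperApi entry points used above.

package org.apache.spark.sql.jdbc

object DialectTestUtils {
  // Run `body` with `install` registered instead of `remove`, restoring the
  // original registration in all cases.
  def withSwappedDialect[T](remove: JdbcDialect, install: JdbcDialect)(body: => T): T = {
    JdbcDialects.unregisterDialect(remove)
    try {
      JdbcDialects.registerDialect(install)
      body
    } finally {
      JdbcDialects.unregisterDialect(install)
      JdbcDialects.registerDialect(remove)
    }
  }
}
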