diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 490c1ce8a7cc5..72424d206a7bf 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -518,7 +518,8 @@ file directly with SQL. Save operations can optionally take a `SaveMode`, that specifies how to handle existing data if present. It is important to realize that these save modes do not utilize any locking and are not atomic. Additionally, when performing an `Overwrite`, the data will be deleted before writing out the -new data. +new data. When performing an `Append`, an option is available to enable the UPSERT feature for JDBC data sources; it is +currently supported only for MySQL and PostgreSQL. @@ -1230,6 +1231,24 @@ the following case-insensitive options: The database column data types to use instead of the defaults, when creating the table. Data type information should be specified in the same format as CREATE TABLE columns syntax (e.g: "name CHAR(64), comments VARCHAR(1024)"). The specified types should be valid spark sql data types. This option applies only to writing. + + + + +
Scala/Java | Any Language | Meaning
upsert, upsertConditionColumn, upsertUpdateColumn + These are JDBC writer related options. They describe how to + use the UPSERT feature for different JDBC dialects. Right now, this feature is implemented in the MySQL and PostgreSQL + JDBC dialects. The upsert option is applicable only when SaveMode.Append is used; + in Overwrite mode, the existing table will be dropped or truncated first, including its unique constraints + or primary key, before the insertion, so the UPSERT scenario is not applicable. + upsertConditionColumn lists the columns used to match existing rows. They are usually unique constraint/primary + key columns. This option is required by the PostgreSQL data source. It is not applicable for the MySQL data source, + since that data source automatically uses any existing unique constraint. + upsertUpdateColumn lists the columns updated with the input data set when existing rows are matched. It is required + by the MySQL data source. + Be aware that if the input data set has duplicate rows, the upsert operation is + non-deterministic; this is documented in the [Merge (SQL)](https://en.wikipedia.org/wiki/Merge_(SQL)) article. +
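For reference, here is a minimal usage sketch of the options documented above, modeled on the tests in this patch (it is not part of the patch itself). It assumes a DataFrame `df` whose columns match the target table; the JDBC URLs and table names are illustrative only.

```scala
import java.util.Properties
import org.apache.spark.sql.SaveMode

// MySQL: conflicts are detected via the table's existing primary key / unique
// constraint, so only the columns to update are listed.
df.write
  .mode(SaveMode.Append)
  .option("upsert", true)
  .option("upsertUpdateColumn", "c2, c3")
  .jdbc("jdbc:mysql://host:3306/db", "target_table", new Properties)

// PostgreSQL: the condition (conflict) columns are required; update columns are
// optional and default to all non-condition columns of the DataFrame.
df.write
  .mode(SaveMode.Append)
  .option("upsert", true)
  .option("upsertConditionColumn", "c1")
  .option("upsertUpdateColumn", "c2, c3")
  .jdbc("jdbc:postgresql://host:5432/db", "target_table", new Properties)
```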
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala index a70ed98b52d5d..9da97ba017f33 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala @@ -20,11 +20,13 @@ package org.apache.spark.sql.jdbc import java.math.BigDecimal import java.sql.{Connection, Date, Timestamp} import java.util.Properties +import org.apache.spark.sql.SaveMode import org.apache.spark.tags.DockerTest @DockerTest class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { + import testImplicits._ override val db = new DatabaseOnDocker { override val imageName = "mysql:5.7.9" override val env = Map( @@ -61,6 +63,19 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { ).executeUpdate() conn.prepareStatement("INSERT INTO strings VALUES ('the', 'quick', 'brown', 'fox', " + "'jumps', 'over', 'the', 'lazy', 'dog')").executeUpdate() + + conn.prepareStatement("CREATE TABLE upsertT0 (c1 INTEGER primary key, c2 INTEGER, c3 INTEGER)") + .executeUpdate() + conn.prepareStatement("INSERT INTO upsertT0 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)") + .executeUpdate() + conn.prepareStatement("CREATE TABLE upsertT1 (c1 INTEGER primary key, c2 INTEGER, c3 INTEGER)") + .executeUpdate() + conn.prepareStatement("INSERT INTO upsertT1 VALUES (1, 2, 3), (2, 3, 4)") + .executeUpdate() + conn.prepareStatement("CREATE TABLE upsertT2 (c1 INTEGER, c2 INTEGER, c3 INTEGER, " + + "primary key(c1, c2))").executeUpdate() + conn.prepareStatement("INSERT INTO upsertT2 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)") + .executeUpdate() } test("Basic test") { @@ -152,4 +167,104 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { df2.write.jdbc(jdbcUrl, "datescopy", new Properties) df3.write.jdbc(jdbcUrl, "stringscopy", new Properties) } + + test("upsert with Append without existing table") { + val df1 = Seq((1, 3), (2, 5)).toDF("c1", "c2") + df1.write.mode(SaveMode.Append).option("upsert", true).option("upsertUpdateColumn", "c1") + .jdbc(jdbcUrl, "upsertT", new Properties) + val df2 = spark.read.jdbc(jdbcUrl, "upsertT", new Properties) + assert(df2.count() == 2) + assert(df2.filter("C1=1").collect.head.get(1) == 3) + + // table upsertT create without primary key or unique constraints, it will do the insert + val df3 = Seq((1, 4)).toDF("c1", "c2") + df3.write.mode(SaveMode.Append).option("upsert", true).option("upsertUpdateColumn", "c1") + .jdbc(jdbcUrl, "upsertT", new Properties) + assert(spark.read.jdbc(jdbcUrl, "upsertT", new Properties).filter("c1=1").count() == 2) + } + + test("Upsert and OverWrite mode") { + //existing table has these rows + //(1, 2, 3), (2, 3, 4), (3, 4, 5) + val df1 = spark.read.jdbc(jdbcUrl, "upsertT0", new Properties()) + assert(df1.filter("c1=1").collect.head.getInt(1) == 2) + assert(df1.filter("c1=1").collect.head.getInt(2) == 3) + assert(df1.filter("c1=2").collect.head.getInt(1) == 3) + assert(df1.filter("c1=2").collect.head.getInt(2) == 4) + val df2 = Seq((1, 3, 6), (2, 5, 6)).toDF("c1", "c2", "c3") + // it will do the Overwrite, not upsert + df2.write.mode(SaveMode.Overwrite) + .option("upsert", true).option("upsertUpdateColumn", "c2, c3") + .jdbc(jdbcUrl, "upsertT0", new Properties) + val df3 = spark.read.jdbc(jdbcUrl, "upsertT0", new Properties()) + 
assert(df3.filter("c1=1").collect.head.getInt(1) == 3) + assert(df3.filter("c1=1").collect.head.getInt(2) == 6) + assert(df3.filter("c1=2").collect.head.getInt(1) == 5) + assert(df3.filter("c1=2").collect.head.getInt(2) == 6) + assert(df3.filter("c1=3").collect.size == 0) + } + + test("upsert with Append and negative option values") { + val df1 = Seq((1, 3, 6), (2, 5, 6)).toDF("c1", "c2", "c3") + val m = intercept[java.sql.SQLException] { + df1.write.mode(SaveMode.Append).option("upsert", true).option("upsertUpdateColumn", "C11") + .jdbc(jdbcUrl, "upsertT1", new Properties) + }.getMessage + assert(m.contains("column C11 not found")) + + val n = intercept[java.sql.SQLException] { + df1.write.mode(SaveMode.Append).option("upsert", true).option("upsertUpdateColumn", "C11") + .jdbc(jdbcUrl, "upsertT1", new Properties) + }.getMessage + assert(n.contains("column C11 not found")) + } + + test("Upsert and Append mode -- data matching one column") { + //existing table has these rows + //(1, 2, 3), (2, 3, 4) + val df1 = spark.read.jdbc(jdbcUrl, "upsertT1", new Properties()) + assert(df1.count() == 2) + assert(df1.filter("c1=1").collect.head.getInt(1) == 2) + assert(df1.filter("c1=1").collect.head.getInt(2) == 3) + assert(df1.filter("c1=2").collect.head.getInt(1) == 3) + assert(df1.filter("c1=2").collect.head.getInt(2) == 4) + val df2 = Seq((1, 4, 7), (2, 6, 8)).toDF("c1", "c2", "c3") + df2.write.mode(SaveMode.Append) + .option("upsert", true).option("upsertUpdateColumn", "c2, c3") + .jdbc(jdbcUrl, "upsertT1", new Properties) + val df3 = spark.read.jdbc(jdbcUrl, "upsertT1", new Properties()) + assert(df3.count() == 2) + assert(df3.filter("c1=1").collect.head.getInt(1) == 4) + assert(df3.filter("c1=1").collect.head.getInt(2) == 7) + assert(df3.filter("c1=2").collect.head.getInt(1) == 6) + assert(df3.filter("c1=2").collect.head.getInt(2) == 8) + // turn upsert off, it will do the insert the row with duplicate key, and it will get nullPointerException + val df4 = Seq((1, 5, 9)).toDF("c1", "c2", "c3") + val n = intercept[org.apache.spark.SparkException] { + df4.write.mode(SaveMode.Append).option("upsert", false).option("upsertUpdateColumn", "C11") + .jdbc(jdbcUrl, "upsertT1", new Properties) + }.getMessage + assert(n.contains("Duplicate entry '1' for key 'PRIMARY'")) + } + + test("Upsert and Append mode -- data matching two columns") { + // table has these rows: (1, 2, 3), (2, 3, 4), (3, 4, 5) + // update Row(2, 3, 4) to Row(2, 3, 10) that matches 2 columns + val df1 = spark.read.jdbc(jdbcUrl, "upsertT2", new Properties()) + assert(df1.count() == 3) + assert(df1.filter("c1=1").collect.head.getInt(1) == 2) + assert(df1.filter("c1=1").collect.head.getInt(2) == 3) + assert(df1.filter("c1=2").collect.head.getInt(1) == 3) + assert(df1.filter("c1=2").collect.head.getInt(2) == 4) + + val df2 = Seq((2, 3, 10)).toDF("c1", "c2", "c3") + df2.write.mode(SaveMode.Append) + .option("upsert", true).option("upsertUpdateColumn", "c3") + .jdbc(jdbcUrl, "upsertT2", new Properties) + + val df3 = spark.read.jdbc(jdbcUrl, "upsertT2", new Properties()) + assert(df3.count() == 3) + assert(df3.filter("c1=2").collect.head.getInt(1) == 3) + assert(df3.filter("c1=2").collect.head.getInt(2) == 10) + } } diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala index a1a065a443e67..374775c9b19f9 100644 --- 
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc import java.sql.Connection import java.util.Properties +import org.apache.spark.sql.SaveMode import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.expressions.Literal @@ -27,8 +28,9 @@ import org.apache.spark.tags.DockerTest @DockerTest class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { + import testImplicits._ override val db = new DatabaseOnDocker { - override val imageName = "postgres:9.4.5" + override val imageName = "postgres:9.5.4" override val env = Map( "POSTGRES_PASSWORD" -> "rootpass" ) @@ -55,6 +57,26 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { + "null, null, null, null, null, " + "null, null, null, null, null, null, null)" ).executeUpdate() + conn.prepareStatement("CREATE TABLE upsertT0 " + + "(c1 INTEGER, c2 INTEGER, c3 INTEGER, primary key(c1))").executeUpdate() + conn.prepareStatement("INSERT INTO upsertT0 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)") + .executeUpdate() + conn.prepareStatement("CREATE TABLE upsertT1 " + + "(c1 INTEGER, c2 INTEGER, c3 INTEGER, primary key(c1))").executeUpdate() + conn.prepareStatement("INSERT INTO upsertT1 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)") + .executeUpdate() + conn.prepareStatement("CREATE TABLE upsertT2 " + + "(c1 INTEGER, c2 INTEGER, c3 INTEGER, primary key(c1, c2))").executeUpdate() + conn.prepareStatement("INSERT INTO upsertT2 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)") + .executeUpdate() + conn.prepareStatement("CREATE TABLE upsertT3 " + + "(c1 INTEGER, c2 INTEGER, c3 INTEGER, primary key(c1))").executeUpdate() + conn.prepareStatement("INSERT INTO upsertT3 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)") + .executeUpdate() + conn.prepareStatement("CREATE TABLE upsertT4 " + + "(c1 INTEGER, c2 INTEGER, c3 INTEGER, primary key(c1))").executeUpdate() + conn.prepareStatement("INSERT INTO upsertT4 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)") + .executeUpdate() } test("Type mapping for various types") { @@ -126,4 +148,93 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { assert(schema(0).dataType == FloatType) assert(schema(1).dataType == ShortType) } + + test("Upsert and OverWrite mode") { + //existing table has these rows + //(1, 2, 3), (2, 3, 4), (3, 4, 5) + val df1 = spark.read.jdbc(jdbcUrl, "upsertT0", new Properties()) + assert(df1.filter("c1=1").collect.head.getInt(1) == 2) + assert(df1.filter("c1=1").collect.head.getInt(2) == 3) + assert(df1.filter("c1=2").collect.head.getInt(1) == 3) + assert(df1.filter("c1=2").collect.head.getInt(2) == 4) + val df2 = Seq((1, 3, 6), (2, 5, 6)).toDF("c1", "c2", "c3") + // it will do the Overwrite, not upsert + df2.write.mode(SaveMode.Overwrite) + .option("upsert", true).option("upsertConditionColumn", "c1") + .jdbc(jdbcUrl, "upsertT0", new Properties) + val df3 = spark.read.jdbc(jdbcUrl, "upsertT0", new Properties()) + assert(df3.filter("c1=1").collect.head.getInt(1) == 3) + assert(df3.filter("c1=1").collect.head.getInt(2) == 6) + assert(df3.filter("c1=2").collect.head.getInt(1) == 5) + assert(df3.filter("c1=2").collect.head.getInt(2) == 6) + assert(df3.filter("c1=3").collect.size == 0) + } + + test("upsert with Append and negative option values") { + val df1 = Seq((1, 3, 6), (2, 5, 6)).toDF("c1", "c2", "c3") + val m = intercept[java.sql.SQLException] { + 
df1.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "C11") + .jdbc(jdbcUrl, "upsertT1", new Properties) + }.getMessage + assert(m.contains("column C11 not found")) + + val n = intercept[java.sql.SQLException] { + df1.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "C11") + .jdbc(jdbcUrl, "upsertT1", new Properties) + }.getMessage + assert(n.contains("column C11 not found")) + + val o = intercept[org.apache.spark.SparkException] { + df1.write.mode(SaveMode.Append).option("upsert", true).option("upsertconditionColumn", "c2") + .jdbc(jdbcUrl, "upsertT1", new Properties) + }.getMessage + assert(o.contains("there is no unique or exclusion constraint matching the ON CONFLICT")) + } + + test("upsert with Append without existing table") { + val df1 = Seq((1, 3), (2, 5)).toDF("c1", "c2") + df1.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "c1") + .jdbc(jdbcUrl, "upsertT", new Properties) + val df2 = spark.read.jdbc(jdbcUrl, "upsertT", new Properties) + assert(df2.count() == 2) + assert(df2.filter("C1=1").collect.head.get(1) == 3) + + // table upsertT create without primary key or unique constraints, it will throw the exception + val df3 = Seq((1, 4)).toDF("c1", "c2") + val p = intercept[org.apache.spark.SparkException] { + df3.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "c1") + .jdbc(jdbcUrl, "upsertT", new Properties) + }.getMessage + assert(p.contains("there is no unique or exclusion constraint matching the ON CONFLICT specification")) + } + + test("Upsert & Append test -- matching one column") { + val df1 = spark.read.jdbc(jdbcUrl, "upsertT3", new Properties()) + assert(df1.filter("c1=1").collect.head.getInt(1) == 2) + assert(df1.filter("c1=1").collect.head.getInt(2) == 3) + assert(df1.filter("c1=4").collect.size == 0) + // update Row(1, 2, 3) to (1, 3, 6) and insert new Row(4, 5, 6) + val df2 = Seq((1, 3, 6), (4, 5, 6)).toDF("c1", "c2", "c3") + // condition on one column + df2.write.mode(SaveMode.Append) + .option("upsert", true).option("upsertConditionColumn", "c1").option("upsertUpdateColumn", "c2, c3") + .jdbc(jdbcUrl, "upsertT3", new Properties) + val df3 = spark.read.jdbc(jdbcUrl, "upsertT3", new Properties()) + assert(df3.filter("c1=1").collect.head.getInt(1) == 3) + assert(df3.filter("c1=1").collect.head.getInt(2) == 6) + assert(df3.filter("c1=4").collect.size == 1) + } + + test("Upsert & Append test -- matching two columns") { + val df1 = spark.read.jdbc(jdbcUrl, "upsertT2", new Properties()) + assert(df1.filter("c1=1").collect.head.getInt(1) == 2) + assert(df1.filter("c1=1").collect.head.getInt(2) == 3) + // update Row(2, 3, 4) to Row(2, 3, 10) that matches 2 columns + val df2 = Seq((2, 3, 10)).toDF("c1", "c2", "c3") + df2.write.mode(SaveMode.Append) + .option("upsert", true).option("upsertConditionColumn", "c1, c2") + .jdbc(jdbcUrl, "upsertT2", new Properties) + val df3 = spark.read.jdbc(jdbcUrl, "upsertT2", new Properties()) + assert(df3.filter("c1=2").collect.head.getInt(2) == 10) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 1732a8e08b73f..c0840914d02ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -431,6 +431,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * You can set the following JDBC-specific 
option(s) for storing JDBC: * * * In case of failures, users should turn off `truncate` option to use `DROP TABLE` again. Also, @@ -439,6 +448,27 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * while PostgresDialect and default JDBCDirect doesn't. For unknown and unsupported JDBCDirect, * the user option `truncate` is ignored. * + * When using the upsert feature, please be aware of these limitations. + * First, if the content of the [[DataFrame]] has multiple rows with the same key, the upsert + * feature will be non-deterministic, because the data is partitioned and sent + * to the data source over different JDBC connections. You can avoid this by eliminating the + * duplicate key rows first and then saving to the JDBC data source table. For example, when the key + * is on columns ("key", "value1"): + * + * {{{ + * scala> val testData = sc.parallelize((2, 1, 2) :: (1, 1, 1) :: (1, 2, 3) :: (2, 1, 3) :: + * (2, 2, 2) :: (2, 2, 1) :: (2, 1, 4) :: (1, 1, 4) :: (1, 2, 5) :: + * (1, 2, 6) :: Nil, 5).toDF("key", "value1", "TS") + * scala> val sorted = testData.orderBy('key,'value1) + * scala> val agg = sorted.groupBy('key).agg("TS" -> "max").withColumnRenamed("max(TS)","TS") + * scala> agg.join(sorted, Seq("key","TS")) + * }}} + * + * Second, this upsert feature is only supported for SaveMode.Append. SaveMode.Overwrite + * first empties the table and then does the insert, so there is no need to implement upsert + * for SaveMode.Overwrite; to get a deterministic result there as well, we recommend removing + * duplicate key rows from the content of the [[DataFrame]]. + * * @param url JDBC database url of the form `jdbc:subprotocol:subname` * @param table Name of the table in the external database. * @param connectionProperties JDBC database connection arguments, a list of arbitrary string diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 591096d5efd22..19451f331f2d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -115,6 +115,20 @@ class JDBCOptions( // ------------------------------------------------------------ // if to truncate the table from the JDBC database val isTruncate = parameters.getOrElse(JDBC_TRUNCATE, "false").toBoolean + // whether to upsert into the table in the JDBC database + val isUpsert = parameters.getOrElse(JDBC_UPSERT, "false").toBoolean + // the condition (key) columns used by the upsert feature + val upsertConditionColumn = parameters.getOrElse(JDBC_UPSERT_CONDITION_COLUMN, null) match { + case null => Array.empty[String] + case o => o.split(",").map(_.trim) + } + // the columns to be updated by the upsert feature + val upsertUpdateColumn = parameters.getOrElse(JDBC_UPSERT_UPDATE_COLUMN, null) match { + case null => Array.empty[String] + case o => o.split(",").map(_.trim) + } + // whether the JDBC table already exists + val isJdbcTableExist = parameters.getOrElse(JDBC_TABLE_EXIST, "false").toBoolean // the create table option , which can be table_options or partition_options.
// E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8" // TODO: to reuse the existing partition parameters for those partition specific options @@ -158,4 +172,8 @@ object JDBCOptions { val JDBC_CREATE_TABLE_COLUMN_TYPES = newOption("createTableColumnTypes") val JDBC_BATCH_INSERT_SIZE = newOption("batchsize") val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel") + val JDBC_UPSERT = newOption("upsert") + val JDBC_UPSERT_CONDITION_COLUMN = newOption("upsertConditionColumn") + val JDBC_UPSERT_UPDATE_COLUMN = newOption("upsertUpdateColumn") + val JDBC_TABLE_EXIST = newOption("jdbcTableExist") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala index 74dcfb06f5c2b..d2652d26e5d1e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -65,17 +65,17 @@ class JdbcRelationProvider extends CreatableRelationProvider // In this case, we should truncate table and then load. truncateTable(conn, options.table) val tableSchema = JdbcUtils.getSchemaOption(conn, options) - saveTable(df, tableSchema, isCaseSensitive, options) + saveTable(df, tableSchema, isCaseSensitive, mode, options) } else { // Otherwise, do not truncate the table, instead drop and recreate it dropTable(conn, options.table) createTable(conn, df, options) - saveTable(df, Some(df.schema), isCaseSensitive, options) + saveTable(df, Some(df.schema), isCaseSensitive, mode, options) } case SaveMode.Append => val tableSchema = JdbcUtils.getSchemaOption(conn, options) - saveTable(df, tableSchema, isCaseSensitive, options) + saveTable(df, tableSchema, isCaseSensitive, mode, options) case SaveMode.ErrorIfExists => throw new AnalysisException( @@ -88,7 +88,7 @@ class JdbcRelationProvider extends CreatableRelationProvider } } else { createTable(conn, df, options) - saveTable(df, Some(df.schema), isCaseSensitive, options) + saveTable(df, Some(df.schema), isCaseSensitive, mode, options, !tableExists) } } finally { conn.close() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 5fc3c2753b6cf..d24f7b8a339fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -27,13 +27,13 @@ import scala.util.control.NonFatal import org.apache.spark.TaskContext import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, DataFrame, Row} +import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData} -import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType} +import org.apache.spark.sql.jdbc._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.NextIterator 
@@ -570,7 +570,7 @@ object JdbcUtils extends Logging { * non-Serializable. Instead, we explicitly close over all variables that * are used. */ - def savePartition( + private def savePartition( getConnection: () => Connection, table: String, iterator: Iterator[Row], @@ -578,7 +578,9 @@ object JdbcUtils extends Logging { insertStmt: String, batchSize: Int, dialect: JdbcDialect, - isolationLevel: Int): Iterator[Byte] = { + isolationLevel: Int, + isUpsert: Boolean = false, + upsertParam: UpsertInfo = UpsertInfo(Array(), Array())): Iterator[Byte] = { val conn = getConnection() var committed = false @@ -612,7 +614,12 @@ object JdbcUtils extends Logging { conn.setAutoCommit(false) // Everything in the same db transaction. conn.setTransactionIsolation(finalIsolationLevel) } - val stmt = conn.prepareStatement(insertStmt) + + val stmt = if (isUpsert) { + dialect.upsertStatement(conn, table, rddSchema, upsertParam) + } else { + conn.prepareStatement(insertStmt) + } val setters = rddSchema.fields.map(f => makeSetter(conn, dialect, f.dataType)) val nullTypes = rddSchema.fields.map(f => getJdbcType(f.dataType, dialect).jdbcNullType) val numFields = rddSchema.fields.length @@ -753,7 +760,9 @@ object JdbcUtils extends Logging { df: DataFrame, tableSchema: Option[StructType], isCaseSensitive: Boolean, - options: JDBCOptions): Unit = { + mode: SaveMode, + options: JDBCOptions, + newTableFlag: Boolean = false): Unit = { val url = options.url val table = options.table val dialect = JdbcDialects.get(url) @@ -770,9 +779,69 @@ object JdbcUtils extends Logging { case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n) case _ => df } - repartitionedDF.foreachPartition(iterator => savePartition( - getConnection, table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel) - ) + val isUpsert = if (options.isUpsert && (mode == SaveMode.Append)) { + if (newTableFlag) { + logWarning("UPSERT operation is not applicable for a brand new table, " + + "regular INSERT operation will be done.") + false + } else true + } else false + // if upsert feature is on, validate the column name and prepare the upsertParam + // if the input data(dataframe) have duplicate row, the upsert result is non-deterministic. 
+ // It is documented in the Merge (SQL) article: https://en.wikipedia.org/wiki/Merge_(SQL) + if (isUpsert) { + val upsertUpdateColumns = options.upsertUpdateColumn + val upsertConditionColumns = options.upsertConditionColumn + val upsertParam = UpsertInfo(upsertConditionColumns, upsertUpdateColumns) + val caseSensitive = df.sparkSession.sessionState.conf.caseSensitiveAnalysis + validateColumn(rddSchema, upsertParam.upsertUpdateColumns, caseSensitive) + validateColumn(rddSchema, upsertParam.upsertConditionColumns, caseSensitive) + repartitionedDF.foreachPartition(iterator => + savePartition( + getConnection, + table, + iterator, + rddSchema, + insertStmt, + batchSize, + dialect, + isolationLevel, + isUpsert, + upsertParam) + ) + } else { + repartitionedDF.foreachPartition(iterator => + savePartition( + getConnection, + table, + iterator, + rddSchema, + insertStmt, + batchSize, + dialect, + isolationLevel) + ) + } + } + + /** + * Validate that the upsert column names exist in the schema, using case-sensitive or case-insensitive resolution + */ + def validateColumn( + schema: StructType, + upsertColumns: Seq[String], + caseSensitive: Boolean): Unit = { + upsertColumns.foreach { col => + schema.find { f => + if (caseSensitive) { + org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution(f.name, col) + } else { + org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution(f.name, col) + } + }.getOrElse { + throw new SQLException(s"upsert column $col not found in schema $schema") + } + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index e328b86437d62..49b842e753fc1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -17,9 +17,10 @@ package org.apache.spark.sql.jdbc -import java.sql.Connection +import java.sql.{Connection, PreparedStatement} import org.apache.spark.annotation.{DeveloperApi, InterfaceStability, Since} +import org.apache.spark.sql.SaveMode import org.apache.spark.sql.types._ /** @@ -34,6 +35,18 @@ import org.apache.spark.sql.types._ @InterfaceStability.Evolving case class JdbcType(databaseTypeDefinition : String, jdbcNullType : Int) +/** + * :: DeveloperApi :: + * Describes the duplicate key (condition) columns and the columns to be updated when + * values are sent to the database. + * @param upsertConditionColumns The duplicate key columns + * @param upsertUpdateColumns The columns to be updated + */ +@DeveloperApi +@InterfaceStability.Evolving +case class UpsertInfo(upsertConditionColumns: Array[String], + upsertUpdateColumns: Array[String]) + /** * :: DeveloperApi :: * Encapsulates everything (extensions, workarounds, quirks) to handle the @@ -130,6 +143,23 @@ abstract class JdbcDialect extends Serializable { * None: The behavior of TRUNCATE TABLE is unknown (default). */ def isCascadingTruncateTable(): Option[Boolean] = None + + /** + * Generate a PreparedStatement that performs the UPSERT operation + * + * @param conn The connection object + * @param table The target table + * @param rddSchema The schema for the table + * @param upsertParam The parameter that carries the upsert-related information (condition and update columns).
+ * @return PreparedStatement + */ + def upsertStatement( + conn: Connection, + table: String, + rddSchema: StructType, + upsertParam: UpsertInfo = UpsertInfo(Array(), Array())): PreparedStatement = { + throw new UnsupportedOperationException("UPSERT operation is not implemented.") + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index b2cff7877d8b5..42eef7faa39dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -17,9 +17,9 @@ package org.apache.spark.sql.jdbc -import java.sql.Types +import java.sql.{Connection, PreparedStatement, Types} -import org.apache.spark.sql.types.{BooleanType, DataType, LongType, MetadataBuilder} +import org.apache.spark.sql.types.{BooleanType, DataType, LongType, MetadataBuilder, StructType} private case object MySQLDialect extends JdbcDialect { @@ -46,4 +46,35 @@ private case object MySQLDialect extends JdbcDialect { } override def isCascadingTruncateTable(): Option[Boolean] = Some(false) + + /** + * Returns a PreparedStatement that does Insert/Update table + */ + override def upsertStatement( + conn: Connection, + table: String, + rddSchema: StructType, + upsertParam: UpsertInfo = UpsertInfo(Array.empty, Array.empty)): PreparedStatement = { + require(upsertParam.upsertUpdateColumns.nonEmpty, + "Upsert option requires update column names." + + "Please specify option(\"upsertUpdateColumn\", \"c1, c2, ...\")") + + + val updateClause = if (upsertParam.upsertUpdateColumns.nonEmpty) { + upsertParam.upsertUpdateColumns.map(x => s"$x = VALUES($x)").mkString(", ") + } + else { + rddSchema.fields.map(x => s"$x = VALUES($x)").mkString(", ") + } + val insertColumns = rddSchema.fields.map(_.name).mkString(", ") + val placeholders = rddSchema.fields.map(_ => "?").mkString(",") + + val sql = + s""" + |INSERT INTO $table ($insertColumns) + |VALUES ( $placeholders ) + |ON DUPLICATE KEY UPDATE $updateClause + """.stripMargin + conn.prepareStatement(sql) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 4f61a328f47ca..691a6156ece9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.jdbc -import java.sql.{Connection, Types} +import java.sql.{Connection, PreparedStatement, Types} import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} import org.apache.spark.sql.types._ @@ -101,4 +101,49 @@ private object PostgresDialect extends JdbcDialect { } override def isCascadingTruncateTable(): Option[Boolean] = Some(true) + + /** + * Returns a PreparedStatement that does Insert/Update table + */ + override def upsertStatement( + conn: Connection, + table: String, + rddSchema: StructType, + upsertParam: UpsertInfo = + UpsertInfo(Array.empty[String], Array.empty[String])): PreparedStatement = { + require(upsertParam.upsertConditionColumns.nonEmpty, + "Upsert option requires column names on which duplicate rows are identified. 
" + + "Please specify option(\"upsertConditionColumn\", \"c1, c2, ...\")") + require(conn.getMetaData.getDatabaseProductVersion.compareToIgnoreCase("9.5") > 0, + "INSERT INTO with ON CONFLICT clause only support by PostgreSQL 9.5 and up.") + + val insertColumns = rddSchema.fields.map(_.name).mkString(", ") + val conflictTarget = upsertParam.upsertConditionColumns.mkString(", ") + val placeholders = rddSchema.fields.map(_ => "?").mkString(",") + val updateColumns = if (upsertParam.upsertUpdateColumns.nonEmpty) + { upsertParam.upsertUpdateColumns} else {rddSchema.fields.map(_.name)} + val updateClause = updateColumns + .filterNot(upsertParam.upsertConditionColumns.contains(_)) + .map(x => s"$x = EXCLUDED.$x").mkString(", ") + + // In the case where condition columns are the whole set of the rddSchema columns + // and rddSchema columns may be a subset of the target table schema. + // We need to do nothing for matched rows + val sql = if (updateClause != null && updateClause.nonEmpty) { + s""" + |INSERT INTO $table ($insertColumns) + |VALUES ( $placeholders ) + |ON CONFLICT ($conflictTarget) + |DO UPDATE SET $updateClause + """.stripMargin + } else { + s""" + |INSERT INTO $table ($insertColumns) + |VALUES ( $placeholders ) + |ON CONFLICT ($conflictTarget) + |DO NOTHING + """.stripMargin + } + conn.prepareStatement(sql) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index bf1fd160704fa..ecc3165077f00 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.jdbc -import java.sql.{Date, DriverManager, Timestamp} +import java.sql.{Connection, DriverManager, PreparedStatement} import java.util.Properties import scala.collection.JavaConverters.propertiesAsScalaMapConverter @@ -46,6 +46,30 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { val testH2Dialect = new JdbcDialect { override def canHandle(url: String) : Boolean = url.startsWith("jdbc:h2") override def isCascadingTruncateTable(): Option[Boolean] = Some(false) + override def upsertStatement( + conn: Connection, + table: String, + rddSchema: StructType, + upsertParam: UpsertInfo = + UpsertInfo(Array(), Array())): PreparedStatement = { + val columnNames = rddSchema.fields.map(_.name).mkString(", ") + val keyNames = upsertParam.upsertConditionColumns.mkString(", ") + val placeholders = rddSchema.fields.map(_ => "?").mkString(",") + val sql = + if (keyNames != null && !keyNames.isEmpty) { + s""" + |MERGE INTO $table ($columnNames) + |KEY($keyNames) + |VALUES($placeholders) + """.stripMargin + } else { + s""" + |MERGE INTO $table ($columnNames) + |VALUES($placeholders) + """.stripMargin + } + conn.prepareStatement(sql) + } } before { @@ -63,6 +87,18 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { conn1.prepareStatement("drop table if exists test.people1").executeUpdate() conn1.prepareStatement( "create table test.people1 (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate() + conn1.prepareStatement( + "create table test.upsertT1(c1 INTEGER PRIMARY KEY, c2 INTEGER)").executeUpdate() + conn1.prepareStatement( + "insert into test.upsertT1 values (1, 10)").executeUpdate() + conn1.prepareStatement( + "insert into test.upsertT1 values (2, 12)").executeUpdate() + conn1.prepareStatement( + "create table test.upsertT2(c1 INTEGER PRIMARY 
KEY, c2 INTEGER)").executeUpdate() + conn1.prepareStatement( + "insert into test.upsertT2 values (1, 10)").executeUpdate() + conn1.prepareStatement( + "insert into test.upsertT2 values (2, 12)").executeUpdate() conn1.commit() sql( @@ -101,6 +137,16 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { StructField("NAME", StringType) :: StructField("ID", IntegerType) :: Nil) + private lazy val ary1x2 = Array[Row](Row.apply(1, 42)) + private lazy val ary12x2 = Array[Row](Row.apply(2, 52)) + private lazy val ary13x2 = Array[Row](Row.apply(1, 62)) + private lazy val ary14x2 = Array[Row](Row.apply(1, 72)) + private lazy val ary2x2 = Array[Row](Row.apply(1, 52), Row.apply(2, 222)) + private lazy val ary3x2 = Array[Row](Row.apply(1, 10), Row.apply(2, 20), Row.apply(1, 30)) + private lazy val schema5 = StructType( + StructField("C1", IntegerType) :: + StructField("C2", IntegerType) :: Nil) + test("Basic CREATE") { val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2) @@ -506,4 +552,149 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter { "schema struct")) } } + + test("upsert with Overwrite") { + JdbcDialects.registerDialect(testH2Dialect) + val df = spark.createDataFrame(sparkContext.parallelize(ary1x2), schema5) + val df1 = spark.createDataFrame(sparkContext.parallelize(ary2x2), schema5) + + df.write.mode(SaveMode.Overwrite).option("upsert", true).option("upsertConditionColumn", "C1") + .jdbc(url1, "TEST.UPSERT", properties) + assert(spark.read.jdbc(url1, "TEST.UPSERT", properties).count() == 1) + assert(spark.read.jdbc(url1, "TEST.UPSERT", properties).filter("C1=1") + .collect.head.get(1) == 42) + + df1.write.mode(SaveMode.Overwrite).option("upsert", false).option("upsertConditionColumn", "C1") + .jdbc(url1, "TEST.UPSERT", properties) + assert(spark.read.jdbc(url1, "TEST.UPSERT", properties).count() == 2) + assert(spark.read.jdbc(url1, "TEST.UPSERT", properties).filter("C1=1") + .collect.head.get(1) == 52) + JdbcDialects.unregisterDialect(testH2Dialect) + } + + test("upsert with Append and case insensitive") { + JdbcDialects.registerDialect(testH2Dialect) + val df = spark.createDataFrame(sparkContext.parallelize(ary1x2), schema5) + val df1 = spark.createDataFrame(sparkContext.parallelize(ary12x2), schema5) + val df2 = spark.createDataFrame(sparkContext.parallelize(ary13x2), schema5) + + df.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "C1") + .jdbc(url1, "TEST.UPSERT1", properties) + assert(spark.read.jdbc(url1, "TEST.UPSERT1", properties).count() == 1) + assert(spark.read.jdbc(url1, "TEST.UPSERT1", properties).filter("C1=1") + .collect.head.get(1) == 42) + + df1.write.mode(SaveMode.Append).option("upsert", false).option("upsertConditionColumn", "c1") + .jdbc(url1, "TEST.UPSERT1", properties) + assert(spark.read.jdbc(url1, "TEST.UPSERT1", properties).count() == 2) + assert(spark.read.jdbc(url1, "TEST.UPSERT1", properties).filter("C1=1") + .collect.head.get(1) == 42) + + df2.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "c1") + .jdbc(url1, "TEST.UPSERT1", properties) + assert(spark.read.jdbc(url1, "TEST.UPSERT1", properties).count() == 2) + assert(spark.read.jdbc(url1, "TEST.UPSERT1", properties).filter("C1=1") + .collect.head.get(1) == 62) + JdbcDialects.unregisterDialect(testH2Dialect) + } + + test("upsert with Append and case sensitive") { + JdbcDialects.registerDialect(testH2Dialect) + val df = spark.createDataFrame(sparkContext.parallelize(ary1x2), schema5) + val 
df1 = spark.createDataFrame(sparkContext.parallelize(ary12x2), schema5) + val df2 = spark.createDataFrame(sparkContext.parallelize(ary13x2), schema5) + + spark.sql("set spark.sql.caseSensitive=true") + df.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "C1") + .jdbc(url1, "TEST.UPSERT1", properties) + assert(spark.read.jdbc(url1, "TEST.UPSERT1", properties).count() == 1) + assert(spark.read.jdbc(url1, "TEST.UPSERT1", properties).filter("C1=1") + .collect.head.get(1) == 42) + + df1.write.mode(SaveMode.Append).option("upsert", false).option("upsertConditionColumn", "c1") + .jdbc(url1, "TEST.UPSERT1", properties) + assert(spark.read.jdbc(url1, "TEST.UPSERT1", properties).count() == 2) + assert(spark.read.jdbc(url1, "TEST.UPSERT1", properties).filter("C1=1") + .collect.head.get(1) == 42) + + val m = intercept[java.sql.SQLException] { + df2.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "c1") + .jdbc(url1, "TEST.UPSERT1", properties) + }.getMessage + assert(m.contains("column c1 not found")) + spark.sql("set spark.sql.caseSensitive=false") + JdbcDialects.unregisterDialect(testH2Dialect) + } + + test("upsert with Append and negative option values") { + JdbcDialects.registerDialect(testH2Dialect) + val df = spark.createDataFrame(sparkContext.parallelize(ary1x2), schema5) + val df1 = spark.createDataFrame(sparkContext.parallelize(ary12x2), schema5) + + val m = intercept[java.sql.SQLException] { + df.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "C11") + .jdbc(url1, "test.upsertT2", properties) + }.getMessage + assert(m.contains("column C11 not found")) + + val n = intercept[java.sql.SQLException] { + df.write.mode(SaveMode.Append).option("upsert", true).option("upsertUpdateColumn", "c12") + .jdbc(url1, "test.upsertT2", properties) + }.getMessage + assert(n.contains("column c12 not found")) + JdbcDialects.unregisterDialect(testH2Dialect) + } + + test("upsert with Append without existing table") { + JdbcDialects.registerDialect(testH2Dialect) + val df = spark.createDataFrame(sparkContext.parallelize(ary1x2), schema5) + val df1 = spark.createDataFrame(sparkContext.parallelize(ary12x2), schema5) + val df2 = spark.createDataFrame(sparkContext.parallelize(ary13x2), schema5) + val df3 = spark.createDataFrame(sparkContext.parallelize(ary14x2), schema5) + + df.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "C1") + .jdbc(url1, "TEST.UPSERT", properties) + assert(spark.read.jdbc(url1, "TEST.UPSERT", properties).count() == 1) + assert(spark.read.jdbc(url1, "TEST.UPSERT", properties).filter("C1=1") + .collect.head.get(1) == 42) + + df2.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "C1") + .jdbc(url1, "TEST.UPSERT", properties) + assert(spark.read.jdbc(url1, "TEST.UPSERT", properties).count() == 1) + assert(spark.read.jdbc(url1, "TEST.UPSERT", properties).filter("C1=1") + .collect.head.get(1) == 62) + + // turn it off, it will insert one more row + df3.write.mode(SaveMode.Append).option("upsert", false).option("upsertConditionColumn", "C1") + .jdbc(url1, "TEST.UPSERT", properties) + assert(spark.read.jdbc(url1, "TEST.UPSERT", properties).count() == 2) + JdbcDialects.unregisterDialect(testH2Dialect) + } + + test("upsert with Append with existing table") { + JdbcDialects.registerDialect(testH2Dialect) + val df = spark.createDataFrame(sparkContext.parallelize(ary1x2), schema5) + val df1 = 
spark.createDataFrame(sparkContext.parallelize(ary12x2), schema5) + val df2 = spark.createDataFrame(sparkContext.parallelize(ary13x2), schema5) + val df3 = spark.createDataFrame(sparkContext.parallelize(ary14x2), schema5) + + assert(spark.read.jdbc(url1, "test.upsertT1", properties).count() == 2) + assert(spark.read.jdbc(url1, "test.upsertT1", properties).filter("C1=1") + .collect.head.get(1) == 10) + df.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "C1") + .jdbc(url1, "test.upsertT1", properties) + assert(spark.read.jdbc(url1, "test.upsertT1", properties).filter("C1=1") + .collect.head.get(1) == 42) + // Overwrite will drop the table, then insert the dataframe rows into the empty table + df2.write.mode(SaveMode.Overwrite).option("upsert", true).option("upsertConditionColumn", "C1") + .jdbc(url1, "test.upsertT1", properties) + assert(spark.read.jdbc(url1, "test.upsertT1", properties).filter("C1=1") + .collect.head.get(1) == 62) + // Append without upsert option, it will not insert the value into the table + df3.write.mode(SaveMode.Append).option("upsert", false).option("upsertConditionColumn", "c1") + .jdbc(url1, "test.upsertT1", properties) + assert(spark.read.jdbc(url1, "test.upsertT1", properties).filter("C1=1") + .collect.head.get(1) == 62) + JdbcDialects.unregisterDialect(testH2Dialect) + } }
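To make the dialect behavior concrete, here is a small standalone sketch (no Spark dependencies; table and column names are illustrative) of the SQL shapes that the MySQL and PostgreSQL `upsertStatement` implementations in this patch build, alongside the `MERGE` form used by the H2 test dialect above. It is an approximation for orientation, not code taken from the patch.

```scala
// Standalone sketch of the statement shapes produced by the dialects in this patch,
// for an illustrative table t(c1, c2, c3). The real code binds values via '?' placeholders.
object UpsertSqlSketch {
  private val columns = Seq("c1", "c2", "c3")
  private val columnList = columns.mkString(", ")
  private val placeholders = columns.map(_ => "?").mkString(", ")

  // MySQL: the table's primary key / unique constraint detects the conflict;
  // only the columns named in upsertUpdateColumn are rewritten.
  def mysql(updateColumns: Seq[String]): String =
    s"INSERT INTO t ($columnList) VALUES ($placeholders) " +
      s"ON DUPLICATE KEY UPDATE ${updateColumns.map(c => s"$c = VALUES($c)").mkString(", ")}"

  // PostgreSQL (9.5+): upsertConditionColumn supplies the conflict target;
  // EXCLUDED.* refers to the row that failed to insert.
  def postgres(conditionColumns: Seq[String], updateColumns: Seq[String]): String =
    s"INSERT INTO t ($columnList) VALUES ($placeholders) " +
      s"ON CONFLICT (${conditionColumns.mkString(", ")}) " +
      s"DO UPDATE SET ${updateColumns.map(c => s"$c = EXCLUDED.$c").mkString(", ")}"

  def main(args: Array[String]): Unit = {
    println(mysql(Seq("c2", "c3")))
    println(postgres(Seq("c1"), Seq("c2", "c3")))
    // The H2 test dialect above instead issues:
    //   MERGE INTO t (c1, c2, c3) KEY(c1) VALUES (?, ?, ?)
  }
}
```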