21 changes: 20 additions & 1 deletion docs/sql-programming-guide.md
@@ -518,7 +518,8 @@ file directly with SQL.
Save operations can optionally take a `SaveMode`, that specifies how to handle existing data if
present. It is important to realize that these save modes do not utilize any locking and are not
atomic. Additionally, when performing an `Overwrite`, the data will be deleted before writing out the
new data.
new data. When performing an `Append`, an `upsert` option can be enabled for JDBC data sources; it is
currently supported only for MySQL and PostgreSQL.

<table class="table">
<tr><th>Scala/Java</th><th>Any Language</th><th>Meaning</th></tr>
@@ -1230,6 +1231,24 @@ the following case-insensitive options:
The database column data types to use instead of the defaults, when creating the table. Data type information should be specified in the same format as CREATE TABLE columns syntax (e.g: <code>"name CHAR(64), comments VARCHAR(1024)"</code>). The specified types should be valid Spark SQL data types. This option applies only to writing.
</td>
</tr>

<tr>
<td><code>upsert, upsertConditionColumn, upsertUpdateColumn</code></td>
<td>
These options configure the JDBC writer's UPSERT behavior. The feature is currently implemented only in the
MySQL and PostgreSQL JDBC dialects. The <code>upsert</code> option applies only with <code>SaveMode.Append</code>:
in <code>Overwrite</code> mode the existing table, including its unique constraints and primary key, is dropped or
truncated before the data is written, so there are no existing rows to match and UPSERT does not apply.
<code>upsertConditionColumn</code> lists the columns used to match existing rows, typically the unique-constraint
or primary-key columns. It is required by the PostgreSQL data source and is not applicable to the MySQL data source,
which automatically uses any existing unique constraint.
<code>upsertUpdateColumn</code> lists the columns that are updated from the input data set when an existing row is
matched. It is required by the MySQL data source.
Be aware that if the input data set contains duplicate rows, the upsert operation is non-deterministic; see the
<a href="https://en.wikipedia.org/wiki/Merge_(SQL)">MERGE (upsert)</a> article for details.
</td>
</tr>
</table>
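
As an illustrative sketch only (the connection URL, table name, credentials, and the `df` DataFrame
below are placeholders), the upsert options are passed through the standard `DataFrameWriter` options:

{% highlight scala %}
// Sketch: append with UPSERT enabled against a MySQL table.
// The URL, table name, and credentials are placeholders.
import java.util.Properties

import org.apache.spark.sql.SaveMode

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")

df.write
  .mode(SaveMode.Append)
  .option("upsert", true)                  // enable UPSERT behavior for Append
  .option("upsertUpdateColumn", "c2, c3")  // MySQL: columns updated when the key matches
  .jdbc("jdbc:mysql://host:3306/database", "upsert_table", connectionProperties)
{% endhighlight %}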

<div class="codetabs">
@@ -20,11 +20,13 @@ package org.apache.spark.sql.jdbc
import java.math.BigDecimal
import java.sql.{Connection, Date, Timestamp}
import java.util.Properties

import org.apache.spark.sql.SaveMode
import org.apache.spark.tags.DockerTest

@DockerTest
class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
import testImplicits._
override val db = new DatabaseOnDocker {
override val imageName = "mysql:5.7.9"
override val env = Map(
@@ -61,6 +63,19 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
).executeUpdate()
conn.prepareStatement("INSERT INTO strings VALUES ('the', 'quick', 'brown', 'fox', " +
"'jumps', 'over', 'the', 'lazy', 'dog')").executeUpdate()

conn.prepareStatement("CREATE TABLE upsertT0 (c1 INTEGER primary key, c2 INTEGER, c3 INTEGER)")
.executeUpdate()
conn.prepareStatement("INSERT INTO upsertT0 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)")
.executeUpdate()
conn.prepareStatement("CREATE TABLE upsertT1 (c1 INTEGER primary key, c2 INTEGER, c3 INTEGER)")
.executeUpdate()
conn.prepareStatement("INSERT INTO upsertT1 VALUES (1, 2, 3), (2, 3, 4)")
.executeUpdate()
conn.prepareStatement("CREATE TABLE upsertT2 (c1 INTEGER, c2 INTEGER, c3 INTEGER, " +
"primary key(c1, c2))").executeUpdate()
conn.prepareStatement("INSERT INTO upsertT2 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)")
.executeUpdate()
}

test("Basic test") {
@@ -152,4 +167,104 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
}

test("upsert with Append without existing table") {
val df1 = Seq((1, 3), (2, 5)).toDF("c1", "c2")
df1.write.mode(SaveMode.Append).option("upsert", true).option("upsertUpdateColumn", "c1")
.jdbc(jdbcUrl, "upsertT", new Properties)
val df2 = spark.read.jdbc(jdbcUrl, "upsertT", new Properties)
assert(df2.count() == 2)
assert(df2.filter("C1=1").collect.head.get(1) == 3)

// upsertT was created without a primary key or unique constraint, so the upsert degrades to a plain insert
val df3 = Seq((1, 4)).toDF("c1", "c2")
df3.write.mode(SaveMode.Append).option("upsert", true).option("upsertUpdateColumn", "c1")
.jdbc(jdbcUrl, "upsertT", new Properties)
assert(spark.read.jdbc(jdbcUrl, "upsertT", new Properties).filter("c1=1").count() == 2)
}

test("Upsert and OverWrite mode") {
//existing table has these rows
//(1, 2, 3), (2, 3, 4), (3, 4, 5)
val df1 = spark.read.jdbc(jdbcUrl, "upsertT0", new Properties())
assert(df1.filter("c1=1").collect.head.getInt(1) == 2)
assert(df1.filter("c1=1").collect.head.getInt(2) == 3)
assert(df1.filter("c1=2").collect.head.getInt(1) == 3)
assert(df1.filter("c1=2").collect.head.getInt(2) == 4)
val df2 = Seq((1, 3, 6), (2, 5, 6)).toDF("c1", "c2", "c3")
// Overwrite mode replaces the table contents; the upsert options are ignored
df2.write.mode(SaveMode.Overwrite)
.option("upsert", true).option("upsertUpdateColumn", "c2, c3")
.jdbc(jdbcUrl, "upsertT0", new Properties)
val df3 = spark.read.jdbc(jdbcUrl, "upsertT0", new Properties())
assert(df3.filter("c1=1").collect.head.getInt(1) == 3)
assert(df3.filter("c1=1").collect.head.getInt(2) == 6)
assert(df3.filter("c1=2").collect.head.getInt(1) == 5)
assert(df3.filter("c1=2").collect.head.getInt(2) == 6)
assert(df3.filter("c1=3").collect.size == 0)
}

test("upsert with Append and negative option values") {
val df1 = Seq((1, 3, 6), (2, 5, 6)).toDF("c1", "c2", "c3")
val m = intercept[java.sql.SQLException] {
df1.write.mode(SaveMode.Append).option("upsert", true).option("upsertUpdateColumn", "C11")
.jdbc(jdbcUrl, "upsertT1", new Properties)
}.getMessage
assert(m.contains("column C11 not found"))

val n = intercept[java.sql.SQLException] {
df1.write.mode(SaveMode.Append).option("upsert", true).option("upsertUpdateColumn", "C11")
.jdbc(jdbcUrl, "upsertT1", new Properties)
}.getMessage
assert(n.contains("column C11 not found"))
}

test("Upsert and Append mode -- data matching one column") {
// existing table has these rows:
// (1, 2, 3), (2, 3, 4)
val df1 = spark.read.jdbc(jdbcUrl, "upsertT1", new Properties())
assert(df1.count() == 2)
assert(df1.filter("c1=1").collect.head.getInt(1) == 2)
assert(df1.filter("c1=1").collect.head.getInt(2) == 3)
assert(df1.filter("c1=2").collect.head.getInt(1) == 3)
assert(df1.filter("c1=2").collect.head.getInt(2) == 4)
val df2 = Seq((1, 4, 7), (2, 6, 8)).toDF("c1", "c2", "c3")
df2.write.mode(SaveMode.Append)
.option("upsert", true).option("upsertUpdateColumn", "c2, c3")
.jdbc(jdbcUrl, "upsertT1", new Properties)
val df3 = spark.read.jdbc(jdbcUrl, "upsertT1", new Properties())
assert(df3.count() == 2)
assert(df3.filter("c1=1").collect.head.getInt(1) == 4)
assert(df3.filter("c1=1").collect.head.getInt(2) == 7)
assert(df3.filter("c1=2").collect.head.getInt(1) == 6)
assert(df3.filter("c1=2").collect.head.getInt(2) == 8)
// with upsert turned off, a plain insert of the duplicate-key row is attempted and fails
val df4 = Seq((1, 5, 9)).toDF("c1", "c2", "c3")
val n = intercept[org.apache.spark.SparkException] {
df4.write.mode(SaveMode.Append).option("upsert", false).option("upsertUpdateColumn", "C11")
.jdbc(jdbcUrl, "upsertT1", new Properties)
}.getMessage
assert(n.contains("Duplicate entry '1' for key 'PRIMARY'"))
}

test("Upsert and Append mode -- data matching two columns") {
// table has these rows: (1, 2, 3), (2, 3, 4), (3, 4, 5)
// update Row(2, 3, 4) to Row(2, 3, 10) that matches 2 columns
val df1 = spark.read.jdbc(jdbcUrl, "upsertT2", new Properties())
assert(df1.count() == 3)
assert(df1.filter("c1=1").collect.head.getInt(1) == 2)
assert(df1.filter("c1=1").collect.head.getInt(2) == 3)
assert(df1.filter("c1=2").collect.head.getInt(1) == 3)
assert(df1.filter("c1=2").collect.head.getInt(2) == 4)

val df2 = Seq((2, 3, 10)).toDF("c1", "c2", "c3")
df2.write.mode(SaveMode.Append)
.option("upsert", true).option("upsertUpdateColumn", "c3")
.jdbc(jdbcUrl, "upsertT2", new Properties)

val df3 = spark.read.jdbc(jdbcUrl, "upsertT2", new Properties())
assert(df3.count() == 3)
assert(df3.filter("c1=2").collect.head.getInt(1) == 3)
assert(df3.filter("c1=2").collect.head.getInt(2) == 10)
}
}
@@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc

import java.sql.Connection
import java.util.Properties

import org.apache.spark.sql.{Column, SaveMode}
import org.apache.spark.sql.catalyst.expressions.Literal
@@ -27,8 +28,9 @@ import org.apache.spark.tags.DockerTest

@DockerTest
class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
import testImplicits._
override val db = new DatabaseOnDocker {
override val imageName = "postgres:9.4.5"
override val imageName = "postgres:9.5.4"
override val env = Map(
"POSTGRES_PASSWORD" -> "rootpass"
)
@@ -55,6 +57,26 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
+ "null, null, null, null, null, "
+ "null, null, null, null, null, null, null)"
).executeUpdate()
conn.prepareStatement("CREATE TABLE upsertT0 " +
"(c1 INTEGER, c2 INTEGER, c3 INTEGER, primary key(c1))").executeUpdate()
conn.prepareStatement("INSERT INTO upsertT0 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)")
.executeUpdate()
conn.prepareStatement("CREATE TABLE upsertT1 " +
"(c1 INTEGER, c2 INTEGER, c3 INTEGER, primary key(c1))").executeUpdate()
conn.prepareStatement("INSERT INTO upsertT1 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)")
.executeUpdate()
conn.prepareStatement("CREATE TABLE upsertT2 " +
"(c1 INTEGER, c2 INTEGER, c3 INTEGER, primary key(c1, c2))").executeUpdate()
conn.prepareStatement("INSERT INTO upsertT2 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)")
.executeUpdate()
conn.prepareStatement("CREATE TABLE upsertT3 " +
"(c1 INTEGER, c2 INTEGER, c3 INTEGER, primary key(c1))").executeUpdate()
conn.prepareStatement("INSERT INTO upsertT3 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)")
.executeUpdate()
conn.prepareStatement("CREATE TABLE upsertT4 " +
"(c1 INTEGER, c2 INTEGER, c3 INTEGER, primary key(c1))").executeUpdate()
conn.prepareStatement("INSERT INTO upsertT4 VALUES (1, 2, 3), (2, 3, 4), (3, 4, 5)")
.executeUpdate()
}

test("Type mapping for various types") {
@@ -126,4 +148,93 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(schema(0).dataType == FloatType)
assert(schema(1).dataType == ShortType)
}

test("Upsert and OverWrite mode") {
//existing table has these rows
//(1, 2, 3), (2, 3, 4), (3, 4, 5)
val df1 = spark.read.jdbc(jdbcUrl, "upsertT0", new Properties())
assert(df1.filter("c1=1").collect.head.getInt(1) == 2)
assert(df1.filter("c1=1").collect.head.getInt(2) == 3)
assert(df1.filter("c1=2").collect.head.getInt(1) == 3)
assert(df1.filter("c1=2").collect.head.getInt(2) == 4)
val df2 = Seq((1, 3, 6), (2, 5, 6)).toDF("c1", "c2", "c3")
// Overwrite mode replaces the table contents; the upsert options are ignored
df2.write.mode(SaveMode.Overwrite)
.option("upsert", true).option("upsertConditionColumn", "c1")
.jdbc(jdbcUrl, "upsertT0", new Properties)
val df3 = spark.read.jdbc(jdbcUrl, "upsertT0", new Properties())
assert(df3.filter("c1=1").collect.head.getInt(1) == 3)
assert(df3.filter("c1=1").collect.head.getInt(2) == 6)
assert(df3.filter("c1=2").collect.head.getInt(1) == 5)
assert(df3.filter("c1=2").collect.head.getInt(2) == 6)
assert(df3.filter("c1=3").collect.size == 0)
}

test("upsert with Append and negative option values") {
val df1 = Seq((1, 3, 6), (2, 5, 6)).toDF("c1", "c2", "c3")
val m = intercept[java.sql.SQLException] {
df1.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "C11")
.jdbc(jdbcUrl, "upsertT1", new Properties)
}.getMessage
assert(m.contains("column C11 not found"))

val n = intercept[java.sql.SQLException] {
df1.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "C11")
.jdbc(jdbcUrl, "upsertT1", new Properties)
}.getMessage
assert(n.contains("column C11 not found"))

val o = intercept[org.apache.spark.SparkException] {
// c2 is not covered by a unique or exclusion constraint, so PostgreSQL rejects ON CONFLICT
df1.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "c2")
.jdbc(jdbcUrl, "upsertT1", new Properties)
}.getMessage
assert(o.contains("there is no unique or exclusion constraint matching the ON CONFLICT"))
}

test("upsert with Append without existing table") {
val df1 = Seq((1, 3), (2, 5)).toDF("c1", "c2")
df1.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "c1")
.jdbc(jdbcUrl, "upsertT", new Properties)
val df2 = spark.read.jdbc(jdbcUrl, "upsertT", new Properties)
assert(df2.count() == 2)
assert(df2.filter("C1=1").collect.head.get(1) == 3)

// upsertT was created without a primary key or unique constraint, so PostgreSQL throws an exception
val df3 = Seq((1, 4)).toDF("c1", "c2")
val p = intercept[org.apache.spark.SparkException] {
df3.write.mode(SaveMode.Append).option("upsert", true).option("upsertConditionColumn", "c1")
.jdbc(jdbcUrl, "upsertT", new Properties)
}.getMessage
assert(p.contains("there is no unique or exclusion constraint matching the ON CONFLICT specification"))
}

test("Upsert & Append test -- matching one column") {
val df1 = spark.read.jdbc(jdbcUrl, "upsertT3", new Properties())
assert(df1.filter("c1=1").collect.head.getInt(1) == 2)
assert(df1.filter("c1=1").collect.head.getInt(2) == 3)
assert(df1.filter("c1=4").collect.size == 0)
// update Row(1, 2, 3) to (1, 3, 6) and insert new Row(4, 5, 6)
val df2 = Seq((1, 3, 6), (4, 5, 6)).toDF("c1", "c2", "c3")
// condition on one column
df2.write.mode(SaveMode.Append)
.option("upsert", true).option("upsertConditionColumn", "c1").option("upsertUpdateColumn", "c2, c3")
.jdbc(jdbcUrl, "upsertT3", new Properties)
val df3 = spark.read.jdbc(jdbcUrl, "upsertT3", new Properties())
assert(df3.filter("c1=1").collect.head.getInt(1) == 3)
assert(df3.filter("c1=1").collect.head.getInt(2) == 6)
assert(df3.filter("c1=4").collect.size == 1)
}

test("Upsert & Append test -- matching two columns") {
val df1 = spark.read.jdbc(jdbcUrl, "upsertT2", new Properties())
assert(df1.filter("c1=1").collect.head.getInt(1) == 2)
assert(df1.filter("c1=1").collect.head.getInt(2) == 3)
// update Row(2, 3, 4) to Row(2, 3, 10) that matches 2 columns
val df2 = Seq((2, 3, 10)).toDF("c1", "c2", "c3")
df2.write.mode(SaveMode.Append)
.option("upsert", true).option("upsertConditionColumn", "c1, c2")
.jdbc(jdbcUrl, "upsertT2", new Properties)
val df3 = spark.read.jdbc(jdbcUrl, "upsertT2", new Properties())
assert(df3.filter("c1=2").collect.head.getInt(2) == 10)
}
}
30 changes: 30 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -431,6 +431,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* You can set the following JDBC-specific option(s) for storing JDBC:
* <ul>
* <li>`truncate` (default `false`): use `TRUNCATE TABLE` instead of `DROP TABLE`.</li>
* <li>`upsert` (default `false`): under `SaveMode.Append`, whether the JDBC data source may
* update an existing record that duplicates an inserted row instead of failing. Some databases
* require the column(s) used to identify such duplicates to be specified explicitly.</li>
* <li>`upsertConditionColumn`: when `upsert` is `true`, a comma-separated list of column(s)
* on which duplicate rows are identified. It is required by database systems such as DB2,
* Oracle, and PostgreSQL.</li>
* <li>`upsertUpdateColumn`: when `upsert` is `true`, a comma-separated list of column(s) to be
* updated when an existing row is matched. It is required by database systems such as
* MySQL.</li>
* </ul>
*
* In case of failures, users should turn off `truncate` option to use `DROP TABLE` again. Also,
Expand All @@ -439,6 +448,27 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* while PostgresDialect and default JDBCDialect don't. For unknown and unsupported JDBCDialects,
* the user option `truncate` is ignored.
*
* When using the upsert feature, please be aware of the following limitations.
* First, if the content of the [[DataFrame]] has multiple rows with the same key, the upsert
* result is non-deterministic, because the data is partitioned and sent to the data source
* over different JDBC connections. You can avoid this by eliminating duplicate-key rows first
* and then saving to the JDBC table. For example, if the key is on columns ("key", "value1"):
*
* {{{
* scala> val testData = sc.parallelize((2, 1, 2) :: (1, 1, 1) :: (1, 2, 3) :: (2, 1, 3) ::
* (2, 2, 2) :: (2, 2, 1) :: (2, 1, 4) :: (1, 1, 4) :: (1, 2, 5) ::
* (1, 2, 6) :: Nil, 5).toDF("key", "value1", "TS")
* scala> val deduped = testData.groupBy('key, 'value1).agg("TS" -> "max")
*   .withColumnRenamed("max(TS)", "TS")
* scala> deduped.join(testData, Seq("key", "value1", "TS"))
* }}}
*
* Second, the upsert feature is supported only for SaveMode.Append. SaveMode.Overwrite first
* empties the table and then inserts the data, so there are no existing rows to match and
* upsert does not apply in that mode. For a deterministic result, we also recommend removing
* duplicate-key rows from the content of the [[DataFrame]] before saving.
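*
* As an illustrative sketch only (the connection URL, table name, and column names below are
* placeholders, not part of this API):
*
* {{{
*   df.write.mode(SaveMode.Append)
*     .option("upsert", true)
*     .option("upsertConditionColumn", "id")  // PostgreSQL: match existing rows on this column
*     .jdbc("jdbc:postgresql://host:5432/db", "target_table", connectionProperties)
* }}}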
*
* @param url JDBC database url of the form `jdbc:subprotocol:subname`
* @param table Name of the table in the external database.
* @param connectionProperties JDBC database connection arguments, a list of arbitrary string