Skip to content

Commit 803e5de

Browse files
committed
Fix after merge master
1 parent fc707f1 commit 803e5de

File tree

5 files changed

+24
-23
lines changed

5 files changed

+24
-23
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class JdbcRelationProvider extends CreatableRelationProvider
6868
}
6969

7070
case SaveMode.Append =>
71-
if (options.isUpsert && !dialect.supportsUpsert) {
71+
if (options.isUpsert && !dialect.supportsUpsert()) {
7272
throw QueryCompilationErrors.tableDoesNotSupportUpsertsError(options.table)
7373
}
7474
val tableSchema = JdbcUtils.getSchemaOption(conn, options)

sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -143,35 +143,34 @@ private case class MsSqlServerDialect() extends JdbcDialect {
143143

144144
override def getUpsertStatement(
145145
tableName: String,
146-
columns: Array[String],
147-
types: Array[DataType],
146+
columns: Array[StructField],
148147
isCaseSensitive: Boolean,
149148
options: JDBCOptions): String = {
150-
val insertColumns = columns.mkString(", ")
151-
val inputs = types
149+
val insertColumns = columns.map(_.name).map(quoteIdentifier)
150+
val inputs = columns
151+
.map(_.dataType)
152152
.map(t => JdbcUtils.getJdbcType(t, this).databaseTypeDefinition)
153153
.zipWithIndex.map {
154154
case (t, idx) => s"DECLARE @param$idx $t; SET @param$idx = ?;"
155155
}.mkString("\n")
156156
val values = columns.indices.map(i => s"@param$i").mkString(", ")
157-
val quotedUpsertKeyColumns = options.upsertKeyColumns.map(quoteIdentifier)
158157
val keyColumns = columns.zipWithIndex.filter {
159-
case (col, _) => quotedUpsertKeyColumns.contains(col)
158+
case (col, _) => options.upsertKeyColumns.contains(col.name)
160159
}
161160
val updateColumns = columns.zipWithIndex.filterNot {
162-
case (col, _) => quotedUpsertKeyColumns.contains(col)
161+
case (col, _) => options.upsertKeyColumns.contains(col.name)
163162
}
164163
val whereClause = keyColumns.map {
165-
case (key, idx) => s"$key = @param$idx"
164+
case (key, idx) => s"${quoteIdentifier(key.name)} = @param$idx"
166165
}.mkString(" AND ")
167166
val updateClause = updateColumns.map {
168-
case (col, idx) => s"$col = @param$idx"
167+
case (col, idx) => s"${quoteIdentifier(col.name)} = @param$idx"
169168
}.mkString(", ")
170169

171170
s"""
172171
|$inputs
173172
|
174-
|INSERT $tableName ($insertColumns)
173+
|INSERT $tableName (${insertColumns.mkString(", ")})
175174
|SELECT $values
176175
|WHERE NOT EXISTS (
177176
| SELECT 1

sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,15 +181,15 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper {
181181
columns: Array[StructField],
182182
isCaseSensitive: Boolean,
183183
options: JDBCOptions): String = {
184-
val insertColumns = columns.mkString(", ")
184+
val insertColumns = columns.map(_.name).map(quoteIdentifier)
185185
val placeholders = columns.map(_ => "?").mkString(",")
186186
val upsertKeyColumns = options.upsertKeyColumns.map(quoteIdentifier)
187-
val updateColumns = columns.filterNot(c => upsertKeyColumns.contains(c.name))
187+
val updateColumns = insertColumns.filterNot(upsertKeyColumns.contains)
188188
val updateClause =
189189
updateColumns.map(x => s"$x = VALUES($x)").mkString(", ")
190190

191191
s"""
192-
|INSERT INTO $tableName ($insertColumns)
192+
|INSERT INTO $tableName (${insertColumns.mkString(", ")})
193193
|VALUES ( $placeholders )
194194
|ON DUPLICATE KEY UPDATE $updateClause
195195
|""".stripMargin

sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -166,19 +166,18 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
166166

167167
override def getUpsertStatement(
168168
tableName: String,
169-
columns: Array[String],
170-
types: Array[DataType],
169+
columns: Array[StructField],
171170
isCaseSensitive: Boolean,
172171
options: JDBCOptions): String = {
173-
val insertColumns = columns.mkString(", ")
172+
val insertColumns = columns.map(_.name).map(quoteIdentifier)
174173
val placeholders = columns.map(_ => "?").mkString(",")
175174
val upsertKeyColumns = options.upsertKeyColumns.map(quoteIdentifier)
176-
val updateColumns = columns.filterNot(upsertKeyColumns.contains)
175+
val updateColumns = insertColumns.filterNot(upsertKeyColumns.contains)
177176
val updateClause =
178177
updateColumns.map(x => s"$x = EXCLUDED.$x").mkString(", ")
179178

180179
s"""
181-
|INSERT INTO $tableName ($insertColumns)
180+
|INSERT INTO $tableName (${insertColumns.mkString(", ")})
182181
|VALUES ( $placeholders )
183182
|ON CONFLICT (${upsertKeyColumns.mkString(", ")})
184183
|DO UPDATE SET $updateClause

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1206,11 +1206,14 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
12061206
}
12071207

12081208
val table = "table"
1209-
val columns = Array("id", "time", "value", "comment")
1210-
val quotedColumns = columns.map(dialect.quoteIdentifier)
1211-
val types: Array[DataType] = Array(LongType, TimestampType, DoubleType, StringType)
1209+
val columns = Array(
1210+
StructField("id", LongType),
1211+
StructField("time", TimestampType),
1212+
StructField("value", DoubleType),
1213+
StructField("comment", StringType)
1214+
)
12121215
val isCaseSensitive = false
1213-
val stmt = dialect.getUpsertStatement(table, quotedColumns, types, isCaseSensitive, options)
1216+
val stmt = dialect.getUpsertStatement(table, columns, isCaseSensitive, options)
12141217

12151218
assert(stmt === expected)
12161219
}

0 commit comments

Comments (0)