
Commit a3c29fc

YIHAODIAN\wangshuangshuang authored and gatorsmile committed
[SPARK-19726][SQL] Failed to insert null timestamp value into MySQL using Spark JDBC
## What changes were proposed in this pull request?

When creating a table like the following:

> create table timestamp_test(id int(11), time_stamp timestamp not null default current_timestamp);

the result of executing "insert into timestamp_test values (111, null)" differs between Spark and the MySQL client:

```
mysql> select * from timestamp_test;
+------+---------------------+
| id   | time_stamp          |
+------+---------------------+
|  111 | 1970-01-01 00:00:00 | -> spark
|  111 | 2017-06-27 19:32:38 | -> mysql
+------+---------------------+
2 rows in set (0.00 sec)
```

In this case `StructField.nullable` is false, so the code generated for `InvokeLike` and `BoundReference` does not check whether the field is null. Instead it reads the value directly via `CodegenContext.INPUT_ROW.getLong(1)`, and since `UnsafeRow.setNullAt(1)` puts 0 in the underlying memory, the null comes back as the epoch timestamp.

This PR always sets `StructField.nullable` to true after obtaining metadata from the JDBC connection, since MySQL permits inserting null into a NOT NULL timestamp column. Spark then propagates the null to the underlying DB engine and lets the DB choose how to process NULL.

## How was this patch tested?

Added tests.

Author: YIHAODIAN\wangshuangshuang <[email protected]>
Author: Shuangshuang Wang <[email protected]>

Closes #18445 from shuangshuangwang/SPARK-19726.
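The root-cause claim above can be checked directly. Below is a minimal sketch, not part of the original commit, using the Spark-internal `UnsafeProjection` to mimic what the generated code does:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types._

// Sketch: setNullAt zeroes the fixed-width slot as well as setting the null
// bit, so a reader that skips the null check sees 0L (the epoch, for a
// timestamp) instead of a null.
val proj = UnsafeProjection.create(Array[DataType](IntegerType, TimestampType))
val row = proj(InternalRow(111, null))
assert(row.isNullAt(1))       // the null bit is set...
assert(row.getLong(1) == 0L)  // ...but reading the slot anyway yields 0
```

With `nullable = false` in the schema, the generated code takes the `getLong` path unconditionally, which is exactly the 1970-01-01 row shown above.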
1 parent 29b1f6b · commit a3c29fc

File tree

3 files changed: +19 −3 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -59,7 +59,7 @@ object JDBCRDD extends Logging {
     try {
       val rs = statement.executeQuery()
       try {
-        JdbcUtils.getSchema(rs, dialect)
+        JdbcUtils.getSchema(rs, dialect, alwaysNullable = true)
       } finally {
         rs.close()
       }
```
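This call site sits in the schema-resolution path of `object JDBCRDD`, so every schema Spark derives for a JDBC relation is now fully nullable. A minimal sketch of the effect, assuming an in-memory H2 database (the table name and DDL here are hypothetical, and `JdbcUtils` is internal Spark API):

```scala
import java.sql.DriverManager
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.jdbc.JdbcDialects

// Hypothetical H2 table with a NOT NULL column.
val conn = DriverManager.getConnection("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1")
conn.prepareStatement(
  "CREATE TABLE ts_test (id INT, ts TIMESTAMP NOT NULL)").executeUpdate()
val rs = conn.prepareStatement("SELECT * FROM ts_test WHERE 1=0").executeQuery()
val dialect = JdbcDialects.get("jdbc:h2:mem:testdb")

// Default behaviour honours the database metadata: ts is not nullable.
assert(!JdbcUtils.getSchema(rs, dialect).fields(1).nullable)
// With the new flag, every column is reported as nullable.
assert(JdbcUtils.getSchema(rs, dialect, alwaysNullable = true)
  .fields.forall(_.nullable))
```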

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala

Lines changed: 10 additions & 2 deletions

```diff
@@ -266,10 +266,14 @@ object JdbcUtils extends Logging {
   /**
    * Takes a [[ResultSet]] and returns its Catalyst schema.
    *
+   * @param alwaysNullable If true, all the columns are nullable.
    * @return A [[StructType]] giving the Catalyst schema.
    * @throws SQLException if the schema contains an unsupported type.
    */
-  def getSchema(resultSet: ResultSet, dialect: JdbcDialect): StructType = {
+  def getSchema(
+      resultSet: ResultSet,
+      dialect: JdbcDialect,
+      alwaysNullable: Boolean = false): StructType = {
     val rsmd = resultSet.getMetaData
     val ncols = rsmd.getColumnCount
     val fields = new Array[StructField](ncols)
@@ -290,7 +294,11 @@ object JdbcUtils extends Logging {
             rsmd.getClass.getName == "org.apache.hive.jdbc.HiveResultSetMetaData" => true
         }
       }
-      val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
+      val nullable = if (alwaysNullable) {
+        true
+      } else {
+        rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
+      }
       val metadata = new MetadataBuilder()
         .putString("name", columnName)
         .putLong("scale", fieldScale)
```
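For reference, `ResultSetMetaData.isNullable` returns one of three JDBC constants; the existing check already treated both `columnNullable` and `columnNullableUnknown` as nullable, and the new flag simply short-circuits that check. A standalone sketch of the same logic:

```scala
import java.sql.ResultSetMetaData

// Mirrors the nullable computation in getSchema above.
def isNullable(jdbcNullability: Int, alwaysNullable: Boolean): Boolean =
  alwaysNullable || jdbcNullability != ResultSetMetaData.columnNoNulls

assert(!isNullable(ResultSetMetaData.columnNoNulls, alwaysNullable = false))
assert(isNullable(ResultSetMetaData.columnNullable, alwaysNullable = false))
assert(isNullable(ResultSetMetaData.columnNullableUnknown, alwaysNullable = false))
assert(isNullable(ResultSetMetaData.columnNoNulls, alwaysNullable = true))
```

Defaulting `alwaysNullable` to false keeps every other caller of `getSchema` on the old metadata-driven behaviour; only the read-path resolution in `JDBCRDD` opts in.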

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala

Lines changed: 8 additions & 0 deletions

```diff
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters.propertiesAsScalaMapConverter
 
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
@@ -506,4 +507,11 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
         "schema struct<name:string,id:int>"))
     }
   }
+
+  test("SPARK-19726: INSERT null to a NOT NULL column") {
+    val e = intercept[SparkException] {
+      sql("INSERT INTO PEOPLE1 values (null, null)")
+    }.getMessage
+    assert(e.contains("NULL not allowed for column \"NAME\""))
+  }
 }
```
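For context, `PEOPLE1` is registered in the suite's `before` block against the in-memory H2 test database with NOT NULL columns. Since the resolved schema is now always nullable, Spark propagates the null, and the test asserts on H2's own constraint error rather than a silently written default. A paraphrased sketch of that setup, where `conn`, `sql`, and `url1` are the suite's fixtures and the exact DDL and option values may differ:

```scala
// Paraphrased from JDBCWriteSuite's setup: an H2 table with NOT NULL columns,
// exposed to Spark SQL as the temporary view PEOPLE1.
conn.prepareStatement(
  "create table test.people1 (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)")
  .executeUpdate()
sql(
  s"""
     |CREATE OR REPLACE TEMPORARY VIEW PEOPLE1
     |USING org.apache.spark.sql.jdbc
     |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE1',
     |  user 'testUser', password 'testPass')
   """.stripMargin)
```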
