From 7e3bb0ecb1a47fb0ba868f8880bd5f6b8079bc70 Mon Sep 17 00:00:00 2001
From: Jiaan Geng
Date: Tue, 28 Jun 2022 16:36:31 +0800
Subject: [PATCH 1/6] [SPARK-39339][SQL][FOLLOWUP] TimestampNTZ type in JDBC
 data source is incorrect

---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 9c28e20429621..3504e468f1f50 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1946,17 +1946,22 @@ class JDBCSuite extends QueryTest
       val df = spark.sql(s"select timestamp_ntz '$datetime'")
       df.write.format("jdbc")
         .mode("overwrite")
+        .option("inferTimestampNTZType", "true")
         .option("url", urlWithUserAndPass)
         .option("dbtable", tableName)
         .save()
 
-      val res = spark.read.format("jdbc")
-        .option("inferTimestampNTZType", "true")
-        .option("url", urlWithUserAndPass)
-        .option("dbtable", tableName)
-        .load()
+      DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
+        DateTimeTestUtils.withDefaultTimeZone(zoneId) {
+          val res = spark.read.format("jdbc")
+            .option("inferTimestampNTZType", "true")
+            .option("url", urlWithUserAndPass)
+            .option("dbtable", tableName)
+            .load()
 
-      checkAnswer(res, df)
+          checkAnswer(res, df)
+        }
+      }
     }
   }
 }

From 3a7de500e9522507f19b3ae0ab45ab2b3b59c4a5 Mon Sep 17 00:00:00 2001
From: Jiaan Geng
Date: Tue, 28 Jun 2022 17:53:31 +0800
Subject: [PATCH 2/6] Update code

---
 .../datasources/jdbc/JdbcUtils.scala          | 26 ++++++++++++++------
 1 file changed, 20 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index cc8746ea5c407..6be15659423de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeConstants, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
@@ -473,7 +473,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
         }
       }
 
-    case TimestampType | TimestampNTZType =>
+    case TimestampType =>
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         val t = rs.getTimestamp(pos + 1)
         if (t != null) {
@@ -482,6 +482,17 @@ object JdbcUtils extends Logging with SQLConfHelper {
           row.update(pos, null)
         }
 
+    case TimestampNTZType =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val t = rs.getTimestamp(pos + 1)
+        if (t != null) {
+          val micros = DateTimeUtils.millisToMicros(t.getTime) +
+            (t.getNanos / DateTimeConstants.NANOS_PER_MICROS) % DateTimeConstants.MICROS_PER_MILLIS
+          row.setLong(pos, micros)
+        } else {
+          row.update(pos, null)
+        }
+
     case BinaryType =>
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.update(pos, rs.getBytes(pos + 1))
@@ -599,10 +610,13 @@ object JdbcUtils extends Logging with SQLConfHelper {
           stmt.setTimestamp(pos + 1, toJavaTimestamp(instantToMicros(row.getAs[Instant](pos))))
       } else {
         (stmt: PreparedStatement, row: Row, pos: Int) =>
-          stmt.setTimestamp(
-            pos + 1,
-            toJavaTimestamp(localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos)))
-          )
+          val micros = localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos))
+          val seconds = Math.floorDiv(micros, DateTimeConstants.MICROS_PER_SECOND)
+          val nanos = (micros - seconds * DateTimeConstants.MICROS_PER_SECOND) *
+            DateTimeConstants.NANOS_PER_MICROS
+          val result = new java.sql.Timestamp(seconds * DateTimeConstants.MILLIS_PER_SECOND)
+          result.setNanos(nanos.toInt)
+          stmt.setTimestamp(pos + 1, result)
       }
 
     case DateType =>
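
Patch 2 splits TimestampNTZType out of the shared TimestampType getter so that a
timestamp_ntz value read over JDBC no longer passes through fromJavaTimestamp,
whose Julian-to-Gregorian rebase depends on the JVM default time zone; that
dependence is exactly what the test in patch 1 exercises across
outstandingZoneIds. The new arithmetic combines the whole milliseconds of
java.sql.Timestamp with the sub-millisecond remainder of its nanos field. A
minimal standalone sketch of that arithmetic, with the DateTimeConstants values
inlined (timestampToMicros is a local re-implementation, not the Spark helper):

    import java.sql.Timestamp

    // getTime already carries whole milliseconds, so only the portion of the
    // nanos field below one millisecond is added back, in microseconds.
    def timestampToMicros(t: Timestamp): Long =
      t.getTime * 1000L + (t.getNanos / 1000L) % 1000L

    val t = Timestamp.valueOf("2022-06-28 16:36:31.123456")
    // Sub-second part: 123 ms from getTime plus 456 us from getNanos.
    assert(timestampToMicros(t) % 1000000L == 123456L)
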
From e2ce5d8900cb4c6c7524f7efceebdced489e3f2d Mon Sep 17 00:00:00 2001
From: Jiaan Geng
Date: Wed, 29 Jun 2022 09:18:03 +0800
Subject: [PATCH 3/6] Update code

---
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 3504e468f1f50..494ae6d548784 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1946,7 +1946,6 @@ class JDBCSuite extends QueryTest
       val df = spark.sql(s"select timestamp_ntz '$datetime'")
       df.write.format("jdbc")
         .mode("overwrite")
-        .option("inferTimestampNTZType", "true")
         .option("url", urlWithUserAndPass)
         .option("dbtable", tableName)
         .save()

From 09abec540206fbf436e4aeea3bff7b6fbd00012e Mon Sep 17 00:00:00 2001
From: Jiaan Geng
Date: Thu, 30 Jun 2022 11:09:52 +0800
Subject: [PATCH 4/6] Update code

---
 .../datasources/jdbc/JdbcUtils.scala          | 21 +++++++------------
 1 file changed, 8 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 6be15659423de..6120577bf3cbc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -605,19 +605,14 @@ object JdbcUtils extends Logging with SQLConfHelper {
       }
 
     case TimestampNTZType =>
-      if (conf.datetimeJava8ApiEnabled) {
-        (stmt: PreparedStatement, row: Row, pos: Int) =>
-          stmt.setTimestamp(pos + 1, toJavaTimestamp(instantToMicros(row.getAs[Instant](pos))))
-      } else {
-        (stmt: PreparedStatement, row: Row, pos: Int) =>
-          val micros = localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos))
-          val seconds = Math.floorDiv(micros, DateTimeConstants.MICROS_PER_SECOND)
-          val nanos = (micros - seconds * DateTimeConstants.MICROS_PER_SECOND) *
-            DateTimeConstants.NANOS_PER_MICROS
-          val result = new java.sql.Timestamp(seconds * DateTimeConstants.MILLIS_PER_SECOND)
-          result.setNanos(nanos.toInt)
-          stmt.setTimestamp(pos + 1, result)
-      }
+      (stmt: PreparedStatement, row: Row, pos: Int) =>
+        val micros = localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos))
+        val seconds = Math.floorDiv(micros, DateTimeConstants.MICROS_PER_SECOND)
+        val nanos = (micros - seconds * DateTimeConstants.MICROS_PER_SECOND) *
+          DateTimeConstants.NANOS_PER_MICROS
+        val result = new java.sql.Timestamp(seconds * DateTimeConstants.MILLIS_PER_SECOND)
+        result.setNanos(nanos.toInt)
+        stmt.setTimestamp(pos + 1, result)
 
     case DateType =>
       if (conf.datetimeJava8ApiEnabled) {
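
Patch 4 deletes the datetimeJava8ApiEnabled branch from the TimestampNTZType
setter: Spark hands a timestamp_ntz value to the JDBC writer as
java.time.LocalDateTime regardless of that flag, so the java.time.Instant
branch was unreachable. The surviving conversion relies on Math.floorDiv so
that pre-epoch values decompose into a floored second plus a non-negative
nanos remainder, which setNanos requires. A standalone sketch of the
decomposition (local re-implementation with the constants inlined):

    import java.sql.Timestamp

    def microsToTimestamp(micros: Long): Timestamp = {
      // floorDiv rounds toward negative infinity, keeping the remainder
      // (and therefore the setNanos argument) within [0, 999999999].
      val seconds = Math.floorDiv(micros, 1000000L)
      val nanos = (micros - seconds * 1000000L) * 1000L
      val ts = new Timestamp(seconds * 1000L)
      ts.setNanos(nanos.toInt)
      ts
    }

    // One microsecond before the epoch floors to second -1 with a remainder
    // of 999999000 nanos, i.e. 1969-12-31 23:59:59.999999 UTC.
    assert(microsToTimestamp(-1L).getNanos == 999999000)
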
From ebe69c562861221e74cb2ab6fecf74f7cc2752a1 Mon Sep 17 00:00:00 2001
From: Jiaan Geng
Date: Thu, 30 Jun 2022 14:16:38 +0800
Subject: [PATCH 5/6] Update code

---
 .../sql/catalyst/util/DateTimeUtils.scala     | 25 +++++++++++++++++++
 .../datasources/jdbc/JdbcUtils.scala          | 15 +++--------
 2 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index cc61491dc95d7..549a970cff404 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -167,6 +167,21 @@ object DateTimeUtils {
     ts
   }
 
+  /**
+   * Converts microseconds since the epoch to an instance of `java.sql.Timestamp`.
+   *
+   * @param micros The number of microseconds since 1970-01-01T00:00:00.000000Z.
+   * @return A `java.sql.Timestamp` from number of micros since epoch.
+   */
+  def toJavaTimestampNoRebase(micros: Long): Timestamp = {
+    val seconds = Math.floorDiv(micros, DateTimeConstants.MICROS_PER_SECOND)
+    val nanos = (micros - seconds * DateTimeConstants.MICROS_PER_SECOND) *
+      DateTimeConstants.NANOS_PER_MICROS
+    val ts = new Timestamp(seconds * DateTimeConstants.MILLIS_PER_SECOND)
+    ts.setNanos(nanos.toInt)
+    ts
+  }
+
   /**
    * Converts an instance of `java.sql.Timestamp` to the number of microseconds since
    * 1970-01-01T00:00:00.000000Z. It extracts date-time fields from the input, builds
@@ -191,6 +206,16 @@ object DateTimeUtils {
     rebaseJulianToGregorianMicros(micros)
   }
 
+  /**
+   * Converts an instance of `java.sql.Timestamp` to the number of microseconds since
+   * 1970-01-01T00:00:00.000000Z.
+   *
+   * @param t an instance of `java.sql.Timestamp`.
+   * @return The number of micros since epoch from `java.sql.Timestamp`.
+   */
+  def fromJavaTimestampNoRebase(t: Timestamp): Long =
+    millisToMicros(t.getTime) + (t.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
+
   /**
    * Converts an Java object to microseconds.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 6120577bf3cbc..fa4c032fcb012 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -37,8 +37,8 @@ import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeConstants, DateTimeUtils, GenericArrayData}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp, toJavaTimestampNoRebase}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
 import org.apache.spark.sql.connector.expressions.NamedReference
@@ -486,9 +486,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         val t = rs.getTimestamp(pos + 1)
         if (t != null) {
-          val micros = DateTimeUtils.millisToMicros(t.getTime) +
-            (t.getNanos / DateTimeConstants.NANOS_PER_MICROS) % DateTimeConstants.MICROS_PER_MILLIS
-          row.setLong(pos, micros)
+          row.setLong(pos, DateTimeUtils.fromJavaTimestampNoRebase(t))
         } else {
           row.update(pos, null)
         }
@@ -607,12 +605,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
     case TimestampNTZType =>
       (stmt: PreparedStatement, row: Row, pos: Int) =>
         val micros = localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos))
-        val seconds = Math.floorDiv(micros, DateTimeConstants.MICROS_PER_SECOND)
-        val nanos = (micros - seconds * DateTimeConstants.MICROS_PER_SECOND) *
-          DateTimeConstants.NANOS_PER_MICROS
-        val result = new java.sql.Timestamp(seconds * DateTimeConstants.MILLIS_PER_SECOND)
-        result.setNanos(nanos.toInt)
-        stmt.setTimestamp(pos + 1, result)
+        stmt.setTimestamp(pos + 1, toJavaTimestampNoRebase(micros))
 
     case DateType =>
       if (conf.datetimeJava8ApiEnabled) {
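
Patch 5 hoists both conversions into DateTimeUtils as toJavaTimestampNoRebase
and fromJavaTimestampNoRebase. Because neither direction applies a calendar
rebase, the pair is exactly inverse, which is what makes the write-then-read
cycle in patch 1's test independent of the JVM time zone. A self-contained
round-trip sketch (toTs and fromTs locally mirror the two new helpers, with
the constants inlined):

    import java.sql.Timestamp

    def toTs(micros: Long): Timestamp = {
      val seconds = Math.floorDiv(micros, 1000000L)
      val ts = new Timestamp(seconds * 1000L)
      ts.setNanos(((micros - seconds * 1000000L) * 1000L).toInt)
      ts
    }

    def fromTs(t: Timestamp): Long =
      t.getTime * 1000L + (t.getNanos / 1000L) % 1000L

    // Epoch, sub-millisecond, pre-epoch, and modern values all round-trip.
    for (micros <- Seq(0L, 1L, -1L, 1656405391123456L)) {
      assert(fromTs(toTs(micros)) == micros)
    }
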
From 355627f243072f0306501e32ddf1768beb76f9a0 Mon Sep 17 00:00:00 2001
From: Jiaan Geng
Date: Thu, 30 Jun 2022 14:36:05 +0800
Subject: [PATCH 6/6] Update code

---
 .../sql/catalyst/util/DateTimeUtils.scala     | 23 ++++++-------------
 1 file changed, 7 insertions(+), 16 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 549a970cff404..5045d1479f207 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -158,14 +158,8 @@ object DateTimeUtils {
    * @param micros The number of microseconds since 1970-01-01T00:00:00.000000Z.
    * @return A `java.sql.Timestamp` from number of micros since epoch.
    */
-  def toJavaTimestamp(micros: Long): Timestamp = {
-    val rebasedMicros = rebaseGregorianToJulianMicros(micros)
-    val seconds = Math.floorDiv(rebasedMicros, MICROS_PER_SECOND)
-    val ts = new Timestamp(seconds * MILLIS_PER_SECOND)
-    val nanos = (rebasedMicros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
-    ts.setNanos(nanos.toInt)
-    ts
-  }
+  def toJavaTimestamp(micros: Long): Timestamp =
+    toJavaTimestampNoRebase(rebaseGregorianToJulianMicros(micros))
 
   /**
    * Converts microseconds since the epoch to an instance of `java.sql.Timestamp`.
@@ -174,10 +168,9 @@ object DateTimeUtils {
    * @return A `java.sql.Timestamp` from number of micros since epoch.
    */
   def toJavaTimestampNoRebase(micros: Long): Timestamp = {
-    val seconds = Math.floorDiv(micros, DateTimeConstants.MICROS_PER_SECOND)
-    val nanos = (micros - seconds * DateTimeConstants.MICROS_PER_SECOND) *
-      DateTimeConstants.NANOS_PER_MICROS
-    val ts = new Timestamp(seconds * DateTimeConstants.MILLIS_PER_SECOND)
+    val seconds = Math.floorDiv(micros, MICROS_PER_SECOND)
+    val ts = new Timestamp(seconds * MILLIS_PER_SECOND)
+    val nanos = (micros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS
     ts.setNanos(nanos.toInt)
     ts
   }
@@ -201,10 +194,8 @@ object DateTimeUtils {
    * Gregorian calendars.
    * @return The number of micros since epoch from `java.sql.Timestamp`.
    */
-  def fromJavaTimestamp(t: Timestamp): Long = {
-    val micros = millisToMicros(t.getTime) + (t.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS
-    rebaseJulianToGregorianMicros(micros)
-  }
+  def fromJavaTimestamp(t: Timestamp): Long =
+    rebaseJulianToGregorianMicros(fromJavaTimestampNoRebase(t))
 
   /**
    * Converts an instance of `java.sql.Timestamp` to the number of microseconds since
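
With patch 6, the original rebasing pair becomes a thin wrapper over the new
helpers: toJavaTimestamp rebases from Proleptic Gregorian to the hybrid Julian
calendar and then delegates, while fromJavaTimestamp delegates and then rebases
back. A spark-shell style check against the API introduced in this series (the
micros value is illustrative):

    import org.apache.spark.sql.catalyst.util.DateTimeUtils._

    val micros = 1656405391123456L            // 2022-06-28 08:36:31.123456 UTC
    val ts = toJavaTimestampNoRebase(micros)  // no calendar rebase applied
    assert(fromJavaTimestampNoRebase(ts) == micros)
    // For modern timestamps the rebase is an identity, so the rebasing pair
    // round-trips to the same value as well.
    assert(fromJavaTimestamp(toJavaTimestamp(micros)) == micros)
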