From 9a925c6357aa591b5178f9d93ce9e7c0f3fad08f Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 21 Dec 2023 12:42:19 +0300
Subject: [PATCH] [SPARK-46466][SQL] Vectorized parquet reader should never do
 rebase for timestamp ntz

### What changes were proposed in this pull request?

This fixes a correctness bug. TIMESTAMP_NTZ is a new data type in Spark and has no legacy files that need calendar rebase. However, the vectorized parquet reader treats it the same as LTZ and may rebase it if the parquet file was written with the legacy rebase mode. This PR fixes the reader to never rebase NTZ.

### Why are the changes needed?

bug fix

### Does this PR introduce _any_ user-facing change?

Yes, now we can correctly write and read back NTZ values even if the date is before 1582.

### How was this patch tested?

new test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44428 from cloud-fan/ntz.

Lead-authored-by: Wenchen Fan
Co-authored-by: Wenchen Fan
Signed-off-by: Max Gekk
---
 .../parquet/ParquetVectorUpdaterFactory.java  | 31 ++++++++++---------
 .../parquet/ParquetQuerySuite.scala           | 12 +++++++
 2 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 42442cf8ea8a4..8c4fe20853879 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -109,24 +109,32 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa
           // For unsigned int64, it stores as plain signed int64 in Parquet when dictionary
           // fallbacks. We read them as decimal values.
           return new UnsignedLongUpdater();
-        } else if (isTimestamp(sparkType) &&
-          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
-          validateTimestampType(sparkType);
+        } else if (sparkType == DataTypes.TimestampType &&
+          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
           if ("CORRECTED".equals(datetimeRebaseMode)) {
             return new LongUpdater();
           } else {
             boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
             return new LongWithRebaseUpdater(failIfRebase, datetimeRebaseTz);
           }
-        } else if (isTimestamp(sparkType) &&
-          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
-          validateTimestampType(sparkType);
+        } else if (sparkType == DataTypes.TimestampType &&
+          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
           if ("CORRECTED".equals(datetimeRebaseMode)) {
             return new LongAsMicrosUpdater();
           } else {
             final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
             return new LongAsMicrosRebaseUpdater(failIfRebase, datetimeRebaseTz);
           }
+        } else if (sparkType == DataTypes.TimestampNTZType &&
+          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
+          validateTimestampNTZType();
+          // TIMESTAMP_NTZ is a new data type and has no legacy files that need to do rebase.
+          return new LongUpdater();
+        } else if (sparkType == DataTypes.TimestampNTZType &&
+          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
+          validateTimestampNTZType();
+          // TIMESTAMP_NTZ is a new data type and has no legacy files that need to do rebase.
+          return new LongAsMicrosUpdater();
         } else if (sparkType instanceof DayTimeIntervalType) {
           return new LongUpdater();
         }
@@ -196,12 +204,11 @@ boolean isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit unit) {
       ((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).getUnit() == unit;
   }
 
-  void validateTimestampType(DataType sparkType) {
+  private void validateTimestampNTZType() {
     assert(logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation);
-    // Throw an exception if the Parquet type is TimestampLTZ and the Catalyst type is TimestampNTZ.
+    // Throw an exception if the Parquet type is TimestampLTZ as the Catalyst type is TimestampNTZ.
     // This is to avoid mistakes in reading the timestamp values.
-    if (((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).isAdjustedToUTC() &&
-      sparkType == DataTypes.TimestampNTZType) {
+    if (((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).isAdjustedToUTC()) {
       convertErrorForTimestampNTZ("int64 time(" + logicalTypeAnnotation + ")");
     }
   }
@@ -1152,10 +1159,6 @@ private static boolean isLongDecimal(DataType dt) {
     return false;
   }
 
-  private static boolean isTimestamp(DataType dt) {
-    return dt == DataTypes.TimestampType || dt == DataTypes.TimestampNTZType;
-  }
-
   private static boolean isDecimalTypeMatched(ColumnDescriptor descriptor, DataType dt) {
     DecimalType d = (DecimalType) dt;
     LogicalTypeAnnotation typeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation();
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index ea5444a1791fd..828ec39c7d727 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -255,6 +255,18 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
     }
   }
 
+  test("SPARK-46466: write and read TimestampNTZ with legacy rebase mode") {
+    withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> "LEGACY") {
+      withTable("ts") {
+        sql("create table ts (c1 timestamp_ntz) using parquet")
+        sql("insert into ts values (timestamp_ntz'0900-01-01 01:10:10')")
+        withAllParquetReaders {
+          checkAnswer(spark.table("ts"), sql("select timestamp_ntz'0900-01-01 01:10:10'"))
+        }
+      }
+    }
+  }
+
   test("Enabling/disabling merging partfiles when merging parquet schema") {
     def testSchemaMerging(expectedColumnNumber: Int): Unit = {
       withTempDir { dir =>
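
For readers who want to reproduce the scenario outside the test suite, below is a minimal standalone sketch of what the new test covers, runnable against a build that includes this patch. It assumes Spark 3.4+ (where TIMESTAMP_NTZ exists); the local-mode session, the object name `NtzRebaseRepro`, and the table name `ts` are illustrative, not part of the patch.

```scala
import org.apache.spark.sql.SparkSession

object NtzRebaseRepro {
  def main(args: Array[String]): Unit = {
    // Illustrative local session; any Spark 3.4+ deployment behaves the same way.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SPARK-46466-repro")
      .getOrCreate()

    // Force the legacy (hybrid Julian/Gregorian) rebase mode on write.
    // Before this fix, the vectorized reader also applied the rebase to
    // TIMESTAMP_NTZ on read, so a pre-1582 NTZ value could come back shifted.
    spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")

    spark.sql("CREATE TABLE ts (c1 TIMESTAMP_NTZ) USING parquet")
    spark.sql("INSERT INTO ts VALUES (TIMESTAMP_NTZ'0900-01-01 01:10:10')")

    // With the fix, the value reads back exactly as written: 0900-01-01 01:10:10.
    spark.table("ts").show(truncate = false)

    spark.stop()
  }
}
```

On an unpatched build the shifted result only appears with the vectorized reader (`spark.sql.parquet.enableVectorizedReader=true`, the default), which is why the new test wraps the read in `withAllParquetReaders` to cover both code paths.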