From c2547d66296a28ba0768a0489ae5a5fb614193e8 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 20 Dec 2023 22:46:23 +0800 Subject: [PATCH 1/4] vectorized parquet reader should never do rebase for timestamp ntz --- .../parquet/ParquetVectorUpdaterFactory.java | 23 ++++++++++++------- .../parquet/ParquetQuerySuite.scala | 12 ++++++++++ 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java index 918f21716f455..c89d41d976185 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java @@ -109,24 +109,32 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa // For unsigned int64, it stores as plain signed int64 in Parquet when dictionary // fallbacks. We read them as decimal values. return new UnsignedLongUpdater(); - } else if (isTimestamp(sparkType) && + } else if (sparkType == DataTypes.TimestampType && isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) { - validateTimestampType(sparkType); if ("CORRECTED".equals(datetimeRebaseMode)) { return new LongUpdater(); } else { boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); return new LongWithRebaseUpdater(failIfRebase, datetimeRebaseTz); } - } else if (isTimestamp(sparkType) && + } else if (sparkType == DataTypes.TimestampType && isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) { - validateTimestampType(sparkType); if ("CORRECTED".equals(datetimeRebaseMode)) { return new LongAsMicrosUpdater(); } else { final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); return new LongAsMicrosRebaseUpdater(failIfRebase, datetimeRebaseTz); } + } else if (sparkType == DataTypes.TimestampNTZType && + isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) { + validateTimestampNTZType(sparkType); + // TIMESTAMP_NTZ is a new data type and has no legacy files that need to do rebase. + return new LongUpdater(); + } else if (sparkType == DataTypes.TimestampNTZType && + isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) { + validateTimestampNTZType(sparkType); + // TIMESTAMP_NTZ is a new data type and has no legacy files that need to do rebase. + return new LongAsMicrosUpdater(); } else if (sparkType instanceof DayTimeIntervalType) { return new LongUpdater(); } @@ -195,12 +203,11 @@ boolean isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit unit) { annotation.getUnit() == unit; } - void validateTimestampType(DataType sparkType) { + void validateTimestampNTZType(DataType sparkType) { assert(logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation); - // Throw an exception if the Parquet type is TimestampLTZ and the Catalyst type is TimestampNTZ. + // Throw an exception if the Parquet type is TimestampLTZ as the Catalyst type is TimestampNTZ. // This is to avoid mistakes in reading the timestamp values. - if (((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).isAdjustedToUTC() && - sparkType == DataTypes.TimestampNTZType) { + if (((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).isAdjustedToUTC()) { convertErrorForTimestampNTZ("int64 time(" + logicalTypeAnnotation + ")"); } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 43103db522bac..354d356403ce9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -255,6 +255,18 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } + test("write and read TimestampNTZ with legacy rebase mode") { + withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> "LEGACY") { + withTable("ts") { + sql("create table ts (c1 timestamp_ntz) using parquet") + sql("insert into ts values (timestamp_ntz'0900-01-01 01:10:10')") + withAllParquetReaders { + checkAnswer(spark.table("ts"), sql("select timestamp_ntz'0900-01-01 01:10:10'")) + } + } + } + } + test("Enabling/disabling merging partfiles when merging parquet schema") { def testSchemaMerging(expectedColumnNumber: Int): Unit = { withTempDir { dir => From e3743d767b9e4a8f81f14696bf53970a111783b7 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 20 Dec 2023 19:04:00 -0800 Subject: [PATCH 2/4] Update sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala --- .../sql/execution/datasources/parquet/ParquetQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 354d356403ce9..2ba01eea51e20 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -255,7 +255,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } } - test("write and read TimestampNTZ with legacy rebase mode") { + test("SPARK-46466: write and read TimestampNTZ with legacy rebase mode") { withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> "LEGACY") { withTable("ts") { sql("create table ts (c1 timestamp_ntz) using parquet") From 57c0c0b5a01c0f69fdabc1ab1db0b2ea64731a75 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 20 Dec 2023 22:03:27 -0800 Subject: [PATCH 3/4] Update ParquetVectorUpdaterFactory.java --- .../datasources/parquet/ParquetVectorUpdaterFactory.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java index c89d41d976185..e2c40b32a8639 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java @@ -1156,10 +1156,6 @@ private static boolean isLongDecimal(DataType dt) { return false; } - private static boolean isTimestamp(DataType dt) { - return dt == DataTypes.TimestampType || dt == DataTypes.TimestampNTZType; - } - private static boolean isDecimalTypeMatched(ColumnDescriptor descriptor, DataType dt) { DecimalType d = (DecimalType) dt; LogicalTypeAnnotation typeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation(); From 286d72ae152395421fc9e4f908fdcab252b03a0a Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 20 Dec 2023 23:35:32 -0800 Subject: [PATCH 4/4] Update ParquetVectorUpdaterFactory.java --- .../datasources/parquet/ParquetVectorUpdaterFactory.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java index e2c40b32a8639..31a1957b4fb91 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java @@ -127,12 +127,12 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa } } else if (sparkType == DataTypes.TimestampNTZType && isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) { - validateTimestampNTZType(sparkType); + validateTimestampNTZType(); // TIMESTAMP_NTZ is a new data type and has no legacy files that need to do rebase. return new LongUpdater(); } else if (sparkType == DataTypes.TimestampNTZType && isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) { - validateTimestampNTZType(sparkType); + validateTimestampNTZType(); // TIMESTAMP_NTZ is a new data type and has no legacy files that need to do rebase. return new LongAsMicrosUpdater(); } else if (sparkType instanceof DayTimeIntervalType) { @@ -203,7 +203,7 @@ boolean isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit unit) { annotation.getUnit() == unit; } - void validateTimestampNTZType(DataType sparkType) { + private void validateTimestampNTZType() { assert(logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation); // Throw an exception if the Parquet type is TimestampLTZ as the Catalyst type is TimestampNTZ. // This is to avoid mistakes in reading the timestamp values.