
Commit 0948e24
[SPARK-46466][SQL][3.5] Vectorized parquet reader should never do rebase for timestamp ntz
backport #44428

### What changes were proposed in this pull request?

This fixes a correctness bug. TIMESTAMP_NTZ is a new data type in Spark and has no legacy files that need calendar rebasing. However, the vectorized Parquet reader treated it the same as LTZ and could rebase values if the Parquet file was written with the legacy rebase mode. This PR fixes the reader to never rebase NTZ values.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

Yes, NTZ values are now written and read back correctly even when the date is before 1582.

### How was this patch tested?

New test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44446 from cloud-fan/ntz2.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
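For illustration, a minimal spark-shell sketch of the user-facing scenario this fixes. It is not part of the commit; it assumes a SparkSession `spark` whose build maps `java.time.LocalDateTime` to TIMESTAMP_NTZ, and the output path is arbitrary:

```scala
import java.time.LocalDateTime
import spark.implicits._

// Force the legacy rebase mode that used to trigger the bad rebase on read.
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")

// LocalDateTime maps to TIMESTAMP_NTZ; the date predates the 1582 Gregorian cutover.
val df = Seq(LocalDateTime.parse("0900-01-01T01:10:10")).toDF("c1")
df.write.mode("overwrite").parquet("/tmp/ntz_demo")

// Before this patch, the vectorized reader could return a value shifted by the
// Julian/Gregorian calendar difference; with the fix it reads back unchanged.
spark.read.parquet("/tmp/ntz_demo").show(false)
```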
1 parent: a001482 · commit: 0948e24

File tree: 2 files changed (+29 −14 lines)

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java

Lines changed: 17 additions & 14 deletions
@@ -109,24 +109,32 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa
         // For unsigned int64, it stores as plain signed int64 in Parquet when dictionary
         // fallbacks. We read them as decimal values.
         return new UnsignedLongUpdater();
-      } else if (isTimestamp(sparkType) &&
-        isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
-        validateTimestampType(sparkType);
+      } else if (sparkType == DataTypes.TimestampType &&
+        isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
         if ("CORRECTED".equals(datetimeRebaseMode)) {
           return new LongUpdater();
         } else {
           boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
           return new LongWithRebaseUpdater(failIfRebase, datetimeRebaseTz);
         }
-      } else if (isTimestamp(sparkType) &&
-        isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
-        validateTimestampType(sparkType);
+      } else if (sparkType == DataTypes.TimestampType &&
+        isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
         if ("CORRECTED".equals(datetimeRebaseMode)) {
           return new LongAsMicrosUpdater();
         } else {
           final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
           return new LongAsMicrosRebaseUpdater(failIfRebase, datetimeRebaseTz);
         }
+      } else if (sparkType == DataTypes.TimestampNTZType &&
+        isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
+        validateTimestampNTZType();
+        // TIMESTAMP_NTZ is a new data type and has no legacy files that need to do rebase.
+        return new LongUpdater();
+      } else if (sparkType == DataTypes.TimestampNTZType &&
+        isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
+        validateTimestampNTZType();
+        // TIMESTAMP_NTZ is a new data type and has no legacy files that need to do rebase.
+        return new LongAsMicrosUpdater();
       } else if (sparkType instanceof DayTimeIntervalType) {
         return new LongUpdater();
       }
@@ -196,12 +204,11 @@ boolean isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit unit) {
       ((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).getUnit() == unit;
   }
 
-  void validateTimestampType(DataType sparkType) {
+  private void validateTimestampNTZType() {
     assert(logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation);
-    // Throw an exception if the Parquet type is TimestampLTZ and the Catalyst type is TimestampNTZ.
+    // Throw an exception if the Parquet type is TimestampLTZ as the Catalyst type is TimestampNTZ.
     // This is to avoid mistakes in reading the timestamp values.
-    if (((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).isAdjustedToUTC() &&
-      sparkType == DataTypes.TimestampNTZType) {
+    if (((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).isAdjustedToUTC()) {
       convertErrorForTimestampNTZ("int64 time(" + logicalTypeAnnotation + ")");
     }
   }
@@ -1152,10 +1159,6 @@ private static boolean isLongDecimal(DataType dt) {
     return false;
   }
 
-  private static boolean isTimestamp(DataType dt) {
-    return dt == DataTypes.TimestampType || dt == DataTypes.TimestampNTZType;
-  }
-
   private static boolean isDecimalTypeMatched(ColumnDescriptor descriptor, DataType dt) {
     DecimalType d = (DecimalType) dt;
     LogicalTypeAnnotation typeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation();
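For context on why the TimestampType branches above still use the rebase updaters while NTZ never should: legacy Spark wrote timestamps in the hybrid Julian/Gregorian calendar, whereas Spark 3.x uses the proleptic Gregorian calendar, and the two disagree for dates before 1582. The standalone sketch below (plain JDK APIs, no Spark; not part of the patch) shows the skew for the 0900-01-01 value used in the new test:

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.util.{Calendar, GregorianCalendar, TimeZone}

object CalendarSkewDemo extends App {
  // 0900-01-01 01:10:10 under the legacy hybrid Julian/Gregorian calendar
  // (java.util.GregorianCalendar applies Julian rules before Oct 1582).
  val hybrid = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
  hybrid.clear()
  hybrid.set(900, Calendar.JANUARY, 1, 1, 10, 10)
  val hybridMillis = hybrid.getTimeInMillis

  // The same wall-clock fields under the proleptic Gregorian calendar,
  // which is what java.time and Spark 3.x use.
  val prolepticMillis =
    LocalDateTime.of(900, 1, 1, 1, 10, 10).toInstant(ZoneOffset.UTC).toEpochMilli

  // Prints the shift a wrongly applied rebase would introduce for year 900 (4 days).
  println(s"skew in days: ${(hybridMillis - prolepticMillis) / 86400000L}")
}
```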

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

Lines changed: 12 additions & 0 deletions
@@ -255,6 +255,18 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
     }
   }
 
+  test("SPARK-46466: write and read TimestampNTZ with legacy rebase mode") {
+    withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> "LEGACY") {
+      withTable("ts") {
+        sql("create table ts (c1 timestamp_ntz) using parquet")
+        sql("insert into ts values (timestamp_ntz'0900-01-01 01:10:10')")
+        withAllParquetReaders {
+          checkAnswer(spark.table("ts"), sql("select timestamp_ntz'0900-01-01 01:10:10'"))
+        }
+      }
+    }
+  }
+
   test("Enabling/disabling merging partfiles when merging parquet schema") {
     def testSchemaMerging(expectedColumnNumber: Int): Unit = {
       withTempDir { dir =>
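`withAllParquetReaders` runs the test body with the vectorized Parquet reader both enabled and disabled, so the test covers the fixed vectorized path as well as the unaffected parquet-mr path. Outside the test harness, an equivalent manual check might look like the following sketch (assuming the `ts` table from the test above and a SparkSession `spark`):

```scala
// Only the vectorized path had the NTZ rebase bug; both should now agree.
for (vectorized <- Seq(true, false)) {
  spark.conf.set("spark.sql.parquet.enableVectorizedReader", vectorized)
  spark.table("ts").show(false) // expect 0900-01-01 01:10:10 either way
}
```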
