
Commit ce63bef

MaxGekk authored and HyukjinKwon committed
[SPARK-31662][SQL] Fix loading of dates before 1582-10-15 from dictionary encoded Parquet columns
### What changes were proposed in this pull request?

Modified the `decodeDictionaryIds()` method of `VectorizedColumnReader` to handle `DateType` specially when the passed parameter `rebaseDateTime` is true. In that case, decoded days are rebased from the hybrid calendar to the Proleptic Gregorian calendar using `RebaseDateTime.rebaseJulianToGregorianDays()`.

### Why are the changes needed?

This fixes a bug in loading dates before the cutover day from dictionary encoded columns in Parquet files. The code below forces dictionary encoding:
```scala
spark.conf.set("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled", true)
Seq.tabulate(8)(_ => "1001-01-01").toDF("dateS")
  .select($"dateS".cast("date").as("date")).repartition(1)
  .write
  .option("parquet.enable.dictionary", true)
  .parquet(path)
```
Load the dates back:
```scala
spark.read.parquet(path).show(false)
+----------+
|date      |
+----------+
|1001-01-07|
...
|1001-01-07|
+----------+
```
Expected values **must be 1001-01-01**, not 1001-01-07.

### Does this PR introduce _any_ user-facing change?

Yes. After the changes:
```scala
spark.read.parquet(path).show(false)
+----------+
|date      |
+----------+
|1001-01-01|
...
|1001-01-01|
+----------+
```

### How was this patch tested?

Modified the test `SPARK-31159: rebasing dates in write` in `ParquetIOSuite` to check reading of dictionary encoded dates.

Closes #28479 from MaxGekk/fix-datetime-rebase-parquet-dict-enc.

Authored-by: Max Gekk <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
1 parent ecda38a · commit ce63bef
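
For context, the rebase the fix relies on can be reproduced outside the reader. This is a minimal sketch, assuming a Spark 3.0+ `spark-catalyst` jar on the classpath; only `rebaseJulianToGregorianDays` is used by the fix itself, and its write-side counterpart appears here just to show the round trip:

```scala
import java.time.LocalDate
import org.apache.spark.sql.catalyst.util.RebaseDateTime

// Days since 1970-01-01 for 1001-01-01 in the Proleptic Gregorian calendar,
// i.e. what Spark 3.x stores internally for a DATE value.
val gregorianDays = LocalDate.of(1001, 1, 1).toEpochDay.toInt

// Day count a legacy (hybrid Julian) writer produces for the same local date.
val julianDays = RebaseDateTime.rebaseGregorianToJulianDays(gregorianDays)

// Reading the legacy day count back requires the inverse rebase; interpreting it
// directly as a Proleptic Gregorian day count is what yielded 1001-01-07.
assert(RebaseDateTime.rebaseJulianToGregorianDays(julianDays) == gregorianDays)
```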

2 files changed: +58 -28 lines changed


sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java

Lines changed: 29 additions & 8 deletions
```diff
@@ -152,6 +152,24 @@ private boolean next() throws IOException {
     return definitionLevelColumn.nextInt() == maxDefLevel;
   }
 
+  private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName) {
+    boolean isSupported = false;
+    switch (typeName) {
+      case INT32:
+        isSupported = originalType != OriginalType.DATE || !rebaseDateTime;
+        break;
+      case INT64:
+        isSupported = originalType != OriginalType.TIMESTAMP_MILLIS;
+        break;
+      case FLOAT:
+      case DOUBLE:
+      case BINARY:
+        isSupported = true;
+        break;
+    }
+    return isSupported;
+  }
+
   /**
    * Reads `total` values from this columnReader into column.
    */
@@ -181,13 +199,7 @@ void readBatch(int total, WritableColumnVector column) throws IOException {
 
     // TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we need to post process
     // the values to add microseconds precision.
-    if (column.hasDictionary() || (rowId == 0 &&
-      (typeName == PrimitiveType.PrimitiveTypeName.INT32 ||
-      (typeName == PrimitiveType.PrimitiveTypeName.INT64 &&
-        originalType != OriginalType.TIMESTAMP_MILLIS) ||
-      typeName == PrimitiveType.PrimitiveTypeName.FLOAT ||
-      typeName == PrimitiveType.PrimitiveTypeName.DOUBLE ||
-      typeName == PrimitiveType.PrimitiveTypeName.BINARY))) {
+    if (column.hasDictionary() || (rowId == 0 && isLazyDecodingSupported(typeName))) {
       // Column vector supports lazy decoding of dictionary values so just set the dictionary.
       // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
       // non-dictionary encoded values have already been added).
@@ -266,7 +278,8 @@ private void decodeDictionaryIds(
     switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
       case INT32:
         if (column.dataType() == DataTypes.IntegerType ||
-            DecimalType.is32BitDecimalType(column.dataType())) {
+            DecimalType.is32BitDecimalType(column.dataType()) ||
+            (column.dataType() == DataTypes.DateType && !rebaseDateTime)) {
           for (int i = rowId; i < rowId + num; ++i) {
             if (!column.isNullAt(i)) {
               column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i)));
@@ -284,6 +297,14 @@ private void decodeDictionaryIds(
               column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
             }
           }
+        } else if (column.dataType() == DataTypes.DateType) {
+          for (int i = rowId; i < rowId + num; ++i) {
+            if (!column.isNullAt(i)) {
+              int julianDays = dictionary.decodeToInt(dictionaryIds.getDictId(i));
+              int gregorianDays = RebaseDateTime.rebaseJulianToGregorianDays(julianDays);
+              column.putInt(i, gregorianDays);
+            }
+          }
         } else {
          throw constructConvertNotSupportedException(descriptor, column);
        }
```
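
The new `DateType` branch in `decodeDictionaryIds()` amounts to rebasing each dictionary-decoded day count before it is stored in the column vector. Below is a minimal sketch of that logic detached from the reader; the `decodeDates` helper and its array arguments are illustrative only, not the actual `VectorizedColumnReader` API:

```scala
import org.apache.spark.sql.catalyst.util.RebaseDateTime

// dictionaryDays: day counts from the Parquet dictionary, written under the hybrid Julian calendar.
// dictionaryIds: per-row indices into that dictionary.
def decodeDates(dictionaryDays: Array[Int], dictionaryIds: Array[Int]): Array[Int] =
  dictionaryIds.map { id =>
    val julianDays = dictionaryDays(id)
    // Rebase to Proleptic Gregorian days, as the fixed branch now does for each row.
    RebaseDateTime.rebaseJulianToGregorianDays(julianDays)
  }
```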

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala

Lines changed: 29 additions & 20 deletions
```diff
@@ -978,29 +978,38 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 
   test("SPARK-31159: rebasing dates in write") {
-    withTempPath { dir =>
-      val path = dir.getAbsolutePath
-      withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
-        Seq("1001-01-01").toDF("dateS")
-          .select($"dateS".cast("date").as("date"))
-          .write
-          .parquet(path)
-      }
+    val N = 8
+    Seq(false, true).foreach { dictionaryEncoding =>
+      withTempPath { dir =>
+        val path = dir.getAbsolutePath
+        withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") {
+          Seq.tabulate(N)(_ => "1001-01-01").toDF("dateS")
+            .select($"dateS".cast("date").as("date"))
+            .repartition(1)
+            .write
+            .option("parquet.enable.dictionary", dictionaryEncoding)
+            .parquet(path)
+        }
 
-      Seq(false, true).foreach { vectorized =>
-        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
-          // The file metadata indicates if it needs rebase or not, so we can always get the correct
-          // result regardless of the "rebaseInRead" config.
-          Seq(true, false).foreach { rebase =>
-            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
-              checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-01")))
+        Seq(false, true).foreach { vectorized =>
+          withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+            // The file metadata indicates if it needs rebase or not, so we can always get
+            // the correct result regardless of the "rebaseInRead" config.
+            Seq(true, false).foreach { rebase =>
+              withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+                checkAnswer(
+                  spark.read.parquet(path),
+                  Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01"))))
+              }
             }
-          }
 
-          // Force to not rebase to prove the written datetime values are rebased and we will get
-          // wrong result if we don't rebase while reading.
-          withSQLConf("spark.test.forceNoRebase" -> "true") {
-            checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-07")))
+            // Force to not rebase to prove the written datetime values are rebased and we will get
+            // wrong result if we don't rebase while reading.
+            withSQLConf("spark.test.forceNoRebase" -> "true") {
+              checkAnswer(
+                spark.read.parquet(path),
+                Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-07"))))
+            }
           }
         }
       }
```
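
The test pins down the user-facing behaviour: the rebase flag recorded in the file metadata, not the read-side config, decides whether dates get rebased. A minimal spark-shell sketch of checking that by hand; `path` is the directory written in the reproduction above, and the read-side config key is assumed here to mirror the write-side key from the commit description:

```scala
// With or without the vectorized reader, and whichever value the read-side rebase
// config has, the dates written above should come back as 1001-01-01 after this fix.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
spark.conf.set("spark.sql.legacy.parquet.rebaseDateTimeInRead.enabled", "false")
spark.read.parquet(path).distinct().show(false)
```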
