
Commit d712a7a

MaxGekk authored and cloud-fan committed
[SPARK-31159][SQL] Rebase date/timestamp from/to Julian calendar in parquet
The PR addresses the issue of compatibility with Spark 2.4 and earlier versions in reading/writing dates and timestamps via the Parquet datasource. Previous releases are based on a hybrid calendar (Julian + Gregorian). Since Spark 3.0, the Proleptic Gregorian calendar is used by default, see SPARK-26651. In particular, the issue pops up for dates/timestamps before 1582-10-15, when the hybrid calendar switches from the Julian to the Gregorian calendar. The same local date in different calendars is converted to different numbers of days since the epoch 1970-01-01. For example, the date 1001-01-01 is converted to:
- -719164 in the Julian calendar. Spark 2.4 saves the number as a value of DATE type into Parquet.
- -719162 in the Proleptic Gregorian calendar. Spark 3.0 saves the number as a date value.

According to the Parquet spec, Parquet timestamps of the `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` output types and Parquet dates should be based on the Proleptic Gregorian calendar, but `INT96` timestamps should be stored as Julian days. Since version 3.0, Spark conforms to the spec, but for backward compatibility with previous versions, the PR proposes rebasing between the Proleptic Gregorian calendar and the hybrid one under the SQL config:
```
spark.sql.legacy.parquet.rebaseDateTime.enabled
```
which is set to `false` by default, meaning the rebasing is not performed by default.

The details of the implementation:
1. Added 2 methods to `DateTimeUtils` for rebasing microseconds. `rebaseGregorianToJulianMicros()` builds a local timestamp in the Proleptic Gregorian calendar, extracts the date-time fields `year`, `month`, ..., `second fraction` from the local timestamp, and uses them to build another local timestamp based on the hybrid calendar (using the `java.util.Calendar` API). After that, it calculates the number of microseconds since the epoch using the resulting local timestamp. The function performs the conversion via the system JVM time zone for compatibility with Spark 2.4 and earlier versions. The `rebaseJulianToGregorianMicros()` function does the reverse conversion.
2. Added 2 methods to `DateTimeUtils` for rebasing days. `rebaseGregorianToJulianDays()` builds a local date from the passed number of days since the epoch in the Proleptic Gregorian calendar, interprets the resulting date as a local date in the hybrid calendar, and gets the number of days since the epoch from the resulting local date. The conversion is performed via the `UTC` time zone because the conversion is independent of time zones, and `UTC` is selected to avoid rounding issues when casting days to milliseconds and back. The `rebaseJulianToGregorianDays()` function does the reverse conversion.
3. Use `rebaseGregorianToJulianMicros()` and `rebaseGregorianToJulianDays()` while saving timestamps/dates to Parquet files if the SQL config is on.
4. Use `rebaseJulianToGregorianMicros()` and `rebaseJulianToGregorianDays()` while loading timestamps/dates from Parquet files if the SQL config is on.
5. The SQL config `spark.sql.legacy.parquet.rebaseDateTime.enabled` controls conversions from/to dates and timestamps of `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS`; see the SQL config `spark.sql.parquet.outputTimestampType`.
6. The rebasing is always performed for `INT96` timestamps, independently of `spark.sql.legacy.parquet.rebaseDateTime.enabled`.
7. Supported the vectorized Parquet reader; see the SQL config `spark.sql.parquet.enableVectorizedReader`. A self-contained sketch of the calendar gap is shown below.
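For illustration (a minimal, self-contained sketch, not part of the patch; the object name `CalendarGapDemo` is invented), the 1001-01-01 example above can be reproduced with plain `java.time` and a `java.util.Calendar` built the same way the new helpers build theirs:
```scala
import java.time.LocalDate
import java.util.{Calendar, TimeZone}

object CalendarGapDemo {
  def main(args: Array[String]): Unit = {
    // Proleptic Gregorian calendar: java.time applies Gregorian rules
    // to all dates, even before the 1582-10-15 cutover.
    val gregorianDays = LocalDate.of(1001, 1, 1).toEpochDay

    // Hybrid calendar: the JDK's "gregory" calendar type switches to
    // Julian rules before the cutover, like Spark 2.4 did.
    val cal = new Calendar.Builder()
      .setCalendarType("gregory")
      .setTimeZone(TimeZone.getTimeZone("UTC"))
      .setDate(1001, 0, 1) // months are 0-based in java.util.Calendar
      .build()
    val millisPerDay = 24L * 60 * 60 * 1000
    val julianDays = Math.floorDiv(cal.getTimeInMillis, millisPerDay)

    // Expected per the description above: -719162 vs -719164.
    println(s"Proleptic Gregorian: $gregorianDays, hybrid: $julianDays")
  }
}
```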
Why are the changes needed?
- For backward compatibility with Spark 2.4 and earlier versions. The changes allow users to read dates/timestamps saved by previous versions and get the same results. Also, after the changes, users can enable the rebasing in write and save dates/timestamps that can be loaded correctly by Spark 2.4 and earlier versions.
- It fixes the bug of incorrect saving/loading of timestamps of the `INT96` type.

Does this PR introduce any user-facing change?
Yes. The timestamp `1001-01-01 01:02:03.123456` saved by Spark 2.4.5 as `TIMESTAMP_MICROS` is interpreted by Spark 3.0.0-preview2 differently:
```scala
scala> spark.read.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_micros").show(false)
+--------------------------+
|ts                        |
+--------------------------+
|1001-01-07 11:32:20.123456|
+--------------------------+
```
After the changes:
```scala
scala> spark.conf.set("spark.sql.legacy.parquet.rebaseDateTime.enabled", true)
scala> spark.read.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_micros").show(false)
+--------------------------+
|ts                        |
+--------------------------+
|1001-01-01 01:02:03.123456|
+--------------------------+
```

How was this patch tested?
1. Added tests to `ParquetIOSuite` to check rebasing in read for the regular reader and the vectorized Parquet reader. The tests read back Parquet files saved by Spark 2.4.5 via:
```shell
$ export TZ="America/Los_Angeles"
```
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
scala> val df = Seq("1001-01-01").toDF("dateS").select($"dateS".cast("date").as("date"))
df: org.apache.spark.sql.DataFrame = [date: date]
scala> df.write.parquet("/Users/maxim/tmp/before_1582/2_4_5_date")

scala> val df = Seq("1001-01-01 01:02:03.123456").toDF("tsS").select($"tsS".cast("timestamp").as("ts"))
df: org.apache.spark.sql.DataFrame = [ts: timestamp]
scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
scala> df.write.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_micros")
scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")
scala> df.write.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_millis")
scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "INT96")
scala> df.write.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_int96")
```
2. Manually checked the write code path:
Save dates/timestamps (`TIMESTAMP_MICROS`, `TIMESTAMP_MILLIS`, `INT96`) with Spark 3.1.0-SNAPSHOT (after the changes):
```bash
$ export TZ="America/Los_Angeles"
```
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
scala> spark.conf.set("spark.sql.legacy.parquet.rebaseDateTime.enabled", true)
scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")

scala> val df = Seq(("1001-01-01", "1001-01-01 01:02:03.123456")).toDF("dateS", "tsS").select($"dateS".cast("date").as("d"), $"tsS".cast("timestamp").as("ts"))
df: org.apache.spark.sql.DataFrame = [d: date, ts: timestamp]

scala> df.write.parquet("/Users/maxim/tmp/before_1582/3_0_0_micros")
scala> spark.read.parquet("/Users/maxim/tmp/before_1582/3_0_0_micros").show(false)
+----------+--------------------------+
|d         |ts                        |
+----------+--------------------------+
|1001-01-01|1001-01-01 01:02:03.123456|
+----------+--------------------------+
```
Read the saved dates/timestamps back with Spark 2.4.5:
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
scala> spark.read.parquet("/Users/maxim/tmp/before_1582/3_0_0_micros").show(false)
+----------+--------------------------+
|d         |ts                        |
+----------+--------------------------+
|1001-01-01|1001-01-01 01:02:03.123456|
+----------+--------------------------+
```

Closes #27915 from MaxGekk/rebase-parquet-datetime.

Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit bb295d8)
Signed-off-by: Wenchen Fan <[email protected]>

11 files changed (+342, -17 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala

Lines changed: 103 additions & 3 deletions
```diff
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 import java.time._
 import java.time.temporal.{ChronoField, ChronoUnit, IsoFields}
-import java.util.{Locale, TimeZone}
+import java.util.{Calendar, Locale, TimeZone}
 import java.util.concurrent.TimeUnit._
 
 import scala.util.control.NonFatal
@@ -150,7 +150,9 @@ object DateTimeUtils {
   def fromJulianDay(day: Int, nanoseconds: Long): SQLTimestamp = {
     // use Long to avoid rounding errors
     val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY
-    SECONDS.toMicros(seconds) + NANOSECONDS.toMicros(nanoseconds)
+    val micros = SECONDS.toMicros(seconds) + NANOSECONDS.toMicros(nanoseconds)
+    val rebased = rebaseJulianToGregorianMicros(micros)
+    rebased
   }
 
   /**
@@ -159,7 +161,7 @@
    * Note: support timestamp since 4717 BC (without negative nanoseconds, compatible with Hive).
    */
   def toJulianDay(us: SQLTimestamp): (Int, Long) = {
-    val julian_us = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
+    val julian_us = rebaseGregorianToJulianMicros(us) + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
     val day = julian_us / MICROS_PER_DAY
     val micros = julian_us % MICROS_PER_DAY
     (day.toInt, MICROSECONDS.toNanos(micros))
@@ -942,4 +944,102 @@
     val days = period.getDays
     new CalendarInterval(months, days, 0)
   }
+
+  /**
+   * Converts the given microseconds to a local date-time in UTC time zone in Proleptic Gregorian
+   * calendar, interprets the result as a local date-time in Julian calendar in UTC time zone.
+   * And takes microseconds since the epoch from the Julian timestamp.
+   *
+   * @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'.
+   * @return The rebased microseconds since the epoch in Julian calendar.
+   */
+  def rebaseGregorianToJulianMicros(micros: Long): Long = {
+    val ldt = microsToInstant(micros).atZone(ZoneId.systemDefault).toLocalDateTime
+    val cal = new Calendar.Builder()
+      // `gregory` is a hybrid calendar that supports both
+      // the Julian and Gregorian calendar systems
+      .setCalendarType("gregory")
+      .setDate(ldt.getYear, ldt.getMonthValue - 1, ldt.getDayOfMonth)
+      .setTimeOfDay(ldt.getHour, ldt.getMinute, ldt.getSecond)
+      .build()
+    fromMillis(cal.getTimeInMillis) + ldt.get(ChronoField.MICRO_OF_SECOND)
+  }
+
+  /**
+   * Converts the given microseconds to a local date-time in UTC time zone in Julian calendar,
+   * interprets the result as a local date-time in Proleptic Gregorian calendar in UTC time zone.
+   * And takes microseconds since the epoch from the Gregorian timestamp.
+   *
+   * @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'.
+   * @return The rebased microseconds since the epoch in Proleptic Gregorian calendar.
+   */
+  def rebaseJulianToGregorianMicros(micros: Long): Long = {
+    val cal = new Calendar.Builder()
+      // `gregory` is a hybrid calendar that supports both
+      // the Julian and Gregorian calendar systems
+      .setCalendarType("gregory")
+      .setInstant(toMillis(micros))
+      .build()
+    val localDateTime = LocalDateTime.of(
+      cal.get(Calendar.YEAR),
+      cal.get(Calendar.MONTH) + 1,
+      cal.get(Calendar.DAY_OF_MONTH),
+      cal.get(Calendar.HOUR_OF_DAY),
+      cal.get(Calendar.MINUTE),
+      cal.get(Calendar.SECOND),
+      (Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICROS).toInt)
+    instantToMicros(localDateTime.atZone(ZoneId.systemDefault).toInstant)
+  }
+
+  /**
+   * Converts the given number of days since the epoch day 1970-01-01 to
+   * a local date in Julian calendar, interprets the result as a local
+   * date in Proleptic Gregorian calendar, and take the number of days
+   * since the epoch from the Gregorian date.
+   *
+   * @param days The number of days since the epoch in Julian calendar.
+   * @return The rebased number of days in Gregorian calendar.
+   */
+  def rebaseJulianToGregorianDays(days: Int): Int = {
+    val utcCal = new Calendar.Builder()
+      // `gregory` is a hybrid calendar that supports both
+      // the Julian and Gregorian calendar systems
+      .setCalendarType("gregory")
+      .setTimeZone(TimeZoneUTC)
+      .setInstant(Math.multiplyExact(days, MILLIS_PER_DAY))
+      .build()
+    val localDate = LocalDate.of(
+      utcCal.get(Calendar.YEAR),
+      utcCal.get(Calendar.MONTH) + 1,
+      utcCal.get(Calendar.DAY_OF_MONTH))
+    Math.toIntExact(localDate.toEpochDay)
+  }
+
+  /**
+   * Rebasing days since the epoch to store the same number of days
+   * as by Spark 2.4 and earlier versions. Spark 3.0 switched to
+   * Proleptic Gregorian calendar (see SPARK-26651), and as a consequence of that,
+   * this affects dates before 1582-10-15. Spark 2.4 and earlier versions use
+   * Julian calendar for dates before 1582-10-15. So, the same local date may
+   * be mapped to different number of days since the epoch in different calendars.
+   *
+   * For example:
+   *   Proleptic Gregorian calendar: 1582-01-01 -> -141714
+   *   Julian calendar: 1582-01-01 -> -141704
+   * The code below converts -141714 to -141704.
+   *
+   * @param days The number of days since the epoch 1970-01-01. It can be negative.
+   * @return The rebased number of days since the epoch in Julian calendar.
+   */
+  def rebaseGregorianToJulianDays(days: Int): Int = {
+    val localDate = LocalDate.ofEpochDay(days)
+    val utcCal = new Calendar.Builder()
+      // `gregory` is a hybrid calendar that supports both
+      // the Julian and Gregorian calendar systems
+      .setCalendarType("gregory")
+      .setTimeZone(TimeZoneUTC)
+      .setDate(localDate.getYear, localDate.getMonthValue - 1, localDate.getDayOfMonth)
+      .build()
+    Math.toIntExact(Math.floorDiv(utcCal.getTimeInMillis, MILLIS_PER_DAY))
+  }
 }
```
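As a quick sanity check (a sketch, not part of the commit; it assumes a build containing these `DateTimeUtils` changes, e.g. a `spark-shell` of 3.1.0-SNAPSHOT), the day-rebase helpers should round-trip the 1582-01-01 example from the doc comment above:
```scala
import org.apache.spark.sql.catalyst.util.DateTimeUtils._

// Per the doc comment above: 1582-01-01 is -141714 days since the epoch
// in Proleptic Gregorian calendar and -141704 days in the hybrid calendar.
val julianDays = rebaseGregorianToJulianDays(-141714)
assert(julianDays == -141704)
// The reverse rebase restores the original Gregorian day count.
assert(rebaseJulianToGregorianDays(julianDays) == -141714)
```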

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 15 additions & 0 deletions
```diff
@@ -2232,6 +2232,19 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val LEGACY_PARQUET_REBASE_DATETIME =
+    buildConf("spark.sql.legacy.parquet.rebaseDateTime.enabled")
+      .internal()
+      .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
+        "to the hybrid calendar (Julian + Gregorian) in write and " +
+        "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
+        "The rebasing is performed by converting micros/millis/days to " +
+        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
+        "timestamp in the target calendar, and getting the number of micros/millis/days " +
+        "since the epoch 1970-01-01 00:00:00Z.")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
@@ -2807,6 +2820,8 @@ class SQLConf extends Serializable with Logging {
 
   def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED)
 
+  def parquetRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
```
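A sketch of how the new flag and getter are exercised from a session (assuming `spark` is an active `SparkSession` on a build with this patch):
```scala
import org.apache.spark.sql.internal.SQLConf

// The legacy rebase is off by default; enable it for the current session.
spark.conf.set("spark.sql.legacy.parquet.rebaseDateTime.enabled", true)
// The Parquet read/write paths consult the flag through the new getter.
assert(SQLConf.get.parquetRebaseDateTimeEnabled)
```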

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala

Lines changed: 60 additions & 0 deletions
```diff
@@ -669,4 +669,64 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
       assert(toDate("tomorrow CET ", zoneId).get === today + 1)
     }
   }
+
+  test("rebase julian to/from gregorian micros") {
+    outstandingTimezones.foreach { timeZone =>
+      withDefaultTimeZone(timeZone) {
+        Seq(
+          "0001-01-01 01:02:03.654321",
+          "1000-01-01 03:02:01.123456",
+          "1582-10-04 00:00:00.000000",
+          "1582-10-15 00:00:00.999999", // Gregorian cutover day
+          "1883-11-10 00:00:00.000000", // America/Los_Angeles -7:52:58 zone offset
+          "1883-11-20 00:00:00.000000", // America/Los_Angeles -08:00 zone offset
+          "1969-12-31 11:22:33.000100",
+          "1970-01-01 00:00:00.000001", // The epoch day
+          "2020-03-14 09:33:01.500000").foreach { ts =>
+          withClue(s"time zone = ${timeZone.getID} ts = $ts") {
+            val julianTs = Timestamp.valueOf(ts)
+            val julianMicros = fromMillis(julianTs.getTime) +
+              ((julianTs.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS)
+            val gregorianMicros = instantToMicros(LocalDateTime.parse(ts.replace(' ', 'T'))
+              .atZone(timeZone.toZoneId)
+              .toInstant)
+
+            assert(rebaseJulianToGregorianMicros(julianMicros) === gregorianMicros)
+            assert(rebaseGregorianToJulianMicros(gregorianMicros) === julianMicros)
+          }
+        }
+      }
+    }
+  }
+
+  test("rebase gregorian to/from julian days") {
+    // millisToDays() and fromJavaDate() are taken from Spark 2.4
+    def millisToDays(millisUtc: Long, timeZone: TimeZone): Int = {
+      val millisLocal = millisUtc + timeZone.getOffset(millisUtc)
+      Math.floor(millisLocal.toDouble / MILLIS_PER_DAY).toInt
+    }
+    def fromJavaDate(date: Date): Int = {
+      millisToDays(date.getTime, defaultTimeZone())
+    }
+    outstandingTimezones.foreach { timeZone =>
+      withDefaultTimeZone(timeZone) {
+        Seq(
+          "0001-01-01",
+          "1000-01-01",
+          "1582-10-04",
+          "1582-10-15", // Gregorian cutover day
+          "1883-11-10", // America/Los_Angeles -7:52:58 zone offset
+          "1883-11-20", // America/Los_Angeles -08:00 zone offset
+          "1969-12-31",
+          "1970-01-01", // The epoch day
+          "2020-03-14").foreach { date =>
+          val julianDays = fromJavaDate(Date.valueOf(date))
+          val gregorianDays = localDateToDays(LocalDate.parse(date))
+
+          assert(rebaseGregorianToJulianDays(gregorianDays) === julianDays)
+          assert(rebaseJulianToGregorianDays(julianDays) === gregorianDays)
+        }
+      }
+    }
+  }
 }
```

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java

Lines changed: 40 additions & 4 deletions
```diff
@@ -37,6 +37,7 @@
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DecimalType;
 
@@ -101,6 +102,7 @@ public class VectorizedColumnReader {
   // The timezone conversion to apply to int96 timestamps. Null if no conversion.
   private final ZoneId convertTz;
   private static final ZoneId UTC = ZoneOffset.UTC;
+  private final boolean rebaseDateTime;
 
   public VectorizedColumnReader(
       ColumnDescriptor descriptor,
@@ -129,6 +131,7 @@ public VectorizedColumnReader(
     if (totalValueCount == 0) {
       throw new IOException("totalValueCount == 0");
     }
+    this.rebaseDateTime = SQLConf.get().parquetRebaseDateTimeEnabled();
   }
 
   /**
@@ -407,7 +410,7 @@ private void readBooleanBatch(int rowId, int num, WritableColumnVector column)
   private void readIntBatch(int rowId, int num, WritableColumnVector column) throws IOException {
     // This is where we implement support for the valid type conversions.
     // TODO: implement remaining type conversions
-    if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType ||
+    if (column.dataType() == DataTypes.IntegerType ||
         DecimalType.is32BitDecimalType(column.dataType())) {
       defColumn.readIntegers(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
@@ -417,6 +420,21 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) throw
     } else if (column.dataType() == DataTypes.ShortType) {
       defColumn.readShorts(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+    } else if (column.dataType() == DataTypes.DateType) {
+      if (rebaseDateTime) {
+        for (int i = 0; i < num; i++) {
+          if (defColumn.readInteger() == maxDefLevel) {
+            column.putInt(
+              rowId + i,
+              DateTimeUtils.rebaseJulianToGregorianDays(dataColumn.readInteger()));
+          } else {
+            column.putNull(rowId + i);
+          }
+        }
+      } else {
+        defColumn.readIntegers(
+          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+      }
     } else {
       throw constructConvertNotSupportedException(descriptor, column);
     }
@@ -425,14 +443,32 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) throw
   private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
     // This is where we implement support for the valid type conversions.
     if (column.dataType() == DataTypes.LongType ||
-        DecimalType.is64BitDecimalType(column.dataType()) ||
-        originalType == OriginalType.TIMESTAMP_MICROS) {
+        DecimalType.is64BitDecimalType(column.dataType())) {
       defColumn.readLongs(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+    } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
+      if (rebaseDateTime) {
+        for (int i = 0; i < num; i++) {
+          if (defColumn.readInteger() == maxDefLevel) {
+            column.putLong(
+              rowId + i,
+              DateTimeUtils.rebaseJulianToGregorianMicros(dataColumn.readLong()));
+          } else {
+            column.putNull(rowId + i);
+          }
+        }
+      } else {
+        defColumn.readLongs(
+          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+      }
     } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
       for (int i = 0; i < num; i++) {
         if (defColumn.readInteger() == maxDefLevel) {
-          column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong()));
+          long micros = DateTimeUtils.fromMillis(dataColumn.readLong());
+          if (rebaseDateTime) {
+            micros = DateTimeUtils.rebaseJulianToGregorianMicros(micros);
+          }
+          column.putLong(rowId + i, micros);
         } else {
           column.putNull(rowId + i);
         }
```
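For example, reading a pre-1582 date written by Spark 2.4 through this vectorized path (a sketch; `/tmp/2_4_x_dates` is a placeholder for a file produced by an older Spark):
```scala
// Both flags must be on for the per-batch rebasing above to kick in;
// the vectorized reader is enabled by default for flat schemas.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", true)
spark.conf.set("spark.sql.legacy.parquet.rebaseDateTime.enabled", true)
spark.read.parquet("/tmp/2_4_x_dates").show(false)
```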

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala

Lines changed: 39 additions & 10 deletions
```diff
@@ -130,6 +130,9 @@ private[parquet] class ParquetRowConverter(
     updater: ParentContainerUpdater)
   extends ParquetGroupConverter(updater) with Logging {
 
+  // Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar
+  private val rebaseDateTime = SQLConf.get.parquetRebaseDateTimeEnabled
+
   assert(
     parquetType.getFieldCount <= catalystType.length,
     s"""Field count of the Parquet schema is greater than the field count of the Catalyst schema:
@@ -271,16 +274,35 @@ private[parquet] class ParquetRowConverter(
       new ParquetStringConverter(updater)
 
     case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS =>
-      new ParquetPrimitiveConverter(updater) {
-        override def addLong(value: Long): Unit = {
-          updater.setLong(value)
+      if (rebaseDateTime) {
+        new ParquetPrimitiveConverter(updater) {
+          override def addLong(value: Long): Unit = {
+            val rebased = DateTimeUtils.rebaseJulianToGregorianMicros(value)
+            updater.setLong(rebased)
+          }
+        }
+      } else {
+        new ParquetPrimitiveConverter(updater) {
+          override def addLong(value: Long): Unit = {
+            updater.setLong(value)
+          }
         }
       }
 
     case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
-      new ParquetPrimitiveConverter(updater) {
-        override def addLong(value: Long): Unit = {
-          updater.setLong(DateTimeUtils.fromMillis(value))
+      if (rebaseDateTime) {
+        new ParquetPrimitiveConverter(updater) {
+          override def addLong(value: Long): Unit = {
+            val micros = DateTimeUtils.fromMillis(value)
+            val rebased = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
+            updater.setLong(rebased)
+          }
+        }
+      } else {
+        new ParquetPrimitiveConverter(updater) {
+          override def addLong(value: Long): Unit = {
+            updater.setLong(DateTimeUtils.fromMillis(value))
+          }
         }
       }
 
@@ -305,10 +327,17 @@
       }
 
     case DateType =>
-      new ParquetPrimitiveConverter(updater) {
-        override def addInt(value: Int): Unit = {
-          // DateType is not specialized in `SpecificMutableRow`, have to box it here.
-          updater.set(value.asInstanceOf[DateType#InternalType])
+      if (rebaseDateTime) {
+        new ParquetPrimitiveConverter(updater) {
+          override def addInt(value: Int): Unit = {
+            updater.set(DateTimeUtils.rebaseJulianToGregorianDays(value))
+          }
+        }
+      } else {
+        new ParquetPrimitiveConverter(updater) {
+          override def addInt(value: Int): Unit = {
+            updater.set(value)
+          }
         }
       }
```