Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
ae83fdc
Add rebaseGregorianToJulianMicros()
MaxGekk Mar 13, 2020
4c35ddb
Add rebaseJulianToGregorianMicros()
MaxGekk Mar 14, 2020
74774fc
Add a comment to rebaseJulianToGregorianMicros()
MaxGekk Mar 14, 2020
8d74214
Add comment about calendar
MaxGekk Mar 14, 2020
56ca744
Add round trip test for micros
MaxGekk Mar 14, 2020
0cfeed5
Rename to JULIAN_CUTOVER_MICROS
MaxGekk Mar 14, 2020
78cbd6c
Add rebaseJulianToGregorianDays and rebaseGregorianToJulianDays
MaxGekk Mar 14, 2020
13aad60
Minor fix code style
MaxGekk Mar 14, 2020
96573a9
Add comments for rebaseGregorianToJulianDays()
MaxGekk Mar 14, 2020
36c0400
Add the SQL config spark.sql.legacy.parquet.rebaseDateTime.enabled
MaxGekk Mar 14, 2020
f0a2df6
Perform rebase in write
MaxGekk Mar 14, 2020
9e3c201
Perform rebase dates in write
MaxGekk Mar 14, 2020
053861c
Perform rebase dates/timestamps in read
MaxGekk Mar 14, 2020
1624756
Rewrite days rebasing using Java 7 API
MaxGekk Mar 15, 2020
e3bbcb5
Rewrite micros rebasing using Java 7 API
MaxGekk Mar 15, 2020
d1e6d84
Extract common code
MaxGekk Mar 15, 2020
acd33f1
Revert "Extract common code"
MaxGekk Mar 16, 2020
41fc33f
Revert "Rewrite micros rebasing using Java 7 API"
MaxGekk Mar 16, 2020
fe9f130
Revert "Rewrite days rebasing using Java 7 API"
MaxGekk Mar 16, 2020
c2c53b8
Remove branching by cutover days in rebase functions
MaxGekk Mar 16, 2020
8e94359
Rebasing via system time zone
MaxGekk Mar 16, 2020
d6f7e6b
Rebase dates via UTC local date
MaxGekk Mar 16, 2020
81d342a
Check more time zones in days rebasing
MaxGekk Mar 16, 2020
63428ab
More dates/timestamps for testing
MaxGekk Mar 16, 2020
a34a9ce
Rename utcCal to cal
MaxGekk Mar 16, 2020
e590d36
Test multiple time zones in rebasing timestamps
MaxGekk Mar 16, 2020
262f744
Test reading parquet files written by Spark 2.4
MaxGekk Mar 16, 2020
8947298
Remove .asInstanceOf[DateType#InternalType]
MaxGekk Mar 16, 2020
276d159
Change SQL config description
MaxGekk Mar 16, 2020
167b463
Rebase timestamp INT96
MaxGekk Mar 16, 2020
bbc4a1a
Support rebasing in VectorizedColumnReader
MaxGekk Mar 16, 2020
a1b34cb
Bug fix in write
MaxGekk Mar 16, 2020
d7debb4
Add test for write
MaxGekk Mar 16, 2020
6bebf3b
Read SQL config in place
MaxGekk Mar 17, 2020
67cec02
Remove a gap
MaxGekk Mar 17, 2020
8fa19a6
Remove config settings from ParquetWriteBuilder
MaxGekk Mar 17, 2020
a061870
Initialize rebaseDateTime in default constructor in ParquetWriteSupport
MaxGekk Mar 18, 2020
ae49cc4
Check INT96 rebasing regardless of SQL config settings
MaxGekk Mar 18, 2020
a96392c
Add JIRA id
MaxGekk Mar 18, 2020
5b52735
Test INT96 w/ and w/o vectorized reader
MaxGekk Mar 18, 2020
184fcd8
Merge remote-tracking branch 'remotes/origin/master' into rebase-parq…
MaxGekk Mar 18, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.time._
import java.time.temporal.{ChronoField, ChronoUnit, IsoFields}
import java.util.{Locale, TimeZone}
import java.util.{Calendar, Locale, TimeZone}
import java.util.concurrent.TimeUnit._

import scala.util.control.NonFatal
Expand Down Expand Up @@ -148,7 +148,9 @@ object DateTimeUtils {
def fromJulianDay(day: Int, nanoseconds: Long): SQLTimestamp = {
// use Long to avoid rounding errors
val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY
SECONDS.toMicros(seconds) + NANOSECONDS.toMicros(nanoseconds)
val micros = SECONDS.toMicros(seconds) + NANOSECONDS.toMicros(nanoseconds)
val rebased = rebaseJulianToGregorianMicros(micros)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I do believe this is a bug fix. We should rebase regardless of the new SQL config spark.sql.legacy.parquet.rebaseDateTime.enabled, because days are stored as Julian days in Parquet INT96 timestamps.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check
https://github.com/apache/spark/pull/27915/files#diff-9bca6e248a070768e9f38ee5929411d8R897-R898
reads an INT96 timestamp saved by Spark 2.4.5 via:

scala> spark.conf.set("spark.sql.parquet.outputTimestampType", "INT96")

scala> val df = Seq("1001-01-01 01:02:03.123456").toDF("tsS").select($"tsS".cast("timestamp").as("ts"))
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df.write.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_int96")

scala> spark.read.parquet("/Users/maxim/tmp/before_1582/2_4_5_ts_int96").show(false)
+--------------------------+
|ts                        |
+--------------------------+
|1001-01-01 01:02:03.123456|
+--------------------------+

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. INT96 is a legacy timestamp type in Parquet, so I'm not surprised that it follows the Java 7 semantics.

rebased
}

/**
Expand All @@ -157,7 +159,7 @@ object DateTimeUtils {
* Note: support timestamp since 4717 BC (without negative nanoseconds, compatible with Hive).
*/
def toJulianDay(us: SQLTimestamp): (Int, Long) = {
val julian_us = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
val julian_us = rebaseGregorianToJulianMicros(us) + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
val day = julian_us / MICROS_PER_DAY
val micros = julian_us % MICROS_PER_DAY
(day.toInt, MICROSECONDS.toNanos(micros))
Expand Down Expand Up @@ -936,4 +938,102 @@ object DateTimeUtils {
val days = period.getDays
new CalendarInterval(months, days, 0)
}

/**
 * Rebases microseconds since the epoch from the Proleptic Gregorian calendar to the hybrid
 * calendar (Julian + Gregorian).
 *
 * The input is turned into a local date-time in the JVM default time zone using `java.time`,
 * the same year/month/day/hour/minute/second fields are then handed to a `java.util.Calendar`
 * of type `gregory`, and the microseconds since the epoch of that calendar value are returned.
 * The fraction of a second below one second is carried over unchanged.
 *
 * @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'.
 * @return The rebased microseconds since the epoch in the hybrid calendar.
 */
def rebaseGregorianToJulianMicros(micros: Long): Long = {
  // Wall-clock fields of the instant as seen by java.time in the default time zone.
  val gregorianLocal = LocalDateTime.ofInstant(microsToInstant(micros), ZoneId.systemDefault)

  val hybridBuilder = new Calendar.Builder()
  // `gregory` is the java.util calendar type that covers both
  // the Julian and the Gregorian calendar systems.
  hybridBuilder.setCalendarType("gregory")
  // Calendar months are 0-based, java.time months are 1-based.
  hybridBuilder.setDate(
    gregorianLocal.getYear,
    gregorianLocal.getMonthValue - 1,
    gregorianLocal.getDayOfMonth)
  hybridBuilder.setTimeOfDay(
    gregorianLocal.getHour,
    gregorianLocal.getMinute,
    gregorianLocal.getSecond)
  val hybridMillis = hybridBuilder.build().getTimeInMillis

  // The calendar was built with whole seconds only, so add the sub-second part back.
  val microOfSecond = gregorianLocal.get(ChronoField.MICRO_OF_SECOND)
  millisToMicros(hybridMillis) + microOfSecond
}

/**
 * Rebases microseconds since the epoch from the hybrid calendar (Julian + Gregorian) to
 * the Proleptic Gregorian calendar.
 *
 * The input is converted to a local date-time in the JVM default time zone using a
 * `java.util.Calendar` of type `gregory`. The same era/year/month/day/time fields are then
 * interpreted as a local date-time in the Proleptic Gregorian calendar in the same time zone,
 * and the microseconds since the epoch of that timestamp are returned.
 *
 * A local date that exists only in the Julian calendar (for example 1000-02-29, because 1000
 * is a leap year in the Julian calendar but not in the Gregorian one) is mapped to the
 * following valid date instead of failing.
 *
 * @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'.
 * @return The rebased microseconds since the epoch in Proleptic Gregorian calendar.
 */
def rebaseJulianToGregorianMicros(micros: Long): Long = {
  val cal = new Calendar.Builder()
    // `gregory` is a hybrid calendar that supports both
    // the Julian and Gregorian calendar systems
    .setCalendarType("gregory")
    .setInstant(microsToMillis(micros))
    .build()
  val localDateTime = LocalDateTime.of(
    cal.get(Calendar.YEAR),
    // Calendar months are 0-based, java.time months are 1-based.
    cal.get(Calendar.MONTH) + 1,
    // Start from the first day of the month and add the remaining days below, so that
    // a day of month which does not exist in the Proleptic Gregorian calendar rolls over
    // to the next month instead of throwing DateTimeException.
    1,
    cal.get(Calendar.HOUR_OF_DAY),
    cal.get(Calendar.MINUTE),
    cal.get(Calendar.SECOND),
    // floorMod keeps the sub-second part non-negative for timestamps before the epoch.
    (Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICROS).toInt)
    // Calendar.YEAR is a year-of-era; carry the era over so that BC timestamps are not
    // silently turned into AD ones. Both APIs encode BC as 0 and AD as 1.
    .`with`(ChronoField.ERA, cal.get(Calendar.ERA))
    .plusDays(cal.get(Calendar.DAY_OF_MONTH) - 1)
  instantToMicros(localDateTime.atZone(ZoneId.systemDefault).toInstant)
}

/**
 * Converts the given number of days since the epoch day 1970-01-01 to
 * a local date in the hybrid calendar (Julian + Gregorian), interprets the result
 * as a local date in Proleptic Gregorian calendar, and takes the number of days
 * since the epoch from the Gregorian date.
 *
 * A local date that exists only in the Julian calendar (for example 1000-02-29, because 1000
 * is a leap year in the Julian calendar but not in the Gregorian one) is mapped to the
 * following valid date instead of failing.
 *
 * @param days The number of days since the epoch in Julian calendar.
 * @return The rebased number of days in Gregorian calendar.
 */
def rebaseJulianToGregorianDays(days: Int): Int = {
  val utcCal = new Calendar.Builder()
    // `gregory` is a hybrid calendar that supports both
    // the Julian and Gregorian calendar systems
    .setCalendarType("gregory")
    // A particular time zone can be used here because the conversion of "logical" days
    // does not depend on the time zone. UTC is selected to avoid rounding problems when
    // converting to/from days, since its zone offset is 0.
    .setTimeZone(TimeZoneUTC)
    .setInstant(Math.multiplyExact(days, MILLIS_PER_DAY))
    .build()
  val localDate = LocalDate.of(
    utcCal.get(Calendar.YEAR),
    // Calendar months are 0-based, java.time months are 1-based.
    utcCal.get(Calendar.MONTH) + 1,
    // Start from the first day of the month and add the remaining days below, so that
    // a day of month which does not exist in the Proleptic Gregorian calendar rolls over
    // to the next month instead of throwing DateTimeException.
    1)
    // Calendar.YEAR is a year-of-era; carry the era over so that BC dates are not
    // silently turned into AD ones. Both APIs encode BC as 0 and AD as 1.
    .`with`(ChronoField.ERA, utcCal.get(Calendar.ERA))
    .plusDays(utcCal.get(Calendar.DAY_OF_MONTH) - 1)
  Math.toIntExact(localDate.toEpochDay)
}

/**
 * Rebases days since the epoch so that the stored number matches what Spark 2.4 and
 * earlier versions would have written. Spark 3.0 switched to Proleptic Gregorian calendar
 * (see SPARK-26651), whereas Spark 2.4 and earlier versions use Julian calendar for dates
 * before 1582-10-15. As a consequence, the same local date before 1582-10-15 may correspond
 * to different numbers of days since the epoch in the two calendars.
 *
 * For example:
 *   Proleptic Gregorian calendar: 1582-01-01 -> -141714
 *   Julian calendar: 1582-01-01 -> -141704
 * The code below converts -141714 to -141704.
 *
 * @param days The number of days since the epoch 1970-01-01. It can be negative.
 * @return The rebased number of days since the epoch in Julian calendar.
 */
def rebaseGregorianToJulianDays(days: Int): Int = {
  // Year/month/day of the input as seen by the Proleptic Gregorian calendar.
  val gregorianDate = LocalDate.ofEpochDay(days)
  val year = gregorianDate.getYear
  // Calendar months are 0-based, java.time months are 1-based.
  val zeroBasedMonth = gregorianDate.getMonthValue - 1
  val dayOfMonth = gregorianDate.getDayOfMonth

  // Feed the same fields into the hybrid (`gregory`) calendar, which supports both
  // the Julian and Gregorian calendar systems.
  val hybridCal = new Calendar.Builder()
    .setCalendarType("gregory")
    .setTimeZone(TimeZoneUTC)
    .setDate(year, zeroBasedMonth, dayOfMonth)
    .build()

  // floorDiv rounds towards negative infinity, which matters for dates before the epoch.
  val hybridMillis = hybridCal.getTimeInMillis
  val hybridDays = Math.floorDiv(hybridMillis, MILLIS_PER_DAY)
  Math.toIntExact(hybridDays)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2487,6 +2487,20 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

// Internal legacy switch for rebasing Parquet dates/timestamps between the Proleptic
// Gregorian calendar and the hybrid (Julian + Gregorian) calendar. Disabled by default.
val LEGACY_PARQUET_REBASE_DATETIME =
  buildConf("spark.sql.legacy.parquet.rebaseDateTime.enabled")
    .internal()
    .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar to the hybrid " +
      "calendar (Julian + Gregorian) in write and from the hybrid calendar to Proleptic " +
      "Gregorian calendar in read. The rebasing is performed by converting " +
      "micros/millis/days to a local date/timestamp in the source calendar, interpreting " +
      "the resulted date/timestamp in the target calendar, and getting the number of " +
      "micros/millis/days since the epoch 1970-01-01 00:00:00Z.")
    .version("3.0.0")
    .booleanConf
    .createWithDefault(false)

/**
* Holds information about keys that have been deprecated.
*
Expand Down Expand Up @@ -3064,6 +3078,8 @@ class SQLConf extends Serializable with Logging {

def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID)

/** Returns the current value of `SQLConf.LEGACY_PARQUET_REBASE_DATETIME`. */
def parquetRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -670,4 +670,64 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
assert(toDate("tomorrow CET ", zoneId).get === today + 1)
}
}

// Checks both timestamp rebasing directions against independently computed expectations,
// repeating the check with each of `outstandingTimezones` installed as the default time zone.
test("rebase julian to/from gregorian micros") {
outstandingTimezones.foreach { timeZone =>
withDefaultTimeZone(timeZone) {
Seq(
"0001-01-01 01:02:03.654321",
"1000-01-01 03:02:01.123456",
"1582-10-04 00:00:00.000000",
"1582-10-15 00:00:00.999999", // Gregorian cutover day
"1883-11-10 00:00:00.000000", // America/Los_Angeles -7:52:58 zone offset
"1883-11-20 00:00:00.000000", // America/Los_Angeles -08:00 zone offset
"1969-12-31 11:22:33.000100",
"1970-01-01 00:00:00.000001", // The epoch day
"2020-03-14 09:33:01.500000").foreach { ts =>
withClue(s"time zone = ${timeZone.getID} ts = $ts") {
// Expected "Julian" value: the string parsed by the legacy java.sql.Timestamp API.
val julianTs = Timestamp.valueOf(ts)
// `getTime` has millisecond precision, while `getNanos` holds the whole fraction of
// the second; only the microseconds below one millisecond are added on top.
val julianMicros = millisToMicros(julianTs.getTime) +
((julianTs.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS)
// Expected "Gregorian" value: the same string parsed by java.time in `timeZone`.
val gregorianMicros = instantToMicros(LocalDateTime.parse(ts.replace(' ', 'T'))
.atZone(timeZone.toZoneId)
.toInstant)

// Each function must map one expected value onto the other.
assert(rebaseJulianToGregorianMicros(julianMicros) === gregorianMicros)
assert(rebaseGregorianToJulianMicros(gregorianMicros) === julianMicros)
}
}
}
}
}

// Checks both date rebasing directions against independently computed expectations,
// repeating the check with each of `outstandingTimezones` installed as the default time zone.
test("rebase gregorian to/from julian days") {
// millisToDays() and fromJavaDate() are taken from Spark 2.4
// and serve as the reference for the expected "Julian" day numbers.
// Shifts the UTC millis by the zone offset at that instant and floors the result to whole days.
def millisToDays(millisUtc: Long, timeZone: TimeZone): Int = {
val millisLocal = millisUtc + timeZone.getOffset(millisUtc)
Math.floor(millisLocal.toDouble / MILLIS_PER_DAY).toInt
}
// Number of days of a java.sql.Date, computed with the time zone from `defaultTimeZone()`.
def fromJavaDate(date: Date): Int = {
millisToDays(date.getTime, defaultTimeZone())
}
outstandingTimezones.foreach { timeZone =>
withDefaultTimeZone(timeZone) {
Seq(
"0001-01-01",
"1000-01-01",
"1582-10-04",
"1582-10-15", // Gregorian cutover day
"1883-11-10", // America/Los_Angeles -7:52:58 zone offset
"1883-11-20", // America/Los_Angeles -08:00 zone offset
"1969-12-31",
"1970-01-01", // The epoch day
"2020-03-14").foreach { date =>
// Expected "Julian" value: the string parsed by the legacy java.sql.Date API.
val julianDays = fromJavaDate(Date.valueOf(date))
// Expected "Gregorian" value: the same string parsed by java.time.LocalDate.
val gregorianDays = localDateToDays(LocalDate.parse(date))

// Each function must map one expected value onto the other.
assert(rebaseGregorianToJulianDays(gregorianDays) === julianDays)
assert(rebaseJulianToGregorianDays(julianDays) === gregorianDays)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;

Expand Down Expand Up @@ -101,6 +102,7 @@ public class VectorizedColumnReader {
// The timezone conversion to apply to int96 timestamps. Null if no conversion.
private final ZoneId convertTz;
private static final ZoneId UTC = ZoneOffset.UTC;
private final boolean rebaseDateTime;

public VectorizedColumnReader(
ColumnDescriptor descriptor,
Expand Down Expand Up @@ -129,6 +131,7 @@ public VectorizedColumnReader(
if (totalValueCount == 0) {
throw new IOException("totalValueCount == 0");
}
this.rebaseDateTime = SQLConf.get().parquetRebaseDateTimeEnabled();
}

/**
Expand Down Expand Up @@ -407,7 +410,7 @@ private void readBooleanBatch(int rowId, int num, WritableColumnVector column)
private void readIntBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType ||
if (column.dataType() == DataTypes.IntegerType ||
DecimalType.is32BitDecimalType(column.dataType())) {
defColumn.readIntegers(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
Expand All @@ -417,6 +420,21 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) throw
} else if (column.dataType() == DataTypes.ShortType) {
defColumn.readShorts(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (column.dataType() == DataTypes.DateType ) {
if (rebaseDateTime) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
column.putInt(
rowId + i,
DateTimeUtils.rebaseJulianToGregorianDays(dataColumn.readInteger()));
} else {
column.putNull(rowId + i);
}
}
} else {
defColumn.readIntegers(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
}
} else {
throw constructConvertNotSupportedException(descriptor, column);
}
Expand All @@ -425,14 +443,32 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) throw
private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType()) ||
originalType == OriginalType.TIMESTAMP_MICROS) {
DecimalType.is64BitDecimalType(column.dataType())) {
defColumn.readLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (originalType == OriginalType.TIMESTAMP_MICROS) {
if (rebaseDateTime) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
column.putLong(
rowId + i,
DateTimeUtils.rebaseJulianToGregorianMicros(dataColumn.readLong()));
} else {
column.putNull(rowId + i);
}
}
} else {
defColumn.readLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
}
} else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
column.putLong(rowId + i, DateTimeUtils.millisToMicros(dataColumn.readLong()));
long micros = DateTimeUtils.millisToMicros(dataColumn.readLong());
if (rebaseDateTime) {
micros = DateTimeUtils.rebaseJulianToGregorianMicros(micros);
}
column.putLong(rowId + i, micros);
} else {
column.putNull(rowId + i);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ private[parquet] class ParquetRowConverter(
updater: ParentContainerUpdater)
extends ParquetGroupConverter(updater) with Logging {

// Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar
private val rebaseDateTime = SQLConf.get.parquetRebaseDateTimeEnabled

assert(
parquetType.getFieldCount <= catalystType.length,
s"""Field count of the Parquet schema is greater than the field count of the Catalyst schema:
Expand Down Expand Up @@ -271,16 +274,35 @@ private[parquet] class ParquetRowConverter(
new ParquetStringConverter(updater)

case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS =>
new ParquetPrimitiveConverter(updater) {
override def addLong(value: Long): Unit = {
updater.setLong(value)
if (rebaseDateTime) {
new ParquetPrimitiveConverter(updater) {
override def addLong(value: Long): Unit = {
val rebased = DateTimeUtils.rebaseJulianToGregorianMicros(value)
updater.setLong(rebased)
}
}
} else {
new ParquetPrimitiveConverter(updater) {
override def addLong(value: Long): Unit = {
updater.setLong(value)
}
}
}

case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
new ParquetPrimitiveConverter(updater) {
override def addLong(value: Long): Unit = {
updater.setLong(DateTimeUtils.millisToMicros(value))
if (rebaseDateTime) {
new ParquetPrimitiveConverter(updater) {
override def addLong(value: Long): Unit = {
val micros = DateTimeUtils.millisToMicros(value)
val rebased = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
updater.setLong(rebased)
}
}
} else {
new ParquetPrimitiveConverter(updater) {
override def addLong(value: Long): Unit = {
updater.setLong(DateTimeUtils.millisToMicros(value))
}
}
}

Expand All @@ -305,10 +327,17 @@ private[parquet] class ParquetRowConverter(
}

case DateType =>
new ParquetPrimitiveConverter(updater) {
override def addInt(value: Int): Unit = {
// DateType is not specialized in `SpecificMutableRow`, have to box it here.
updater.set(value.asInstanceOf[DateType#InternalType])
if (rebaseDateTime) {
new ParquetPrimitiveConverter(updater) {
override def addInt(value: Int): Unit = {
updater.set(DateTimeUtils.rebaseJulianToGregorianDays(value))
}
}
} else {
new ParquetPrimitiveConverter(updater) {
override def addInt(value: Int): Unit = {
updater.set(value)
}
}
}

Expand Down
Loading