Commit 028091f

MaxGekk authored and cloud-fan committed
[SPARK-31183][SQL] Rebase date/timestamp from/to Julian calendar in Avro
### What changes were proposed in this pull request?
The PR addresses compatibility with Spark 2.4 and earlier versions when reading and writing dates and timestamps via the Avro datasource. Previous releases are based on a hybrid calendar (Julian + Gregorian). Since Spark 3.0, the Proleptic Gregorian calendar is used by default, see SPARK-26651. In particular, the issue shows up for dates/timestamps before 1582-10-15, where the hybrid calendar switches between the Julian and Gregorian calendars. The same local date in different calendars is converted to a different number of days since the epoch 1970-01-01. For example, the 0001-01-01 date is converted to:
- -719164 in the Julian calendar. Spark 2.4 saves the number as a value of DATE type into Avro files.
- -719162 in the Proleptic Gregorian calendar. Spark 3.0 saves the number as a date value.

The PR proposes rebasing between the Proleptic Gregorian calendar and the hybrid one under the SQL config:
```
spark.sql.legacy.avro.rebaseDateTime.enabled
```
which is `false` by default, so no rebasing is performed unless it is enabled.

The details of the implementation:
1. Re-use 2 methods of `DateTimeUtils` added by the PR #27915 for rebasing microseconds.
2. Re-use 2 methods of `DateTimeUtils` added by the PR #27915 for rebasing days.
3. Use `rebaseGregorianToJulianMicros()` and `rebaseGregorianToJulianDays()` while saving timestamps/dates to Avro files if the SQL config is on.
4. Use `rebaseJulianToGregorianMicros()` and `rebaseJulianToGregorianDays()` while loading timestamps/dates from Avro files if the SQL config is on.
5. The SQL config `spark.sql.legacy.avro.rebaseDateTime.enabled` controls conversions from/to dates, and timestamps of the `timestamp-millis` and `timestamp-micros` logical types.

### Why are the changes needed?
For backward compatibility with Spark 2.4 and earlier versions. The changes allow users to read dates/timestamps saved by previous versions and get the same result. Also, after the changes, users can enable the rebasing in write and save dates/timestamps that can be loaded correctly by Spark 2.4 and earlier versions.

### Does this PR introduce any user-facing change?
Yes. Dates and timestamps before 1582-10-15 saved by Spark 2.4.5 (for example, the timestamp `1001-01-01 01:02:03.123456` saved as `timestamp-micros`) are interpreted differently by Spark 3.0.0-preview2. For the date `1001-01-01`:
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

scala> spark.read.format("avro").load("/Users/maxim/tmp/before_1582/2_4_5_date_avro").show(false)
+----------+
|date      |
+----------+
|1001-01-07|
+----------+
```
After the changes:
```scala
scala> spark.conf.set("spark.sql.legacy.avro.rebaseDateTime.enabled", true)
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

scala> spark.read.format("avro").load("/Users/maxim/tmp/before_1582/2_4_5_date_avro").show(false)
+----------+
|date      |
+----------+
|1001-01-01|
+----------+
```

### How was this patch tested?
1. Added tests to `AvroLogicalTypeSuite` to check rebasing in read. The test reads back Avro files saved by Spark 2.4.5 via:
```shell
$ export TZ="America/Los_Angeles"
```
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

scala> val df = Seq("1001-01-01").toDF("dateS").select($"dateS".cast("date").as("date"))
df: org.apache.spark.sql.DataFrame = [date: date]

scala> df.write.format("avro").save("/Users/maxim/tmp/before_1582/2_4_5_date_avro")

scala> val df2 = Seq("1001-01-01 01:02:03.123456").toDF("tsS").select($"tsS".cast("timestamp").as("ts"))
df2: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df2.write.format("avro").save("/Users/maxim/tmp/before_1582/2_4_5_ts_avro")

scala> :paste
// Entering paste mode (ctrl-D to finish)

val timestampSchema = s"""
  |  {
  |    "namespace": "logical",
  |    "type": "record",
  |    "name": "test",
  |    "fields": [
  |      {"name": "ts", "type": ["null", {"type": "long","logicalType": "timestamp-millis"}], "default": null}
  |    ]
  |  }
  |""".stripMargin

// Exiting paste mode, now interpreting.

scala> df3.write.format("avro").option("avroSchema", timestampSchema).save("/Users/maxim/tmp/before_1582/2_4_5_ts_millis_avro")
```
2. Added the following tests to `AvroLogicalTypeSuite` to check rebasing of dates/timestamps (in microsecond and millisecond precision). The tests write rebased dates/timestamps, read them back with rebasing enabled and disabled, and compare the results:
   - `rebasing microseconds timestamps in write`
   - `rebasing milliseconds timestamps in write`
   - `rebasing dates in write`

Closes #27953 from MaxGekk/rebase-avro-datetime.

Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 4766a36)
Signed-off-by: Wenchen Fan <[email protected]>
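
The calendar mismatch described in the commit message can be reproduced with the JDK alone. The following is a minimal sketch, not Spark code: `CalendarDays`, `prolepticGregorianDays` and `hybridDays` are hypothetical names, and it assumes that `java.util.GregorianCalendar` with its default cutover is a fair stand-in for the hybrid calendar used by Spark 2.4. Under that assumption the local date 0001-01-01 maps to -719164 (hybrid) and -719162 (Proleptic Gregorian), while for 1001-01-01 the two counts differ by six days, which is consistent with the `1001-01-07` shown in the example output.

```scala
import java.time.LocalDate
import java.util.{GregorianCalendar, TimeZone}

// Hypothetical helper, for illustration only; not part of Spark.
object CalendarDays {
  private val MillisPerDay = 24L * 60 * 60 * 1000

  /** Days since 1970-01-01 for a local date read in the Proleptic Gregorian calendar. */
  def prolepticGregorianDays(year: Int, month: Int, day: Int): Long =
    LocalDate.of(year, month, day).toEpochDay

  /** Days since 1970-01-01 for the same local date read in the hybrid Julian + Gregorian calendar. */
  def hybridDays(year: Int, month: Int, day: Int): Long = {
    // GregorianCalendar switches from Julian to Gregorian on 1582-10-15 by default.
    val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
    cal.clear()
    cal.set(year, month - 1, day) // Calendar months are zero-based
    Math.floorDiv(cal.getTimeInMillis, MillisPerDay)
  }
}
```

Dates on or after 1582-10-15 produce the same day count from both functions, which is why only older values need rebasing.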
1 parent c42f9f6 commit 028091f

7 files changed, +158 -15 lines changed

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala

Lines changed: 26 additions & 8 deletions
@@ -32,8 +32,9 @@ import org.apache.avro.util.Utf8

 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 /**
@@ -42,6 +43,9 @@ import org.apache.spark.unsafe.types.UTF8String
 class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
   private lazy val decimalConversions = new DecimalConversion()

+  // Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar
+  private val rebaseDateTime = SQLConf.get.avroRebaseDateTimeEnabled
+
   private val converter: Any => Any = rootCatalystType match {
     // A shortcut for empty schema.
     case st: StructType if st.isEmpty =>
@@ -88,21 +92,35 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
       case (INT, IntegerType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, value.asInstanceOf[Int])

+      case (INT, DateType) if rebaseDateTime => (updater, ordinal, value) =>
+        val days = value.asInstanceOf[Int]
+        val rebasedDays = DateTimeUtils.rebaseJulianToGregorianDays(days)
+        updater.setInt(ordinal, rebasedDays)
+
       case (INT, DateType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, value.asInstanceOf[Int])

       case (LONG, LongType) => (updater, ordinal, value) =>
         updater.setLong(ordinal, value.asInstanceOf[Long])

       case (LONG, TimestampType) => avroType.getLogicalType match {
-        case _: TimestampMillis => (updater, ordinal, value) =>
-          updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
+        // For backward compatibility, if the Avro type is Long and it is not logical type
+        // (the `null` case), the value is processed as timestamp type with millisecond precision.
+        case null | _: TimestampMillis => (updater, ordinal, value) =>
+          val millis = value.asInstanceOf[Long]
+          val micros = DateTimeUtils.fromMillis(millis)
+          if (rebaseDateTime) {
+            updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
+          } else {
+            updater.setLong(ordinal, micros)
+          }
         case _: TimestampMicros => (updater, ordinal, value) =>
-          updater.setLong(ordinal, value.asInstanceOf[Long])
-        case null => (updater, ordinal, value) =>
-          // For backward compatibility, if the Avro type is Long and it is not logical type,
-          // the value is processed as timestamp type with millisecond precision.
-          updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
+          val micros = value.asInstanceOf[Long]
+          if (rebaseDateTime) {
+            updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
+          } else {
+            updater.setLong(ordinal, micros)
+          }
         case other => throw new IncompatibleSchemaException(
           s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
       }
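
The read path above picks a rebasing converter only when the flag is on. As a rough illustration of what "rebase Julian to Gregorian days" means, here is a JDK-only sketch. It is not the `DateTimeUtils` implementation: `ReadRebaseSketch`, `hybridToGregorianDays` and `dateReader` are hypothetical names, only AD dates are handled, and `java.util.GregorianCalendar` is assumed to model the hybrid calendar.

```scala
import java.time.LocalDate
import java.util.{Calendar, GregorianCalendar, TimeZone}

// Hypothetical JDK-only sketch; not Spark's DateTimeUtils implementation.
object ReadRebaseSketch {
  private val MillisPerDay = 24L * 60 * 60 * 1000

  /** Reinterprets a hybrid-calendar day count as a Proleptic Gregorian day count. */
  def hybridToGregorianDays(days: Int): Int = {
    val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
    cal.setTimeInMillis(days * MillisPerDay)
    require(cal.get(Calendar.ERA) == GregorianCalendar.AD, "sketch covers AD dates only")
    // Take the local date seen by the hybrid calendar and read it as Proleptic Gregorian.
    // Julian leap days with no Gregorian counterpart (such as 1000-02-29) make LocalDate.of throw.
    val local = LocalDate.of(
      cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH))
    Math.toIntExact(local.toEpochDay)
  }

  /** Chooses the date reader once, mirroring the `if rebaseDateTime` guard in the match. */
  def dateReader(rebase: Boolean): Int => Int =
    if (rebase) (d: Int) => hybridToGregorianDays(d) else (d: Int) => d
}
```

With rebasing enabled, the day count that a hybrid-calendar writer would store for 1001-01-01 comes back as the Proleptic Gregorian count for the same local date; with it disabled, the stored number is passed through unchanged.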

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala

Lines changed: 21 additions & 5 deletions
@@ -34,6 +34,8 @@ import org.apache.avro.util.Utf8
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._

 /**
@@ -42,6 +44,9 @@ import org.apache.spark.sql.types._
 class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean)
   extends Logging {

+  // Whether to rebase datetimes from Gregorian to Julian calendar in write
+  private val rebaseDateTime: Boolean = SQLConf.get.avroRebaseDateTimeEnabled
+
   def serialize(catalystData: Any): Any = {
     converter.apply(catalystData)
   }
@@ -135,15 +140,26 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
       case (BinaryType, BYTES) =>
         (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))

+      case (DateType, INT) if rebaseDateTime =>
+        (getter, ordinal) => DateTimeUtils.rebaseGregorianToJulianDays(getter.getInt(ordinal))
+
       case (DateType, INT) =>
         (getter, ordinal) => getter.getInt(ordinal)

       case (TimestampType, LONG) => avroType.getLogicalType match {
-        case _: TimestampMillis => (getter, ordinal) => getter.getLong(ordinal) / 1000
-        case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
-        // For backward compatibility, if the Avro type is Long and it is not logical type,
-        // output the timestamp value as with millisecond precision.
-        case null => (getter, ordinal) => getter.getLong(ordinal) / 1000
+        // For backward compatibility, if the Avro type is Long and it is not logical type
+        // (the `null` case), output the timestamp value as with millisecond precision.
+        case null | _: TimestampMillis => (getter, ordinal) =>
+          val micros = getter.getLong(ordinal)
+          val rebasedMicros = if (rebaseDateTime) {
+            DateTimeUtils.rebaseGregorianToJulianMicros(micros)
+          } else micros
+          DateTimeUtils.toMillis(rebasedMicros)
+        case _: TimestampMicros => (getter, ordinal) =>
+          val micros = getter.getLong(ordinal)
+          if (rebaseDateTime) {
+            DateTimeUtils.rebaseGregorianToJulianMicros(micros)
+          } else micros
         case other => throw new IncompatibleSchemaException(
           s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
       }
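
The write path does the inverse conversion and, for millisecond precision, rebases first and narrows to milliseconds afterwards. A hedged JDK-only sketch of both steps follows. `WriteRebaseSketch`, `gregorianToHybridDays` and `microsToMillisFloor` are hypothetical names; the flooring behaviour attributed to `DateTimeUtils.toMillis` is an assumption, chosen because it matches the `.123` result expected by the millisecond write test.

```scala
import java.time.LocalDate
import java.util.{GregorianCalendar, TimeZone}

// Hypothetical JDK-only sketch; not Spark's DateTimeUtils implementation.
object WriteRebaseSketch {
  private val MillisPerDay = 24L * 60 * 60 * 1000

  /** Reinterprets a Proleptic Gregorian day count as a hybrid-calendar day count (years >= 1 only). */
  def gregorianToHybridDays(days: Int): Int = {
    val date = LocalDate.ofEpochDay(days.toLong)
    require(date.getYear >= 1, "sketch covers years >= 1 only")
    val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
    cal.clear()
    // Same local date, now read in the hybrid calendar (Julian before 1582-10-15).
    cal.set(date.getYear, date.getMonthValue - 1, date.getDayOfMonth)
    Math.toIntExact(Math.floorDiv(cal.getTimeInMillis, MillisPerDay))
  }

  /** Microseconds to milliseconds, rounding toward negative infinity (assumed toMillis behaviour). */
  def microsToMillisFloor(micros: Long): Long = Math.floorDiv(micros, 1000L)
}
```

The choice of rounding matters for timestamps before 1970: plain `/ 1000` truncates toward zero, so a negative value such as `-1` microsecond becomes `0` milliseconds, while flooring yields `-1`.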
202 Bytes
Binary file not shown.
218 Bytes
Binary file not shown.
244 Bytes
Binary file not shown.

external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala

Lines changed: 96 additions & 2 deletions
@@ -17,15 +17,15 @@
 package org.apache.spark.sql.avro

 import java.io.File
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}

 import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Conversions.DecimalConversion
 import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}

 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -348,6 +348,100 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
       assert(msg.contains("Unscaled value too large for precision"))
     }
   }
+
+  private def readResourceAvroFile(name: String): DataFrame = {
+    val url = Thread.currentThread().getContextClassLoader.getResource(name)
+    spark.read.format("avro").load(url.toString)
+  }
+
+  test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
+    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+      checkAnswer(
+        readResourceAvroFile("before_1582_date_v2_4.avro"),
+        Row(java.sql.Date.valueOf("1001-01-01")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
+    }
+  }
+
+  test("SPARK-31183: rebasing microseconds timestamps in write") {
+    val tsStr = "1001-01-01 01:02:03.123456"
+    val nonRebased = "1001-01-07 01:09:05.123456"
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+        Seq(tsStr).toDF("tsS")
+          .select($"tsS".cast("timestamp").as("ts"))
+          .write.format("avro")
+          .save(path)
+
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
+      }
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
+      }
+    }
+  }
+
+  test("SPARK-31183: rebasing milliseconds timestamps in write") {
+    val tsStr = "1001-01-01 01:02:03.123456"
+    val rebased = "1001-01-01 01:02:03.123"
+    val nonRebased = "1001-01-07 01:09:05.123"
+    Seq(
+      """{"type": "long","logicalType": "timestamp-millis"}""",
+      """"long"""").foreach { tsType =>
+      val timestampSchema = s"""
+        |{
+        |  "namespace": "logical",
+        |  "type": "record",
+        |  "name": "test",
+        |  "fields": [
+        |    {"name": "ts", "type": $tsType}
+        |  ]
+        |}""".stripMargin
+      withTempPath { dir =>
+        val path = dir.getAbsolutePath
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+          Seq(tsStr).toDF("tsS")
+            .select($"tsS".cast("timestamp").as("ts"))
+            .write
+            .option("avroSchema", timestampSchema)
+            .format("avro")
+            .save(path)
+
+          checkAnswer(
+            spark.read.schema("ts timestamp").format("avro").load(path),
+            Row(Timestamp.valueOf(rebased)))
+        }
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+          checkAnswer(
+            spark.read.schema("ts timestamp").format("avro").load(path),
+            Row(Timestamp.valueOf(nonRebased)))
+        }
+      }
+    }
+  }
+
+  test("SPARK-31183: rebasing dates in write") {
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+        Seq("1001-01-01").toDF("dateS")
+          .select($"dateS".cast("date").as("date"))
+          .write.format("avro")
+          .save(path)
+
+        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
+      }
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
+      }
+    }
+  }
 }

 class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite {
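
The `nonRebased` timestamps in the tests above differ from the written value by six days and also by 7 minutes 2 seconds (`01:02:03` becomes `01:09:05`). One plausible reading, offered here as an assumption rather than something the commit states, is that the suite runs in `America/Los_Angeles` (the zone used in the commit message examples), that the legacy hybrid-calendar side applies the standard -08:00 offset to such old dates, and that `java.time` applies Local Mean Time. The sketch below uses hypothetical names (`LmtShiftSketch`, `nonRebasedView`) and checks only that arithmetic.

```scala
import java.time.{Duration, LocalDateTime, ZoneId, ZoneOffset}

// Hypothetical sketch of the arithmetic only; the -08:00 legacy offset is an assumption.
object LmtShiftSketch {
  private val zone = ZoneId.of("America/Los_Angeles")

  // Assumed offset applied by the legacy (hybrid calendar) side for ancient dates.
  private val assumedLegacyOffset = ZoneOffset.ofHours(-8)

  /** Offset that java.time applies to a local timestamp (Local Mean Time before 1883). */
  def javaTimeOffset(ts: LocalDateTime): ZoneOffset = zone.getRules.getOffset(ts)

  /** Wall-clock shift caused by the two sides using different zone offsets. */
  def offsetShift(ts: LocalDateTime): Duration =
    Duration.ofSeconds(
      (javaTimeOffset(ts).getTotalSeconds - assumedLegacyOffset.getTotalSeconds).toLong)

  /** Local timestamp seen when a rebased value is read back without rebasing. */
  def nonRebasedView(written: LocalDateTime, calendarGapDays: Long): LocalDateTime =
    written.plusDays(calendarGapDays).plus(offsetShift(written))
}
```

Under these assumptions, `nonRebasedView(LocalDateTime.of(1001, 1, 1, 1, 2, 3), 6)` gives `1001-01-07T01:09:05`, the same wall-clock value the tests expect when rebasing is disabled on read.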

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 15 additions & 0 deletions
@@ -2245,6 +2245,19 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)

+  val LEGACY_AVRO_REBASE_DATETIME =
+    buildConf("spark.sql.legacy.avro.rebaseDateTime.enabled")
+      .internal()
+      .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
+        "to the hybrid calendar (Julian + Gregorian) in write and " +
+        "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
+        "The rebasing is performed by converting micros/millis/days to " +
+        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
+        "timestamp in the target calendar, and getting the number of micros/millis/days " +
+        "since the epoch 1970-01-01 00:00:00Z.")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
@@ -2822,6 +2835,8 @@ class SQLConf extends Serializable with Logging {

   def parquetRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME)

+  def avroRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME)
+
   /** ********************** SQLConf functionality methods ************ */

   /** Set Spark SQL configuration properties. */
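
The config doc string describes rebasing as three steps: convert the stored number to a local date/timestamp in the source calendar, reinterpret that local value in the target calendar, and count again from the epoch. A minimal JDK-only sketch of those steps for microseconds in the read direction is shown below. It is not Spark's implementation: `RebaseMicrosSketch` and `hybridToGregorianMicros` are hypothetical names, the session time zone is fixed to UTC for simplicity, and only AD timestamps are handled.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.util.{Calendar, GregorianCalendar, TimeZone}

// Hypothetical JDK-only sketch of the three steps in the doc string (UTC, AD only).
object RebaseMicrosSketch {
  def hybridToGregorianMicros(micros: Long): Long = {
    // java.util.Calendar works in milliseconds, so carry the sub-millisecond part separately.
    val millis = Math.floorDiv(micros, 1000L)
    val microsOfMilli = Math.floorMod(micros, 1000L)

    // Step 1: local timestamp in the source (hybrid) calendar.
    val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
    cal.setTimeInMillis(millis)
    require(cal.get(Calendar.ERA) == GregorianCalendar.AD, "sketch covers AD timestamps only")

    // Step 2: interpret the same local fields in the target (Proleptic Gregorian) calendar.
    val local = LocalDateTime.of(
      cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1, cal.get(Calendar.DAY_OF_MONTH),
      cal.get(Calendar.HOUR_OF_DAY), cal.get(Calendar.MINUTE), cal.get(Calendar.SECOND),
      cal.get(Calendar.MILLISECOND) * 1000000)

    // Step 3: microseconds since 1970-01-01 00:00:00Z in the target calendar.
    val instant = local.toInstant(ZoneOffset.UTC)
    instant.getEpochSecond * 1000000L + instant.getNano / 1000L + microsOfMilli
  }
}
```

Timestamps after the 1582-10-15 cutover pass through unchanged in this sketch, because both calendars assign them the same local fields.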
