diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index 81c087e314be1..41382133bd84c 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -48,5 +48,5 @@ private[spark] case class ExecutorDeadException(message: String) * Exception thrown when Spark returns different result after upgrading to a new version. */ private[spark] class SparkUpgradeException(version: String, message: String, cause: Throwable) - extends SparkException("You may get a different result due to the upgrading of Spark" + + extends RuntimeException("You may get a different result due to the upgrading of Spark" + s" $version: $message", cause) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index f32fe46bb6e1f..1d18594fd349c 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -34,22 +34,33 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData} import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY -import org.apache.spark.sql.catalyst.util.RebaseDateTime._ +import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String /** * A deserializer to deserialize data in avro format to data in catalyst format. */ -class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseDateTime: Boolean) { +class AvroDeserializer( + rootAvroType: Schema, + rootCatalystType: DataType, + datetimeRebaseMode: LegacyBehaviorPolicy.Value) { def this(rootAvroType: Schema, rootCatalystType: DataType) { this(rootAvroType, rootCatalystType, - SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)) + LegacyBehaviorPolicy.withName( + SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))) } private lazy val decimalConversions = new DecimalConversion() + private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead( + datetimeRebaseMode, "Avro") + + private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead( + datetimeRebaseMode, "Avro") + private val converter: Any => Any = rootCatalystType match { // A shortcut for empty schema. 
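// Illustrative sketch, not part of the patch: the new `datetimeRebaseMode` parameter replaces the
// old `rebaseDateTime` boolean with a three-way contract. EXCEPTION fails on ambiguous ancient
// values, CORRECTED passes them through unchanged, LEGACY rebases them. The real selection lives
// in DataSourceUtils.creteDateRebaseFuncInRead (added later in this diff) and calls
// RebaseDateTime.rebaseJulianToGregorianDays; the names below are simplified stand-ins.
object DateRebaseSketch {
  object Mode extends Enumeration { val EXCEPTION, CORRECTED, LEGACY = Value }

  def dateRebaseFuncInRead(
      mode: Mode.Value,
      lastSwitchJulianDay: Int,
      rebaseJulianToGregorianDays: Int => Int): Int => Int = mode match {
    case Mode.EXCEPTION => days =>
      if (days < lastSwitchJulianDay) {
        throw new IllegalStateException(s"Ambiguous ancient date (day $days): choose LEGACY or CORRECTED")
      } else {
        days
      }
    case Mode.LEGACY => rebaseJulianToGregorianDays
    case Mode.CORRECTED => identity[Int]
  }
}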
case st: StructType if st.isEmpty => @@ -96,13 +107,8 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseD case (INT, IntegerType) => (updater, ordinal, value) => updater.setInt(ordinal, value.asInstanceOf[Int]) - case (INT, DateType) if rebaseDateTime => (updater, ordinal, value) => - val days = value.asInstanceOf[Int] - val rebasedDays = rebaseJulianToGregorianDays(days) - updater.setInt(ordinal, rebasedDays) - case (INT, DateType) => (updater, ordinal, value) => - updater.setInt(ordinal, value.asInstanceOf[Int]) + updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int])) case (LONG, LongType) => (updater, ordinal, value) => updater.setLong(ordinal, value.asInstanceOf[Long]) @@ -110,22 +116,13 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseD case (LONG, TimestampType) => avroType.getLogicalType match { // For backward compatibility, if the Avro type is Long and it is not logical type // (the `null` case), the value is processed as timestamp type with millisecond precision. - case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) => - val millis = value.asInstanceOf[Long] - val micros = DateTimeUtils.millisToMicros(millis) - val rebasedMicros = rebaseJulianToGregorianMicros(micros) - updater.setLong(ordinal, rebasedMicros) case null | _: TimestampMillis => (updater, ordinal, value) => val millis = value.asInstanceOf[Long] val micros = DateTimeUtils.millisToMicros(millis) - updater.setLong(ordinal, micros) - case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) => - val micros = value.asInstanceOf[Long] - val rebasedMicros = rebaseJulianToGregorianMicros(micros) - updater.setLong(ordinal, rebasedMicros) + updater.setLong(ordinal, timestampRebaseFunc(micros)) case _: TimestampMicros => (updater, ordinal, value) => val micros = value.asInstanceOf[Long] - updater.setLong(ordinal, micros) + updater.setLong(ordinal, timestampRebaseFunc(micros)) case other => throw new IncompatibleSchemaException( s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.") } diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index e69c95b797c73..59d54bc433f8b 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -124,12 +124,12 @@ private[sql] class AvroFileFormat extends FileFormat reader.sync(file.start) val stop = file.start + file.length - val rebaseDateTime = DataSourceUtils.needRebaseDateTime( - reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse { - SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ) - } + val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( + reader.asInstanceOf[DataFileReader[_]].getMetaString, + SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)) + val deserializer = new AvroDeserializer( - userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, rebaseDateTime) + userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, datetimeRebaseMode) new Iterator[InternalRow] { private[this] var completed = false diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala index 82a568049990e..ac9608c867937 100644 --- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.OutputWriter import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types._ // NOTE: This class is instantiated and used on executor side only, no need to be serializable. @@ -43,12 +44,12 @@ private[avro] class AvroOutputWriter( avroSchema: Schema) extends OutputWriter { // Whether to rebase datetimes from Gregorian to Julian calendar in write - private val rebaseDateTime: Boolean = - SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE) + private val datetimeRebaseMode = LegacyBehaviorPolicy.withName( + SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE)) // The input rows will never be null. private lazy val serializer = - new AvroSerializer(schema, avroSchema, nullable = false, rebaseDateTime) + new AvroSerializer(schema, avroSchema, nullable = false, datetimeRebaseMode) /** * Overrides the couple of methods responsible for generating the output streams / files so @@ -56,7 +57,11 @@ private[avro] class AvroOutputWriter( */ private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] = { val fileMeta = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT) ++ { - if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None + if (datetimeRebaseMode == LegacyBehaviorPolicy.LEGACY) { + Some(SPARK_LEGACY_DATETIME -> "") + } else { + None + } } new SparkAvroKeyOutputFormat(fileMeta.asJava) { diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala index c87249e29fbd6..21c5dec6239bd 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala @@ -35,8 +35,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow} import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.catalyst.util.RebaseDateTime._ +import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types._ /** @@ -46,17 +47,24 @@ class AvroSerializer( rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean, - rebaseDateTime: Boolean) extends Logging { + datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends Logging { def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) { this(rootCatalystType, rootAvroType, nullable, - SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)) + LegacyBehaviorPolicy.withName(SQLConf.get.getConf( + SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE))) } def serialize(catalystData: Any): Any = { converter.apply(catalystData) } + private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInWrite( + datetimeRebaseMode, "Avro") + + private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInWrite( + datetimeRebaseMode, "Avro") + private val converter: Any => 
Any = { val actualAvroType = resolveNullableType(rootAvroType, nullable) val baseConverter = rootCatalystType match { @@ -146,24 +154,16 @@ class AvroSerializer( case (BinaryType, BYTES) => (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal)) - case (DateType, INT) if rebaseDateTime => - (getter, ordinal) => rebaseGregorianToJulianDays(getter.getInt(ordinal)) - case (DateType, INT) => - (getter, ordinal) => getter.getInt(ordinal) + (getter, ordinal) => dateRebaseFunc(getter.getInt(ordinal)) case (TimestampType, LONG) => avroType.getLogicalType match { // For backward compatibility, if the Avro type is Long and it is not logical type // (the `null` case), output the timestamp value as with millisecond precision. - case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) => - val micros = getter.getLong(ordinal) - val rebasedMicros = rebaseGregorianToJulianMicros(micros) - DateTimeUtils.microsToMillis(rebasedMicros) case null | _: TimestampMillis => (getter, ordinal) => - DateTimeUtils.microsToMillis(getter.getLong(ordinal)) - case _: TimestampMicros if rebaseDateTime => (getter, ordinal) => - rebaseGregorianToJulianMicros(getter.getLong(ordinal)) - case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal) + DateTimeUtils.microsToMillis(timestampRebaseFunc(getter.getLong(ordinal))) + case _: TimestampMicros => (getter, ordinal) => + timestampRebaseFunc(getter.getLong(ordinal)) case other => throw new IncompatibleSchemaException( s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}") } diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala index 712aec6acbd56..15918f46a83bb 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala @@ -88,12 +88,11 @@ case class AvroPartitionReaderFactory( reader.sync(partitionedFile.start) val stop = partitionedFile.start + partitionedFile.length - val rebaseDateTime = DataSourceUtils.needRebaseDateTime( - reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse { - SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ) - } + val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( + reader.asInstanceOf[DataFileReader[_]].getMetaString, + SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)) val deserializer = new AvroDeserializer( - userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, rebaseDateTime) + userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, datetimeRebaseMode) val fileReader = new PartitionReader[InternalRow] { private[this] var completed = false diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala index 64d790bc4acd4..c8a1f670bda9e 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala @@ -288,7 +288,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite """.stripMargin val avroSchema = new Schema.Parser().parse(jsonFormatSchema) val dataType = SchemaConverters.toSqlType(avroSchema).dataType - val deserializer = new AvroDeserializer(avroSchema, dataType, 
rebaseDateTime = false) + val deserializer = new AvroDeserializer(avroSchema, dataType) def checkDeserialization(data: GenericData.Record, expected: Any): Unit = { assert(checkResult( diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 3e754f02911dc..a5c1fb15add5c 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -33,7 +33,7 @@ import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWri import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed} import org.apache.commons.io.FileUtils -import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException} +import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, SparkUpgradeException} import org.apache.spark.sql._ import org.apache.spark.sql.TestingUDT.IntervalData import org.apache.spark.sql.catalyst.expressions.AttributeReference @@ -43,6 +43,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.sql.v2.avro.AvroScan @@ -1538,13 +1539,28 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { val path3_0_rebase = paths(1).getCanonicalPath if (dt == "date") { val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date")) - df.write.format("avro").save(path3_0) - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") { + + // By default we should fail to write ancient datetime values. + var e = intercept[SparkException](df.write.format("avro").save(path3_0)) + assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException]) + // By default we should fail to read ancient datetime values. + e = intercept[SparkException](spark.read.format("avro").load(path2_4).collect()) + assert(e.getCause.isInstanceOf[SparkUpgradeException]) + + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { + df.write.format("avro").mode("overwrite").save(path3_0) + } + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { df.write.format("avro").save(path3_0_rebase) } - checkAnswer( - spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase), - 1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr)))) + + // For Avro files written by Spark 3.0, we know the writer info and don't need the config + // to guide the rebase behavior. + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> LEGACY.toString) { + checkAnswer( + spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase), + 1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr)))) + } } else { val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts")) val avroSchema = @@ -1556,24 +1572,39 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { | {"name": "ts", "type": {"type": "long", "logicalType": "$dt"}} | ] |}""".stripMargin - df.write.format("avro").option("avroSchema", avroSchema).save(path3_0) - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") { + + // By default we should fail to write ancient datetime values. 
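// Illustrative usage, not part of the patch: the default EXCEPTION mode makes writes of ancient
// dates fail fast, as the `intercept[SparkException]` check just below asserts, and the user opts
// in per session. `spark` is assumed to be an existing SparkSession with the Avro data source on
// the classpath; the path is a placeholder.
import spark.implicits._
val ancient = Seq("1001-01-01").toDF("str").select($"str".cast("date").as("date"))
// ancient.write.format("avro").save("/tmp/ancient_avro")  // default: fails, SparkUpgradeException as root cause
spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInWrite", "LEGACY")       // rebase for Spark 2.x / Hive interop
// spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInWrite", "CORRECTED") // or write Proleptic Gregorian values as-is
ancient.write.format("avro").save("/tmp/ancient_avro")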
+ var e = intercept[SparkException] { + df.write.format("avro").option("avroSchema", avroSchema).save(path3_0) + } + assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException]) + // By default we should fail to read ancient datetime values. + e = intercept[SparkException](spark.read.format("avro").load(path2_4).collect()) + assert(e.getCause.isInstanceOf[SparkUpgradeException]) + + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { + df.write.format("avro").option("avroSchema", avroSchema).mode("overwrite").save(path3_0) + } + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { df.write.format("avro").option("avroSchema", avroSchema).save(path3_0_rebase) } - checkAnswer( - spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase), - 1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr)))) + + // For Avro files written by Spark 3.0, we know the writer info and don't need the config + // to guide the rebase behavior. + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> LEGACY.toString) { + checkAnswer( + spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase), + 1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr)))) + } } } } - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") { - checkReadMixedFiles("before_1582_date_v2_4.avro", "date", "1001-01-01") - checkReadMixedFiles( - "before_1582_ts_micros_v2_4.avro", "timestamp-micros", "1001-01-01 01:02:03.123456") - checkReadMixedFiles( - "before_1582_ts_millis_v2_4.avro", "timestamp-millis", "1001-01-01 01:02:03.124") - } + checkReadMixedFiles("before_1582_date_v2_4.avro", "date", "1001-01-01") + checkReadMixedFiles( + "before_1582_ts_micros_v2_4.avro", "timestamp-micros", "1001-01-01 01:02:03.123456") + checkReadMixedFiles( + "before_1582_ts_millis_v2_4.avro", "timestamp-millis", "1001-01-01 01:02:03.124") } test("SPARK-31183: rebasing microseconds timestamps in write") { @@ -1581,7 +1612,7 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { val nonRebased = "1001-01-07 01:09:05.123456" withTempPath { dir => val path = dir.getAbsolutePath - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") { + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { Seq(tsStr).toDF("tsS") .select($"tsS".cast("timestamp").as("ts")) .write.format("avro") @@ -1589,9 +1620,9 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { } // The file metadata indicates if it needs rebase or not, so we can always get the correct - // result regardless of the "rebaseInRead" config. - Seq(true, false).foreach { rebase => - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) { + // result regardless of the "rebase mode" config. 
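// Read-side illustration, not part of the patch: the read config only matters for files whose
// writer is unknown. Spark 3.0 records its version (and a legacy marker in LEGACY mode) in the
// Avro file metadata via AvroOutputWriter above, so such files decode correctly under any mode,
// which is what the loop over LEGACY/CORRECTED/EXCEPTION just below verifies. For 2.4-era files
// the session config decides; `spark` and the path are assumed placeholders.
spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInRead", "LEGACY")
val oldData = spark.read.format("avro").load("/path/to/files_written_by_spark_2_4")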
+ Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode => + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) { checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr))) } } @@ -1622,7 +1653,7 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { |}""".stripMargin withTempPath { dir => val path = dir.getAbsolutePath - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") { + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { Seq(tsStr).toDF("tsS") .select($"tsS".cast("timestamp").as("ts")) .write @@ -1632,9 +1663,9 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { } // The file metadata indicates if it needs rebase or not, so we can always get the correct - // result regardless of the "rebaseInRead" config. - Seq(true, false).foreach { rebase => - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) { + // result regardless of the "rebase mode" config. + Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode => + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) { checkAnswer( spark.read.schema("ts timestamp").format("avro").load(path), Row(Timestamp.valueOf(rebased))) @@ -1655,7 +1686,7 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { test("SPARK-31183: rebasing dates in write") { withTempPath { dir => val path = dir.getAbsolutePath - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") { + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { Seq("1001-01-01").toDF("dateS") .select($"dateS".cast("date").as("date")) .write.format("avro") @@ -1663,9 +1694,9 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { } // The file metadata indicates if it needs rebase or not, so we can always get the correct - // result regardless of the "rebaseInRead" config. - Seq(true, false).foreach { rebase => - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) { + // result regardless of the "rebase mode" config. + Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode => + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key -> mode.toString) { checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01"))) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index a52c3450e83df..ef987931e928a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -456,6 +456,7 @@ class JacksonParser( } } } catch { + case e: SparkUpgradeException => throw e case e @ (_: RuntimeException | _: JsonProcessingException | _: MalformedInputException) => // JSON parser currently doesn't support partial results for corrupted records. 
// For such records, all fields other than the field configured by diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala index a1b87e8e02351..cc75340cd8fcd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala @@ -146,6 +146,8 @@ object RebaseDateTime { -354226, -317702, -244653, -208129, -171605, -141436, -141435, -141434, -141433, -141432, -141431, -141430, -141429, -141428, -141427) + final val lastSwitchGregorianDay: Int = gregJulianDiffSwitchDay.last + // The first days of Common Era (CE) which is mapped to the '0001-01-01' date // in Proleptic Gregorian calendar. private final val gregorianCommonEraStartDay = gregJulianDiffSwitchDay(0) @@ -295,7 +297,7 @@ object RebaseDateTime { } // The switch time point after which all diffs between Gregorian and Julian calendars // across all time zones are zero - private final val lastSwitchGregorianTs: Long = getLastSwitchTs(gregJulianRebaseMap) + final val lastSwitchGregorianTs: Long = getLastSwitchTs(gregJulianRebaseMap) private final val gregorianStartTs = LocalDateTime.of(gregorianStartDate, LocalTime.MIDNIGHT) private final val julianEndTs = LocalDateTime.of( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c4922b56f0756..aeaf884c7d15b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2528,57 +2528,63 @@ object SQLConf { .booleanConf .createWithDefault(false) - val LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE = - buildConf("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled") - .internal() - .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " + - "to the hybrid calendar (Julian + Gregorian) in write. " + - "The rebasing is performed by converting micros/millis/days to " + - "a local date/timestamp in the source calendar, interpreting the resulted date/" + - "timestamp in the target calendar, and getting the number of micros/millis/days " + - "since the epoch 1970-01-01 00:00:00Z.") + val LEGACY_PARQUET_REBASE_MODE_IN_WRITE = + buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInWrite") + .internal() + .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " + + "to the legacy hybrid (Julian + Gregorian) calendar when writing Parquet files. " + + "When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " + + "When EXCEPTION, which is the default, Spark will fail the writing if it sees " + + "ancient dates/timestamps that are ambiguous between the two calendars.") .version("3.0.0") - .booleanConf - .createWithDefault(false) - - val LEGACY_PARQUET_REBASE_DATETIME_IN_READ = - buildConf("spark.sql.legacy.parquet.rebaseDateTimeInRead.enabled") - .internal() - .doc("When true, rebase dates/timestamps " + - "from the hybrid calendar to Proleptic Gregorian calendar in read. 
" + - "The rebasing is performed by converting micros/millis/days to " + - "a local date/timestamp in the source calendar, interpreting the resulted date/" + - "timestamp in the target calendar, and getting the number of micros/millis/days " + - "since the epoch 1970-01-01 00:00:00Z.") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) + .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + + val LEGACY_PARQUET_REBASE_MODE_IN_READ = + buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInRead") + .internal() + .doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid (Julian + " + + "Gregorian) calendar to Proleptic Gregorian calendar when reading Parquet files. " + + "When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " + + "When EXCEPTION, which is the default, Spark will fail the reading if it sees " + + "ancient dates/timestamps that are ambiguous between the two calendars. This config is " + + "only effective if the writer info (like Spark, Hive) of the Parquet files is unknown.") .version("3.0.0") - .booleanConf - .createWithDefault(false) + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) + .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) - val LEGACY_AVRO_REBASE_DATETIME_IN_WRITE = - buildConf("spark.sql.legacy.avro.rebaseDateTimeInWrite.enabled") + val LEGACY_AVRO_REBASE_MODE_IN_WRITE = + buildConf("spark.sql.legacy.avro.datetimeRebaseModeInWrite") .internal() - .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " + - "to the hybrid calendar (Julian + Gregorian) in write. " + - "The rebasing is performed by converting micros/millis/days to " + - "a local date/timestamp in the source calendar, interpreting the resulted date/" + - "timestamp in the target calendar, and getting the number of micros/millis/days " + - "since the epoch 1970-01-01 00:00:00Z.") + .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " + + "to the legacy hybrid (Julian + Gregorian) calendar when writing Avro files. " + + "When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " + + "When EXCEPTION, which is the default, Spark will fail the writing if it sees " + + "ancient dates/timestamps that are ambiguous between the two calendars.") .version("3.0.0") - .booleanConf - .createWithDefault(false) - - val LEGACY_AVRO_REBASE_DATETIME_IN_READ = - buildConf("spark.sql.legacy.avro.rebaseDateTimeInRead.enabled") - .internal() - .doc("When true, rebase dates/timestamps " + - "from the hybrid calendar to Proleptic Gregorian calendar in read. " + - "The rebasing is performed by converting micros/millis/days to " + - "a local date/timestamp in the source calendar, interpreting the resulted date/" + - "timestamp in the target calendar, and getting the number of micros/millis/days " + - "since the epoch 1970-01-01 00:00:00Z.") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) + .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + + val LEGACY_AVRO_REBASE_MODE_IN_READ = + buildConf("spark.sql.legacy.avro.datetimeRebaseModeInRead") + .internal() + .doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid (Julian + " + + "Gregorian) calendar to Proleptic Gregorian calendar when reading Avro files. 
" + + "When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " + + "When EXCEPTION, which is the default, Spark will fail the reading if it sees " + + "ancient dates/timestamps that are ambiguous between the two calendars. This config is " + + "only effective if the writer info (like Spark, Hive) of the Avro files is unknown.") .version("3.0.0") - .booleanConf - .createWithDefault(false) + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) + .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) /** * Holds information about keys that have been deprecated. @@ -3162,10 +3168,6 @@ class SQLConf extends Serializable with Logging { def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID) - def parquetRebaseDateTimeInReadEnabled: Boolean = { - getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ) - } - /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 0fa1bdfc160c2..3e409ab9a50a1 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils; import org.apache.spark.sql.catalyst.util.RebaseDateTime; +import org.apache.spark.sql.execution.datasources.DataSourceUtils; import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import org.apache.spark.sql.types.DataTypes; @@ -102,14 +103,14 @@ public class VectorizedColumnReader { // The timezone conversion to apply to int96 timestamps. Null if no conversion. 
private final ZoneId convertTz; private static final ZoneId UTC = ZoneOffset.UTC; - private final boolean rebaseDateTime; + private final String datetimeRebaseMode; public VectorizedColumnReader( ColumnDescriptor descriptor, OriginalType originalType, PageReader pageReader, ZoneId convertTz, - boolean rebaseDateTime) throws IOException { + String datetimeRebaseMode) throws IOException { this.descriptor = descriptor; this.pageReader = pageReader; this.convertTz = convertTz; @@ -132,7 +133,9 @@ public VectorizedColumnReader( if (totalValueCount == 0) { throw new IOException("totalValueCount == 0"); } - this.rebaseDateTime = rebaseDateTime; + assert "LEGACY".equals(datetimeRebaseMode) || "EXCEPTION".equals(datetimeRebaseMode) || + "CORRECTED".equals(datetimeRebaseMode); + this.datetimeRebaseMode = datetimeRebaseMode; } /** @@ -156,11 +159,11 @@ private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName boolean isSupported = false; switch (typeName) { case INT32: - isSupported = originalType != OriginalType.DATE || !rebaseDateTime; + isSupported = originalType != OriginalType.DATE || "CORRECTED".equals(datetimeRebaseMode); break; case INT64: if (originalType == OriginalType.TIMESTAMP_MICROS) { - isSupported = !rebaseDateTime; + isSupported = "CORRECTED".equals(datetimeRebaseMode); } else { isSupported = originalType != OriginalType.TIMESTAMP_MILLIS; } @@ -174,6 +177,30 @@ private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName return isSupported; } + static int rebaseDays(int julianDays, final boolean failIfRebase) { + if (failIfRebase) { + if (julianDays < RebaseDateTime.lastSwitchJulianDay()) { + throw DataSourceUtils.newRebaseExceptionInRead("Parquet"); + } else { + return julianDays; + } + } else { + return RebaseDateTime.rebaseJulianToGregorianDays(julianDays); + } + } + + static long rebaseMicros(long julianMicros, final boolean failIfRebase) { + if (failIfRebase) { + if (julianMicros < RebaseDateTime.lastSwitchJulianTs()) { + throw DataSourceUtils.newRebaseExceptionInRead("Parquet"); + } else { + return julianMicros; + } + } else { + return RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros); + } + } + /** * Reads `total` values from this columnReader into column. 
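// Quick reference, not part of the patch: the boolean rebaseDateTimeIn{Read,Write}.enabled flags
// removed above are superseded by string configs accepting EXCEPTION (default), CORRECTED, or
// LEGACY. Values are upper-cased before validation, so case does not matter:
//   spark.sql.legacy.parquet.datetimeRebaseModeInWrite
//   spark.sql.legacy.parquet.datetimeRebaseModeInRead
//   spark.sql.legacy.avro.datetimeRebaseModeInWrite
//   spark.sql.legacy.avro.datetimeRebaseModeInRead
// For example, on an assumed existing SparkSession:
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "corrected")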
*/ @@ -283,7 +310,7 @@ private void decodeDictionaryIds( case INT32: if (column.dataType() == DataTypes.IntegerType || DecimalType.is32BitDecimalType(column.dataType()) || - (column.dataType() == DataTypes.DateType && !rebaseDateTime)) { + (column.dataType() == DataTypes.DateType && "CORRECTED".equals(datetimeRebaseMode))) { for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i))); @@ -302,11 +329,11 @@ private void decodeDictionaryIds( } } } else if (column.dataType() == DataTypes.DateType) { + final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { int julianDays = dictionary.decodeToInt(dictionaryIds.getDictId(i)); - int gregorianDays = RebaseDateTime.rebaseJulianToGregorianDays(julianDays); - column.putInt(i, gregorianDays); + column.putInt(i, rebaseDays(julianDays, failIfRebase)); } } } else { @@ -317,36 +344,37 @@ private void decodeDictionaryIds( case INT64: if (column.dataType() == DataTypes.LongType || DecimalType.is64BitDecimalType(column.dataType()) || - (originalType == OriginalType.TIMESTAMP_MICROS && !rebaseDateTime)) { + (originalType == OriginalType.TIMESTAMP_MICROS && + "CORRECTED".equals(datetimeRebaseMode))) { for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i))); } } } else if (originalType == OriginalType.TIMESTAMP_MILLIS) { - if (rebaseDateTime) { + if ("CORRECTED".equals(datetimeRebaseMode)) { for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { - long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i)); - long julianMicros = DateTimeUtils.millisToMicros(julianMillis); - long gregorianMicros = RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros); - column.putLong(i, gregorianMicros); + long gregorianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i)); + column.putLong(i, DateTimeUtils.millisToMicros(gregorianMillis)); } } } else { + final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { - long gregorianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i)); - column.putLong(i, DateTimeUtils.millisToMicros(gregorianMillis)); + long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i)); + long julianMicros = DateTimeUtils.millisToMicros(julianMillis); + column.putLong(i, rebaseMicros(julianMicros, failIfRebase)); } } } } else if (originalType == OriginalType.TIMESTAMP_MICROS) { + final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { long julianMicros = dictionary.decodeToLong(dictionaryIds.getDictId(i)); - long gregorianMicros = RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros); - column.putLong(i, gregorianMicros); + column.putLong(i, rebaseMicros(julianMicros, failIfRebase)); } } } else { @@ -466,12 +494,13 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) throw defColumn.readShorts( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else if (column.dataType() == DataTypes.DateType ) { - if (rebaseDateTime) { - defColumn.readIntegersWithRebase( + if ("CORRECTED".equals(datetimeRebaseMode)) { + defColumn.readIntegers( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else { - defColumn.readIntegers( - 
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); + defColumn.readIntegersWithRebase( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, failIfRebase); } } else { throw constructConvertNotSupportedException(descriptor, column); @@ -485,27 +514,29 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) thro defColumn.readLongs( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else if (originalType == OriginalType.TIMESTAMP_MICROS) { - if (rebaseDateTime) { - defColumn.readLongsWithRebase( - num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); - } else { + if ("CORRECTED".equals(datetimeRebaseMode)) { defColumn.readLongs( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + } else { + boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); + defColumn.readLongsWithRebase( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, failIfRebase); } } else if (originalType == OriginalType.TIMESTAMP_MILLIS) { - if (rebaseDateTime) { + if ("CORRECTED".equals(datetimeRebaseMode)) { for (int i = 0; i < num; i++) { if (defColumn.readInteger() == maxDefLevel) { - long micros = DateTimeUtils.millisToMicros(dataColumn.readLong()); - column.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(micros)); + column.putLong(rowId + i, DateTimeUtils.millisToMicros(dataColumn.readLong())); } else { column.putNull(rowId + i); } } } else { + final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); for (int i = 0; i < num; i++) { if (defColumn.readInteger() == maxDefLevel) { - column.putLong(rowId + i, DateTimeUtils.millisToMicros(dataColumn.readLong())); + long julianMicros = DateTimeUtils.millisToMicros(dataColumn.readLong()); + column.putLong(rowId + i, rebaseMicros(julianMicros, failIfRebase)); } else { column.putNull(rowId + i); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index c9590b97ce9cd..b40cc154d76fe 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -89,9 +89,9 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa private final ZoneId convertTz; /** - * true if need to rebase date/timestamp from Julian to Proleptic Gregorian calendar. + * The mode of rebasing date/timestamp from Julian to Proleptic Gregorian calendar. */ - private final boolean rebaseDateTime; + private final String datetimeRebaseMode; /** * columnBatch object that is used for batch decoding. This is created on first use and triggers @@ -122,16 +122,16 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa private final MemoryMode MEMORY_MODE; public VectorizedParquetRecordReader( - ZoneId convertTz, boolean rebaseDateTime, boolean useOffHeap, int capacity) { + ZoneId convertTz, String datetimeRebaseMode, boolean useOffHeap, int capacity) { this.convertTz = convertTz; - this.rebaseDateTime = rebaseDateTime; + this.datetimeRebaseMode = datetimeRebaseMode; MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP; this.capacity = capacity; } // For test only. 
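// Sketch in Scala, not part of the patch, of the check the Java helpers rebaseDays/rebaseMicros
// above perform: values at or after the calendar switch point are identical in both calendars and
// pass through untouched, while earlier values are ambiguous, so EXCEPTION mode aborts the scan
// instead of silently picking a calendar. The threshold parameter stands in for
// RebaseDateTime.lastSwitchJulianDay.
def rebaseDaysSketch(
    julianDays: Int,
    failIfRebase: Boolean,
    lastSwitchJulianDay: Int,
    rebaseJulianToGregorianDays: Int => Int): Int = {
  if (failIfRebase) {
    if (julianDays < lastSwitchJulianDay) {
      throw new IllegalStateException("ancient date found: set the datetime rebase mode to LEGACY or CORRECTED")
    }
    julianDays  // at or after the switch day the two calendars agree, so no conversion is needed
  } else {
    rebaseJulianToGregorianDays(julianDays)  // LEGACY path; a no-op for modern values
  }
}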
public VectorizedParquetRecordReader(boolean useOffHeap, int capacity) { - this(null, false, useOffHeap, capacity); + this(null, "CORRECTED", useOffHeap, capacity); } /** @@ -321,7 +321,7 @@ private void checkEndOfRowGroup() throws IOException { for (int i = 0; i < columns.size(); ++i) { if (missingColumns[i]) continue; columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getOriginalType(), - pages.getPageReader(columns.get(i)), convertTz, rebaseDateTime); + pages.getPageReader(columns.get(i)), convertTz, datetimeRebaseMode); } totalCountLoadedSoFar += pages.getRowCount(); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java index 2ed2e11b60c03..eddbf39178e9a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java @@ -21,13 +21,14 @@ import java.nio.ByteOrder; import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.ParquetDecodingException; + import org.apache.spark.sql.catalyst.util.RebaseDateTime; +import org.apache.spark.sql.execution.datasources.DataSourceUtils; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; -import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.io.api.Binary; - /** * An implementation of the Parquet PLAIN decoder that supports the vectorized interface. */ @@ -86,7 +87,8 @@ public final void readIntegers(int total, WritableColumnVector c, int rowId) { // iterates the values twice: check if we need to rebase first, then go to the optimized branch // if rebase is not needed. @Override - public final void readIntegersWithRebase(int total, WritableColumnVector c, int rowId) { + public final void readIntegersWithRebase( + int total, WritableColumnVector c, int rowId, boolean failIfRebase) { int requiredBytes = total * 4; ByteBuffer buffer = getBuffer(requiredBytes); boolean rebase = false; @@ -94,8 +96,12 @@ public final void readIntegersWithRebase(int total, WritableColumnVector c, int rebase |= buffer.getInt(buffer.position() + i * 4) < RebaseDateTime.lastSwitchJulianDay(); } if (rebase) { - for (int i = 0; i < total; i += 1) { - c.putInt(rowId + i, RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt())); + if (failIfRebase) { + throw DataSourceUtils.newRebaseExceptionInRead("Parquet"); + } else { + for (int i = 0; i < total; i += 1) { + c.putInt(rowId + i, RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt())); + } } } else { if (buffer.hasArray()) { @@ -128,7 +134,8 @@ public final void readLongs(int total, WritableColumnVector c, int rowId) { // iterates the values twice: check if we need to rebase first, then go to the optimized branch // if rebase is not needed. 
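// Sketch, not part of the patch, of the two-pass strategy used by readIntegersWithRebase above
// and readLongsWithRebase just below: scan the batch once to see whether any value predates the
// calendar switch, then either take the bulk fast path, fail (EXCEPTION mode), or rebase value by
// value (LEGACY mode). Plain Scala over an array; the real code reads a Parquet ByteBuffer into a
// WritableColumnVector.
def copyWithOptionalRebase(
    values: Array[Long],
    lastSwitchJulianTs: Long,
    failIfRebase: Boolean,
    rebaseJulianToGregorianMicros: Long => Long): Array[Long] = {
  val needsRebase = values.exists(_ < lastSwitchJulianTs)       // first pass: detect ancient values
  if (!needsRebase) {
    values                                                      // fast path: no per-value conversion
  } else if (failIfRebase) {
    throw new IllegalStateException("ancient timestamps found: set the datetime rebase mode")
  } else {
    values.map(rebaseJulianToGregorianMicros)                   // LEGACY: rebase each value
  }
}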
@Override - public final void readLongsWithRebase(int total, WritableColumnVector c, int rowId) { + public final void readLongsWithRebase( + int total, WritableColumnVector c, int rowId, boolean failIfRebase) { int requiredBytes = total * 8; ByteBuffer buffer = getBuffer(requiredBytes); boolean rebase = false; @@ -136,8 +143,12 @@ public final void readLongsWithRebase(int total, WritableColumnVector c, int row rebase |= buffer.getLong(buffer.position() + i * 8) < RebaseDateTime.lastSwitchJulianTs(); } if (rebase) { - for (int i = 0; i < total; i += 1) { - c.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong())); + if (failIfRebase) { + throw DataSourceUtils.newRebaseExceptionInRead("Parquet"); + } else { + for (int i = 0; i < total; i += 1) { + c.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong())); + } } } else { if (buffer.hasArray()) { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index 4d72a33fcf774..24347a4e3a0c5 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -17,6 +17,9 @@ package org.apache.spark.sql.execution.datasources.parquet; +import java.io.IOException; +import java.nio.ByteBuffer; + import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; @@ -26,12 +29,8 @@ import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; -import org.apache.spark.sql.catalyst.util.RebaseDateTime; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; -import java.io.IOException; -import java.nio.ByteBuffer; - /** * A values reader for Parquet's run-length encoded data. 
This is based off of the version in * parquet-mr with these changes: @@ -211,7 +210,8 @@ public void readIntegersWithRebase( WritableColumnVector c, int rowId, int level, - VectorizedValuesReader data) throws IOException { + VectorizedValuesReader data, + final boolean failIfRebase) throws IOException { int left = total; while (left > 0) { if (this.currentCount == 0) this.readNextGroup(); @@ -219,7 +219,7 @@ public void readIntegersWithRebase( switch (mode) { case RLE: if (currentValue == level) { - data.readIntegersWithRebase(n, c, rowId); + data.readIntegersWithRebase(n, c, rowId, failIfRebase); } else { c.putNulls(rowId, n); } @@ -227,8 +227,8 @@ public void readIntegersWithRebase( case PACKED: for (int i = 0; i < n; ++i) { if (currentBuffer[currentBufferIdx++] == level) { - c.putInt(rowId + i, - RebaseDateTime.rebaseJulianToGregorianDays(data.readInteger())); + int julianDays = data.readInteger(); + c.putInt(rowId + i, VectorizedColumnReader.rebaseDays(julianDays, failIfRebase)); } else { c.putNull(rowId + i); } @@ -387,7 +387,8 @@ public void readLongsWithRebase( WritableColumnVector c, int rowId, int level, - VectorizedValuesReader data) throws IOException { + VectorizedValuesReader data, + final boolean failIfRebase) throws IOException { int left = total; while (left > 0) { if (this.currentCount == 0) this.readNextGroup(); @@ -395,7 +396,7 @@ public void readLongsWithRebase( switch (mode) { case RLE: if (currentValue == level) { - data.readLongsWithRebase(n, c, rowId); + data.readLongsWithRebase(n, c, rowId, failIfRebase); } else { c.putNulls(rowId, n); } @@ -403,8 +404,8 @@ public void readLongsWithRebase( case PACKED: for (int i = 0; i < n; ++i) { if (currentBuffer[currentBufferIdx++] == level) { - c.putLong(rowId + i, - RebaseDateTime.rebaseJulianToGregorianMicros(data.readLong())); + long julianMicros = data.readLong(); + c.putLong(rowId + i, VectorizedColumnReader.rebaseMicros(julianMicros, failIfRebase)); } else { c.putNull(rowId + i); } @@ -584,7 +585,8 @@ public void readIntegers(int total, WritableColumnVector c, int rowId) { } @Override - public void readIntegersWithRebase(int total, WritableColumnVector c, int rowId) { + public void readIntegersWithRebase( + int total, WritableColumnVector c, int rowId, boolean failIfRebase) { throw new UnsupportedOperationException("only readInts is valid."); } @@ -604,7 +606,8 @@ public void readLongs(int total, WritableColumnVector c, int rowId) { } @Override - public void readLongsWithRebase(int total, WritableColumnVector c, int rowId) { + public void readLongsWithRebase( + int total, WritableColumnVector c, int rowId, boolean failIfRebase) { throw new UnsupportedOperationException("only readInts is valid."); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java index 809ac44cc8272..35db8f235ed60 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java @@ -40,9 +40,9 @@ public interface VectorizedValuesReader { void readBooleans(int total, WritableColumnVector c, int rowId); void readBytes(int total, WritableColumnVector c, int rowId); void readIntegers(int total, WritableColumnVector c, int rowId); - void readIntegersWithRebase(int total, WritableColumnVector c, int rowId); + void 
readIntegersWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase); void readLongs(int total, WritableColumnVector c, int rowId); - void readLongsWithRebase(int total, WritableColumnVector c, int rowId); + void readLongsWithRebase(int total, WritableColumnVector c, int rowId, boolean failIfRebase); void readFloats(int total, WritableColumnVector c, int rowId); void readDoubles(int total, WritableColumnVector c, int rowId); void readBinary(int total, WritableColumnVector c, int rowId); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index 45a9b1a808cf3..abb74d8d09ec6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -23,9 +23,12 @@ import org.apache.hadoop.fs.Path import org.json4s.NoTypeHints import org.json4s.jackson.Serialization +import org.apache.spark.SparkUpgradeException import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY} import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.util.RebaseDateTime import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -84,17 +87,107 @@ object DataSourceUtils { case _ => false } - def needRebaseDateTime(lookupFileMeta: String => String): Option[Boolean] = { + def datetimeRebaseMode( + lookupFileMeta: String => String, + modeByConfig: String): LegacyBehaviorPolicy.Value = { if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") { - return Some(false) + return LegacyBehaviorPolicy.CORRECTED } - // If there is no version, we return None and let the caller side to decide. + // If there is no version, we return the mode specified by the config. Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version => // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to // rebase the datetime values. // Files written by Spark 3.0 and latter may also need the rebase if they were written with - // the "rebaseInWrite" config enabled. - version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null + // the "LEGACY" rebase mode. + if (version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null) { + LegacyBehaviorPolicy.LEGACY + } else { + LegacyBehaviorPolicy.CORRECTED + } + }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig)) + } + + def newRebaseExceptionInRead(format: String): SparkUpgradeException = { + val config = if (format == "Parquet") { + SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key + } else if (format == "Avro") { + SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key + } else { + throw new IllegalStateException("unrecognized format " + format) } + new SparkUpgradeException("3.0", "reading dates before 1582-10-15 or timestamps before " + + s"1900-01-01T00:00:00Z from $format files can be ambiguous, as the files may be written by " + + "Spark 2.x or legacy versions of Hive, which uses a legacy hybrid calendar that is " + + "different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in " + + s"SPARK-31404. You can set $config to 'LEGACY' to rebase the datetime values w.r.t. 
" + + s"the calendar difference during reading. Or set $config to 'CORRECTED' to read the " + + "datetime values as it is.", null) + } + + def newRebaseExceptionInWrite(format: String): SparkUpgradeException = { + val config = if (format == "Parquet") { + SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key + } else if (format == "Avro") { + SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key + } else { + throw new IllegalStateException("unrecognized format " + format) + } + new SparkUpgradeException("3.0", "writing dates before 1582-10-15 or timestamps before " + + s"1900-01-01T00:00:00Z into $format files can be dangerous, as the files may be read by " + + "Spark 2.x or legacy versions of Hive later, which uses a legacy hybrid calendar that is " + + "different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in " + + s"SPARK-31404. You can set $config to 'LEGACY' to rebase the datetime values w.r.t. " + + "the calendar difference during writing, to get maximum interoperability. Or set " + + s"$config to 'CORRECTED' to write the datetime values as it is, if you are 100% sure that " + + "the written files will only be read by Spark 3.0+ or other systems that use Proleptic " + + "Gregorian calendar.", null) + } + + def creteDateRebaseFuncInRead( + rebaseMode: LegacyBehaviorPolicy.Value, + format: String): Int => Int = rebaseMode match { + case LegacyBehaviorPolicy.EXCEPTION => days: Int => + if (days < RebaseDateTime.lastSwitchJulianDay) { + throw DataSourceUtils.newRebaseExceptionInRead(format) + } + days + case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseJulianToGregorianDays + case LegacyBehaviorPolicy.CORRECTED => identity[Int] + } + + def creteDateRebaseFuncInWrite( + rebaseMode: LegacyBehaviorPolicy.Value, + format: String): Int => Int = rebaseMode match { + case LegacyBehaviorPolicy.EXCEPTION => days: Int => + if (days < RebaseDateTime.lastSwitchGregorianDay) { + throw DataSourceUtils.newRebaseExceptionInWrite(format) + } + days + case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianDays + case LegacyBehaviorPolicy.CORRECTED => identity[Int] + } + + def creteTimestampRebaseFuncInRead( + rebaseMode: LegacyBehaviorPolicy.Value, + format: String): Long => Long = rebaseMode match { + case LegacyBehaviorPolicy.EXCEPTION => micros: Long => + if (micros < RebaseDateTime.lastSwitchJulianTs) { + throw DataSourceUtils.newRebaseExceptionInRead(format) + } + micros + case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseJulianToGregorianMicros + case LegacyBehaviorPolicy.CORRECTED => identity[Long] + } + + def creteTimestampRebaseFuncInWrite( + rebaseMode: LegacyBehaviorPolicy.Value, + format: String): Long => Long = rebaseMode match { + case LegacyBehaviorPolicy.EXCEPTION => micros: Long => + if (micros < RebaseDateTime.lastSwitchGregorianTs) { + throw DataSourceUtils.newRebaseExceptionInWrite(format) + } + micros + case LegacyBehaviorPolicy.LEGACY => RebaseDateTime.rebaseGregorianToJulianMicros + case LegacyBehaviorPolicy.CORRECTED => identity[Long] } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 542c996a5342d..fc59336d6107c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -21,7 +21,7 @@ import java.io.{FileNotFoundException, IOException} import 
org.apache.parquet.io.ParquetDecodingException -import org.apache.spark.{Partition => RDDPartition, TaskContext} +import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, TaskContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.{InputFileBlockHolder, RDD} import org.apache.spark.sql.SparkSession @@ -178,7 +178,9 @@ class FileScanRDD( s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}" throw new QueryExecutionException(message, e) case e: ParquetDecodingException => - if (e.getMessage.contains("Can not read value at")) { + if (e.getCause.isInstanceOf[SparkUpgradeException]) { + throw e.getCause + } else if (e.getMessage.contains("Can not read value at")) { val message = "Encounter error while reading parquet files. " + "One possible cause: Parquet column cannot be converted in the " + "corresponding files. Details: " diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index c6d9ddf370e22..71874104fcf4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -300,10 +300,9 @@ class ParquetFileFormat None } - val rebaseDateTime = DataSourceUtils.needRebaseDateTime( - footerFileMetaData.getKeyValueMetaData.get).getOrElse { - SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ) - } + val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( + footerFileMetaData.getKeyValueMetaData.get, + SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)) val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = @@ -318,7 +317,7 @@ class ParquetFileFormat if (enableVectorizedReader) { val vectorizedReader = new VectorizedParquetRecordReader( convertTz.orNull, - rebaseDateTime, + datetimeRebaseMode.toString, enableOffHeapColumnVector && taskContext.isDefined, capacity) val iter = new RecordReaderIterator(vectorizedReader) @@ -337,7 +336,7 @@ class ParquetFileFormat logDebug(s"Falling back to parquet-mr") // ParquetRecordReader returns InternalRow val readSupport = new ParquetReadSupport( - convertTz, enableVectorizedReader = false, rebaseDateTime) + convertTz, enableVectorizedReader = false, datetimeRebaseMode) val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) new ParquetRecordReader[InternalRow](readSupport, parquetFilter) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index 28165e0bbecde..a30d1c26b3b2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -32,6 +32,7 @@ import org.apache.parquet.schema.Type.Repetition import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types._ /** @@ -53,7 +54,7 @@ import org.apache.spark.sql.types._ class ParquetReadSupport( val convertTz: Option[ZoneId], 
enableVectorizedReader: Boolean, - rebaseDateTime: Boolean) + datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends ReadSupport[InternalRow] with Logging { private var catalystRequestedSchema: StructType = _ @@ -61,7 +62,7 @@ class ParquetReadSupport( // We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only // used in the vectorized reader, where we get the convertTz/rebaseDateTime value directly, // and the values here are ignored. - this(None, enableVectorizedReader = true, rebaseDateTime = false) + this(None, enableVectorizedReader = true, datetimeRebaseMode = LegacyBehaviorPolicy.CORRECTED) } /** @@ -130,7 +131,7 @@ class ParquetReadSupport( ParquetReadSupport.expandUDT(catalystRequestedSchema), new ParquetToSparkSchemaConverter(conf), convertTz, - rebaseDateTime) + datetimeRebaseMode) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala index ec037130aa7e9..bb528d548b6ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala @@ -23,6 +23,7 @@ import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer} import org.apache.parquet.schema.MessageType import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types.StructType /** @@ -32,19 +33,19 @@ import org.apache.spark.sql.types.StructType * @param catalystSchema Catalyst schema of the rows to be constructed * @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters * @param convertTz the optional time zone to convert to int96 data - * @param rebaseDateTime true if need to rebase date/timestamp from Julian to Proleptic Gregorian - * calendar + * @param datetimeRebaseMode the mode of rebasing date/timestamp from Julian to Proleptic Gregorian + * calendar */ private[parquet] class ParquetRecordMaterializer( parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetToSparkSchemaConverter, convertTz: Option[ZoneId], - rebaseDateTime: Boolean) + datetimeRebaseMode: LegacyBehaviorPolicy.Value) extends RecordMaterializer[InternalRow] { private val rootConverter = new ParquetRowConverter( - schemaConverter, parquetSchema, catalystSchema, convertTz, rebaseDateTime, NoopUpdater) + schemaConverter, parquetSchema, catalystSchema, convertTz, datetimeRebaseMode, NoopUpdater) override def getCurrentRecord: InternalRow = rootConverter.currentRecord diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 8376b7b137ae4..201ee16faeb08 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -35,8 +35,9 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp 
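Note on the pattern used in the ParquetRowConverter hunks that follow: the per-value if (rebaseDateTime) branches are replaced by dateRebaseFunc/timestampRebaseFunc closures built once per converter from the resolved LegacyBehaviorPolicy. The DataSourceUtils factories they call are not part of this diff; the minimal Scala sketch below shows what such a factory can look like, assuming simplified EXCEPTION semantics (fail only when rebasing would change the value) and an illustrative error message.

    // Sketch only, not the real DataSourceUtils helper. Placed under org.apache.spark so the
    // private[spark] SparkUpgradeException is reachable.
    package org.apache.spark.sql.execution.datasources

    import org.apache.spark.SparkUpgradeException
    import org.apache.spark.sql.catalyst.util.RebaseDateTime.rebaseJulianToGregorianDays
    import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

    object DateRebaseFuncSketch {
      // Build the per-column date rebase function once from the resolved mode,
      // so the per-value hot path in the converters is a single function call.
      def dateRebaseFuncInRead(mode: LegacyBehaviorPolicy.Value, format: String): Int => Int =
        mode match {
          // LEGACY: the file stores hybrid Julian/Gregorian days, rebase them on read.
          case LegacyBehaviorPolicy.LEGACY => rebaseJulianToGregorianDays
          // CORRECTED: the file already stores Proleptic Gregorian days, pass them through.
          case LegacyBehaviorPolicy.CORRECTED => identity
          // EXCEPTION: refuse to guess whenever rebasing would change the value.
          case LegacyBehaviorPolicy.EXCEPTION => (days: Int) =>
            if (rebaseJulianToGregorianDays(days) != days) {
              throw new SparkUpgradeException("3.0",
                s"reading ancient dates from $format files is ambiguous; set the " +
                  "datetime rebase mode to LEGACY or CORRECTED to proceed", null)
            } else {
              days
            }
        }
    }
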
-import org.apache.spark.sql.catalyst.util.RebaseDateTime._ +import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -121,8 +122,8 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined * types should have been expanded. * @param convertTz the optional time zone to convert to int96 data - * @param rebaseDateTime true if need to rebase date/timestamp from Julian to Proleptic Gregorian - * calendar + * @param datetimeRebaseMode the mode of rebasing date/timestamp from Julian to Proleptic Gregorian + * calendar * @param updater An updater which propagates converted field values to the parent container */ private[parquet] class ParquetRowConverter( @@ -130,7 +131,7 @@ private[parquet] class ParquetRowConverter( parquetType: GroupType, catalystType: StructType, convertTz: Option[ZoneId], - rebaseDateTime: Boolean, + datetimeRebaseMode: LegacyBehaviorPolicy.Value, updater: ParentContainerUpdater) extends ParquetGroupConverter(updater) with Logging { @@ -181,6 +182,12 @@ private[parquet] class ParquetRowConverter( */ def currentRecord: InternalRow = currentRow + private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInRead( + datetimeRebaseMode, "Parquet") + + private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInRead( + datetimeRebaseMode, "Parquet") + // Converters for each field. private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = { // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false @@ -275,35 +282,17 @@ private[parquet] class ParquetRowConverter( new ParquetStringConverter(updater) case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS => - if (rebaseDateTime) { - new ParquetPrimitiveConverter(updater) { - override def addLong(value: Long): Unit = { - val rebased = rebaseJulianToGregorianMicros(value) - updater.setLong(rebased) - } - } - } else { - new ParquetPrimitiveConverter(updater) { - override def addLong(value: Long): Unit = { - updater.setLong(value) - } + new ParquetPrimitiveConverter(updater) { + override def addLong(value: Long): Unit = { + updater.setLong(timestampRebaseFunc(value)) } } case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS => - if (rebaseDateTime) { - new ParquetPrimitiveConverter(updater) { - override def addLong(value: Long): Unit = { - val micros = DateTimeUtils.millisToMicros(value) - val rebased = rebaseJulianToGregorianMicros(micros) - updater.setLong(rebased) - } - } - } else { - new ParquetPrimitiveConverter(updater) { - override def addLong(value: Long): Unit = { - updater.setLong(DateTimeUtils.millisToMicros(value)) - } + new ParquetPrimitiveConverter(updater) { + override def addLong(value: Long): Unit = { + val micros = DateTimeUtils.millisToMicros(value) + updater.setLong(timestampRebaseFunc(micros)) } } @@ -328,17 +317,9 @@ private[parquet] class ParquetRowConverter( } case DateType => - if (rebaseDateTime) { - new ParquetPrimitiveConverter(updater) { - override def addInt(value: Int): Unit = { - updater.set(rebaseJulianToGregorianDays(value)) - } - } - } else { - new ParquetPrimitiveConverter(updater) { - override def addInt(value: Int): Unit = { - updater.set(value) - } + 
new ParquetPrimitiveConverter(updater) { + override def addInt(value: Int): Unit = { + updater.set(dateRebaseFunc(value)) } } @@ -386,7 +367,12 @@ private[parquet] class ParquetRowConverter( } } new ParquetRowConverter( - schemaConverter, parquetType.asGroupType(), t, convertTz, rebaseDateTime, wrappedUpdater) + schemaConverter, + parquetType.asGroupType(), + t, + convertTz, + datetimeRebaseMode, + wrappedUpdater) case t => throw new RuntimeException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index b135611dd6416..6c333671d59cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -35,8 +35,9 @@ import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.catalyst.util.RebaseDateTime._ +import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types._ /** @@ -78,9 +79,14 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { private val decimalBuffer = new Array[Byte](Decimal.minBytesForPrecision(DecimalType.MAX_PRECISION)) - // Whether to rebase datetimes from Gregorian to Julian calendar in write - private val rebaseDateTime: Boolean = - SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE) + private val datetimeRebaseMode = LegacyBehaviorPolicy.withName( + SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE)) + + private val dateRebaseFunc = DataSourceUtils.creteDateRebaseFuncInWrite( + datetimeRebaseMode, "Parquet") + + private val timestampRebaseFunc = DataSourceUtils.creteTimestampRebaseFuncInWrite( + datetimeRebaseMode, "Parquet") override def init(configuration: Configuration): WriteContext = { val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA) @@ -103,7 +109,13 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { val metadata = Map( SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT, ParquetReadSupport.SPARK_METADATA_KEY -> schemaString - ) ++ (if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None) + ) ++ { + if (datetimeRebaseMode == LegacyBehaviorPolicy.LEGACY) { + Some(SPARK_LEGACY_DATETIME -> "") + } else { + None + } + } logInfo( s"""Initialized Parquet WriteSupport with Catalyst schema: @@ -152,12 +164,11 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { (row: SpecializedGetters, ordinal: Int) => recordConsumer.addInteger(row.getShort(ordinal)) - case DateType if rebaseDateTime => + case DateType => (row: SpecializedGetters, ordinal: Int) => - val rebasedDays = rebaseGregorianToJulianDays(row.getInt(ordinal)) - recordConsumer.addInteger(rebasedDays) + recordConsumer.addInteger(dateRebaseFunc(row.getInt(ordinal))) - case IntegerType | DateType => + case IntegerType => (row: SpecializedGetters, ordinal: Int) => recordConsumer.addInteger(row.getInt(ordinal)) @@ -187,24 +198,15 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with 
Logging { buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay) recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer)) - case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS if rebaseDateTime => - (row: SpecializedGetters, ordinal: Int) => - val rebasedMicros = rebaseGregorianToJulianMicros(row.getLong(ordinal)) - recordConsumer.addLong(rebasedMicros) - case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS => (row: SpecializedGetters, ordinal: Int) => - recordConsumer.addLong(row.getLong(ordinal)) - - case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS if rebaseDateTime => - (row: SpecializedGetters, ordinal: Int) => - val rebasedMicros = rebaseGregorianToJulianMicros(row.getLong(ordinal)) - val millis = DateTimeUtils.microsToMillis(rebasedMicros) - recordConsumer.addLong(millis) + val micros = row.getLong(ordinal) + recordConsumer.addLong(timestampRebaseFunc(micros)) case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS => (row: SpecializedGetters, ordinal: Int) => - val millis = DateTimeUtils.microsToMillis(row.getLong(ordinal)) + val micros = row.getLong(ordinal) + val millis = DateTimeUtils.microsToMillis(timestampRebaseFunc(micros)) recordConsumer.addLong(millis) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 1925fa1796d48..3b482b0c8ab62 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedF import org.apache.spark.sql.execution.datasources.parquet._ import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.{AtomicType, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch @@ -116,8 +117,9 @@ case class ParquetPartitionReaderFactory( private def buildReaderBase[T]( file: PartitionedFile, buildReaderFunc: ( - ParquetInputSplit, InternalRow, TaskAttemptContextImpl, Option[FilterPredicate], - Option[ZoneId], Boolean) => RecordReader[Void, T]): RecordReader[Void, T] = { + ParquetInputSplit, InternalRow, TaskAttemptContextImpl, + Option[FilterPredicate], Option[ZoneId], + LegacyBehaviorPolicy.Value) => RecordReader[Void, T]): RecordReader[Void, T] = { val conf = broadcastedConf.value.value val filePath = new Path(new URI(file.filePath)) @@ -169,12 +171,11 @@ case class ParquetPartitionReaderFactory( if (pushed.isDefined) { ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) } - val rebaseDatetime = DataSourceUtils.needRebaseDateTime( - footerFileMetaData.getKeyValueMetaData.get).getOrElse { - SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ) - } + val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( + footerFileMetaData.getKeyValueMetaData.get, + SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)) val reader = buildReaderFunc( - split, file.partitionValues, hadoopAttemptContext, pushed, convertTz, rebaseDatetime) + split, file.partitionValues, hadoopAttemptContext, pushed, convertTz, 
datetimeRebaseMode) reader.initialize(split, hadoopAttemptContext) reader } @@ -189,12 +190,12 @@ case class ParquetPartitionReaderFactory( hadoopAttemptContext: TaskAttemptContextImpl, pushed: Option[FilterPredicate], convertTz: Option[ZoneId], - needDateTimeRebase: Boolean): RecordReader[Void, InternalRow] = { + datetimeRebaseMode: LegacyBehaviorPolicy.Value): RecordReader[Void, InternalRow] = { logDebug(s"Falling back to parquet-mr") val taskContext = Option(TaskContext.get()) // ParquetRecordReader returns InternalRow val readSupport = new ParquetReadSupport( - convertTz, enableVectorizedReader = false, needDateTimeRebase) + convertTz, enableVectorizedReader = false, datetimeRebaseMode) val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) new ParquetRecordReader[InternalRow](readSupport, parquetFilter) @@ -220,11 +221,11 @@ case class ParquetPartitionReaderFactory( hadoopAttemptContext: TaskAttemptContextImpl, pushed: Option[FilterPredicate], convertTz: Option[ZoneId], - rebaseDatetime: Boolean): VectorizedParquetRecordReader = { + datetimeRebaseMode: LegacyBehaviorPolicy.Value): VectorizedParquetRecordReader = { val taskContext = Option(TaskContext.get()) val vectorizedReader = new VectorizedParquetRecordReader( convertTz.orNull, - rebaseDatetime, + datetimeRebaseMode.toString, enableOffHeapColumnVector && taskContext.isDefined, capacity) val iter = new RecordReaderIterator(vectorizedReader) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala index aa47d36fe8c87..d6167f98b5a51 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.util.DateTimeConstants.SECONDS_PER_DAY import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, LA} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType object DateTime extends Enumeration { @@ -161,9 +162,10 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark { Seq(true, false).foreach { modernDates => Seq(false, true).foreach { rebase => benchmark.addCase(caseName(modernDates, dateTime, Some(rebase)), 1) { _ => + val mode = if (rebase) LEGACY else CORRECTED withSQLConf( SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> getOutputType(dateTime), - SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> rebase.toString) { + SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> mode.toString) { genDF(rowsNum, dateTime, modernDates) .write .mode("overwrite") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index cf2c7c8b2f516..87b4db3fe087a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -41,7 +41,7 @@ import org.apache.parquet.hadoop.util.HadoopInputFile import org.apache.parquet.io.api.RecordConsumer import 
org.apache.parquet.schema.{MessageType, MessageTypeParser} -import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} +import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, SparkUpgradeException} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} @@ -49,6 +49,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -892,41 +893,67 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession val path3_0_rebase = paths(1).getCanonicalPath if (dt == "date") { val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date")) - df.write.parquet(path3_0) - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") { + + // By default we should fail to write ancient datetime values. + var e = intercept[SparkException](df.write.parquet(path3_0)) + assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException]) + // By default we should fail to read ancient datetime values. + e = intercept[SparkException](spark.read.parquet(path2_4).collect()) + assert(e.getCause.isInstanceOf[SparkUpgradeException]) + + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { + df.write.mode("overwrite").parquet(path3_0) + } + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { df.write.parquet(path3_0_rebase) } - checkAnswer( - spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase), - 1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr)))) + + // For Parquet files written by Spark 3.0, we know the writer info and don't need the + // config to guide the rebase behavior. + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) { + checkAnswer( + spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase), + 1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr)))) + } } else { val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts")) withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> dt) { - df.write.parquet(path3_0) - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") { + // By default we should fail to write ancient datetime values. + var e = intercept[SparkException](df.write.parquet(path3_0)) + assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException]) + // By default we should fail to read ancient datetime values. + e = intercept[SparkException](spark.read.parquet(path2_4).collect()) + assert(e.getCause.isInstanceOf[SparkUpgradeException]) + + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { + df.write.mode("overwrite").parquet(path3_0) + } + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { df.write.parquet(path3_0_rebase) } } - checkAnswer( - spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase), - 1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr)))) + // For Parquet files written by Spark 3.0, we know the writer info and don't need the + // config to guide the rebase behavior. 
+ withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) { + checkAnswer( + spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase), + 1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr)))) + } } } } Seq(false, true).foreach { vectorized => withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) { - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> "true") { - checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01") - checkReadMixedFiles( - "before_1582_timestamp_micros_v2_4.snappy.parquet", - "TIMESTAMP_MICROS", - "1001-01-01 01:02:03.123456") - checkReadMixedFiles( - "before_1582_timestamp_millis_v2_4.snappy.parquet", - "TIMESTAMP_MILLIS", - "1001-01-01 01:02:03.123") - } + checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01") + checkReadMixedFiles( + "before_1582_timestamp_micros_v2_4.snappy.parquet", + "TIMESTAMP_MICROS", + "1001-01-01 01:02:03.123456") + checkReadMixedFiles( + "before_1582_timestamp_millis_v2_4.snappy.parquet", + "TIMESTAMP_MILLIS", + "1001-01-01 01:02:03.123") // INT96 is a legacy timestamp format and we always rebase the seconds for it. checkAnswer(readResourceParquetFile( @@ -948,7 +975,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) { withTempPath { dir => val path = dir.getAbsolutePath - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") { + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { Seq.tabulate(N)(_ => tsStr).toDF("tsS") .select($"tsS".cast("timestamp").as("ts")) .repartition(1) @@ -960,10 +987,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession Seq(false, true).foreach { vectorized => withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) { // The file metadata indicates if it needs rebase or not, so we can always get the - // correct result regardless of the "rebaseInRead" config. - Seq(true, false).foreach { rebase => + // correct result regardless of the "rebase mode" config. + Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode => withSQLConf( - SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) { + SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> mode.toString) { checkAnswer( spark.read.parquet(path), Seq.tabulate(N)(_ => Row(Timestamp.valueOf(tsStr)))) @@ -991,7 +1018,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession Seq(false, true).foreach { dictionaryEncoding => withTempPath { dir => val path = dir.getAbsolutePath - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") { + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { Seq.tabulate(N)(_ => "1001-01-01").toDF("dateS") .select($"dateS".cast("date").as("date")) .repartition(1) @@ -1002,10 +1029,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession Seq(false, true).foreach { vectorized => withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) { - // The file metadata indicates if it needs rebase or not, so we can always get - // the correct result regardless of the "rebaseInRead" config. 
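The surrounding ParquetIOSuite changes make EXCEPTION the effective default for ancient datetime values: both the write and the read fail with a SparkUpgradeException in the cause chain until a rebase mode is chosen. A hedged, spark-shell style sketch of the opt-in, mirroring the test; the active SparkSession named spark and the /tmp output paths are assumptions.

    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
    import spark.implicits._

    val df = Seq("1001-01-01").toDF("str").select($"str".cast("date").as("date"))

    // Under the default EXCEPTION mode, df.write.parquet(...) fails with a SparkUpgradeException
    // in the cause chain, as the intercepts in the test assert. Choose an interpretation instead:
    spark.conf.set(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key, CORRECTED.toString)
    df.write.mode("overwrite").parquet("/tmp/ancient_dates_corrected") // stored as-is, Proleptic Gregorian

    spark.conf.set(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key, LEGACY.toString)
    df.write.mode("overwrite").parquet("/tmp/ancient_dates_legacy") // rebased to the hybrid calendar

CORRECTED keeps the stored values in the Proleptic Gregorian calendar for Spark 3.0+ readers, while LEGACY rebases them so that Spark 2.4 and other hybrid-calendar consumers read the same dates.
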
- Seq(true, false).foreach { rebase => - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) { + // The file metadata indicates if it needs rebase or not, so we can always get the + // correct result regardless of the "rebase mode" config. + Seq(LEGACY, CORRECTED, EXCEPTION).foreach { mode => + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> mode.toString) { checkAnswer( spark.read.parquet(path), Seq.tabulate(N)(_ => Row(Date.valueOf("1001-01-01")))) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 42b6862907d8d..cbea74103343e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ @@ -151,7 +152,10 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes Seq(false) } java8ApiConfValues.foreach { java8Api => - withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) { + withSQLConf( + SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString, + SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString, + SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { val dataGenerator = RandomDataGenerator.forType( dataType = dataType, nullable = true,
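Taken together, ParquetWriteSupport tags LEGACY-mode output with SPARK_LEGACY_DATETIME, and each reader above resolves its mode from the file's key/value metadata plus the session default. DataSourceUtils.datetimeRebaseMode itself is not shown in this diff; the sketch below captures the resolution contract the tests above exercise, with the writer-version handling deliberately simplified (naive string comparison, no special cases).

    // Sketch only: turning file metadata plus the session default into a rebase mode.
    // Placed under org.apache.spark.sql so the package-private metadata keys are importable.
    package org.apache.spark.sql.execution.datasources

    import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
    import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

    object RebaseModeResolutionSketch {
      def datetimeRebaseMode(
          lookupFileMeta: String => String, // e.g. the Parquet footer key/value metadata
          modeByConfig: String): LegacyBehaviorPolicy.Value = {
        if (lookupFileMeta(SPARK_LEGACY_DATETIME) != null) {
          // The writer ran in LEGACY mode (see ParquetWriteSupport above): always rebase on read.
          LegacyBehaviorPolicy.LEGACY
        } else {
          val version = lookupFileMeta(SPARK_VERSION_METADATA_KEY)
          if (version != null && version >= "3.0.0") {
            // A 3.0+ writer without the legacy tag stored Proleptic Gregorian values as-is.
            LegacyBehaviorPolicy.CORRECTED
          } else {
            // Spark 2.4-and-earlier or non-Spark writers: fall back to the session config,
            // which defaults to EXCEPTION and surfaces as SparkUpgradeException in the tests.
            LegacyBehaviorPolicy.withName(modeByConfig)
          }
        }
      }
    }
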