diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 4ddf6503d99e..26595bb11b3d 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -21,6 +21,7 @@ import java.io._ import java.net.URI import java.nio.file.{Files, Paths, StandardCopyOption} import java.sql.{Date, Timestamp} +import java.time.{LocalDate, LocalDateTime} import java.util.UUID import scala.jdk.CollectionConverters._ @@ -963,6 +964,36 @@ abstract class AvroSuite } } + test("SPARK-49082: Widening date to timestampNTZ in AvroDeserializer") { + withTempPath { tempPath => + // Since timestampNTZ only supports timestamps from + // -290308-12-21 BCE 19:59:06 to +294247-01-10 CE 04:00:54, + // dates outside of this range cannot be widened to timestampNTZ + // and will throw an ArithmeticException. + val datePath = s"$tempPath/date_data" + val dateDf = + Seq(LocalDate.of(2024, 1, 1), + LocalDate.of(2024, 1, 2), + LocalDate.of(1312, 2, 27), + LocalDate.of(0, 1, 1), + LocalDate.of(-1, 12, 31), + LocalDate.of(-290308, 12, 22), // minimum timestampNTZ date + LocalDate.of(294247, 1, 10)) // maximum timestampNTZ date + .toDF("col") + dateDf.write.format("avro").save(datePath) + checkAnswer( + spark.read.schema("col TIMESTAMP_NTZ").format("avro").load(datePath), + Seq(Row(LocalDateTime.of(2024, 1, 1, 0, 0)), + Row(LocalDateTime.of(2024, 1, 2, 0, 0)), + Row(LocalDateTime.of(1312, 2, 27, 0, 0)), + Row(LocalDateTime.of(0, 1, 1, 0, 0)), + Row(LocalDateTime.of(-1, 12, 31, 0, 0)), + Row(LocalDateTime.of(-290308, 12, 22, 0, 0)), + Row(LocalDateTime.of(294247, 1, 10, 0, 0))) + ) + } + } + test("SPARK-43380: Fix Avro data type conversion" + " of DayTimeIntervalType to avoid producing incorrect results") { withTempPath { path => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index ac20614553ca..f66b5bd988c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.avro import java.math.BigDecimal import java.nio.ByteBuffer +import java.time.ZoneOffset import scala.jdk.CollectionConverters._ @@ -159,6 +160,12 @@ private[sql] class AvroDeserializer( case (INT, DateType) => (updater, ordinal, value) => updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int])) + case (INT, TimestampNTZType) if avroType.getLogicalType.isInstanceOf[LogicalTypes.Date] => + (updater, ordinal, value) => + val days = dateRebaseFunc(value.asInstanceOf[Int]) + val micros = DateTimeUtils.daysToMicros(days, ZoneOffset.UTC) + updater.setLong(ordinal, micros) + case (LONG, dt: DatetimeType) if preventReadingIncorrectType && realDataType.isInstanceOf[DayTimeIntervalType] => throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),