From 3ffab4ba49236a86fe1d981fdf0d6d3cae8558e8 Mon Sep 17 00:00:00 2001 From: Alden Lau Date: Tue, 18 Mar 2025 18:16:41 -0700 Subject: [PATCH 1/3] Initial commit --- .../org/apache/spark/sql/avro/AvroSuite.scala | 23 +++++++++++++++++++ .../spark/sql/avro/AvroDeserializer.scala | 7 ++++++ 2 files changed, 30 insertions(+) diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 4ddf6503d99e..52673c4c55a0 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -21,6 +21,7 @@ import java.io._ import java.net.URI import java.nio.file.{Files, Paths, StandardCopyOption} import java.sql.{Date, Timestamp} +import java.time.{LocalDate, LocalDateTime} import java.util.UUID import scala.jdk.CollectionConverters._ @@ -963,6 +964,28 @@ abstract class AvroSuite } } + test("SPARK-49082: Widening date to timestampNTZ in AvroDeserializer") { + withTempPath { tempPath => + val datePath = s"$tempPath/date_data" + val dateDf = + Seq(LocalDate.of(2024, 1, 1), + LocalDate.of(2024, 1, 2), + LocalDate.of(1312, 2, 27), + LocalDate.of(-5877641, 6, 23), + LocalDate.of(5881580, 7, 11)) + .toDF("col") + dateDf.write.format("avro").save(datePath) + checkAnswer( + spark.read.schema("col TIMESTAMP_NTZ").format("avro").load(datePath), + Seq(Row(LocalDateTime.of(2024, 1, 1, 0, 0)), + Row(LocalDateTime.of(2024, 1, 2, 0, 0)), + Row(LocalDateTime.of(1312, 2, 27, 0, 0)), + Row(LocalDateTime.of(-5877641, 6, 23, 0, 0)), + Row(LocalDateTime.of(5881580, 7, 11, 0, 0))) + ) + } + } + test("SPARK-43380: Fix Avro data type conversion" + " of DayTimeIntervalType to avoid producing incorrect results") { withTempPath { path => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index 
ac20614553ca..f66b5bd988c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.avro import java.math.BigDecimal import java.nio.ByteBuffer +import java.time.ZoneOffset import scala.jdk.CollectionConverters._ @@ -159,6 +160,12 @@ private[sql] class AvroDeserializer( case (INT, DateType) => (updater, ordinal, value) => updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int])) + case (INT, TimestampNTZType) if avroType.getLogicalType.isInstanceOf[LogicalTypes.Date] => + (updater, ordinal, value) => + val days = dateRebaseFunc(value.asInstanceOf[Int]) + val micros = DateTimeUtils.daysToMicros(days, ZoneOffset.UTC) + updater.setLong(ordinal, micros) + case (LONG, dt: DatetimeType) if preventReadingIncorrectType && realDataType.isInstanceOf[DayTimeIntervalType] => throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath), From 082d67ed2e0d16647fa0e1cc681585f3f82309fe Mon Sep 17 00:00:00 2001 From: Alden Lau Date: Thu, 20 Mar 2025 12:03:00 -0700 Subject: [PATCH 2/3] change test cases --- .../scala/org/apache/spark/sql/avro/AvroSuite.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 52673c4c55a0..c779e15c1c98 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -971,8 +971,10 @@ abstract class AvroSuite Seq(LocalDate.of(2024, 1, 1), LocalDate.of(2024, 1, 2), LocalDate.of(1312, 2, 27), - LocalDate.of(-5877641, 6, 23), - LocalDate.of(5881580, 7, 11)) + LocalDate.of(0, 1, 1), + LocalDate.of(-1, 12, 31), + LocalDate.of(-290308, 12, 22), // minimum timestampNTZ date + LocalDate.of(294247, 1, 10)) // 
maximum timestampNTZ date .toDF("col") dateDf.write.format("avro").save(datePath) checkAnswer( @@ -980,8 +982,10 @@ abstract class AvroSuite Seq(Row(LocalDateTime.of(2024, 1, 1, 0, 0)), Row(LocalDateTime.of(2024, 1, 2, 0, 0)), Row(LocalDateTime.of(1312, 2, 27, 0, 0)), - Row(LocalDateTime.of(-5877641, 6, 23, 0, 0)), - Row(LocalDateTime.of(5881580, 7, 11, 0, 0))) + Row(LocalDateTime.of(0, 1, 1, 0, 0)), + Row(LocalDateTime.of(-1, 12, 31, 0, 0)), + Row(LocalDateTime.of(-290308, 12, 22, 0, 0)), + Row(LocalDateTime.of(294247, 1, 10, 0, 0))) ) } } From de4580064f1263ef4e8971c58711f9c4046450a9 Mon Sep 17 00:00:00 2001 From: Alden Lau Date: Thu, 20 Mar 2025 12:12:33 -0700 Subject: [PATCH 3/3] add comment --- .../src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index c779e15c1c98..26595bb11b3d 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -966,6 +966,10 @@ abstract class AvroSuite test("SPARK-49082: Widening date to timestampNTZ in AvroDeserializer") { withTempPath { tempPath => + // timestampNTZ only supports timestamps from + // -290308-12-21T19:59:06 to +294247-01-10T04:00:54 (ISO-8601 proleptic years), + // so dates outside of this range cannot be widened to timestampNTZ; + // attempting to widen them throws an ArithmeticException. val datePath = s"$tempPath/date_data" val dateDf = Seq(LocalDate.of(2024, 1, 1),