Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io._
import java.net.URI
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.sql.{Date, Timestamp}
import java.time.{LocalDate, LocalDateTime}
import java.util.UUID

import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -963,6 +964,36 @@ abstract class AvroSuite
}
}

test("SPARK-49082: Widening date to timestampNTZ in AvroDeserializer") {
withTempPath { tempPath =>
// Since timestampNTZ only supports timestamps from
// -290308-12-21 BCE 19:59:06 to +294247-01-10 CE 04:00:54,
// dates outside of this range cannot be widened to timestampNTZ
// and will throw an ArithmeticException.
val datePath = s"$tempPath/date_data"
val dateDf =
Seq(LocalDate.of(2024, 1, 1),
LocalDate.of(2024, 1, 2),
LocalDate.of(1312, 2, 27),
LocalDate.of(0, 1, 1),
LocalDate.of(-1, 12, 31),
LocalDate.of(-290308, 12, 22), // minimum timestampNTZ date
LocalDate.of(294247, 1, 10)) // maximum timestampNTZ date
.toDF("col")
dateDf.write.format("avro").save(datePath)
checkAnswer(
spark.read.schema("col TIMESTAMP_NTZ").format("avro").load(datePath),
Seq(Row(LocalDateTime.of(2024, 1, 1, 0, 0)),
Row(LocalDateTime.of(2024, 1, 2, 0, 0)),
Row(LocalDateTime.of(1312, 2, 27, 0, 0)),
Row(LocalDateTime.of(0, 1, 1, 0, 0)),
Row(LocalDateTime.of(-1, 12, 31, 0, 0)),
Row(LocalDateTime.of(-290308, 12, 22, 0, 0)),
Row(LocalDateTime.of(294247, 1, 10, 0, 0)))
)
}
}

test("SPARK-43380: Fix Avro data type conversion" +
" of DayTimeIntervalType to avoid producing incorrect results") {
withTempPath { path =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.avro

import java.math.BigDecimal
import java.nio.ByteBuffer
import java.time.ZoneOffset

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -159,6 +160,12 @@ private[sql] class AvroDeserializer(
case (INT, DateType) => (updater, ordinal, value) =>
updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))

case (INT, TimestampNTZType) if avroType.getLogicalType.isInstanceOf[LogicalTypes.Date] =>
(updater, ordinal, value) =>
val days = dateRebaseFunc(value.asInstanceOf[Int])
val micros = DateTimeUtils.daysToMicros(days, ZoneOffset.UTC)
updater.setLong(ordinal, micros)

case (LONG, dt: DatetimeType)
if preventReadingIncorrectType && realDataType.isInstanceOf[DayTimeIntervalType] =>
throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
Expand Down