diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 139c45adb442..877c3f89e88c 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -141,6 +141,12 @@ private[sql] class AvroDeserializer(
       case (INT, IntegerType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, value.asInstanceOf[Int])
 
+      case (INT, LongType) => (updater, ordinal, value) =>
+        updater.setLong(ordinal, value.asInstanceOf[Int])
+
+      case (INT, DoubleType) => (updater, ordinal, value) =>
+        updater.setDouble(ordinal, value.asInstanceOf[Int])
+
       case (INT, dt: DatetimeType)
         if preventReadingIncorrectType && realDataType.isInstanceOf[YearMonthIntervalType] =>
         throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
@@ -194,6 +200,9 @@ private[sql] class AvroDeserializer(
       case (FLOAT, FloatType) => (updater, ordinal, value) =>
         updater.setFloat(ordinal, value.asInstanceOf[Float])
 
+      case (FLOAT, DoubleType) => (updater, ordinal, value) =>
+        updater.setDouble(ordinal, value.asInstanceOf[Float])
+
       case (DOUBLE, DoubleType) => (updater, ordinal, value) =>
         updater.setDouble(ordinal, value.asInstanceOf[Double])
 
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index ce38ada7c9e4..1f00392420be 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -921,6 +921,39 @@ abstract class AvroSuite
     }
   }
 
+  test("SPARK-49082: Widening type promotions in AvroDeserializer") {
+    withTempPath { tempPath =>
+      // Int -> Long
+      val intPath = s"$tempPath/int_data"
+      val intDf = Seq(1, Int.MinValue, Int.MaxValue).toDF("col")
+      intDf.write.format("avro").save(intPath)
+      checkAnswer(
+        spark.read.schema("col Long").format("avro").load(intPath),
+        Seq(Row(1L), Row(-2147483648L), Row(2147483647L))
+      )
+
+      // Int -> Double
+      checkAnswer(
+        spark.read.schema("col Double").format("avro").load(intPath),
+        Seq(Row(1D), Row(-2147483648D), Row(2147483647D))
+      )
+
+      // Float -> Double
+      val floatPath = s"$tempPath/float_data"
+      val floatDf = Seq(1F,
+        Float.MinValue, Float.MinPositiveValue, Float.MaxValue,
+        Float.NaN, Float.NegativeInfinity, Float.PositiveInfinity
+      ).toDF("col")
+      floatDf.write.format("avro").save(floatPath)
+      checkAnswer(
+        spark.read.schema("col Double").format("avro").load(floatPath),
+        Seq(Row(1D),
+          Row(-3.4028234663852886E38D), Row(1.401298464324817E-45D), Row(3.4028234663852886E38D),
+          Row(Double.NaN), Row(Double.NegativeInfinity), Row(Double.PositiveInfinity))
+      )
+    }
+  }
+
   test("SPARK-43380: Fix Avro data type conversion" +
     " of DayTimeIntervalType to avoid producing incorrect results") {
     withTempPath { path =>