From 556357826cfa71abf510766e644ca0519defa79a Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Sun, 12 Nov 2023 14:34:32 -0800 Subject: [PATCH] [SPARK-45896][SQL] Construct `ValidateExternalType` with the correct expected type When creating a serializer for a `Map` or `Seq` with an element of type `Option`, pass an expected type of `Option` to `ValidateExternalType` rather than the `Option`'s type argument. In 3.4.1, 3.5.0, and master, the following code gets an error: ``` scala> val df = Seq(Seq(Some(Seq(0)))).toDF("a") val df = Seq(Seq(Some(Seq(0)))).toDF("a") org.apache.spark.SparkRuntimeException: [EXPRESSION_ENCODING_FAILED] Failed to encode a value of the expressions: mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -2), assertnotnull(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -2), IntegerType, IntegerType)), unwrapoption(ObjectType(interface scala.collection.immutable.Seq), validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), ArrayType(IntegerType,false), ObjectType(class scala.Option))), None), input[0, scala.collection.immutable.Seq, true], None) AS value#0 to a row. SQLSTATE: 42846 ... Caused by: java.lang.RuntimeException: scala.Some is not a valid external type for schema of array at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_0$(Unknown Source) ... ``` However, this code works in 3.3.3. 
Similarly, this code gets an error: ``` scala> val df = Seq(Seq(Some(java.sql.Timestamp.valueOf("2023-01-01 00:00:00")))).toDF("a") val df = Seq(Seq(Some(java.sql.Timestamp.valueOf("2023-01-01 00:00:00")))).toDF("a") org.apache.spark.SparkRuntimeException: [EXPRESSION_ENCODING_FAILED] Failed to encode a value of the expressions: mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, unwrapoption(ObjectType(class java.sql.Timestamp), validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), TimestampType, ObjectType(class scala.Option))), true, false, true), input[0, scala.collection.immutable.Seq, true], None) AS value#0 to a row. SQLSTATE: 42846 ... Caused by: java.lang.RuntimeException: scala.Some is not a valid external type for schema of timestamp ... ``` As with the first example, this code works in 3.3.3. `SerializerBuildHelper#validateAndSerializeElement` will construct `ValidateExternalType` with an expected type of the `Option`'s type parameter. Therefore, for element types `Option[Seq/Date/Timestamp/BigDecimal]`, `ValidateExternalType` will try to validate that the element is of the contained type (e.g., `BigDecimal`) rather than of type `Option`. Since the element type is of type `Option`, the validation fails. 
Validation currently works by accident for element types such as `Option[Map[_, _]]`, where the expected type used by `ValidateExternalType` happens to also accept an `Option` instance.

Signed-off-by: Dongjoon Hyun --- .../apache/spark/sql/catalyst/ScalaReflection.scala | 7 ++++++- .../catalyst/encoders/ExpressionEncoderSuite.scala | 12 ++++++++++++ .../scala/org/apache/spark/sql/DatasetSuite.scala | 9 +++++++++ 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index b18613bdad3a1..4e4ca4ee06323 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -564,10 +564,15 @@ object ScalaReflection extends ScalaReflection { private def validateAndSerializeElement( enc: AgnosticEncoder[_], nullable: Boolean): Expression => Expression = { input => + val expected = enc match { + case OptionEncoder(_) => lenientExternalDataTypeFor(enc) + case _ => enc.dataType + } + expressionWithNullSafety( serializerFor( enc, - ValidateExternalType(input, enc.dataType, lenientExternalDataTypeFor(enc))), + ValidateExternalType(input, expected, lenientExternalDataTypeFor(enc))), nullable, WalkedTypePath()) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala index 79417c4ca1fe8..9e19c3755b24b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala @@ -476,6 +476,18 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes encodeDecodeTest(Option.empty[Int], "empty option of int") encodeDecodeTest(Option("abc"), "option of string") encodeDecodeTest(Option.empty[String], "empty
option of string") + encodeDecodeTest(Seq(Some(Seq(0))), "SPARK-45896: seq of option of seq") + encodeDecodeTest(Map(0 -> Some(Seq(0))), "SPARK-45896: map of option of seq") + encodeDecodeTest(Seq(Some(Timestamp.valueOf("2023-01-01 00:00:00"))), + "SPARK-45896: seq of option of timestamp") + encodeDecodeTest(Map(0 -> Some(Timestamp.valueOf("2023-01-01 00:00:00"))), + "SPARK-45896: map of option of timestamp") + encodeDecodeTest(Seq(Some(Date.valueOf("2023-01-01"))), + "SPARK-45896: seq of option of date") + encodeDecodeTest(Map(0 -> Some(Date.valueOf("2023-01-01"))), + "SPARK-45896: map of option of date") + encodeDecodeTest(Seq(Some(BigDecimal(200))), "SPARK-45896: seq of option of bigdecimal") + encodeDecodeTest(Map(0 -> Some(BigDecimal(200))), "SPARK-45896: map of option of bigdecimal") encodeDecodeTest(ScroogeLikeExample(1), "SPARK-40385 class with only a companion object constructor") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 6a1aa25c6e217..7dec558f8df3f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -270,6 +270,13 @@ class DatasetSuite extends QueryTest (ClassData("one", 2), 1L), (ClassData("two", 3), 1L)) } + test("SPARK-45896: seq of option of seq") { + val ds = Seq(DataSeqOptSeq(Seq(Some(Seq(0))))).toDS() + checkDataset( + ds, + DataSeqOptSeq(Seq(Some(List(0))))) + } + test("select") { val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS() checkDataset( @@ -2561,6 +2568,8 @@ case class ClassNullableData(a: String, b: Integer) case class NestedStruct(f: ClassData) case class DeepNestedStruct(f: NestedStruct) +case class DataSeqOptSeq(a: Seq[Option[Seq[Int]]]) + /** * A class used to test serialization using encoders. This class throws exceptions when using * Java serialization -- so the only way it can be "serialized" is through our encoders.