From 7e47e033fbe3bd98377b8d39abdd2fbb37f0ee12 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 6 Oct 2016 11:56:01 -0700 Subject: [PATCH 1/2] Backport [SPARK-15062][SQL] fix list type infer serializer issue --- .../apache/spark/sql/catalyst/ScalaReflection.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 8722191f9d3ea..9cf400f5d1b21 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -485,6 +485,13 @@ object ScalaReflection extends ScalaReflection { extractorFor(unwrapped, optType, newPath)) } + // Since List[_] also belongs to localTypeOf[Product], we put this case before + // "case t if t <:< localTypeOf[Product]" to make sure it will match to the + // case "localTypeOf[Seq[_]]" + case t if t <:< localTypeOf[Seq[_]] => + val TypeRef(_, _, Seq(elementType)) = t + toCatalystArray(inputObject, elementType) + case t if t <:< localTypeOf[Product] => val params = getConstructorParameters(t) val nonNullOutput = CreateNamedStruct(params.flatMap { case (fieldName, fieldType) => @@ -500,10 +507,6 @@ object ScalaReflection extends ScalaReflection { val TypeRef(_, _, Seq(elementType)) = t toCatalystArray(inputObject, elementType) - case t if t <:< localTypeOf[Seq[_]] => - val TypeRef(_, _, Seq(elementType)) = t - toCatalystArray(inputObject, elementType) - case t if t <:< localTypeOf[Map[_, _]] => val TypeRef(_, _, Seq(keyType, valueType)) = t From f6a78fc03aed92f95a44b61e8ae3634f859f243a Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 6 Oct 2016 11:59:02 -0700 Subject: [PATCH 2/2] add test as well --- .../spark/sql/catalyst/ScalaReflectionSuite.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala index c2aace1ef238e..fc58699535343 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala @@ -21,7 +21,9 @@ import java.math.BigInteger import java.sql.{Date, Timestamp} import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.expressions.{BoundReference, Literal, NewInstance} import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String case class PrimitiveData( intField: Int, @@ -229,4 +231,16 @@ class ScalaReflectionSuite extends SparkFunSuite { assert(anyTypes.forall(!_.isPrimitive)) assert(anyTypes === Seq(classOf[java.lang.Object], classOf[java.lang.Object])) } + + test("SPARK-15062: Get correct serializer for List[_]") { + val list = List(1, 2, 3) + val serializer = extractorsFor[List[Int]](BoundReference( + 0, ObjectType(list.getClass), nullable = false)) + assert(serializer.children.size == 2) + assert(serializer.children.head.isInstanceOf[Literal]) + assert(serializer.children.head.asInstanceOf[Literal].value === UTF8String.fromString("value")) + assert(serializer.children.last.isInstanceOf[NewInstance]) + assert(serializer.children.last.asInstanceOf[NewInstance] + .cls.isInstanceOf[Class[org.apache.spark.sql.catalyst.util.GenericArrayData]]) + } }