File tree Expand file tree Collapse file tree 2 files changed +17
-15
lines changed
catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical
core/src/main/scala/org/apache/spark/sql/execution Expand file tree Collapse file tree 2 files changed +17
-15
lines changed Original file line number Diff line number Diff line change @@ -41,7 +41,20 @@ object CatalystSerde {
4141 }
4242
4343 def generateObjAttr [T : Encoder ]: Attribute = {
44- AttributeReference (" obj" , encoderFor[T ].deserializer.dataType, nullable = false )()
44+ val deserializer = encoderFor[T ].deserializer
45+ val dataType = deserializer.dataType
46+ val nullable = if (deserializer.childrenResolved) {
47+ deserializer.nullable
48+ } else {
49+ // Since deserializer is not resolved here, cannot access deserializer.nullable.
50+ // We infer nullability from DataType
51+ dataType match {
52+ case BooleanType | FloatType | DoubleType => false
53+ case _ : IntegralType => false
54+ case _ => true
55+ }
56+ }
57+ AttributeReference (" obj" , dataType, nullable = nullable)()
4558 }
4659}
4760
Original file line number Diff line number Diff line change 1717
1818package org .apache .spark .sql .execution
1919
20+ import scala .reflect .runtime .universe .TypeTag
21+
2022import org .apache .spark .rdd .RDD
2123import org .apache .spark .sql .{Encoder , Row , SparkSession }
2224import org .apache .spark .sql .catalyst .{CatalystConf , CatalystTypeConverters , InternalRow }
@@ -70,20 +72,7 @@ object RDDConversions {
7072object ExternalRDD {
7173
7274 def apply [T : Encoder ](rdd : RDD [T ], session : SparkSession ): LogicalPlan = {
73- val attr = {
74- val attr = CatalystSerde .generateObjAttr[T ]
75- // Since ExpressionEncoder[T].deserializer is not resolved here,
76- // cannot access ExpressionEncoder[T].deserializer.nullable.
77- // We infer nullability from DataType
78- attr.dataType match {
79- case BooleanType => attr
80- case _ : IntegralType => attr
81- case FloatType | DoubleType => attr
82- case DecimalType .Fixed (p, s) if p <= Decimal .MAX_LONG_DIGITS => attr
83- case _ => attr.withNullability(true )
84- }
85- }
86- val externalRdd = ExternalRDD (attr, rdd)(session)
75+ val externalRdd = ExternalRDD (CatalystSerde .generateObjAttr[T ], rdd)(session)
8776 CatalystSerde .serialize[T ](externalRdd)
8877 }
8978}
You can’t perform that action at this time.
0 commit comments