Skip to content

Commit 799ce56

Browse files
committed
Add nullabilization to Generate of SparkPlan.
1 parent a0fc9bc commit 799ce56

File tree

2 files changed

+16
-3
lines changed

2 files changed

+16
-3
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ case class Generate(
5050
val output = alias
5151
.map(a => generator.output.map(_.withQualifiers(a :: Nil)))
5252
.getOrElse(generator.output)
53-
if (outer) {
53+
if (join && outer) {
5454
output.map {
5555
case attr if !attr.nullable =>
5656
AttributeReference(

sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.sql.execution
1919

2020
import org.apache.spark.annotation.DeveloperApi
21-
import org.apache.spark.sql.catalyst.expressions.{Generator, JoinedRow, Literal, Projection}
21+
import org.apache.spark.sql.catalyst.expressions._
2222

2323
/**
2424
* :: DeveloperApi ::
@@ -39,8 +39,21 @@ case class Generate(
3939
child: SparkPlan)
4040
extends UnaryNode {
4141

42+
protected def generatorOutput: Seq[Attribute] = {
43+
if (join && outer) {
44+
generator.output.map {
45+
case attr if !attr.nullable =>
46+
AttributeReference(
47+
attr.name, attr.dataType, nullable = true)(attr.exprId, attr.qualifiers)
48+
case attr => attr
49+
}
50+
} else {
51+
generator.output
52+
}
53+
}
54+
4255
override def output =
43-
if (join) child.output ++ generator.output else generator.output
56+
if (join) child.output ++ generatorOutput else generatorOutput
4457

4558
override def execute() = {
4659
if (join) {

0 commit comments

Comments
 (0)