From 01ac19f59906f67db6f33cd38c9a8350f0149271 Mon Sep 17 00:00:00 2001
From: Mikhail Bautin
Date: Fri, 18 Sep 2015 12:49:26 -0700
Subject: [PATCH] Fix nullability computation in union output

---
 .../sql/catalyst/plans/logical/basicOperators.scala      | 6 ++++--
 .../org/apache/spark/sql/execution/basicOperators.scala  | 9 +++++++--
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 33a9e55a47dee..0cf038f1e376e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -88,8 +88,10 @@ case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {
 }
 
 case class Union(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
-  // TODO: These aren't really the same attributes as nullability etc might change.
-  override def output: Seq[Attribute] = left.output
+  override def output: Seq[Attribute] =
+    left.output.zip(right.output).map { case (leftAttr, rightAttr) =>
+      leftAttr.withNullability(leftAttr.nullable || rightAttr.nullable)
+    }
 
   override lazy val resolved: Boolean =
     childrenResolved &&
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 72b8e48595f01..a1c60dd4a7f5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -97,8 +97,13 @@ case class Sample(
  */
 @DeveloperApi
 case class Union(children: Seq[SparkPlan]) extends SparkPlan {
-  // TODO: attributes output by union should be distinct for nullability purposes
-  override def output: Seq[Attribute] = children.head.output
+  override def output: Seq[Attribute] = {
+    children.tail.foldLeft(children.head.output) { case (currentOutput, child) =>
+      currentOutput.zip(child.output).map { case (a1, a2) =>
+        a1.withNullability(a1.nullable || a2.nullable)
+      }
+    }
+  }
   protected override def doExecute(): RDD[Row] =
     sparkContext.union(children.map(_.execute()))
 }
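
Note (not part of the patch): the idea in both hunks is the same — a union's
output column is nullable as soon as *any* child can produce nulls for it, so
nullability is OR-folded across children. The standalone Scala sketch below
exercises that fold in isolation; `Col` is a hypothetical stand-in for
Catalyst's `Attribute`, not an actual Spark type.

    // Minimal sketch of the nullability fold used by the executor-side Union.
    object NullabilityUnionSketch {
      // Hypothetical stand-in for Catalyst's Attribute: a name and a nullable flag.
      case class Col(name: String, nullable: Boolean) {
        def withNullability(n: Boolean): Col = copy(nullable = n)
      }

      // Same shape as the patched Union.output: start from the first child's
      // schema and widen nullability column-by-column against every other child.
      def unionOutput(childOutputs: Seq[Seq[Col]]): Seq[Col] =
        childOutputs.tail.foldLeft(childOutputs.head) { (current, next) =>
          current.zip(next).map { case (a, b) =>
            a.withNullability(a.nullable || b.nullable)
          }
        }

      def main(args: Array[String]): Unit = {
        val left  = Seq(Col("id", nullable = false), Col("name", nullable = false))
        val right = Seq(Col("id", nullable = false), Col("name", nullable = true))
        // "name" comes out nullable because the right child can produce nulls.
        println(unionOutput(Seq(left, right)))
        // -> List(Col(id,false), Col(name,true))
      }
    }

The two-child logical Union is just the n = 2 case of this fold, which is why
the catalyst-side hunk can use a single zip/map instead of a foldLeft.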