From 7a759cca3fb302d55b5758e3e8cb85deca460112 Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Sun, 16 Apr 2017 20:11:00 -0400
Subject: [PATCH 1/3] catch NPE in EliminateOuterJoin and add test in
 DataFrameSuite to confirm NPE is no longer thrown

---
 .../apache/spark/sql/catalyst/optimizer/joins.scala  | 11 ++++++++++-
 .../scala/org/apache/spark/sql/DataFrameSuite.scala  | 10 ++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index c3ab58744953..6fa3c7094183 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import scala.annotation.tailrec
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
 import org.apache.spark.sql.catalyst.plans._
@@ -124,7 +125,15 @@ case class EliminateOuterJoin(conf: SQLConf) extends Rule[LogicalPlan] with Pred
     val emptyRow = new GenericInternalRow(attributes.length)
     val boundE = BindReferences.bindReference(e, attributes)
     if (boundE.find(_.isInstanceOf[Unevaluable]).isDefined) return false
-    val v = boundE.eval(emptyRow)
+    val v = try {
+      boundE.eval(emptyRow)
+    } catch {
+      case e: SparkException =>
+        e.getCause match {
+          case _: NullPointerException => true
+          case _ => throw e
+        }
+    }
     v == null || v == false
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 52bd4e19f895..476fd3ebe888 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1722,4 +1722,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
         "Cannot have map type columns in DataFrame which calls set operations"))
     }
   }
+
+  test("SPARK-?????: catalyst outer join optimization should not throw npe") {
+    val df1 = Seq("a", "b", "c").toDF("x")
+      .withColumn("y", udf{ (x: String) => x.substring(0, 1) + "!" }.apply($"x"))
+    val df2 = Seq("a", "b").toDF("x1")
+    df1
+      .join(df2, df1("x") === df2("x1"), "left_outer")
+      .filter($"x1".isNotNull || !$"y".isin("a!"))
+      .count
+  }
 }

From 5e2b8284f663a3d9c0aebe0fe14b74e5e0447eb7 Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Mon, 17 Apr 2017 16:01:38 -0400
Subject: [PATCH 2/3] add actual jira

---
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 476fd3ebe888..b4893b56a8a8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1723,7 +1723,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-?????: catalyst outer join optimization should not throw npe") {
+  test("SPARK-20359: catalyst outer join optimization should not throw npe") {
     val df1 = Seq("a", "b", "c").toDF("x")
       .withColumn("y", udf{ (x: String) => x.substring(0, 1) + "!" }.apply($"x"))
     val df2 = Seq("a", "b").toDF("x1")

From 975c850325d4349a45adb4683d905e3f8ec298c9 Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Tue, 18 Apr 2017 11:15:30 -0400
Subject: [PATCH 3/3] use lazy vals instead of catching NPE

---
 .../spark/sql/catalyst/optimizer/joins.scala | 15 +++------------
 1 file changed, 3 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 6fa3c7094183..2fe303977442 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import scala.annotation.tailrec
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
 import org.apache.spark.sql.catalyst.plans._
@@ -125,15 +124,7 @@ case class EliminateOuterJoin(conf: SQLConf) extends Rule[LogicalPlan] with Pred
     val emptyRow = new GenericInternalRow(attributes.length)
     val boundE = BindReferences.bindReference(e, attributes)
     if (boundE.find(_.isInstanceOf[Unevaluable]).isDefined) return false
-    val v = try {
-      boundE.eval(emptyRow)
-    } catch {
-      case e: SparkException =>
-        e.getCause match {
-          case _: NullPointerException => true
-          case _ => throw e
-        }
-    }
+    val v = boundE.eval(emptyRow)
     v == null || v == false
   }
 
@@ -143,8 +134,8 @@ case class EliminateOuterJoin(conf: SQLConf) extends Rule[LogicalPlan] with Pred
     val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
     val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
 
-    val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
-    val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
+    lazy val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
+    lazy val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
 
     join.joinType match {
       case RightOuter if leftHasNonNullPredicate => Inner