diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala index 72ff9361d8f75..a32052ce121df 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import scala.collection.mutable +import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.expressions.objects.LambdaVariable @@ -72,7 +73,10 @@ class EquivalentExpressions { val skip = expr.isInstanceOf[LeafExpression] || // `LambdaVariable` is usually used as a loop variable, which can't be evaluated ahead of the // loop. So we can't evaluate sub-expressions containing `LambdaVariable` at the beginning. - expr.find(_.isInstanceOf[LambdaVariable]).isDefined + expr.find(_.isInstanceOf[LambdaVariable]).isDefined || + // `PlanExpression` wraps a query plan. Comparing the query plans of `PlanExpression`s on an + // executor can cause errors such as NPE, so skip them whenever a `TaskContext` is present. + (expr.isInstanceOf[PlanExpression[_]] && TaskContext.get != null) // There are some special expressions that we should not recurse into all of its children. // 1. 
CodegenFallback: it's children will not be used to generate code (call eval() instead) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 28a027690db04..6f8733a2fbd30 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean import org.apache.spark.{AccumulatorSuite, SparkException} import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} +import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec @@ -3149,6 +3150,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession { checkAnswer(sql("select * from t1 where d > '1999-13'"), Row(result)) checkAnswer(sql("select to_timestamp('2000-01-01 01:10:00') > '1'"), Row(true)) } + sql("DROP VIEW t1") } test("SPARK-28156: self-join should not miss cached view") { @@ -3192,6 +3194,21 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession { checkAnswer(df3, Array(Row(new java.math.BigDecimal("0.100000000000000000000000100")))) } } + + test("SPARK-29239: Subquery should not cause NPE when eliminating subexpression") { + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false", + SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false", + SQLConf.CODEGEN_FACTORY_MODE.key -> "CODEGEN_ONLY", + SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> ConvertToLocalRelation.ruleName) { + withTempView("t1", "t2") { + sql("create temporary view t1 as select * from values ('val1a', 10L) as t1(t1a, t1b)") + sql("create temporary view t2 as select * from values ('val3a', 110L) as t2(t2a, t2b)") + val df = sql("SELECT min, min from (SELECT (SELECT 
min(t2b) FROM t2) min " + + "FROM t1 WHERE t1a = 'val1c')") + assert(df.collect().size == 0) + } + } + } } case class Foo(bar: Option[String])