
Commit db1f3cc

HyukjinKwon authored and cloud-fan committed
[SPARK-23731][SQL] Make FileSourceScanExec canonicalizable after being (de)serialized
## What changes were proposed in this pull request?

### What's the problem?

In some cases, a scalar subquery can throw an NPE, which is raised on the executor side:

```
java.lang.NullPointerException
  at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:169)
  at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:526)
  at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:159)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$3.apply(QueryPlan.scala:225)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$3.apply(QueryPlan.scala:225)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:296)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:225)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:258)
  at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:58)
  at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
  at scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:364)
  at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40)
  at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:139)
  at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:135)
  at scala.collection.mutable.HashMap.findEntry(HashMap.scala:40)
  at scala.collection.mutable.HashMap.get(HashMap.scala:70)
  at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:56)
  at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:97)
  at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:98)
  at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:98)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:98)
  at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
  at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1102)
  at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1154)
  at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:270)
  at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:319)
  at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:308)
  at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:181)
  at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:71)
  at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:70)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
```

### How does this happen?

Here is what happens:

1. A scalar subquery is made (for instance `SELECT (SELECT id FROM foo)`).
2. Spark tries to extract common subexpressions (via `CodeGenerator.subexpressionElimination`) so that common code can be generated and reused.
3. During this, it extracts expressions that can be reused (via `EquivalentExpressions.addExprTree`):
   https://github.com/apache/spark/blob/b2deef64f604ddd9502a31105ed47cb63470ec85/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L1102
4. If the hashes (`EquivalentExpressions.Expr.hashCode`) happen to be the same in `EquivalentExpressions.addExpr`, `EquivalentExpressions.Expr.equals` is called to identify objects in the same hash bucket, which eventually calls `semanticEquals` on `ScalarSubquery`:
   https://github.com/apache/spark/blob/087879a77acb37b790c36f8da67355b90719c2dc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala#L54
   https://github.com/apache/spark/blob/087879a77acb37b790c36f8da67355b90719c2dc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala#L36
5. `ScalarSubquery`'s `semanticEquals` needs `SubqueryExec`'s `sameResult`:
   https://github.com/apache/spark/blob/77a2fc5b521788b406bb32bcc3c637c1d7406e58/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala#L58
6. `SubqueryExec`'s `sameResult` requires a canonicalized plan, which calls `FileSourceScanExec`'s `doCanonicalize`:
   https://github.com/apache/spark/blob/e008ad175256a3192fdcbd2c4793044d52f46d57/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala#L258
7. In `FileSourceScanExec`'s `doCanonicalize`, `FileSourceScanExec`'s `relation` is required, but it is `transient` and therefore `null` on the executor side:
   https://github.com/apache/spark/blob/e76b0124fbe463def00b1dffcfd8fd47e04772fe/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L527
   https://github.com/apache/spark/blob/e76b0124fbe463def00b1dffcfd8fd47e04772fe/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L160
8. An NPE is thrown.

Step 1 happens on the driver side; steps 2 through 8 happen on the executor side.

Note that in most cases this works fine, because we usually call:
https://github.com/apache/spark/blob/087879a77acb37b790c36f8da67355b90719c2dc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala#L40
which makes a canonicalized plan via:
https://github.com/apache/spark/blob/b045315e5d87b7ea3588436053aaa4d5a7bd103f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala#L192
https://github.com/apache/spark/blob/77a2fc5b521788b406bb32bcc3c637c1d7406e58/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala#L52

### How to reproduce?

I can reproduce this in a somewhat contrived way:

```diff
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
index 8d06804..d25fc9a7ba9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
@@ -37,7 +37,9 @@ class EquivalentExpressions {
       case _ => false
     }

-    override def hashCode: Int = e.semanticHash()
+    override def hashCode: Int = {
+      1
+    }
   }
```

```scala
spark.range(1).write.mode("overwrite").parquet("/tmp/foo")
spark.read.parquet("/tmp/foo").createOrReplaceTempView("foo")
spark.conf.set("spark.sql.codegen.wholeStage", false)
sql("SELECT (SELECT id FROM foo) == (SELECT id FROM foo)").collect()
```

### How does this PR fix it?

- Make all vals in `FileSourceScanExec` that access `relation` lazy, so that the NPE is avoided. This is a temporary fix.
- Allow `makeCopy` in `SparkPlan` to work without an active Spark session too, since it can also be reached on the executor side. For instance:

```
  at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:70)
  at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:47)
  at org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:233)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:243)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:258)
  at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:58)
  at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
  at scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:364)
  at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40)
  at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:139)
  at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:135)
  at scala.collection.mutable.HashMap.findEntry(HashMap.scala:40)
  at scala.collection.mutable.HashMap.get(HashMap.scala:70)
  at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:54)
  at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:95)
  at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
  at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:96)
  at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
  at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1102)
  at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1154)
  at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:270)
  at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:319)
  at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:308)
  at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:181)
  at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:71)
  at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:70)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:109)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
```

This PR takes over #20856.

## How was this patch tested?

Manually tested, and a unit test was added.

Closes #20856

Author: hyukjinkwon <[email protected]>

Closes #21815 from HyukjinKwon/SPARK-23731.

(cherry picked from commit e0b6383)
Signed-off-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
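A side note on the reproducer above, as a minimal sketch with a hypothetical key type (not Spark's `EquivalentExpressions.Expr`): overriding `hashCode` to a constant forces every key into the same hash bucket, and a bucket collision is exactly what makes `HashMap` fall back to `equals`, which in Spark's case walks into `semanticEquals` and plan canonicalization on the executor.

```scala
import scala.collection.mutable

// Constant hashCode forces collisions, as in the repro diff above.
final class CollidingKey(val id: Int) {
  override def hashCode: Int = 1
  override def equals(other: Any): Boolean = other match {
    case that: CollidingKey =>
      // Stands in for Expr.equals -> ScalarSubquery.semanticEquals -> canonicalization.
      println(s"equals called: $id vs ${that.id}")
      id == that.id
    case _ => false
  }
}

object CollisionDemo extends App {
  val map = mutable.HashMap[CollidingKey, String]()
  map.put(new CollidingKey(1), "a")
  map.put(new CollidingKey(2), "b")  // same bucket as key 1, so equals() runs
  map.get(new CollidingKey(3))       // equals() runs against both bucket-mates, then returns None
}
```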
1 parent d0280ab commit db1f3cc

File tree: 3 files changed, +28 -9 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 6 additions & 4 deletions
```diff
@@ -164,10 +164,12 @@ case class FileSourceScanExec(
     override val tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with ColumnarBatchScan {

-  override val supportsBatch: Boolean = relation.fileFormat.supportBatch(
+  // Note that some vals referring the file-based relation are lazy intentionally
+  // so that this plan can be canonicalized on executor side too. See SPARK-23731.
+  override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch(
     relation.sparkSession, StructType.fromAttributes(output))

-  override val needsUnsafeRowConversion: Boolean = {
+  override lazy val needsUnsafeRowConversion: Boolean = {
     if (relation.fileFormat.isInstanceOf[ParquetSource]) {
       SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
     } else {
@@ -197,7 +199,7 @@
     ret
   }

-  override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
+  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
       relation.bucketSpec
     } else {
@@ -268,7 +270,7 @@
   private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
   logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")

-  override val metadata: Map[String, String] = {
+  override lazy val metadata: Map[String, String] = {
     def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
     val location = relation.location
     val locationDesc =
```
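Why deferring the `relation` access helps can be illustrated outside Spark with a minimal, hypothetical sketch (the `Relation`, `EagerScan`, and `LazyScan` classes below are made up, not the actual `FileSourceScanExec`/`HadoopFsRelation`): a `@transient` field comes back as `null` after a serialization round trip, so an eager `val` that dereferences it in the constructor fails as soon as a new instance is built from the deserialized object, while a `lazy val` defers that access until the value is actually needed.

```scala
import java.io._

class Relation extends Serializable { def format: String = "parquet" }

// Eager val: the constructor dereferences `relation`, so constructing a copy
// from a deserialized instance (whose transient `relation` is null) throws an NPE.
case class EagerScan(@transient relation: Relation) {
  val format: String = relation.format
  def canonicalizedCopy: EagerScan = EagerScan(relation)
}

// Lazy val: the constructor never touches `relation`, so the copy succeeds as
// long as `format` is never forced where `relation` is null.
case class LazyScan(@transient relation: Relation) {
  lazy val format: String = relation.format
  def canonicalizedCopy: LazyScan = LazyScan(relation)
}

object TransientDemo extends App {
  // Java-serialization round trip, standing in for sending the plan to an executor.
  def roundTrip[T](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      .readObject().asInstanceOf[T]
  }

  val eager = roundTrip(EagerScan(new Relation))    // eager.relation is now null
  // eager.canonicalizedCopy                        // would throw NullPointerException in <init>
  val deferred = roundTrip(LazyScan(new Relation))  // deferred.relation is null too
  deferred.canonicalizedCopy                        // fine: nothing forces `format`
}
```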

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 5 additions & 5 deletions
```diff
@@ -47,17 +47,15 @@ import org.apache.spark.util.ThreadUtils
 abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {

   /**
-   * A handle to the SQL Context that was used to create this plan. Since many operators need
+   * A handle to the SQL Context that was used to create this plan. Since many operators need
    * access to the sqlContext for RDD operations or configuration this field is automatically
    * populated by the query planning infrastructure.
    */
-  @transient
-  final val sqlContext = SparkSession.getActiveSession.map(_.sqlContext).orNull
+  @transient final val sqlContext = SparkSession.getActiveSession.map(_.sqlContext).orNull

   protected def sparkContext = sqlContext.sparkContext

   // sqlContext will be null when SparkPlan nodes are created without the active sessions.
-  // So far, this only happens in the test cases.
   val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) {
     sqlContext.conf.subexpressionEliminationEnabled
   } else {
@@ -69,7 +67,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ

   /** Overridden make copy also propagates sqlContext to copied plan. */
   override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
-    SparkSession.setActiveSession(sqlContext.sparkSession)
+    if (sqlContext != null) {
+      SparkSession.setActiveSession(sqlContext.sparkSession)
+    }
     super.makeCopy(newArgs)
   }

```
sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala

Lines changed: 17 additions & 0 deletions
```diff
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.execution

+import org.apache.spark.SparkEnv
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.SharedSQLContext

@@ -33,4 +34,20 @@ class SparkPlanSuite extends QueryTest with SharedSQLContext {
     intercept[IllegalStateException] { plan.executeTake(1) }
   }

+  test("SPARK-23731 plans should be canonicalizable after being (de)serialized") {
+    withTempPath { path =>
+      spark.range(1).write.parquet(path.getAbsolutePath)
+      val df = spark.read.parquet(path.getAbsolutePath)
+      val fileSourceScanExec =
+        df.queryExecution.sparkPlan.collectFirst { case p: FileSourceScanExec => p }.get
+      val serializer = SparkEnv.get.serializer.newInstance()
+      val readback =
+        serializer.deserialize[FileSourceScanExec](serializer.serialize(fileSourceScanExec))
+      try {
+        readback.canonicalized
+      } catch {
+        case e: Throwable => fail("FileSourceScanExec was not canonicalizable", e)
+      }
+    }
+  }
 }
```
