
Conversation

@HyukjinKwon
Member

@HyukjinKwon commented Jul 19, 2018

What changes were proposed in this pull request?

What's the problem?

In some cases, a scalar subquery can throw an NPE on the executor side.

java.lang.NullPointerException
	at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:169)
	at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:526)
	at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:159)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$3.apply(QueryPlan.scala:225)
	at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$3.apply(QueryPlan.scala:225)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:296)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:225)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:258)
	at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:58)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
	at scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:364)
	at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40)
	at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:139)
	at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:135)
	at scala.collection.mutable.HashMap.findEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap.get(HashMap.scala:70)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:56)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:97)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:98)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:98)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:98)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1102)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1154)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:270)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:319)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:308)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:181)
	at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:71)
	at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:70)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

How does this happen?

Here is what happens now:

  1. A scalar subquery is made (for instance SELECT (SELECT id FROM foo)).

  2. Spark tries to extract common expressions (via CodeGenerator.subexpressionElimination) so that it can generate common code and reuse it.

  3. During this, it collects the expressions that can be reused (via EquivalentExpressions.addExprTree):

expressions.foreach(equivalentExpressions.addExprTree(_))

  4. During this, if the hash (EquivalentExpressions.Expr.hashCode) happens to collide at EquivalentExpressions.addExpr, EquivalentExpressions.Expr.equals is called to identify objects in the same hash bucket, which eventually calls semanticEquals in ScalarSubquery (see the illustration after this list).

  5. ScalarSubquery's semanticEquals needs SubqueryExec's sameResult:

case s: ScalarSubquery => plan.sameResult(s.plan)

  6. SubqueryExec's sameResult requires a canonicalized plan, which calls FileSourceScanExec's doCanonicalize:

final def sameResult(other: PlanType): Boolean = this.canonicalized == other.canonicalized

  7. In FileSourceScanExec's doCanonicalize, FileSourceScanExec's relation is required, but it is @transient, so it becomes null on the executor.

  8. An NPE is thrown.

Step 1 happens on the driver side; steps 2 to 8 happen on the executor side.
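
To see why a mere hash collision drags plan canonicalization into the picture (step 4), here is a small, self-contained Scala illustration. It is not Spark code: Key and the forced hashCode of 1 are made up for the example, mirroring the hashCode => 1 patch used in the reproduction below. Once two keys land in the same bucket of a mutable.HashMap, equals runs; in Spark that is EquivalentExpressions.Expr.equals, which calls ScalarSubquery.semanticEquals and hence sameResult/canonicalized.

```scala
import scala.collection.mutable

// Stand-in for EquivalentExpressions.Expr: same hash for every instance,
// so HashMap.get must fall back to equals() to tell keys apart.
final case class Key(name: String) {
  override def hashCode: Int = 1
  override def equals(other: Any): Boolean = other match {
    case Key(n) =>
      println(s"equals() called: $name vs $n") // in Spark, this is where semanticEquals / canonicalization kicks in
      n == name
    case _ => false
  }
}

object CollisionDemo extends App {
  val exprs = mutable.HashMap.empty[Key, Int]
  exprs(Key("subquery-a")) = 1
  exprs.get(Key("subquery-b")) // collision: equals() runs even though the keys differ
}
```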

Note that in most cases this looks fine because we usually call:

which makes a canonicalized plan via:

val canonicalizedChildren = children.map(_.canonicalized)

override def children: Seq[Expression] = Nil

How to reproduce?

I can reproduce this in a somewhat messy way:

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
index 8d06804ce1e..d25fc9a7ba9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
@@ -37,7 +37,9 @@ class EquivalentExpressions {
       case _ => false
     }
    
-    override def hashCode: Int = e.semanticHash()
+    override def hashCode: Int = {
+      1
+    }
  }

spark.range(1).write.mode("overwrite").parquet("/tmp/foo")
spark.read.parquet("/tmp/foo").createOrReplaceTempView("foo")
spark.conf.set("spark.sql.codegen.wholeStage", false)
sql("SELECT (SELECT id FROM foo) == (SELECT id FROM foo)").collect()

How does this PR fix it?

  • Make all variables that access FileSourceScanExec's relation lazy vals so that we avoid the NPE. This is a temporary fix (see the illustration after this list).

  • Allow makeCopy in SparkPlan to work without a Spark session too, since it can still be reached on the executor side. For instance:

      at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:70)
      at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:47)
      at org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:233)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:243)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
      at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:258)
      at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:58)
      at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
      at scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:364)
      at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40)
      at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:139)
      at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:135)
      at scala.collection.mutable.HashMap.findEntry(HashMap.scala:40)
      at scala.collection.mutable.HashMap.get(HashMap.scala:70)
      at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:54)
      at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:95)
      at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
      at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:96)
      at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
      at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1102)
      at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1154)
      at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:270)
      at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:319)
      at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:308)
      at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:181)
      at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:71)
      at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:70)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:109)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
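
To make the first bullet concrete, here is a minimal, plain-Scala sketch (not Spark code; EagerNode and LazyNode are hypothetical) of why lazy helps: during canonicalization on an executor, a new FileSourceScanExec is constructed with the deserialized, null relation, so an eager val that dereferences relation blows up in the constructor, while a lazy val is only evaluated if something actually reads it. The makeCopy change in the second bullet is the analogous guard on the session: only call SparkSession.setActiveSession when sqlContext is non-null, as in the diff hunk discussed further down.

```scala
// Hypothetical illustration, not Spark code.
class EagerNode(relation: String) {
  val width: Int = relation.length      // dereferences relation at construction time
}

class LazyNode(relation: String) {
  lazy val width: Int = relation.length // deferred until someone reads width
}

object LazyValDemo extends App {
  new LazyNode(null)                    // fine: width is never touched, mirroring canonicalization
  try new EagerNode(null)               // NPE in <init>, like FileSourceScanExec.<init> in the trace above
  catch { case e: NullPointerException => println(s"eager val blew up: $e") }
}
```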
    

This PR takes over #20856.

How was this patch tested?

Manually tested and unit test was added.

Closes #20856

@HyukjinKwon
Member Author

@cloud-fan and @mgaido91, mind taking a look please?

override val needsUnsafeRowConversion: Boolean = {
override lazy val needsUnsafeRowConversion: Boolean = {
if (relation.fileFormat.isInstanceOf[ParquetSource]) {
SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
Contributor

Since you mentioned SparkSession, that line caught my attention: the active SparkSession is accessed using SparkSession.getActiveSession.get, not relation.sparkSession as in other places. I think that's worth considering changing while we're at it.

Member Author

Let's leave this out of this PR's scope. That's more about making the plan workable, whereas this PR targets making the plan canonicalizable.

}

override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
Contributor

That happens on the driver so no need for the lazy here.

Contributor

It'd be computed anyway when we create a new FileSourceScanExec in the canonicalization process if it is not lazy, so I'd say this is needed, as are all the others.

logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")

override val metadata: Map[String, String] = {
override lazy val metadata: Map[String, String] = {
Contributor

That's driver-only too, isn't it? Why is this lazy required?

Member Author

it can be on the executor side actually:

	at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:275)
	at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:526)
	at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:159)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
	at org.apache.spark.sql.execution.FileSourceScanExecSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(FileSourceScanExecSuite.scala:30)
	at org.apache.spark.sql.execution.FileSourceScanExecSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(FileSourceScanExecSuite.scala:30)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2083)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2083)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Contributor

Ouch. I'd never have thought about code with RDDs and physical operators on the executor side (!) Learnt it today.

import org.apache.spark.sql.test.SharedSQLContext

class FileSourceScanExecSuite extends SharedSQLContext {
test("FileSourceScanExec should be canonicalizable in executor side") {
Contributor

nit: s/in/on

class FileSourceScanExecSuite extends SharedSQLContext {
test("FileSourceScanExec should be canonicalizable in executor side") {
withTempPath { path =>
spark.range(1).toDF().write.parquet(path.getAbsolutePath)
Contributor

Redundant toDF

spark.range(1).toDF().write.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath)
val fileSourceScanExec =
df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
Contributor

This isInstanceOf is a bit non-Scala IMHO and I'd prefer collectFirst { case op: FileSourceScanExec => op } instead.

try {
spark.range(1).foreach(_ => fileSourceScanExec.canonicalized)
} catch {
case e: Throwable => fail("FileSourceScanExec was not canonicalizable", e)
Contributor

It's a named test so I'd get rid of the try-catch block because:

  1. It's going to fail the test anyway
  2. The title of the test matches the fail message.

Member Author

Hm, this gives an explicit scope for which condition is the failure case though. I believe this is a rather common pattern. If both are okay, let me just keep it this way.

Contributor

@mgaido91 left a comment

The change itself looks good to me as a workaround for the current situation. I'd love it anyway if we can discuss a cleaner long-term solution to avoid problems like this in the future.

Thanks for your work on this @HyukjinKwon !

extends DataSourceScanExec with ColumnarBatchScan {

override val supportsBatch: Boolean = relation.fileFormat.supportBatch(
override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch(
Contributor

nit: can we maybe add a comment about the reason we are making them lazy?

val fileSourceScanExec =
df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
try {
spark.range(1).foreach(_ => fileSourceScanExec.canonicalized)
Contributor

not sure whether it is feasible (maybe in a followup?), but it would be great if we can test the canonicalization of all the Exec nodes in order to prevent such issues in the future... what do you think?

Member Author

Yes, I think of course it is... it took me a while to make a small and simple test for it. I hope to leave it out of this PR's scope though.

/** Overridden make copy also propagates sqlContext to copied plan. */
override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
SparkSession.setActiveSession(sqlContext.sparkSession)
if (sqlContext != null) {
Contributor

just curious, why wasn't the makeCopy problem discovered in the previous PR/investigation?

Member Author

Because it failed earlier on. Once we go with lazy, it's discovered later (the exception message in the PR description).

Contributor

I see, thanks. I wondered because it seems a more generic issue, easier to hit, but probably we never ran into it since all the trials included FileSourceScanExec, which caused an earlier failure... thanks.

@mgaido91
Contributor

LGTM

@SparkQA

SparkQA commented Jul 19, 2018

Test build #93269 has finished for PR 21815 at commit b5b99aa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 19, 2018

Test build #93271 has finished for PR 21815 at commit 22c7986.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 19, 2018

Test build #93281 has finished for PR 21815 at commit b488836.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

cc @gengliangwang @cloud-fan This needs a careful review.

import org.apache.spark.sql.test.SharedSQLContext

class FileSourceScanExecSuite extends SharedSQLContext {
test("FileSourceScanExec should be canonicalizable on executor side") {
Contributor

I'd like to put this test in QueryPlanSuite, with name SPARK-XXXX: query plans can be serialized and deserialized.

In the test we don't need to trigger a job, just call spark.env.serializer to serialize and deserialize the FileSourceScanExec

Member Author

@HyukjinKwon Jul 20, 2018

There are a few things bothering me about that actually - it's kind of messy to create a FileSourceScanExec without a SparkSession (and also without the other utils from SharedSQLContext), and QueryPlanSuite is under catalyst whereas this plan itself is under execution in SQL core.

And I actually believe this test targets making the plan canonicalizable after it's de/serialized, since the plan itself is already serializable and deserializable but not canonicalizable after that - which I believe is more specific to FileSourceScanExec in a way.

Let me try to clean up based on your comment anyway.

Member Author

I found SparkPlanSuite could be another place to add this to address your comment. Let me stick to FileSourceScanExec, but please let me know if you prefer that instead. I don't mind changing it.

Contributor

SparkPlanSuite SGTM
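
For reference, a rough sketch of the serializer-based variant discussed in this thread. It is illustrative only, not the code that was merged: the suite and test names are assumptions, and it leans on SharedSQLContext helpers such as withTempPath.

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.test.SharedSQLContext

class FileSourceScanExecSerializationSuite extends SharedSQLContext {
  test("SPARK-23731: FileSourceScanExec should be canonicalizable after (de)serialization") {
    withTempPath { path =>
      spark.range(1).write.parquet(path.getAbsolutePath)
      val df = spark.read.parquet(path.getAbsolutePath)
      val scan = df.queryExecution.sparkPlan.collectFirst {
        case p: FileSourceScanExec => p
      }.get

      // Round-trip through the configured serializer instead of triggering a job.
      val serializer = SparkEnv.get.serializer.newInstance()
      val readBack = serializer.deserialize[FileSourceScanExec](serializer.serialize(scan))

      // Canonicalizing the deserialized node used to throw an NPE before this fix.
      readBack.canonicalized
    }
  }
}
```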

@cloud-fan
Contributor

LGTM

@HyukjinKwon
Member Author

Let me update it soon.

@HyukjinKwon changed the title from "[SPARK-23731][SQL] Make FileSourceScanExec canonicalizable in executor side" to "[SPARK-23731][SQL] Make FileSourceScanExec canonicalizable after being (de)serialized" on Jul 20, 2018
@SparkQA

SparkQA commented Jul 20, 2018

Test build #93334 has finished for PR 21815 at commit 7f531bd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Jul 20, 2018

Test build #93335 has finished for PR 21815 at commit be6e594.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Jul 20, 2018
…g (de)serialized

Author: hyukjinkwon <[email protected]>

Closes #21815 from HyukjinKwon/SPARK-23731.

(cherry picked from commit e0b6383)

Signed-off-by: Wenchen Fan <[email protected]>

@cloud-fan
Contributor

thanks, merging to master/2.3!

@asfgit asfgit closed this in e0b6383 Jul 20, 2018
@HyukjinKwon
Member Author

HyukjinKwon commented Jul 20, 2018

Thanks @jaceklaskowski, @mgaido91, @gatorsmile and @cloud-fan.

@SparkQA

SparkQA commented Jul 20, 2018

Test build #93343 has finished for PR 21815 at commit be6e594.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
