DataSourceScanExec.scala
@@ -166,10 +166,12 @@ case class FileSourceScanExec(
override val tableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with ColumnarBatchScan {

-override val supportsBatch: Boolean = relation.fileFormat.supportBatch(
+// Note that some vals referring the file-based relation are lazy intentionally
+// so that this plan can be canonicalized on executor side too. See SPARK-23731.
+override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch(
relation.sparkSession, StructType.fromAttributes(output))

-override val needsUnsafeRowConversion: Boolean = {
+override lazy val needsUnsafeRowConversion: Boolean = {
if (relation.fileFormat.isInstanceOf[ParquetSource]) {
SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
Contributor: Since you mentioned SparkSession, this line caught my attention: the active SparkSession is accessed via SparkSession.getActiveSession.get rather than relation.sparkSession, as is done elsewhere. That seems worth changing while we're at it.

Member Author: Let's leave that out of this PR's scope. That change is about making the plan executable, whereas this PR targets making the plan canonicalizable.
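To make concrete why these vals are now lazy (per the note added in the diff above), here is a self-contained sketch; the classes and names are hypothetical, not Spark's, and `conf` merely stands in for driver-only state such as the session reachable through the relation:

```scala
// Self-contained sketch: `conf` stands in for driver-only state that is
// unavailable (null) when the node is built on an executor.
final class DriverOnlyConf { def vectorizedEnabled: Boolean = true }

case class EagerPlan(conf: DriverOnlyConf) {
  // Evaluated inside the constructor, i.e. wherever the node happens to be built.
  val supportsBatch: Boolean = conf.vectorizedEnabled
}

case class LazyPlan(conf: DriverOnlyConf) {
  // Deferred until first access, which only ever happens on the driver.
  lazy val supportsBatch: Boolean = conf.vectorizedEnabled
}

object LazyValDemo extends App {
  LazyPlan(null)                    // safe: nothing is evaluated at construction
  try EagerPlan(null) catch {       // the eager val dereferences the null conf
    case _: NullPointerException => println("eager val blew up during construction")
  }
}
```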

} else {
@@ -199,7 +201,7 @@ case class FileSourceScanExec(
ret
}

-override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
+override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
Contributor: That happens on the driver, so there's no need for the lazy here.

Contributor: It would be computed anyway when we create a new FileSourceScanExec in the canonicalization process if it's not lazy, so I'd say this one is needed, as well as all the others.
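The point above can be shown with a similarly hypothetical sketch: canonicalization builds a fresh node, and building (or copying) a case class re-runs its constructor, so an eager val would be recomputed wherever that happens, even if its value is only ever read on the driver.

```scala
// Hypothetical sketch: copy() re-runs the constructor, much like the copy that
// canonicalization builds, so every eager val would be recomputed there too.
final class DriverState { def bucketingEnabled: Boolean = true }

case class ScanNode(state: DriverState) {
  lazy val outputPartitioning: Boolean = state.bucketingEnabled  // read on the driver only
}

object CanonicalizeCopyDemo extends App {
  val shipped = ScanNode(null)   // pretend the node arrived on an executor
  shipped.copy()                 // safe: the lazy body is still never evaluated
  // Were outputPartitioning an eager val, copy() would recompute it here and
  // throw a NullPointerException.
}
```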

val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
relation.bucketSpec
} else {
@@ -270,7 +272,7 @@ case class FileSourceScanExec(
private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")

-override val metadata: Map[String, String] = {
+override lazy val metadata: Map[String, String] = {
Contributor: That's driver-only too, isn't it? Why is the lazy required here?

Member Author: It can actually end up on the executor side:

	at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:275)
	at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:526)
	at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:159)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
	at org.apache.spark.sql.execution.FileSourceScanExecSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(FileSourceScanExecSuite.scala:30)
	at org.apache.spark.sql.execution.FileSourceScanExecSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(FileSourceScanExecSuite.scala:30)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2083)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2083)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Contributor: Ouch. I'd never have thought of code with RDDs and physical operators running on the executor side(!). Learned something today.
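The stack trace comes from a test that forces canonicalization inside an RDD task. A hedged reconstruction of that pattern is below; it is not the exact FileSourceScanExecSuite code, and the object name, master URL, and scratch path are assumptions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.FileSourceScanExec

// Ship a FileSourceScanExec into a task closure and canonicalize it there,
// which is roughly what the suite in the stack trace above does.
object CanonicalizeOnExecutor {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    val path = "/tmp/spark-23731-demo"                    // assumed scratch location
    spark.range(1).write.mode("overwrite").parquet(path)

    val df = spark.read.parquet(path)
    val plan = df.queryExecution.sparkPlan.collectFirst {
      case p: FileSourceScanExec => p
    }.get

    // Each task deserializes `plan` and canonicalizes it on an executor; with
    // eager vals touching driver-only state this used to fail at construction.
    spark.sparkContext.range(0, 1).foreach { _ => plan.canonicalized }
  }
}
```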

def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
val location = relation.location
val locationDesc =
SparkPlan.scala
@@ -47,17 +47,15 @@ import org.apache.spark.util.ThreadUtils
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {

/**
* A handle to the SQL Context that was used to create this plan. Since many operators need
* access to the sqlContext for RDD operations or configuration this field is automatically
* populated by the query planning infrastructure.
*/
-@transient
-final val sqlContext = SparkSession.getActiveSession.map(_.sqlContext).orNull
+@transient final val sqlContext = SparkSession.getActiveSession.map(_.sqlContext).orNull

protected def sparkContext = sqlContext.sparkContext

-// sqlContext will be null when SparkPlan nodes are created without the active sessions.
-// So far, this only happens in the test cases.
val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) {
sqlContext.conf.subexpressionEliminationEnabled
} else {
@@ -69,7 +67,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {

/** Overridden make copy also propagates sqlContext to copied plan. */
override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
-SparkSession.setActiveSession(sqlContext.sparkSession)
+if (sqlContext != null) {
Contributor: Just curious: why wasn't the makeCopy problem discovered in the previous PR/investigation?

Member Author: Because previously the failure happened earlier, before makeCopy was ever reached. Once the vals are lazy, this one is discovered later (it's the exception message in the PR description).

Contributor: I see, thanks. I wondered because it seems like a more generic issue that should be easier to hit, but we probably never ran into it because all the trials included FileSourceScanExec, which caused an earlier failure. Thanks.

+SparkSession.setActiveSession(sqlContext.sparkSession)
+}
super.makeCopy(newArgs)
}
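For readers following the new guard: `sqlContext` is `@transient` and resolved from the thread-local active session, so a plan that has been serialized to an executor sees it as null. The standalone sketch below (hypothetical names, plain Java serialization rather than Spark's serializer) shows the transient-field behavior the null check accounts for:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// A @transient field is not restored after a serialization round trip, which is
// what happens to a plan shipped to an executor; hence the null check above.
class PlanNode extends Serializable {
  @transient val context: AnyRef = new Object   // populated on the "driver" only

  def makeCopy(): PlanNode = {
    if (context != null) {
      // Driver side only: without this guard, dereferencing `context` would
      // throw a NullPointerException on a deserialized copy of the plan.
      println(context.toString)
    }
    new PlanNode
  }
}

object TransientFieldDemo extends App {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(new PlanNode)
  out.close()

  val shipped = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[PlanNode]

  assert(shipped.context == null)   // lost in transit, just like sqlContext
  shipped.makeCopy()                // does not throw, thanks to the guard
}
```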

SparkPlanSuite.scala
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution

+import org.apache.spark.SparkEnv
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSQLContext

@@ -33,4 +34,20 @@ class SparkPlanSuite extends QueryTest with SharedSQLContext {
intercept[IllegalStateException] { plan.executeTake(1) }
}

test("SPARK-23731 plans should be canonicalizable after being (de)serialized") {
withTempPath { path =>
spark.range(1).write.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath)
val fileSourceScanExec =
df.queryExecution.sparkPlan.collectFirst { case p: FileSourceScanExec => p }.get
val serializer = SparkEnv.get.serializer.newInstance()
val readback =
serializer.deserialize[FileSourceScanExec](serializer.serialize(fileSourceScanExec))
try {
readback.canonicalized
} catch {
case e: Throwable => fail("FileSourceScanExec was not canonicalizable", e)
}
}
}
}