@@ -17,8 +17,6 @@

package org.apache.spark.sql.execution

-import com.fasterxml.jackson.annotation.JsonIgnoreProperties
-
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetricInfo
@@ -28,11 +26,11 @@ import org.apache.spark.sql.execution.metric.SQLMetricInfo
* Stores information about a SQL SparkPlan.
*/
@DeveloperApi
-@JsonIgnoreProperties(Array("metadata")) // The metadata field was removed in Spark 2.3.
class SparkPlanInfo(
val nodeName: String,
val simpleString: String,
val children: Seq[SparkPlanInfo],
+val metadata: Map[String, String],
val metrics: Seq[SQLMetricInfo]) {

override def hashCode(): Int = {
@@ -59,6 +57,12 @@ private[execution] object SparkPlanInfo {
new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType)
}

-new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan), metrics)
+// dump the file scan metadata (e.g. file path) to the event log
Member:
As a next step of this review, did you have a chance to test this in a real environment, at least with TPC-DS 1TB? This seems to increase the event log traffic dramatically in the worst case. Can we have some before/after comparison for this PR? @LantaoJin

Contributor (Author):
Not yet. This field was only removed in 2.3, so the event log size should be the same as it was before 2.3. The main increase comes from the input paths: the example above, which reads 10000 parquet files, will log 10000 paths in a single SQLExecutionStart event. There is no other regression; the size increase is foreseeable.
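To make the traffic concern concrete, here is a rough standalone sketch (an editorial illustration, not part of the PR) that estimates the extra bytes a single SQLExecutionStart event would carry in the 10000-file case above. It assumes the scan's location renders as one bracketed, comma-separated path list in the metadata map; the path template and namenode address are made up.

// Rough estimate of the reviewer's worst case: one scan over 10000 files.
object EventLogSizeSketch {
  def main(args: Array[String]): Unit = {
    val numFiles = 10000
    // Hypothetical path template; real paths may be longer or shorter.
    val paths = (0 until numFiles).map(i => "hdfs://nn:8020/warehouse/tbl/part-%05d.parquet".format(i))
    // Concatenate the paths the way a file index location is typically rendered.
    val locationValue = paths.mkString("InMemoryFileIndex[", ", ", "]")
    val extraBytes = locationValue.getBytes("UTF-8").length
    println(f"~$extraBytes%,d extra bytes per SQLExecutionStart event")
  }
}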

+val metadata = plan match {
+  case fileScan: FileSourceScanExec => fileScan.metadata
+  case _ => Map[String, String]()
+}
+new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),
+  metadata, metrics)
}
}
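With the field restored, downstream consumers can read a file scan's metadata directly from the listener event. Below is a minimal sketch, assuming Spark 2.x: SparkListenerSQLExecutionStart and SparkPlanInfo are internal/@DeveloperApi classes, so these imports may shift between versions.

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

// Walks the plan-info tree of each SQL execution start event and prints any
// node that now carries metadata (e.g. a FileSourceScanExec's Location entry).
class FileScanMetadataListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case start: SparkListenerSQLExecutionStart =>
      def walk(info: SparkPlanInfo): Unit = {
        if (info.metadata.nonEmpty) {
          println(s"execution ${start.executionId}, ${info.nodeName}: ${info.metadata}")
        }
        info.children.foreach(walk)
      }
      walk(start.sparkPlanInfo)
    case _ => // not a SQL execution start; ignore
  }
}

Registered with spark.sparkContext.addSparkListener(new FileScanMetadataListener), this would surface the same metadata that the event log now persists.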
@@ -46,7 +46,7 @@ class SQLJsonProtocolSuite extends SparkFunSuite {
""".stripMargin
val reconstructedEvent = JsonProtocol.sparkEventFromJson(parse(SQLExecutionStartJsonString))
val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail", "test plan",
new SparkPlanInfo("TestNode", "test string", Nil, Nil), 0)
new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0)
assert(reconstructedEvent == expectedEvent)
}
}
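Beyond the fixture change above, a quick way to see that the extra constructor argument survives serialization is a round trip through JsonProtocol. A hedged sketch follows (JsonProtocol is Spark-internal, so this only compiles inside Spark's own test tree, e.g. alongside this suite; the Location value is made up):

import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart
import org.apache.spark.util.JsonProtocol

// Serialize an event whose plan info carries a metadata map, read it back,
// and check the two compare equal, as the suite above already relies on.
val info = new SparkPlanInfo("TestNode", "test string", Nil,
  Map("Location" -> "InMemoryFileIndex[file:/tmp/t]"), Nil)
val event = SparkListenerSQLExecutionStart(0, "test desc", "test detail", "test plan", info, 0)
val roundTripped = JsonProtocol.sparkEventFromJson(JsonProtocol.sparkEventToJson(event))
assert(roundTripped == event)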
@@ -50,4 +50,12 @@ class SparkPlanSuite extends QueryTest with SharedSQLContext {
}
}
}

test("SPARK-25357 SparkPlanInfo of FileScan contains nonEmpty metadata") {
withTempPath { path =>
spark.range(5).write.parquet(path.getAbsolutePath)
val f = spark.read.parquet(path.getAbsolutePath)
assert(SparkPlanInfo.fromSparkPlan(f.queryExecution.sparkPlan).metadata.nonEmpty)
}
}
}