From 89fff2ac653fa05ae92ad09ba8307649d6972900 Mon Sep 17 00:00:00 2001
From: LantaoJin
Date: Thu, 6 Sep 2018 22:04:56 +0800
Subject: [PATCH 1/4] [SPARK-25357][SQL] Abbreviated metadata in
 DataSourceScanExec results in incomplete information in event log

---
 .../org/apache/spark/sql/execution/DataSourceScanExec.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 36ed016773b67..79c9f47f177c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -54,7 +54,7 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
   override def simpleString: String = {
     val metadataEntries = metadata.toSeq.sorted.map {
       case (key, value) =>
-        key + ": " + StringUtils.abbreviate(redact(value), 100)
+        key + ": " + redact(value)
     }
     val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
     s"$nodeNamePrefix$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
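Why the abbreviation matters: `StringUtils.abbreviate` caps every metadata value at
100 characters, so a long `Location` entry loses most of its paths before it reaches
the event log. A minimal sketch of that truncation, assuming commons-lang3 on the
classpath (the namenode host and warehouse paths here are made up):

```scala
import org.apache.commons.lang3.StringUtils

object AbbreviateDemo extends App {
  // A Location value for a partitioned table easily exceeds 100 characters.
  val location = "InMemoryFileIndex[hdfs://nn:8020/warehouse/db.db/events/dt=2018-09-01, " +
    "hdfs://nn:8020/warehouse/db.db/events/dt=2018-09-02]"

  // abbreviate keeps the first 97 characters and appends "...", so every
  // path after the cutoff disappears from the logged plan string.
  println(StringUtils.abbreviate(location, 100))
}
```

Patch 1 drops the call outright, but `simpleString` also feeds the web UI, where the
truncation is presumably wanted; hence the revert and the alternative approach in the
next patch.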
From 82dc3d7b43f20cf7e7b254048e4ed5d848556d55 Mon Sep 17 00:00:00 2001
From: LantaoJin
Date: Mon, 10 Sep 2018 17:32:28 +0800
Subject: [PATCH 2/4] revert the changes in abbreviated simpleString, add back
 metadata in SparkPlanInfo

---
 .../org/apache/spark/sql/execution/DataSourceScanExec.scala  | 2 +-
 .../scala/org/apache/spark/sql/execution/SparkPlan.scala     | 5 +++++
 .../scala/org/apache/spark/sql/execution/SparkPlanInfo.scala | 5 +++--
 3 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 79c9f47f177c8..36ed016773b67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -54,7 +54,7 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
   override def simpleString: String = {
     val metadataEntries = metadata.toSeq.sorted.map {
       case (key, value) =>
-        key + ": " + redact(value)
+        key + ": " + StringUtils.abbreviate(redact(value), 100)
     }
     val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
     s"$nodeNamePrefix$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 1f97993e20458..26243b8fe2ef7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -73,6 +73,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     super.makeCopy(newArgs)
   }
 
+  /**
+   * @return Metadata that describes more details of this SparkPlan.
+   */
+  def metadata: Map[String, String] = Map.empty
+
   /**
    * @return All metrics containing metrics of this SparkPlan.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 2a2315896831c..878e53f5faab5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -28,11 +28,11 @@ import org.apache.spark.sql.execution.metric.SQLMetricInfo
  * Stores information about a SQL SparkPlan.
  */
 @DeveloperApi
-@JsonIgnoreProperties(Array("metadata")) // The metadata field was removed in Spark 2.3.
 class SparkPlanInfo(
     val nodeName: String,
     val simpleString: String,
     val children: Seq[SparkPlanInfo],
+    val metadata: Map[String, String],
     val metrics: Seq[SQLMetricInfo]) {
 
   override def hashCode(): Int = {
@@ -59,6 +59,7 @@ private[execution] object SparkPlanInfo {
       new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType)
     }
 
-    new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan), metrics)
+    new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),
+      plan.metadata, metrics)
   }
 }
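With `metadata` restored on `SparkPlanInfo`, a listener that receives
`SparkListenerSQLExecutionStart` can walk the plan tree and recover the full scan
details from the event log. A sketch of such a consumer; `ScanMetadataListener` and
its println are illustrative, not part of this series:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

// Hypothetical consumer: walks the SparkPlanInfo tree of each SQL execution
// and prints any node that carries metadata (e.g. a file scan's Location).
class ScanMetadataListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart => collect(e.sparkPlanInfo)
    case _ =>
  }

  private def collect(info: SparkPlanInfo): Unit = {
    if (info.metadata.nonEmpty) {
      println(s"${info.nodeName}: ${info.metadata.mkString(", ")}")
    }
    info.children.foreach(collect)
  }
}
```

Register it with `spark.sparkContext.addSparkListener(new ScanMetadataListener)`.
Patch 3 below then narrows the producer side: rather than widening every `SparkPlan`
with a `metadata` method, `fromSparkPlan` pattern-matches on `FileSourceScanExec`,
the scan node that carries the file-listing metadata.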
From 182eb794c791069afaebc25b599c4890871da807 Mon Sep 17 00:00:00 2001
From: LantaoJin
Date: Wed, 12 Sep 2018 10:16:00 +0800
Subject: [PATCH 3/4] use pattern match to dump file scan metadata

---
 .../scala/org/apache/spark/sql/execution/SparkPlan.scala     | 5 -----
 .../org/apache/spark/sql/execution/SparkPlanInfo.scala       | 9 ++++++---
 2 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 26243b8fe2ef7..1f97993e20458 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -73,11 +73,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     super.makeCopy(newArgs)
   }
 
-  /**
-   * @return Metadata that describes more details of this SparkPlan.
-   */
-  def metadata: Map[String, String] = Map.empty
-
   /**
    * @return All metrics containing metrics of this SparkPlan.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 878e53f5faab5..59ffd16381116 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution
 
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties
-
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.SQLMetricInfo
@@ -59,7 +57,12 @@ private[execution] object SparkPlanInfo {
       new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType)
     }
 
+    // dump the file scan metadata (e.g. file path) to the event log
+    val metadata = plan match {
+      case fileScan: FileSourceScanExec => fileScan.metadata
+      case _ => Map[String, String]()
+    }
     new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),
-      plan.metadata, metrics)
+      metadata, metrics)
   }
 }

From 0340fa648a17384a039ee484de9ce91a0129b260 Mon Sep 17 00:00:00 2001
From: LantaoJin
Date: Wed, 12 Sep 2018 11:21:56 +0800
Subject: [PATCH 4/4] add UT

---
 .../apache/spark/sql/execution/SQLJsonProtocolSuite.scala | 2 +-
 .../org/apache/spark/sql/execution/SparkPlanSuite.scala   | 8 ++++++++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
index c2e62b987e0cc..08e40e28d3d57 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
@@ -46,7 +46,7 @@ class SQLJsonProtocolSuite extends SparkFunSuite {
       """.stripMargin
     val reconstructedEvent = JsonProtocol.sparkEventFromJson(parse(SQLExecutionStartJsonString))
     val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail", "test plan",
-      new SparkPlanInfo("TestNode", "test string", Nil, Nil), 0)
+      new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0)
     assert(reconstructedEvent == expectedEvent)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index 34dc6f37c0e4d..47ff372992b91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -50,4 +50,12 @@ class SparkPlanSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-25357 SparkPlanInfo of FileScan contains nonEmpty metadata") {
+    withTempPath { path =>
+      spark.range(5).write.parquet(path.getAbsolutePath)
+      val f = spark.read.parquet(path.getAbsolutePath)
+      assert(SparkPlanInfo.fromSparkPlan(f.queryExecution.sparkPlan).metadata.nonEmpty)
+    }
+  }
 }
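A side effect the `SQLJsonProtocolSuite` update above reflects: `SparkPlanInfo`
gained a fourth constructor argument, so any code building instances directly must
now supply the metadata map. A sketch of the new shape, REPL-style, with purely
illustrative node name and values:

```scala
import org.apache.spark.sql.execution.SparkPlanInfo

// The metadata map is the new fourth argument; nodes that carry no
// metadata pass Map.empty, as the JSON protocol test above does.
val scanInfo = new SparkPlanInfo(
  nodeName = "FileScan parquet",
  simpleString = "FileScan parquet [id#0L]",
  children = Nil,
  metadata = Map("Format" -> "Parquet", "Location" -> "InMemoryFileIndex[/tmp/t]"),
  metrics = Nil)
```

The new `SparkPlanSuite` test exercises the end-to-end path: write a parquet
directory, read it back, and assert that the planned file scan reports non-empty
metadata through `SparkPlanInfo.fromSparkPlan`.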