From 438d70e02cfaf9e3b6beccc8d3a8d0c65f7499da Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Thu, 5 May 2016 09:23:05 +0800 Subject: [PATCH 1/5] [SPARK-14476][SQL] Display table name and path in physical plan for data source like Hive table --- .../spark/sql/execution/ExistingRDD.scala | 35 ++++++++++++++----- .../datasources/DataSourceStrategy.scala | 9 ++--- .../datasources/FileSourceStrategy.scala | 19 ++++++---- 3 files changed, 41 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index d6516f26a70f3..4cc133398db47 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql.execution +import org.apache.commons.lang.StringUtils + import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext} -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} @@ -127,8 +129,13 @@ private[sql] case class RDDScanExec( private[sql] trait DataSourceScanExec extends LeafExecNode { val rdd: RDD[InternalRow] val relation: BaseRelation + val metastoreTableIdentifier: Option[TableIdentifier] - override val nodeName: String = relation.toString + override val nodeName: String = if (metastoreTableIdentifier.isEmpty) { + relation.toString + } else { + relation.toString + " " + metastoreTableIdentifier.get.unquotedString + } // Ignore rdd when checking results override def sameResult(plan: SparkPlan): Boolean = plan match { @@ -143,7 +150,8 @@ private[sql] case class RowDataSourceScanExec( rdd: RDD[InternalRow], @transient relation: BaseRelation, override val outputPartitioning: Partitioning, - override val metadata: Map[String, String] = Map.empty) + override val metadata: Map[String, String], + override val metastoreTableIdentifier: Option[TableIdentifier]) extends DataSourceScanExec with CodegenSupport { private[sql] override lazy val metrics = @@ -174,7 +182,10 @@ private[sql] case class RowDataSourceScanExec( } override def simpleString: String = { - val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value" + val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield { + key + ": " + StringUtils.abbreviate(value, 100) + } + s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}" } @@ -212,7 +223,8 @@ private[sql] case class BatchedDataSourceScanExec( rdd: RDD[InternalRow], @transient relation: BaseRelation, override val outputPartitioning: Partitioning, - override val metadata: Map[String, String] = Map.empty) + override val metadata: Map[String, String], + override val metastoreTableIdentifier: Option[TableIdentifier]) extends DataSourceScanExec with CodegenSupport { private[sql] override lazy val metrics = @@ -224,7 +236,9 @@ private[sql] case class BatchedDataSourceScanExec( } override def simpleString: String = { - val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value" + val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield { + key + ": " + StringUtils.abbreviate(value, 100) + } val metadataStr = metadataEntries.mkString(" ", ", ", "") s"BatchedScan $nodeName${output.mkString("[", ",", "]")}$metadataStr" } @@ -325,7 +339,8 @@ private[sql] object DataSourceScanExec { output: Seq[Attribute], rdd: RDD[InternalRow], relation: BaseRelation, - metadata: Map[String, String] = Map.empty): DataSourceScanExec = { + metadata: Map[String, String] = Map.empty, + metastoreTableIdentifier: Option[TableIdentifier] = None): DataSourceScanExec = { val outputPartitioning = { val bucketSpec = relation match { // TODO: this should be closer to bucket planning. @@ -351,9 +366,11 @@ private[sql] object DataSourceScanExec { relation match { case r: HadoopFsRelation if r.fileFormat.supportBatch(r.sparkSession, StructType.fromAttributes(output)) => - BatchedDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata) + BatchedDataSourceScanExec( + output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier) case _ => - RowDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata) + RowDataSourceScanExec( + output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 9bebd74b4b3a2..bc249f4ed510f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -192,11 +192,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]")) } - relation.relation match { - case r: HadoopFsRelation => pairs += INPUT_PATHS -> r.location.paths.mkString(", ") - case _ => - } - pairs.toMap } @@ -217,7 +212,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val scan = execution.DataSourceScanExec.create( projects.map(_.toAttribute), scanBuilder(requestedColumns, candidatePredicates, pushedFilters), - relation.relation, metadata) + relation.relation, metadata, relation.metastoreTableIdentifier) filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan) } else { // Don't request columns that are only referenced by pushed filters. @@ -227,7 +222,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val scan = execution.DataSourceScanExec.create( requestedColumns, scanBuilder(requestedColumns, candidatePredicates, pushedFilters), - relation.relation, metadata) + relation.relation, metadata, relation.metastoreTableIdentifier) execution.ProjectExec( projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 8a93c6ff9a4f6..350508c1d9f4c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -27,7 +27,9 @@ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan} +import org.apache.spark.sql.execution.DataSourceScanExec +import org.apache.spark.sql.execution.DataSourceScanExec.{INPUT_PATHS, PUSHED_FILTERS} +import org.apache.spark.sql.execution.SparkPlan /** * A strategy for planning scans over collections of files that might be partitioned or bucketed @@ -54,7 +56,8 @@ import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan} */ private[sql] object FileSourceStrategy extends Strategy with Logging { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _)) => + case PhysicalOperation(projects, filters, + l @ LogicalRelation(files: HadoopFsRelation, _, table)) => // Filters on this relation fall into four categories based on where we can use them to avoid // reading unneeded data: // - partition keys only - used to prune directories to read @@ -192,6 +195,12 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { partitions } + val meta = Map( + "Format" -> files.fileFormat.toString, + "ReadSchema" -> prunedDataSchema.simpleString, + PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"), + INPUT_PATHS -> files.location.paths.mkString(", ")) + val scan = DataSourceScanExec.create( readDataColumns ++ partitionColumns, @@ -200,10 +209,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { readFile, plannedPartitions), files, - Map( - "Format" -> files.fileFormat.toString, - "PushedFilters" -> pushedDownFilters.mkString("[", ", ", "]"), - "ReadSchema" -> prunedDataSchema.simpleString)) + meta, + table) val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And) val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan) From b6b38a7507414f6fea7edc0b6544b03f91573dd3 Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Tue, 10 May 2016 09:54:02 +0800 Subject: [PATCH 2/5] break long string when showing tooltips --- .../apache/spark/sql/execution/ui/static/spark-sql-viz.css | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css index 303f8ebb8814c..f9da887baca5a 100644 --- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css +++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css @@ -41,3 +41,8 @@ stroke: #444; stroke-width: 1.5px; } + +/* Breaks the long string like file path when showing tooltips */ +.tooltip-inner { + word-wrap:break-word; +} \ No newline at end of file From f0a0951a3b74ff157024559b49460a4aba6339c3 Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Tue, 10 May 2016 17:32:44 +0800 Subject: [PATCH 3/5] show more descriptive info for HadoopFsRelation. --- .../execution/datasources/fileSourceInterfaces.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index c87e672961091..d2c4fe37b9006 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, Inter import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.execution.FileRelation -import org.apache.spark.sql.sources.{BaseRelation, Filter} +import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, Filter} import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.util.SerializableConfiguration @@ -157,8 +157,12 @@ case class HadoopFsRelation( def refresh(): Unit = location.refresh() - override def toString: String = - s"HadoopFiles" + override def toString: String = { + fileFormat match { + case source: DataSourceRegister => "Scan " + source.shortName() + case _ => "Scan" + } + } /** Returns the list of files that will be read when scanning this relation. */ override def inputFiles: Array[String] = From b3e977514dfefa0105ffdaa83bf382250d132a5a Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Wed, 11 May 2016 01:41:57 +0800 Subject: [PATCH 4/5] Improve the UI output --- .../org/apache/spark/sql/execution/ExistingRDD.scala | 8 ++++---- .../sql/execution/datasources/fileSourceInterfaces.scala | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 4cc133398db47..6fcadf76891af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -132,9 +132,9 @@ private[sql] trait DataSourceScanExec extends LeafExecNode { val metastoreTableIdentifier: Option[TableIdentifier] override val nodeName: String = if (metastoreTableIdentifier.isEmpty) { - relation.toString + "Scan " + relation.toString } else { - relation.toString + " " + metastoreTableIdentifier.get.unquotedString + "Scan " + relation.toString + " " + metastoreTableIdentifier.get.unquotedString } // Ignore rdd when checking results @@ -186,7 +186,7 @@ private[sql] case class RowDataSourceScanExec( key + ": " + StringUtils.abbreviate(value, 100) } - s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}" + s"$nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}" } override def inputRDDs(): Seq[RDD[InternalRow]] = { @@ -240,7 +240,7 @@ private[sql] case class BatchedDataSourceScanExec( key + ": " + StringUtils.abbreviate(value, 100) } val metadataStr = metadataEntries.mkString(" ", ", ", "") - s"BatchedScan $nodeName${output.mkString("[", ",", "]")}$metadataStr" + s"Batched$nodeName${output.mkString("[", ",", "]")}$metadataStr" } override def inputRDDs(): Seq[RDD[InternalRow]] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index d2c4fe37b9006..b516297115f04 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -159,8 +159,8 @@ case class HadoopFsRelation( override def toString: String = { fileFormat match { - case source: DataSourceRegister => "Scan " + source.shortName() - case _ => "Scan" + case source: DataSourceRegister => source.shortName() + case _ => "HadoopFiles" } } From 59f816f4cf91979282d3b9385d746099b040fbc1 Mon Sep 17 00:00:00 2001 From: Sean Zhong Date: Wed, 11 May 2016 11:03:46 +0800 Subject: [PATCH 5/5] update style --- .../apache/spark/sql/execution/ui/static/spark-sql-viz.css | 2 +- .../scala/org/apache/spark/sql/execution/ExistingRDD.scala | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css index f9da887baca5a..594e747a8d3a5 100644 --- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css +++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css @@ -45,4 +45,4 @@ /* Breaks the long string like file path when showing tooltips */ .tooltip-inner { word-wrap:break-word; -} \ No newline at end of file +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 6fcadf76891af..85af4faf4d090 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -131,10 +131,8 @@ private[sql] trait DataSourceScanExec extends LeafExecNode { val relation: BaseRelation val metastoreTableIdentifier: Option[TableIdentifier] - override val nodeName: String = if (metastoreTableIdentifier.isEmpty) { - "Scan " + relation.toString - } else { - "Scan " + relation.toString + " " + metastoreTableIdentifier.get.unquotedString + override val nodeName: String = { + s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}" } // Ignore rdd when checking results