
Commit 438d70e

[SPARK-14476][SQL] Display table name and path in physical plan for data source like Hive table
1 parent 157a49a commit 438d70e
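
As an illustration of the intended effect, here is a hedged sketch of how one might observe it from a Spark session. The session, table name, and column names below are assumptions for illustration, not taken from this commit.

// Hypothetical spark-shell snippet: `spark` is an existing SparkSession and
// `default.sales` is a metastore-backed data source table.
spark.sql("SELECT id, amount FROM sales WHERE id > 10").explain()
// With this patch the scan node's name becomes "<relation> default.sales" rather than
// just "<relation>", and metadata entries such as PushedFilters and InputPaths are
// printed with each value abbreviated to at most 100 characters.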

File tree

3 files changed (+41, -22 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 26 additions & 9 deletions

@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.commons.lang.StringUtils
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
@@ -127,8 +129,13 @@ private[sql] case class RDDScanExec(
 private[sql] trait DataSourceScanExec extends LeafExecNode {
   val rdd: RDD[InternalRow]
   val relation: BaseRelation
+  val metastoreTableIdentifier: Option[TableIdentifier]
 
-  override val nodeName: String = relation.toString
+  override val nodeName: String = if (metastoreTableIdentifier.isEmpty) {
+    relation.toString
+  } else {
+    relation.toString + " " + metastoreTableIdentifier.get.unquotedString
+  }
 
   // Ignore rdd when checking results
   override def sameResult(plan: SparkPlan): Boolean = plan match {
@@ -143,7 +150,8 @@ private[sql] case class RowDataSourceScanExec(
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
     override val outputPartitioning: Partitioning,
-    override val metadata: Map[String, String] = Map.empty)
+    override val metadata: Map[String, String],
+    override val metastoreTableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with CodegenSupport {
 
   private[sql] override lazy val metrics =
@@ -174,7 +182,10 @@ private[sql] case class RowDataSourceScanExec(
   }
 
   override def simpleString: String = {
-    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
+      key + ": " + StringUtils.abbreviate(value, 100)
+    }
+
     s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
   }
 
@@ -212,7 +223,8 @@ private[sql] case class BatchedDataSourceScanExec(
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
     override val outputPartitioning: Partitioning,
-    override val metadata: Map[String, String] = Map.empty)
+    override val metadata: Map[String, String],
+    override val metastoreTableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with CodegenSupport {
 
   private[sql] override lazy val metrics =
@@ -224,7 +236,9 @@ private[sql] case class BatchedDataSourceScanExec(
   }
 
   override def simpleString: String = {
-    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
+      key + ": " + StringUtils.abbreviate(value, 100)
+    }
     val metadataStr = metadataEntries.mkString(" ", ", ", "")
     s"BatchedScan $nodeName${output.mkString("[", ",", "]")}$metadataStr"
   }
@@ -325,7 +339,8 @@ private[sql] object DataSourceScanExec {
       output: Seq[Attribute],
       rdd: RDD[InternalRow],
       relation: BaseRelation,
-      metadata: Map[String, String] = Map.empty): DataSourceScanExec = {
+      metadata: Map[String, String] = Map.empty,
+      metastoreTableIdentifier: Option[TableIdentifier] = None): DataSourceScanExec = {
     val outputPartitioning = {
       val bucketSpec = relation match {
         // TODO: this should be closer to bucket planning.
@@ -351,9 +366,11 @@ private[sql] object DataSourceScanExec {
     relation match {
       case r: HadoopFsRelation
         if r.fileFormat.supportBatch(r.sparkSession, StructType.fromAttributes(output)) =>
-        BatchedDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata)
+        BatchedDataSourceScanExec(
+          output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier)
       case _ =>
-        RowDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata)
+        RowDataSourceScanExec(
+          output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier)
     }
   }
 }
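
As a side note on the formatting logic above, here is a minimal standalone sketch (not Spark code; the object name, parameters, and sample values are made up, and it only needs commons-lang on the classpath) of the two display rules this file now applies: append the unquoted table identifier to the node name when one is known, and cap each metadata value at 100 characters via StringUtils.abbreviate.

import org.apache.commons.lang.StringUtils

object ScanDisplaySketch {
  // Stand-in for DataSourceScanExec.nodeName: append the table identifier if present.
  def nodeName(relationString: String, table: Option[String]): String =
    table.fold(relationString)(t => relationString + " " + t)

  // Stand-in for simpleString: sorted "key: value" pairs, values capped at 100 chars.
  def simpleString(name: String, output: Seq[String], metadata: Map[String, String]): String = {
    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
      key + ": " + StringUtils.abbreviate(value, 100)
    }
    s"Scan $name${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
  }

  def main(args: Array[String]): Unit = {
    val name = nodeName("HadoopFiles", Some("default.sales"))
    val paths = Seq.tabulate(8)(i => s"hdfs://nn/warehouse/sales/part-$i").mkString(", ")
    // The long InputPaths value is truncated to 100 characters and ends with "..."
    println(simpleString(name, Seq("id", "amount"), Map("InputPaths" -> paths)))
  }
}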

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 2 additions & 7 deletions

@@ -192,11 +192,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
     }
 
-    relation.relation match {
-      case r: HadoopFsRelation => pairs += INPUT_PATHS -> r.location.paths.mkString(", ")
-      case _ =>
-    }
-
     pairs.toMap
   }
 
@@ -217,7 +212,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.DataSourceScanExec.create(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, metadata)
+        relation.relation, metadata, relation.metastoreTableIdentifier)
       filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
     } else {
       // Don't request columns that are only referenced by pushed filters.
@@ -227,7 +222,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.DataSourceScanExec.create(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, metadata)
+        relation.relation, metadata, relation.metastoreTableIdentifier)
       execution.ProjectExec(
         projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala

Lines changed: 13 additions & 6 deletions

@@ -27,7 +27,9 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.DataSourceScanExec.{INPUT_PATHS, PUSHED_FILTERS}
+import org.apache.spark.sql.execution.SparkPlan
 
 /**
  * A strategy for planning scans over collections of files that might be partitioned or bucketed
@@ -54,7 +56,8 @@ import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
  */
 private[sql] object FileSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _)) =>
+    case PhysicalOperation(projects, filters,
+      l @ LogicalRelation(files: HadoopFsRelation, _, table)) =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
       //  - partition keys only - used to prune directories to read
@@ -192,6 +195,12 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         partitions
       }
 
+      val meta = Map(
+        "Format" -> files.fileFormat.toString,
+        "ReadSchema" -> prunedDataSchema.simpleString,
+        PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"),
+        INPUT_PATHS -> files.location.paths.mkString(", "))
+
       val scan =
         DataSourceScanExec.create(
           readDataColumns ++ partitionColumns,
@@ -200,10 +209,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
            readFile,
            plannedPartitions),
          files,
-          Map(
-            "Format" -> files.fileFormat.toString,
-            "PushedFilters" -> pushedDownFilters.mkString("[", ", ", "]"),
-            "ReadSchema" -> prunedDataSchema.simpleString))
+          meta,
+          table)
 
       val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
       val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
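
For reference, the INPUT_PATHS and PUSHED_FILTERS names imported at the top of this file are plain string metadata keys on the DataSourceScanExec companion object. A sketch of their likely shape follows; the literal values are an assumption inferred from the "PushedFilters" key the old code above used, not verified against this tree.

// Sketch of the assumed metadata-key constants on the companion object:
private[sql] object DataSourceScanExec {
  val INPUT_PATHS = "InputPaths"       // assumed literal
  val PUSHED_FILTERS = "PushedFilters" // assumed literal
  // the create(...) factory appears in the ExistingRDD.scala diff above
}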
