@@ -41,3 +41,8 @@
stroke: #444;
stroke-width: 1.5px;
}

/* Break long strings such as file paths when showing tooltips */
.tooltip-inner {
word-wrap:break-word;
}
@@ -17,9 +17,11 @@

package org.apache.spark.sql.execution

import org.apache.commons.lang.StringUtils

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
@@ -127,8 +129,11 @@ private[sql] case class RDDScanExec(
private[sql] trait DataSourceScanExec extends LeafExecNode {
val rdd: RDD[InternalRow]
val relation: BaseRelation
val metastoreTableIdentifier: Option[TableIdentifier]

override val nodeName: String = relation.toString
override val nodeName: String = {
s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}"
}

// Ignore rdd when checking results
override def sameResult(plan: SparkPlan): Boolean = plan match {
@@ -143,7 +148,8 @@ private[sql] case class RowDataSourceScanExec(
rdd: RDD[InternalRow],
@transient relation: BaseRelation,
override val outputPartitioning: Partitioning,
override val metadata: Map[String, String] = Map.empty)
override val metadata: Map[String, String],
override val metastoreTableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with CodegenSupport {

private[sql] override lazy val metrics =
@@ -174,8 +180,11 @@ private[sql] case class RowDataSourceScanExec(
}

override def simpleString: String = {
val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
key + ": " + StringUtils.abbreviate(value, 100)
}

s"$nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
}

override def inputRDDs(): Seq[RDD[InternalRow]] = {
@@ -212,7 +221,8 @@ private[sql] case class BatchedDataSourceScanExec(
rdd: RDD[InternalRow],
@transient relation: BaseRelation,
override val outputPartitioning: Partitioning,
override val metadata: Map[String, String] = Map.empty)
override val metadata: Map[String, String],
override val metastoreTableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with CodegenSupport {

private[sql] override lazy val metrics =
@@ -224,9 +234,11 @@ private[sql] case class BatchedDataSourceScanExec(
}

override def simpleString: String = {
val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
key + ": " + StringUtils.abbreviate(value, 100)
Reviewer comment (Contributor): Can you play with some long paths and see if 100 is a good value? It would also be good to put a screenshot in the PR description.
}
val metadataStr = metadataEntries.mkString(" ", ", ", "")
s"BatchedScan $nodeName${output.mkString("[", ",", "]")}$metadataStr"
s"Batched$nodeName${output.mkString("[", ",", "]")}$metadataStr"
}

override def inputRDDs(): Seq[RDD[InternalRow]] = {
@@ -325,7 +337,8 @@ private[sql] object DataSourceScanExec {
output: Seq[Attribute],
rdd: RDD[InternalRow],
relation: BaseRelation,
metadata: Map[String, String] = Map.empty): DataSourceScanExec = {
metadata: Map[String, String] = Map.empty,
metastoreTableIdentifier: Option[TableIdentifier] = None): DataSourceScanExec = {
val outputPartitioning = {
val bucketSpec = relation match {
// TODO: this should be closer to bucket planning.
@@ -351,9 +364,11 @@
relation match {
case r: HadoopFsRelation
if r.fileFormat.supportBatch(r.sparkSession, StructType.fromAttributes(output)) =>
BatchedDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata)
BatchedDataSourceScanExec(
output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier)
case _ =>
RowDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata)
RowDataSourceScanExec(
output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier)
}
}
}
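For reference, a minimal sketch (not part of the diff) of what the two string changes above produce. The table name and paths are invented; StringUtils.abbreviate and TableIdentifier.unquotedString are the calls the new code actually uses.

```scala
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.catalyst.TableIdentifier

object ScanNodeNameSketch {
  def main(args: Array[String]): Unit = {
    // nodeName now appends the unquoted metastore table name when one is known.
    val relationString = "parquet" // e.g. what HadoopFsRelation.toString yields for a Parquet source after this change
    val table: Option[TableIdentifier] = Some(TableIdentifier("events", Some("default")))
    val nodeName = s"Scan $relationString ${table.map(_.unquotedString).getOrElse("")}"
    println(nodeName) // Scan parquet default.events

    // Metadata values longer than 100 characters are truncated with a trailing "...",
    // so a long InputPaths entry no longer floods simpleString or the SQL tab tooltip.
    val inputPaths = (1 to 4)
      .map(i => s"hdfs://namenode:8020/user/hive/warehouse/events/part=$i")
      .mkString(", ")
    println("InputPaths: " + StringUtils.abbreviate(inputPaths, 100))
  }
}
```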
@@ -192,11 +192,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
}

relation.relation match {
case r: HadoopFsRelation => pairs += INPUT_PATHS -> r.location.paths.mkString(", ")
case _ =>
}

pairs.toMap
}

@@ -217,7 +212,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val scan = execution.DataSourceScanExec.create(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, metadata)
relation.relation, metadata, relation.metastoreTableIdentifier)
filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
} else {
// Don't request columns that are only referenced by pushed filters.
@@ -227,7 +222,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val scan = execution.DataSourceScanExec.create(
requestedColumns,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, metadata)
relation.relation, metadata, relation.metastoreTableIdentifier)
execution.ProjectExec(
projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
}
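The InputPaths entry disappears from this generic code path because FileSourceStrategy (next file) now builds it, and the scan simply inherits the metastore identifier carried by the LogicalRelation. A small sketch of the value being forwarded, assuming the identifier is absent for purely path-based relations:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

object ForwardedIdentifierSketch {
  // Mirrors relation.metastoreTableIdentifier as passed into DataSourceScanExec.create:
  // Some(identifier) for catalog-backed tables, None for relations loaded from a path.
  def render(metastoreTableIdentifier: Option[TableIdentifier]): String =
    metastoreTableIdentifier.map(_.unquotedString).getOrElse("")

  def main(args: Array[String]): Unit = {
    println(render(Some(TableIdentifier("people", Some("default"))))) // default.people
    println(render(None))                                             // empty: the node name shows no table
  }
}
```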
@@ -27,7 +27,9 @@ import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.DataSourceScanExec.{INPUT_PATHS, PUSHED_FILTERS}
import org.apache.spark.sql.execution.SparkPlan

/**
* A strategy for planning scans over collections of files that might be partitioned or bucketed
@@ -54,7 +56,8 @@ import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
*/
private[sql] object FileSourceStrategy extends Strategy with Logging {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _)) =>
case PhysicalOperation(projects, filters,
l @ LogicalRelation(files: HadoopFsRelation, _, table)) =>
// Filters on this relation fall into four categories based on where we can use them to avoid
// reading unneeded data:
// - partition keys only - used to prune directories to read
@@ -192,6 +195,12 @@
partitions
}

val meta = Map(
"Format" -> files.fileFormat.toString,
"ReadSchema" -> prunedDataSchema.simpleString,
PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"),
INPUT_PATHS -> files.location.paths.mkString(", "))

val scan =
DataSourceScanExec.create(
readDataColumns ++ partitionColumns,
@@ -200,10 +209,8 @@
readFile,
plannedPartitions),
files,
Map(
"Format" -> files.fileFormat.toString,
"PushedFilters" -> pushedDownFilters.mkString("[", ", ", "]"),
"ReadSchema" -> prunedDataSchema.simpleString))
meta,
table)

val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
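As a rough usage illustration (not from the PR): with these changes, explaining a query over a metastore-backed Parquet table surfaces the format short name, the db.table identifier, and the abbreviated metadata in the scan node. The table, columns, and plan text below are approximations, not captured output.

```scala
import org.apache.spark.sql.SparkSession

object ExplainSketch {
  def main(args: Array[String]): Unit = {
    // Assumes a Hive metastore table default.events stored as Parquet; both the
    // table and the printed plan below are illustrative only.
    val spark = SparkSession.builder()
      .appName("explain-sketch")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    spark.table("default.events").where("id > 0").explain()
    // Roughly (metadata keys are sorted, values abbreviated at 100 chars):
    //   *BatchedScan parquet default.events[id#0L,name#1] Format: ParquetFormat,
    //     InputPaths: hdfs://.../warehouse/events, PushedFilters: [IsNotNull(id), GreaterThan(id,0)],
    //     ReadSchema: struct<id:bigint,name:string>

    spark.stop()
  }
}
```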
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, Inter
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, Filter}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration

@@ -157,8 +157,12 @@ case class HadoopFsRelation(

def refresh(): Unit = location.refresh()

override def toString: String =
s"HadoopFiles"
override def toString: String = {
fileFormat match {
case source: DataSourceRegister => source.shortName()
case _ => "HadoopFiles"
}
}

/** Returns the list of files that will be read when scanning this relation. */
override def inputFiles: Array[String] =
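The DataSourceRegister branch above is what turns the relation string into a short format name. A hypothetical sketch of the trait's role (a real format would also extend FileFormat; MyTextFormat and its short name are invented here):

```scala
import org.apache.spark.sql.sources.DataSourceRegister

// Any file format that registers a short name now identifies itself in the scan node.
class MyTextFormat extends DataSourceRegister {
  override def shortName(): String = "mytext"
}

// With this change, HadoopFsRelation.toString returns "mytext" for such a format
// (or "parquet", "orc", "csv", ... for the built-in ones), so the SQL UI and explain
// output show e.g. "Scan mytext default.logs" instead of "Scan HadoopFiles".
```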