From 5008eb65fb89741196c30da26e189fc046ea0af1 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Tue, 11 Jul 2017 21:47:56 +0800
Subject: [PATCH 1/2] Refactor DataSourceScanExec so its sameResult call does
 not compare strings

---
 .../sql/execution/DataSourceScanExec.scala    | 51 +++++++----------
 .../datasources/DataSourceStrategy.scala      | 56 ++++++-------------
 2 files changed, 37 insertions(+), 70 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index a0def68d88e0d..a5735dcd02984 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -33,19 +33,18 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
 trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
   val relation: BaseRelation
-  val metastoreTableIdentifier: Option[TableIdentifier]
+  val tableIdentifier: Option[TableIdentifier]
 
   protected val nodeNamePrefix: String = ""
 
   override val nodeName: String = {
-    s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}"
+    s"Scan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}"
   }
 
   override def simpleString: String = {
@@ -73,34 +72,24 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
 
 /** Physical plan node for scanning data from a relation. */
 case class RowDataSourceScanExec(
-    output: Seq[Attribute],
+    fullOutput: Seq[Attribute],
+    requiredColumnsIndex: Seq[Int],
+    filters: Set[Filter],
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
-    override val outputPartitioning: Partitioning,
-    override val metadata: Map[String, String],
-    override val metastoreTableIdentifier: Option[TableIdentifier])
+    override val tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec {
 
+  def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)
+
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
-  val outputUnsafeRows = relation match {
-    case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
-      !SparkSession.getActiveSession.get.sessionState.conf.getConf(
-        SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
-    case _: HadoopFsRelation => true
-    case _ => false
-  }
-
   protected override def doExecute(): RDD[InternalRow] = {
-    val unsafeRow = if (outputUnsafeRows) {
-      rdd
-    } else {
-      rdd.mapPartitionsWithIndexInternal { (index, iter) =>
-        val proj = UnsafeProjection.create(schema)
-        proj.initialize(index)
-        iter.map(proj)
-      }
+    val unsafeRow = rdd.mapPartitionsWithIndexInternal { (index, iter) =>
+      val proj = UnsafeProjection.create(schema)
+      proj.initialize(index)
+      iter.map(proj)
     }
 
     val numOutputRows = longMetric("numOutputRows")
@@ -126,24 +115,22 @@ case class RowDataSourceScanExec(
     ctx.INPUT_ROW = row
     ctx.currentVars = null
     val columnsRowInput = exprRows.map(_.genCode(ctx))
-    val inputRow = if (outputUnsafeRows) row else null
     s"""
        |while ($input.hasNext()) {
        |  InternalRow $row = (InternalRow) $input.next();
        |  $numOutputRows.add(1);
-       |  ${consume(ctx, columnsRowInput, inputRow).trim}
+       |  ${consume(ctx, columnsRowInput, null).trim}
        |  if (shouldStop()) return;
        |}
      """.stripMargin
   }
 
-  // Only care about `relation` and `metadata` when canonicalizing.
+  // Don't care about `rdd` and `tableIdentifier` when canonicalizing.
   override lazy val canonicalized: SparkPlan =
     copy(
-      output.map(QueryPlan.normalizeExprId(_, output)),
+      fullOutput.map(QueryPlan.normalizeExprId(_, fullOutput)),
       rdd = null,
-      outputPartitioning = null,
-      metastoreTableIdentifier = None)
+      tableIdentifier = None)
 }
 
 /**
@@ -154,7 +141,7 @@ case class RowDataSourceScanExec(
  * @param requiredSchema Required schema of the underlying relation, excluding partition columns.
  * @param partitionFilters Predicates to use for partition pruning.
  * @param dataFilters Filters on non-partition columns.
- * @param metastoreTableIdentifier identifier for the table in the metastore.
+ * @param tableIdentifier identifier for the table in the metastore.
  */
 case class FileSourceScanExec(
     @transient relation: HadoopFsRelation,
@@ -162,7 +149,7 @@ case class FileSourceScanExec(
     requiredSchema: StructType,
     partitionFilters: Seq[Expression],
     dataFilters: Seq[Expression],
-    override val metastoreTableIdentifier: Option[TableIdentifier])
+    override val tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with ColumnarBatchScan {
 
   val supportsBatch: Boolean = relation.fileFormat.supportBatch(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e05a8d5f02bd8..9676ef78cf403 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources
 
 import java.util.concurrent.Callable
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -288,10 +286,10 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
     case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
       RowDataSourceScanExec(
         l.output,
+        l.output.indices,
+        Set.empty,
         toCatalystRDD(l, baseRelation.buildScan()),
         baseRelation,
-        UnknownPartitioning(0),
-        Map.empty,
         None) :: Nil
 
     case _ => Nil
@@ -354,36 +352,10 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
     val (unhandledPredicates, pushedFilters, handledFilters) =
       selectFilters(relation.relation, candidatePredicates)
 
-    // A set of column attributes that are only referenced by pushed down filters. We can eliminate
-    // them from requested columns.
-    val handledSet = {
-      val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
-      val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
-      AttributeSet(handledPredicates.flatMap(_.references)) --
-        (projectSet ++ unhandledSet).map(relation.attributeMap)
-    }
-
     // Combines all Catalyst filter `Expression`s that are either not convertible to data source
     // `Filter`s or cannot be handled by `relation`.
     val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
 
-    // These metadata values make scan plans uniquely identifiable for equality checking.
-    // TODO(SPARK-17701) using strings for equality checking is brittle
-    val metadata: Map[String, String] = {
-      val pairs = ArrayBuffer.empty[(String, String)]
-
-      // Mark filters which are handled by the underlying DataSource with an Astrisk
-      if (pushedFilters.nonEmpty) {
-        val markedFilters = for (filter <- pushedFilters) yield {
-          if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
-        }
-        pairs += ("PushedFilters" -> markedFilters.mkString("[", ", ", "]"))
-      }
-      pairs += ("ReadSchema" ->
-        StructType.fromAttributes(projects.map(_.toAttribute)).catalogString)
-      pairs.toMap
-    }
-
     if (projects.map(_.toAttribute) == projects &&
         projectSet.size == projects.size &&
         filterSet.subsetOf(projectSet)) {
@@ -395,25 +367,33 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
         .asInstanceOf[Seq[Attribute]]
         // Match original case of attributes.
         .map(relation.attributeMap)
-        // Don't request columns that are only referenced by pushed filters.
-        .filterNot(handledSet.contains)
 
       val scan = RowDataSourceScanExec(
-        projects.map(_.toAttribute),
+        relation.output,
+        requestedColumns.map(relation.output.indexOf),
+        pushedFilters.toSet,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, UnknownPartitioning(0), metadata,
-        relation.catalogTable.map(_.identifier))
+        relation.relation, relation.catalogTable.map(_.identifier))
       filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
     } else {
+      // A set of column attributes that are only referenced by pushed down filters. We can
+      // eliminate them from requested columns.
+      val handledSet = {
+        val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
+        val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
+        AttributeSet(handledPredicates.flatMap(_.references)) --
+          (projectSet ++ unhandledSet).map(relation.attributeMap)
+      }
       // Don't request columns that are only referenced by pushed filters.
       val requestedColumns =
         (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
 
       val scan = RowDataSourceScanExec(
-        requestedColumns,
+        relation.output,
+        requestedColumns.map(relation.output.indexOf),
+        pushedFilters.toSet,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, UnknownPartitioning(0), metadata,
-        relation.catalogTable.map(_.identifier))
+        relation.relation, relation.catalogTable.map(_.identifier))
       execution.ProjectExec(
         projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
     }

From 2dc4ce1e439b4ad86c4ede14397e645d05a1555b Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 12 Jul 2017 15:43:11 +0800
Subject: [PATCH 2/2] address comments

---
 .../spark/sql/execution/DataSourceScanExec.scala      | 14 +++++++++++++-
 .../org/apache/spark/sql/execution/SparkPlan.scala    |  5 -----
 .../apache/spark/sql/execution/SparkPlanInfo.scala    |  4 +---
 .../execution/datasources/DataSourceStrategy.scala    |  9 +++++++--
 .../spark/sql/execution/ui/SparkPlanGraph.scala       |  5 ++---
 5 files changed, 23 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index a5735dcd02984..588c937a13e45 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -47,6 +47,9 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
     s"Scan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}"
   }
 
+  // Metadata that describes more details of this scan.
+  protected def metadata: Map[String, String]
+
   override def simpleString: String = {
     val metadataEntries = metadata.toSeq.sorted.map {
       case (key, value) =>
@@ -75,6 +78,7 @@ case class RowDataSourceScanExec(
     fullOutput: Seq[Attribute],
     requiredColumnsIndex: Seq[Int],
     filters: Set[Filter],
+    handledFilters: Set[Filter],
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
     override val tableIdentifier: Option[TableIdentifier])
@@ -125,6 +129,15 @@ case class RowDataSourceScanExec(
     """.stripMargin
   }
 
+  override val metadata: Map[String, String] = {
+    val markedFilters = for (filter <- filters) yield {
+      if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
+    }
+    Map(
+      "ReadSchema" -> output.toStructType.catalogString,
+      "PushedFilters" -> markedFilters.mkString("[", ", ", "]"))
+  }
+
   // Don't care about `rdd` and `tableIdentifier` when canonicalizing.
   override lazy val canonicalized: SparkPlan =
     copy(
@@ -248,7 +261,6 @@ case class FileSourceScanExec(
   private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
   logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
 
-  // These metadata values make scan plans uniquely identifiable for equality checking.
   override val metadata: Map[String, String] = {
     def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
     val location = relation.location
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index db975614c961a..c7277c21cebb2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -71,11 +71,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     super.makeCopy(newArgs)
   }
 
-  /**
-   * @return Metadata that describes more details of this SparkPlan.
-   */
-  def metadata: Map[String, String] = Map.empty
-
   /**
    * @return All metrics containing metrics of this SparkPlan.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index 7aa93126fdabd..06b69625fb53e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -31,7 +31,6 @@ class SparkPlanInfo(
     val nodeName: String,
     val simpleString: String,
     val children: Seq[SparkPlanInfo],
-    val metadata: Map[String, String],
     val metrics: Seq[SQLMetricInfo]) {
 
   override def hashCode(): Int = {
@@ -58,7 +57,6 @@ private[execution] object SparkPlanInfo {
       new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType)
     }
 
-    new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),
-      plan.metadata, metrics)
+    new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan), metrics)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 9676ef78cf403..587b9b450ea2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -288,6 +288,7 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
         l.output,
         l.output.indices,
         Set.empty,
+        Set.empty,
         toCatalystRDD(l, baseRelation.buildScan()),
         baseRelation,
         None) :: Nil
@@ -372,8 +373,10 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
         relation.output,
         requestedColumns.map(relation.output.indexOf),
         pushedFilters.toSet,
+        handledFilters,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, relation.catalogTable.map(_.identifier))
+        relation.relation,
+        relation.catalogTable.map(_.identifier))
       filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
     } else {
       // A set of column attributes that are only referenced by pushed down filters. We can
@@ -392,8 +395,10 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
         relation.output,
         requestedColumns.map(relation.output.indexOf),
         pushedFilters.toSet,
+        handledFilters,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, relation.catalogTable.map(_.identifier))
+        relation.relation,
+        relation.catalogTable.map(_.identifier))
       execution.ProjectExec(
         projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 9d4ebcce4d103..884f945815e0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -113,7 +113,7 @@ object SparkPlanGraph {
       }
       val node = new SparkPlanGraphNode(
         nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
-        planInfo.simpleString, planInfo.metadata, metrics)
+        planInfo.simpleString, metrics)
       if (subgraph == null) {
         nodes += node
       } else {
@@ -143,7 +143,6 @@ private[ui] class SparkPlanGraphNode(
     val id: Long,
     val name: String,
     val desc: String,
-    val metadata: Map[String, String],
    val metrics: Seq[SQLPlanMetric]) {
 
   def makeDotNode(metricsValue: Map[Long, String]): String = {
@@ -177,7 +176,7 @@ private[ui] class SparkPlanGraphCluster(
     desc: String,
     val nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
     metrics: Seq[SQLPlanMetric])
-  extends SparkPlanGraphNode(id, name, desc, Map.empty, metrics) {
+  extends SparkPlanGraphNode(id, name, desc, metrics) {
 
   override def makeDotNode(metricsValue: Map[Long, String]): String = {
     val duration = metrics.filter(_.name.startsWith(WholeStageCodegenExec.PIPELINE_DURATION_METRIC))