Commit 38958d0

rxin authored and CodingCat committed
[SPARK-9733][SQL] Improve physical plan explain for data sources
All data sources show up as "PhysicalRDD" in physical plan explain. It'd be better if we can show the name of the data source.

Without this patch:

```
== Physical Plan ==
NewAggregate with UnsafeHybridAggregationIterator ArrayBuffer(date#0, cat#1) ArrayBuffer((sum(CAST((CAST(count#2, IntegerType) + 1), LongType))2,mode=Final,isDistinct=false))
 Exchange hashpartitioning(date#0,cat#1)
  NewAggregate with UnsafeHybridAggregationIterator ArrayBuffer(date#0, cat#1) ArrayBuffer((sum(CAST((CAST(count#2, IntegerType) + 1), LongType))2,mode=Partial,isDistinct=false))
   PhysicalRDD [date#0,cat#1,count#2], MapPartitionsRDD[3] at
```

With this patch:

```
== Physical Plan ==
TungstenAggregate(key=[date#0,cat#1], value=[(sum(CAST((CAST(count#2, IntegerType) + 1), LongType)),mode=Final,isDistinct=false)]
 Exchange hashpartitioning(date#0,cat#1)
  TungstenAggregate(key=[date#0,cat#1], value=[(sum(CAST((CAST(count#2, IntegerType) + 1), LongType)),mode=Partial,isDistinct=false)]
   ConvertToUnsafe
    Scan ParquetRelation[file:/scratch/rxin/spark/sales4][date#0,cat#1,count#2]
```

Author: Reynold Xin <[email protected]>

Closes apache#8024 from rxin/SPARK-9733 and squashes the following commits:

811b90e [Reynold Xin] Fixed Python test case.
52cab77 [Reynold Xin] Cast.
eea9ccc [Reynold Xin] Fix test case.
fcecb22 [Reynold Xin] [SPARK-9733][SQL] Improve explain message for data source scan node.
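To see the change end to end, a minimal spark-shell sketch against the 1.5-era DataFrame API (the Parquet path and the aggregation are illustrative, simplified from the plans above):

```scala
// spark-shell sketch (Spark 1.5-era API); the path and columns are illustrative.
val df = sqlContext.read.parquet("/scratch/rxin/spark/sales4")

// The physical plan's leaf now names the data source, e.g.
//   Scan ParquetRelation[file:/scratch/rxin/spark/sales4][date#0,cat#1,count#2]
// instead of the opaque
//   PhysicalRDD [date#0,cat#1,count#2], MapPartitionsRDD[3] at ...
df.groupBy("date", "cat").sum("count").explain()
```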
1 parent 11f0341 · commit 38958d0

11 files changed (+45, -27 lines)


python/pyspark/sql/dataframe.py

Lines changed: 1 addition & 3 deletions
```diff
@@ -212,8 +212,7 @@ def explain(self, extended=False):
         :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
 
         >>> df.explain()
-        PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at applySchemaToPythonRDD at\
-        NativeMethodAccessorImpl.java:...
+        Scan PhysicalRDD[age#0,name#1]
 
         >>> df.explain(True)
         == Parsed Logical Plan ==
@@ -224,7 +223,6 @@ def explain(self, extended=False):
         ...
         == Physical Plan ==
         ...
-        == RDD ==
         """
         if extended:
             print(self._jdf.queryExecution().toString())
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -107,6 +107,8 @@ object Cast {
 case class Cast(child: Expression, dataType: DataType)
   extends UnaryExpression with CodegenFallback {
 
+  override def toString: String = s"cast($child as ${dataType.simpleString})"
+
   override def checkInputDataTypes(): TypeCheckResult = {
     if (Cast.canCast(child.dataType, dataType)) {
       TypeCheckResult.TypeCheckSuccess
@@ -118,8 +120,6 @@ case class Cast(child: Expression, dataType: DataType)
 
   override def nullable: Boolean = Cast.forceNullable(child.dataType, dataType) || child.nullable
 
-  override def toString: String = s"CAST($child, $dataType)"
-
   // [[func]] assumes the input is no longer null because eval already does the null check.
   @inline private[this] def buildCast[T](a: Any, func: T => Any): Any = func(a.asInstanceOf[T])
```
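The rendered form before and after, sketched as a REPL session (assuming Catalyst's `Literal(1)` prints as `1` and that `LongType.simpleString` is `bigint`):

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.LongType

// Sketch: how the same Cast expression renders before and after this change.
val c = Cast(Literal(1), LongType)
println(c)
// Before: CAST(1, LongType)
// After:  cast(1 as bigint)
```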

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -93,7 +93,7 @@ private[sql] case class AggregateExpression2(
     AttributeSet(childReferences)
   }
 
-  override def toString: String = s"(${aggregateFunction}2,mode=$mode,isDistinct=$isDistinct)"
+  override def toString: String = s"(${aggregateFunction},mode=$mode,isDistinct=$isDistinct)"
 }
 
 abstract class AggregateFunction2
```
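This drops a stray `2` (a leftover from the `AggregateExpression2` class name) that was baked into the format string, which is why aggregates in the old plan read `...LongType))2,mode=Final,...`. A plain-Scala sketch with an illustrative aggregate string:

```scala
// The '2' in the old output was a literal character in the format string.
val aggregateFunction = "sum(cast(count#2 as bigint))" // illustrative
val mode = "Final"
val isDistinct = false

println(s"(${aggregateFunction}2,mode=$mode,isDistinct=$isDistinct)") // before
println(s"(${aggregateFunction},mode=$mode,isDistinct=$isDistinct)")  // after
// (sum(cast(count#2 as bigint))2,mode=Final,isDistinct=false)
// (sum(cast(count#2 as bigint)),mode=Final,isDistinct=false)
```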

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 0 additions & 4 deletions
```diff
@@ -1011,9 +1011,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
     def output =
       analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")
 
-    // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)})
-    // however, the `toRdd` will cause the real execution, which is not what we want.
-    // We need to think about how to avoid the side effect.
     s"""== Parsed Logical Plan ==
        |${stringOrError(logical)}
        |== Analyzed Logical Plan ==
@@ -1024,7 +1021,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
        |== Physical Plan ==
        |${stringOrError(executedPlan)}
        |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
-       |== RDD ==
     """.stripMargin.trim
   }
 }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 14 additions & 1 deletion
```diff
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{Row, SQLContext}
 
@@ -95,11 +96,23 @@ private[sql] case class LogicalRDD(
 /** Physical plan node for scanning data from an RDD. */
 private[sql] case class PhysicalRDD(
     output: Seq[Attribute],
-    rdd: RDD[InternalRow]) extends LeafNode {
+    rdd: RDD[InternalRow],
+    extraInformation: String) extends LeafNode {
 
   override protected[sql] val trackNumOfRowsEnabled = true
 
   protected override def doExecute(): RDD[InternalRow] = rdd
+
+  override def simpleString: String = "Scan " + extraInformation + output.mkString("[", ",", "]")
+}
+
+private[sql] object PhysicalRDD {
+  def createFromDataSource(
+      output: Seq[Attribute],
+      rdd: RDD[InternalRow],
+      relation: BaseRelation): PhysicalRDD = {
+    PhysicalRDD(output, rdd, relation.toString)
+  }
 }
 
 /** Logical plan node for scanning data from a local collection. */
```
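A dependency-free sketch of the new `simpleString` composition and the two construction paths (`Attr` and `FakeRelation` are hypothetical stand-ins for Catalyst's `Attribute` and `BaseRelation`):

```scala
// Hypothetical stand-ins for Attribute and BaseRelation.
case class Attr(name: String, id: Int) { override def toString: String = s"$name#$id" }
case class FakeRelation(paths: Seq[String]) {
  override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]")
}

val output = Seq(Attr("date", 0), Attr("cat", 1), Attr("count", 2))

// Direct construction (as SparkStrategies does) passes a fixed label:
println("Scan " + "PhysicalRDD" + output.mkString("[", ",", "]"))
// Scan PhysicalRDD[date#0,cat#1,count#2]

// createFromDataSource stamps the relation's own toString into the plan:
println("Scan " + FakeRelation(Seq("file:/data/sales")) + output.mkString("[", ",", "]"))
// Scan FakeRelation[file:/data/sales][date#0,cat#1,count#2]
```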

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -363,12 +363,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       execution.Generate(
         generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
     case logical.OneRowRelation =>
-      execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
+      execution.PhysicalRDD(Nil, singleRowRdd, "OneRowRelation") :: Nil
     case logical.RepartitionByExpression(expressions, child) =>
       execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
     case e @ EvaluatePython(udf, child, _) =>
       BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
-    case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil
+    case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "PhysicalRDD") :: Nil
     case BroadcastHint(child) => apply(child)
     case _ => Nil
   }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala

Lines changed: 6 additions & 3 deletions
```diff
@@ -93,10 +93,13 @@ case class TungstenAggregate(
     val allAggregateExpressions = nonCompleteAggregateExpressions ++ completeAggregateExpressions
 
     testFallbackStartsAt match {
-      case None => s"TungstenAggregate ${groupingExpressions} ${allAggregateExpressions}"
+      case None =>
+        val keyString = groupingExpressions.mkString("[", ",", "]")
+        val valueString = allAggregateExpressions.mkString("[", ",", "]")
+        s"TungstenAggregate(key=$keyString, value=$valueString"
       case Some(fallbackStartsAt) =>
-        s"TungstenAggregateWithControlledFallback ${groupingExpressions} " +
-          s"${allAggregateExpressions} fallbackStartsAt=$fallbackStartsAt"
+        s"TungstenAggregateWithControlledFallback $groupingExpressions " +
+          s"$allAggregateExpressions fallbackStartsAt=$fallbackStartsAt"
     }
   }
 }
```
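For illustration, a plain-Scala sketch of the new key/value formatting (the expression strings below are stand-ins for `Expression.toString` values). Note that the new `case None` branch appears to leave the parenthesis opened after `TungstenAggregate(` unclosed, which is also visible in the example output in the commit message:

```scala
// Illustrative stand-ins for the grouping and aggregate expression strings.
val groupingExpressions = Seq("date#0", "cat#1")
val allAggregateExpressions = Seq("(sum(...),mode=Final,isDistinct=false)")

val keyString = groupingExpressions.mkString("[", ",", "]")
val valueString = allAggregateExpressions.mkString("[", ",", "]")

// Same interpolation as the patched case None branch:
println(s"TungstenAggregate(key=$keyString, value=$valueString")
// TungstenAggregate(key=[date#0,cat#1], value=[(sum(...),mode=Final,isDistinct=false)]
```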

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 15 additions & 7 deletions
```diff
@@ -101,8 +101,9 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         (a, f) =>
           toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil
 
-    case l @ LogicalRelation(t: TableScan) =>
-      execution.PhysicalRDD(l.output, toCatalystRDD(l, t.buildScan())) :: Nil
+    case l @ LogicalRelation(baseRelation: TableScan) =>
+      execution.PhysicalRDD.createFromDataSource(
+        l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil
 
     case i @ logical.InsertIntoTable(
       l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty =>
@@ -169,7 +170,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows)
     }
 
-    execution.PhysicalRDD(projections.map(_.toAttribute), unionedRows)
+    execution.PhysicalRDD.createFromDataSource(
+      projections.map(_.toAttribute),
+      unionedRows,
+      logicalRelation.relation)
   }
 
   // TODO: refactor this thing. It is very complicated because it does projection internally.
@@ -299,14 +303,18 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         projects.asInstanceOf[Seq[Attribute]] // Safe due to if above.
           .map(relation.attributeMap) // Match original case of attributes.
 
-      val scan = execution.PhysicalRDD(projects.map(_.toAttribute),
-        scanBuilder(requestedColumns, pushedFilters))
+      val scan = execution.PhysicalRDD.createFromDataSource(
+        projects.map(_.toAttribute),
+        scanBuilder(requestedColumns, pushedFilters),
+        relation.relation)
       filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
     } else {
       val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq
 
-      val scan = execution.PhysicalRDD(requestedColumns,
-        scanBuilder(requestedColumns, pushedFilters))
+      val scan = execution.PhysicalRDD.createFromDataSource(
+        requestedColumns,
+        scanBuilder(requestedColumns, pushedFilters),
+        relation.relation)
       execution.Project(projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
     }
   }
```

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -383,7 +383,7 @@ private[sql] abstract class OutputWriterInternal extends OutputWriter {
 abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
   extends BaseRelation with Logging {
 
-  logInfo("Constructing HadoopFsRelation")
+  override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]")
 
   def this() = this(None)
```
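Since `getClass.getSimpleName` resolves to the concrete subclass at runtime, every `HadoopFsRelation` implementation gets a specific scan label for free. A dependency-free sketch of that behavior (`FakeFsRelation` and this local `ParquetRelation` are hypothetical stand-ins):

```scala
// Hypothetical stand-in for HadoopFsRelation's new toString.
abstract class FakeFsRelation {
  def paths: Seq[String]
  override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]")
}
class ParquetRelation(val paths: Seq[String]) extends FakeFsRelation

println(new ParquetRelation(Seq("file:/scratch/rxin/spark/sales4")))
// ParquetRelation[file:/scratch/rxin/spark/sales4]
```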
sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -32,9 +32,9 @@ class RowFormatConvertersSuite extends SparkPlanTest {
     case c: ConvertToSafe => c
   }
 
-  private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null))
+  private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
   assert(!outputsSafe.outputsUnsafeRows)
-  private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null))
+  private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
   assert(outputsUnsafe.outputsUnsafeRows)
 
   test("planner should insert unsafe->safe conversions when required") {
```
