Commit 0da7bd5

Davies Liu authored and rxin committed
[SPARK-12286][SPARK-12290][SPARK-12294][SPARK-12284][SQL] always output UnsafeRow
It's confusing that some operators output UnsafeRow while others don't, which makes mistakes easy. This PR changes all operators (SparkPlan) to output only UnsafeRow and removes the rule that inserted Unsafe/Safe conversions; operators that can't output UnsafeRow directly now apply an UnsafeProjection to their results. Closes #10330

cc JoshRosen rxin

Author: Davies Liu <[email protected]>

Closes #10511 from davies/unsafe_row.
1 parent 6c20b3c commit 0da7bd5

34 files changed: +74, -574 lines
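Before the per-file diffs, a minimal sketch of the pattern the commit applies may help. This is not code from the patch: DummyScan and its fields are invented, and it assumes a class living in the org.apache.spark.sql.execution package (where LeafNode and RDD.mapPartitionsInternal are accessible). An operator that cannot emit UnsafeRow natively wraps its output iterator in an UnsafeProjection:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}

// Hypothetical operator (not in the patch) showing the convention:
// whatever doExecute produces internally, the rows handed to the
// parent operator are re-encoded as UnsafeRows.
case class DummyScan(output: Seq[Attribute], rdd: RDD[InternalRow]) extends LeafNode {
  protected override def doExecute(): RDD[InternalRow] = {
    rdd.mapPartitionsInternal { iter =>
      // One projection per partition; projecting an attribute list
      // onto itself changes only the row's physical encoding.
      val proj = UnsafeProjection.create(output, output)
      iter.map(proj)
    }
  }
}

PhysicalRDD, Generate, and LocalTableScan below all follow this shape.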

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 1 addition & 2 deletions
@@ -904,8 +904,7 @@ class SQLContext private[sql](
   @transient
   protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
     val batches = Seq(
-      Batch("Add exchange", Once, EnsureRequirements(self)),
-      Batch("Add row converters", Once, EnsureRowFormats)
+      Batch("Add exchange", Once, EnsureRequirements(self))
     )
   }


sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 2 additions & 23 deletions
@@ -28,7 +28,6 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.util.MutablePair

@@ -50,26 +49,14 @@ case class Exchange(
       case None => ""
     }

-    val simpleNodeName = if (tungstenMode) "TungstenExchange" else "Exchange"
+    val simpleNodeName = "Exchange"
     s"$simpleNodeName$extraInfo"
   }

-  /**
-   * Returns true iff we can support the data type, and we are not doing range partitioning.
-   */
-  private lazy val tungstenMode: Boolean = !newPartitioning.isInstanceOf[RangePartitioning]
-
   override def outputPartitioning: Partitioning = newPartitioning

   override def output: Seq[Attribute] = child.output

-  // This setting is somewhat counterintuitive:
-  // If the schema works with UnsafeRow, then we tell the planner that we don't support safe row,
-  // so the planner inserts a converter to convert data into UnsafeRow if needed.
-  override def outputsUnsafeRows: Boolean = tungstenMode
-  override def canProcessSafeRows: Boolean = !tungstenMode
-  override def canProcessUnsafeRows: Boolean = tungstenMode
-
   /**
    * Determines whether records must be defensively copied before being sent to the shuffle.
    * Several of Spark's shuffle components will buffer deserialized Java objects in memory. The

@@ -130,15 +117,7 @@ case class Exchange(
     }
   }

-  @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf
-
-  private val serializer: Serializer = {
-    if (tungstenMode) {
-      new UnsafeRowSerializer(child.output.size)
-    } else {
-      new SparkSqlSerializer(sparkConf)
-    }
-  }
+  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)

   override protected def doPrepare(): Unit = {
     // If an ExchangeCoordinator is needed, we register this Exchange operator

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 12 additions & 3 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, GenericMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, Attribute, AttributeSet, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
 import org.apache.spark.sql.types.DataType

@@ -99,10 +99,19 @@ private[sql] case class PhysicalRDD(
     rdd: RDD[InternalRow],
     override val nodeName: String,
     override val metadata: Map[String, String] = Map.empty,
-    override val outputsUnsafeRows: Boolean = false)
+    isUnsafeRow: Boolean = false)
   extends LeafNode {

-  protected override def doExecute(): RDD[InternalRow] = rdd
+  protected override def doExecute(): RDD[InternalRow] = {
+    if (isUnsafeRow) {
+      rdd
+    } else {
+      rdd.mapPartitionsInternal { iter =>
+        val proj = UnsafeProjection.create(schema)
+        iter.map(proj)
+      }
+    }
+  }

   override def simpleString: String = {
     val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"

sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala

Lines changed: 2 additions & 11 deletions
@@ -41,20 +41,11 @@ case class Expand(
   // as UNKNOWN partitioning
   override def outputPartitioning: Partitioning = UnknownPartitioning(0)

-  override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = true
-
   override def references: AttributeSet =
     AttributeSet(projections.flatten.flatMap(_.references))

-  private[this] val projection = {
-    if (outputsUnsafeRows) {
-      (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
-    } else {
-      (exprs: Seq[Expression]) => newMutableProjection(exprs, child.output)()
-    }
-  }
+  private[this] val projection =
+    (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)

   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
     child.execute().mapPartitions { iter =>

sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala

Lines changed: 5 additions & 3 deletions
@@ -64,6 +64,7 @@ case class Generate(
     child.execute().mapPartitionsInternal { iter =>
       val generatorNullRow = InternalRow.fromSeq(Seq.fill[Any](generator.elementTypes.size)(null))
       val joinedRow = new JoinedRow
+      val proj = UnsafeProjection.create(output, output)

       iter.flatMap { row =>
         // we should always set the left (child output)

@@ -77,13 +78,14 @@
         } ++ LazyIterator(() => boundGenerator.terminate()).map { row =>
           // we leave the left side as the last element of its child output
           // keep it the same as Hive does
-          joinedRow.withRight(row)
+          proj(joinedRow.withRight(row))
         }
       }
     } else {
       child.execute().mapPartitionsInternal { iter =>
-        iter.flatMap(row => boundGenerator.eval(row)) ++
-          LazyIterator(() => boundGenerator.terminate())
+        val proj = UnsafeProjection.create(output, output)
+        (iter.flatMap(row => boundGenerator.eval(row)) ++
+          LazyIterator(() => boundGenerator.terminate())).map(proj)
       }
     }
   }
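A note on UnsafeProjection.create(output, output), which appears in both branches above: with the expression list equal to the input schema, every attribute binds to itself, so the projection is an identity on values and only re-encodes each row into the Unsafe binary format. A self-contained illustration (the attribute name and value are invented):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeProjection}
import org.apache.spark.sql.types.IntegerType

// Invented single-column schema for illustration.
val output = Seq(AttributeReference("a", IntegerType, nullable = false)())
val identity = UnsafeProjection.create(output, output)
// A generic row goes in; an UnsafeRow carrying the same value comes out.
assert(identity(InternalRow(42)).getInt(0) == 42)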

sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala

Lines changed: 9 additions & 4 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution

 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}


 /**

@@ -29,15 +29,20 @@ private[sql] case class LocalTableScan(
     output: Seq[Attribute],
     rows: Seq[InternalRow]) extends LeafNode {

-  private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
+  private val unsafeRows: Array[InternalRow] = {
+    val proj = UnsafeProjection.create(output, output)
+    rows.map(r => proj(r).copy()).toArray
+  }
+
+  private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)

   protected override def doExecute(): RDD[InternalRow] = rdd

   override def executeCollect(): Array[InternalRow] = {
-    rows.toArray
+    unsafeRows
   }

   override def executeTake(limit: Int): Array[InternalRow] = {
-    rows.take(limit).toArray
+    unsafeRows.take(limit)
   }
 }
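The proj(r).copy() in the unsafeRows block above is load-bearing: an UnsafeProjection reuses one mutable row buffer across calls, which is fine when rows are consumed one at a time but aliases every element when its outputs are materialized into a collection. A small sketch of the pitfall (schema and values invented):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val proj = UnsafeProjection.create(StructType(Seq(StructField("i", IntegerType))))
val rows = Seq(InternalRow(1), InternalRow(2))

val aliased = rows.map(proj)                 // every element points at one reused buffer
val copied = rows.map(r => proj(r).copy())   // independent UnsafeRows, as the patch does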

sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala

Lines changed: 0 additions & 4 deletions
@@ -39,10 +39,6 @@ case class Sort(
     testSpillFrequency: Int = 0)
   extends UnaryNode {

-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = true
-  override def canProcessSafeRows: Boolean = false
-
   override def output: Seq[Attribute] = child.output

   override def outputOrdering: Seq[SortOrder] = sortOrder

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 0 additions & 23 deletions
@@ -97,36 +97,13 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /** Specifies sort order for each partition requirements on the input data for this operator. */
   def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)

-  /** Specifies whether this operator outputs UnsafeRows */
-  def outputsUnsafeRows: Boolean = false
-
-  /** Specifies whether this operator is capable of processing UnsafeRows */
-  def canProcessUnsafeRows: Boolean = false
-
-  /**
-   * Specifies whether this operator is capable of processing Java-object-based Rows (i.e. rows
-   * that are not UnsafeRows).
-   */
-  def canProcessSafeRows: Boolean = true

   /**
    * Returns the result of this query as an RDD[InternalRow] by delegating to doExecute
    * after adding query plan information to created RDDs for visualization.
    * Concrete implementations of SparkPlan should override doExecute instead.
    */
   final def execute(): RDD[InternalRow] = {
-    if (children.nonEmpty) {
-      val hasUnsafeInputs = children.exists(_.outputsUnsafeRows)
-      val hasSafeInputs = children.exists(!_.outputsUnsafeRows)
-      assert(!(hasSafeInputs && hasUnsafeInputs),
-        "Child operators should output rows in the same format")
-      assert(canProcessSafeRows || canProcessUnsafeRows,
-        "Operator must be able to process at least one row format")
-      assert(!hasSafeInputs || canProcessSafeRows,
-        "Operator will receive safe rows as input but cannot process safe rows")
-      assert(!hasUnsafeInputs || canProcessUnsafeRows,
-        "Operator will receive unsafe rows as input but cannot process unsafe rows")
-    }
     RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
       prepare()
       doExecute()
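With every operator emitting UnsafeRow, the per-operator capability negotiation above has nothing left to negotiate, which is why the assertions in execute() are deleted rather than generalized. A hypothetical test-style helper (not in the patch; assertAllUnsafe is an invented name) expressing the new global invariant:

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Invented helper: after this commit, any SparkPlan's execute() should
// contain only UnsafeRows, so one blanket check can replace the removed
// per-operator format assertions.
def assertAllUnsafe(plan: SparkPlan): Unit = {
  plan.execute().foreach(row => assert(row.isInstanceOf[UnsafeRow]))
}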

sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala

Lines changed: 3 additions & 5 deletions
@@ -100,8 +100,6 @@ case class Window(

   override def outputOrdering: Seq[SortOrder] = child.outputOrdering

-  override def canProcessUnsafeRows: Boolean = true
-
   /**
    * Create a bound ordering object for a given frame type and offset. A bound ordering object is
    * used to determine which input row lies within the frame boundaries of an output row.

@@ -259,16 +257,16 @@
    * @return the final resulting projection.
    */
   private[this] def createResultProjection(
-      expressions: Seq[Expression]): MutableProjection = {
+      expressions: Seq[Expression]): UnsafeProjection = {
     val references = expressions.zipWithIndex.map{ case (e, i) =>
       // Results of window expressions will be on the right side of child's output
       BoundReference(child.output.size + i, e.dataType, e.nullable)
     }
     val unboundToRefMap = expressions.zip(references).toMap
     val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
-    newMutableProjection(
+    UnsafeProjection.create(
       projectList ++ patchedWindowExpression,
-      child.output)()
+      child.output)
   }

   protected override def doExecute(): RDD[InternalRow] = {

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala

Lines changed: 0 additions & 4 deletions
@@ -49,10 +49,6 @@ case class SortBasedAggregate(
     "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

-  override def outputsUnsafeRows: Boolean = true
-  override def canProcessUnsafeRows: Boolean = false
-  override def canProcessSafeRows: Boolean = true
-
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)

   override def requiredChildDistribution: List[Distribution] = {
