Skip to content

Commit 606f99b

Browse files
gatorsmiledavies
authored andcommitted
[SPARK-12288] [SQL] Support UnsafeRow in Coalesce/Except/Intersect.
Support UnsafeRow for the Coalesce/Except/Intersect. Could you review if my code changes are ok? davies Thank you! Author: gatorsmile <[email protected]> Closes #10285 from gatorsmile/unsafeSupportCIE.
1 parent d13ff82 commit 606f99b

File tree

2 files changed

+46
-1
lines changed

2 files changed

+46
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ case class Union(children: Seq[SparkPlan]) extends SparkPlan {
137137
}
138138
}
139139
}
140-
override def outputsUnsafeRows: Boolean = children.forall(_.outputsUnsafeRows)
140+
override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows)
141141
override def canProcessUnsafeRows: Boolean = true
142142
override def canProcessSafeRows: Boolean = true
143143
protected override def doExecute(): RDD[InternalRow] =
@@ -250,7 +250,9 @@ case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode {
250250
child.execute().coalesce(numPartitions, shuffle = false)
251251
}
252252

253+
override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
253254
override def canProcessUnsafeRows: Boolean = true
255+
override def canProcessSafeRows: Boolean = true
254256
}
255257

256258
/**
@@ -263,6 +265,10 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
263265
protected override def doExecute(): RDD[InternalRow] = {
264266
left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
265267
}
268+
269+
override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows)
270+
override def canProcessUnsafeRows: Boolean = true
271+
override def canProcessSafeRows: Boolean = true
266272
}
267273

268274
/**
@@ -275,6 +281,10 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
275281
protected override def doExecute(): RDD[InternalRow] = {
276282
left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
277283
}
284+
285+
override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows)
286+
override def canProcessUnsafeRows: Boolean = true
287+
override def canProcessSafeRows: Boolean = true
278288
}
279289

280290
/**

sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,41 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
5858
assert(!preparedPlan.outputsUnsafeRows)
5959
}
6060

61+
test("coalesce can process unsafe rows") {
62+
val plan = Coalesce(1, outputsUnsafe)
63+
val preparedPlan = sqlContext.prepareForExecution.execute(plan)
64+
assert(getConverters(preparedPlan).size === 1)
65+
assert(preparedPlan.outputsUnsafeRows)
66+
}
67+
68+
test("except can process unsafe rows") {
69+
val plan = Except(outputsUnsafe, outputsUnsafe)
70+
val preparedPlan = sqlContext.prepareForExecution.execute(plan)
71+
assert(getConverters(preparedPlan).size === 2)
72+
assert(preparedPlan.outputsUnsafeRows)
73+
}
74+
75+
test("except requires all of its input rows' formats to agree") {
76+
val plan = Except(outputsSafe, outputsUnsafe)
77+
assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows)
78+
val preparedPlan = sqlContext.prepareForExecution.execute(plan)
79+
assert(preparedPlan.outputsUnsafeRows)
80+
}
81+
82+
test("intersect can process unsafe rows") {
83+
val plan = Intersect(outputsUnsafe, outputsUnsafe)
84+
val preparedPlan = sqlContext.prepareForExecution.execute(plan)
85+
assert(getConverters(preparedPlan).size === 2)
86+
assert(preparedPlan.outputsUnsafeRows)
87+
}
88+
89+
test("intersect requires all of its input rows' formats to agree") {
90+
val plan = Intersect(outputsSafe, outputsUnsafe)
91+
assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows)
92+
val preparedPlan = sqlContext.prepareForExecution.execute(plan)
93+
assert(preparedPlan.outputsUnsafeRows)
94+
}
95+
6196
test("execute() fails an assertion if inputs rows are of different formats") {
6297
val e = intercept[AssertionError] {
6398
Union(Seq(outputsSafe, outputsUnsafe)).execute()

0 commit comments

Comments
 (0)