Skip to content

Commit 680bc06

Browse files
committed
[SQL] Don't shuffle code generated rows
When sort-based shuffle and code generation are both on, we were trying to ship the code-generated rows during a shuffle. This doesn't work because the classes don't exist on the other side. Instead, we now copy into a generic row before shipping. Author: Michael Armbrust <[email protected]> Closes #3263 from marmbrus/aggCodeGen and squashes the following commits: f6ba8cf [Michael Armbrust] fix and test (cherry picked from commit 4b4b50c) Signed-off-by: Michael Armbrust <[email protected]>
1 parent e35672e commit 680bc06

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
4747
// TODO: Eliminate redundant expressions in grouping key and value.
4848
val rdd = if (sortBasedShuffleOn) {
4949
child.execute().mapPartitions { iter =>
50-
val hashExpressions = newProjection(expressions, child.output)
51-
iter.map(r => (hashExpressions(r), r.copy()))
50+
val hashExpressions = newMutableProjection(expressions, child.output)()
51+
iter.map(r => (hashExpressions(r).copy(), r.copy()))
5252
}
5353
} else {
5454
child.execute().mapPartitions { iter =>

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
7272
2.5)
7373
}
7474

75+
test("aggregation with codegen") {
76+
val originalValue = codegenEnabled
77+
setConf(SQLConf.CODEGEN_ENABLED, "true")
78+
sql("SELECT key FROM testData GROUP BY key").collect()
79+
setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString)
80+
}
81+
7582
test("SPARK-3176 Added Parser of SQL LAST()") {
7683
checkAnswer(
7784
sql("SELECT LAST(n) FROM lowerCaseData"),

0 commit comments

Comments
 (0)