Skip to content

Commit e6a02c6

Browse files
Davies Liu (davies)
authored and committed
[SPARK-12914] [SQL] generate aggregation with grouping keys
This PR adds support for grouping keys in the generated TungstenAggregate. Spilling and performance improvements for BytesToBytesMap will be done in a follow-up PR. Author: Davies Liu <[email protected]> Closes #10855 from davies/gen_keys.
1 parent 12252d1 commit e6a02c6

File tree

6 files changed

+393
-53
lines changed

6 files changed

+393
-53
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,20 @@ class CodegenContext {
5555
*/
5656
val references: mutable.ArrayBuffer[Any] = new mutable.ArrayBuffer[Any]()
5757

58+
/**
59+
* Adds an object to `references` and creates a class member to access it.
60+
*
61+
* Returns the name of the class member.
62+
*/
63+
def addReferenceObj(name: String, obj: Any, className: String = null): String = {
64+
val term = freshName(name)
65+
val idx = references.length
66+
references += obj
67+
val clsName = Option(className).getOrElse(obj.getClass.getName)
68+
addMutableState(clsName, term, s"this.$term = ($clsName) references[$idx];")
69+
term
70+
}
71+
5872
/**
5973
* Holding a list of generated columns as input of current operator, will be used by
6074
* BoundReference to generate code.
@@ -198,6 +212,39 @@ class CodegenContext {
198212
}
199213
}
200214

215+
/**
216+
* Updates a column in a MutableRow from an ExprCode.
217+
*/
218+
def updateColumn(
219+
row: String,
220+
dataType: DataType,
221+
ordinal: Int,
222+
ev: ExprCode,
223+
nullable: Boolean): String = {
224+
if (nullable) {
225+
// Can't call setNullAt on DecimalType, because we need to keep the offset
226+
if (dataType.isInstanceOf[DecimalType]) {
227+
s"""
228+
if (!${ev.isNull}) {
229+
${setColumn(row, dataType, ordinal, ev.value)};
230+
} else {
231+
${setColumn(row, dataType, ordinal, "null")};
232+
}
233+
"""
234+
} else {
235+
s"""
236+
if (!${ev.isNull}) {
237+
${setColumn(row, dataType, ordinal, ev.value)};
238+
} else {
239+
$row.setNullAt($ordinal);
240+
}
241+
"""
242+
}
243+
} else {
244+
s"""${setColumn(row, dataType, ordinal, ev.value)};"""
245+
}
246+
}
247+
201248
/**
202249
* Returns the name used in accessor and setter for a Java primitive type.
203250
*/

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -88,31 +88,8 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu
8888

8989
val updates = validExpr.zip(index).map {
9090
case (e, i) =>
91-
if (e.nullable) {
92-
if (e.dataType.isInstanceOf[DecimalType]) {
93-
// Can't call setNullAt on DecimalType, because we need to keep the offset
94-
s"""
95-
if (this.isNull_$i) {
96-
${ctx.setColumn("mutableRow", e.dataType, i, "null")};
97-
} else {
98-
${ctx.setColumn("mutableRow", e.dataType, i, s"this.value_$i")};
99-
}
100-
"""
101-
} else {
102-
s"""
103-
if (this.isNull_$i) {
104-
mutableRow.setNullAt($i);
105-
} else {
106-
${ctx.setColumn("mutableRow", e.dataType, i, s"this.value_$i")};
107-
}
108-
"""
109-
}
110-
} else {
111-
s"""
112-
${ctx.setColumn("mutableRow", e.dataType, i, s"this.value_$i")};
113-
"""
114-
}
115-
91+
val ev = ExprCode("", s"this.isNull_$i", s"this.value_$i")
92+
ctx.updateColumn("mutableRow", e.dataType, i, ev, e.nullable)
11693
}
11794

11895
val allProjections = ctx.splitExpressions(ctx.INPUT_ROW, projectionCodes)

sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.execution;
1919

20+
import java.io.IOException;
21+
2022
import scala.collection.Iterator;
2123

2224
import org.apache.spark.sql.catalyst.InternalRow;
@@ -34,7 +36,7 @@ public class BufferedRowIterator {
3436
// used when there is no column in output
3537
protected UnsafeRow unsafeRow = new UnsafeRow(0);
3638

37-
public boolean hasNext() {
39+
public boolean hasNext() throws IOException {
3840
if (currentRow == null) {
3941
processNext();
4042
}
@@ -56,7 +58,7 @@ public void setInput(Iterator<InternalRow> iter) {
5658
*
5759
* After it's called, if currentRow is still null, it means no more rows left.
5860
*/
59-
protected void processNext() {
61+
protected void processNext() throws IOException {
6062
if (input.hasNext()) {
6163
currentRow = input.next();
6264
}

0 commit comments

Comments
 (0)