Skip to content

Commit 743b73d

Browse files
yucaicloud-fan
authored and committed
[SPARK-26909][FOLLOWUP][SQL] use unsafeRow.hashCode() as hash value in HashAggregate
## What changes were proposed in this pull request? This is a followup PR for #21149. New way uses unsafeRow.hashCode() as hash value in HashAggregate. The unsafe row has [null bit set] etc., so the hash should be different from shuffle hash, and then we don't need a special seed. ## How was this patch tested? UTs. Closes #23821 from yucai/unsafe_hash. Authored-by: yucai <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 865c88f commit 743b73d

File tree

1 file changed

+4
-10
lines changed

1 file changed

+4
-10
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -742,6 +742,7 @@ case class HashAggregateExec(
742742
val fastRowKeys = ctx.generateExpressions(
743743
bindReferences[Expression](groupingExpressions, child.output))
744744
val unsafeRowKeys = unsafeRowKeyCode.value
745+
val unsafeRowKeyHash = ctx.freshName("unsafeRowKeyHash")
745746
val unsafeRowBuffer = ctx.freshName("unsafeRowAggBuffer")
746747
val fastRowBuffer = ctx.freshName("fastAggBuffer")
747748

@@ -755,13 +756,6 @@ case class HashAggregateExec(
755756
}
756757
}
757758

758-
// generate hash code for key
759-
// SPARK-24076: HashAggregate uses the same hash algorithm on the same expressions
760-
// as ShuffleExchange, it may lead to bad hash conflict when shuffle.partitions=8192*n,
761-
// pick a different seed to avoid this conflict
762-
val hashExpr = Murmur3Hash(groupingExpressions, 48)
763-
val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx)
764-
765759
val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter,
766760
incCounter) = if (testFallbackStartsAt.isDefined) {
767761
val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "fallbackCounter")
@@ -777,11 +771,11 @@ case class HashAggregateExec(
777771
s"""
778772
|// generate grouping key
779773
|${unsafeRowKeyCode.code}
780-
|${hashEval.code}
774+
|int $unsafeRowKeyHash = ${unsafeRowKeyCode.value}.hashCode();
781775
|if ($checkFallbackForBytesToBytesMap) {
782776
| // try to get the buffer from hash map
783777
| $unsafeRowBuffer =
784-
| $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value});
778+
| $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, $unsafeRowKeyHash);
785779
|}
786780
|// Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
787781
|// aggregation after processing all input rows.
@@ -795,7 +789,7 @@ case class HashAggregateExec(
795789
| // the hash map had be spilled, it should have enough memory now,
796790
| // try to allocate buffer again.
797791
| $unsafeRowBuffer = $hashMapTerm.getAggregationBufferFromUnsafeRow(
798-
| $unsafeRowKeys, ${hashEval.value});
792+
| $unsafeRowKeys, $unsafeRowKeyHash);
799793
| if ($unsafeRowBuffer == null) {
800794
| // failed to allocate the first page
801795
| throw new $oomeClassName("No enough memory for aggregation");

0 commit comments

Comments
 (0)