Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,7 @@ case class HashAggregateExec(
val fastRowKeys = ctx.generateExpressions(
bindReferences[Expression](groupingExpressions, child.output))
val unsafeRowKeys = unsafeRowKeyCode.value
val unsafeRowKeyHash = ctx.freshName("unsafeRowKeyHash")
val unsafeRowBuffer = ctx.freshName("unsafeRowAggBuffer")
val fastRowBuffer = ctx.freshName("fastAggBuffer")

Expand All @@ -755,13 +756,6 @@ case class HashAggregateExec(
}
}

// generate hash code for key
// SPARK-24076: HashAggregate uses the same hash algorithm on the same expressions
// as ShuffleExchange, which may lead to bad hash conflicts when shuffle.partitions=8192*n.
// Pick a different seed to avoid this conflict.
val hashExpr = Murmur3Hash(groupingExpressions, 48)
val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx)

val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter,
incCounter) = if (testFallbackStartsAt.isDefined) {
val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "fallbackCounter")
Expand All @@ -777,11 +771,11 @@ case class HashAggregateExec(
s"""
|// generate grouping key
|${unsafeRowKeyCode.code}
|${hashEval.code}
|int $unsafeRowKeyHash = ${unsafeRowKeyCode.value}.hashCode();
|if ($checkFallbackForBytesToBytesMap) {
| // try to get the buffer from hash map
| $unsafeRowBuffer =
| $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value});
| $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, $unsafeRowKeyHash);
|}
|// Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
|// aggregation after processing all input rows.
Expand All @@ -795,7 +789,7 @@ case class HashAggregateExec(
| // the hash map had be spilled, it should have enough memory now,
| // try to allocate buffer again.
| $unsafeRowBuffer = $hashMapTerm.getAggregationBufferFromUnsafeRow(
| $unsafeRowKeys, ${hashEval.value});
| $unsafeRowKeys, $unsafeRowKeyHash);
| if ($unsafeRowBuffer == null) {
| // failed to allocate the first page
| throw new $oomeClassName("No enough memory for aggregation");
Expand Down