diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 17cc7fde42bb..23ae1f0e2590 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -742,6 +742,7 @@ case class HashAggregateExec( val fastRowKeys = ctx.generateExpressions( bindReferences[Expression](groupingExpressions, child.output)) val unsafeRowKeys = unsafeRowKeyCode.value + val unsafeRowKeyHash = ctx.freshName("unsafeRowKeyHash") val unsafeRowBuffer = ctx.freshName("unsafeRowAggBuffer") val fastRowBuffer = ctx.freshName("fastAggBuffer") @@ -755,13 +756,6 @@ case class HashAggregateExec( } } - // generate hash code for key - // SPARK-24076: HashAggregate uses the same hash algorithm on the same expressions - // as ShuffleExchange, it may lead to bad hash conflict when shuffle.partitions=8192*n, - // pick a different seed to avoid this conflict - val hashExpr = Murmur3Hash(groupingExpressions, 48) - val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx) - val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter, incCounter) = if (testFallbackStartsAt.isDefined) { val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "fallbackCounter") @@ -777,11 +771,11 @@ case class HashAggregateExec( s""" |// generate grouping key |${unsafeRowKeyCode.code} - |${hashEval.code} + |int $unsafeRowKeyHash = ${unsafeRowKeyCode.value}.hashCode(); |if ($checkFallbackForBytesToBytesMap) { | // try to get the buffer from hash map | $unsafeRowBuffer = - | $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value}); + | $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, $unsafeRowKeyHash); |} |// Can't allocate buffer from the hash map. Spill the map and fallback to sort-based |// aggregation after processing all input rows. @@ -795,7 +789,7 @@ case class HashAggregateExec( | // the hash map had be spilled, it should have enough memory now, | // try to allocate buffer again. | $unsafeRowBuffer = $hashMapTerm.getAggregationBufferFromUnsafeRow( - | $unsafeRowKeys, ${hashEval.value}); + | $unsafeRowKeys, $unsafeRowKeyHash); | if ($unsafeRowBuffer == null) { | // failed to allocate the first page | throw new $oomeClassName("No enough memory for aggregation");