@@ -268,15 +268,16 @@ abstract class HashExpression[E] extends Expression {

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
ev.isNull = "false"
-    val childrenHash = children.map { child =>
+    val childrenHash = ctx.splitExpressions(ctx.INPUT_ROW, children.map { child =>
       val childGen = child.genCode(ctx)
       childGen.code + ctx.nullSafeExec(child.nullable, childGen.isNull) {
         computeHash(childGen.value, child.dataType, ev.value, ctx)
       }
-    }.mkString("\n")
+    })

+    ctx.addMutableState(ctx.javaType(dataType), ev.value, "")
     ev.copy(code = s"""
-      ${ctx.javaType(dataType)} ${ev.value} = $seed;
+      ${ev.value} = $seed;
$childrenHash""")
}
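Why this change: with many children, concatenating every per-child hash snippet into a single generated Java method can push that method past the JVM's 64KB bytecode limit and make Janino compilation fail. ctx.splitExpressions groups the snippets into several helper methods on the generated class, and because the accumulator ${ev.value} is then referenced across method boundaries, it is promoted from a local variable to a class-level field via ctx.addMutableState. A minimal, self-contained sketch of the splitting idea (the names below are illustrative stand-ins, not Spark's actual CodegenContext API):

object SplitSketch {
  // Group per-child code snippets into helper methods so that no single
  // generated Java method exceeds the JVM's 64KB bytecode limit.
  def split(snippets: Seq[String], perMethod: Int = 16): (String, Seq[String]) = {
    val helpers = snippets.grouped(perMethod).zipWithIndex.map { case (group, i) =>
      s"private void apply_$i(InternalRow row) {\n  ${group.mkString("\n  ")}\n}"
    }.toSeq
    // The call sites replace the inlined snippets; the helper bodies are
    // added to the generated class alongside them.
    val callSites = helpers.indices.map(i => s"apply_$i(row);").mkString("\n")
    (callSites, helpers)
  }

  def main(args: Array[String]): Unit = {
    val (calls, helpers) = split(Seq.tabulate(40)(i => s"hash = 31 * hash + h$i;"))
    println(calls)        // three call sites, one per helper method
    println(helpers.head) // the first helper with its 16 snippets inlined
  }
}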

@@ -600,15 +601,18 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
ev.isNull = "false"
val childHash = ctx.freshName("childHash")
-    val childrenHash = children.map { child =>
+    val childrenHash = ctx.splitExpressions(ctx.INPUT_ROW, children.map { child =>
       val childGen = child.genCode(ctx)
       childGen.code + ctx.nullSafeExec(child.nullable, childGen.isNull) {
         computeHash(childGen.value, child.dataType, childHash, ctx)
-      } + s"${ev.value} = (31 * ${ev.value}) + $childHash;"
-    }.mkString(s"int $childHash = 0;", s"\n$childHash = 0;\n", "")
+      } + s"${ev.value} = (31 * ${ev.value}) + $childHash;" +
+        s"\n$childHash = 0;"
+    })

+    ctx.addMutableState(ctx.javaType(dataType), ev.value, "")
+    ctx.addMutableState("int", childHash, s"$childHash = 0;")
     ev.copy(code = s"""
-      ${ctx.javaType(dataType)} ${ev.value} = $seed;
+      ${ev.value} = $seed;
$childrenHash""")
}
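HiveHash folds the children in order with the polynomial combine hash = 31 * hash + childHash, so both the running value and the per-child scratch variable must survive across the split helper methods; that is why childHash also moves into mutable state and is reset to 0 after every child inline, rather than through mkString's prefix/separator as before. A short sketch of the arithmetic the generated code performs, with stand-in values for the per-child computeHash results (the seed for HiveHash is 0):

// The combine step HiveHash's generated code performs, matching Hive's
// (and java.util.Arrays.hashCode's) polynomial scheme. The inputs stand in
// for per-child computeHash(...) results.
def hiveCombine(childHashes: Seq[Int], seed: Int = 0): Int =
  childHashes.foldLeft(seed)((acc, h) => 31 * acc + h)

// hiveCombine(Seq(1, 2, 3)) == ((0 * 31 + 1) * 31 + 2) * 31 + 3 == 1026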

@@ -24,7 +24,9 @@ import org.apache.commons.codec.digest.DigestUtils
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.encoders.{ExamplePointUDT, RowEncoder}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String

class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {

@@ -124,6 +126,26 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
new StructType().add("array", arrayOfString).add("map", mapOfString))
.add("structOfUDT", structOfUDT))

test("SPARK-18207: Compute hash for a lot of expressions") {
val N = 1000
val wideRow = new GenericInternalRow(
+      Seq.tabulate(N)(i => UTF8String.fromString(i.toString)).toArray[Any])
+    val schema = StructType((1 to N).map(i => StructField("", StringType)))
+
+    val exprs = schema.fields.zipWithIndex.map { case (f, i) =>
+      BoundReference(i, f.dataType, true)
+    }
+    val murmur3HashExpr = Murmur3Hash(exprs, 42)
+    val murmur3HashPlan = GenerateMutableProjection.generate(Seq(murmur3HashExpr))
+    val murmursHashEval = Murmur3Hash(exprs, 42).eval(wideRow)
+    assert(murmur3HashPlan(wideRow).getInt(0) == murmursHashEval)
+
+    val hiveHashExpr = HiveHash(exprs)
+    val hiveHashPlan = GenerateMutableProjection.generate(Seq(hiveHashExpr))
+    val hiveHashEval = HiveHash(exprs).eval(wideRow)
+    assert(hiveHashPlan(wideRow).getInt(0) == hiveHashEval)
+  }

private def testHash(inputSchema: StructType): Unit = {
val inputGenerator = RandomDataGenerator.forType(inputSchema, nullable = false).get
val encoder = RowEncoder(inputSchema)