From 71c328487acbcd79c14de85c867522c8960ee52d Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 3 Nov 2016 11:16:56 +0900
Subject: [PATCH 01/13] initial commit

---
 .../spark/sql/catalyst/expressions/hash.scala | 16 ++++---
 .../org/apache/spark/sql/DataFrameSuite.scala | 44 +++++++++++++++++++
 2 files changed, 54 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index 415ef4e4a37ec..021510551290a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -268,15 +268,16 @@ abstract class HashExpression[E] extends Expression {
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     ev.isNull = "false"
-    val childrenHash = children.map { child =>
+    val childrenHash = ctx.splitExpressions(ctx.INPUT_ROW, children.map { child =>
       val childGen = child.genCode(ctx)
       childGen.code + ctx.nullSafeExec(child.nullable, childGen.isNull) {
         computeHash(childGen.value, child.dataType, ev.value, ctx)
       }
-    }.mkString("\n")
+    })
 
+    ctx.addMutableState(ctx.javaType(dataType), ev.value, "")
     ev.copy(code = s"""
-      ${ctx.javaType(dataType)} ${ev.value} = $seed;
+      ${ev.value} = $seed;
       $childrenHash""")
   }
 
@@ -600,15 +601,18 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     ev.isNull = "false"
     val childHash = ctx.freshName("childHash")
-    val childrenHash = children.map { child =>
+    val childrenHash = ctx.splitExpressions(ctx.INPUT_ROW, children.map { child =>
       val childGen = child.genCode(ctx)
       childGen.code + ctx.nullSafeExec(child.nullable, childGen.isNull) {
         computeHash(childGen.value, child.dataType, childHash, ctx)
       } + s"${ev.value} = (31 * ${ev.value}) + $childHash;"
-    }.mkString(s"int $childHash = 0;", s"\n$childHash = 0;\n", "")
+        s"\n$childHash = 0;"
+    })
 
+    ctx.addMutableState(ctx.javaType(dataType), ev.value, "")
+    ctx.addMutableState("int", childHash, s"$childHash = 0;")
     ev.copy(code = s"""
-      ${ctx.javaType(dataType)} ${ev.value} = $seed;
+      ${ev.value} = $seed;
       $childrenHash""")
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f5bc8785d5a2c..96ab37873f296 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1728,4 +1728,48 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val df = spark.createDataFrame(spark.sparkContext.makeRDD(rows), schema)
     assert(df.filter($"array1" === $"array2").count() == 1)
   }
+
+  test("SPARK-18207: Compute hash for wider table") {
+    import org.apache.spark.sql.types.{StructType, StringType}
+    // nest some data structures
+    val ndr = NestedDR("1", Seq(new DR("1", "2")))
+    val dsNDR = spark.createDataset(Seq(ndr))
+
+    val COLMAX = 1000
+    val schema: StructType = (1 to COLMAX)
+      .foldLeft(new StructType())((s, i) => s.add(s"g$i", StringType, nullable = true))
+    val rdds = spark.sparkContext.parallelize(Seq(Row.fromSeq((1 to COLMAX).map(_.toString))))
+    val wideDF = spark.createDataFrame(rdds, schema)
+    wideDF.createOrReplaceTempView("wide")
+
+    // now explode it out
+    val exploded = dsNDR.explode($"r")(DRo.rowToClass)
+    exploded.createOrReplaceTempView("exploded")
+
+    val topDf = spark.sqlContext.sql("""
+      select * from (
+        select *, row_number() OVER (PARTITION BY f1 ORDER BY f2 desc) as d_rank
+        from exploded
+      ) inner_table
+      where inner_table.d_rank = 1""").toDF
+    topDf.createOrReplaceTempView("d_top1_temp")
+
+    // create datasets to union
+    val widePlus = spark.sqlContext.sql("select * from wide, d_top1_temp where wide.g1 = d_top1_temp.m1")
+    widePlus.createOrReplaceTempView("wide_plus")
+    val widePlus2 = widePlus.withColumn("d_rank", lit(0))
+    widePlus2.createOrReplaceTempView("wide_plus2")
+
+    val df = spark.sqlContext.sql("select * from wide_plus union select * from wide_plus2")
+    df.count
+  }
 }
+
+case class DR(val f1: String = "", val f2: String = "")
+object DRo {
+  def rowToClass(r: org.apache.spark.sql.Row) : Seq[DR] = {
+    if (r.getSeq[org.apache.spark.sql.Row](0) == null) { return Seq() }
+    r.getSeq[org.apache.spark.sql.Row](0).map(x => new DR())
+  }
+}
+case class NestedDR(m1: String, r: Seq[DR])
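Why patch 01 restructures doGenCode this way: with ~1000 columns, concatenating every per-column hash update into one generated Java method produces a body beyond the JVM's 64KB bytecode limit per method, which Janino rejects at compile time. `ctx.splitExpressions(ctx.INPUT_ROW, ...)` instead distributes the snippets across several generated private methods, and because the running hash can no longer be a local variable visible to all of them, `ev.value` (and HiveHash's `childHash`) are promoted to fields via `ctx.addMutableState`. A minimal standalone sketch of the splitting idea follows — a simplification with invented names, not Spark's actual `CodegenContext` logic or chunking heuristic:

    object SplitExpressionsSketch {
      // Pack per-column code snippets into separate generated methods so that
      // no single method exceeds the JVM's 64KB bytecode ceiling.
      def split(snippets: Seq[String], perMethod: Int = 100): String = {
        val chunks = snippets.grouped(perMethod).toSeq
        val decls = chunks.zipWithIndex.map { case (chunk, i) =>
          s"private void applyChunk_$i(InternalRow row) {\n  " +
            chunk.mkString("\n  ") + "\n}"
        }
        // Call sites replace the original inline snippets; any state shared
        // across chunks (the running hash) must live in a field, not a local.
        val calls = chunks.indices.map(i => s"applyChunk_$i(row);")
        (decls ++ calls).mkString("\n")
      }
    }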
From aa11c595693ef045ef7e45301eb99185b7fbd0be Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 3 Nov 2016 12:00:21 +0900
Subject: [PATCH 02/13] fix scala style error

---
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 96ab37873f296..b5d6dcfbe1ff0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1755,7 +1755,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     topDf.createOrReplaceTempView("d_top1_temp")
 
     // create datasets to union
-    val widePlus = spark.sqlContext.sql("select * from wide, d_top1_temp where wide.g1 = d_top1_temp.m1")
+    val widePlus = spark.sqlContext.sql(
+      "select * from wide, d_top1_temp where wide.g1 = d_top1_temp.m1")
     widePlus.createOrReplaceTempView("wide_plus")
     val widePlus2 = widePlus.withColumn("d_rank", lit(0))
     widePlus2.createOrReplaceTempView("wide_plus2")

From 24a6320b85f0cda3a3b6c8919c61dbd4a8d82c04 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Sat, 5 Nov 2016 17:15:46 +0900
Subject: [PATCH 03/13] addressed review comments

---
 .../spark/sql/catalyst/expressions/misc.scala |  1 +
 .../org/apache/spark/sql/DataFrameSuite.scala | 31 ++-----------------
 2 files changed, 3 insertions(+), 29 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
index a874a1cf37086..0b6200bc6dd03 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
@@ -104,3 +104,4 @@ case class CurrentDatabase() extends LeafExpression with Unevaluable {
   override def nullable: Boolean = false
   override def prettyName: String = "current_database"
 }
+

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index b5d6dcfbe1ff0..2f33ddf0fb8fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1731,32 +1731,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
 
   test("SPARK-18207: Compute hash for wider table") {
     import org.apache.spark.sql.types.{StructType, StringType}
-    // nest some data structures
-    val ndr = NestedDR("1", Seq(new DR("1", "2")))
-    val dsNDR = spark.createDataset(Seq(ndr))
 
     val COLMAX = 1000
     val schema: StructType = (1 to COLMAX)
       .foldLeft(new StructType())((s, i) => s.add(s"g$i", StringType, nullable = true))
     val rdds = spark.sparkContext.parallelize(Seq(Row.fromSeq((1 to COLMAX).map(_.toString))))
     val wideDF = spark.createDataFrame(rdds, schema)
-    wideDF.createOrReplaceTempView("wide")
-
-    // now explode it out
-    val exploded = dsNDR.explode($"r")(DRo.rowToClass)
-    exploded.createOrReplaceTempView("exploded")
-
-    val topDf = spark.sqlContext.sql("""
-      select * from (
-        select *, row_number() OVER (PARTITION BY f1 ORDER BY f2 desc) as d_rank
-        from exploded
-      ) inner_table
-      where inner_table.d_rank = 1""").toDF
-    topDf.createOrReplaceTempView("d_top1_temp")
-
-    // create datasets to union
-    val widePlus = spark.sqlContext.sql(
-      "select * from wide, d_top1_temp where wide.g1 = d_top1_temp.m1")
+
+    val widePlus = wideDF.withColumn("d_rank", lit(1))
     widePlus.createOrReplaceTempView("wide_plus")
     val widePlus2 = widePlus.withColumn("d_rank", lit(0))
     widePlus2.createOrReplaceTempView("wide_plus2")
@@ -1765,12 +1747,3 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     df.count
   }
 }
-
-case class DR(val f1: String = "", val f2: String = "")
-object DRo {
-  def rowToClass(r: org.apache.spark.sql.Row) : Seq[DR] = {
-    if (r.getSeq[org.apache.spark.sql.Row](0) == null) { return Seq() }
-    r.getSeq[org.apache.spark.sql.Row](0).map(x => new DR())
-  }
-}
-case class NestedDR(m1: String, r: Seq[DR])
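Patches 02–03 shrink the reproducer to its essence: a 1000-column DataFrame whose union forces a HashAggregate, i.e. a per-row hash over every column, with none of the explode/window scaffolding. When experimenting with such a case in spark-shell, the generated Java can be inspected directly — a sketch, assuming the `df` built as in the test above:

    // Prints the Java source produced by whole-stage code generation for each
    // plan subtree; before the fix, the hash method here blows past 64KB.
    import org.apache.spark.sql.execution.debug._
    df.debugCodegen()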
From 3fdc8139ccbcf7adcf58814616b2868747d8eee3 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Sat, 5 Nov 2016 21:47:20 +0900
Subject: [PATCH 04/13] delete empty line

---
 .../scala/org/apache/spark/sql/catalyst/expressions/misc.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
index 0b6200bc6dd03..a874a1cf37086 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
@@ -104,4 +104,3 @@ case class CurrentDatabase() extends LeafExpression with Unevaluable {
   override def nullable: Boolean = false
   override def prettyName: String = "current_database"
 }
-

From 049e4775779de03db88553ad05357a7030ce676f Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Sun, 6 Nov 2016 12:44:06 +0900
Subject: [PATCH 05/13] fix test failures

---
 .../scala/org/apache/spark/sql/catalyst/expressions/hash.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index 021510551290a..e14f0544c2b81 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -605,7 +605,7 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {
       val childGen = child.genCode(ctx)
       childGen.code + ctx.nullSafeExec(child.nullable, childGen.isNull) {
         computeHash(childGen.value, child.dataType, childHash, ctx)
-      } + s"${ev.value} = (31 * ${ev.value}) + $childHash;"
+      } + s"${ev.value} = (31 * ${ev.value}) + $childHash;" +
        s"\n$childHash = 0;"
     })
 
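The visible change in patch 05 is just a trailing `+`, but it matters because of Scala's semicolon inference: when a line ends without a dangling operator, the compiler treats it as a complete statement, so the follow-on line `s"\n$childHash = 0;"` added in patch 01 parsed as a standalone, discarded expression. The reset of `childHash` therefore never reached the generated code, and once `childHash` became a shared mutable field, stale values leaked from one column's hash into the next — hence the test failures. A self-contained illustration of the pitfall:

    object SemicolonInferenceDemo extends App {
      val update = "h = 31 * h + c;"

      val broken = update
      "c = 0;" // inferred as its own statement: built, then silently discarded

      val fixed = update +
        "c = 0;" // the trailing '+' keeps the concatenation alive across lines

      println(broken) // h = 31 * h + c;
      println(fixed)  // h = 31 * h + c;c = 0;
    }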
s"\n$childHash = 0;" }) From 6a57ba564f81ba9f5f04a94f2ca516fa0c441fd0 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Mon, 7 Nov 2016 12:35:44 +0900 Subject: [PATCH 06/13] addressed review comments --- .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 2f33ddf0fb8fe..c1c44be0e34e5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1743,6 +1743,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val widePlus2 = widePlus.withColumn("d_rank", lit(0)) widePlus2.createOrReplaceTempView("wide_plus2") + // union operation in this SQL involves computation of hash for a row val df = spark.sqlContext.sql("select * from wide_plus union select * from wide_plus2") df.count } From e96742ed925549be8a297376da3de6b7b9856e2b Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Mon, 7 Nov 2016 23:52:10 +0900 Subject: [PATCH 07/13] addressed review comments --- .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index c1c44be0e34e5..52886def67229 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1743,7 +1743,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val widePlus2 = widePlus.withColumn("d_rank", lit(0)) widePlus2.createOrReplaceTempView("wide_plus2") - // union operation in this SQL involves computation of hash for a row + // HashAggregate operation in this SQL union operator involves computation of hash for a row val df = spark.sqlContext.sql("select * from wide_plus union select * from wide_plus2") df.count } From 2c405a130c09258ed342a9d20ba99f0374af3f93 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Tue, 8 Nov 2016 04:20:34 +0900 Subject: [PATCH 08/13] addressed review comments --- .../expressions/HashExpressionsSuite.scala | 16 ++++++++++++++- .../org/apache/spark/sql/DataFrameSuite.scala | 20 ------------------- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala index c714bc03dc0d5..49d73719c31f4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala @@ -20,11 +20,12 @@ package org.apache.spark.sql.catalyst.expressions import java.nio.charset.StandardCharsets import org.apache.commons.codec.digest.DigestUtils - import org.apache.spark.SparkFunSuite import org.apache.spark.sql.{RandomDataGenerator, Row} import org.apache.spark.sql.catalyst.encoders.{ExamplePointUDT, RowEncoder} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { @@ -124,6 +125,19 @@ class HashExpressionsSuite extends 
From a2544df8b10c611db19e071acb46b714e60dc51e Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 8 Nov 2016 04:28:19 +0900
Subject: [PATCH 09/13] addressed review comments

---
 .../sql/catalyst/expressions/HashExpressionsSuite.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
index 49d73719c31f4..ab633701d6764 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
@@ -125,7 +125,7 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       new StructType().add("array", arrayOfString).add("map", mapOfString))
     .add("structOfUDT", structOfUDT))
 
-  test("SPARK-18207: Compute hash for a lot of String expressions") {
+  test("SPARK-18207: Compute hash for a lot of expressions") {
     val N = 1000
     val wideRow = new GenericInternalRow(
       (1 to N).map(i => UTF8String.fromString(i.toString)).toArray[Any])
@@ -134,8 +134,11 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     val exprs = schema.fields.zipWithIndex.map { case (f, i) =>
       BoundReference(i, f.dataType, true)
     }
-    val hashExpr = Murmur3Hash(exprs, 42)
-    GenerateMutableProjection.generate(Seq(hashExpr))
+    val murmur3HashExpr = Murmur3Hash(exprs, 42)
+    GenerateMutableProjection.generate(Seq(murmur3HashExpr))
+
+    val hiveHashExpr = HiveHash(exprs)
+    GenerateMutableProjection.generate(Seq(hiveHashExpr))
   }
 
   private def testHash(inputSchema: StructType): Unit = {
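Patch 09 adds `HiveHash` coverage because it overrides `doGenCode` rather than reusing `HashExpression`'s default, accumulating child hashes with the multiply-by-31 scheme intended to mimic Hive's own hashing. The generated `${ev.value} = (31 * ${ev.value}) + $childHash;` lines amount to this fold, sketched in plain Scala:

    // Interpreted equivalent of the accumulation the generated code performs:
    // start from the seed (0 for HiveHash) and fold in each child's hash.
    def hiveStyleHash(childHashes: Seq[Int], seed: Int = 0): Int =
      childHashes.foldLeft(seed)((h, c) => 31 * h + c)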
From 798fd85524e3df45b480382f155d660f8c640711 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 8 Nov 2016 05:02:08 +0900
Subject: [PATCH 10/13] addressed review comments

---
 .../sql/catalyst/expressions/HashExpressionsSuite.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
index ab633701d6764..e245d9eca34fe 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import java.nio.charset.StandardCharsets
 
 import org.apache.commons.codec.digest.DigestUtils
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.encoders.{ExamplePointUDT, RowEncoder}
@@ -135,10 +136,12 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       BoundReference(i, f.dataType, true)
     }
     val murmur3HashExpr = Murmur3Hash(exprs, 42)
-    GenerateMutableProjection.generate(Seq(murmur3HashExpr))
+    val murmur3HashPlan = GenerateMutableProjection.generate(Seq(murmur3HashExpr))
+    assert(murmur3HashPlan(wideRow).getInt(0) == 58499324)
 
     val hiveHashExpr = HiveHash(exprs)
-    GenerateMutableProjection.generate(Seq(hiveHashExpr))
+    val hiveHashPlan = GenerateMutableProjection.generate(Seq(hiveHashExpr))
+    assert(hiveHashPlan(wideRow).getInt(0) == 117331003)

From 61504c38af27d3ede6a13930c421274bebb3d602 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 8 Nov 2016 05:05:41 +0900
Subject: [PATCH 11/13] addressed review comments

---
 .../spark/sql/catalyst/expressions/HashExpressionsSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
index e245d9eca34fe..acffd6d4a5b29 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
@@ -129,7 +129,7 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("SPARK-18207: Compute hash for a lot of expressions") {
     val N = 1000
     val wideRow = new GenericInternalRow(
-      (1 to N).map(i => UTF8String.fromString(i.toString)).toArray[Any])
+      Seq.tabulate(N)(i => UTF8String.fromString(i.toString)).toArray[Any])
     val schema = StructType((1 to N).map(i => StructField("", StringType)))
 
     val exprs = schema.fields.zipWithIndex.map { case (f, i) =>

From fa315216f040339c9307169e754d476d60ea653f Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 8 Nov 2016 05:17:12 +0900
Subject: [PATCH 12/13] fix compilation error

---
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 946b669de280a..8010c9e00077a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1728,3 +1728,5 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val df = spark.createDataFrame(spark.sparkContext.makeRDD(rows), schema)
     assert(df.filter($"array1" === $"array2").count() == 1)
   }
+}
+
From a2b240808fc449c919c85ebe2eb840ca131ab8a4 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 8 Nov 2016 10:43:07 +0900
Subject: [PATCH 13/13] addressed review comments

---
 .../sql/catalyst/expressions/HashExpressionsSuite.scala  | 6 ++++--
 .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 1 -
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
index acffd6d4a5b29..032629265269a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HashExpressionsSuite.scala
@@ -137,11 +137,13 @@ class HashExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     }
     val murmur3HashExpr = Murmur3Hash(exprs, 42)
     val murmur3HashPlan = GenerateMutableProjection.generate(Seq(murmur3HashExpr))
-    assert(murmur3HashPlan(wideRow).getInt(0) == 58499324)
+    val murmursHashEval = Murmur3Hash(exprs, 42).eval(wideRow)
+    assert(murmur3HashPlan(wideRow).getInt(0) == murmursHashEval)
 
     val hiveHashExpr = HiveHash(exprs)
     val hiveHashPlan = GenerateMutableProjection.generate(Seq(hiveHashExpr))
-    assert(hiveHashPlan(wideRow).getInt(0) == 117331003)
+    val hiveHashEval = HiveHash(exprs).eval(wideRow)
+    assert(hiveHashPlan(wideRow).getInt(0) == hiveHashEval)
   }
 
   private def testHash(inputSchema: StructType): Unit = {

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 8010c9e00077a..f5bc8785d5a2c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1729,4 +1729,3 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     assert(df.filter($"array1" === $"array2").count() == 1)
   }
 }
-
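The final revision replaces the hard-coded expectations (58499324 and 117331003) with a comparison against the interpreted `eval` path, so the test asserts codegen/interpreter agreement instead of pinning exact hash values that would silently depend on seeds and per-type hashing details. Generalized, the pattern looks like this (a sketch; the helper name is invented here, not part of the suite):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection

    // The compiled projection and the interpreted eval() must agree on every row.
    def assertCodegenMatchesEval(expr: Expression, row: InternalRow): Unit = {
      val proj = GenerateMutableProjection.generate(Seq(expr))
      assert(proj(row).getInt(0) == expr.eval(row))
    }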