From d8b5f8d839d5c3388244cf2a6dcf4494d927145f Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Mon, 6 Mar 2017 15:42:10 +0900 Subject: [PATCH 01/12] Initial commit --- .../apache/spark/sql/execution/objects.scala | 30 +++++++++++++++++-- .../spark/sql/DatasetPrimitiveSuite.scala | 27 +++++++++++++++++ 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index 199ba5ce6969b..45db1f110f2d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution +import com.sun.org.apache.xalan.internal.xsltc.compiler.util.VoidType + import scala.language.existentials import org.apache.spark.api.java.function.MapFunction @@ -32,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.plans.logical.LogicalKeyedState import org.apache.spark.sql.execution.streaming.KeyedStateImpl -import org.apache.spark.sql.types.{DataType, ObjectType, StructType} +import org.apache.spark.sql.types._ /** @@ -217,9 +219,33 @@ case class MapElementsExec( } override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { + val inType = if (child.output.length == 1) child.output(0).dataType else NullType + val outType = outputObjAttr.dataType val (funcClass, methodName) = func match { case m: MapFunction[_, _] => classOf[MapFunction[_, _]] -> "call" - case _ => classOf[Any => Any] -> "apply" + case _ => + (inType, outType) match { + // if a pair of an argument and return types is one of specific types + // whose specialized method (apply$mc..$sp) is generated by scalac, + // Catalyst generated a direct method call to the specialized method. 
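(Editorial aside, not part of the patch: the case arms that follow map pairs of Catalyst DataTypes to the specialized apply variants that scala.Function1 exposes. As a minimal standalone sketch of why emitting such a call is safe, the snippet below, with the invented name SpecializationProbe and written against plain Scala 2.11/2.12 with no Spark classes, looks up apply$mcJJ$sp on the scala.Function1 interface and invokes it reflectively on an ordinary Long => Long lambda; the Invoke expression built in this method is intended to emit the same call directly, without reflection or boxing.)

object SpecializationProbe {
  def main(args: Array[String]): Unit = {
    val f: Long => Long = _ + 1L
    // scala.Function1 declares apply$mcJJ$sp for every instance; for a primitive
    // Long => Long lambda, scalac wires it to an implementation that avoids boxing,
    // which is the fast path the case arms below select.
    val specialized = classOf[Function1[Long, Long]].getMethod("apply$mcJJ$sp", classOf[Long])
    println(specialized.invoke(f, java.lang.Long.valueOf(41L)))  // 42
    // The erased generic apply is still present, but it works on boxed Objects.
    println(classOf[Function1[Long, Long]].getMethod("apply", classOf[Object]).getReturnType)  // class java.lang.Object
  }
}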
+ case (IntegerType, IntegerType) => classOf[Int => Int] -> "apply$mcII$sp" + case (IntegerType, LongType) => classOf[Int => Long] -> "apply$mcJI$sp" + case (IntegerType, FloatType) => classOf[Int => Float] -> "apply$mcFI$sp" + case (IntegerType, DoubleType) => classOf[Int => Double] -> "apply$mcDI$sp" + case (LongType, IntegerType) => classOf[Long => Int] -> "apply$mcIJ$sp" + case (LongType, LongType) => classOf[Long => Long] -> "apply$mcJJ$sp" + case (LongType, FloatType) => classOf[Long => Float] -> "apply$mcFJ$sp" + case (LongType, DoubleType) => classOf[Long => Double] -> "apply$mcDJ$sp" + case (FloatType, IntegerType) => classOf[Float => Int] -> "apply$mcIF$sp" + case (FloatType, LongType) => classOf[Float => Long] -> "apply$mcJF$sp" + case (FloatType, FloatType) => classOf[Float => Float] -> "apply$mcFF$sp" + case (FloatType, DoubleType) => classOf[Float => Double] -> "apply$mcDF$sp" + case (DoubleType, IntegerType) => classOf[Double => Int] -> "apply$mcID$sp" + case (DoubleType, LongType) => classOf[Double => Long] -> "apply$mcJD$sp" + case (DoubleType, FloatType) => classOf[Double => Float] -> "apply$mcFD$sp" + case (DoubleType, DoubleType) => classOf[Double => Double] -> "apply$mcDD$sp" + case _ => classOf[Any => Any] -> "apply" + } } val funcObj = Literal.create(func, ObjectType(funcClass)) val callFunc = Invoke(funcObj, methodName, outputObjAttr.dataType, child.output) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala index 6b50cb3e48c76..8e52f36a18428 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala @@ -62,6 +62,33 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext { 2, 3, 4) } + test("mapPrimitive") { + val dsInt = Seq(1, 2, 3).toDS() + checkDataset(dsInt.map(e => e + 1), 2, 3, 4) + checkDataset(dsInt.map(e => e + 8589934592L), 8589934593L, 8589934594L, 8589934595L) + checkDataset(dsInt.map(e => e + 1.1F), 2.1F, 3.1F, 4.1F) + checkDataset(dsInt.map(e => e + 1.23D), 2.23D, 3.23D, 4.23D) + + val dsLong = Seq(1L, 2L, 3L).toDS() + checkDataset(dsLong.map(e => (e + 1).toInt), 2, 3, 4) + checkDataset(dsLong.map(e => e + 8589934592L), 8589934593L, 8589934594L, 8589934595L) + checkDataset(dsLong.map(e => e + 1.1F), 2.1F, 3.1F, 4.1F) + checkDataset(dsLong.map(e => e + 1.23D), 2.23D, 3.23D, 4.23D) + + val dsFloat = Seq(1F, 2F, 3F).toDS() + checkDataset(dsFloat.map(e => (e + 1).toInt), 2, 3, 4) + checkDataset(dsFloat.map(e => (e + 123456L).toLong), 123457L, 123458L, 123459L) + checkDataset(dsFloat.map(e => e + 1.1F), 2.1F, 3.1F, 4.1F) + checkDataset(dsFloat.map(e => e + 1.23D), 2.23D, 3.23D, 4.23D) + + val dsDouble = Seq(1D, 2D, 3D).toDS() + checkDataset(dsDouble.map(e => (e + 1).toInt), 2, 3, 4) + checkDataset(dsDouble.map(e => (e + 8589934592L).toLong), + 8589934593L, 8589934594L, 8589934595L) + checkDataset(dsDouble.map(e => (e + 1.1F).toFloat), 2.1F, 3.1F, 4.1F) + checkDataset(dsDouble.map(e => e + 1.23D), 2.23D, 3.23D, 4.23D) + } + test("filter") { val ds = Seq(1, 2, 3, 4).toDS() checkDataset( From a8859078da4257ab6580889b74f463847d3dbb00 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Mon, 6 Mar 2017 17:54:53 +0900 Subject: [PATCH 02/12] fix scala style error --- .../src/main/scala/org/apache/spark/sql/execution/objects.scala | 2 -- 1 file changed, 2 deletions(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index 45db1f110f2d8..55d8ea0ad3df5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution -import com.sun.org.apache.xalan.internal.xsltc.compiler.util.VoidType - import scala.language.existentials import org.apache.spark.api.java.function.MapFunction From 65fa05a72be841219fd1a0ba65d88223ad7b79cb Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Mon, 6 Mar 2017 18:08:58 +0900 Subject: [PATCH 03/12] Simplify scope of variables as needed --- .../main/scala/org/apache/spark/sql/execution/objects.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index 55d8ea0ad3df5..3ae44fd03a06c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -217,12 +217,11 @@ case class MapElementsExec( } override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { - val inType = if (child.output.length == 1) child.output(0).dataType else NullType - val outType = outputObjAttr.dataType val (funcClass, methodName) = func match { case m: MapFunction[_, _] => classOf[MapFunction[_, _]] -> "call" case _ => - (inType, outType) match { + (if (child.output.length == 1) child.output(0).dataType else NullType, + outputObjAttr.dataType) match { // if a pair of an argument and return types is one of specific types // whose specialized method (apply$mc..$sp) is generated by scalac, // Catalyst generated a direct method call to the specialized method.
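(Editorial aside, not part of the patch: the case arms shown above can be read as composing a one-letter descriptor for the result type with one for the argument type. The sketch below, with invented names (SpecializedNameSketch, descriptor, methodName) and assuming only the org.apache.spark.sql.types DataType objects on the classpath, reproduces that composition and prints the names the case arms select. A later commit in this series factors the same idea into FunctionUtils.getFunctionOneName and extends the table with Boolean in the result position, so that map() and filter() can share it.)

import org.apache.spark.sql.types._

object SpecializedNameSketch {
  // One-letter JVM-style descriptor for the primitive types that
  // scala.Function1 is specialized on: I = Int, J = Long, F = Float, D = Double.
  private def descriptor(dt: DataType): Option[String] = dt match {
    case IntegerType => Some("I")
    case LongType    => Some("J")
    case FloatType   => Some("F")
    case DoubleType  => Some("D")
    case _           => None
  }

  // Specialized Function1 apply name: result descriptor first, then argument descriptor.
  def methodName(outputDT: DataType, inputDT: DataType): String =
    (descriptor(outputDT), descriptor(inputDT)) match {
      case (Some(out), Some(in)) => s"apply$$mc$out$in$$sp"
      case _                     => "apply"  // unsupported pair: fall back to the generic, boxing apply
    }

  def main(args: Array[String]): Unit = {
    println(methodName(IntegerType, IntegerType))  // apply$mcII$sp (Int => Int)
    println(methodName(LongType, IntegerType))     // apply$mcJI$sp (Int => Long)
    println(methodName(DoubleType, FloatType))     // apply$mcDF$sp (Float => Double)
    println(methodName(StringType, IntegerType))   // apply (no specialized variant)
  }
}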
From 1f9ece409144265023ba9f41790c35c96732a366 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Wed, 8 Mar 2017 19:02:00 +0900 Subject: [PATCH 04/12] add new benchmark suite with results --- .../apache/spark/sql/DatasetBenchmark.scala | 59 ++++++++++++++++++- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala index 66d94d6016050..b5f28010c041d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala @@ -31,6 +31,49 @@ object DatasetBenchmark { case class Data(l: Long, s: String) + def backToBackMapLong(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = { + import spark.implicits._ + + val rdd = spark.sparkContext.range(0, numRows) + val ds = spark.range(0, numRows) + val df = ds.toDF("l") + val func = (l: Long) => l + 1 + + val benchmark = new Benchmark("back-to-back map long", numRows) + + benchmark.addCase("RDD") { iter => + var res = rdd + var i = 0 + while (i < numChains) { + res = res.map(func) + i += 1 + } + res.foreach(_ => Unit) + } + + benchmark.addCase("DataFrame") { iter => + var res = df + var i = 0 + while (i < numChains) { + res = res.select($"l" + 1 as "l") + i += 1 + } + res.queryExecution.toRdd.foreach(_ => Unit) + } + + benchmark.addCase("Dataset") { iter => + var res = ds.as[Long] + var i = 0 + while (i < numChains) { + res = res.map(func) + i += 1 + } + res.queryExecution.toRdd.foreach(_ => Unit) + } + + benchmark + } + def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = { import spark.implicits._ @@ -165,10 +208,22 @@ object DatasetBenchmark { val numRows = 100000000 val numChains = 10 - val benchmark = backToBackMap(spark, numRows, numChains) + val benchmark0 = backToBackMapLong(spark, numRows, numChains) + val benchmark1 = backToBackMap(spark, numRows, numChains) val benchmark2 = backToBackFilter(spark, numRows, numChains) val benchmark3 = aggregate(spark, numRows) + /* + OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic + Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz + back-to-back map long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + RDD 1883 / 1892 53.1 18.8 1.0X + DataFrame 502 / 642 199.1 5.0 3.7X + Dataset 657 / 784 152.2 6.6 2.9X + */ + benchmark0.run() + /* OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64 Intel Xeon E3-12xx v2 (Ivy Bridge) @@ -178,7 +233,7 @@ object DatasetBenchmark { DataFrame 2647 / 3116 37.8 26.5 1.3X Dataset 4781 / 5155 20.9 47.8 0.7X */ - benchmark.run() + benchmark1.run() /* OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64 From dfbce2a484c1e7ea333677e2a6d61913ad9df846 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Wed, 8 Mar 2017 19:02:39 +0900 Subject: [PATCH 05/12] address review comments --- .../apache/spark/sql/execution/objects.scala | 51 ++++++++++--------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index 3ae44fd03a06c..1de083e41cf58 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -219,30 +219,33 @@ case class MapElementsExec( override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { val (funcClass, methodName) = func match { case m: MapFunction[_, _] => classOf[MapFunction[_, _]] -> "call" - case _ => - (if (child.output.length == 1) child.output(0).dataType else NullType, - outputObjAttr.dataType) match { - // if a pair of an argument and return types is one of specific types - // whose specialized method (apply$mc..$sp) is generated by scalac, - // Catalyst generated a direct method call to the specialized method. - case (IntegerType, IntegerType) => classOf[Int => Int] -> "apply$mcII$sp" - case (IntegerType, LongType) => classOf[Int => Long] -> "apply$mcJI$sp" - case (IntegerType, FloatType) => classOf[Int => Float] -> "apply$mcFI$sp" - case (IntegerType, DoubleType) => classOf[Int => Double] -> "apply$mcDI$sp" - case (LongType, IntegerType) => classOf[Long => Int] -> "apply$mcIJ$sp" - case (LongType, LongType) => classOf[Long => Long] -> "apply$mcJJ$sp" - case (LongType, FloatType) => classOf[Long => Float] -> "apply$mcFJ$sp" - case (LongType, DoubleType) => classOf[Long => Double] -> "apply$mcDJ$sp" - case (FloatType, IntegerType) => classOf[Float => Int] -> "apply$mcIF$sp" - case (FloatType, LongType) => classOf[Float => Long] -> "apply$mcJF$sp" - case (FloatType, FloatType) => classOf[Float => Float] -> "apply$mcFF$sp" - case (FloatType, DoubleType) => classOf[Float => Double] -> "apply$mcDF$sp" - case (DoubleType, IntegerType) => classOf[Double => Int] -> "apply$mcID$sp" - case (DoubleType, LongType) => classOf[Double => Long] -> "apply$mcJD$sp" - case (DoubleType, FloatType) => classOf[Double => Float] -> "apply$mcFD$sp" - case (DoubleType, DoubleType) => classOf[Double => Double] -> "apply$mcDD$sp" - case _ => classOf[Any => Any] -> "apply" - } + case _ => (child.output(0).dataType, outputObjAttr.dataType) match { + // if a pair of an argument and return types is one of specific types + // whose specialized method (apply$mc..$sp) is generated by scalac, + // Catalyst generated a direct method call to the specialized method. 
+ // The followings are references for this specialization: + // https://github.com/scala/scala/blob/2.11.x/src/compiler/scala/tools/nsc/transform/ + // SpecializeTypes.scala + // http://www.cakesolutions.net/teamblogs/scala-dissection-functions + // http://axel22.github.io/2013/11/03/specialization-quirks.html + case (IntegerType, IntegerType) => classOf[Int => Int] -> "apply$mcII$sp" + case (IntegerType, LongType) => classOf[Int => Long] -> "apply$mcJI$sp" + case (IntegerType, FloatType) => classOf[Int => Float] -> "apply$mcFI$sp" + case (IntegerType, DoubleType) => classOf[Int => Double] -> "apply$mcDI$sp" + case (LongType, IntegerType) => classOf[Long => Int] -> "apply$mcIJ$sp" + case (LongType, LongType) => classOf[Long => Long] -> "apply$mcJJ$sp" + case (LongType, FloatType) => classOf[Long => Float] -> "apply$mcFJ$sp" + case (LongType, DoubleType) => classOf[Long => Double] -> "apply$mcDJ$sp" + case (FloatType, IntegerType) => classOf[Float => Int] -> "apply$mcIF$sp" + case (FloatType, LongType) => classOf[Float => Long] -> "apply$mcJF$sp" + case (FloatType, FloatType) => classOf[Float => Float] -> "apply$mcFF$sp" + case (FloatType, DoubleType) => classOf[Float => Double] -> "apply$mcDF$sp" + case (DoubleType, IntegerType) => classOf[Double => Int] -> "apply$mcID$sp" + case (DoubleType, LongType) => classOf[Double => Long] -> "apply$mcJD$sp" + case (DoubleType, FloatType) => classOf[Double => Float] -> "apply$mcFD$sp" + case (DoubleType, DoubleType) => classOf[Double => Double] -> "apply$mcDD$sp" + case _ => classOf[Any => Any] -> "apply" + } } val funcObj = Literal.create(func, ObjectType(funcClass)) val callFunc = Invoke(funcObj, methodName, outputObjAttr.dataType, child.output) From 8ee91af93c1d6f439cbef0e3aa47154b6881946d Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 9 Mar 2017 16:55:45 +0900 Subject: [PATCH 06/12] support Boolean for returnType Use compositional approach instead of enumeration approach --- .../apache/spark/sql/execution/objects.scala | 41 ++++++++++--------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index 1de083e41cf58..29473aeface9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution import scala.language.existentials - import org.apache.spark.api.java.function.MapFunction import org.apache.spark.api.r._ import org.apache.spark.broadcast.Broadcast @@ -33,6 +32,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.plans.logical.LogicalKeyedState import org.apache.spark.sql.execution.streaming.KeyedStateImpl import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils /** @@ -216,35 +216,38 @@ case class MapElementsExec( child.asInstanceOf[CodegenSupport].produce(ctx, this) } + private def getMethodType(dt: DataType, isOutput: Boolean): String = { + dt match { + case BooleanType if isOutput => "Z" + case IntegerType => "I" + case LongType => "J" + case FloatType => "F" + case DoubleType => "D" + case _ => null + } + } + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { val (funcClass, methodName) = func match { case m: MapFunction[_, _] => classOf[MapFunction[_, _]] -> "call" - case _ => (child.output(0).dataType, outputObjAttr.dataType) match { + // load 
"scala.Function1" using Java API to avoid requirements of type parameters + case _ => Utils.classForName("scala.Function1") -> { // if a pair of an argument and return types is one of specific types // whose specialized method (apply$mc..$sp) is generated by scalac, // Catalyst generated a direct method call to the specialized method. // The followings are references for this specialization: + // http://www.scala-lang.org/api/2.12.0/scala/Function1.html // https://github.com/scala/scala/blob/2.11.x/src/compiler/scala/tools/nsc/transform/ // SpecializeTypes.scala // http://www.cakesolutions.net/teamblogs/scala-dissection-functions // http://axel22.github.io/2013/11/03/specialization-quirks.html - case (IntegerType, IntegerType) => classOf[Int => Int] -> "apply$mcII$sp" - case (IntegerType, LongType) => classOf[Int => Long] -> "apply$mcJI$sp" - case (IntegerType, FloatType) => classOf[Int => Float] -> "apply$mcFI$sp" - case (IntegerType, DoubleType) => classOf[Int => Double] -> "apply$mcDI$sp" - case (LongType, IntegerType) => classOf[Long => Int] -> "apply$mcIJ$sp" - case (LongType, LongType) => classOf[Long => Long] -> "apply$mcJJ$sp" - case (LongType, FloatType) => classOf[Long => Float] -> "apply$mcFJ$sp" - case (LongType, DoubleType) => classOf[Long => Double] -> "apply$mcDJ$sp" - case (FloatType, IntegerType) => classOf[Float => Int] -> "apply$mcIF$sp" - case (FloatType, LongType) => classOf[Float => Long] -> "apply$mcJF$sp" - case (FloatType, FloatType) => classOf[Float => Float] -> "apply$mcFF$sp" - case (FloatType, DoubleType) => classOf[Float => Double] -> "apply$mcDF$sp" - case (DoubleType, IntegerType) => classOf[Double => Int] -> "apply$mcID$sp" - case (DoubleType, LongType) => classOf[Double => Long] -> "apply$mcJD$sp" - case (DoubleType, FloatType) => classOf[Double => Float] -> "apply$mcFD$sp" - case (DoubleType, DoubleType) => classOf[Double => Double] -> "apply$mcDD$sp" - case _ => classOf[Any => Any] -> "apply" + val inputType = getMethodType(child.output(0).dataType, false) + val outputType = getMethodType(outputObjAttr.dataType, true) + if (inputType != null && outputType != null) { + s"apply$$mc$outputType$inputType$$sp" + } else { + "apply" + } } } val funcObj = Literal.create(func, ObjectType(funcClass)) From 89c896eabf71a60058de8555142b0111e9d27804 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 9 Mar 2017 17:29:07 +0900 Subject: [PATCH 07/12] add test cases for boolean type --- .../scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala index 8e52f36a18428..2fba7360ef1ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala @@ -64,29 +64,36 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext { test("mapPrimitive") { val dsInt = Seq(1, 2, 3).toDS() + checkDataset(dsInt.map(e => e > 1), false, true, true) checkDataset(dsInt.map(e => e + 1), 2, 3, 4) checkDataset(dsInt.map(e => e + 8589934592L), 8589934593L, 8589934594L, 8589934595L) checkDataset(dsInt.map(e => e + 1.1F), 2.1F, 3.1F, 4.1F) checkDataset(dsInt.map(e => e + 1.23D), 2.23D, 3.23D, 4.23D) val dsLong = Seq(1L, 2L, 3L).toDS() + checkDataset(dsLong.map(e => e > 1), false, true, true) checkDataset(dsLong.map(e => (e + 1).toInt), 2, 3, 4) checkDataset(dsLong.map(e => e + 
8589934592L), 8589934593L, 8589934594L, 8589934595L) checkDataset(dsLong.map(e => e + 1.1F), 2.1F, 3.1F, 4.1F) checkDataset(dsLong.map(e => e + 1.23D), 2.23D, 3.23D, 4.23D) val dsFloat = Seq(1F, 2F, 3F).toDS() + checkDataset(dsFloat.map(e => e > 1), false, true, true) checkDataset(dsFloat.map(e => (e + 1).toInt), 2, 3, 4) checkDataset(dsFloat.map(e => (e + 123456L).toLong), 123457L, 123458L, 123459L) checkDataset(dsFloat.map(e => e + 1.1F), 2.1F, 3.1F, 4.1F) checkDataset(dsFloat.map(e => e + 1.23D), 2.23D, 3.23D, 4.23D) val dsDouble = Seq(1D, 2D, 3D).toDS() + checkDataset(dsDouble.map(e => e > 1), false, true, true) checkDataset(dsDouble.map(e => (e + 1).toInt), 2, 3, 4) checkDataset(dsDouble.map(e => (e + 8589934592L).toLong), 8589934593L, 8589934594L, 8589934595L) checkDataset(dsDouble.map(e => (e + 1.1F).toFloat), 2.1F, 3.1F, 4.1F) checkDataset(dsDouble.map(e => e + 1.23D), 2.23D, 3.23D, 4.23D) + + val dsBoolean = Seq(true, false).toDS() + checkDataset(dsBoolean.map(e => !e), false, true) } test("filter") { From 1fb2933c60d32d9652f50d30aeefd4dbe52643e9 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 9 Mar 2017 17:29:37 +0900 Subject: [PATCH 08/12] fix scala type failure --- .../src/main/scala/org/apache/spark/sql/execution/objects.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index 29473aeface9a..6f89569490bed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import scala.language.existentials + import org.apache.spark.api.java.function.MapFunction import org.apache.spark.api.r._ import org.apache.spark.broadcast.Broadcast From 35ba2c6a696681f7e779c693638fb4f17b04a9d3 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 10 Mar 2017 11:34:23 +0900 Subject: [PATCH 09/12] Generate a call to specialized method for Dataset.filter() --- .../sql/catalyst/plans/logical/object.scala | 38 ++++++++++++++++++- .../apache/spark/sql/execution/objects.scala | 32 +--------------- 2 files changed, 39 insertions(+), 31 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala index 0be4823bbc895..ea6b2ef1a12d1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.objects.Invoke import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils object CatalystSerde { def deserialize[T : Encoder](child: LogicalPlan): DeserializeToObject = { @@ -210,13 +211,48 @@ case class TypedFilter( def typedCondition(input: Expression): Expression = { val (funcClass, methodName) = func match { case m: FilterFunction[_] => classOf[FilterFunction[_]] -> "call" - case _ => classOf[Any => Boolean] -> "apply" + case _ => FunctionUtils.getFunctionOneName(BooleanType, input.dataType) } val funcObj = Literal.create(func, ObjectType(funcClass)) Invoke(funcObj, methodName, BooleanType, input :: Nil) } } +object FunctionUtils { + private def getMethodType(dt: 
DataType, isOutput: Boolean): Option[String] = { + dt match { + case BooleanType if isOutput => Some("Z") + case IntegerType => Some("I") + case LongType => Some("J") + case FloatType => Some("F") + case DoubleType => Some("D") + case _ => None + } + } + + def getFunctionOneName(outputDT: DataType, inputDT: DataType): (Class[_], String) = { + // load "scala.Function1" using Java API to avoid requirements of type parameters + Utils.classForName("scala.Function1") -> { + // if a pair of an argument and return types is one of specific types + // whose specialized method (apply$mc..$sp) is generated by scalac, + // Catalyst generated a direct method call to the specialized method. + // The followings are references for this specialization: + // http://www.scala-lang.org/api/2.12.0/scala/Function1.html + // https://github.com/scala/scala/blob/2.11.x/src/compiler/scala/tools/nsc/transform/ + // SpecializeTypes.scala + // http://www.cakesolutions.net/teamblogs/scala-dissection-functions + // http://axel22.github.io/2013/11/03/specialization-quirks.html + val inputType = getMethodType(inputDT, false) + val outputType = getMethodType(outputDT, true) + if (inputType.isDefined && outputType.isDefined) { + s"apply$$mc${outputType.get}${inputType.get}$$sp" + } else { + "apply" + } + } + } +} + /** Factory for constructing new `AppendColumn` nodes. */ object AppendColumns { def apply[T : Encoder, U : Encoder]( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index 6f89569490bed..fdd1bcc94be25 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.objects.Invoke +import org.apache.spark.sql.catalyst.plans.logical.FunctionUtils import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.plans.logical.LogicalKeyedState @@ -217,39 +218,10 @@ case class MapElementsExec( child.asInstanceOf[CodegenSupport].produce(ctx, this) } - private def getMethodType(dt: DataType, isOutput: Boolean): String = { - dt match { - case BooleanType if isOutput => "Z" - case IntegerType => "I" - case LongType => "J" - case FloatType => "F" - case DoubleType => "D" - case _ => null - } - } - override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { val (funcClass, methodName) = func match { case m: MapFunction[_, _] => classOf[MapFunction[_, _]] -> "call" - // load "scala.Function1" using Java API to avoid requirements of type parameters - case _ => Utils.classForName("scala.Function1") -> { - // if a pair of an argument and return types is one of specific types - // whose specialized method (apply$mc..$sp) is generated by scalac, - // Catalyst generated a direct method call to the specialized method. 
- // The followings are references for this specialization: - // http://www.scala-lang.org/api/2.12.0/scala/Function1.html - // https://github.com/scala/scala/blob/2.11.x/src/compiler/scala/tools/nsc/transform/ - // SpecializeTypes.scala - // http://www.cakesolutions.net/teamblogs/scala-dissection-functions - // http://axel22.github.io/2013/11/03/specialization-quirks.html - val inputType = getMethodType(child.output(0).dataType, false) - val outputType = getMethodType(outputObjAttr.dataType, true) - if (inputType != null && outputType != null) { - s"apply$$mc$outputType$inputType$$sp" - } else { - "apply" - } - } + case _ => FunctionUtils.getFunctionOneName(outputObjAttr.dataType, child.output(0).dataType) } val funcObj = Literal.create(func, ObjectType(funcClass)) val callFunc = Invoke(funcObj, methodName, outputObjAttr.dataType, child.output) From 4c88c033d23991de26f9e50132dffe00b3163909 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 10 Mar 2017 11:35:03 +0900 Subject: [PATCH 10/12] update test cases for map() add test cases for filter() --- .../spark/sql/DatasetPrimitiveSuite.scala | 45 +++++++++++++------ 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala index 2fba7360ef1ee..82b707537e45f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala @@ -64,33 +64,33 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext { test("mapPrimitive") { val dsInt = Seq(1, 2, 3).toDS() - checkDataset(dsInt.map(e => e > 1), false, true, true) - checkDataset(dsInt.map(e => e + 1), 2, 3, 4) - checkDataset(dsInt.map(e => e + 8589934592L), 8589934593L, 8589934594L, 8589934595L) - checkDataset(dsInt.map(e => e + 1.1F), 2.1F, 3.1F, 4.1F) - checkDataset(dsInt.map(e => e + 1.23D), 2.23D, 3.23D, 4.23D) + checkDataset(dsInt.map(_ > 1), false, true, true) + checkDataset(dsInt.map(_ + 1), 2, 3, 4) + checkDataset(dsInt.map(_ + 8589934592L), 8589934593L, 8589934594L, 8589934595L) + checkDataset(dsInt.map(_ + 1.1F), 2.1F, 3.1F, 4.1F) + checkDataset(dsInt.map(_ + 1.23D), 2.23D, 3.23D, 4.23D) val dsLong = Seq(1L, 2L, 3L).toDS() - checkDataset(dsLong.map(e => e > 1), false, true, true) + checkDataset(dsLong.map(_ > 1), false, true, true) checkDataset(dsLong.map(e => (e + 1).toInt), 2, 3, 4) - checkDataset(dsLong.map(e => e + 8589934592L), 8589934593L, 8589934594L, 8589934595L) - checkDataset(dsLong.map(e => e + 1.1F), 2.1F, 3.1F, 4.1F) - checkDataset(dsLong.map(e => e + 1.23D), 2.23D, 3.23D, 4.23D) + checkDataset(dsLong.map(_ + 8589934592L), 8589934593L, 8589934594L, 8589934595L) + checkDataset(dsLong.map(_ + 1.1F), 2.1F, 3.1F, 4.1F) + checkDataset(dsLong.map(_ + 1.23D), 2.23D, 3.23D, 4.23D) val dsFloat = Seq(1F, 2F, 3F).toDS() - checkDataset(dsFloat.map(e => e > 1), false, true, true) + checkDataset(dsFloat.map(_ > 1), false, true, true) checkDataset(dsFloat.map(e => (e + 1).toInt), 2, 3, 4) checkDataset(dsFloat.map(e => (e + 123456L).toLong), 123457L, 123458L, 123459L) - checkDataset(dsFloat.map(e => e + 1.1F), 2.1F, 3.1F, 4.1F) - checkDataset(dsFloat.map(e => e + 1.23D), 2.23D, 3.23D, 4.23D) + checkDataset(dsFloat.map(_ + 1.1F), 2.1F, 3.1F, 4.1F) + checkDataset(dsFloat.map(_ + 1.23D), 2.23D, 3.23D, 4.23D) val dsDouble = Seq(1D, 2D, 3D).toDS() - checkDataset(dsDouble.map(e => e > 1), false, true, true) + 
checkDataset(dsDouble.map(_ > 1), false, true, true) checkDataset(dsDouble.map(e => (e + 1).toInt), 2, 3, 4) checkDataset(dsDouble.map(e => (e + 8589934592L).toLong), 8589934593L, 8589934594L, 8589934595L) checkDataset(dsDouble.map(e => (e + 1.1F).toFloat), 2.1F, 3.1F, 4.1F) - checkDataset(dsDouble.map(e => e + 1.23D), 2.23D, 3.23D, 4.23D) + checkDataset(dsDouble.map(_ + 1.23D), 2.23D, 3.23D, 4.23D) val dsBoolean = Seq(true, false).toDS() checkDataset(dsBoolean.map(e => !e), false, true) @@ -103,6 +103,23 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext { 2, 4) } + test("filterPrimitive") { + val dsInt = Seq(1, 2, 3).toDS() + checkDataset(dsInt.filter(_ > 1), 2, 3) + + val dsLong = Seq(1L, 2L, 3L).toDS() + checkDataset(dsLong.filter(_ > 1), 2L, 3L) + + val dsFloat = Seq(1F, 2F, 3F).toDS() + checkDataset(dsFloat.filter(_ > 1), 2F, 3F) + + val dsDouble = Seq(1D, 2D, 3D).toDS() + checkDataset(dsDouble.filter(_ > 1), 2D, 3D) + + val dsBoolean = Seq(true, false).toDS() + checkDataset(dsBoolean.filter(e => !e), false) + } + test("foreach") { val ds = Seq(1, 2, 3).toDS() val acc = sparkContext.longAccumulator From 200cec783f33de21d9895f90161a9d11877d0877 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 10 Mar 2017 11:35:31 +0900 Subject: [PATCH 11/12] add benchmark suite for Dataset.filter() --- .../apache/spark/sql/DatasetBenchmark.scala | 64 +++++++++++++++++-- 1 file changed, 60 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala index b5f28010c041d..2166bc694b50f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala @@ -115,6 +115,49 @@ object DatasetBenchmark { benchmark } + def backToBackFilterLong(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = { + import spark.implicits._ + + val rdd = spark.sparkContext.range(1, numRows) + val ds = spark.range(1, numRows) + val df = ds.toDF("l") + val func = (l: Long) => l % 2L == 0L + + val benchmark = new Benchmark("back-to-back filter Long", numRows) + + benchmark.addCase("RDD") { iter => + var res = rdd + var i = 0 + while (i < numChains) { + res = res.filter(func) + i += 1 + } + res.foreach(_ => Unit) + } + + benchmark.addCase("DataFrame") { iter => + var res = df + var i = 0 + while (i < numChains) { + res = res.filter($"l" % 2L === 0L) + i += 1 + } + res.queryExecution.toRdd.foreach(_ => Unit) + } + + benchmark.addCase("Dataset") { iter => + var res = ds.as[Long] + var i = 0 + while (i < numChains) { + res = res.filter(func) + i += 1 + } + res.queryExecution.toRdd.foreach(_ => Unit) + } + + benchmark + } + def backToBackFilter(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = { import spark.implicits._ @@ -210,8 +253,9 @@ object DatasetBenchmark { val benchmark0 = backToBackMapLong(spark, numRows, numChains) val benchmark1 = backToBackMap(spark, numRows, numChains) - val benchmark2 = backToBackFilter(spark, numRows, numChains) - val benchmark3 = aggregate(spark, numRows) + val benchmark2 = backToBackFilterLong(spark, numRows, numChains) + val benchmark3 = backToBackFilter(spark, numRows, numChains) + val benchmark4 = aggregate(spark, numRows) /* OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic @@ -235,6 +279,18 @@ object DatasetBenchmark { */ benchmark1.run() + /* + OpenJDK 64-Bit Server VM 
1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic + Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz + back-to-back map long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + RDD 1807 / 1833 55.4 18.1 1.0X + DataFrame 532 / 597 187.9 5.3 3.4X + Dataset 636 / 702 157.3 6.4 2.8X + */ + + benchmark2.run() + /* OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64 Intel Xeon E3-12xx v2 (Ivy Bridge) @@ -244,7 +300,7 @@ object DatasetBenchmark { DataFrame 59 / 72 1695.4 0.6 22.8X Dataset 2777 / 2805 36.0 27.8 0.5X */ - benchmark2.run() + benchmark3.run() /* Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1 @@ -256,6 +312,6 @@ object DatasetBenchmark { Dataset sum using Aggregator 4656 / 4758 21.5 46.6 0.4X Dataset complex Aggregator 6636 / 7039 15.1 66.4 0.3X */ - benchmark3.run() + benchmark4.run() } } From b25b191687259303df5ab2fad0c64687a88de5bd Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 10 Mar 2017 11:52:49 +0900 Subject: [PATCH 12/12] update benchmark result --- .../scala/org/apache/spark/sql/DatasetBenchmark.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala index 2166bc694b50f..1a0672b8876da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala @@ -282,13 +282,12 @@ object DatasetBenchmark { /* OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-47-generic Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz - back-to-back map long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + back-to-back filter Long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - RDD 1807 / 1833 55.4 18.1 1.0X - DataFrame 532 / 597 187.9 5.3 3.4X - Dataset 636 / 702 157.3 6.4 2.8X + RDD 846 / 1120 118.1 8.5 1.0X + DataFrame 270 / 329 370.9 2.7 3.1X + Dataset 545 / 789 183.5 5.4 1.6X */ - benchmark2.run() /*