From a534e0e72596b7975a12f6feea320093e16eb3b4 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Mon, 1 Feb 2016 21:39:21 -0800
Subject: [PATCH 1/3] tune the benchmark

---
 .../org/apache/spark/util/Benchmark.scala     | 11 ++--
 .../BenchmarkWholeStageCodegen.scala          | 60 +++++++++----------
 2 files changed, 36 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
index d484cec7ae38..a2943b810a6d 100644
--- a/core/src/main/scala/org/apache/spark/util/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -99,22 +99,25 @@ private[spark] object Benchmark {
    * the rate of the function.
    */
   def measure(num: Long, iters: Int, outputPerIteration: Boolean)(f: Int => Unit): Result = {
-    var totalTime = 0L
+    var bestTime = Long.MaxValue
     for (i <- 0 until iters + 1) {
       val start = System.nanoTime()
       f(i)
       val end = System.nanoTime()
-      if (i != 0) totalTime += end - start
+      val runTime = end - start
+      if (runTime < bestTime) {
+        bestTime = runTime
+      }
 
       if (outputPerIteration) {
         // scalastyle:off
-        println(s"Iteration $i took ${(end - start) / 1000} microseconds")
+        println(s"Iteration $i took ${runTime / 1000} microseconds")
         // scalastyle:on
       }
     }
-    Result(totalTime.toDouble / 1000000 / iters, num * iters / (totalTime.toDouble / 1000))
+    Result(bestTime.toDouble / 1000000, num / (bestTime.toDouble / 1000))
   }
 }
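Note on the measure() change above: the reported number switches from the
average over the timed iterations to the single fastest run, which is less
sensitive to GC pauses and other one-off noise, and the warm-up guard
(i != 0) is dropped since a cold first run rarely produces the minimum.
A minimal self-contained sketch of the same best-of-N idea; the helper name
bestTimeNanos is illustrative, not part of the patch:

    def bestTimeNanos(iters: Int)(f: => Unit): Long = {
      // One warm-up run plus `iters` timed runs; keep the fastest observed time.
      var best = Long.MaxValue
      for (_ <- 0 until iters + 1) {
        val start = System.nanoTime()
        f
        val elapsed = System.nanoTime() - start
        if (elapsed < best) best = elapsed
      }
      best
    }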
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 2f09c8a114bc..c319eac6ecf6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -33,41 +33,40 @@ import org.apache.spark.util.Benchmark
  */
 class BenchmarkWholeStageCodegen extends SparkFunSuite {
   lazy val conf = new SparkConf().setMaster("local[1]").setAppName("benchmark")
+    .set("spark.sql.shuffle.partitions", "1")
   lazy val sc = SparkContext.getOrCreate(conf)
   lazy val sqlContext = SQLContext.getOrCreate(sc)
 
-  def testWholeStage(values: Int): Unit = {
-    val benchmark = new Benchmark("rang/filter/aggregate", values)
+  def runBenchmark(name: String, values: Int)(f: => Unit): Unit = {
+    val benchmark = new Benchmark(name, values)
 
-    benchmark.addCase("Without codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
-      sqlContext.range(values).filter("(id & 1) = 1").count()
+    Seq(false, true).foreach { enabled =>
+      benchmark.addCase(s"$name codegen=$enabled") { iter =>
+        sqlContext.setConf("spark.sql.codegen.wholeStage", enabled.toString)
+        f
+      }
     }
 
-    benchmark.addCase("With codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.range(values).filter("(id & 1) = 1").count()
-    }
+    benchmark.run()
+  }
+
+  def testWholeStage(values: Int): Unit = {
+    runBenchmark("rang/filter/sum", values) {
+      sqlContext.range(values).filter("(id & 1) = 1").groupBy().sum().collect()
+    }
 
     /*
-    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    rang/filter/aggregate:              Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    -------------------------------------------------------------------------------
-    Without codegen                          7775.53            26.97         1.00 X
-    With codegen                              342.15           612.94        22.73 X
+    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+    rang/filter/aggregate:              Avg Time(ms)    Avg Rate(M/s)  Relative Rate
+    -------------------------------------------------------------------------------
+    rang/filter/aggregate codegen=false     12509.22            41.91         1.00 X
+    rang/filter/aggregate codegen=true        846.38           619.45        14.78 X
     */
-    benchmark.run()
   }
 
   def testAggregateWithKey(values: Int): Unit = {
-    val benchmark = new Benchmark("Aggregate with keys", values)
-    benchmark.addCase("Aggregate w/o codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
-      sqlContext.range(values).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
-    }
-    benchmark.addCase(s"Aggregate w codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
+    runBenchmark("Aggregate w keys", values) {
       sqlContext.range(values).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
     }
 
@@ -75,10 +74,9 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
     Aggregate with keys:                Avg Time(ms)    Avg Rate(M/s)  Relative Rate
     -------------------------------------------------------------------------------
-    Aggregate w/o codegen                    4254.38             4.93         1.00 X
-    Aggregate w codegen                      2661.45             7.88         1.60 X
+    Aggregate w keys codegen=false           2589.00             8.10         1.00 X
+    Aggregate w keys codegen=true            1645.38            12.75         1.57 X
     */
-    benchmark.run()
   }
 
   def testBytesToBytesMap(values: Int): Unit = {
@@ -138,18 +136,18 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
 
     /**
     Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    Aggregate with keys:                Avg Time(ms)    Avg Rate(M/s)  Relative Rate
+    BytesToBytesMap:                    Avg Time(ms)    Avg Rate(M/s)  Relative Rate
     -------------------------------------------------------------------------------
-    hash                                      662.06            79.19         1.00 X
-    BytesToBytesMap (off Heap)               2209.42            23.73         0.30 X
-    BytesToBytesMap (on Heap)                2957.68            17.73         0.22 X
+    hash                                      603.61            86.86         1.00 X
+    BytesToBytesMap (off Heap)               3297.39            15.90         0.18 X
+    BytesToBytesMap (on Heap)                3607.09            14.53         0.17 X
     */
     benchmark.run()
   }
 
   test("benchmark") {
-    // testWholeStage(1024 * 1024 * 200)
+    // testWholeStage(500 << 20)
     // testAggregateWithKey(20 << 20)
-    // testBytesToBytesMap(1024 * 1024 * 50)
+    // testBytesToBytesMap(50 << 20)
   }
 }
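Note on the sizes above: the commented calls now use shifts, so
testWholeStage(500 << 20) means 500 * 2^20, about 524 million rows, and
50 << 20 is the same count as the old 1024 * 1024 * 50. A hedged usage sketch
of the runBenchmark helper added in this patch, assuming the sqlContext from
BenchmarkWholeStageCodegen; the query itself is illustrative:

    // One benchmark, two cases: the same body is timed with
    // spark.sql.codegen.wholeStage=false and then with =true.
    runBenchmark("sum", 20 << 20) {
      sqlContext.range(20 << 20).groupBy().sum().collect()
    }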
From b105f6fa1a3b312c0c5377186795626930a15a63 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Tue, 2 Feb 2016 17:51:28 -0800
Subject: [PATCH 2/3] switch to median time

---
 .../org/apache/spark/util/Benchmark.scala     | 18 ++++++----
 .../BenchmarkWholeStageCodegen.scala          | 36 +++++++++----------
 2 files changed, 29 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
index a2943b810a6d..0f54897a0baf 100644
--- a/core/src/main/scala/org/apache/spark/util/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.commons.lang3.SystemUtils
 
@@ -62,13 +63,15 @@ private[spark] class Benchmark(
     val firstRate = results.head.avgRate
     // The results are going to be processor specific so it is useful to include that.
     println(Benchmark.getProcessorName())
-    printf("%-30s %16s %16s %14s\n", name + ":", "Avg Time(ms)", "Avg Rate(M/s)", "Relative Rate")
-    println("-------------------------------------------------------------------------------")
+    printf("%-30s %16s %12s %13s %10s\n", name + ":", "median Time(ms)", "Rate(M/s)", "Per Row(ns)",
+      "Relative")
+    println("-------------------------------------------------------------------------------------")
     results.zip(benchmarks).foreach { r =>
-      printf("%-30s %16s %16s %14s\n",
+      printf("%-30s %16s %10s %14s %10s\n",
         r._2.name,
         "%10.2f" format r._1.avgMs,
         "%10.2f" format r._1.avgRate,
+        "%6.2f" format (1000 / r._1.avgRate),
         "%6.2f X" format (r._1.avgRate / firstRate))
     }
     println
@@ -99,7 +102,7 @@ private[spark] object Benchmark {
    * the rate of the function.
    */
   def measure(num: Long, iters: Int, outputPerIteration: Boolean)(f: Int => Unit): Result = {
-    var bestTime = Long.MaxValue
+    val runTimes = ArrayBuffer[Long]()
 
     for (i <- 0 until iters + 1) {
       val start = System.nanoTime()
@@ -107,8 +110,8 @@ private[spark] object Benchmark {
       f(i)
       val end = System.nanoTime()
       val runTime = end - start
-      if (runTime < bestTime) {
-        bestTime = runTime
+      if (i > 0) {
+        runTimes += runTime
       }
 
       if (outputPerIteration) {
@@ -117,7 +120,8 @@ private[spark] object Benchmark {
         // scalastyle:on
       }
     }
-    Result(bestTime.toDouble / 1000000, num / (bestTime.toDouble / 1000))
+    val result = runTimes.sortBy(x => x).apply(iters / 2).toDouble
+    Result(result / 1000000, num / (result / 1000))
   }
 }
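Note on the median selection above: once the warm-up run is excluded there are
exactly iters samples, and runTimes.sortBy(x => x).apply(iters / 2) takes the
middle element of the sorted buffer; for an even iters this is the upper of the
two middle samples, not their mean. A small self-contained check with made-up
timing values:

    import scala.collection.mutable.ArrayBuffer

    val runTimes = ArrayBuffer(9L, 3L, 7L, 5L, 8L)  // fake nanosecond timings
    val iters = runTimes.length
    val median = runTimes.sortBy(x => x).apply(iters / 2)
    assert(median == 7L)  // sorted: 3, 5, 7, 8, 9 -> index 2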
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index c319eac6ecf6..d6b7994df15a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -56,11 +56,11 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
       sqlContext.range(values).filter("(id & 1) = 1").groupBy().sum().collect()
     }
 
     /*
-    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    rang/filter/aggregate:              Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    -------------------------------------------------------------------------------
-    rang/filter/aggregate codegen=false     12509.22            41.91         1.00 X
-    rang/filter/aggregate codegen=true        846.38           619.45        14.78 X
+    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+    rang/filter/sum:                  median Time(ms)   Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------
+    rang/filter/sum codegen=false           12823.55       40.88        24.46      1.00 X
+    rang/filter/sum codegen=true              831.80      630.30         1.59     15.42 X
     */
   }
 
@@ -72,10 +72,10 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
 
     /*
     Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    Aggregate w keys:                   Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    -------------------------------------------------------------------------------
-    Aggregate w keys codegen=false           2589.00             8.10         1.00 X
-    Aggregate w keys codegen=true            1645.38            12.75         1.57 X
+    Aggregate w keys:                 median Time(ms)   Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------
+    Aggregate w keys codegen=false           2390.44        8.77       113.99      1.00 X
+    Aggregate w keys codegen=true            1669.62       12.56        79.61      1.43 X
     */
   }
 
@@ -136,18 +136,18 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
 
     /**
     Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    BytesToBytesMap:                    Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    -------------------------------------------------------------------------------
-    hash                                      603.61            86.86         1.00 X
-    BytesToBytesMap (off Heap)               3297.39            15.90         0.18 X
-    BytesToBytesMap (on Heap)                3607.09            14.53         0.17 X
+    BytesToBytesMap:                  median Time(ms)   Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------
+    hash                                      663.70       78.99        12.66      1.00 X
+    BytesToBytesMap (off Heap)               3389.42       15.47        64.65      0.20 X
+    BytesToBytesMap (on Heap)                3476.07       15.08        66.30      0.19 X
     */
     benchmark.run()
   }
 
-  test("benchmark") {
-    // testWholeStage(500 << 20)
-    // testAggregateWithKey(20 << 20)
-    // testBytesToBytesMap(50 << 20)
+  ignore("benchmark") {
+//    testWholeStage(500 << 20)
+//    testAggregateWithKey(20 << 20)
+//    testBytesToBytesMap(50 << 20)
   }
 }

From 2f88960d9bfc8041f33ef2cf422983a7309052ad Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Wed, 3 Feb 2016 15:05:07 -0800
Subject: [PATCH 3/3] ignore benchmark tests

---
 .../spark/sql/execution/BenchmarkWholeStageCodegen.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 26fbab19bf7b..33d4976403d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -52,7 +52,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
   }
 
   // These benchmark are skipped in normal build
-  test("whole stage codegen") {
+  ignore("range/filter/sum") {
     val N = 500 << 20
     runBenchmark("rang/filter/sum", N) {
       sqlContext.range(N).filter("(id & 1) = 1").groupBy().sum().collect()
     }
     /*
     ...
     */
   }
 
-  test("stat functions") {
+  ignore("stat functions") {
     val N = 100 << 20
 
     runBenchmark("stddev", N) {