diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
index d484cec7ae38..d1699f5c2865 100644
--- a/core/src/main/scala/org/apache/spark/util/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.commons.lang3.SystemUtils
 
@@ -59,17 +60,21 @@ private[spark] class Benchmark(
     }
     println
 
-    val firstRate = results.head.avgRate
+    val firstBest = results.head.bestMs
+    val firstAvg = results.head.avgMs
     // The results are going to be processor specific so it is useful to include that.
     println(Benchmark.getProcessorName())
-    printf("%-30s %16s %16s %14s\n", name + ":", "Avg Time(ms)", "Avg Rate(M/s)", "Relative Rate")
-    println("-------------------------------------------------------------------------------")
-    results.zip(benchmarks).foreach { r =>
-      printf("%-30s %16s %16s %14s\n",
-        r._2.name,
-        "%10.2f" format r._1.avgMs,
-        "%10.2f" format r._1.avgRate,
-        "%6.2f X" format (r._1.avgRate / firstRate))
+    printf("%-35s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
+      "Per Row(ns)", "Relative")
+    println("-----------------------------------------------------------------------------------" +
+      "--------")
+    results.zip(benchmarks).foreach { case (result, benchmark) =>
+      printf("%-35s %16s %12s %13s %10s\n",
+        benchmark.name,
+        "%5.0f / %4.0f" format (result.bestMs, result.avgMs),
+        "%10.1f" format result.bestRate,
+        "%6.1f" format (1000 / result.bestRate),
+        "%3.1fX" format (firstBest / result.bestMs))
     }
     println
     // scalastyle:on
@@ -78,7 +83,7 @@ private[spark] class Benchmark(
 
 private[spark] object Benchmark {
   case class Case(name: String, fn: Int => Unit)
-  case class Result(avgMs: Double, avgRate: Double)
+  case class Result(avgMs: Double, bestRate: Double, bestMs: Double)
 
   /**
    * This should return a user helpful processor information. Getting at this depends on the OS.
@@ -99,22 +104,27 @@ private[spark] object Benchmark {
    * the rate of the function.
   */
  def measure(num: Long, iters: Int, outputPerIteration: Boolean)(f: Int => Unit): Result = {
-    var totalTime = 0L
+    val runTimes = ArrayBuffer[Long]()
     for (i <- 0 until iters + 1) {
       val start = System.nanoTime()
       f(i)
       val end = System.nanoTime()
-      if (i != 0) totalTime += end - start
+      val runTime = end - start
+      if (i > 0) {
+        runTimes += runTime
+      }
       if (outputPerIteration) {
         // scalastyle:off
-        println(s"Iteration $i took ${(end - start) / 1000} microseconds")
+        println(s"Iteration $i took ${runTime / 1000} microseconds")
         // scalastyle:on
       }
     }
-    Result(totalTime.toDouble / 1000000 / iters, num * iters / (totalTime.toDouble / 1000))
+    val best = runTimes.min
+    val avg = runTimes.sum / iters
+    Result(avg / 1000000, num / (best / 1000), best / 1000000)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 15ba77353109..33d4976403d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -34,54 +34,47 @@ import org.apache.spark.util.Benchmark
  */
 class BenchmarkWholeStageCodegen extends SparkFunSuite {
   lazy val conf = new SparkConf().setMaster("local[1]").setAppName("benchmark")
+    .set("spark.sql.shuffle.partitions", "1")
   lazy val sc = SparkContext.getOrCreate(conf)
   lazy val sqlContext = SQLContext.getOrCreate(sc)
 
-  def testWholeStage(values: Int): Unit = {
-    val benchmark = new Benchmark("rang/filter/aggregate", values)
+  def runBenchmark(name: String, values: Int)(f: => Unit): Unit = {
+    val benchmark = new Benchmark(name, values)
 
-    benchmark.addCase("Without codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
-      sqlContext.range(values).filter("(id & 1) = 1").count()
-    }
-
-    benchmark.addCase("With codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.range(values).filter("(id & 1) = 1").count()
+    Seq(false, true).foreach { enabled =>
+      benchmark.addCase(s"$name codegen=$enabled") { iter =>
+        sqlContext.setConf("spark.sql.codegen.wholeStage", enabled.toString)
+        f
+      }
     }
 
-    /*
-      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-      rang/filter/aggregate:          Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-      -------------------------------------------------------------------------------
-      Without codegen                      7775.53            26.97         1.00 X
-      With codegen                          342.15           612.94        22.73 X
-    */
     benchmark.run()
   }
 
-  def testStatFunctions(values: Int): Unit = {
-
-    val benchmark = new Benchmark("stat functions", values)
-
-    benchmark.addCase("stddev w/o codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
-      sqlContext.range(values).groupBy().agg("id" -> "stddev").collect()
+  // These benchmarks are skipped in normal builds
+  ignore("range/filter/sum") {
+    val N = 500 << 20
+    runBenchmark("range/filter/sum", N) {
+      sqlContext.range(N).filter("(id & 1) = 1").groupBy().sum().collect()
     }
+    /*
+    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+    range/filter/sum:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    range/filter/sum codegen=false          14332 / 16646         36.0          27.8       1.0X
+    range/filter/sum codegen=true              845 /  940        620.0           1.6      17.0X
+    */
+  }
 
-    benchmark.addCase("stddev w codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.range(values).groupBy().agg("id" -> "stddev").collect()
-    }
+  ignore("stat functions") {
+    val N = 100 << 20
 
-    benchmark.addCase("kurtosis w/o codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
-      sqlContext.range(values).groupBy().agg("id" -> "kurtosis").collect()
+    runBenchmark("stddev", N) {
+      sqlContext.range(N).groupBy().agg("id" -> "stddev").collect()
     }
 
-    benchmark.addCase("kurtosis w codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.range(values).groupBy().agg("id" -> "kurtosis").collect()
+    runBenchmark("kurtosis", N) {
+      sqlContext.range(N).groupBy().agg("id" -> "kurtosis").collect()
     }
@@ -99,64 +92,56 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
 
     Using DeclarativeAggregate:
 
     Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    stddev:                         Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    -------------------------------------------------------------------------------
-    stddev w/o codegen                    989.22            21.20         1.00 X
-    stddev w codegen                      352.35            59.52         2.81 X
-    kurtosis w/o codegen                 3636.91             5.77         0.27 X
-    kurtosis w codegen                    369.25            56.79         2.68 X
+    stddev:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    stddev codegen=false                      5630 / 5776         18.0          55.6       1.0X
+    stddev codegen=true                       1259 / 1314         83.0          12.0       4.5X
+
+    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+    kurtosis:                           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    kurtosis codegen=false                  14847 / 15084          7.0         142.9       1.0X
+    kurtosis codegen=true                     1652 / 2124         63.0          15.9       9.0X
     */
-    benchmark.run()
   }
 
-  def testAggregateWithKey(values: Int): Unit = {
-    val benchmark = new Benchmark("Aggregate with keys", values)
+  ignore("aggregate with keys") {
+    val N = 20 << 20
 
-    benchmark.addCase("Aggregate w/o codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
-      sqlContext.range(values).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
-    }
-    benchmark.addCase(s"Aggregate w codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.range(values).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
+    runBenchmark("Aggregate w keys", N) {
+      sqlContext.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
     }
 
     /*
     Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    Aggregate with keys:            Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    -------------------------------------------------------------------------------
-    Aggregate w/o codegen                4254.38             4.93         1.00 X
-    Aggregate w codegen                  2661.45             7.88         1.60 X
+    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    Aggregate w keys codegen=false            2402 / 2551          8.0         125.0       1.0X
+    Aggregate w keys codegen=true             1620 / 1670         12.0          83.3       1.5X
     */
-    benchmark.run()
   }
 
-  def testBroadcastHashJoin(values: Int): Unit = {
-    val benchmark = new Benchmark("BroadcastHashJoin", values)
-
+  ignore("broadcast hash join") {
+    val N = 20 << 20
     val dim = broadcast(sqlContext.range(1 << 16).selectExpr("id as k", "cast(id as string) as v"))
 
-    benchmark.addCase("BroadcastHashJoin w/o codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
-      sqlContext.range(values).join(dim, (col("id") % 60000) === col("k")).count()
-    }
-    benchmark.addCase(s"BroadcastHashJoin w codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.range(values).join(dim, (col("id") % 60000) === col("k")).count()
+    runBenchmark("BroadcastHashJoin", N) {
+      sqlContext.range(N).join(dim, (col("id") % 60000) === col("k")).count()
     }
 
     /*
-    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    BroadcastHashJoin:              Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    -------------------------------------------------------------------------------
-    BroadcastHashJoin w/o codegen        3053.41             3.43         1.00 X
-    BroadcastHashJoin w codegen          1028.40            10.20         2.97 X
+    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+    BroadcastHashJoin:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    BroadcastHashJoin codegen=false           4405 / 6147          4.0         250.0       1.0X
+    BroadcastHashJoin codegen=true            1857 / 1878         11.0          90.9       2.4X
    */
-    benchmark.run()
  }
 
-  def testBytesToBytesMap(values: Int): Unit = {
-    val benchmark = new Benchmark("BytesToBytesMap", values)
+  ignore("hash and BytesToBytesMap") {
+    val N = 50 << 20
+
+    val benchmark = new Benchmark("BytesToBytesMap", N)
 
     benchmark.addCase("hash") { iter =>
       var i = 0
@@ -167,7 +152,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
       val value = new UnsafeRow(2)
       value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
       var s = 0
-      while (i < values) {
+      while (i < N) {
         key.setInt(0, i % 1000)
         val h = Murmur3_x86_32.hashUnsafeWords(
           key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, 0)
@@ -194,7 +179,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
       val value = new UnsafeRow(2)
       value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
       var i = 0
-      while (i < values) {
+      while (i < N) {
         key.setInt(0, i % 65536)
         val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
         if (loc.isDefined) {
@@ -212,21 +197,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
 
     /**
     Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    Aggregate with keys:            Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    -------------------------------------------------------------------------------
-    hash                                  662.06            79.19         1.00 X
-    BytesToBytesMap (off Heap)           2209.42            23.73         0.30 X
-    BytesToBytesMap (on Heap)            2957.68            17.73         0.22 X
+    BytesToBytesMap:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    hash                                       628 /  661         83.0          12.0       1.0X
+    BytesToBytesMap (off Heap)                3292 / 3408         15.0          66.7       0.2X
+    BytesToBytesMap (on Heap)                 3349 / 4267         15.0          66.7       0.2X
     */
     benchmark.run()
   }
-
-  // These benchmark are skipped in normal build
-  ignore("benchmark") {
-    // testWholeStage(200 << 20)
-    // testStatFunctions(20 << 20)
-    // testAggregateWithKey(20 << 20)
-    // testBytesToBytesMap(50 << 20)
-    // testBroadcastHashJoin(10 << 20)
-  }
 }
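Note on the measurement change: `measure` now runs `iters + 1` iterations, drops iteration 0 as warm-up, and records every remaining run time, so it can report both the best and the average run, with the rate derived from the best. A minimal standalone sketch of that loop (the `workload` function and the constants are illustrative stand-ins, not part of the patch):

import scala.collection.mutable.ArrayBuffer

object MeasureSketch {
  // Illustrative stand-in for a benchmarked Int => Unit case.
  val workload: Int => Unit = _ => {
    var i = 0L
    var s = 0L
    while (i < 10000000L) { s += i; i += 1 }
  }

  def main(args: Array[String]): Unit = {
    val num = 10000000L // values processed per iteration
    val iters = 5
    val runTimes = ArrayBuffer[Long]()
    // iters + 1 runs in total; iteration 0 warms up the JIT and is dropped.
    for (i <- 0 until iters + 1) {
      val start = System.nanoTime()
      workload(i)
      val end = System.nanoTime()
      if (i > 0) runTimes += end - start
    }
    val best = runTimes.min        // fastest measured run, in nanoseconds
    val avg = runTimes.sum / iters // mean of the measured runs
    // best / 1000 is microseconds, so num / (best / 1000.0) is values per
    // microsecond, i.e. millions of values per second.
    println(f"best: ${best / 1e6}%.0f ms, avg: ${avg / 1e6}%.0f ms, " +
      f"rate: ${num / (best / 1000.0)}%.1f M/s")
  }
}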
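The derived columns in the new report follow directly from `Result`: `bestRate` is in millions of rows per second, so `1000 / bestRate` is nanoseconds per row, and `Relative` divides the baseline case's best time by each case's best time. A quick worked check against the `hash` and `BytesToBytesMap (off Heap)` rows of the sample output above (values copied from that table; the val names are illustrative):

val firstBest = 628.0                  // bestMs of the baseline "hash" case
val offHeapBest = 3292.0               // bestMs of the off-heap case
val offHeapRate = 15.0                 // bestRate, in million rows per second

val perRowNs = 1000 / offHeapRate      // = 66.7 ns per row, as printed
val relative = firstBest / offHeapBest // = 0.19..., printed as 0.2X by "%3.1fX"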