Commit de09145

Davies Liu authored and committed
[SPARK-13131] [SQL] Use best and average time in benchmark
Best time is more stable than average time. This also adds a column for nanoseconds per row, which could be used to estimate the contribution of each component in a query. Having best time and average time together gives more information (we can see the kind of variance). Rate, time per row, and relative are all calculated using best time. The result looks like this:

```
Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
rang/filter/sum:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
rang/filter/sum codegen=false          14332 / 16646         36.0          27.8       1.0X
rang/filter/sum codegen=true              845 /  940        620.0           1.6      17.0X
```

Author: Davies Liu <[email protected]>

Closes #11018 from davies/gen_bench.
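For reference, every reported column is derived from the best time, not the average. A small standalone sketch of that arithmetic (not part of the commit; the constants are sample values taken from the table above):

```scala
// Standalone sketch of the column arithmetic behind the new report.
// The inputs are sample values from the rang/filter/sum table above.
object BenchmarkColumns {
  def main(args: Array[String]): Unit = {
    val numRows   = 500L << 20   // rows processed per iteration (N)
    val bestMs    = 845.0        // best iteration time, in milliseconds
    val avgMs     = 940.0        // average iteration time, in milliseconds
    val firstBest = 14332.0      // best time of the first (baseline) case

    val rate     = numRows / 1e6 / (bestMs / 1000)  // million rows per second
    val perRowNs = 1000 / rate                      // nanoseconds per row
    val relative = firstBest / bestMs               // speedup vs. the baseline

    // Prints: "  845 /  940      620.5    1.6 17.0X"
    printf("%5.0f / %4.0f %10.1f %6.1f %3.1fX\n",
      bestMs, avgMs, rate, perRowNs, relative)
  }
}
```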
1 parent 915a753 commit de09145

File tree

2 files changed: +89 −103 lines changed


core/src/main/scala/org/apache/spark/util/Benchmark.scala

Lines changed: 24 additions & 14 deletions
```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.util
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.commons.lang3.SystemUtils
 
@@ -59,17 +60,21 @@ private[spark] class Benchmark(
     }
     println
 
-    val firstRate = results.head.avgRate
+    val firstBest = results.head.bestMs
+    val firstAvg = results.head.avgMs
     // The results are going to be processor specific so it is useful to include that.
     println(Benchmark.getProcessorName())
-    printf("%-30s %16s %16s %14s\n", name + ":", "Avg Time(ms)", "Avg Rate(M/s)", "Relative Rate")
-    println("-------------------------------------------------------------------------------")
-    results.zip(benchmarks).foreach { r =>
-      printf("%-30s %16s %16s %14s\n",
-        r._2.name,
-        "%10.2f" format r._1.avgMs,
-        "%10.2f" format r._1.avgRate,
-        "%6.2f X" format (r._1.avgRate / firstRate))
+    printf("%-35s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
+      "Per Row(ns)", "Relative")
+    println("-----------------------------------------------------------------------------------" +
+      "--------")
+    results.zip(benchmarks).foreach { case (result, benchmark) =>
+      printf("%-35s %16s %12s %13s %10s\n",
+        benchmark.name,
+        "%5.0f / %4.0f" format (result.bestMs, result.avgMs),
+        "%10.1f" format result.bestRate,
+        "%6.1f" format (1000 / result.bestRate),
+        "%3.1fX" format (firstBest / result.bestMs))
     }
     println
     // scalastyle:on
@@ -78,7 +83,7 @@ private[spark] class Benchmark(
 
 private[spark] object Benchmark {
   case class Case(name: String, fn: Int => Unit)
-  case class Result(avgMs: Double, avgRate: Double)
+  case class Result(avgMs: Double, bestRate: Double, bestMs: Double)
 
   /**
    * This should return a user helpful processor information. Getting at this depends on the OS.
@@ -99,22 +104,27 @@ private[spark] object Benchmark {
    * the rate of the function.
    */
  def measure(num: Long, iters: Int, outputPerIteration: Boolean)(f: Int => Unit): Result = {
-    var totalTime = 0L
+    val runTimes = ArrayBuffer[Long]()
    for (i <- 0 until iters + 1) {
      val start = System.nanoTime()
 
      f(i)
 
      val end = System.nanoTime()
-      if (i != 0) totalTime += end - start
+      val runTime = end - start
+      if (i > 0) {
+        runTimes += runTime
+      }
 
      if (outputPerIteration) {
        // scalastyle:off
-        println(s"Iteration $i took ${(end - start) / 1000} microseconds")
+        println(s"Iteration $i took ${runTime / 1000} microseconds")
        // scalastyle:on
      }
    }
-    Result(totalTime.toDouble / 1000000 / iters, num * iters / (totalTime.toDouble / 1000))
+    val best = runTimes.min
+    val avg = runTimes.sum / iters
+    Result(avg / 1000000, num / (best / 1000), best / 1000000)
  }
 }
```
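A minimal usage sketch of the API after this change (assuming the constructor defaults visible at the call sites below; the case bodies are arbitrary examples, not from the commit): `measure()` now runs each case `iters + 1` times, discards iteration 0 as warm-up (`i > 0`), and keys rate, per-row time, and relative speedup off the best (minimum) run rather than the average.

```scala
import org.apache.spark.util.Benchmark

// Sketch: two cases compared by the updated Benchmark. The first case
// added becomes the 1.0X baseline for the "Relative" column.
object BenchmarkSketch {
  def main(args: Array[String]): Unit = {
    val N = 1L << 24
    val benchmark = new Benchmark("example", N)
    benchmark.addCase("while loop sum") { _ =>
      var i = 0L
      var s = 0L
      while (i < N) { s += i; i += 1 }
    }
    benchmark.addCase("range sum") { _ =>
      (0L until N).sum
    }
    benchmark.run()
  }
}
```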

sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala

Lines changed: 65 additions & 89 deletions
```diff
@@ -34,54 +34,47 @@ import org.apache.spark.util.Benchmark
   */
 class BenchmarkWholeStageCodegen extends SparkFunSuite {
   lazy val conf = new SparkConf().setMaster("local[1]").setAppName("benchmark")
+    .set("spark.sql.shuffle.partitions", "1")
   lazy val sc = SparkContext.getOrCreate(conf)
   lazy val sqlContext = SQLContext.getOrCreate(sc)
 
-  def testWholeStage(values: Int): Unit = {
-    val benchmark = new Benchmark("rang/filter/aggregate", values)
+  def runBenchmark(name: String, values: Int)(f: => Unit): Unit = {
+    val benchmark = new Benchmark(name, values)
 
-    benchmark.addCase("Without codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
-      sqlContext.range(values).filter("(id & 1) = 1").count()
-    }
-
-    benchmark.addCase("With codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.range(values).filter("(id & 1) = 1").count()
+    Seq(false, true).foreach { enabled =>
+      benchmark.addCase(s"$name codegen=$enabled") { iter =>
+        sqlContext.setConf("spark.sql.codegen.wholeStage", enabled.toString)
+        f
+      }
     }
 
-    /*
-    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    rang/filter/aggregate:           Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    -------------------------------------------------------------------------------
-    Without codegen                       7775.53            26.97         1.00 X
-    With codegen                           342.15           612.94        22.73 X
-    */
     benchmark.run()
   }
 
-  def testStatFunctions(values: Int): Unit = {
-
-    val benchmark = new Benchmark("stat functions", values)
-
-    benchmark.addCase("stddev w/o codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
-      sqlContext.range(values).groupBy().agg("id" -> "stddev").collect()
+  // These benchmark are skipped in normal build
+  ignore("range/filter/sum") {
+    val N = 500 << 20
+    runBenchmark("rang/filter/sum", N) {
+      sqlContext.range(N).filter("(id & 1) = 1").groupBy().sum().collect()
     }
+    /*
+    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+    rang/filter/sum:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    rang/filter/sum codegen=false          14332 / 16646         36.0          27.8       1.0X
+    rang/filter/sum codegen=true              845 /  940        620.0           1.6      17.0X
+    */
+  }
 
-    benchmark.addCase("stddev w codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.range(values).groupBy().agg("id" -> "stddev").collect()
-    }
+  ignore("stat functions") {
+    val N = 100 << 20
 
-    benchmark.addCase("kurtosis w/o codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
-      sqlContext.range(values).groupBy().agg("id" -> "kurtosis").collect()
+    runBenchmark("stddev", N) {
+      sqlContext.range(N).groupBy().agg("id" -> "stddev").collect()
     }
 
-    benchmark.addCase("kurtosis w codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.range(values).groupBy().agg("id" -> "kurtosis").collect()
+    runBenchmark("kurtosis", N) {
+      sqlContext.range(N).groupBy().agg("id" -> "kurtosis").collect()
     }
 
 
@@ -99,64 +92,56 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     Using DeclarativeAggregate:
 
     Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    stddev:                          Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    -------------------------------------------------------------------------------
-    stddev w/o codegen                     989.22            21.20         1.00 X
-    stddev w codegen                       352.35            59.52         2.81 X
-    kurtosis w/o codegen                  3636.91             5.77         0.27 X
-    kurtosis w codegen                     369.25            56.79         2.68 X
+    stddev:                            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    stddev codegen=false                     5630 / 5776         18.0          55.6       1.0X
+    stddev codegen=true                      1259 / 1314         83.0          12.0       4.5X
+
+    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+    kurtosis:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    kurtosis codegen=false                 14847 / 15084          7.0         142.9       1.0X
+    kurtosis codegen=true                    1652 / 2124         63.0          15.9       9.0X
     */
-    benchmark.run()
   }
 
-  def testAggregateWithKey(values: Int): Unit = {
-    val benchmark = new Benchmark("Aggregate with keys", values)
+  ignore("aggregate with keys") {
+    val N = 20 << 20
 
-    benchmark.addCase("Aggregate w/o codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
-      sqlContext.range(values).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
-    }
-    benchmark.addCase(s"Aggregate w codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.range(values).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
+    runBenchmark("Aggregate w keys", N) {
+      sqlContext.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
     }
 
     /*
     Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    Aggregate with keys:             Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    -------------------------------------------------------------------------------
-    Aggregate w/o codegen                 4254.38             4.93         1.00 X
-    Aggregate w codegen                   2661.45             7.88         1.60 X
+    Aggregate w keys:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    Aggregate w keys codegen=false           2402 / 2551          8.0         125.0       1.0X
+    Aggregate w keys codegen=true            1620 / 1670         12.0          83.3       1.5X
     */
-    benchmark.run()
   }
 
-  def testBroadcastHashJoin(values: Int): Unit = {
-    val benchmark = new Benchmark("BroadcastHashJoin", values)
-
+  ignore("broadcast hash join") {
+    val N = 20 << 20
     val dim = broadcast(sqlContext.range(1 << 16).selectExpr("id as k", "cast(id as string) as v"))
 
-    benchmark.addCase("BroadcastHashJoin w/o codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
-      sqlContext.range(values).join(dim, (col("id") % 60000) === col("k")).count()
-    }
-    benchmark.addCase(s"BroadcastHashJoin w codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.range(values).join(dim, (col("id") % 60000) === col("k")).count()
+    runBenchmark("BroadcastHashJoin", N) {
+      sqlContext.range(N).join(dim, (col("id") % 60000) === col("k")).count()
     }
 
     /*
-    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    BroadcastHashJoin:               Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    -------------------------------------------------------------------------------
-    BroadcastHashJoin w/o codegen         3053.41             3.43         1.00 X
-    BroadcastHashJoin w codegen           1028.40            10.20         2.97 X
+    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+    BroadcastHashJoin:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    BroadcastHashJoin codegen=false          4405 / 6147          4.0         250.0       1.0X
+    BroadcastHashJoin codegen=true           1857 / 1878         11.0          90.9       2.4X
     */
-    benchmark.run()
   }
 
-  def testBytesToBytesMap(values: Int): Unit = {
-    val benchmark = new Benchmark("BytesToBytesMap", values)
+  ignore("hash and BytesToBytesMap") {
+    val N = 50 << 20
+
+    val benchmark = new Benchmark("BytesToBytesMap", N)
 
     benchmark.addCase("hash") { iter =>
       var i = 0
@@ -167,7 +152,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
       val value = new UnsafeRow(2)
       value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
       var s = 0
-      while (i < values) {
+      while (i < N) {
         key.setInt(0, i % 1000)
         val h = Murmur3_x86_32.hashUnsafeWords(
           key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, 0)
@@ -194,7 +179,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
       val value = new UnsafeRow(2)
       value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
       var i = 0
-      while (i < values) {
+      while (i < N) {
         key.setInt(0, i % 65536)
         val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
         if (loc.isDefined) {
@@ -212,21 +197,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
 
     /**
     Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    Aggregate with keys:             Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    -------------------------------------------------------------------------------
-    hash                                   662.06            79.19         1.00 X
-    BytesToBytesMap (off Heap)            2209.42            23.73         0.30 X
-    BytesToBytesMap (on Heap)             2957.68            17.73         0.22 X
+    BytesToBytesMap:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    hash                                      628 /  661         83.0          12.0       1.0X
+    BytesToBytesMap (off Heap)               3292 / 3408         15.0          66.7       0.2X
+    BytesToBytesMap (on Heap)                3349 / 4267         15.0          66.7       0.2X
     */
     benchmark.run()
   }
-
-  // These benchmark are skipped in normal build
-  ignore("benchmark") {
-    // testWholeStage(200 << 20)
-    // testStatFunctions(20 << 20)
-    // testAggregateWithKey(20 << 20)
-    // testBytesToBytesMap(50 << 20)
-    // testBroadcastHashJoin(10 << 20)
-  }
 }
```
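With the `runBenchmark` helper above, each query body is registered twice, once with whole-stage codegen disabled and once enabled, so both variants land in a single report. A hypothetical additional benchmark (not part of this commit) would follow the same pattern:

```scala
// Hypothetical example following the refactored pattern; "max" is an
// illustrative name, not a case added by this commit.
ignore("max aggregation") {
  val N = 20 << 20
  runBenchmark("max", N) {
    sqlContext.range(N).groupBy().agg("id" -> "max").collect()
  }
}
```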
