From 15bf2c17bbbbee8b4f6bd19e74e552d0572aa18c Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Wed, 4 May 2016 01:17:21 -0700
Subject: [PATCH 1/2] [SPARK-15115][SQL] Reorganize whole stage codegen
 benchmark suites

---
 .../sort/RecordPointerAndKeyPrefix.java       |   2 +-
 .../unsafe/sort/UnsafeSortDataFormat.java     |   3 +-
 .../unsafe/sort/RadixSortSuite.scala          |  77 ----
 .../AggregateBenchmark.scala}                 | 392 +++---------------
 .../execution/benchmark/BenchmarkBase.scala   |  54 +++
 .../execution/benchmark/JoinBenchmark.scala   | 229 ++++++++++
 .../execution/benchmark/MiscBenchmark.scala   | 135 ++++++
 .../execution/benchmark/SortBenchmark.scala   | 132 ++++++
 8 files changed, 603 insertions(+), 421 deletions(-)
 rename sql/core/src/test/scala/org/apache/spark/sql/execution/{BenchmarkWholeStageCodegen.scala => benchmark/AggregateBenchmark.scala} (52%)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkBase.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala

diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/RecordPointerAndKeyPrefix.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/RecordPointerAndKeyPrefix.java
index de92b8db4713..e9571aa8bb05 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/RecordPointerAndKeyPrefix.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/RecordPointerAndKeyPrefix.java
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util.collection.unsafe.sort;
 
-final class RecordPointerAndKeyPrefix {
+public final class RecordPointerAndKeyPrefix {
   /**
    * A pointer to a record; see {@link org.apache.spark.memory.TaskMemoryManager} for a
    * description of how these addresses are encoded.
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
index 12fb62fb77f0..d19b71fbc1bc 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSortDataFormat.java
@@ -29,7 +29,8 @@
 * Within each long[] buffer, position {@code 2 * i} holds a pointer to the record at
 * index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix.
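 * (For example, with two records the buffer holds [pointer0, prefix0, pointer1, prefix1].)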
 */
-final class UnsafeSortDataFormat extends SortDataFormat<RecordPointerAndKeyPrefix, LongArray> {
+public final class UnsafeSortDataFormat
+    extends SortDataFormat<RecordPointerAndKeyPrefix, LongArray> {
 
   public static final UnsafeSortDataFormat INSTANCE = new UnsafeSortDataFormat();
 
diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
index b03df1a94d84..8ff6485599bc 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
@@ -184,81 +184,4 @@ class RadixSortSuite extends SparkFunSuite with Logging {
       assert(res1.view == res2.view)
     }
   }
-
-  ignore("microbenchmarks") {
-    val size = 25000000
-    val rand = new XORShiftRandom(123)
-    val benchmark = new Benchmark("radix sort " + size, size)
-    benchmark.addTimerCase("reference TimSort key prefix array") { timer =>
-      val array = Array.tabulate[Long](size * 2) { i => rand.nextLong }
-      val buf = new LongArray(MemoryBlock.fromLongArray(array))
-      timer.startTiming()
-      referenceKeyPrefixSort(buf, 0, size, PrefixComparators.BINARY)
-      timer.stopTiming()
-    }
-    benchmark.addTimerCase("reference Arrays.sort") { timer =>
-      val ref = Array.tabulate[Long](size) { i => rand.nextLong }
-      timer.startTiming()
-      Arrays.sort(ref)
-      timer.stopTiming()
-    }
-    benchmark.addTimerCase("radix sort one byte") { timer =>
-      val array = new Array[Long](size * 2)
-      var i = 0
-      while (i < size) {
-        array(i) = rand.nextLong & 0xff
-        i += 1
-      }
-      val buf = new LongArray(MemoryBlock.fromLongArray(array))
-      timer.startTiming()
-      RadixSort.sort(buf, size, 0, 7, false, false)
-      timer.stopTiming()
-    }
-    benchmark.addTimerCase("radix sort two bytes") { timer =>
-      val array = new Array[Long](size * 2)
-      var i = 0
-      while (i < size) {
-        array(i) = rand.nextLong & 0xffff
-        i += 1
-      }
-      val buf = new LongArray(MemoryBlock.fromLongArray(array))
-      timer.startTiming()
-      RadixSort.sort(buf, size, 0, 7, false, false)
-      timer.stopTiming()
-    }
-    benchmark.addTimerCase("radix sort eight bytes") { timer =>
-      val array = new Array[Long](size * 2)
-      var i = 0
-      while (i < size) {
-        array(i) = rand.nextLong
-        i += 1
-      }
-      val buf = new LongArray(MemoryBlock.fromLongArray(array))
-      timer.startTiming()
-      RadixSort.sort(buf, size, 0, 7, false, false)
-      timer.stopTiming()
-    }
-    benchmark.addTimerCase("radix sort key prefix array") { timer =>
-      val (_, buf2) = generateKeyPrefixTestData(size, rand.nextLong)
-      timer.startTiming()
-      RadixSort.sortKeyPrefixArray(buf2, size, 0, 7, false, false)
-      timer.stopTiming()
-    }
-    benchmark.run()
-
-    /**
-      Running benchmark: radix sort 25000000
-      Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Linux 3.13.0-44-generic
-      Intel(R) Core(TM) i7-4600U CPU @ 2.10GHz
-
-      radix sort 25000000:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-      -------------------------------------------------------------------------------------------
-      reference TimSort key prefix array      15546 / 15859          1.6         621.9       1.0X
-      reference Arrays.sort                    2416 / 2446          10.3          96.6       6.4X
-      radix sort one byte                       133 /  137         188.4           5.3     117.2X
-      radix sort two bytes                      255 /  258          98.2          10.2      61.1X
-      radix sort eight bytes                    991 /  997          25.2          39.6      15.7X
-      radix sort key prefix array              1540 / 1563          16.2          61.6      10.1X
-    */
-  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
similarity index 52%
rename from
sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala index 7ca4b75f480b..b31338e82768 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala @@ -15,59 +15,35 @@ * limitations under the License. */ -package org.apache.spark.sql.execution +package org.apache.spark.sql.execution.benchmark import java.util.HashMap -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.SparkConf import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager} -import org.apache.spark.sql.execution.joins.LongToUnsafeRowMap -import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.joins.LongToUnsafeRowMap import org.apache.spark.sql.execution.vectorized.AggregateHashMap -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.{IntegerType, LongType, StructType} +import org.apache.spark.sql.types.{LongType, StructType} import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.hash.Murmur3_x86_32 import org.apache.spark.unsafe.map.BytesToBytesMap import org.apache.spark.util.Benchmark /** - * Benchmark to measure whole stage codegen performance. + * Benchmark to measure performance for aggregate primitives. * To run this: - * build/sbt "sql/test-only *BenchmarkWholeStageCodegen" + * build/sbt "sql/test-only *benchmark.AggregateBenchmark" * * Benchmarks in this file are skipped in normal builds. */ -class BenchmarkWholeStageCodegen extends SparkFunSuite { - lazy val conf = new SparkConf().setMaster("local[1]").setAppName("benchmark") - .set("spark.sql.shuffle.partitions", "1") - .set("spark.sql.autoBroadcastJoinThreshold", "1") - lazy val sc = SparkContext.getOrCreate(conf) - lazy val sqlContext = SQLContext.getOrCreate(sc) - - /** Runs function `f` with whole stage codegen on and off. 
*/ - def runBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = { - val benchmark = new Benchmark(name, cardinality) - - benchmark.addCase(s"$name wholestage off", numIters = 2) { iter => - sqlContext.setConf("spark.sql.codegen.wholeStage", "false") - f - } - - benchmark.addCase(s"$name wholestage on", numIters = 5) { iter => - sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - f - } - - benchmark.run() - } +class AggregateBenchmark extends BenchmarkBase { ignore("aggregate without grouping") { val N = 500L << 22 val benchmark = new Benchmark("agg without grouping", N) runBenchmark("agg w/o group", N) { - sqlContext.range(N).selectExpr("sum(id)").collect() + sparkSession.range(N).selectExpr("sum(id)").collect() } /* agg w/o group: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative @@ -77,79 +53,19 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { */ } - ignore("filter & aggregate without group") { - val N = 500L << 22 - runBenchmark("range/filter/sum", N) { - sqlContext.range(N).filter("(id & 1) = 1").groupBy().sum().collect() - } - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11 - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz - - range/filter/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - range/filter/sum codegen=false 30663 / 31216 68.4 14.6 1.0X - range/filter/sum codegen=true 2399 / 2409 874.1 1.1 12.8X - */ - } - - ignore("range/limit/sum") { - val N = 500L << 20 - runBenchmark("range/limit/sum", N) { - sqlContext.range(N).limit(1000000).groupBy().sum().collect() - } - /* - Westmere E56xx/L56xx/X56xx (Nehalem-C) - range/limit/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - range/limit/sum codegen=false 609 / 672 861.6 1.2 1.0X - range/limit/sum codegen=true 561 / 621 935.3 1.1 1.1X - */ - } - - ignore("sample") { - val N = 500 << 18 - runBenchmark("sample with replacement", N) { - sqlContext.range(N).sample(withReplacement = true, 0.01).groupBy().sum().collect() - } - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11 - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz - - sample with replacement: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - sample with replacement codegen=false 7073 / 7227 18.5 54.0 1.0X - sample with replacement codegen=true 5199 / 5203 25.2 39.7 1.4X - */ - - runBenchmark("sample without replacement", N) { - sqlContext.range(N).sample(withReplacement = false, 0.01).groupBy().sum().collect() - } - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11 - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz - - sample without replacement: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - sample without replacement codegen=false 1508 / 1529 86.9 11.5 1.0X - sample without replacement codegen=true 644 / 662 203.5 4.9 2.3X - */ - } - ignore("stat functions") { val N = 100L << 20 runBenchmark("stddev", N) { - sqlContext.range(N).groupBy().agg("id" -> "stddev").collect() + sparkSession.range(N).groupBy().agg("id" -> "stddev").collect() } runBenchmark("kurtosis", N) { - sqlContext.range(N).groupBy().agg("id" -> "kurtosis").collect() + sparkSession.range(N).groupBy().agg("id" -> "kurtosis").collect() } - - 
/** - Using ImperativeAggregate (as implemented in Spark 1.6): + /* + Using ImperativeAggregate (as implemented in Spark 1.6): Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz stddev: Avg Time(ms) Avg Rate(M/s) Relative Rate @@ -172,29 +88,31 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { ------------------------------------------------------------------------------------------- kurtosis codegen=false 14847 / 15084 7.0 142.9 1.0X kurtosis codegen=true 1652 / 2124 63.0 15.9 9.0X - */ + */ } ignore("aggregate with linear keys") { val N = 20 << 22 val benchmark = new Benchmark("Aggregate w keys", N) - def f(): Unit = sqlContext.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect() + def f(): Unit = { + sparkSession.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect() + } benchmark.addCase(s"codegen = F", numIters = 2) { iter => - sqlContext.setConf("spark.sql.codegen.wholeStage", "false") + sparkSession.conf.set("spark.sql.codegen.wholeStage", "false") f() } benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter => - sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0") + sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") + sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "0") f() } benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter => - sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3") + sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") + sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "3") f() } @@ -216,24 +134,24 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { val N = 20 << 22 val benchmark = new Benchmark("Aggregate w keys", N) - sqlContext.range(N).selectExpr("id", "floor(rand() * 10000) as k").registerTempTable("test") + sparkSession.range(N).selectExpr("id", "floor(rand() * 10000) as k").registerTempTable("test") - def f(): Unit = sqlContext.sql("select k, k, sum(id) from test group by k, k").collect() + def f(): Unit = sparkSession.sql("select k, k, sum(id) from test group by k, k").collect() benchmark.addCase(s"codegen = F", numIters = 2) { iter => - sqlContext.setConf("spark.sql.codegen.wholeStage", "false") + sparkSession.conf.set("spark.sql.codegen.wholeStage", value = false) f() } benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter => - sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0") + sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true) + sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", 0) f() } benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter => - sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3") + sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true) + sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", 3) f() } @@ -255,23 +173,23 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { val N = 20 << 20 val benchmark = new Benchmark("Aggregate w string key", N) - def f(): Unit = sqlContext.range(N).selectExpr("id", "cast(id & 1023 as string) as k") + def f(): Unit = sparkSession.range(N).selectExpr("id", "cast(id & 1023 as string) as k") .groupBy("k").count().collect() benchmark.addCase(s"codegen = F", numIters = 2) { 
iter => - sqlContext.setConf("spark.sql.codegen.wholeStage", "false") + sparkSession.conf.set("spark.sql.codegen.wholeStage", "false") f() } benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter => - sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0") + sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") + sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "0") f() } benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter => - sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3") + sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") + sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "3") f() } @@ -292,23 +210,23 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { val N = 20 << 20 val benchmark = new Benchmark("Aggregate w decimal key", N) - def f(): Unit = sqlContext.range(N).selectExpr("id", "cast(id & 65535 as decimal) as k") + def f(): Unit = sparkSession.range(N).selectExpr("id", "cast(id & 65535 as decimal) as k") .groupBy("k").count().collect() benchmark.addCase(s"codegen = F") { iter => - sqlContext.setConf("spark.sql.codegen.wholeStage", "false") + sparkSession.conf.set("spark.sql.codegen.wholeStage", "false") f() } benchmark.addCase(s"codegen = T hashmap = F") { iter => - sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0") + sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") + sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "0") f() } benchmark.addCase(s"codegen = T hashmap = T") { iter => - sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3") + sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") + sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "3") f() } @@ -329,7 +247,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { val N = 20 << 20 val benchmark = new Benchmark("Aggregate w multiple keys", N) - def f(): Unit = sqlContext.range(N) + def f(): Unit = sparkSession.range(N) .selectExpr( "id", "(id & 1023) as k1", @@ -343,19 +261,19 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { .collect() benchmark.addCase(s"codegen = F") { iter => - sqlContext.setConf("spark.sql.codegen.wholeStage", "false") + sparkSession.conf.set("spark.sql.codegen.wholeStage", "false") f() } benchmark.addCase(s"codegen = T hashmap = F") { iter => - sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0") + sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") + sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "0") f() } benchmark.addCase(s"codegen = T hashmap = T") { iter => - sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "10") + sparkSession.conf.set("spark.sql.codegen.wholeStage", "true") + sparkSession.conf.set("spark.sql.codegen.aggregate.map.columns.max", "10") f() } @@ -372,187 +290,22 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { */ } - ignore("broadcast hash join") { - val N = 20 << 20 - val M = 1 << 16 - val dim = broadcast(sqlContext.range(M).selectExpr("id as k", "cast(id as string) as v")) - - runBenchmark("Join w 
long", N) { - sqlContext.range(N).join(dim, (col("id") % M) === col("k")).count() - } - - /* - Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - Join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Join w long codegen=false 3002 / 3262 7.0 143.2 1.0X - Join w long codegen=true 321 / 371 65.3 15.3 9.3X - */ - - runBenchmark("Join w long duplicated", N) { - val dim = broadcast(sqlContext.range(M).selectExpr("cast(id/10 as long) as k")) - sqlContext.range(N).join(dim, (col("id") % M) === col("k")).count() - } - - /** - Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - Join w long duplicated: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Join w long duplicated codegen=false 3446 / 3478 6.1 164.3 1.0X - Join w long duplicated codegen=true 322 / 351 65.2 15.3 10.7X - */ - - val dim2 = broadcast(sqlContext.range(M) - .selectExpr("cast(id as int) as k1", "cast(id as int) as k2", "cast(id as string) as v")) - - runBenchmark("Join w 2 ints", N) { - sqlContext.range(N).join(dim2, - (col("id") % M).cast(IntegerType) === col("k1") - && (col("id") % M).cast(IntegerType) === col("k2")).count() - } - - /** - Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - Join w 2 ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Join w 2 ints codegen=false 4426 / 4501 4.7 211.1 1.0X - Join w 2 ints codegen=true 791 / 818 26.5 37.7 5.6X - */ - - val dim3 = broadcast(sqlContext.range(M) - .selectExpr("id as k1", "id as k2", "cast(id as string) as v")) - - runBenchmark("Join w 2 longs", N) { - sqlContext.range(N).join(dim3, - (col("id") % M) === col("k1") && (col("id") % M) === col("k2")) - .count() - } - - /** - Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - Join w 2 longs: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Join w 2 longs codegen=false 5905 / 6123 3.6 281.6 1.0X - Join w 2 longs codegen=true 2230 / 2529 9.4 106.3 2.6X - */ - - val dim4 = broadcast(sqlContext.range(M) - .selectExpr("cast(id/10 as long) as k1", "cast(id/10 as long) as k2")) - - runBenchmark("Join w 2 longs duplicated", N) { - sqlContext.range(N).join(dim4, - (col("id") bitwiseAND M) === col("k1") && (col("id") bitwiseAND M) === col("k2")) - .count() - } - - /** - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - Join w 2 longs duplicated: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - Join w 2 longs duplicated codegen=false 6420 / 6587 3.3 306.1 1.0X - Join w 2 longs duplicated codegen=true 2080 / 2139 10.1 99.2 3.1X - */ - - runBenchmark("outer join w long", N) { - sqlContext.range(N).join(dim, (col("id") % M) === col("k"), "left").count() - } - - /** - Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - outer join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - 
------------------------------------------------------------------------------------------- - outer join w long codegen=false 3055 / 3189 6.9 145.7 1.0X - outer join w long codegen=true 261 / 276 80.5 12.4 11.7X - */ - - runBenchmark("semi join w long", N) { - sqlContext.range(N).join(dim, (col("id") % M) === col("k"), "leftsemi").count() - } - - /** - Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - semi join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - semi join w long codegen=false 1912 / 1990 11.0 91.2 1.0X - semi join w long codegen=true 237 / 244 88.3 11.3 8.1X - */ - } - - ignore("sort merge join") { - val N = 2 << 20 - runBenchmark("merge join", N) { - val df1 = sqlContext.range(N).selectExpr(s"id * 2 as k1") - val df2 = sqlContext.range(N).selectExpr(s"id * 3 as k2") - df1.join(df2, col("k1") === col("k2")).count() - } - - /** - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - merge join codegen=false 1588 / 1880 1.3 757.1 1.0X - merge join codegen=true 1477 / 1531 1.4 704.2 1.1X - */ - - runBenchmark("sort merge join", N) { - val df1 = sqlContext.range(N) - .selectExpr(s"(id * 15485863) % ${N*10} as k1") - val df2 = sqlContext.range(N) - .selectExpr(s"(id * 15485867) % ${N*10} as k2") - df1.join(df2, col("k1") === col("k2")).count() - } - - /** - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - sort merge join codegen=false 3626 / 3667 0.6 1728.9 1.0X - sort merge join codegen=true 3405 / 3438 0.6 1623.8 1.1X - */ - } - - ignore("shuffle hash join") { - val N = 4 << 20 - sqlContext.setConf("spark.sql.shuffle.partitions", "2") - sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "10000000") - sqlContext.setConf("spark.sql.join.preferSortMergeJoin", "false") - runBenchmark("shuffle hash join", N) { - val df1 = sqlContext.range(N).selectExpr(s"id as k1") - val df2 = sqlContext.range(N / 5).selectExpr(s"id * 3 as k2") - df1.join(df2, col("k1") === col("k2")).count() - } - - /** - Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - shuffle hash join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - shuffle hash join codegen=false 1101 / 1391 3.8 262.6 1.0X - shuffle hash join codegen=true 528 / 578 7.9 125.8 2.1X - */ - } ignore("cube") { val N = 5 << 20 runBenchmark("cube", N) { - sqlContext.range(N).selectExpr("id", "id % 1000 as k1", "id & 256 as k2") + sparkSession.range(N).selectExpr("id", "id % 1000 as k1", "id & 256 as k2") .cube("k1", "k2").sum("id").collect() } /** - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz cube: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- cube codegen=false 3188 / 3392 1.6 608.2 1.0X cube codegen=true 1239 / 1394 4.2 236.3 2.6X - */ + */ } ignore("hash and BytesToBytesMap") { @@ -802,7 +555,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } } - /** + /* Intel(R) Core(TM) i7-4960HQ 
CPU @ 2.60GHz BytesToBytesMap: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- @@ -822,49 +575,4 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { benchmark.run() } - ignore("collect") { - val N = 1 << 20 - - val benchmark = new Benchmark("collect", N) - benchmark.addCase("collect 1 million") { iter => - sqlContext.range(N).collect() - } - benchmark.addCase("collect 2 millions") { iter => - sqlContext.range(N * 2).collect() - } - benchmark.addCase("collect 4 millions") { iter => - sqlContext.range(N * 4).collect() - } - benchmark.run() - - /** - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - collect 1 million 439 / 654 2.4 418.7 1.0X - collect 2 millions 961 / 1907 1.1 916.4 0.5X - collect 4 millions 3193 / 3895 0.3 3044.7 0.1X - */ - } - - ignore("collect limit") { - val N = 1 << 20 - - val benchmark = new Benchmark("collect limit", N) - benchmark.addCase("collect limit 1 million") { iter => - sqlContext.range(N * 4).limit(N).collect() - } - benchmark.addCase("collect limit 2 millions") { iter => - sqlContext.range(N * 4).limit(N * 2).collect() - } - benchmark.run() - - /** - model name : Westmere E56xx/L56xx/X56xx (Nehalem-C) - collect limit: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - collect limit 1 million 833 / 1284 1.3 794.4 1.0X - collect limit 2 millions 3348 / 4005 0.3 3193.3 0.2X - */ - } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkBase.scala new file mode 100644 index 000000000000..c99a5aec1cd6 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BenchmarkBase.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.SparkSession +import org.apache.spark.util.Benchmark + +/** + * Common base trait for micro benchmarks that are supposed to run standalone (i.e. not together + * with other test suites). + */ +private[benchmark] trait BenchmarkBase extends SparkFunSuite { + + lazy val sparkSession = SparkSession.builder + .master("local[1]") + .appName("microbenchmark") + .config("spark.sql.shuffle.partitions", 1) + .config("spark.sql.autoBroadcastJoinThreshold", 1) + .getOrCreate() + + /** Runs function `f` with whole stage codegen on and off. 
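+   * A minimal usage sketch (the body can be any Spark action; this mirrors the
+   * aggregate suite):
+   * {{{
+   *   runBenchmark("sum", 1L << 20) {
+   *     sparkSession.range(1L << 20).selectExpr("sum(id)").collect()
+   *   }
+   * }}}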
+   */
+  def runBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
+    val benchmark = new Benchmark(name, cardinality)
+
+    benchmark.addCase(s"$name wholestage off", numIters = 2) { iter =>
+      sparkSession.conf.set("spark.sql.codegen.wholeStage", value = false)
+      f
+    }
+
+    benchmark.addCase(s"$name wholestage on", numIters = 5) { iter =>
+      sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true)
+      f
+    }
+
+    benchmark.run()
+  }
+
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
new file mode 100644
index 000000000000..46db41a8abad
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * Benchmark to measure performance for joins.
+ * To run this:
+ *  build/sbt "sql/test-only *benchmark.JoinBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
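+ *
+ * The broadcast cases join a large `range` against a `broadcast(...)`-hinted dimension
+ * table so the planner picks a broadcast hash join; a sketch of the shared pattern
+ * (N and M as in the cases below):
+ * {{{
+ *   val dim = broadcast(sparkSession.range(M).selectExpr("id as k"))
+ *   sparkSession.range(N).join(dim, (col("id") % M) === col("k")).count()
+ * }}}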
+ */
+class JoinBenchmark extends BenchmarkBase {
+
+  ignore("broadcast hash join, long key") {
+    val N = 20 << 20
+    val M = 1 << 16
+
+    val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v"))
+    runBenchmark("Join w long", N) {
+      sparkSession.range(N).join(dim, (col("id") % M) === col("k")).count()
+    }
+
+    /*
+    Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
+    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+    Join w long:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    Join w long codegen=false                3002 / 3262          7.0         143.2       1.0X
+    Join w long codegen=true                  321 /  371         65.3          15.3       9.3X
+    */
+  }
+
+  ignore("broadcast hash join, long key with duplicates") {
+    val N = 20 << 20
+    val M = 1 << 16
+
+    runBenchmark("Join w long duplicated", N) {
+      val dim = broadcast(sparkSession.range(M).selectExpr("cast(id/10 as long) as k"))
+      sparkSession.range(N).join(dim, (col("id") % M) === col("k")).count()
+    }
+
+    /*
+    *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
+    *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+    *Join w long duplicated:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    *-------------------------------------------------------------------------------------------
+    *Join w long duplicated codegen=false      3446 / 3478          6.1         164.3       1.0X
+    *Join w long duplicated codegen=true        322 /  351         65.2          15.3      10.7X
+    */
+  }
+
+  ignore("broadcast hash join, two int keys") {
+    val N = 20 << 20
+    val M = 1 << 16
+    val dim2 = broadcast(sparkSession.range(M)
+      .selectExpr("cast(id as int) as k1", "cast(id as int) as k2", "cast(id as string) as v"))
+
+    runBenchmark("Join w 2 ints", N) {
+      sparkSession.range(N).join(dim2,
+        (col("id") % M).cast(IntegerType) === col("k1")
+          && (col("id") % M).cast(IntegerType) === col("k2")).count()
+    }
+
+    /*
+    *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
+    *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+    *Join w 2 ints:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    *-------------------------------------------------------------------------------------------
+    *Join w 2 ints codegen=false               4426 / 4501          4.7         211.1       1.0X
+    *Join w 2 ints codegen=true                 791 /  818         26.5          37.7       5.6X
+    */
+  }
+
+  ignore("broadcast hash join, two long keys") {
+    val N = 20 << 20
+    val M = 1 << 16
+    val dim3 = broadcast(sparkSession.range(M)
+      .selectExpr("id as k1", "id as k2", "cast(id as string) as v"))
+
+    runBenchmark("Join w 2 longs", N) {
+      sparkSession.range(N).join(dim3,
+        (col("id") % M) === col("k1") && (col("id") % M) === col("k2"))
+        .count()
+    }
+
+    /*
+    *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5
+    *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+    *Join w 2 longs:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    *-------------------------------------------------------------------------------------------
+    *Join w 2 longs codegen=false              5905 / 6123          3.6         281.6       1.0X
+    *Join w 2 longs codegen=true               2230 / 2529          9.4         106.3       2.6X
+    */
+  }
+
+  ignore("broadcast hash join, two long keys with duplicates") {
+    val N = 20 << 20
+    val M = 1 << 16
+    val dim4 = broadcast(sparkSession.range(M)
+      .selectExpr("cast(id/10 as long) as k1", "cast(id/10 as long) as k2"))
+
+    runBenchmark("Join w 2 longs duplicated", N) {
+      sparkSession.range(N).join(dim4,
+        (col("id") bitwiseAND M) === col("k1") && (col("id") bitwiseAND M) === col("k2"))
+        .count()
+    }
+
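+    // Note: unlike the other broadcast cases, which key on `id % M`, this case keys on
+    // `id bitwiseAND M`, so the generated keys (and thus the match rate) differ slightly.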
/* + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *Join w 2 longs duplicated: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *Join w 2 longs duplicated codegen=false 6420 / 6587 3.3 306.1 1.0X + *Join w 2 longs duplicated codegen=true 2080 / 2139 10.1 99.2 3.1X + */ + } + + ignore("broadcast hash join, outer join long key") { + val N = 20 << 20 + val M = 1 << 16 + val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v")) + runBenchmark("outer join w long", N) { + sparkSession.range(N).join(dim, (col("id") % M) === col("k"), "left").count() + } + + /* + *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *outer join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *outer join w long codegen=false 3055 / 3189 6.9 145.7 1.0X + *outer join w long codegen=true 261 / 276 80.5 12.4 11.7X + */ + } + + ignore("broadcast hash join, semi join long key") { + val N = 20 << 20 + val M = 1 << 16 + val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v")) + runBenchmark("semi join w long", N) { + sparkSession.range(N).join(dim, (col("id") % M) === col("k"), "leftsemi").count() + } + + /* + *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *semi join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *semi join w long codegen=false 1912 / 1990 11.0 91.2 1.0X + *semi join w long codegen=true 237 / 244 88.3 11.3 8.1X + */ + } + + ignore("sort merge join") { + val N = 2 << 20 + runBenchmark("merge join", N) { + val df1 = sparkSession.range(N).selectExpr(s"id * 2 as k1") + val df2 = sparkSession.range(N).selectExpr(s"id * 3 as k2") + df1.join(df2, col("k1") === col("k2")).count() + } + + /* + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *merge join codegen=false 1588 / 1880 1.3 757.1 1.0X + *merge join codegen=true 1477 / 1531 1.4 704.2 1.1X + */ + } + + ignore("sort merge join with duplicates") { + val N = 2 << 20 + runBenchmark("sort merge join", N) { + val df1 = sparkSession.range(N) + .selectExpr(s"(id * 15485863) % ${N*10} as k1") + val df2 = sparkSession.range(N) + .selectExpr(s"(id * 15485867) % ${N*10} as k2") + df1.join(df2, col("k1") === col("k2")).count() + } + + /* + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *sort merge join codegen=false 3626 / 3667 0.6 1728.9 1.0X + *sort merge join codegen=true 3405 / 3438 0.6 1623.8 1.1X + */ + } + + ignore("shuffle hash join") { + val N = 4 << 20 + sparkSession.conf.set("spark.sql.shuffle.partitions", "2") + sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", "10000000") + sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false") + runBenchmark("shuffle hash join", N) { + val df1 = sparkSession.range(N).selectExpr(s"id as k1") + val df2 = sparkSession.range(N / 5).selectExpr(s"id * 3 as k2") + df1.join(df2, col("k1") === 
col("k2")).count() + } + + /* + *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 + *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + *shuffle hash join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + *------------------------------------------------------------------------------------------- + *shuffle hash join codegen=false 1101 / 1391 3.8 262.6 1.0X + *shuffle hash join codegen=true 528 / 578 7.9 125.8 2.1X + */ + } + +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala new file mode 100644 index 000000000000..470c78120b19 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.util.Benchmark + +/** + * Benchmark to measure whole stage codegen performance. + * To run this: + * build/sbt "sql/test-only *benchmark.MiscBenchmark" + * + * Benchmarks in this file are skipped in normal builds. 
+ */ +class MiscBenchmark extends BenchmarkBase { + + ignore("filter & aggregate without group") { + val N = 500L << 22 + runBenchmark("range/filter/sum", N) { + sparkSession.range(N).filter("(id & 1) = 1").groupBy().sum().collect() + } + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11 + Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz + + range/filter/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + range/filter/sum codegen=false 30663 / 31216 68.4 14.6 1.0X + range/filter/sum codegen=true 2399 / 2409 874.1 1.1 12.8X + */ + } + + ignore("range/limit/sum") { + val N = 500L << 20 + runBenchmark("range/limit/sum", N) { + sparkSession.range(N).limit(1000000).groupBy().sum().collect() + } + /* + Westmere E56xx/L56xx/X56xx (Nehalem-C) + range/limit/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + range/limit/sum codegen=false 609 / 672 861.6 1.2 1.0X + range/limit/sum codegen=true 561 / 621 935.3 1.1 1.1X + */ + } + + ignore("sample") { + val N = 500 << 18 + runBenchmark("sample with replacement", N) { + sparkSession.range(N).sample(withReplacement = true, 0.01).groupBy().sum().collect() + } + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11 + Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz + + sample with replacement: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + sample with replacement codegen=false 7073 / 7227 18.5 54.0 1.0X + sample with replacement codegen=true 5199 / 5203 25.2 39.7 1.4X + */ + + runBenchmark("sample without replacement", N) { + sparkSession.range(N).sample(withReplacement = false, 0.01).groupBy().sum().collect() + } + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11 + Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz + + sample without replacement: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + sample without replacement codegen=false 1508 / 1529 86.9 11.5 1.0X + sample without replacement codegen=true 644 / 662 203.5 4.9 2.3X + */ + } + + ignore("collect") { + val N = 1 << 20 + + val benchmark = new Benchmark("collect", N) + benchmark.addCase("collect 1 million") { iter => + sparkSession.range(N).collect() + } + benchmark.addCase("collect 2 millions") { iter => + sparkSession.range(N * 2).collect() + } + benchmark.addCase("collect 4 millions") { iter => + sparkSession.range(N * 4).collect() + } + benchmark.run() + + /** + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + collect 1 million 439 / 654 2.4 418.7 1.0X + collect 2 millions 961 / 1907 1.1 916.4 0.5X + collect 4 millions 3193 / 3895 0.3 3044.7 0.1X + */ + } + + ignore("collect limit") { + val N = 1 << 20 + + val benchmark = new Benchmark("collect limit", N) + benchmark.addCase("collect limit 1 million") { iter => + sparkSession.range(N * 4).limit(N).collect() + } + benchmark.addCase("collect limit 2 millions") { iter => + sparkSession.range(N * 4).limit(N * 2).collect() + } + benchmark.run() + + /** + model name : Westmere E56xx/L56xx/X56xx (Nehalem-C) + collect limit: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + 
-------------------------------------------------------------------------------------------
+    collect limit 1 million                    833 / 1284          1.3         794.4       1.0X
+    collect limit 2 millions                  3348 / 4005          0.3        3193.3       0.2X
+    */
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
new file mode 100644
index 000000000000..0e1868dd6656
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import java.util.{Arrays, Comparator}
+
+import org.apache.spark.unsafe.array.LongArray
+import org.apache.spark.unsafe.memory.MemoryBlock
+import org.apache.spark.util.Benchmark
+import org.apache.spark.util.collection.Sorter
+import org.apache.spark.util.collection.unsafe.sort._
+import org.apache.spark.util.random.XORShiftRandom
+
+/**
+ * Benchmark to measure performance for sort primitives.
+ * To run this:
+ *  build/sbt "sql/test-only *benchmark.SortBenchmark"
+ *
+ * Benchmarks in this file are skipped in normal builds.
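+ *
+ * The radix-sort cases below allocate a `LongArray` of twice the logical record count
+ * because `RadixSort` sorts out of place and uses the extra half as scratch space; the
+ * core call, sorting `size` records on key bytes 0 through 7 (ascending, unsigned), is
+ * sketched here:
+ * {{{
+ *   val buf = new LongArray(MemoryBlock.fromLongArray(new Array[Long](size * 2)))
+ *   RadixSort.sort(buf, size, 0, 7, false, false)
+ * }}}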
+ */ +class SortBenchmark extends BenchmarkBase { + + private def referenceKeyPrefixSort(buf: LongArray, lo: Int, hi: Int, refCmp: PrefixComparator) { + new Sorter(UnsafeSortDataFormat.INSTANCE).sort( + buf, lo, hi, new Comparator[RecordPointerAndKeyPrefix] { + override def compare( + r1: RecordPointerAndKeyPrefix, + r2: RecordPointerAndKeyPrefix): Int = { + refCmp.compare(r1.keyPrefix, r2.keyPrefix) + } + }) + } + + private def generateKeyPrefixTestData(size: Int, rand: => Long): (LongArray, LongArray) = { + val ref = Array.tabulate[Long](size * 2) { i => rand } + val extended = ref ++ Array.fill[Long](size * 2)(0) + (new LongArray(MemoryBlock.fromLongArray(ref)), + new LongArray(MemoryBlock.fromLongArray(extended))) + } + + ignore("sort") { + val size = 25000000 + val rand = new XORShiftRandom(123) + val benchmark = new Benchmark("radix sort " + size, size) + benchmark.addTimerCase("reference TimSort key prefix array") { timer => + val array = Array.tabulate[Long](size * 2) { i => rand.nextLong } + val buf = new LongArray(MemoryBlock.fromLongArray(array)) + timer.startTiming() + referenceKeyPrefixSort(buf, 0, size, PrefixComparators.BINARY) + timer.stopTiming() + } + benchmark.addTimerCase("reference Arrays.sort") { timer => + val ref = Array.tabulate[Long](size) { i => rand.nextLong } + timer.startTiming() + Arrays.sort(ref) + timer.stopTiming() + } + benchmark.addTimerCase("radix sort one byte") { timer => + val array = new Array[Long](size * 2) + var i = 0 + while (i < size) { + array(i) = rand.nextLong & 0xff + i += 1 + } + val buf = new LongArray(MemoryBlock.fromLongArray(array)) + timer.startTiming() + RadixSort.sort(buf, size, 0, 7, false, false) + timer.stopTiming() + } + benchmark.addTimerCase("radix sort two bytes") { timer => + val array = new Array[Long](size * 2) + var i = 0 + while (i < size) { + array(i) = rand.nextLong & 0xffff + i += 1 + } + val buf = new LongArray(MemoryBlock.fromLongArray(array)) + timer.startTiming() + RadixSort.sort(buf, size, 0, 7, false, false) + timer.stopTiming() + } + benchmark.addTimerCase("radix sort eight bytes") { timer => + val array = new Array[Long](size * 2) + var i = 0 + while (i < size) { + array(i) = rand.nextLong + i += 1 + } + val buf = new LongArray(MemoryBlock.fromLongArray(array)) + timer.startTiming() + RadixSort.sort(buf, size, 0, 7, false, false) + timer.stopTiming() + } + benchmark.addTimerCase("radix sort key prefix array") { timer => + val (_, buf2) = generateKeyPrefixTestData(size, rand.nextLong) + timer.startTiming() + RadixSort.sortKeyPrefixArray(buf2, size, 0, 7, false, false) + timer.stopTiming() + } + benchmark.run() + + /* + Running benchmark: radix sort 25000000 + Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Linux 3.13.0-44-generic + Intel(R) Core(TM) i7-4600U CPU @ 2.10GHz + + radix sort 25000000: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + reference TimSort key prefix array 15546 / 15859 1.6 621.9 1.0X + reference Arrays.sort 2416 / 2446 10.3 96.6 6.4X + radix sort one byte 133 / 137 188.4 5.3 117.2X + radix sort two bytes 255 / 258 98.2 10.2 61.1X + radix sort eight bytes 991 / 997 25.2 39.6 15.7X + radix sort key prefix array 1540 / 1563 16.2 61.6 10.1X + */ + } +} From b65505b7e6e9098a3b06478252f7095d57405d31 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 4 May 2016 01:21:24 -0700 Subject: [PATCH 2/2] update import --- .../spark/util/collection/unsafe/sort/RadixSortSuite.scala | 1 - 1 file changed, 1 
deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala index 8ff6485599bc..def0752b46f6 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala @@ -26,7 +26,6 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.internal.Logging import org.apache.spark.unsafe.array.LongArray import org.apache.spark.unsafe.memory.MemoryBlock -import org.apache.spark.util.Benchmark import org.apache.spark.util.collection.Sorter import org.apache.spark.util.random.XORShiftRandom