@@ -28,16 +28,10 @@ object DatasetBenchmark {
 
   case class Data(l: Long, s: String)
 
-  def main(args: Array[String]): Unit = {
-    val sparkContext = new SparkContext("local[*]", "Dataset benchmark")
-    val sqlContext = new SQLContext(sparkContext)
-
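+  /** Times `numChains` chained map calls over `numRows` rows, as Dataset, DataFrame, and RDD. */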
+  def backToBackMap(sqlContext: SQLContext, numRows: Long, numChains: Int): Benchmark = {
     import sqlContext.implicits._
 
-    val numRows = 10000000
     val df = sqlContext.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
-    val numChains = 10
-
     val benchmark = new Benchmark("back-to-back map", numRows)
 
     val func = (d: Data) => Data(d.l + 1, d.s)
@@ -61,7 +55,7 @@ object DatasetBenchmark {
       res.queryExecution.toRdd.foreach(_ => Unit)
     }
 
-    val rdd = sparkContext.range(1, numRows).map(l => Data(l, l.toString))
+    val rdd = sqlContext.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
     benchmark.addCase("RDD") { iter =>
       var res = rdd
       var i = 0
@@ -72,6 +66,63 @@ object DatasetBenchmark {
       res.foreach(_ => Unit)
     }
 
+    benchmark
+  }
+
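+  /** Times `numChains` chained filter calls over `numRows` rows, as Dataset, DataFrame, and RDD. */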
+  def backToBackFilter(sqlContext: SQLContext, numRows: Long, numChains: Int): Benchmark = {
+    import sqlContext.implicits._
+
+    val df = sqlContext.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
+    val benchmark = new Benchmark("back-to-back filter", numRows)
+
+    val func = (d: Data, i: Int) => d.l % (100L + i) == 0L
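+    // Build one predicate per chain link, each with a distinct modulus so no two links are identical.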
+    val funcs = 0.until(numChains).map { i =>
+      (d: Data) => func(d, i)
+    }
+    benchmark.addCase("Dataset") { iter =>
+      var res = df.as[Data]
+      var i = 0
+      while (i < numChains) {
+        res = res.filter(funcs(i))
+        i += 1
+      }
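+      // Run the physical plan and discard the rows, so we time execution rather than collection.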
+      res.queryExecution.toRdd.foreach(_ => Unit)
+    }
+
+    benchmark.addCase("DataFrame") { iter =>
+      var res = df
+      var i = 0
+      while (i < numChains) {
+        res = res.filter($"l" % (100L + i) === 0L)
+        i += 1
+      }
+      res.queryExecution.toRdd.foreach(_ => Unit)
+    }
+
+    val rdd = sqlContext.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
+    benchmark.addCase("RDD") { iter =>
+      var res = rdd
+      var i = 0
+      while (i < numChains) {
+        res = res.filter(funcs(i))
+        i += 1
+      }
+      res.foreach(_ => Unit)
+    }
+
+    benchmark
+  }
+
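+  /** Runs both benchmarks locally over 10 million rows with 10 chained operations each. */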
+  def main(args: Array[String]): Unit = {
+    val sparkContext = new SparkContext("local[*]", "Dataset benchmark")
+    val sqlContext = new SQLContext(sparkContext)
+
+    val numRows = 10000000
+    val numChains = 10
+
+    val benchmark = backToBackMap(sqlContext, numRows, numChains)
+    val benchmark2 = backToBackFilter(sqlContext, numRows, numChains)
+
     /*
     Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11.4
     Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
@@ -82,5 +133,14 @@ object DatasetBenchmark {
     RDD                                       216 /  237         46.3          21.6       4.2X
     */
     benchmark.run()
+
+    /*
+    back-to-back filter:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    Dataset                                   585 /  628         17.1          58.5       1.0X
+    DataFrame                                  62 /   80        160.7           6.2       9.4X
+    RDD                                       205 /  220         48.7          20.5       2.8X
+    */
+    benchmark2.run()
   }
 }