@@ -39,13 +39,17 @@ import org.apache.spark.sql.types._
3939object JSONBenchmark extends SqlBasedBenchmark {
4040 import spark .implicits ._
4141
42- def schemaInferring (rowsNum : Int ): Unit = {
/**
 * Prints a fixed one-line progress notice through `benchmark.out`.
 *
 * The benchmark methods visible in this file call it at the start of their
 * `withTempPath` block, before they write the input data.
 *
 * @param benchmark the benchmark whose `out` receives the notice
 */
42+ def prepareDataInfo (benchmark : Benchmark ): Unit = {
// The scalastyle off/on markers bracket the println call; presumably the
// project's style check rejects println otherwise -- confirm against the
// scalastyle configuration.
43+ // scalastyle:off println
44+ benchmark.out.println(" Preparing data for benchmarking ..." )
45+ // scalastyle:on println
46+ }
47+
48+ def schemaInferring (rowsNum : Int , numIters : Int ): Unit = {
4349 val benchmark = new Benchmark (" JSON schema inferring" , rowsNum, output = output)
4450
4551 withTempPath { path =>
46- // scalastyle:off println
47- benchmark.out.println(" Preparing data for benchmarking ..." )
48- // scalastyle:on println
52+ prepareDataInfo(benchmark)
4953
5054 spark.sparkContext.range(0 , rowsNum, 1 )
5155 .map(_ => " a" )
@@ -54,11 +58,11 @@ object JSONBenchmark extends SqlBasedBenchmark {
5458 .option(" encoding" , " UTF-8" )
5559 .json(path.getAbsolutePath)
5660
57- benchmark.addCase(" No encoding" , 3 ) { _ =>
61+ benchmark.addCase(" No encoding" , numIters ) { _ =>
5862 spark.read.json(path.getAbsolutePath)
5963 }
6064
61- benchmark.addCase(" UTF-8 is set" , 3 ) { _ =>
65+ benchmark.addCase(" UTF-8 is set" , numIters ) { _ =>
6266 spark.read
6367 .option(" encoding" , " UTF-8" )
6468 .json(path.getAbsolutePath)
@@ -68,28 +72,29 @@ object JSONBenchmark extends SqlBasedBenchmark {
6872 }
6973 }
7074
71- def perlineParsing (rowsNum : Int ): Unit = {
72- val benchmark = new Benchmark (" JSON per-line parsing" , rowsNum, output = output)
/**
 * Generates the "short column" data set and returns the schema for reading it.
 *
 * Builds an RDD from `spark.sparkContext.range(0, rowsNum, 1)`, replaces every
 * element with the same constant string, converts it to a single-column
 * DataFrame and writes that DataFrame as JSON to `path`.
 *
 * @param path    destination passed to `write.json`
 * @param rowsNum upper bound handed to `range`; it controls how many rows are
 *                generated
 * @return a `StructType` with one `StringType` field, named like the column
 *         that was written
 */
75+ def writeShortColumn (path : String , rowsNum : Int ): StructType = {
76+ spark.sparkContext.range(0 , rowsNum, 1 )
// The range value itself is discarded: every row carries the same literal.
77+ .map(_ => " a" )
78+ .toDF(" fieldA" )
79+ .write.json(path)
// The schema is built by hand and returned, so callers can pass it to
// `spark.read.schema(...)`.
80+ new StructType ().add(" fieldA" , StringType )
81+ }
7382
74- withTempPath { path =>
75- // scalastyle:off println
76- benchmark.out.println(" Preparing data for benchmarking ..." )
77- // scalastyle:on println
83+ def countShortColumn (rowsNum : Int , numIters : Int ): Unit = {
84+ val benchmark = new Benchmark (" count a short column" , rowsNum, output = output)
7885
79- spark.sparkContext.range(0 , rowsNum, 1 )
80- .map(_ => " a" )
81- .toDF(" fieldA" )
82- .write.json(path.getAbsolutePath)
83- val schema = new StructType ().add(" fieldA" , StringType )
86+ withTempPath { path =>
87+ prepareDataInfo(benchmark)
88+ val schema = writeShortColumn(path.getAbsolutePath, rowsNum)
8489
85- benchmark.addCase(" No encoding" , 3 ) { _ =>
90+ benchmark.addCase(" No encoding" , numIters ) { _ =>
8691 spark.read
8792 .schema(schema)
8893 .json(path.getAbsolutePath)
8994 .count()
9095 }
9196
92- benchmark.addCase(" UTF-8 is set" , 3 ) { _ =>
97+ benchmark.addCase(" UTF-8 is set" , numIters ) { _ =>
9398 spark.read
9499 .option(" encoding" , " UTF-8" )
95100 .schema(schema)
@@ -101,35 +106,36 @@ object JSONBenchmark extends SqlBasedBenchmark {
101106 }
102107 }
103108
104- def perlineParsingOfWideColumn (rowsNum : Int ): Unit = {
105- val benchmark = new Benchmark (" JSON parsing of wide lines" , rowsNum, output = output)
/**
 * Generates the "wide column" data set and returns the schema for reading it.
 *
 * For every element `i` of `spark.sparkContext.range(0, rowsNum, 1)` a JSON
 * object is assembled by string interpolation: five fields carry a long
 * repeated string and four fields carry `i`. The resulting lines are written
 * with `write.text`, i.e. as pre-formatted text rather than through the JSON
 * writer.
 *
 * @param path    destination passed to `write.text`
 * @param rowsNum upper bound handed to `range`; it controls how many lines are
 *                generated
 * @return a `StructType` of nine fields alternating `StringType` and
 *         `LongType`, starting and ending with `StringType`
 */
109+ def writeWideColumn (path : String , rowsNum : Int ): StructType = {
110+ spark.sparkContext.range(0 , rowsNum, 1 )
111+ .map { i =>
// Filler value: the base pattern repeated 20 times, reused for every string field.
112+ val s = " abcdef0123456789ABCDEF" * 20
// Hand-built JSON line; the string fields take `s`, the numeric fields take `i`.
113+ s """ {"a":" $s","b": $i,"c":" $s","d": $i,"e":" $s","f": $i,"x":" $s","y": $i,"z":" $s"} """
114+ }
115+ .toDF().write.text(path)
// Explicit schema handed back to the caller for `spark.read.schema(...)`.
// NOTE(review): presumably these field names are meant to match the keys of
// the JSON line above -- confirm against the original file.
116+ new StructType ()
117+ .add(" a" , StringType ).add(" b" , LongType )
118+ .add(" c" , StringType ).add(" d" , LongType )
119+ .add(" e" , StringType ).add(" f" , LongType )
120+ .add(" x" , StringType ).add(" y" , LongType )
121+ .add(" z" , StringType )
122+ }
123+
124+ def countWideColumn (rowsNum : Int , numIters : Int ): Unit = {
125+ val benchmark = new Benchmark (" count a wide column" , rowsNum, output = output)
106126
107127 withTempPath { path =>
108- // scalastyle:off println
109- benchmark.out.println(" Preparing data for benchmarking ..." )
110- // scalastyle:on println
128+ prepareDataInfo(benchmark)
129+ val schema = writeWideColumn(path.getAbsolutePath, rowsNum)
111130
112- spark.sparkContext.range(0 , rowsNum, 1 )
113- .map { i =>
114- val s = " abcdef0123456789ABCDEF" * 20
115- s """ {"a":" $s","b": $i,"c":" $s","d": $i,"e":" $s","f": $i,"x":" $s","y": $i,"z":" $s"} """
116- }
117- .toDF().write.text(path.getAbsolutePath)
118- val schema = new StructType ()
119- .add(" a" , StringType ).add(" b" , LongType )
120- .add(" c" , StringType ).add(" d" , LongType )
121- .add(" e" , StringType ).add(" f" , LongType )
122- .add(" x" , StringType ).add(" y" , LongType )
123- .add(" z" , StringType )
124-
125- benchmark.addCase(" No encoding" , 3 ) { _ =>
131+ benchmark.addCase(" No encoding" , numIters) { _ =>
126132 spark.read
127133 .schema(schema)
128134 .json(path.getAbsolutePath)
129135 .count()
130136 }
131137
132- benchmark.addCase(" UTF-8 is set" , 3 ) { _ =>
138+ benchmark.addCase(" UTF-8 is set" , numIters ) { _ =>
133139 spark.read
134140 .option(" encoding" , " UTF-8" )
135141 .schema(schema)
@@ -141,12 +147,14 @@ object JSONBenchmark extends SqlBasedBenchmark {
141147 }
142148 }
143149
144- def countBenchmark (rowsNum : Int ): Unit = {
150+ def selectSubsetOfColumns (rowsNum : Int , numIters : Int ): Unit = {
145151 val colsNum = 10
146152 val benchmark =
147- new Benchmark (s " Count a dataset with $colsNum columns " , rowsNum, output = output)
153+ new Benchmark (s " Select a subset of $colsNum columns " , rowsNum, output = output)
148154
149155 withTempPath { path =>
156+ prepareDataInfo(benchmark)
157+
150158 val fields = Seq .tabulate(colsNum)(i => StructField (s " col $i" , IntegerType ))
151159 val schema = StructType (fields)
152160 val columnNames = schema.fieldNames
@@ -158,26 +166,78 @@ object JSONBenchmark extends SqlBasedBenchmark {
158166
159167 val ds = spark.read.schema(schema).json(path.getAbsolutePath)
160168
161- benchmark.addCase(s " Select $colsNum columns + count() " , 3 ) { _ =>
169+ benchmark.addCase(s " Select $colsNum columns + count() " , numIters ) { _ =>
162170 ds.select(" *" ).filter((_ : Row ) => true ).count()
163171 }
164- benchmark.addCase(s " Select 1 column + count() " , 3 ) { _ =>
172+ benchmark.addCase(s " Select 1 column + count() " , numIters ) { _ =>
165173 ds.select($" col1" ).filter((_ : Row ) => true ).count()
166174 }
167- benchmark.addCase(s " count() " , 3 ) { _ =>
175+ benchmark.addCase(s " count() " , numIters ) { _ =>
168176 ds.count()
169177 }
170178
171179 benchmark.run()
172180 }
173181 }
174182
/**
 * Benchmark that reads the short-column and the wide-column data sets, each
 * once without an `encoding` option and once with the option set to UTF-8.
 *
 * Both data sets are written into sub-paths of one temporary path obtained
 * from `withTempPath`. Four cases are registered; every case reads its data
 * set with an explicit schema, applies a filter whose predicate always
 * returns `true`, and calls `count()`. The benchmark runs once all four cases
 * have been added.
 *
 * @param rowsNum  row count passed to both data generators and to `Benchmark`
 * @param numIters iteration count passed to every `addCase` call
 */
183+ def jsonParserCreation (rowsNum : Int , numIters : Int ): Unit = {
184+ val benchmark = new Benchmark (" creation of JSON parser per line" , rowsNum, output = output)
185+
186+ withTempPath { path =>
187+ prepareDataInfo(benchmark)
188+
// The two data sets live side by side under the same temporary path.
189+ val shortColumnPath = path.getAbsolutePath + " /short"
190+ val shortSchema = writeShortColumn(shortColumnPath, rowsNum)
191+
192+ val wideColumnPath = path.getAbsolutePath + " /wide"
193+ val wideSchema = writeWideColumn(wideColumnPath, rowsNum)
194+
// NOTE(review): the always-true filter on Row presumably forces every line to
// be parsed into a row before count() -- confirm against the Spark optimizer.
195+ benchmark.addCase(" Short column without encoding" , numIters) { _ =>
196+ spark.read
197+ .schema(shortSchema)
198+ .json(shortColumnPath)
199+ .filter((_ : Row ) => true )
200+ .count()
201+ }
202+
// Same read as the previous case, with the `encoding` option set explicitly.
203+ benchmark.addCase(" Short column with UTF-8" , numIters) { _ =>
204+ spark.read
205+ .option(" encoding" , " UTF-8" )
206+ .schema(shortSchema)
207+ .json(shortColumnPath)
208+ .filter((_ : Row ) => true )
209+ .count()
210+ }
211+
// The wide-column cases mirror the two short-column cases above.
212+ benchmark.addCase(" Wide column without encoding" , numIters) { _ =>
213+ spark.read
214+ .schema(wideSchema)
215+ .json(wideColumnPath)
216+ .filter((_ : Row ) => true )
217+ .count()
218+ }
219+
220+ benchmark.addCase(" Wide column with UTF-8" , numIters) { _ =>
221+ spark.read
222+ .option(" encoding" , " UTF-8" )
223+ .schema(wideSchema)
224+ .json(wideColumnPath)
225+ .filter((_ : Row ) => true )
226+ .count()
227+ }
228+
229+ benchmark.run()
230+ }
231+ }
232+
/**
 * Entry point of the suite: runs the JSON benchmarks one after another inside
 * a single `runBenchmark` block.
 *
 * @param mainArgs command-line arguments; not read by the code visible here
 */
175233 override def runBenchmarkSuite (mainArgs : Array [String ]): Unit = {
// One iteration count shared by every benchmark call marked `+` below.
234+ val numIters = 3
176235 runBenchmark(" Benchmark for performance of JSON parsing" ) {
// The first argument of each call is the number of rows to generate.
177- schemaInferring(100 * 1000 * 1000 )
178- perlineParsing(100 * 1000 * 1000 )
179- perlineParsingOfWideColumn(10 * 1000 * 1000 )
180- countBenchmark(10 * 1000 * 1000 )
236+ schemaInferring(100 * 1000 * 1000 , numIters)
237+ countShortColumn(100 * 1000 * 1000 , numIters)
238+ countWideColumn(10 * 1000 * 1000 , numIters)
239+ selectSubsetOfColumns(10 * 1000 * 1000 , numIters)
240+ jsonParserCreation(10 * 1000 * 1000 , numIters)
181241 }
182242 }
183243}
0 commit comments