@@ -25,6 +25,17 @@ import org.apache.spark._
2525import org .apache .spark .SparkContext ._
2626
2727class ExternalSorterSuite extends FunSuite with LocalSparkContext {
28+ private def createSparkConf (loadDefaults : Boolean ): SparkConf = {
29+ val conf = new SparkConf (loadDefaults)
30+ // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
31+ // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
32+ conf.set(" spark.serializer.objectStreamReset" , " 0" )
33+ conf.set(" spark.serializer" , " org.apache.spark.serializer.JavaSerializer" )
34+ // Ensure that we actually have multiple batches per spill file
35+ conf.set(" spark.shuffle.spill.batchSize" , " 10" )
36+ conf
37+ }
38+
2839 test(" empty data stream" ) {
2940 val conf = new SparkConf (false )
3041 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
@@ -60,7 +71,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
6071 }
6172
6273 test(" few elements per partition" ) {
63- val conf = new SparkConf (false )
74+ val conf = createSparkConf (false )
6475 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
6576 conf.set(" spark.shuffle.manager" , " org.apache.spark.shuffle.sort.SortShuffleManager" )
6677 sc = new SparkContext (" local" , " test" , conf)
@@ -102,7 +113,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
102113 }
103114
104115 test(" empty partitions with spilling" ) {
105- val conf = new SparkConf (false )
116+ val conf = createSparkConf (false )
106117 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
107118 conf.set(" spark.shuffle.manager" , " org.apache.spark.shuffle.sort.SortShuffleManager" )
108119 sc = new SparkContext (" local" , " test" , conf)
@@ -127,7 +138,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
127138 }
128139
129140 test(" spilling in local cluster" ) {
130- val conf = new SparkConf (true ) // Load defaults, otherwise SPARK_HOME is not found
141+ val conf = createSparkConf (true ) // Load defaults, otherwise SPARK_HOME is not found
131142 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
132143 conf.set(" spark.shuffle.manager" , " org.apache.spark.shuffle.sort.SortShuffleManager" )
133144 sc = new SparkContext (" local-cluster[1,1,512]" , " test" , conf)
@@ -198,7 +209,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
198209 }
199210
200211 test(" spilling in local cluster with many reduce tasks" ) {
201- val conf = new SparkConf (true ) // Load defaults, otherwise SPARK_HOME is not found
212+ val conf = createSparkConf (true ) // Load defaults, otherwise SPARK_HOME is not found
202213 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
203214 conf.set(" spark.shuffle.manager" , " org.apache.spark.shuffle.sort.SortShuffleManager" )
204215 sc = new SparkContext (" local-cluster[2,1,512]" , " test" , conf)
@@ -269,7 +280,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
269280 }
270281
271282 test(" cleanup of intermediate files in sorter" ) {
272- val conf = new SparkConf (true ) // Load defaults, otherwise SPARK_HOME is not found
283+ val conf = createSparkConf (true ) // Load defaults, otherwise SPARK_HOME is not found
273284 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
274285 conf.set(" spark.shuffle.manager" , " org.apache.spark.shuffle.sort.SortShuffleManager" )
275286 sc = new SparkContext (" local" , " test" , conf)
@@ -290,7 +301,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
290301 }
291302
292303 test(" cleanup of intermediate files in sorter if there are errors" ) {
293- val conf = new SparkConf (true ) // Load defaults, otherwise SPARK_HOME is not found
304+ val conf = createSparkConf (true ) // Load defaults, otherwise SPARK_HOME is not found
294305 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
295306 conf.set(" spark.shuffle.manager" , " org.apache.spark.shuffle.sort.SortShuffleManager" )
296307 sc = new SparkContext (" local" , " test" , conf)
@@ -311,7 +322,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
311322 }
312323
313324 test(" cleanup of intermediate files in shuffle" ) {
314- val conf = new SparkConf (false )
325+ val conf = createSparkConf (false )
315326 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
316327 conf.set(" spark.shuffle.manager" , " org.apache.spark.shuffle.sort.SortShuffleManager" )
317328 sc = new SparkContext (" local" , " test" , conf)
@@ -326,7 +337,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
326337 }
327338
328339 test(" cleanup of intermediate files in shuffle with errors" ) {
329- val conf = new SparkConf (false )
340+ val conf = createSparkConf (false )
330341 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
331342 conf.set(" spark.shuffle.manager" , " org.apache.spark.shuffle.sort.SortShuffleManager" )
332343 sc = new SparkContext (" local" , " test" , conf)
@@ -348,7 +359,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
348359 }
349360
350361 test(" no partial aggregation or sorting" ) {
351- val conf = new SparkConf (false )
362+ val conf = createSparkConf (false )
352363 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
353364 conf.set(" spark.shuffle.manager" , " org.apache.spark.shuffle.sort.SortShuffleManager" )
354365 sc = new SparkContext (" local" , " test" , conf)
@@ -363,7 +374,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
363374 }
364375
365376 test(" partial aggregation without spill" ) {
366- val conf = new SparkConf (false )
377+ val conf = createSparkConf (false )
367378 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
368379 conf.set(" spark.shuffle.manager" , " org.apache.spark.shuffle.sort.SortShuffleManager" )
369380 sc = new SparkContext (" local" , " test" , conf)
@@ -379,7 +390,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
379390 }
380391
381392 test(" partial aggregation with spill, no ordering" ) {
382- val conf = new SparkConf (false )
393+ val conf = createSparkConf (false )
383394 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
384395 conf.set(" spark.shuffle.manager" , " org.apache.spark.shuffle.sort.SortShuffleManager" )
385396 sc = new SparkContext (" local" , " test" , conf)
@@ -395,7 +406,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
395406 }
396407
397408 test(" partial aggregation with spill, with ordering" ) {
398- val conf = new SparkConf (false )
409+ val conf = createSparkConf (false )
399410 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
400411 conf.set(" spark.shuffle.manager" , " org.apache.spark.shuffle.sort.SortShuffleManager" )
401412 sc = new SparkContext (" local" , " test" , conf)
@@ -412,7 +423,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
412423 }
413424
414425 test(" sorting without aggregation, no spill" ) {
415- val conf = new SparkConf (false )
426+ val conf = createSparkConf (false )
416427 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
417428 conf.set(" spark.shuffle.manager" , " org.apache.spark.shuffle.sort.SortShuffleManager" )
418429 sc = new SparkContext (" local" , " test" , conf)
@@ -429,7 +440,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
429440 }
430441
431442 test(" sorting without aggregation, with spill" ) {
432- val conf = new SparkConf (false )
443+ val conf = createSparkConf (false )
433444 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
434445 conf.set(" spark.shuffle.manager" , " org.apache.spark.shuffle.sort.SortShuffleManager" )
435446 sc = new SparkContext (" local" , " test" , conf)
@@ -446,7 +457,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
446457 }
447458
448459 test(" spilling with hash collisions" ) {
449- val conf = new SparkConf (true )
460+ val conf = createSparkConf (true )
450461 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
451462 sc = new SparkContext (" local-cluster[1,1,512]" , " test" , conf)
452463
@@ -503,7 +514,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
503514 }
504515
505516 test(" spilling with many hash collisions" ) {
506- val conf = new SparkConf (true )
517+ val conf = createSparkConf (true )
507518 conf.set(" spark.shuffle.memoryFraction" , " 0.0001" )
508519 sc = new SparkContext (" local-cluster[1,1,512]" , " test" , conf)
509520
@@ -526,7 +537,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
526537 }
527538
528539 test(" spilling with hash collisions using the Int.MaxValue key" ) {
529- val conf = new SparkConf (true )
540+ val conf = createSparkConf (true )
530541 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
531542 sc = new SparkContext (" local-cluster[1,1,512]" , " test" , conf)
532543
@@ -547,7 +558,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
547558 }
548559
549560 test(" spilling with null keys and values" ) {
550- val conf = new SparkConf (true )
561+ val conf = createSparkConf (true )
551562 conf.set(" spark.shuffle.memoryFraction" , " 0.001" )
552563 sc = new SparkContext (" local-cluster[1,1,512]" , " test" , conf)
553564
0 commit comments