@@ -24,6 +24,8 @@ import org.scalatest.{PrivateMethodTester, FunSuite}
2424import org .apache .spark ._
2525import org .apache .spark .SparkContext ._
2626
27+ import scala .util .Random
28+
2729class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMethodTester {
2830 private def createSparkConf (loadDefaults : Boolean ): SparkConf = {
2931 val conf = new SparkConf (loadDefaults)
@@ -714,14 +716,39 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
714716 conf.set(" spark.shuffle.manager" , " sort" )
715717 sc = new SparkContext (" local-cluster[1,1,512]" , " test" , conf)
716718
719+ // Using wrongHashOrdering to show integer overflow introduced exception.
720+ val rand = new Random
721+ val wrongOrdering = new Ordering [Int ] {
722+ override def compare (a : Int , b : Int ) = a - b
723+ }
724+
725+ val testIntData = new Iterator [Int ] {
726+ private var count = 0
727+
728+ def hasNext = count < 1000000
729+
730+ def next (): Int = {
731+ count += 1 ; rand.nextInt()
732+ }
733+ } ++ Iterator [Int ](Int .MaxValue , Int .MinValue , Int .MaxValue , Int .MinValue )
734+
735+ val sorter1 = new ExternalSorter [Int , Int , Int ](
736+ None , None , Some (wrongOrdering), None )
737+ val thrown = intercept[IllegalArgumentException ] {
738+ sorter1.insertAll(testIntData.map(i => (i, i)))
739+ }
740+
741+ assert(thrown.getClass() === classOf [IllegalArgumentException ])
742+ assert(thrown.getMessage().contains(" Comparison method violates its general contract" ))
743+
744+ // Using aggregation and external spill to make sure ExternalSorter using
745+ // partitionKeyComparator.
717746 val testData = Array [String ](
718747 " hierarch" , // -1732884796
719748 " variants" , // -1249574770
720749 " inwork" , // -1183663690
721750 " isohel" , // -1179291542
722- " misused" // 1069518484
723- )
724- val expected = testData.map(s => (s, 200000 ))
751+ " misused" ) // 1069518484
725752
726753 def createCombiner (i : Int ) = ArrayBuffer (i)
727754 def mergeValue (c : ArrayBuffer [Int ], i : Int ) = c += i
@@ -730,23 +757,8 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
730757 val agg = new Aggregator [String , Int , ArrayBuffer [Int ]](
731758 createCombiner, mergeValue, mergeCombiners)
732759
733- // Using wrongHashOrdering to show that integer overflow will lead to wrong sort result.
734- val wrongHashOrdering = new Ordering [String ] {
735- override def compare (a : String , b : String ) = {
736- val h1 = a.hashCode()
737- val h2 = b.hashCode()
738- h1 - h2
739- }
740- }
741- val sorter1 = new ExternalSorter [String , Int , ArrayBuffer [Int ]](
742- None , None , Some (wrongHashOrdering), None )
743- sorter1.insertAll(expected.iterator)
744-
745- val unexpectedResults = sorter1.iterator.toArray
746- assert(unexpectedResults !== expected)
760+ val expected = testData.map(i => (i, 25000 ))
747761
748- // Using aggregation and external spill to make sure ExternalSorter using
749- // partitionKeyComparator.
750762 val sorter2 = new ExternalSorter [String , Int , ArrayBuffer [Int ]](
751763 Some (agg), None , None , None )
752764 sorter2.insertAll(expected.flatMap { case (k, v) =>
0 commit comments