diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index d3148de6f41a3..e57d9c767e43a 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -577,7 +577,7 @@ def sortPartition(iterator): maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner fraction = min(maxSampleSize / max(rddSize, 1), 1.0) samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect() - samples = sorted(samples, reverse=(not ascending), key=keyfunc) + samples = sorted(samples, key=keyfunc) # we have numPartitions many parts but one of the them has # an implicit boundary diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 06ba2b461d53e..39e277fdbb934 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -782,6 +782,17 @@ def test_narrow_dependency_in_join(self): jobId = tracker.getJobIdsForGroup("test4")[0] self.assertEqual(3, len(tracker.getJobInfo(jobId).stageIds)) + def test_sortByKey_uses_all_partitions_not_only_first_and_last(self): + # Regression test for SPARK-5969 + seq = [(i * 59 % 101, i) for i in range(101)] # unsorted sequence + rdd = self.sc.parallelize(seq) + for ascending in [True, False]: + sort = rdd.sortByKey(ascending=ascending, numPartitions=5) + self.assertEqual(sort.collect(), sorted(seq, reverse=not ascending)) + sizes = sort.glom().map(len).collect() + for size in sizes: + self.assertGreater(size, 0) + class ProfilerTests(PySparkTestCase):