diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 0e2ae19ca39a..2de2c2fd1a60 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2029,12 +2029,12 @@ def coalesce(self, numPartitions, shuffle=False): [[1, 2, 3, 4, 5]] """ if shuffle: - # In Scala's repartition code, we will distribute elements evenly across output - # partitions. However, the RDD from Python is serialized as a single binary data, - # so the distribution fails and produces highly skewed partitions. We need to - # convert it to a RDD of java object before repartitioning. - data_java_rdd = self._to_java_object_rdd().coalesce(numPartitions, shuffle) - jrdd = self.ctx._jvm.SerDeUtil.javaToPython(data_java_rdd) + # Decrease the batch size in order to distribute evenly the elements across output + # partitions. Otherwise, repartition will possibly produce highly skewed partitions. + batchSize = min(10, self.ctx._batchSize or 1024) + ser = BatchedSerializer(PickleSerializer(), batchSize) + selfCopy = self._reserialize(ser) + jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle) else: jrdd = self._jrdd.coalesce(numPartitions, shuffle) return RDD(jrdd, self.ctx, self._jrdd_deserializer)