@@ -60,9 +60,15 @@ object Partitioner {
6060 def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
6161 val rdds = (Seq(rdd) ++ others)
6262 val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
63- if (hasPartitioner.nonEmpty
64- && isEligiblePartitioner(hasPartitioner.maxBy(_.partitions.length), rdds)) {
65- hasPartitioner.maxBy(_.partitions.length).partitioner.get
63+
64+ val hasMaxPartitioner = if (hasPartitioner.nonEmpty) {
65+ Some(hasPartitioner.maxBy(_.partitions.length))
66+ } else {
67+ None
68+ }
69+
70+ if (isEligiblePartitioner(hasMaxPartitioner, rdds)) {
71+ hasMaxPartitioner.get.partitioner.get
6672 } else {
6773 if (rdd.context.conf.contains("spark.default.parallelism")) {
6874 new HashPartitioner (rdd.context.defaultParallelism)
@@ -77,9 +83,12 @@ object Partitioner {
7783 * less than and within a single order of magnitude of the max number of upstream partitions;
7884 * otherwise, returns false
7985 */
80- private def isEligiblePartitioner(hasMaxPartitioner: RDD[_], rdds: Seq[RDD[_]]): Boolean = {
86+ private def isEligiblePartitioner(hasMaxPartitioner: Option[RDD[_]], rdds: Seq[RDD[_]]): Boolean = {
87+ if (hasMaxPartitioner.isEmpty) {
88+ return false
89+ }
8190 val maxPartitions = rdds.map(_.partitions.length).max
82- log10(maxPartitions).floor - log10(hasMaxPartitioner.getNumPartitions).floor < 1
91+ log10(maxPartitions) - log10(hasMaxPartitioner.get.getNumPartitions) < 1
8392 }
8493}
8594
0 commit comments