
Commit ed154ce

Hadoop RDD needs to sort the input partitions if we are going to assume a partitioner
1 parent b828f01 commit ed154ce
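
Why the sort matters: when a pair RDD is saved with one of the Hadoop save actions, each partition becomes one part-NNNNN file, and on re-read each input split becomes one HadoopPartition. If getSplits returns splits in arbitrary order, partition 0 of the re-read RDD need not hold the keys the original partitioner assigned to partition 0, so the partitioner cannot be assumed. A minimal sketch of the round trip (not from the commit; it assumes Spark 1.3+ pair-RDD implicits, a local master, and a made-up /tmp path):

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    object PartitionerRoundTripSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("sketch"))

        // Writing a hash-partitioned RDD produces part-00000 ... part-00003,
        // one file per partition.
        sc.parallelize(1 to 100)
          .map(i => (i, i))
          .partitionBy(new HashPartitioner(4))
          .saveAsSequenceFile("/tmp/partitioned")

        // Re-reading creates one partition per input split. Only if the splits
        // are sorted by file name does partition i again hold exactly the keys
        // that HashPartitioner(4) routes to i, the property that would let
        // Spark reuse the partitioner without another shuffle.
        val reread = sc.sequenceFile[Int, Int]("/tmp/partitioned")
        println(reread.partitions.length)
        sc.stop()
      }
    }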

2 files changed: +55 −1


core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 17 additions & 1 deletion
@@ -200,7 +200,10 @@ class HadoopRDD[K, V](
     if (inputFormat.isInstanceOf[Configurable]) {
       inputFormat.asInstanceOf[Configurable].setConf(jobConf)
     }
-    val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
+    // We have to sort the splits here so that part-00000 goes to partition 0, etc. This is
+    // so we can use the same partitioner after we save an RDD to HDFS and then read it back.
+    // SPARK-1061
+    val inputSplits = inputFormat.getSplits(jobConf, minPartitions).sorted(SplitOrdering)
     val array = new Array[Partition](inputSplits.size)
     for (i <- 0 until inputSplits.size) {
       array(i) = new HadoopPartition(id, i, inputSplits(i))
@@ -416,3 +419,16 @@ private[spark] object HadoopRDD extends Logging {
     out.seq
   }
 }
+
+private[spark] object SplitOrdering extends Ordering[InputSplit] {
+  def compare(x: InputSplit, y: InputSplit): Int = {
+    (x, y) match {
+      case (xSplit: FileSplit, ySplit: FileSplit) =>
+        fileSplitOrdering.compare(xSplit, ySplit)
+      case _ => 1
+    }
+  }
+
+  val fileSplitOrdering: Ordering[FileSplit] = Ordering.by { fileSplit =>
+    (fileSplit.getPath.toString, fileSplit.getStart)
+  }
+}
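
fileSplitOrdering compares by the tuple (path string, start offset): files sort lexicographically by name, which lines part files up correctly because Hadoop zero-pads them (part-00009 sorts before part-00010), and blocks within the same file sort by start offset. Note the element-wise pattern (xSplit: FileSplit, ySplit: FileSplit): a type pattern on the tuple itself would be erased at runtime and match any pair of splits. Non-FileSplit pairs fall through to case _ => 1, so the ordering is only meaningful for file-based splits. A quick standalone check of the tie-breaking (made-up paths; the snippet must live under org.apache.spark, since SplitOrdering is private[spark]):

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapred.FileSplit

    val a = new FileSplit(new Path("/data/part-00000"), 0L, 64L, Array[String]())
    val b = new FileSplit(new Path("/data/part-00001"), 0L, 64L, Array[String]())
    val c = new FileSplit(new Path("/data/part-00001"), 64L, 64L, Array[String]())

    assert(SplitOrdering.compare(a, b) < 0) // earlier file name sorts first
    assert(SplitOrdering.compare(b, c) < 0) // same file: lower start offset first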
core/src/test/scala/org/apache/spark/rdd/HadoopRDDSuite.scala

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.rdd
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.FileSplit
+import org.scalatest.{FunSuite, Matchers}
+
+class HadoopRDDSuite extends FunSuite with Matchers {
+  test("file split ordering") {
+    val splits = (0 until 10).map { idx =>
+      new FileSplit(new Path("/foo/bar/part-0000" + idx), 0L, 0L, Array[String]()) }
+
+    val javaShuffledSplits = new util.ArrayList[FileSplit]()
+    splits.foreach { s => javaShuffledSplits.add(s) }
+    java.util.Collections.shuffle(javaShuffledSplits)
+    val scalaShuffledSplits = javaShuffledSplits.asScala
+    scalaShuffledSplits.sorted(SplitOrdering) should be (splits)
+  }
+}
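
To run just this suite, something like the following should work with the sbt build Spark used at the time (the exact invocation is an assumption, not part of the commit):

    sbt/sbt "test-only org.apache.spark.rdd.HadoopRDDSuite"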
