
Commit 1055c94

dhruve authored and tgravescs committed
[SPARK-24610] fix reading small files via wholeTextFiles
## What changes were proposed in this pull request?

The `WholeTextFileInputFormat` determines the `maxSplitSize` for the file(s) being read using the `wholeTextFiles` method. While this works well for large files, for smaller files where the `maxSplitSize` is smaller than the minimum split sizes configured via defaults like hive-site.xml or passed explicitly as `mapreduce.input.fileinputformat.split.minsize.per.node` or `mapreduce.input.fileinputformat.split.minsize.per.rack`, it simply throws an exception:

```java
java.io.IOException: Minimum split size pernode 123456 cannot be larger than maximum split size 9962
  at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:200)
  at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(WholeTextFileRDD.scala:50)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2096)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
  ... 48 elided
```

This change checks `maxSplitSize` against `minSplitSizePerNode` and `minSplitSizePerRack` and sets them to `maxSplitSize` if `maxSplitSize < minSplitSizePerNode/Rack`.

## How was this patch tested?

Tested manually by setting the conf while launching the job, and added a unit test.

Author: Dhruve Ashar <[email protected]>

Closes #21601 from dhruve/bug/SPARK-24610.
1 parent 9fa4a1e commit 1055c94
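
For context, a minimal reproduction sketch (not part of the commit; the running `SparkContext` named `sc` and the input path are hypothetical, and the config values mirror the ones used in the new unit test):

```scala
// Reproduction sketch, not code from this commit. Assumes a running
// SparkContext `sc` and a directory of small text files at a hypothetical path.
sc.hadoopConfiguration.setLong(
  "mapreduce.input.fileinputformat.split.minsize.per.node", 123456L)
sc.hadoopConfiguration.setLong(
  "mapreduce.input.fileinputformat.split.minsize.per.rack", 123456L)

// Before this fix: if the computed maxSplitSize (total input size divided by
// minPartitions) falls below 123456 bytes, CombineFileInputFormat.getSplits
// throws the IOException shown above. After the fix, the per-node and per-rack
// minimums are lowered to maxSplitSize and the job succeeds.
sc.wholeTextFiles("/tmp/small-files").count()
```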

2 files changed: +109 -0 lines changed

core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala

Lines changed: 13 additions & 0 deletions

```diff
@@ -53,6 +53,19 @@ private[spark] class WholeTextFileInputFormat
     val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
     val maxSplitSize = Math.ceil(totalLen * 1.0 /
       (if (minPartitions == 0) 1 else minPartitions)).toLong
+
+    // For small files we need to ensure the min split size per node & rack <= maxSplitSize
+    val config = context.getConfiguration
+    val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
+    val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)
+
+    if (maxSplitSize < minSplitSizePerNode) {
+      super.setMinSplitSizeNode(maxSplitSize)
+    }
+
+    if (maxSplitSize < minSplitSizePerRack) {
+      super.setMinSplitSizeRack(maxSplitSize)
+    }
     super.setMaxSplitSize(maxSplitSize)
   }
 }
```
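
To make the new branch concrete, a rough worked example using the numbers from the exception in the commit message (illustrative values only, not code from the commit):

```scala
// Illustrative arithmetic, mirroring the logic added above.
val totalLen = 9962L            // combined size of the small input files, in bytes
val minPartitions = 1
val maxSplitSize = Math.ceil(totalLen * 1.0 /
  (if (minPartitions == 0) 1 else minPartitions)).toLong          // 9962

// mapreduce.input.fileinputformat.split.minsize.per.node from the job conf:
val minSplitSizePerNode = 123456L
// Previously 123456 reached CombineFileInputFormat.getSplits unchanged, which
// rejects a per-node minimum larger than the maximum split size. The new code
// lowers the effective minimum to maxSplitSize first:
val effectiveMinPerNode =
  if (maxSplitSize < minSplitSizePerNode) maxSplitSize else minSplitSizePerNode
assert(effectiveMinPerNode == 9962L)
```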
core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala (new file)

Lines changed: 96 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.input

import java.io.{DataOutputStream, File, FileOutputStream}

import scala.collection.immutable.IndexedSeq

import org.scalatest.BeforeAndAfterAll

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

/**
 * Tests the correctness of
 * [[org.apache.spark.input.WholeTextFileInputFormat WholeTextFileInputFormat]]. A temporary
 * directory containing files is created as fake input which is deleted in the end.
 */
class WholeTextFileInputFormatSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
  private var sc: SparkContext = _

  override def beforeAll() {
    super.beforeAll()
    val conf = new SparkConf()
    sc = new SparkContext("local", "test", conf)
  }

  override def afterAll() {
    try {
      sc.stop()
    } finally {
      super.afterAll()
    }
  }

  private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte],
                               compress: Boolean) = {
    val path = s"${inputDir.toString}/$fileName"
    val out = new DataOutputStream(new FileOutputStream(path))
    out.write(contents, 0, contents.length)
    out.close()
  }

  test("for small files minimum split size per node and per rack should be less than or equal to " +
    "maximum split size.") {
    var dir : File = null;
    try {
      dir = Utils.createTempDir()
      logInfo(s"Local disk address is ${dir.toString}.")

      // Set the minsize per node and rack to be larger than the size of the input file.
      sc.hadoopConfiguration.setLong(
        "mapreduce.input.fileinputformat.split.minsize.per.node", 123456)
      sc.hadoopConfiguration.setLong(
        "mapreduce.input.fileinputformat.split.minsize.per.rack", 123456)

      WholeTextFileInputFormatSuite.files.foreach { case (filename, contents) =>
        createNativeFile(dir, filename, contents, false)
      }
      // ensure spark job runs successfully without exceptions from the CombineFileInputFormat
      assert(sc.wholeTextFiles(dir.toString).count == 3)
    } finally {
      Utils.deleteRecursively(dir)
    }
  }
}

/**
 * Files to be tested are defined here.
 */
object WholeTextFileInputFormatSuite {
  private val testWords: IndexedSeq[Byte] = "Spark is easy to use.\n".map(_.toByte)

  private val fileNames = Array("part-00000", "part-00001", "part-00002")
  private val fileLengths = Array(10, 100, 1000)

  private val files = fileLengths.zip(fileNames).map { case (upperBound, filename) =>
    filename -> Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray
  }.toMap
}
```
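
For reference, a sketch of the arithmetic that makes the suite above hit the new code path (it assumes `minPartitions` resolves to 1 under the suite's single-threaded `local` master; that value is an assumption, not stated in the commit):

```scala
// Why the test trips the clamp: three files of 10, 100 and 1000 bytes.
val totalLen = Seq(10L, 100L, 1000L).sum                             // 1110 bytes
val minPartitions = 1                                                // assumed for the local master
val maxSplitSize = Math.ceil(totalLen * 1.0 / minPartitions).toLong  // 1110

// 1110 < 123456, so before this commit CombineFileInputFormat.getSplits threw
// the IOException from the commit message; with the fix, the per-node and
// per-rack minimums drop to 1110 and count() returns 3.
assert(maxSplitSize < 123456L)
```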
