@@ -53,6 +53,19 @@ private[spark] class WholeTextFileInputFormat
    val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
    val maxSplitSize = Math.ceil(totalLen * 1.0 /
      (if (minPartitions == 0) 1 else minPartitions)).toLong

    // For small files we need to ensure the min split size per node & rack <= maxSplitSize
    val config = context.getConfiguration
    val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
    val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)

    if (maxSplitSize < minSplitSizePerNode) {
      super.setMinSplitSizeNode(maxSplitSize)

Contributor:
Is there a point in even checking the configuration? Why not just set these to 0L unconditionally?

Contributor Author:
AFAIU, if we set these to 0L unconditionally, then any leftover data that wasn't combined into a split would end up as its own split, because minSplitSizePerNode would be 0L.
This shouldn't be an issue for a small number of files, but with a large number of small files in that situation we would end up with more splits instead of combining them into fewer splits.

Contributor Author:
Also, by checking the configuration we ensure that user-specified values don't break the code. If we set them to 0L even when a user has specified them, we would still end up breaking the code, because of how CombineFileInputFormat works: it checks whether the programmatic setting is 0L, and if it is, it falls back to the value from the config. https://github.com/apache/hadoop/blob/release-2.8.2-RC0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java#L182 So we would at least have to set the config to avoid hitting the error. (A rough sketch of this ordering follows the hunk below.)

    }

    if (maxSplitSize < minSplitSizePerRack) {
      super.setMinSplitSizeRack(maxSplitSize)
    }
    super.setMaxSplitSize(maxSplitSize)
  }
}
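
A minimal sketch of the fallback order discussed in the review thread, assuming the CombineFileInputFormat behavior the author describes (a programmatic value of 0 means "fall back to the configuration"). SplitSizeResolution, resolveMinSplitSizeNode, and the parameter names are hypothetical and only illustrate why clamping the existing value is preferable to resetting it to 0L:

object SplitSizeResolution {
  // Hypothetical helper mirroring the resolution order described above:
  // CombineFileInputFormat only consults the configuration when the programmatic
  // setter was never used (i.e. the value is still 0).
  def resolveMinSplitSizeNode(setterValue: Long, configValue: Long, maxSplitSize: Long): Long = {
    val effective = if (setterValue != 0L) setterValue else configValue
    // If the effective minimum ends up larger than maxSplitSize, getSplits() rejects
    // the job with an exception, which is why the patch clamps the value instead of
    // unconditionally setting it to 0L and discarding what the user configured.
    require(effective <= maxSplitSize,
      s"min split size per node ($effective) cannot be larger than max split size ($maxSplitSize)")
    effective
  }
}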
@@ -0,0 +1,96 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.input

import java.io.{DataOutputStream, File, FileOutputStream}

import scala.collection.immutable.IndexedSeq

import org.scalatest.BeforeAndAfterAll

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

/**
 * Tests the correctness of
 * [[org.apache.spark.input.WholeTextFileInputFormat WholeTextFileInputFormat]]. A temporary
 * directory containing files is created as fake input and is deleted at the end.
 */
class WholeTextFileInputFormatSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
  private var sc: SparkContext = _

  override def beforeAll() {
    super.beforeAll()
    val conf = new SparkConf()
    sc = new SparkContext("local", "test", conf)
  }

  override def afterAll() {
    try {
      sc.stop()
    } finally {
      super.afterAll()
    }
  }

  private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte],
      compress: Boolean) = {
    val path = s"${inputDir.toString}/$fileName"
    val out = new DataOutputStream(new FileOutputStream(path))
    out.write(contents, 0, contents.length)
    out.close()
  }

  test("for small files minimum split size per node and per rack should be less than or equal to " +
    "maximum split size.") {
    var dir: File = null
    try {
      dir = Utils.createTempDir()
      logInfo(s"Local disk address is ${dir.toString}.")

      // Set the min size per node and rack to be larger than the size of the input files.
      sc.hadoopConfiguration.setLong(
        "mapreduce.input.fileinputformat.split.minsize.per.node", 123456)
      sc.hadoopConfiguration.setLong(
        "mapreduce.input.fileinputformat.split.minsize.per.rack", 123456)

      WholeTextFileInputFormatSuite.files.foreach { case (filename, contents) =>
        createNativeFile(dir, filename, contents, false)
      }
      // Ensure the Spark job runs successfully without exceptions from CombineFileInputFormat.
      assert(sc.wholeTextFiles(dir.toString).count == 3)
    } finally {
      Utils.deleteRecursively(dir)
    }
  }
}

/**
 * Files to be tested are defined here.
 */
object WholeTextFileInputFormatSuite {
  private val testWords: IndexedSeq[Byte] = "Spark is easy to use.\n".map(_.toByte)

  private val fileNames = Array("part-00000", "part-00001", "part-00002")
  private val fileLengths = Array(10, 100, 1000)

  private val files = fileLengths.zip(fileNames).map { case (upperBound, filename) =>
    filename -> Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray
  }.toMap
}