From 2369e3acee730b7d4e45175870de0ecac601069b Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Wed, 20 Jun 2018 11:34:36 -0500 Subject: [PATCH 1/6] [SPARK-24610] fix reading small files via wholeTextFiles --- .../input/WholeTextFileInputFormat.scala | 13 +++ .../input/WholeTextFileInputFormatSuite.scala | 92 +++++++++++++++++++ 2 files changed, 105 insertions(+) create mode 100644 core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala index f47cd38d712c..04c5c4b90e8a 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala @@ -53,6 +53,19 @@ private[spark] class WholeTextFileInputFormat val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 else minPartitions)).toLong + + // For small files we need to ensure the min split size per node & rack <= maxSplitSize + val config = context.getConfiguration + val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L) + val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L) + + if (maxSplitSize < minSplitSizePerNode) { + super.setMinSplitSizeNode(maxSplitSize) + } + + if (maxSplitSize < minSplitSizePerRack) { + super.setMinSplitSizeRack(maxSplitSize) + } super.setMaxSplitSize(maxSplitSize) } } diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala new file mode 100644 index 000000000000..b6e01abf7ba5 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.input + +import java.io.{DataOutputStream, File, FileOutputStream} + +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.scalatest.BeforeAndAfterAll + +import scala.collection.immutable.IndexedSeq + +/** + * Tests the correctness of + * [[org.apache.spark.input.WholeTextFileInputFormat WholeTextFileInputFormat]]. A temporary + * directory containing files is created as fake input which is deleted in the end. + */ +class WholeTextFileInputFormatSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { + private var sc: SparkContext = _ + + override def beforeAll() { + super.beforeAll() + val conf = new SparkConf() + sc = new SparkContext("local", "test", conf) + + sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 123456) + sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 123456) + } + + override def afterAll() { + try { + sc.stop() + } finally { + super.afterAll() + } + } + + private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte], + compress: Boolean) = { + val path = s"${inputDir.toString}/$fileName" + val out = new DataOutputStream(new FileOutputStream(path)) + out.write(contents, 0, contents.length) + out.close() + } + + test("for small files minimum split size per node and per rack should be less than or equal to " + + "maximum split size.") { + var dir : File = null; + try { + dir = Utils.createTempDir() + logInfo(s"Local disk address is ${dir.toString}.") + + WholeTextFileInputFormatSuite.files.foreach { case (filename, contents) => + createNativeFile(dir, filename, contents, false) + } + + val res = sc.wholeTextFiles(dir.toString).count + } finally { + Utils.deleteRecursively(dir) + } + } +} + +/** + * Files to be tested are defined here. + */ +object WholeTextFileInputFormatSuite { + private val testWords: IndexedSeq[Byte] = "Spark is easy to use.\n".map(_.toByte) + + private val fileNames = Array("part-00000", "part-00001", "part-00002") + private val fileLengths = Array(10, 100, 1000) + + private val files = fileLengths.zip(fileNames).map { case (upperBound, filename) => + filename -> Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray + }.toMap +} From e2d4e07984751a7fc08e53f98dbd604d47f2f035 Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Wed, 20 Jun 2018 13:52:09 -0500 Subject: [PATCH 2/6] Fix stylecheck --- .../spark/input/WholeTextFileInputFormatSuite.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala index b6e01abf7ba5..182add491ec4 100644 --- a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala +++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala @@ -19,12 +19,13 @@ package org.apache.spark.input import java.io.{DataOutputStream, File, FileOutputStream} -import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import scala.collection.immutable.IndexedSeq + import org.scalatest.BeforeAndAfterAll -import scala.collection.immutable.IndexedSeq +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils /** * Tests the correctness of From b3514067db43b543d8ceac38a0e1ffe6c1a5692e Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Mon, 2 Jul 2018 16:35:25 -0500 Subject: [PATCH 3/6] Add assert check --- .../apache/spark/input/WholeTextFileInputFormatSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala index 182add491ec4..7144dc77e3c1 100644 --- a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala +++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala @@ -70,8 +70,8 @@ class WholeTextFileInputFormatSuite extends SparkFunSuite with BeforeAndAfterAll WholeTextFileInputFormatSuite.files.foreach { case (filename, contents) => createNativeFile(dir, filename, contents, false) } - - val res = sc.wholeTextFiles(dir.toString).count + // ensure spark job runs successfully without exceptions from the CombineFileInputFormat + assert(sc.wholeTextFiles(dir.toString).count == 1) } finally { Utils.deleteRecursively(dir) } From 15356df4f796a5e811a79431fb9f9bb122f03c8b Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Tue, 3 Jul 2018 09:01:11 -0500 Subject: [PATCH 4/6] Fix unit tests --- .../org/apache/spark/input/WholeTextFileInputFormatSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala index 7144dc77e3c1..95f56165339b 100644 --- a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala +++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala @@ -71,7 +71,7 @@ class WholeTextFileInputFormatSuite extends SparkFunSuite with BeforeAndAfterAll createNativeFile(dir, filename, contents, false) } // ensure spark job runs successfully without exceptions from the CombineFileInputFormat - assert(sc.wholeTextFiles(dir.toString).count == 1) + assert(sc.wholeTextFiles(dir.toString).count == 3) } finally { Utils.deleteRecursively(dir) } From f1c41608c22e3b11271838852370021b10d546ed Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Mon, 9 Jul 2018 15:53:32 -0500 Subject: [PATCH 5/6] Add comment and refactor the config set --- .../apache/spark/input/WholeTextFileInputFormatSuite.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala index 95f56165339b..87618a716443 100644 --- a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala +++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala @@ -39,9 +39,6 @@ class WholeTextFileInputFormatSuite extends SparkFunSuite with BeforeAndAfterAll super.beforeAll() val conf = new SparkConf() sc = new SparkContext("local", "test", conf) - - sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 123456) - sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 123456) } override def afterAll() { @@ -67,6 +64,10 @@ class WholeTextFileInputFormatSuite extends SparkFunSuite with BeforeAndAfterAll dir = Utils.createTempDir() logInfo(s"Local disk address is ${dir.toString}.") + // Set the minsize per node and rack to be larger than the size of the input file. + sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 123456) + sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 123456) + WholeTextFileInputFormatSuite.files.foreach { case (filename, contents) => createNativeFile(dir, filename, contents, false) } From bcb2991b278cafb2f163bae0069293c61b939898 Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Mon, 9 Jul 2018 16:22:46 -0500 Subject: [PATCH 6/6] Fix scalastyle check --- .../spark/input/WholeTextFileInputFormatSuite.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala index 87618a716443..817dc082b7d3 100644 --- a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala +++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala @@ -65,9 +65,11 @@ class WholeTextFileInputFormatSuite extends SparkFunSuite with BeforeAndAfterAll logInfo(s"Local disk address is ${dir.toString}.") // Set the minsize per node and rack to be larger than the size of the input file. - sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 123456) - sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 123456) - + sc.hadoopConfiguration.setLong( + "mapreduce.input.fileinputformat.split.minsize.per.node", 123456) + sc.hadoopConfiguration.setLong( + "mapreduce.input.fileinputformat.split.minsize.per.rack", 123456) + WholeTextFileInputFormatSuite.files.foreach { case (filename, contents) => createNativeFile(dir, filename, contents, false) }