From f8109371e6eab2ab4b30268aa7684bb685198297 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 10 Oct 2016 15:39:57 -0700 Subject: [PATCH 1/3] HadoopRDD should not catch EOFException --- .../org/apache/spark/rdd/HadoopRDD.scala | 8 +---- .../scala/org/apache/spark/FileSuite.scala | 36 ++++++++++++++++++- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 4640b5dc2f654..8a0e481d8f7d3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -17,7 +17,6 @@ package org.apache.spark.rdd -import java.io.EOFException import java.text.SimpleDateFormat import java.util.Date @@ -250,12 +249,7 @@ class HadoopRDD[K, V]( val value: V = reader.createValue() override def getNext(): (K, V) = { - try { - finished = !reader.next(key, value) - } catch { - case eof: EOFException => - finished = true - } + finished = !reader.next(key, value) if (!finished) { inputMetrics.incRecordsRead(1) } diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 993834f8d7d42..8f53eba21bfb6 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -17,7 +17,8 @@ package org.apache.spark -import java.io.{File, FileWriter} +import java.io._ +import java.util.zip.GZIPOutputStream import scala.io.Source @@ -541,4 +542,37 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { }.collect() assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001")) } + + test("HadoopRDD should not skip corrupt gzip files") { + val inputFile = File.createTempFile("input-", ".gz") + try { + // Create a corrupt gzip file + val byteOutput = new ByteArrayOutputStream() + val gzip = new GZIPOutputStream(byteOutput) + try { + gzip.write(Array[Byte](1, 2, 3, 4)) + } finally { + gzip.close() + } + val bytes = byteOutput.toByteArray + val o = new FileOutputStream(inputFile) + try { + // It's corrupt since we only write half of bytes into the file. 
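// Illustrative sketch, not part of the patch: the test above relies on the fact
// that decompressing a truncated gzip stream fails with an EOFException instead
// of reporting a clean end of input. This standalone demonstration uses the JDK's
// java.util.zip classes (Hadoop's GzipCodec raises "Unexpected end of input
// stream"; the JDK's message is "Unexpected end of ZLIB input stream"); the
// object name is illustrative.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, EOFException}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object TruncatedGzipSketch {
  def main(args: Array[String]): Unit = {
    val buffer = new ByteArrayOutputStream()
    val gzip = new GZIPOutputStream(buffer)
    try gzip.write(Array[Byte](1, 2, 3, 4)) finally gzip.close()
    val bytes = buffer.toByteArray
    // Keep only the first half of the bytes, mirroring the test above.
    val in = new GZIPInputStream(new ByteArrayInputStream(bytes.take(bytes.length / 2)))
    try {
      while (in.read() != -1) {} // never reaches a clean end of stream
    } catch {
      case e: EOFException => println(s"EOFException: ${e.getMessage}")
    } finally {
      in.close()
    }
  }
}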
+ o.write(bytes.take(bytes.length / 2)) + } finally { + o.close() + } + + // Reading a corrupt gzip file should throw EOFException + sc = new SparkContext("local", "test") + val e = intercept[SparkException] { + sc.textFile(inputFile.toURI.toString).collect() + } + assert(e.getCause.isInstanceOf[EOFException]) + assert(e.getCause.getMessage === "Unexpected end of input stream") + } finally { + inputFile.delete() + } + } + } From ef88a64ac5e27e58f6f87bf0588ac1c3995be882 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 11 Oct 2016 17:08:04 -0700 Subject: [PATCH 2/3] Add a flag to ignore corrupt files --- .../spark/internal/config/package.scala | 5 +++ .../org/apache/spark/rdd/HadoopRDD.scala | 15 +++++++- .../org/apache/spark/rdd/NewHadoopRDD.scala | 15 +++++++- .../scala/org/apache/spark/FileSuite.scala | 30 ++++++++++++++- .../execution/datasources/FileScanRDD.scala | 30 ++++++++++++++- .../apache/spark/sql/internal/SQLConf.scala | 8 ++++ .../datasources/FileSourceStrategySuite.scala | 37 ++++++++++++++++++- 7 files changed, 134 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 0896e68eca7dc..10fe5a9c8de24 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -170,4 +170,9 @@ package object config { .doc("Port to use for the block managed on the driver.") .fallbackConf(BLOCK_MANAGER_PORT) + private[spark] val IGNORE_CORRUPT_FILES = ConfigBuilder("spark.files.ignoreCorruptFiles") + .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " + + "encountering corrupt files and contents that have been read will still be returned.") + .booleanConf + .createWithDefault(false) } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 8a0e481d8f7d3..05e01e6b80d80 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -17,6 +17,7 @@ package org.apache.spark.rdd +import java.io.IOException import java.text.SimpleDateFormat import java.util.Date @@ -42,6 +43,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation} import org.apache.spark.storage.StorageLevel @@ -138,6 +140,8 @@ class HadoopRDD[K, V]( private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false) + private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES) + // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. 
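// Usage sketch only (app name, master and input path are illustrative, not from
// the patch): how a job opts in to the new core flag so that HadoopRDD and
// NewHadoopRDD skip corrupt files instead of failing the job; records read
// before the corruption point are still returned.
import org.apache.spark.{SparkConf, SparkContext}

object IgnoreCorruptFilesApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ignore-corrupt-files-demo")
      .setMaster("local[2]")
      .set("spark.files.ignoreCorruptFiles", "true") // default: false
    val sc = new SparkContext(conf)
    try {
      val count = sc.textFile("/data/events/*.gz").count()
      println(s"readable records: $count")
    } finally {
      sc.stop()
    }
  }
}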
protected def getJobConf(): JobConf = { val conf: Configuration = broadcastedConf.value.value @@ -249,7 +253,16 @@ class HadoopRDD[K, V]( val value: V = reader.createValue() override def getNext(): (K, V) = { - finished = !reader.next(key, value) + try { + finished = !reader.next(key, value) + } catch { + case e: IOException => + if (ignoreCorruptFiles) { + finished = true + } else { + throw e + } + } if (!finished) { inputMetrics.incRecordsRead(1) } diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 1c7aec919bdc4..5d77ea31cc66e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -17,6 +17,7 @@ package org.apache.spark.rdd +import java.io.IOException import java.text.SimpleDateFormat import java.util.Date @@ -33,6 +34,7 @@ import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD import org.apache.spark.storage.StorageLevel import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager} @@ -85,6 +87,8 @@ class NewHadoopRDD[K, V]( private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false) + private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES) + def getConf: Configuration = { val conf: Configuration = confBroadcast.value.value if (shouldCloneJobConf) { @@ -179,7 +183,16 @@ class NewHadoopRDD[K, V]( override def hasNext: Boolean = { if (!finished && !havePair) { - finished = !reader.nextKeyValue + try { + finished = !reader.nextKeyValue + } catch { + case e: IOException => + if (ignoreCorruptFiles) { + finished = true + } else { + throw e + } + } if (finished) { // Close and release the reader here; close() will also be called when the task // completes, but for tasks that read from many files, it helps to release the diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 8f53eba21bfb6..cc52bb1d23cd5 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat} import org.apache.spark.input.PortableDataStream +import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils @@ -543,7 +544,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001")) } - test("HadoopRDD should not skip corrupt gzip files") { + test("spark.files.ignoreCorruptFiles should work both HadoopRDD and NewHadoopRDD") { val inputFile = File.createTempFile("input-", ".gz") try { // Create a corrupt gzip file @@ -565,11 +566,36 @@ class FileSuite extends SparkFunSuite with LocalSparkContext { // Reading a corrupt gzip file should throw EOFException sc = new SparkContext("local", "test") - val e = intercept[SparkException] { + // Test HadoopRDD + var e = intercept[SparkException] { 
sc.textFile(inputFile.toURI.toString).collect() } assert(e.getCause.isInstanceOf[EOFException]) assert(e.getCause.getMessage === "Unexpected end of input stream") + // Test NewHadoopRDD + e = intercept[SparkException] { + sc.newAPIHadoopFile( + inputFile.toURI.toString, + classOf[NewTextInputFormat], + classOf[LongWritable], + classOf[Text]).collect() + } + assert(e.getCause.isInstanceOf[EOFException]) + assert(e.getCause.getMessage === "Unexpected end of input stream") + sc.stop() + + val conf = new SparkConf().set(IGNORE_CORRUPT_FILES, true) + sc = new SparkContext("local", "test", conf) + // Test HadoopRDD + assert(sc.textFile(inputFile.toURI.toString).collect().isEmpty) + // Test NewHadoopRDD + assert { + sc.newAPIHadoopFile( + inputFile.toURI.toString, + classOf[NewTextInputFormat], + classOf[LongWritable], + classOf[Text]).collect().isEmpty + } } finally { inputFile.delete() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index c66da3a83198d..89944570df662 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources +import java.io.IOException + import scala.collection.mutable import org.apache.spark.{Partition => RDDPartition, TaskContext} @@ -25,6 +27,7 @@ import org.apache.spark.rdd.{InputFileNameHolder, RDD} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.vectorized.ColumnarBatch +import org.apache.spark.util.NextIterator /** * A part (i.e. "block") of a single file that should be read, along with partition column values @@ -62,6 +65,8 @@ class FileScanRDD( @transient val filePartitions: Seq[FilePartition]) extends RDD[InternalRow](sparkSession.sparkContext, Nil) { + private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles + override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = { val iterator = new Iterator[Object] with AutoCloseable { private val inputMetrics = context.taskMetrics().inputMetrics @@ -119,7 +124,30 @@ class FileScanRDD( InputFileNameHolder.setInputFileName(currentFile.filePath) try { - currentIterator = readFunction(currentFile) + if (ignoreCorruptFiles) { + currentIterator = new NextIterator[Object] { + private val internalIter = readFunction(currentFile) + + override def getNext(): AnyRef = { + try { + if (internalIter.hasNext) { + internalIter.next() + } else { + finished = true + null + } + } catch { + case e: IOException => + finished = true + null + } + } + + override def close(): Unit = {} + } + } else { + currentIterator = readFunction(currentFile) + } } catch { case e: java.io.FileNotFoundException => throw new java.io.FileNotFoundException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index fecdf792fd14a..2654437ca6610 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -588,6 +588,12 @@ object SQLConf { .doubleConf .createWithDefault(0.05) + val IGNORE_CORRUPT_FILES = SQLConfigBuilder("spark.sql.files.ignoreCorruptFiles") + .doc("Whether to ignore corrupt files. 
If true, the Spark jobs will continue to run when " + + "encountering corrupt files and contents that have been read will still be returned.") + .booleanConf + .createWithDefault(false) + object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" } @@ -759,6 +765,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def warehousePath: String = new Path(getConf(WAREHOUSE_PATH)).toString + def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES) + override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL) override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 45411fa0656cd..c5deb31fec183 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -17,8 +17,9 @@ package org.apache.spark.sql.execution.datasources -import java.io.File +import java.io._ import java.util.concurrent.atomic.AtomicInteger +import java.util.zip.GZIPOutputStream import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, RawLocalFileSystem} @@ -441,6 +442,40 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi } } + test("spark.files.ignoreCorruptFiles should work in SQL") { + val inputFile = File.createTempFile("input-", ".gz") + try { + // Create a corrupt gzip file + val byteOutput = new ByteArrayOutputStream() + val gzip = new GZIPOutputStream(byteOutput) + try { + gzip.write(Array[Byte](1, 2, 3, 4)) + } finally { + gzip.close() + } + val bytes = byteOutput.toByteArray + val o = new FileOutputStream(inputFile) + try { + // It's corrupt since we only write half of bytes into the file. + o.write(bytes.take(bytes.length / 2)) + } finally { + o.close() + } + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") { + val e = intercept[SparkException] { + spark.read.text(inputFile.toURI.toString).collect() + } + assert(e.getCause.isInstanceOf[EOFException]) + assert(e.getCause.getMessage === "Unexpected end of input stream") + } + withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") { + assert(spark.read.text(inputFile.toURI.toString).collect().isEmpty) + } + } finally { + inputFile.delete() + } + } + // Helpers for checking the arguments passed to the FileFormat. 
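// Usage sketch only (session settings and input path are illustrative): the
// SQL-side flag consulted by FileScanRDD, spark.sql.files.ignoreCorruptFiles,
// is configured separately from the core spark.files.ignoreCorruptFiles setting.
import org.apache.spark.sql.SparkSession

object IgnoreCorruptFilesSqlApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ignore-corrupt-files-sql-demo")
      .master("local[2]")
      .config("spark.sql.files.ignoreCorruptFiles", "true") // default: false
      .getOrCreate()
    try {
      // A corrupt .gz input no longer fails the query; its unreadable tail is skipped.
      val df = spark.read.text("/data/logs/*.gz")
      println(s"readable lines: ${df.count()}")
    } finally {
      spark.stop()
    }
  }
}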
protected val checkPartitionSchema = From ceecc19cd01827fc48e1f0de5147e94c3045f592 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 12 Oct 2016 13:30:08 -0700 Subject: [PATCH 3/3] Address nits --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 7 +------ .../src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 7 +------ 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 05e01e6b80d80..e1cf3938de098 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -256,12 +256,7 @@ class HadoopRDD[K, V]( try { finished = !reader.next(key, value) } catch { - case e: IOException => - if (ignoreCorruptFiles) { - finished = true - } else { - throw e - } + case e: IOException if ignoreCorruptFiles => finished = true } if (!finished) { inputMetrics.incRecordsRead(1) diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 5d77ea31cc66e..baf31fb658870 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -186,12 +186,7 @@ class NewHadoopRDD[K, V]( try { finished = !reader.nextKeyValue } catch { - case e: IOException => - if (ignoreCorruptFiles) { - finished = true - } else { - throw e - } + case e: IOException if ignoreCorruptFiles => finished = true } if (finished) { // Close and release the reader here; close() will also be called when the task
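// The third patch collapses the catch block into a pattern guard:
//   case e: IOException if ignoreCorruptFiles => finished = true
// A self-contained sketch of that guard pattern, using a hypothetical
// SimpleReader stand-in (not Hadoop's RecordReader): the IOException is
// swallowed only when the flag is set, and still propagates otherwise.
import java.io.IOException

trait SimpleReader[T] {
  def next(): Option[T] // may throw IOException on a corrupt input
}

class TolerantIterator[T](reader: SimpleReader[T], ignoreCorruptFiles: Boolean)
  extends Iterator[T] {

  private var nextValue: Option[T] = fetch()

  private def fetch(): Option[T] =
    try reader.next() catch {
      case e: IOException if ignoreCorruptFiles => None // treat corruption as end of input
    }

  override def hasNext: Boolean = nextValue.isDefined

  override def next(): T = {
    val value = nextValue.getOrElse(throw new NoSuchElementException("end of input"))
    nextValue = fetch()
    value
  }
}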