@@ -170,4 +170,9 @@ package object config {
.doc("Port to use for the block managed on the driver.")
.fallbackConf(BLOCK_MANAGER_PORT)

private[spark] val IGNORE_CORRUPT_FILES = ConfigBuilder("spark.files.ignoreCorruptFiles")
.doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
"encountering corrupt files and contents that have been read will still be returned.")
.booleanConf
.createWithDefault(false)
Contributor

So either way we will have a behavioral change, whether NewHadoopRDD or HadoopRDD is used. IMO that is fine, given that we are standardizing the behavior and this was a corner case anyway.

Setting the default to false makes sense.

}
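
For context, here is a hedged sketch of how a user might enable the new core flag; the app name, master, and input path below are illustrative placeholders, not part of this change.

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: turn on the flag so a corrupt input file ends the read early
// instead of failing the job. The path is a hypothetical example.
val conf = new SparkConf()
  .setAppName("ignore-corrupt-files-demo")
  .setMaster("local[*]")
  .set("spark.files.ignoreCorruptFiles", "true")
val sc = new SparkContext(conf)

// Records read before the corruption point are still returned; the rest
// of the corrupt file is skipped silently.
val lines = sc.textFile("/path/to/maybe-corrupt/*.gz")
println(lines.count())
sc.stop()

With the flag left at its default of false, the same read fails with a SparkException wrapping the underlying EOFException, which the FileSuite test later in this diff asserts.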
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -17,7 +17,7 @@

package org.apache.spark.rdd

import java.io.EOFException
import java.io.IOException
import java.text.SimpleDateFormat
import java.util.Date

@@ -43,6 +43,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
import org.apache.spark.storage.StorageLevel
@@ -139,6 +140,8 @@ class HadoopRDD[K, V](

private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)

private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)

// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
@@ -253,8 +256,7 @@ class HadoopRDD[K, V](
try {
finished = !reader.next(key, value)
} catch {
case eof: EOFException =>
finished = true
case e: IOException if ignoreCorruptFiles => finished = true
}
if (!finished) {
inputMetrics.incRecordsRead(1)
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -17,6 +17,7 @@

package org.apache.spark.rdd

import java.io.IOException
import java.text.SimpleDateFormat
import java.util.Date

@@ -33,6 +34,7 @@ import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
@@ -85,6 +87,8 @@ class NewHadoopRDD[K, V](

private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)

private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)

def getConf: Configuration = {
val conf: Configuration = confBroadcast.value.value
if (shouldCloneJobConf) {
@@ -179,7 +183,11 @@

override def hasNext: Boolean = {
if (!finished && !havePair) {
finished = !reader.nextKeyValue
try {
finished = !reader.nextKeyValue
} catch {
case e: IOException if ignoreCorruptFiles => finished = true
}
Contributor

Thanks for changing this too!

if (finished) {
// Close and release the reader here; close() will also be called when the task
// completes, but for tasks that read from many files, it helps to release the
62 changes: 61 additions & 1 deletion core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -17,7 +17,8 @@

package org.apache.spark

import java.io.{File, FileWriter}
import java.io._
import java.util.zip.GZIPOutputStream

import scala.io.Source

@@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

import org.apache.spark.input.PortableDataStream
import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -541,4 +543,62 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}.collect()
assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
}

test("spark.files.ignoreCorruptFiles should work both HadoopRDD and NewHadoopRDD") {
val inputFile = File.createTempFile("input-", ".gz")
try {
// Create a corrupt gzip file
val byteOutput = new ByteArrayOutputStream()
val gzip = new GZIPOutputStream(byteOutput)
try {
gzip.write(Array[Byte](1, 2, 3, 4))
} finally {
gzip.close()
}
val bytes = byteOutput.toByteArray
val o = new FileOutputStream(inputFile)
try {
// It's corrupt since we only write half of bytes into the file.
o.write(bytes.take(bytes.length / 2))
} finally {
o.close()
}

// Reading a corrupt gzip file should throw EOFException
sc = new SparkContext("local", "test")
// Test HadoopRDD
var e = intercept[SparkException] {
sc.textFile(inputFile.toURI.toString).collect()
}
assert(e.getCause.isInstanceOf[EOFException])
assert(e.getCause.getMessage === "Unexpected end of input stream")
// Test NewHadoopRDD
e = intercept[SparkException] {
sc.newAPIHadoopFile(
inputFile.toURI.toString,
classOf[NewTextInputFormat],
classOf[LongWritable],
classOf[Text]).collect()
}
assert(e.getCause.isInstanceOf[EOFException])
assert(e.getCause.getMessage === "Unexpected end of input stream")
sc.stop()

val conf = new SparkConf().set(IGNORE_CORRUPT_FILES, true)
sc = new SparkContext("local", "test", conf)
// Test HadoopRDD
assert(sc.textFile(inputFile.toURI.toString).collect().isEmpty)
// Test NewHadoopRDD
assert {
sc.newAPIHadoopFile(
inputFile.toURI.toString,
classOf[NewTextInputFormat],
classOf[LongWritable],
classOf[Text]).collect().isEmpty
}
} finally {
inputFile.delete()
}
}

}
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.datasources

import java.io.IOException

import scala.collection.mutable

import org.apache.spark.{Partition => RDDPartition, TaskContext}
@@ -25,6 +27,7 @@ import org.apache.spark.rdd.{InputFileNameHolder, RDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.vectorized.ColumnarBatch
import org.apache.spark.util.NextIterator

/**
* A part (i.e. "block") of a single file that should be read, along with partition column values
@@ -62,6 +65,8 @@ class FileScanRDD(
@transient val filePartitions: Seq[FilePartition])
extends RDD[InternalRow](sparkSession.sparkContext, Nil) {

private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles

override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
val iterator = new Iterator[Object] with AutoCloseable {
private val inputMetrics = context.taskMetrics().inputMetrics
@@ -119,7 +124,30 @@ class FileScanRDD(
InputFileNameHolder.setInputFileName(currentFile.filePath)

try {
currentIterator = readFunction(currentFile)
if (ignoreCorruptFiles) {
currentIterator = new NextIterator[Object] {
private val internalIter = readFunction(currentFile)

override def getNext(): AnyRef = {
try {
if (internalIter.hasNext) {
internalIter.next()
} else {
finished = true
null
}
} catch {
case e: IOException =>
finished = true
null
}
}

override def close(): Unit = {}
}
} else {
currentIterator = readFunction(currentFile)
}
} catch {
case e: java.io.FileNotFoundException =>
throw new java.io.FileNotFoundException(
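
To make the wrapper above easier to follow in isolation, here is a standalone sketch of the same per-record error-swallowing idea using a plain Scala Iterator rather than Spark's private NextIterator; the helper name and structure are illustrative, not the code in this PR.

import java.io.IOException

// Sketch: wrap an iterator so that an IOException raised while reading
// simply ends iteration instead of failing the task. Rows already
// produced by `underlying` are still handed back to the caller.
def swallowCorruption[T](underlying: Iterator[T]): Iterator[T] = new Iterator[T] {
  private var finished = false
  private var buffered: Option[T] = None

  private def fetch(): Unit = {
    if (!finished && buffered.isEmpty) {
      try {
        if (underlying.hasNext) buffered = Some(underlying.next())
        else finished = true
      } catch {
        case _: IOException => finished = true // treat corruption as end of input
      }
    }
  }

  override def hasNext: Boolean = { fetch(); buffered.isDefined }

  override def next(): T = {
    fetch()
    val value = buffered.getOrElse(throw new NoSuchElementException("end of iterator"))
    buffered = None
    value
  }
}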
@@ -588,6 +588,12 @@ object SQLConf {
.doubleConf
.createWithDefault(0.05)

val IGNORE_CORRUPT_FILES = SQLConfigBuilder("spark.sql.files.ignoreCorruptFiles")
.doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
"encountering corrupt files and contents that have been read will still be returned.")
.booleanConf
.createWithDefault(false)

Contributor

Curious why we are duplicating the parameter in the sql namespace. Won't spark.files.ignoreCorruptFiles suffice?

Member Author

A SQL conf can appear in the output of the following command:

sql("set -v").filter('key contains "files").show(truncate = false)

Contributor

Interesting, thanks for clarifying!

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
@@ -759,6 +765,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def warehousePath: String = new Path(getConf(WAREHOUSE_PATH)).toString

def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)

override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)

override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
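
For illustration, a minimal sketch of how the SQL-side flag might be toggled at runtime; the session setup and input path are assumptions, not part of this diff.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("sql-ignore-corrupt-files-demo")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Runtime toggle; equivalent to running: SET spark.sql.files.ignoreCorruptFiles=true
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// Rows decoded before the corruption point are returned; the rest are skipped.
spark.read.text("/path/to/maybe-corrupt/*.gz").show(truncate = false)

// As noted in the review thread, the conf is also visible via SET -v:
spark.sql("SET -v").filter($"key".contains("files")).show(truncate = false)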
@@ -17,8 +17,9 @@

package org.apache.spark.sql.execution.datasources

import java.io.File
import java.io._
import java.util.concurrent.atomic.AtomicInteger
import java.util.zip.GZIPOutputStream

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, RawLocalFileSystem}
@@ -441,6 +442,40 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
}
}

test("spark.files.ignoreCorruptFiles should work in SQL") {
val inputFile = File.createTempFile("input-", ".gz")
try {
// Create a corrupt gzip file
val byteOutput = new ByteArrayOutputStream()
val gzip = new GZIPOutputStream(byteOutput)
try {
gzip.write(Array[Byte](1, 2, 3, 4))
} finally {
gzip.close()
}
val bytes = byteOutput.toByteArray
val o = new FileOutputStream(inputFile)
try {
// It's corrupt since we only write half of bytes into the file.
o.write(bytes.take(bytes.length / 2))
} finally {
o.close()
}
withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
val e = intercept[SparkException] {
spark.read.text(inputFile.toURI.toString).collect()
}
assert(e.getCause.isInstanceOf[EOFException])
assert(e.getCause.getMessage === "Unexpected end of input stream")
}
withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
assert(spark.read.text(inputFile.toURI.toString).collect().isEmpty)
}
} finally {
inputFile.delete()
}
}

// Helpers for checking the arguments passed to the FileFormat.

protected val checkPartitionSchema =