
Commit 586b347

Make ignoreCorruptFiles work for Parquet.
1 parent 5263622 commit 586b347
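
In short, with spark.sql.files.ignoreCorruptFiles enabled, Parquet reads now skip files whose footers or contents cannot be read instead of failing the whole job. A minimal usage sketch (the paths below are hypothetical, not taken from this commit):

  // Hypothetical paths; with the flag on, a corrupt or non-Parquet file under them is
  // skipped with a warning instead of failing the scan.
  spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
  val df = spark.read.parquet("/data/first", "/data/second", "/data/third")
  df.show()

  // With the flag off, the same read fails, e.g. with "... is not a Parquet file"
  // when one of the inputs is not valid Parquet (see the test added below).
  spark.conf.set("spark.sql.files.ignoreCorruptFiles", "false")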

File tree

3 files changed: +102 −8 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala

Lines changed: 12 additions & 2 deletions
@@ -135,11 +135,21 @@ class FileScanRDD(
       try {
         if (ignoreCorruptFiles) {
           currentIterator = new NextIterator[Object] {
-            private val internalIter = readFunction(currentFile)
+            private val internalIter = {
+              try {
+                // The readFunction may read files before consuming the iterator.
+                // E.g., vectorized Parquet reader.
+                readFunction(currentFile)
+              } catch {
+                case e @(_: RuntimeException | _: IOException) =>
+                  logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
+                  null
+              }
+            }
 
             override def getNext(): AnyRef = {
               try {
-                if (internalIter.hasNext) {
+                if (internalIter != null && internalIter.hasNext) {
                   internalIter.next()
                 } else {
                   finished = true
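
Note that the guard wraps the construction of the per-file iterator as well as each next() call: some readers (e.g. the vectorized Parquet reader) touch the file eagerly while the iterator is being built. A self-contained sketch of the same pattern, with simplified names that are not the actual Spark classes:

  import java.io.IOException

  // Guard both opening and consuming a per-file iterator; on corruption, report and stop.
  def safePerFileIterator[T](open: () => Iterator[T])(onCorrupt: Throwable => Unit): Iterator[T] = {
    // Opening the iterator may already read the file and fail on corrupt input.
    val underlying: Iterator[T] =
      try open()
      catch { case e @ (_: RuntimeException | _: IOException) => onCorrupt(e); null }
    new Iterator[T] {
      def hasNext: Boolean =
        try underlying != null && underlying.hasNext
        catch { case e @ (_: RuntimeException | _: IOException) => onCorrupt(e); false }
      def next(): T = underlying.next()
    }
  }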

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 60 additions & 6 deletions
@@ -17,7 +17,9 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.io.IOException
 import java.net.URI
+import java.util.concurrent.{Callable, ExecutionException, Executors, ExecutorService, Future}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -30,6 +32,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.codec.CodecConfig
 import org.apache.parquet.hadoop.util.ContextUtil
@@ -151,7 +154,7 @@ class ParquetFileFormat
     }
   }
 
-  def inferSchema(
+  override def inferSchema(
       sparkSession: SparkSession,
       parameters: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
@@ -542,6 +545,58 @@ object ParquetFileFormat extends Logging {
     StructType(parquetSchema ++ missingFields)
   }
 
+  /**
+   * Reads Parquet footers in multi-threaded manner.
+   * If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted
+   * files when reading footers.
+   */
+  private def readParquetFootersInParallel(
+      conf: Configuration,
+      partFiles: Seq[FileStatus],
+      ignoreCorruptFiles: Boolean): Seq[Footer] = {
+    val footers = partFiles.map { currentFile =>
+      new Callable[Option[Footer]]() {
+        override def call(): Option[Footer] = {
+          try {
+            // Skips row group information since we only need the schema.
+            // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
+            // when it can't read the footer.
+            Some(new Footer(currentFile.getPath(),
+              ParquetFileReader.readFooter(
+                conf, currentFile, SKIP_ROW_GROUPS)))
+          } catch { case e: RuntimeException =>
+            if (ignoreCorruptFiles) {
+              logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
+              None
+            } else {
+              throw new IOException(s"Could not read footer for file: $currentFile", e)
+            }
+          }
+        }
+      }
+    }
+    val parallelism = conf.getInt(ParquetFileReader.PARQUET_READ_PARALLELISM, 5)
+    val threadPool: ExecutorService = Executors.newFixedThreadPool(parallelism)
+    try {
+      val futures: mutable.ArrayBuffer[Future[Option[Footer]]] = mutable.ArrayBuffer.empty
+      footers.foreach(callable => futures += threadPool.submit(callable))
+      val result: mutable.ArrayBuffer[Footer] = mutable.ArrayBuffer.empty
+      futures.foreach { future =>
+        try {
+          val footer = future.get()
+          footer.foreach(f => result += f)
+        } catch { case e: InterruptedException =>
+          throw new RuntimeException("The thread was interrupted", e)
+        }
+      }
+      result.toSeq
+    } catch { case e: ExecutionException =>
+      throw new IOException("Could not read footer: " + e.getMessage(), e.getCause())
+    } finally {
+      threadPool.shutdownNow()
+    }
+  }
+
   /**
    * Figures out a merged Parquet schema with a distributed Spark job.
    *
@@ -582,6 +637,8 @@ object ParquetFileFormat extends Logging {
     val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
       sparkSession.sparkContext.defaultParallelism)
 
+    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+
     // Issues a Spark job to read Parquet schema in parallel.
     val partiallyMergedSchemas =
       sparkSession
@@ -593,13 +650,10 @@ object ParquetFileFormat extends Logging {
           new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
         }.toSeq
 
-        // Skips row group information since we only need the schema
-        val skipRowGroups = true
-
         // Reads footers in multi-threaded manner within each task
        val footers =
-          ParquetFileReader.readAllFootersInParallel(
-            serializedConf.value, fakeFileStatuses.asJava, skipRowGroups).asScala
+          ParquetFileFormat.readParquetFootersInParallel(
+            serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)
 
         // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
         val converter =
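
The new readParquetFootersInParallel replaces Parquet's readAllFootersInParallel inside the distributed schema-merge job, so corrupt files can also be skipped while schemas are being merged. A hedged usage sketch of what drives this code path (the path is hypothetical):

  // Hypothetical path; mergeSchema makes Spark read footers from the part files,
  // which now goes through readParquetFootersInParallel and honors ignoreCorruptFiles.
  spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
  val merged = spark.read.option("mergeSchema", "true").parquet("/data/events")
  merged.printSchema()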

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

Lines changed: 30 additions & 0 deletions
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -212,6 +213,35 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  test("Enabling/disabling ignoreCorruptFiles") {
+    def testIgnoreCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.parquet(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(
+          df,
+          Seq(Row(0), Row(1)))
+      }
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      testIgnoreCorruptFiles()
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+      val exception = intercept[SparkException] {
+        testIgnoreCorruptFiles()
+      }
+      assert(exception.getMessage().contains("is not a Parquet file"))
+    }
+  }
+
   test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
     withTempPath { dir =>
       val basePath = dir.getCanonicalPath
