
Commit a8a18d3

viirya authored and cmonkey committed
[SPARK-19082][SQL] Make ignoreCorruptFiles work for Parquet
## What changes were proposed in this pull request?

We have a config, `spark.sql.files.ignoreCorruptFiles`, which can be used to ignore corrupt files when reading files in SQL. Currently the `ignoreCorruptFiles` config has two issues and does not work for Parquet:

1. We only ignore corrupt files in `FileScanRDD`. In fact, we begin to read those files as early as inferring the data schema from them. For corrupt files, we can't read the schema and the program fails. A related issue was reported at http://apache-spark-developers-list.1001551.n3.nabble.com/Skip-Corrupted-Parquet-blocks-footer-tc20418.html
2. In `FileScanRDD`, we assume that we only begin to read the files when starting to consume the iterator. However, the files may be read before that. In this case, the `ignoreCorruptFiles` config doesn't work either.

This patch targets the Parquet datasource. If this direction is OK, we can address the same issue for other datasources like ORC.

Two main changes in this patch:

1. Replace `ParquetFileReader.readAllFootersInParallel` by implementing the logic to read footers in a multi-threaded manner. We can't ignore corrupt files if we use `ParquetFileReader.readAllFootersInParallel`, so this patch implements similar logic in `readParquetFootersInParallel`.
2. In `FileScanRDD`, we also need to ignore a corrupt file when we call `readFunction` to return the iterator.

One thing to notice: we read the schema from the Parquet file's footer. The method that reads the footer, `ParquetFileReader.readFooter`, throws `RuntimeException`, instead of `IOException`, if it can't successfully read the footer. Please check out https://github.com/apache/parquet-mr/blob/df9d8e415436292ae33e1ca0b8da256640de9710/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L470. So this patch catches `RuntimeException`. One concern is that it might also shadow runtime exceptions other than those from reading corrupt files.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <[email protected]>

Closes apache#16474 from viirya/fix-ignorecorrupted-parquet-files.
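
For context, a minimal usage sketch of the behavior this patch fixes, modeled on the new test added below. It assumes a Spark 2.x shell with a `SparkSession` named `spark`; the scratch path and tiny datasets are illustrative only. With `spark.sql.files.ignoreCorruptFiles` enabled, a non-Parquet file mixed into the read should be skipped instead of failing the query:

```scala
import org.apache.hadoop.fs.Path

// Hypothetical scratch location; any writable directory works.
val basePath = "/tmp/ignore-corrupt-files-demo"

// Two valid Parquet outputs plus a JSON output that looks corrupt to the Parquet reader.
spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)

// With the flag on, the JSON file is skipped and only rows 0 and 1 come back;
// with it off, the read fails with "is not a Parquet file".
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.read.parquet(
  new Path(basePath, "first").toString,
  new Path(basePath, "second").toString,
  new Path(basePath, "third").toString).show()
```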
1 parent e788897 commit a8a18d3

File tree

5 files changed, +140 -8 lines changed


core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -20,7 +20,7 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.parallel.{ForkJoinTaskSupport, ThreadPoolTaskSupport}
+import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala

Lines changed: 11 additions & 1 deletion
```diff
@@ -135,7 +135,17 @@ class FileScanRDD(
           try {
             if (ignoreCorruptFiles) {
               currentIterator = new NextIterator[Object] {
-                private val internalIter = readFunction(currentFile)
+                private val internalIter = {
+                  try {
+                    // The readFunction may read files before consuming the iterator.
+                    // E.g., vectorized Parquet reader.
+                    readFunction(currentFile)
+                  } catch {
+                    case e @(_: RuntimeException | _: IOException) =>
+                      logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
+                      Iterator.empty
+                  }
+                }
 
                 override def getNext(): AnyRef = {
                   try {
```
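
Stripped of the surrounding `FileScanRDD` plumbing, the change above guards the construction of the per-file iterator itself, not just its consumption. A rough standalone sketch of that pattern, where `currentFile`, `readFunction`, and the warning output are stand-ins for the real members:

```scala
import java.io.IOException

// Sketch only: wrap the possibly-eager reader construction so a corrupt file
// degrades to an empty iterator instead of failing the task.
def guardedIterator[T](currentFile: String)(readFunction: String => Iterator[T]): Iterator[T] =
  try {
    // The read function may touch the file right here (e.g. the vectorized
    // Parquet reader reads the footer eagerly), so the try must cover the call.
    readFunction(currentFile)
  } catch {
    case e @ (_: RuntimeException | _: IOException) =>
      Console.err.println(s"Skipped the rest content in the corrupted file: $currentFile ($e)")
      Iterator.empty
  }
```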

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 39 additions & 6 deletions
```diff
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.io.IOException
 import java.net.URI
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Try}
 
 import org.apache.hadoop.conf.Configuration
@@ -30,6 +33,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.codec.CodecConfig
 import org.apache.parquet.hadoop.util.ContextUtil
@@ -151,7 +155,7 @@ class ParquetFileFormat
     }
   }
 
-  def inferSchema(
+  override def inferSchema(
       sparkSession: SparkSession,
       parameters: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = {
@@ -542,6 +546,36 @@ object ParquetFileFormat extends Logging {
     StructType(parquetSchema ++ missingFields)
   }
 
+  /**
+   * Reads Parquet footers in multi-threaded manner.
+   * If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted
+   * files when reading footers.
+   */
+  private[parquet] def readParquetFootersInParallel(
+      conf: Configuration,
+      partFiles: Seq[FileStatus],
+      ignoreCorruptFiles: Boolean): Seq[Footer] = {
+    val parFiles = partFiles.par
+    parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+    parFiles.flatMap { currentFile =>
+      try {
+        // Skips row group information since we only need the schema.
+        // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
+        // when it can't read the footer.
+        Some(new Footer(currentFile.getPath(),
+          ParquetFileReader.readFooter(
+            conf, currentFile, SKIP_ROW_GROUPS)))
+      } catch { case e: RuntimeException =>
+        if (ignoreCorruptFiles) {
+          logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
+          None
+        } else {
+          throw new IOException(s"Could not read footer for file: $currentFile", e)
+        }
+      }
+    }.seq
+  }
+
   /**
    * Figures out a merged Parquet schema with a distributed Spark job.
    *
@@ -582,6 +616,8 @@
     val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
       sparkSession.sparkContext.defaultParallelism)
 
+    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+
     // Issues a Spark job to read Parquet schema in parallel.
     val partiallyMergedSchemas =
       sparkSession
@@ -593,13 +629,10 @@
           new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
         }.toSeq
 
-        // Skips row group information since we only need the schema
-        val skipRowGroups = true
-
         // Reads footers in multi-threaded manner within each task
        val footers =
-          ParquetFileReader.readAllFootersInParallel(
-            serializedConf.value, fakeFileStatuses.asJava, skipRowGroups).asScala
+          ParquetFileFormat.readParquetFootersInParallel(
+            serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)
 
         // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
         val converter =
```
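
The `readParquetFootersInParallel` helper added above follows a reusable pattern: map over a Scala parallel collection backed by a bounded `ForkJoinPool` and skip (or rethrow) per-element failures. A generic sketch of that pattern under the same Scala 2.11-era imports, not the Spark method itself; `readOne` is a placeholder for the per-file footer read:

```scala
import java.io.IOException

import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

// Sketch only: run `readOne` over `inputs` with bounded parallelism, skipping
// failed elements when `ignoreFailures` is set, otherwise failing fast.
def readAllInParallel[A, B](inputs: Seq[A], ignoreFailures: Boolean)(readOne: A => B): Seq[B] = {
  val parInputs = inputs.par
  // Use an explicit 8-thread pool instead of the default task support.
  parInputs.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
  parInputs.flatMap { input =>
    try Some(readOne(input))
    catch {
      case e: RuntimeException =>
        if (ignoreFailures) None // treat the element as corrupt and move on
        else throw new IOException(s"Could not read: $input", e)
    }
  }.seq
}
```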

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala

Lines changed: 59 additions & 0 deletions
```diff
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetFileFormatSuite extends QueryTest with ParquetTest with SharedSQLContext {
+
+  test("read parquet footers in parallel") {
+    def testReadFooters(ignoreCorruptFiles: Boolean): Unit = {
+      withTempDir { dir =>
+        val fs = FileSystem.get(sparkContext.hadoopConfiguration)
+        val basePath = dir.getCanonicalPath
+
+        val path1 = new Path(basePath, "first")
+        val path2 = new Path(basePath, "second")
+        val path3 = new Path(basePath, "third")
+
+        spark.range(1).toDF("a").coalesce(1).write.parquet(path1.toString)
+        spark.range(1, 2).toDF("a").coalesce(1).write.parquet(path2.toString)
+        spark.range(2, 3).toDF("a").coalesce(1).write.json(path3.toString)
+
+        val fileStatuses =
+          Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten
+
+        val footers = ParquetFileFormat.readParquetFootersInParallel(
+          sparkContext.hadoopConfiguration, fileStatuses, ignoreCorruptFiles)
+
+        assert(footers.size == 2)
+      }
+    }
+
+    testReadFooters(true)
+    val exception = intercept[java.io.IOException] {
+      testReadFooters(false)
+    }
+    assert(exception.getMessage().contains("Could not read footer for file"))
+  }
+}
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

Lines changed: 30 additions & 0 deletions
```diff
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -217,6 +218,35 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  test("Enabling/disabling ignoreCorruptFiles") {
+    def testIgnoreCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.parquet(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(
+          df,
+          Seq(Row(0), Row(1)))
+      }
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      testIgnoreCorruptFiles()
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+      val exception = intercept[SparkException] {
+        testIgnoreCorruptFiles()
+      }
+      assert(exception.getMessage().contains("is not a Parquet file"))
+    }
+  }
+
   test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
     withTempPath { dir =>
       val basePath = dir.getCanonicalPath
```
