Commit 192e42a

yhuai authored and liancheng committed
[SPARK-6016][SQL] Cannot read the parquet table after overwriting the existing table when spark.sql.parquet.cacheMetadata=true
Please see JIRA (https://issues.apache.org/jira/browse/SPARK-6016) for details of the bug.

Author: Yin Huai <[email protected]>

Closes #4775 from yhuai/parquetFooterCache and squashes the following commits:

78787b1 [Yin Huai] Remove footerCache in FilteringParquetRowInputFormat.
dff6fba [Yin Huai] Failed unit test.
1 parent f02394d commit 192e42a
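
For context, the failure described in the JIRA can be reproduced with the same sequence the new regression test uses. A minimal sketch, assuming the suite's TestHive-style context is in scope (jsonRDD, sql, sparkContext) and an illustrative table name:

// Sketch adapted from the regression test added in this commit; the table name is illustrative.
import org.apache.spark.sql.SaveMode

// Write a table backed by two Parquet part-files, then read it so their footers get cached.
val df1 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
df1.saveAsTable("spark_6016_repro", "parquet", SaveMode.Overwrite)
sql("select * from spark_6016_repro").collect()

// Overwrite the same table with four part-files. With spark.sql.parquet.cacheMetadata=true,
// the next read mixed the two stale cached footers with two freshly read ones and failed
// while merging their metadata; after this commit the read sees only the new footers.
val df2 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
df2.saveAsTable("spark_6016_repro", "parquet", SaveMode.Overwrite)
sql("select * from spark_6016_repro").collect()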

3 files changed: +42 −42 lines changed


sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 8 additions & 41 deletions
@@ -374,8 +374,6 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
 private[parquet] class FilteringParquetRowInputFormat
   extends parquet.hadoop.ParquetInputFormat[Row] with Logging {
 
-  private var footers: JList[Footer] = _
-
   private var fileStatuses = Map.empty[Path, FileStatus]
 
   override def createRecordReader(
@@ -396,46 +394,15 @@ private[parquet] class FilteringParquetRowInputFormat
     }
   }
 
-  override def getFooters(jobContext: JobContext): JList[Footer] = {
-    import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.footerCache
-
-    if (footers eq null) {
-      val conf = ContextUtil.getConfiguration(jobContext)
-      val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
-      val statuses = listStatus(jobContext)
-      fileStatuses = statuses.map(file => file.getPath -> file).toMap
-      if (statuses.isEmpty) {
-        footers = Collections.emptyList[Footer]
-      } else if (!cacheMetadata) {
-        // Read the footers from HDFS
-        footers = getFooters(conf, statuses)
-      } else {
-        // Read only the footers that are not in the footerCache
-        val foundFooters = footerCache.getAllPresent(statuses)
-        val toFetch = new ArrayList[FileStatus]
-        for (s <- statuses) {
-          if (!foundFooters.containsKey(s)) {
-            toFetch.add(s)
-          }
-        }
-        val newFooters = new mutable.HashMap[FileStatus, Footer]
-        if (toFetch.size > 0) {
-          val startFetch = System.currentTimeMillis
-          val fetched = getFooters(conf, toFetch)
-          logInfo(s"Fetched $toFetch footers in ${System.currentTimeMillis - startFetch} ms")
-          for ((status, i) <- toFetch.zipWithIndex) {
-            newFooters(status) = fetched.get(i)
-          }
-          footerCache.putAll(newFooters)
-        }
-        footers = new ArrayList[Footer](statuses.size)
-        for (status <- statuses) {
-          footers.add(newFooters.getOrElse(status, foundFooters.get(status)))
-        }
-      }
-    }
+  // This is only a temporary solution since we need to use fileStatuses in
+  // both getClientSideSplits and getTaskSideSplits. It can be removed once we get rid of these
+  // two methods.
+  override def getSplits(jobContext: JobContext): JList[InputSplit] = {
+    // First set fileStatuses.
+    val statuses = listStatus(jobContext)
+    fileStatuses = statuses.map(file => file.getPath -> file).toMap
 
-    footers
+    super.getSplits(jobContext)
   }
 
   // TODO Remove this method and related code once PARQUET-16 is fixed
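
The removed getFooters override consulted a footerCache shared through the FilteringParquetRowInputFormat companion object and keyed by FileStatus. Because an overwrite reuses part-file paths, lookups against that process-wide cache could return footers for files that no longer exist, which is the stale-footer mix-up the test below exercises. A minimal sketch of that kind of cache, assuming a Guava CacheBuilder construction; the sizing and object layout here are assumptions, not a quote of the removed companion object:

import com.google.common.cache.CacheBuilder
import org.apache.hadoop.fs.FileStatus
import parquet.hadoop.Footer

object FooterCacheSketch {
  // Process-wide cache keyed by FileStatus. FileStatus compares equal when paths match, so
  // after an overwrite that reuses part-file names, a lookup can hit a stale entry that
  // describes the old file's footer rather than the newly written one.
  val footerCache = CacheBuilder.newBuilder()
    .maximumSize(20000)
    .build[FileStatus, Footer]()
}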

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 7 additions & 1 deletion
@@ -200,7 +200,7 @@ private[sql] case class ParquetRelation2(
     private var commonMetadataStatuses: Array[FileStatus] = _
 
     // Parquet footer cache.
-    private var footers: Map[FileStatus, Footer] = _
+    var footers: Map[FileStatus, Footer] = _
 
     // `FileStatus` objects of all data files (Parquet part-files).
     var dataStatuses: Array[FileStatus] = _
@@ -400,6 +400,7 @@ private[sql] case class ParquetRelation2(
     } else {
       metadataCache.dataStatuses.toSeq
     }
+    val selectedFooters = selectedFiles.map(metadataCache.footers)
 
     // FileInputFormat cannot handle empty lists.
     if (selectedFiles.nonEmpty) {
@@ -447,11 +448,16 @@ private[sql] case class ParquetRelation2(
       @transient
       val cachedStatus = selectedFiles
 
+      @transient
+      val cachedFooters = selectedFooters
+
       // Overridden so we can inject our own cached files statuses.
       override def getPartitions: Array[SparkPartition] = {
         val inputFormat = if (cacheMetadata) {
           new FilteringParquetRowInputFormat {
             override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
+
+            override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
           }
         } else {
           new FilteringParquetRowInputFormat
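
The replacement strategy is visible in the hunks above: ParquetRelation2 looks footers up in its own metadataCache, which is rebuilt whenever the relation refreshes, and injects both the selected file statuses and their footers into the input format through anonymous-subclass overrides. A minimal sketch of that injection pattern, assuming JavaConverters for the Java-facing return types (the conversion detail and the placeholder values are assumptions, not a quote of the file; FilteringParquetRowInputFormat is package-private, so this sketch lives conceptually in org.apache.spark.sql.parquet):

import java.util.{List => JList}
import scala.collection.JavaConverters._
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.JobContext
import parquet.hadoop.Footer

// Illustrative wiring: the relation owns the authoritative metadata and hands it over.
val selectedFiles: Seq[FileStatus] = Seq.empty            // metadataCache.dataStatuses.toSeq in the real code
val footersByStatus: Map[FileStatus, Footer] = Map.empty  // metadataCache.footers in the real code
val selectedFooters: Seq[Footer] = selectedFiles.map(footersByStatus)

val inputFormat = new FilteringParquetRowInputFormat {
  // Serve exactly the files and footers the relation already knows about,
  // instead of consulting a shared, possibly stale cache.
  override def listStatus(jobContext: JobContext): JList[FileStatus] = selectedFiles.asJava
  override def getFooters(jobContext: JobContext): JList[Footer] = selectedFooters.asJava
}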

sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala

Lines changed: 27 additions & 0 deletions
@@ -29,6 +29,7 @@ import org.apache.spark.sql.hive.execution.{InsertIntoHiveTable, HiveTableScan}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
+import org.apache.spark.sql.SaveMode
 
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
@@ -409,6 +410,32 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
       )
     """)
   }
+
+  test("SPARK-6016 make sure to use the latest footers") {
+    sql("drop table if exists spark_6016_fix")
+
+    // Create a DataFrame with two partitions. So, the created table will have two parquet files.
+    val df1 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
+    df1.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+    checkAnswer(
+      sql("select * from spark_6016_fix"),
+      (1 to 10).map(i => Row(i))
+    )
+
+    // Create a DataFrame with four partitions. So, the created table will have four parquet files.
+    val df2 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
+    df2.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+    // For the bug of SPARK-6016, we are caching two outdated footers for df1. Then,
+    // since the new table has four parquet files, we are trying to read new footers from two files
+    // and then merge metadata in footers of these four (two outdated ones and two latest ones),
+    // which will cause an error.
+    checkAnswer(
+      sql("select * from spark_6016_fix"),
+      (1 to 10).map(i => Row(i))
+    )
+
+    sql("drop table spark_6016_fix")
+  }
 }
 
 class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
