Commit 911b83d

dongjoon-hyun authored and cloud-fan committed
[SPARK-23457][SQL][BRANCH-2.3] Register task completion listeners first in ParquetFileFormat
## What changes were proposed in this pull request?

ParquetFileFormat leaks opened files in some cases. This PR prevents that by registering task completion listeners first, before initialization.

- [spark-branch-2.3-test-sbt-hadoop-2.7](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.7/205/testReport/org.apache.spark.sql/FileBasedDataSourceSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)
- [spark-master-test-sbt-hadoop-2.6](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4228/testReport/junit/org.apache.spark.sql.execution.datasources.parquet/ParquetQuerySuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)

```
Caused by: sbt.ForkMain$ForkError: java.lang.Throwable: null
  at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
  at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
  at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
  at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:538)
  at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:149)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:133)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:400)
  at
```

## How was this patch tested?

Manual. The following test case generates the same leakage.

```scala
test("SPARK-23457 Register task completion listeners first in ParquetFileFormat") {
  withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
    withTempDir { dir =>
      val basePath = dir.getCanonicalPath
      Seq(0).toDF("a").write.format("parquet").save(new Path(basePath, "first").toString)
      Seq(1).toDF("a").write.format("parquet").save(new Path(basePath, "second").toString)
      val df = spark.read.parquet(
        new Path(basePath, "first").toString,
        new Path(basePath, "second").toString)
      val e = intercept[SparkException] {
        df.collect()
      }
      assert(e.getCause.isInstanceOf[OutOfMemoryError])
    }
  }
}
```

Author: Dongjoon Hyun <[email protected]>

Closes #20714 from dongjoon-hyun/SPARK-23457-2.3.
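The ordering is the crux of the fix: the Parquet reader opens its input stream inside `initialize`, and in the failing scenario above `initialize` throws after the stream is already open, so a cleanup hook registered only afterwards never gets a chance to run. Below is a minimal, self-contained sketch of the register-before-initialize pattern; the listener buffer and `LeakyReader` are illustrative stand-ins for Spark's `TaskContext` and record reader, not Spark APIs.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Stand-in for a reader that acquires a file handle in initialize() and may then fail.
class LeakyReader {
  var opened = false
  def initialize(failAfterOpen: Boolean): Unit = {
    opened = true // the file handle is acquired here
    if (failAfterOpen) throw new RuntimeException("initialize failed after opening the file")
  }
  def close(): Unit = opened = false // releases the file handle
}

object ListenerOrderingSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for TaskContext's completion listeners, which Spark runs when the task ends.
    val completionListeners = ArrayBuffer.empty[() => Unit]

    val reader = new LeakyReader
    // SPARK-23457 ordering: register the cleanup hook BEFORE calling initialize().
    completionListeners += (() => reader.close())
    try reader.initialize(failAfterOpen = true)
    catch { case NonFatal(e) => println(s"initialization failed: ${e.getMessage}") }

    // Task completion: the listeners run and close the reader even though initialize() threw.
    completionListeners.foreach(_.apply())
    assert(!reader.opened, "the stream must not leak")
  }
}
```

With the old ordering (register only after `initialize` returns), the listener would never be added in the failing case and the handle would stay open, which is exactly the open-stream leak the `DebugFilesystem` check reports.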
1 parent 4550673 commit 911b83d

File tree

1 file changed: +10 -12 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 10 additions & 12 deletions
```diff
@@ -394,16 +394,21 @@ class ParquetFileFormat
         ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
       }
       val taskContext = Option(TaskContext.get())
-      val parquetReader = if (enableVectorizedReader) {
+      if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader(
           convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined)
+        val iter = new RecordReaderIterator(vectorizedReader)
+        // SPARK-23457 Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
         vectorizedReader.initialize(split, hadoopAttemptContext)
         logDebug(s"Appending $partitionSchema ${file.partitionValues}")
         vectorizedReader.initBatch(partitionSchema, file.partitionValues)
         if (returningBatch) {
           vectorizedReader.enableReturningBatches()
         }
-        vectorizedReader
+
+        // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+        iter.asInstanceOf[Iterator[InternalRow]]
       } else {
         logDebug(s"Falling back to parquet-mr")
         // ParquetRecordReader returns UnsafeRow
@@ -413,18 +418,11 @@ class ParquetFileFormat
         } else {
           new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
         }
+        val iter = new RecordReaderIterator(reader)
+        // SPARK-23457 Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
         reader.initialize(split, hadoopAttemptContext)
-        reader
-      }
 
-      val iter = new RecordReaderIterator(parquetReader)
-      taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
-
-      // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
-      if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
-        enableVectorizedReader) {
-        iter.asInstanceOf[Iterator[InternalRow]]
-      } else {
         val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
         val joinedRow = new JoinedRow()
         val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
```
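Both branches now hand cleanup to `RecordReaderIterator`, which adapts a Hadoop `RecordReader` to a Scala `Iterator` and closes the wrapped reader when the input is exhausted or when `close()` is called; that is why `iter.close()` in the completion listener is enough to release the underlying Parquet stream. A rough, simplified sketch of such a wrapper follows (illustrative only, not Spark's actual `RecordReaderIterator`):

```scala
import org.apache.hadoop.mapreduce.RecordReader

// Simplified adapter from a Hadoop RecordReader to a Scala Iterator.
// Closing it (directly or via a task completion listener) closes the wrapped reader.
class SimpleRecordReaderIterator[T](private var reader: RecordReader[Void, T])
  extends Iterator[T] with AutoCloseable {

  private var havePair = false
  private var finished = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      finished = !reader.nextKeyValue()
      if (finished) close() // release the input as soon as it is exhausted
      havePair = !finished
    }
    !finished
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    havePair = false
    reader.getCurrentValue
  }

  override def close(): Unit = {
    if (reader != null) {
      reader.close() // closes the underlying file/Parquet stream
      reader = null
    }
  }
}
```

Registering `iter.close()` rather than the raw reader's `close()` keeps a single cleanup path whether the task finishes normally, fails during `initialize`, or stops before reading any rows.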
