Skip to content

Conversation

@dongjoon-hyun
Copy link
Member

What changes were proposed in this pull request?

ParquetFileFormat leaks opened files in some cases. This PR prevents that by registering task completion listers first before initialization.

Caused by: sbt.ForkMain$ForkError: java.lang.Throwable: null
	at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
	at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:538)
	at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:149)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:133)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:400)
	at

How was this patch tested?

Manual. The following test case generates the same leakage.

  test("SPARK-23457 Register task completion listeners first in ParquetFileFormat") {
    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
      withTempDir { dir =>
        val basePath = dir.getCanonicalPath
        Seq(0).toDF("a").write.format("parquet").save(new Path(basePath, "first").toString)
        Seq(1).toDF("a").write.format("parquet").save(new Path(basePath, "second").toString)
        val df = spark.read.parquet(
          new Path(basePath, "first").toString,
          new Path(basePath, "second").toString)
        val e = intercept[SparkException] {
          df.collect()
        }
        assert(e.getCause.isInstanceOf[OutOfMemoryError])
      }
    }
  }

…tFileFormat

ParquetFileFormat leaks opened files in some cases. This PR prevents that by registering task completion listers first before initialization.

- [spark-branch-2.3-test-sbt-hadoop-2.7](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.7/205/testReport/org.apache.spark.sql/FileBasedDataSourceSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)
- [spark-master-test-sbt-hadoop-2.6](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4228/testReport/junit/org.apache.spark.sql.execution.datasources.parquet/ParquetQuerySuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)

```
Caused by: sbt.ForkMain$ForkError: java.lang.Throwable: null
	at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
	at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:538)
	at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:149)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:133)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:400)
	at
```

Manual. The following test case generates the same leakage.

```scala
  test("SPARK-23457 Register task completion listeners first in ParquetFileFormat") {
    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
      withTempDir { dir =>
        val basePath = dir.getCanonicalPath
        Seq(0).toDF("a").write.format("parquet").save(new Path(basePath, "first").toString)
        Seq(1).toDF("a").write.format("parquet").save(new Path(basePath, "second").toString)
        val df = spark.read.parquet(
          new Path(basePath, "first").toString,
          new Path(basePath, "second").toString)
        val e = intercept[SparkException] {
          df.collect()
        }
        assert(e.getCause.isInstanceOf[OutOfMemoryError])
      }
    }
  }
```

Author: Dongjoon Hyun <[email protected]>

Closes #20619 from dongjoon-hyun/SPARK-23390.
@SparkQA
Copy link

SparkQA commented Mar 2, 2018

Test build #87873 has finished for PR 20714 at commit d15eba7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member Author

Retest this please

@SparkQA
Copy link

SparkQA commented Mar 2, 2018

Test build #87896 has finished for PR 20714 at commit d15eba7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member Author

Retest this please.

@SparkQA
Copy link

SparkQA commented Mar 3, 2018

Test build #87905 has finished for PR 20714 at commit d15eba7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member Author

Hi, @cloud-fan and @gatorsmile .
This is a backport of #20619 .

@dongjoon-hyun
Copy link
Member Author

Retest this please.

@SparkQA
Copy link

SparkQA commented Mar 5, 2018

Test build #87965 has finished for PR 20714 at commit d15eba7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

thanks, merging to 2.3!

asfgit pushed a commit that referenced this pull request Mar 5, 2018
…st in ParquetFileFormat

## What changes were proposed in this pull request?

ParquetFileFormat leaks opened files in some cases. This PR prevents that by registering task completion listers first before initialization.

- [spark-branch-2.3-test-sbt-hadoop-2.7](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.7/205/testReport/org.apache.spark.sql/FileBasedDataSourceSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)
- [spark-master-test-sbt-hadoop-2.6](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4228/testReport/junit/org.apache.spark.sql.execution.datasources.parquet/ParquetQuerySuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)

```
Caused by: sbt.ForkMain$ForkError: java.lang.Throwable: null
	at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
	at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:538)
	at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:149)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:133)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:400)
	at
```

## How was this patch tested?

Manual. The following test case generates the same leakage.

```scala
  test("SPARK-23457 Register task completion listeners first in ParquetFileFormat") {
    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
      withTempDir { dir =>
        val basePath = dir.getCanonicalPath
        Seq(0).toDF("a").write.format("parquet").save(new Path(basePath, "first").toString)
        Seq(1).toDF("a").write.format("parquet").save(new Path(basePath, "second").toString)
        val df = spark.read.parquet(
          new Path(basePath, "first").toString,
          new Path(basePath, "second").toString)
        val e = intercept[SparkException] {
          df.collect()
        }
        assert(e.getCause.isInstanceOf[OutOfMemoryError])
      }
    }
  }
```

Author: Dongjoon Hyun <[email protected]>

Closes #20714 from dongjoon-hyun/SPARK-23457-2.3.
@dongjoon-hyun
Copy link
Member Author

Thank you, @cloud-fan !

peter-toth pushed a commit to peter-toth/spark that referenced this pull request Oct 6, 2018
…st in ParquetFileFormat

## What changes were proposed in this pull request?

ParquetFileFormat leaks opened files in some cases. This PR prevents that by registering task completion listers first before initialization.

- [spark-branch-2.3-test-sbt-hadoop-2.7](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.7/205/testReport/org.apache.spark.sql/FileBasedDataSourceSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)
- [spark-master-test-sbt-hadoop-2.6](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4228/testReport/junit/org.apache.spark.sql.execution.datasources.parquet/ParquetQuerySuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)

```
Caused by: sbt.ForkMain$ForkError: java.lang.Throwable: null
	at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
	at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:538)
	at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:149)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:133)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:400)
	at
```

## How was this patch tested?

Manual. The following test case generates the same leakage.

```scala
  test("SPARK-23457 Register task completion listeners first in ParquetFileFormat") {
    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
      withTempDir { dir =>
        val basePath = dir.getCanonicalPath
        Seq(0).toDF("a").write.format("parquet").save(new Path(basePath, "first").toString)
        Seq(1).toDF("a").write.format("parquet").save(new Path(basePath, "second").toString)
        val df = spark.read.parquet(
          new Path(basePath, "first").toString,
          new Path(basePath, "second").toString)
        val e = intercept[SparkException] {
          df.collect()
        }
        assert(e.getCause.isInstanceOf[OutOfMemoryError])
      }
    }
  }
```

Author: Dongjoon Hyun <[email protected]>

Closes apache#20714 from dongjoon-hyun/SPARK-23457-2.3.
@dongjoon-hyun dongjoon-hyun deleted the SPARK-23457-2.3 branch January 7, 2019 07:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants