From 198f1861cfe4d2cd544cb3a09d3a271de1b656ab Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 12 Feb 2018 13:46:49 -0800 Subject: [PATCH 1/5] [SPARK-23399][SQL] Register a task completion listener first for OrcColumnarBatchReader --- .../spark/sql/execution/datasources/orc/OrcFileFormat.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index dbf3bc6f0ee6c..74aefb851253f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -188,6 +188,9 @@ class OrcFileFormat if (enableVectorizedReader) { val batchReader = new OrcColumnarBatchReader( enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity) + val iter = new RecordReaderIterator(batchReader) + Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close())) + batchReader.initialize(fileSplit, taskAttemptContext) batchReader.initBatch( reader.getSchema, @@ -196,8 +199,6 @@ class OrcFileFormat partitionSchema, file.partitionValues) - val iter = new RecordReaderIterator(batchReader) - Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close())) iter.asInstanceOf[Iterator[InternalRow]] } else { val orcRecordReader = new OrcInputFormat[OrcStruct] From 3b8cb0a1def32924afd3e4b9e4fc702e1d53d36a Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 13 Feb 2018 01:23:50 -0800 Subject: [PATCH 2/5] Add a manual test case. 
--- .../datasources/orc/OrcSourceSuite.scala | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index 6f5f2fd795f74..a8499207a8677 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.datasources.orc import java.io.File import java.util.Locale +import org.apache.hadoop.fs.Path import org.apache.orc.OrcConf.COMPRESS import org.scalatest.BeforeAndAfterAll +import org.apache.spark.SparkException import org.apache.spark.sql.Row import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -160,6 +162,25 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { } } } + + // This should be tested manually because it raises OOM intentionally + // in order to cause `Leaked filesystem connection`. The test suite dies, too. 
+ ignore("SPARK-23399 Register a task completion listner first for OrcColumnarBatchReader") { + withSQLConf(SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") { + withTempDir { dir => + val basePath = dir.getCanonicalPath + Seq(0).toDF("a").write.format("orc").save(new Path(basePath, "first").toString) + Seq(1).toDF("a").write.format("orc").save(new Path(basePath, "second").toString) + val df = spark.read.orc( + new Path(basePath, "first").toString, + new Path(basePath, "second").toString) + val e = intercept[SparkException] { + df.collect() + } + assert(e.getCause.isInstanceOf[OutOfMemoryError]) + } + } + } } class OrcSourceSuite extends OrcSuite with SharedSQLContext { From d4cc32e7614603a162f6719c5c72e3d72e57f404 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 13 Feb 2018 01:29:43 -0800 Subject: [PATCH 3/5] fix typo --- .../spark/sql/execution/datasources/orc/OrcSourceSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index a8499207a8677..ad1f800bb869b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -165,7 +165,7 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { // This should be tested manually because it raises OOM intentionally // in order to cause `Leaked filesystem connection`. The test suite dies, too. 
- ignore("SPARK-23399 Register a task completion listner first for OrcColumnarBatchReader") { + ignore("SPARK-23399 Register a task completion listener first for OrcColumnarBatchReader") { withSQLConf(SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") { withTempDir { dir => val basePath = dir.getCanonicalPath From 5a9fa0b310f490ff5d99f212d92635bf2660eb48 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 13 Feb 2018 07:31:40 -0800 Subject: [PATCH 4/5] Remove test case. --- .../datasources/orc/OrcSourceSuite.scala | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala index ad1f800bb869b..6f5f2fd795f74 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala @@ -20,11 +20,9 @@ package org.apache.spark.sql.execution.datasources.orc import java.io.File import java.util.Locale -import org.apache.hadoop.fs.Path import org.apache.orc.OrcConf.COMPRESS import org.scalatest.BeforeAndAfterAll -import org.apache.spark.SparkException import org.apache.spark.sql.Row import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -162,25 +160,6 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { } } } - - // This should be tested manually because it raises OOM intentionally - // in order to cause `Leaked filesystem connection`. The test suite dies, too. 
- ignore("SPARK-23399 Register a task completion listener first for OrcColumnarBatchReader") { - withSQLConf(SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") { - withTempDir { dir => - val basePath = dir.getCanonicalPath - Seq(0).toDF("a").write.format("orc").save(new Path(basePath, "first").toString) - Seq(1).toDF("a").write.format("orc").save(new Path(basePath, "second").toString) - val df = spark.read.orc( - new Path(basePath, "first").toString, - new Path(basePath, "second").toString) - val e = intercept[SparkException] { - df.collect() - } - assert(e.getCause.isInstanceOf[OutOfMemoryError]) - } - } - } } class OrcSourceSuite extends OrcSuite with SharedSQLContext { From 18c248544d238bd3d5ff50f621f4c12b3efc430e Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 13 Feb 2018 08:50:45 -0800 Subject: [PATCH 5/5] Add comments. --- .../spark/sql/execution/datasources/orc/OrcFileFormat.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 74aefb851253f..1de2ca2914c44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -188,6 +188,9 @@ class OrcFileFormat if (enableVectorizedReader) { val batchReader = new OrcColumnarBatchReader( enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity) + // SPARK-23399 Register a task completion listener first to call `close()` in all cases. + // There is a possibility that `initialize` and `initBatch` hit some errors (like OOM) + // after opening a file. val iter = new RecordReaderIterator(batchReader) Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))