From 3d25f1798ab5f1a12c801baff29b7dff8f87bd47 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 30 Oct 2015 11:25:54 -0700
Subject: [PATCH 1/4] Guard against double-close() of RecordReaders.

---
 .../org/apache/spark/rdd/HadoopRDD.scala       | 23 ++++++++++++-------
 .../org/apache/spark/rdd/NewHadoopRDD.scala    | 21 ++++++++++-------
 .../org/apache/spark/util/NextIterator.scala   |  4 +++-
 .../spark/sql/sources/SqlNewHadoopRDD.scala    | 21 ++++++++++-------
 4 files changed, 44 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 0198234dd6c9..5a2d77d15f8e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -257,8 +257,21 @@ class HadoopRDD[K, V](
       }
 
       override def close() {
-        try {
-          reader.close()
+        if (reader != null) {
+          // Close the reader and release it. Note: it's very important that we don't close the
+          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+          // corruption issues when reading compressed input.
+          try {
+            reader.close()
+          } catch {
+            case e: Exception =>
+              if (!ShutdownHookManager.inShutdown()) {
+                logWarning("Exception in RecordReader.close()", e)
+              }
+          } finally {
+            reader = null
+          }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
           } else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
@@ -272,12 +285,6 @@ class HadoopRDD[K, V](
                 logWarning("Unable to get input size to set InputMetrics for task", e)
             }
           }
-        } catch {
-          case e: Exception => {
-            if (!ShutdownHookManager.inShutdown()) {
-              logWarning("Exception in RecordReader.close()", e)
-            }
-          }
         }
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 7c4e4fc4735b..561ac498c04d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -158,8 +158,19 @@ class NewHadoopRDD[K, V](
       }
 
       private def close() {
-        try {
-          reader.close()
+        if (reader != null) {
+          // Close the reader and release it. Note: it's very important that we don't close the
+          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+          // corruption issues when reading compressed input.
+          try {
+          } catch {
+            case e: Exception =>
+              if (!ShutdownHookManager.inShutdown()) {
+                logWarning("Exception in RecordReader.close()", e)
+              }
+          } finally {
+          }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
           } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
@@ -173,12 +184,6 @@ class NewHadoopRDD[K, V](
                 logWarning("Unable to get input size to set InputMetrics for task", e)
             }
           }
-        } catch {
-          case e: Exception => {
-            if (!ShutdownHookManager.inShutdown()) {
-              logWarning("Exception in RecordReader.close()", e)
-            }
-          }
         }
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/util/NextIterator.scala b/core/src/main/scala/org/apache/spark/util/NextIterator.scala
index e5c732a5a559..0b505a576768 100644
--- a/core/src/main/scala/org/apache/spark/util/NextIterator.scala
+++ b/core/src/main/scala/org/apache/spark/util/NextIterator.scala
@@ -60,8 +60,10 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] {
    */
   def closeIfNeeded() {
     if (!closed) {
-      close()
+      // Note: it's important that we set closed = true before calling close(), since setting it
+      // afterwards would permit us to call close() multiple times if close() threw an exception.
       closed = true
+      close()
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
index d5aeb460c8a6..30a22f68b3e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
@@ -178,8 +178,19 @@ private[sql] class SqlNewHadoopRDD[K, V](
       }
 
      private def close() {
-        try {
-          reader.close()
+        if (reader != null) {
+          // Close the reader and release it. Note: it's very important that we don't close the
+          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+          // corruption issues when reading compressed input.
+          try {
+          } catch {
+            case e: Exception =>
+              if (!ShutdownHookManager.inShutdown()) {
+                logWarning("Exception in RecordReader.close()", e)
+              }
+          } finally {
+          }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
           } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
@@ -193,12 +204,6 @@ private[sql] class SqlNewHadoopRDD[K, V](
                 logWarning("Unable to get input size to set InputMetrics for task", e)
             }
           }
-        } catch {
-          case e: Exception => {
-            if (!ShutdownHookManager.inShutdown()) {
-              logWarning("Exception in RecordReader.close()", e)
-            }
-          }
         }
       }
     }

From 9cb44d6d5b00804da1cd567d3870465b9848d96a Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sat, 31 Oct 2015 10:49:13 -0700
Subject: [PATCH 2/4] Update NewHadoopRDD.scala

---
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 561ac498c04d..0a738ac8799b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -164,12 +164,14 @@ class NewHadoopRDD[K, V](
           // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
           // corruption issues when reading compressed input.
           try {
+            reader.close()
           } catch {
             case e: Exception =>
               if (!ShutdownHookManager.inShutdown()) {
                 logWarning("Exception in RecordReader.close()", e)
               }
           } finally {
+            reader = null
           }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()

From 758cc66ebd9b40fd1eae6778c38eb8d664b8d013 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sat, 31 Oct 2015 13:59:23 -0700
Subject: [PATCH 3/4] Update NewHadoopRDD.scala

---
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 0a738ac8799b..f93407d23856 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -128,7 +128,7 @@ class NewHadoopRDD[K, V](
           configurable.setConf(conf)
         case _ =>
       }
-      val reader = format.createRecordReader(
+      var reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 

From 11d5e70e2547f9d7ad431b60ed5c3cfbb272c8c8 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sun, 1 Nov 2015 11:48:57 -0800
Subject: [PATCH 4/4] Update SqlNewHadoopRDD.scala

---
 .../scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
index 30a22f68b3e4..d62db4ffc66d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
@@ -148,7 +148,7 @@ private[sql] class SqlNewHadoopRDD[K, V](
           configurable.setConf(conf)
         case _ =>
       }
-      val reader = format.createRecordReader(
+      var reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
@@ -184,12 +184,14 @@ private[sql] class SqlNewHadoopRDD[K, V](
           // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
           // corruption issues when reading compressed input.
           try {
+            reader.close()
           } catch {
             case e: Exception =>
               if (!ShutdownHookManager.inShutdown()) {
                 logWarning("Exception in RecordReader.close()", e)
               }
           } finally {
+            reader = null;
           }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
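
The following standalone Scala sketch illustrates the two defences the patch series converges on. The names GuardedCloseSketch, Record, Reader, and ClosableIterator are hypothetical stand-ins for illustration only; this is not Spark's NextIterator, HadoopRDD, or Hadoop's RecordReader. It shows (1) closeIfNeeded() flipping `closed` before delegating to close(), so a throwing close() cannot be re-entered, and (2) close() itself clearing the `var reader` in a finally block, so even a direct second call finds nothing left to close.

object GuardedCloseSketch {

  final case class Record(value: String)

  // Toy reader that fails loudly on a second close(), standing in for the
  // double-close behaviour that MAPREDUCE-5918 makes dangerous on real input.
  final class Reader(data: Seq[String]) extends java.io.Closeable {
    private val it = data.iterator
    private var closeCount = 0
    def nextRecord(): Option[Record] =
      if (it.hasNext) Some(Record(it.next())) else None
    override def close(): Unit = {
      closeCount += 1
      require(closeCount == 1, "RecordReader closed more than once")
    }
  }

  // Iterator with an idempotent cleanup hook, in the spirit of the NextIterator change.
  abstract class ClosableIterator[U] extends Iterator[U] {
    private var gotNext = false
    private var nextValue: U = _
    private var closed = false
    protected var finished = false

    protected def getNext(): U    // sets finished = true when exhausted
    protected def close(): Unit   // reached at most once via closeIfNeeded()

    def closeIfNeeded(): Unit = {
      if (!closed) {
        // Flip the flag before closing: if close() throws, a later call still
        // sees closed == true and never closes the underlying reader again.
        closed = true
        close()
      }
    }

    override def hasNext: Boolean = {
      if (!finished && !gotNext) {
        nextValue = getNext()
        if (finished) closeIfNeeded()
        gotNext = true
      }
      !finished
    }

    override def next(): U = {
      if (!hasNext) throw new NoSuchElementException("End of stream")
      gotNext = false
      nextValue
    }
  }

  def main(args: Array[String]): Unit = {
    var reader: Reader = new Reader(Seq("a", "b", "c"))  // var, so close() can drop it

    val iter = new ClosableIterator[Record] {
      override protected def getNext(): Record =
        reader.nextRecord().getOrElse { finished = true; null }

      override protected def close(): Unit = {
        // Second layer of defence, mirroring the RDD changes: only touch the
        // reader if it is still set, and release the reference afterwards.
        if (reader != null) {
          try {
            reader.close()
          } catch {
            case e: Exception => println(s"Exception in RecordReader.close(): $e")
          } finally {
            reader = null
          }
        }
      }
    }

    iter.foreach(println)   // prints Record(a), Record(b), Record(c); closes the reader once
    iter.closeIfNeeded()    // safe no-op: closed is already true and reader is already null
  }
}

In this sketch the null check in close() and the closed flag in closeIfNeeded() are intentionally redundant: either one alone still leaves a window for a second close() if the other path throws or is bypassed, which is why the patches add both.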