Commit cf008ca

[SPARK-11424] Guard against double-close() of RecordReaders (branch-1.4 backport)
This is a branch-1.4 backport of #9382, a fix for SPARK-11424.

Author: Josh Rosen <[email protected]>

Closes #9388 from JoshRosen/hadoop-decompressor-pooling-fix-branch-1.4.
1 parent 0ce1485 commit cf008ca

3 files changed (+34, -18 lines)


core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 15 additions & 8 deletions
@@ -254,8 +254,21 @@ class HadoopRDD[K, V](
       }

       override def close() {
-        try {
-          reader.close()
+        if (reader != null) {
+          // Close the reader and release it. Note: it's very important that we don't close the
+          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+          // corruption issues when reading compressed input.
+          try {
+            reader.close()
+          } catch {
+            case e: Exception =>
+              if (!Utils.inShutdown()) {
+                logWarning("Exception in RecordReader.close()", e)
+              }
+          } finally {
+            reader = null
+          }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
           } else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
@@ -269,12 +282,6 @@
                logWarning("Unable to get input size to set InputMetrics for task", e)
            }
          }
-      } catch {
-        case e: Exception => {
-          if (!Utils.inShutdown()) {
-            logWarning("Exception in RecordReader.close()", e)
-          }
-        }
       }
     }
   }
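
The shape of the new close() can be read on its own: skip everything if the reader is already gone, close it inside a try/catch so that a failing close is only logged, and null out the reference in a finally block so no later call can reach the reader again. The following is a minimal standalone sketch of that pattern, not Spark's actual code; CountingReader, GuardedCloser, and the println stand-in for logWarning are hypothetical names used only to show that a second close() call becomes a no-op.

import java.io.Closeable

// Stub standing in for a Hadoop RecordReader; it only counts close() calls
// so the double-close guard can be observed.
class CountingReader extends Closeable {
  var closes = 0
  override def close(): Unit = { closes += 1 }
}

// Mirrors the shape of the patched close(): close at most once, report (rather
// than propagate) a failed close, and drop the reference in finally.
class GuardedCloser(private var reader: CountingReader) {
  def close(): Unit = {
    if (reader != null) {
      try {
        reader.close()
      } catch {
        case e: Exception => println(s"Exception in close(): $e") // stand-in for logWarning
      } finally {
        reader = null
      }
    }
  }
}

object GuardedCloserDemo {
  def main(args: Array[String]): Unit = {
    val r = new CountingReader
    val closer = new GuardedCloser(r)
    closer.close()
    closer.close()      // no-op: the reference was cleared by the first call
    println(r.closes)   // prints 1
  }
}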

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 16 additions & 9 deletions
@@ -128,7 +128,7 @@ class NewHadoopRDD[K, V](
           configurable.setConf(conf)
         case _ =>
       }
-      val reader = format.createRecordReader(
+      var reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)

@@ -158,8 +158,21 @@
       }

       private def close() {
-        try {
-          reader.close()
+        if (reader != null) {
+          // Close the reader and release it. Note: it's very important that we don't close the
+          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+          // corruption issues when reading compressed input.
+          try {
+            reader.close()
+          } catch {
+            case e: Exception =>
+              if (!Utils.inShutdown()) {
+                logWarning("Exception in RecordReader.close()", e)
+              }
+          } finally {
+            reader = null
+          }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
           } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
@@ -173,12 +186,6 @@
                logWarning("Unable to get input size to set InputMetrics for task", e)
            }
          }
-      } catch {
-        case e: Exception => {
-          if (!Utils.inShutdown()) {
-            logWarning("Exception in RecordReader.close()", e)
-          }
-        }
       }
     }
   }
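
Beyond duplicating the guarded close, the first hunk here changes val reader to var reader. The reference has to be mutable because the finally block reassigns it to null once the reader has been closed. A tiny sketch of just that point, using a hypothetical Closeable field rather than Spark's RecordReader type:

import java.io.Closeable

object VarReaderSketch {
  // Must be declared as a var: the finally block below reassigns it to null.
  // With a val, `reader = null` would not compile.
  private var reader: Closeable = new Closeable { override def close(): Unit = () }

  def closeOnce(): Unit = {
    if (reader != null) {
      try reader.close()
      finally reader = null
    }
  }

  def main(args: Array[String]): Unit = {
    closeOnce()
    closeOnce() // reader is already null, so nothing happens
  }
}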

core/src/main/scala/org/apache/spark/util/NextIterator.scala

Lines changed: 3 additions & 1 deletion
@@ -60,8 +60,10 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] {
    */
   def closeIfNeeded() {
     if (!closed) {
-      close()
+      // Note: it's important that we set closed = true before calling close(), since setting it
+      // afterwards would permit us to call close() multiple times if close() threw an exception.
       closed = true
+      close()
     }
   }

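The NextIterator change is about ordering rather than structure: marking closed = true before invoking close() means that an exception thrown from close() still leaves the iterator in the closed state, so a later closeIfNeeded() cannot trigger a second close. A small sketch of that behaviour with hypothetical classes (FlakyResource and CloseOnce are not Spark types):

// A resource whose close() always fails, used to show that the flag ordering
// still prevents a second close attempt.
class FlakyResource {
  var closeAttempts = 0
  def close(): Unit = {
    closeAttempts += 1
    throw new RuntimeException("close failed")
  }
}

class CloseOnce(resource: FlakyResource) {
  private var closed = false

  def closeIfNeeded(): Unit = {
    if (!closed) {
      closed = true     // set the flag first, as the patched NextIterator does
      resource.close()  // may throw, but the flag is already set
    }
  }
}

object CloseOnceDemo {
  def main(args: Array[String]): Unit = {
    val r = new FlakyResource
    val c = new CloseOnce(r)
    try c.closeIfNeeded() catch { case _: RuntimeException => () } // first call throws
    c.closeIfNeeded()                                              // no-op: already marked closed
    println(r.closeAttempts)                                       // prints 1
  }
}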