
Commit 087aa63

Guard against double-close() of RecordReaders.
1 parent f21ef8d commit 087aa63

4 files changed: +65 additions, -52 deletions

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 15 additions & 8 deletions
@@ -251,8 +251,21 @@ class HadoopRDD[K, V](
       }
 
       override def close() {
-        try {
-          reader.close()
+        if (reader != null) {
+          // Close the reader and release it. Note: it's very important that we don't close the
+          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+          // corruption issues when reading compressed input.
+          try {
+            reader.close()
+          } catch {
+            case e: Exception =>
+              if (!ShutdownHookManager.inShutdown()) {
+                logWarning("Exception in RecordReader.close()", e)
+              }
+          } finally {
+            reader = null
+          }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
           } else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
@@ -266,12 +279,6 @@ class HadoopRDD[K, V](
                 logWarning("Unable to get input size to set InputMetrics for task", e)
             }
           }
-        } catch {
-          case e: Exception => {
-            if (!ShutdownHookManager.inShutdown()) {
-              logWarning("Exception in RecordReader.close()", e)
-            }
-          }
         }
       }
     }
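
The HadoopRDD hunk above, and the NewHadoopRDD and SqlNewHadoopRDD hunks below, all apply the same close-once idiom: attempt reader.close() inside try/catch, and clear the reader reference in a finally block so that any later call finds null and does nothing. A minimal standalone sketch of that idiom follows (not code from this commit; NoisyReader, closeReader() and the println logging are hypothetical stand-ins for a Hadoop RecordReader, the iterator's close() method and logWarning):

import java.io.Closeable

object CloseOnceDemo {
  // Hypothetical stand-in for a Hadoop RecordReader: it fails during close() and
  // must never be closed twice.
  final class NoisyReader extends Closeable {
    private var closeCalls = 0
    override def close(): Unit = {
      closeCalls += 1
      require(closeCalls == 1, "reader closed twice")
      throw new RuntimeException("simulated failure inside close()")
    }
  }

  private var reader: Closeable = new NoisyReader

  // Mirrors the commit's pattern: log and swallow the close() failure, and clear the
  // reference in `finally` so any later call sees reader == null and does nothing.
  def closeReader(): Unit = {
    if (reader != null) {
      try {
        reader.close()
      } catch {
        case e: Exception => println(s"ignored exception in close(): ${e.getMessage}")
      } finally {
        reader = null
      }
    }
  }

  def main(args: Array[String]): Unit = {
    closeReader() // first call: close() runs once and throws; the failure is logged
    closeReader() // second call: no-op, so the reader is never double-closed
    println("done: close() was attempted exactly once")
  }
}

Per the comments added in the diffs, the reason for the guard is that closing a RecordReader more than once exposes MAPREDUCE-5918 on Hadoop 1.x and older Hadoop 2.x releases, which can non-deterministically corrupt compressed input, so close() must be attempted at most once per reader.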

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 23 additions & 21 deletions
@@ -184,30 +184,32 @@ class NewHadoopRDD[K, V](
       }
 
       private def close() {
-        try {
-          if (reader != null) {
-            // Close reader and release it
+        if (reader != null) {
+          // Close the reader and release it. Note: it's very important that we don't close the
+          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+          // corruption issues when reading compressed input.
+          try {
             reader.close()
-            reader = null
-
-            if (bytesReadCallback.isDefined) {
-              inputMetrics.updateBytesRead()
-            } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
-                split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
-              // If we can't get the bytes read from the FS stats, fall back to the split size,
-              // which may be inaccurate.
-              try {
-                inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
-              } catch {
-                case e: java.io.IOException =>
-                  logWarning("Unable to get input size to set InputMetrics for task", e)
+          } catch {
+            case e: Exception =>
+              if (!ShutdownHookManager.inShutdown()) {
+                logWarning("Exception in RecordReader.close()", e)
               }
-            }
+          } finally {
+            reader = null
           }
-        } catch {
-          case e: Exception => {
-            if (!ShutdownHookManager.inShutdown()) {
-              logWarning("Exception in RecordReader.close()", e)
+          if (bytesReadCallback.isDefined) {
+            inputMetrics.updateBytesRead()
+          } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+              split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+            // If we can't get the bytes read from the FS stats, fall back to the split size,
+            // which may be inaccurate.
+            try {
+              inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+            } catch {
+              case e: java.io.IOException =>
+                logWarning("Unable to get input size to set InputMetrics for task", e)
            }
          }
        }

core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala

Lines changed: 24 additions & 22 deletions
@@ -189,32 +189,34 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
       }
 
       private def close() {
-        try {
-          if (reader != null) {
+        if (reader != null) {
+          // Close the reader and release it. Note: it's very important that we don't close the
+          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+          // corruption issues when reading compressed input.
+          try {
            reader.close()
-            reader = null
-
-            SqlNewHadoopRDD.unsetInputFileName()
-
-            if (bytesReadCallback.isDefined) {
-              inputMetrics.updateBytesRead()
-            } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
-                split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
-              // If we can't get the bytes read from the FS stats, fall back to the split size,
-              // which may be inaccurate.
-              try {
-                inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
-              } catch {
-                case e: java.io.IOException =>
-                  logWarning("Unable to get input size to set InputMetrics for task", e)
+          } catch {
+            case e: Exception =>
+              if (!ShutdownHookManager.inShutdown()) {
+                logWarning("Exception in RecordReader.close()", e)
              }
-            }
+          } finally {
+            reader = null
          }
-        } catch {
-          case e: Exception =>
-            if (!ShutdownHookManager.inShutdown()) {
-              logWarning("Exception in RecordReader.close()", e)
+          if (bytesReadCallback.isDefined) {
+            inputMetrics.updateBytesRead()
+          } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+              split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+            // If we can't get the bytes read from the FS stats, fall back to the split size,
+            // which may be inaccurate.
+            try {
+              inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+            } catch {
+              case e: java.io.IOException =>
+                logWarning("Unable to get input size to set InputMetrics for task", e)
            }
+          }
        }
      }
    }

core/src/main/scala/org/apache/spark/util/NextIterator.scala

Lines changed: 3 additions & 1 deletion
@@ -60,8 +60,10 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] {
    */
   def closeIfNeeded() {
     if (!closed) {
-      close()
+      // Note: it's important that we set closed = true before calling close(), since setting it
+      // afterwards would permit us to call close() multiple times if close() threw an exception.
       closed = true
+      close()
     }
   }
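
The NextIterator change is purely about ordering: closed must be set before close() runs, because if close() throws while closed is still false, a later closeIfNeeded() would invoke close() again and reintroduce the double-close. A small standalone sketch of that reasoning (CloseGuard and CloseGuardDemo are hypothetical names, not the Spark class itself):

// Hypothetical miniature of NextIterator's guard, showing why `closed` is flipped
// before close() is called.
abstract class CloseGuard {
  private var closed = false

  def close(): Unit

  def closeIfNeeded(): Unit = {
    if (!closed) {
      // Flip the flag first: even if close() throws, a retry of closeIfNeeded()
      // sees closed == true and never calls close() a second time.
      closed = true
      close()
    }
  }
}

object CloseGuardDemo {
  def main(args: Array[String]): Unit = {
    var calls = 0
    val guard = new CloseGuard {
      override def close(): Unit = {
        calls += 1
        throw new RuntimeException("simulated failure inside close()")
      }
    }

    try guard.closeIfNeeded() catch { case _: RuntimeException => () } // close() runs once and throws
    guard.closeIfNeeded() // no-op: the flag was already set before the failure
    println(s"close() was called $calls time(s)") // prints: close() was called 1 time(s)
  }
}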
