
Commit 3d25f17

Guard against double-close() of RecordReaders.
1 parent cb4e29f commit 3d25f17

4 files changed: +44 -25 lines changed

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 15 additions & 8 deletions
@@ -257,8 +257,21 @@ class HadoopRDD[K, V](
       }
 
       override def close() {
-        try {
-          reader.close()
+        if (reader != null) {
+          // Close the reader and release it. Note: it's very important that we don't close the
+          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+          // corruption issues when reading compressed input.
+          try {
+            reader.close()
+          } catch {
+            case e: Exception =>
+              if (!ShutdownHookManager.inShutdown()) {
+                logWarning("Exception in RecordReader.close()", e)
+              }
+          } finally {
+            reader = null
+          }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
           } else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
@@ -272,12 +285,6 @@ class HadoopRDD[K, V](
                 logWarning("Unable to get input size to set InputMetrics for task", e)
             }
           }
-        } catch {
-          case e: Exception => {
-            if (!ShutdownHookManager.inShutdown()) {
-              logWarning("Exception in RecordReader.close()", e)
-            }
-          }
         }
       }
     }
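
What the new code does: close() is now guarded by a null check, and the reader reference is cleared in a finally block, so the underlying Hadoop RecordReader.close() can run at most once no matter how many times Spark's close() is invoked. Below is a minimal, self-contained sketch of that close-once pattern; CountingReader and ReaderHolder are hypothetical stand-ins for illustration and are not Spark code.

// Sketch only: demonstrates the guard-and-null-out pattern from the diff above.
object CloseOnceExample {

  // Stand-in for a Hadoop RecordReader; counts how often close() is invoked.
  class CountingReader {
    var closeCalls = 0
    def close(): Unit = { closeCalls += 1 }
  }

  class ReaderHolder(private var reader: CountingReader) {
    // Safe to call repeatedly: after the first call the reference is null,
    // so the underlying close() can never run twice (the MAPREDUCE-5918 hazard).
    def close(): Unit = {
      if (reader != null) {
        try {
          reader.close()
        } catch {
          case e: Exception =>
            // Spark logs a warning here; println keeps the sketch dependency-free.
            println(s"Exception in RecordReader.close(): $e")
        } finally {
          reader = null
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val underlying = new CountingReader
    val holder = new ReaderHolder(underlying)
    holder.close()
    holder.close() // second call is a no-op
    assert(underlying.closeCalls == 1)
  }
}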

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 13 additions & 8 deletions
@@ -158,8 +158,19 @@ class NewHadoopRDD[K, V](
       }
 
       private def close() {
-        try {
-          reader.close()
+        if (reader != null) {
+          // Close the reader and release it. Note: it's very important that we don't close the
+          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+          // corruption issues when reading compressed input.
+          try {
+            reader.close()
+          } catch {
+            case e: Exception =>
+              if (!ShutdownHookManager.inShutdown()) {
+                logWarning("Exception in RecordReader.close()", e)
+              }
+          } finally {
+            reader = null
+          }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
           } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
@@ -173,12 +184,6 @@ class NewHadoopRDD[K, V](
                 logWarning("Unable to get input size to set InputMetrics for task", e)
             }
           }
-        } catch {
-          case e: Exception => {
-            if (!ShutdownHookManager.inShutdown()) {
-              logWarning("Exception in RecordReader.close()", e)
-            }
-          }
         }
       }
     }

core/src/main/scala/org/apache/spark/util/NextIterator.scala

Lines changed: 3 additions & 1 deletion
@@ -60,8 +60,10 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] {
    */
   def closeIfNeeded() {
     if (!closed) {
-      close()
+      // Note: it's important that we set closed = true before calling close(), since setting it
+      // afterwards would permit us to call close() multiple times if close() threw an exception.
       closed = true
+      close()
     }
   }
 
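
The NextIterator change is purely about ordering: the closed flag is flipped before close() runs, so a close() that throws is still never attempted a second time through closeIfNeeded(). A small standalone sketch of that ordering, using a hypothetical ThrowingResource in place of a real reader:

// Sketch only: shows why the flag must be set before close() is called.
object CloseIfNeededExample {

  // Stand-in for a resource whose close() always fails.
  class ThrowingResource {
    var closeAttempts = 0
    def close(): Unit = {
      closeAttempts += 1
      throw new RuntimeException("close failed")
    }
  }

  class Guard(resource: ThrowingResource) {
    private var closed = false
    def closeIfNeeded(): Unit = {
      if (!closed) {
        closed = true     // set first, as in the patched NextIterator
        resource.close()  // may throw; the flag is already set
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val res = new ThrowingResource
    val guard = new Guard(res)
    try guard.closeIfNeeded() catch { case _: RuntimeException => () }
    try guard.closeIfNeeded() catch { case _: RuntimeException => () }
    assert(res.closeAttempts == 1) // with the old ordering this would be 2
  }
}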

sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala

Lines changed: 13 additions & 8 deletions
@@ -178,8 +178,19 @@ private[sql] class SqlNewHadoopRDD[K, V](
       }
 
       private def close() {
-        try {
-          reader.close()
+        if (reader != null) {
+          // Close the reader and release it. Note: it's very important that we don't close the
+          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
+          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
+          // corruption issues when reading compressed input.
+          try {
+            reader.close()
+          } catch {
+            case e: Exception =>
+              if (!ShutdownHookManager.inShutdown()) {
+                logWarning("Exception in RecordReader.close()", e)
+              }
+          } finally {
+            reader = null
+          }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
           } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
@@ -193,12 +204,6 @@ private[sql] class SqlNewHadoopRDD[K, V](
                 logWarning("Unable to get input size to set InputMetrics for task", e)
             }
           }
-        } catch {
-          case e: Exception => {
-            if (!ShutdownHookManager.inShutdown()) {
-              logWarning("Exception in RecordReader.close()", e)
-            }
-          }
         }
       }
     }
