37 changes: 24 additions & 13 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -128,7 +128,7 @@ class NewHadoopRDD[K, V](
       configurable.setConf(conf)
     case _ =>
   }
-  val reader = format.createRecordReader(
+  private var reader = format.createRecordReader(
     split.serializableHadoopSplit.value, hadoopAttemptContext)
   reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)

@@ -141,6 +141,12 @@ class NewHadoopRDD[K, V](
   override def hasNext: Boolean = {
     if (!finished && !havePair) {
       finished = !reader.nextKeyValue
+      if (finished) {
+        // Close and release the reader here; close() will also be called when the task
+        // completes, but for tasks that read from many files, it helps to release the
+        // resources early.
+        close()
+      }
       havePair = !finished
     }
     !finished

Contributor, commenting on the added `if (finished)` block: would it make sense if we just call close here?
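To the reviewer's point: the reader is closed here, the moment the iterator learns it is exhausted, rather than only when the task finishes. A minimal sketch of the same pattern outside Spark — the `RecordSource` and `EagerCloseIterator` names are hypothetical stand-ins, not Spark classes — shows why `close()` must also be safe to call a second time from the end-of-task cleanup:

```scala
import java.io.Closeable

// Hypothetical stand-in for a Hadoop RecordReader: yields records until exhausted.
trait RecordSource[T] extends Closeable {
  def read(): Option[T] // None once the underlying file is fully consumed
}

// Iterator that releases its source as soon as the last record has been read,
// instead of holding the file handle open until the surrounding task completes.
class EagerCloseIterator[T](private var source: RecordSource[T]) extends Iterator[T] {
  private var buffered: Option[T] = None
  private var finished = false

  override def hasNext: Boolean = {
    if (!finished && buffered.isEmpty) {
      buffered = source.read()
      if (buffered.isEmpty) {
        finished = true
        close() // release the handle early; tasks reading many files benefit most
      }
    }
    buffered.isDefined
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("end of input")
    val v = buffered.get
    buffered = None
    v
  }

  // Idempotent: a later call (e.g. from a task-completion callback) is a no-op.
  def close(): Unit = {
    if (source != null) {
      source.close()
      source = null
    }
  }
}
```

This mirrors the guard the PR adds to close() below: once the reader is nulled out, the second invocation at task completion does nothing.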
@@ -159,18 +165,23 @@ class NewHadoopRDD[K, V](

   private def close() {
     try {
-      reader.close()
-      if (bytesReadCallback.isDefined) {
-        inputMetrics.updateBytesRead()
-      } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
-          split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
-        // If we can't get the bytes read from the FS stats, fall back to the split size,
-        // which may be inaccurate.
-        try {
-          inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
-        } catch {
-          case e: java.io.IOException =>
-            logWarning("Unable to get input size to set InputMetrics for task", e)
+      if (reader != null) {
+        // Close reader and release it
+        reader.close()
+        reader = null
+
+        if (bytesReadCallback.isDefined) {
+          inputMetrics.updateBytesRead()
+        } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+            split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+          // If we can't get the bytes read from the FS stats, fall back to the split size,
+          // which may be inaccurate.
+          try {
+            inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+          } catch {
+            case e: java.io.IOException =>
+              logWarning("Unable to get input size to set InputMetrics for task", e)
+          }
+        }
+      }
     } catch {
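Because close() is now reachable from two places — hasNext when the input runs out, and the existing end-of-task cleanup — the `reader != null` check plus `reader = null` is what makes the double invocation harmless. A stripped-down, hypothetical illustration of that idempotent-close shape:

```scala
import java.io.Closeable

// Hypothetical holder demonstrating the null-out-on-first-close idiom.
class ReaderHolder(r: Closeable) {
  private var reader: Closeable = r

  def close(): Unit = {
    if (reader != null) {
      reader.close() // first call: release the resource (and record metrics)
      reader = null  // subsequent calls (e.g. at task completion) fall through
    }
  }
}

object IdempotentCloseDemo extends App {
  val holder = new ReaderHolder(new java.io.StringReader("data"))
  holder.close() // closes the underlying reader
  holder.close() // no-op
}
```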
SqlNewHadoopRDD.scala
@@ -147,7 +147,7 @@ private[sql] class SqlNewHadoopRDD[K, V](
       configurable.setConf(conf)
     case _ =>
   }
-  val reader = format.createRecordReader(
+  private var reader = format.createRecordReader(
     split.serializableHadoopSplit.value, hadoopAttemptContext)
   reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)

@@ -160,6 +160,12 @@ private[sql] class SqlNewHadoopRDD[K, V](
   override def hasNext: Boolean = {
     if (!finished && !havePair) {
       finished = !reader.nextKeyValue
+      if (finished) {
+        // Close and release the reader here; close() will also be called when the task
+        // completes, but for tasks that read from many files, it helps to release the
+        // resources early.
+        close()
+      }
       havePair = !finished
     }
     !finished
@@ -178,18 +184,22 @@ private[sql] class SqlNewHadoopRDD[K, V](

   private def close() {
     try {
-      reader.close()
-      if (bytesReadCallback.isDefined) {
-        inputMetrics.updateBytesRead()
-      } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
-          split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
-        // If we can't get the bytes read from the FS stats, fall back to the split size,
-        // which may be inaccurate.
-        try {
-          inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
-        } catch {
-          case e: java.io.IOException =>
-            logWarning("Unable to get input size to set InputMetrics for task", e)
+      if (reader != null) {
+        reader.close()
+        reader = null
+
+        if (bytesReadCallback.isDefined) {
+          inputMetrics.updateBytesRead()
+        } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+            split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+          // If we can't get the bytes read from the FS stats, fall back to the split size,
+          // which may be inaccurate.
+          try {
+            inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+          } catch {
+            case e: java.io.IOException =>
+              logWarning("Unable to get input size to set InputMetrics for task", e)
+          }
+        }
+      }
     } catch {
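Both files keep the same bytes-read accounting inside the new guarded block: prefer the exact counter from the filesystem statistics when the callback is available, otherwise fall back to the split length, which may be inaccurate. A hedged sketch of that fallback in isolation — the names here are illustrative, not Spark's API, and the real code additionally restricts the fallback to file-based splits (FileSplit/CombineFileSplit):

```scala
import java.io.IOException

object InputMetricsFallback {
  // Illustrative fallback: exact FS-statistics counter when available,
  // otherwise the (possibly inaccurate) split size.
  def recordBytesRead(
      bytesReadCallback: Option[() => Long],       // precise bytes read, if the FS exposes it
      splitLength: () => Long,                     // may throw IOException for some split types
      incBytesRead: Long => Unit,
      logWarning: (String, Throwable) => Unit): Unit = {
    bytesReadCallback match {
      case Some(callback) =>
        incBytesRead(callback())
      case None =>
        try {
          incBytesRead(splitLength()) // fall back to the split size
        } catch {
          case e: IOException =>
            logWarning("Unable to get input size to set InputMetrics for task", e)
        }
    }
  }
}
```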