Skip to content

Commit 67569da

Browse files
committed
Close reader early if there is no more data.
1 parent 6f69025 commit 67569da

File tree

2 files changed

+12
-2
lines changed

2 files changed

+12
-2
lines changed

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,9 @@ class NewHadoopRDD[K, V](
141141
override def hasNext: Boolean = {
142142
if (!finished && !havePair) {
143143
finished = !reader.nextKeyValue
144+
if (finished) {
145+
reader.close()
146+
}
144147
havePair = !finished
145148
}
146149
!finished
@@ -159,7 +162,9 @@ class NewHadoopRDD[K, V](
159162

160163
private def close() {
161164
try {
162-
reader.close()
165+
if (!finished) {
166+
reader.close()
167+
}
163168
if (bytesReadCallback.isDefined) {
164169
inputMetrics.updateBytesRead()
165170
} else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||

sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ private[sql] class SqlNewHadoopRDD[K, V](
161161
override def hasNext: Boolean = {
162162
if (!finished && !havePair) {
163163
finished = !reader.nextKeyValue
164+
if (finished) {
165+
reader.close()
166+
}
164167
havePair = !finished
165168
}
166169
!finished
@@ -179,7 +182,9 @@ private[sql] class SqlNewHadoopRDD[K, V](
179182

180183
private def close() {
181184
try {
182-
reader.close()
185+
if (!finished) {
186+
reader.close()
187+
}
183188
if (bytesReadCallback.isDefined) {
184189
inputMetrics.updateBytesRead()
185190
} else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||

0 commit comments

Comments
 (0)