
Commit 64135cb

viirya authored and kayousterhout committed
[SPARK-9067] [SQL] Close reader in NewHadoopRDD early if there is no more data
JIRA: https://issues.apache.org/jira/browse/SPARK-9067

As the JIRA ticket describes, calling `reader.close()` only after the task finishes causes memory and open-file-limit problems, because those resources stay occupied even though they are no longer needed. This PR simply closes the reader early, as soon as we know there is no more data to read.

Author: Liang-Chi Hsieh <[email protected]>

Closes #7424 from viirya/close_reader and squashes the following commits:

3ff64e5 [Liang-Chi Hsieh] For comments.
3d20267 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into close_reader
e152182 [Liang-Chi Hsieh] For comments.
5116cbe [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into close_reader
3ceb755 [Liang-Chi Hsieh] For comments.
e34d98e [Liang-Chi Hsieh] For comments.
50ed729 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into close_reader
216912f [Liang-Chi Hsieh] Fix it.
f429016 [Liang-Chi Hsieh] Release reader if we don't need it.
a305621 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into close_reader
67569da [Liang-Chi Hsieh] Close reader early if there is no more data.
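The pattern is small but worth seeing in isolation. Below is a minimal, self-contained sketch of the early-close idea, written without any Spark or Hadoop dependencies: `RecordReaderLike` and `EagerCloseIterator` are hypothetical names invented for this example. The actual commit wires the same logic into the existing iterators, as the diffs below show.

import java.io.Closeable

// Hypothetical stand-in for Hadoop's RecordReader; only the calls used
// by the iterator are modeled here.
trait RecordReaderLike[T] extends Closeable {
  def nextKeyValue(): Boolean
  def getCurrentValue: T
}

// Iterator that closes its reader as soon as the last record has been read,
// instead of waiting for a task-completion callback to do it.
class EagerCloseIterator[T](private var reader: RecordReaderLike[T]) extends Iterator[T] {
  private var finished = false
  private var havePair = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      finished = !reader.nextKeyValue()
      if (finished) {
        // Release file handles early; close() may be called again later
        // (e.g. by a cleanup hook) and must therefore be idempotent.
        close()
      }
      havePair = !finished
    }
    !finished
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    havePair = false
    reader.getCurrentValue
  }

  def close(): Unit = {
    if (reader != null) {
      reader.close()
      reader = null // guard against double-close
    }
  }
}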
1 parent 9a11396 commit 64135cb

File tree

2 files changed: +47 lines, -26 lines


core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 24 additions & 13 deletions
@@ -128,7 +128,7 @@ class NewHadoopRDD[K, V](
           configurable.setConf(conf)
         case _ =>
       }
-      val reader = format.createRecordReader(
+      private var reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
@@ -141,6 +141,12 @@
       override def hasNext: Boolean = {
         if (!finished && !havePair) {
           finished = !reader.nextKeyValue
+          if (finished) {
+            // Close and release the reader here; close() will also be called when the task
+            // completes, but for tasks that read from many files, it helps to release the
+            // resources early.
+            close()
+          }
           havePair = !finished
         }
         !finished
@@ -159,18 +165,23 @@
 
       private def close() {
         try {
-          reader.close()
-          if (bytesReadCallback.isDefined) {
-            inputMetrics.updateBytesRead()
-          } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
-              split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
-            // If we can't get the bytes read from the FS stats, fall back to the split size,
-            // which may be inaccurate.
-            try {
-              inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
-            } catch {
-              case e: java.io.IOException =>
-                logWarning("Unable to get input size to set InputMetrics for task", e)
+          if (reader != null) {
+            // Close reader and release it
+            reader.close()
+            reader = null
+
+            if (bytesReadCallback.isDefined) {
+              inputMetrics.updateBytesRead()
+            } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+                split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+              // If we can't get the bytes read from the FS stats, fall back to the split size,
+              // which may be inaccurate.
+              try {
+                inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+              } catch {
+                case e: java.io.IOException =>
+                  logWarning("Unable to get input size to set InputMetrics for task", e)
+              }
             }
           }
         } catch {
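Note the design choice here: setting `reader = null` inside `close()` and guarding with `reader != null` makes the method idempotent, so when the task-completion callback invokes `close()` a second time it becomes a no-op. That is what makes the early close from `hasNext` safe.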

sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala

Lines changed: 23 additions & 13 deletions
@@ -147,7 +147,7 @@ private[sql] class SqlNewHadoopRDD[K, V](
           configurable.setConf(conf)
         case _ =>
       }
-      val reader = format.createRecordReader(
+      private var reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
@@ -160,6 +160,12 @@
       override def hasNext: Boolean = {
         if (!finished && !havePair) {
           finished = !reader.nextKeyValue
+          if (finished) {
+            // Close and release the reader here; close() will also be called when the task
+            // completes, but for tasks that read from many files, it helps to release the
+            // resources early.
+            close()
+          }
           havePair = !finished
         }
         !finished
@@ -178,18 +184,22 @@
 
       private def close() {
         try {
-          reader.close()
-          if (bytesReadCallback.isDefined) {
-            inputMetrics.updateBytesRead()
-          } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
-              split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
-            // If we can't get the bytes read from the FS stats, fall back to the split size,
-            // which may be inaccurate.
-            try {
-              inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
-            } catch {
-              case e: java.io.IOException =>
-                logWarning("Unable to get input size to set InputMetrics for task", e)
+          if (reader != null) {
+            reader.close()
+            reader = null
+
+            if (bytesReadCallback.isDefined) {
+              inputMetrics.updateBytesRead()
+            } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+                split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+              // If we can't get the bytes read from the FS stats, fall back to the split size,
+              // which may be inaccurate.
+              try {
+                inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+              } catch {
+                case e: java.io.IOException =>
+                  logWarning("Unable to get input size to set InputMetrics for task", e)
+              }
             }
           }
         } catch {
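The `SqlNewHadoopRDD` change mirrors the `NewHadoopRDD` one (minus the short comment above `reader.close()`); the SQL class is a near-copy of the core iterator, so the fix has to be applied in both places.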
