From 67569dae4697ecd0ab5b7b71d928b6d51a69a4c7 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 16 Jul 2015 01:14:54 +0800
Subject: [PATCH 1/7] Close reader early if there is no more data.

---
 .../src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 7 ++++++-
 .../org/apache/spark/sql/sources/SqlNewHadoopRDD.scala     | 7 ++++++-
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index f827270ee6a4..65fb429453e4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -141,6 +141,9 @@ class NewHadoopRDD[K, V](
       override def hasNext: Boolean = {
         if (!finished && !havePair) {
           finished = !reader.nextKeyValue
+          if (finished) {
+            reader.close()
+          }
           havePair = !finished
         }
         !finished
@@ -159,7 +162,9 @@

       private def close() {
         try {
-          reader.close()
+          if (!finished) {
+            reader.close()
+          }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
           } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
index 2bdc34102125..3818505706fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
@@ -161,6 +161,9 @@
       override def hasNext: Boolean = {
         if (!finished && !havePair) {
           finished = !reader.nextKeyValue
+          if (finished) {
+            reader.close()
+          }
           havePair = !finished
         }
         !finished
@@ -179,7 +182,9 @@

       private def close() {
         try {
-          reader.close()
+          if (!finished) {
+            reader.close()
+          }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
           } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
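Note on the pattern in PATCH 1: hasNext closes the reader the moment nextKeyValue reports end of input, while close(), which is also invoked when the task completes, checks the finished flag so the reader is not closed a second time. Below is a minimal, self-contained Scala sketch of that shape (not Spark's actual code; SimpleReader is a hypothetical stand-in for Hadoop's RecordReader):

trait SimpleReader {
  def nextKeyValue(): Boolean
  def getCurrent: (String, String)
  def close(): Unit
}

class EarlyCloseIterator(reader: SimpleReader) extends Iterator[(String, String)] {
  private var finished = false
  private var havePair = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      finished = !reader.nextKeyValue()
      if (finished) {
        // Close as soon as the input is exhausted instead of waiting
        // for the task-completion call.
        reader.close()
      }
      havePair = !finished
    }
    !finished
  }

  override def next(): (String, String) = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    havePair = false
    reader.getCurrent
  }

  // Called again when the task completes; the finished guard keeps it
  // from closing the reader twice.
  def close(): Unit = {
    if (!finished) {
      reader.close()
    }
  }
}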
From f4290167f60a54ff312aab69809e42236212d6f4 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 16 Jul 2015 17:22:53 +0800
Subject: [PATCH 2/7] Release reader if we don't need it.

---
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala  | 3 +++
 .../scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala | 3 +++
 2 files changed, 6 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 65fb429453e4..3c5db48f2f1a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -142,7 +142,9 @@ class NewHadoopRDD[K, V](
         if (!finished && !havePair) {
           finished = !reader.nextKeyValue
           if (finished) {
+            // Close reader and release it
             reader.close()
+            reader = null
           }
           havePair = !finished
         }
@@ -164,6 +166,7 @@ class NewHadoopRDD[K, V](
         try {
           if (!finished) {
             reader.close()
+            reader = null
           }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
index 3818505706fb..fd3ab5c56689 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
@@ -162,7 +162,9 @@ private[sql] class SqlNewHadoopRDD[K, V](
         if (!finished && !havePair) {
           finished = !reader.nextKeyValue
           if (finished) {
+            // Close reader and release it
             reader.close()
+            reader = null
           }
           havePair = !finished
         }
@@ -184,6 +186,7 @@ private[sql] class SqlNewHadoopRDD[K, V](
         try {
           if (!finished) {
             reader.close()
+            reader = null
           }
           if (bytesReadCallback.isDefined) {
             inputMetrics.updateBytesRead()

From 216912fe0e606e9a992378c9082be1876b934d12 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 16 Jul 2015 17:51:49 +0800
Subject: [PATCH 3/7] Fix it.

---
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala  | 2 +-
 .../scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 3c5db48f2f1a..6dd43ccd2cb2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -128,7 +128,7 @@ class NewHadoopRDD[K, V](
           configurable.setConf(conf)
         case _ =>
       }
-      val reader = format.createRecordReader(
+      private var reader = format.createRecordReader(
        split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
index fd3ab5c56689..29a92f3ddabc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
@@ -148,7 +148,7 @@ private[sql] class SqlNewHadoopRDD[K, V](
           configurable.setConf(conf)
         case _ =>
       }
-      val reader = format.createRecordReader(
+      private var reader = format.createRecordReader(
        split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)

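Note on PATCH 2 and PATCH 3: releasing the reader means dropping the last reference with reader = null, which is why PATCH 3 turns the val into a private var. The enclosing iterator stays reachable until the task completes, so without the null assignment a closed reader, and any buffers it still holds, could not be garbage collected. A tiny illustrative sketch of that effect, with made-up names:

class BufferHolder {
  // A var rather than a val, so the reference can be dropped after use.
  private var buffer: Array[Byte] = new Array[Byte](64 * 1024 * 1024)

  def use(): Int = if (buffer != null) buffer.length else 0

  def release(): Unit = {
    // The holder itself may live on, but after this assignment nothing
    // reachable points at the 64 MB array, so the GC can reclaim it.
    buffer = null
  }
}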
From e34d98ef837738cc03a3a97932bd81e410d5b6cc Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 20 Jul 2015 16:00:21 +0800
Subject: [PATCH 4/7] For comments.

---
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 6dd43ccd2cb2..61180a43131c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -143,8 +143,7 @@ class NewHadoopRDD[K, V](
           finished = !reader.nextKeyValue
           if (finished) {
             // Close reader and release it
-            reader.close()
-            reader = null
+            close()
           }
           havePair = !finished
         }
@@ -164,7 +163,8 @@

       private def close() {
         try {
-          if (!finished) {
+          if (reader != null) {
+            // Close reader and release it
             reader.close()
             reader = null
           }

From 3ceb755a54bd278813b1e65c4bbe9ca0d012ee26 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 20 Jul 2015 16:03:12 +0800
Subject: [PATCH 5/7] For comments.

---
 .../scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
index 29a92f3ddabc..f023cb24bb8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
@@ -163,8 +163,7 @@ private[sql] class SqlNewHadoopRDD[K, V](
           finished = !reader.nextKeyValue
           if (finished) {
             // Close reader and release it
-            reader.close()
-            reader = null
+            close()
           }
           havePair = !finished
         }
@@ -184,7 +183,7 @@

       private def close() {
         try {
-          if (!finished) {
+          if (reader != null) {
            reader.close()
            reader = null
          }
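Note on PATCH 4 and PATCH 5: switching the guard in close() from the finished flag to reader != null makes close() idempotent, so hasNext can simply delegate to it and the later call at task completion becomes a no-op. The guard reduced to its core, as a sketch (java.io.Closeable stands in for the reader type; single-threaded access is assumed, as it is for a task's iterator):

import java.io.Closeable

class ResourceOwner(private var resource: Closeable) {
  // Idempotent: the first call closes and drops the resource; any later
  // call sees null and returns without touching the closed handle.
  def close(): Unit = {
    if (resource != null) {
      resource.close()
      resource = null
    }
  }
}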
From e152182ebf8a7af5be51f6feec627392d02e96f4 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 22 Jul 2015 10:41:55 +0800
Subject: [PATCH 6/7] For comments.

---
 .../org/apache/spark/rdd/NewHadoopRDD.scala   | 25 ++++++++++---------
 .../spark/sql/execution/SqlNewHadoopRDD.scala | 25 ++++++++++---------
 2 files changed, 26 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 61180a43131c..cfaf4b5bbb5f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -167,18 +167,19 @@ class NewHadoopRDD[K, V](
             // Close reader and release it
             reader.close()
             reader = null
-          }
-          if (bytesReadCallback.isDefined) {
-            inputMetrics.updateBytesRead()
-          } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
-              split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
-            // If we can't get the bytes read from the FS stats, fall back to the split size,
-            // which may be inaccurate.
-            try {
-              inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
-            } catch {
-              case e: java.io.IOException =>
-                logWarning("Unable to get input size to set InputMetrics for task", e)
+
+            if (bytesReadCallback.isDefined) {
+              inputMetrics.updateBytesRead()
+            } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+                split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+              // If we can't get the bytes read from the FS stats, fall back to the split size,
+              // which may be inaccurate.
+              try {
+                inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+              } catch {
+                case e: java.io.IOException =>
+                  logWarning("Unable to get input size to set InputMetrics for task", e)
+              }
             }
           }
         } catch {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala
index b5a6e914ab63..7487894be5d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala
@@ -185,18 +185,19 @@ private[sql] class SqlNewHadoopRDD[K, V](
           if (reader != null) {
             reader.close()
             reader = null
-          }
-          if (bytesReadCallback.isDefined) {
-            inputMetrics.updateBytesRead()
-          } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
-              split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
-            // If we can't get the bytes read from the FS stats, fall back to the split size,
-            // which may be inaccurate.
-            try {
-              inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
-            } catch {
-              case e: java.io.IOException =>
-                logWarning("Unable to get input size to set InputMetrics for task", e)
+
+            if (bytesReadCallback.isDefined) {
+              inputMetrics.updateBytesRead()
+            } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
+                split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+              // If we can't get the bytes read from the FS stats, fall back to the split size,
+              // which may be inaccurate.
+              try {
+                inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
+              } catch {
+                case e: java.io.IOException =>
+                  logWarning("Unable to get input size to set InputMetrics for task", e)
+              }
             }
           }
         } catch {
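Note on PATCH 6: nesting the bytes-read accounting inside the reader != null branch ties the metrics update to the one call that actually closes the reader, instead of running it on every invocation of close(). The same shape in a sketch, with recordBytesRead as a placeholder for the InputMetrics update:

import java.io.Closeable

class MeteredOwner(private var resource: Closeable) {
  def close(): Unit = {
    if (resource != null) {
      resource.close()
      resource = null
      // Reached only by the call that really closed the resource;
      // later no-op close() calls skip the bookkeeping too.
      recordBytesRead()
    }
  }

  private def recordBytesRead(): Unit =
    println("update input metrics exactly once")
}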
From 3ff64e576c066b9242f2c5bc22e5ab2a760d44d4 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 24 Jul 2015 10:50:47 +0800
Subject: [PATCH 7/7] For comments.

---
 core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 4 +++-
 .../org/apache/spark/sql/execution/SqlNewHadoopRDD.scala    | 4 +++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index cfaf4b5bbb5f..f83a051f5da1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -142,7 +142,9 @@ class NewHadoopRDD[K, V](
         if (!finished && !havePair) {
           finished = !reader.nextKeyValue
           if (finished) {
-            // Close reader and release it
+            // Close and release the reader here; close() will also be called when the task
+            // completes, but for tasks that read from many files, it helps to release the
+            // resources early.
             close()
           }
           havePair = !finished
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala
index 7487894be5d8..3d75b6a91def 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala
@@ -161,7 +161,9 @@ private[sql] class SqlNewHadoopRDD[K, V](
         if (!finished && !havePair) {
           finished = !reader.nextKeyValue
           if (finished) {
-            // Close reader and release it
+            // Close and release the reader here; close() will also be called when the task
+            // completes, but for tasks that read from many files, it helps to release the
+            // resources early.
            close()
          }
          havePair = !finished
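End state after PATCH 7 (which only rewords the comment): hasNext calls an idempotent close() as soon as the input is drained, and the task-completion path calls it again as a safety net. A runnable sketch of the combined behavior, reusing the hypothetical SimpleReader trait from the note after PATCH 1:

class FinalIterator(private var reader: SimpleReader) extends Iterator[(String, String)] {
  private var finished = false
  private var havePair = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      finished = !reader.nextKeyValue()
      if (finished) {
        // Close and release the reader as soon as the input is drained;
        // the completion-time call below is then a harmless no-op.
        close()
      }
      havePair = !finished
    }
    !finished
  }

  override def next(): (String, String) = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    havePair = false
    reader.getCurrent
  }

  def close(): Unit = {
    if (reader != null) {
      reader.close()
      reader = null
    }
  }
}

object Demo extends App {
  val reader = new SimpleReader {
    private var remaining = 3
    def nextKeyValue(): Boolean = { remaining -= 1; remaining >= 0 }
    def getCurrent: (String, String) = ("key", "value")
    def close(): Unit = println("reader closed") // printed exactly once
  }
  val it = new FinalIterator(reader)
  it.foreach(_ => ())  // first close happens here, inside hasNext
  it.close()           // simulated task-completion call: a no-op now
}

Running Demo prints "reader closed" exactly once, which is the invariant this series is after.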