Conversation

@viirya (Member) commented Jul 15, 2015

JIRA: https://issues.apache.org/jira/browse/SPARK-9067

According to the description in the JIRA ticket, calling reader.close() only after the task is finished causes memory and open-file-limit problems, since these resources stay occupied even after we no longer need them.

This PR simply closes the reader early when we know there is no more data to read.
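
A minimal, self-contained sketch of the early-close idea (the `EarlyClosingIterator` class and the plain `BufferedReader` below are stand-ins for the actual NewHadoopRDD iterator and the Hadoop RecordReader, not the real code):

```scala
import java.io.{BufferedReader, FileReader}

// Stand-in for the RDD's record iterator: the underlying reader is released
// the moment we learn there is no more data, not at task completion.
class EarlyClosingIterator(path: String) extends Iterator[String] {
  private val reader = new BufferedReader(new FileReader(path))
  private var nextLine: String = null
  private var finished = false

  override def hasNext: Boolean = {
    if (!finished && nextLine == null) {
      nextLine = reader.readLine()
      if (nextLine == null) {
        finished = true
        reader.close() // close early instead of waiting for the task-completion callback
      }
    }
    !finished
  }

  override def next(): String = {
    if (!hasNext) throw new NoSuchElementException("end of input")
    val line = nextLine
    nextLine = null
    line
  }
}
```

The point is simply that the close happens inside hasNext, at the moment exhaustion is detected, so the file handle is not held for the remainder of the task.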

@SparkQA commented Jul 15, 2015

Test build #37382 has finished for PR 7424 at commit 67569da.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 16, 2015

Test build #37487 has finished for PR 7424 at commit f429016.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen (Member) commented Jul 16, 2015

I was going to say that a load of the iterators implemented in Spark could be simpler if they used Guava's AbstractIterator, since it avoids a bunch of logic and corner cases, but it may be too hard to convert them now. For example, this still has a minor corner-case problem: it will never close the reader unless hasNext is called after the last element is consumed.
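
For reference, a rough sketch of that AbstractIterator shape (the `BufferedReader` stands in for a Hadoop RecordReader; this is illustrative, not proposed Spark code):

```scala
import java.io.{BufferedReader, FileReader}
import com.google.common.collect.AbstractIterator

// AbstractIterator keeps the end-of-data bookkeeping in one place, so the
// reader can be closed exactly where exhaustion is detected.
class GuavaBackedLineIterator(path: String) extends AbstractIterator[String] {
  private val reader = new BufferedReader(new FileReader(path))

  override def computeNext(): String = {
    val line = reader.readLine()
    if (line == null) {
      reader.close() // closed as soon as the end of input is seen
      endOfData()    // tells AbstractIterator the iteration is over
    } else {
      line
    }
  }
}
```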

@SparkQA commented Jul 16, 2015

Test build #37489 has finished for PR 7424 at commit 216912f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member, Author) commented Jul 16, 2015

The test passes locally.

@viirya (Member, Author) commented Jul 16, 2015

please retest this.

@sarutak (Member) commented Jul 16, 2015

retest this please.

@SparkQA commented Jul 16, 2015

Test build #37515 has finished for PR 7424 at commit 216912f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@knizhnik commented

Thank you. Now the problem is fixed.

@viirya (Member, Author) commented Jul 19, 2015

cc @rxin

Inline review comment from a Contributor:

Would it make sense to just call close() here?

@rxin (Contributor) commented Jul 20, 2015

cc @zsxwing for review

Inline review comment from a Member:

As @srowen mentioned previously, this won't close the reader if the iterator is abandoned before reaching the end of the stream. E.g., if there are 10 items in this Iterator but the user only consumes the first one and then exits the task, it is the TaskCompletionListener that ends up calling close.
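
A sketch of the two close paths being discussed, with an idempotent `closeIfNeeded()` guard; the names here are illustrative, not the actual Spark code:

```scala
import java.io.{BufferedReader, FileReader}

class ManagedReaderIterator(path: String) extends Iterator[String] {
  private val reader = new BufferedReader(new FileReader(path))
  private var nextLine: String = reader.readLine()
  private var closed = false

  // Safe to call from either path: the flag makes the close idempotent.
  def closeIfNeeded(): Unit = {
    if (!closed) {
      closed = true
      reader.close()
    }
  }

  override def hasNext: Boolean = nextLine != null

  override def next(): String = {
    if (nextLine == null) throw new NoSuchElementException("end of input")
    val line = nextLine
    nextLine = reader.readLine()
    if (nextLine == null) closeIfNeeded() // early close when input is exhausted
    line
  }
}

object ManagedReaderDemo {
  // The try/finally stands in for the TaskCompletionListener: it is what
  // releases the reader when the consumer stops after the first record.
  def readFirstRecord(path: String): Option[String] = {
    val it = new ManagedReaderIterator(path)
    try {
      if (it.hasNext) Some(it.next()) else None
    } finally {
      it.closeIfNeeded()
    }
  }
}
```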

Inline review comment from the Member Author:

Since we explicitly call reader.close() here, won't it be closed?

Inline review comment from the Member Author:

Besides, I think @srowen didn't mean that this won't close the reader.

@SparkQA commented Jul 20, 2015

Test build #37823 has finished for PR 7424 at commit 3ceb755.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 20, 2015

Test build #37822 has finished for PR 7424 at commit e34d98e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member) commented Jul 21, 2015

@viirya `inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)` is not idempotent. Could you also fix it?
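
A sketch of one way to make that update idempotent, assuming a simple guard flag (illustrative names, not the actual InputMetrics API):

```scala
// Records the split length into the bytes-read metric at most once, even if
// the close path that triggers it ends up running more than one time.
class SplitBytesRecorder(splitLength: Long) {
  private var recorded = false
  private var bytesRead = 0L

  def recordSplitBytesReadOnce(): Unit = {
    if (!recorded) {
      recorded = true
      bytesRead += splitLength // incrementing again would double-count the split
    }
  }

  def totalBytesRead: Long = bytesRead
}
```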

@viirya (Member, Author) commented Jul 21, 2015

@zsxwing ok, I will fix it later.

@SparkQA commented Jul 22, 2015

Test build #38017 has finished for PR 7424 at commit e152182.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class FormatString(children: Expression*) extends Expression with ImplicitCastInputTypes

@zsxwing (Member) commented Jul 22, 2015

LGTM. @rxin could you take a final look?

Inline review comment from a Contributor:

Could you change this comment (and the one in SqlNewHadoopRDD) to say something like "Close and release the reader here; close() will also be called when the task completes, but for tasks that read from many files, it helps to release the resources early"? I'm just worried this could be removed later on if someone thinks it's redundant with the close() in the task completion listener.

Inline review comment from the Member Author:

Good suggestion. I added it.

@kayousterhout (Contributor) commented

LGTM other than the comment improvement

@SparkQA commented Jul 24, 2015

Test build #38299 has finished for PR 7424 at commit 3ff64e5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ChangePrecision(child: Expression) extends UnaryExpression
    • abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable with Unevaluable
    • abstract class AggregateFunction1 extends LeafExpression with Serializable
    • case class DecimalType(precision: Int, scale: Int) extends FractionalType
    • case class DecimalConversion(precision: Int, scale: Int) extends JDBCConversion

@viirya (Member, Author) commented Jul 24, 2015

ping @zsxwing @rxin

@asfgit closed this in 64135cb Jul 24, 2015
@kayousterhout (Contributor) commented

@rxin I merged this but wasn't sure whether to backport to 1.3/1.4; I'll let you do that if you think it's necessary. Thanks for this fix @viirya!

asfgit pushed a commit that referenced this pull request Oct 31, 2015
**TL;DR**: We can rule out one rare but potential cause of input stream corruption via defensive programming.

## Background

[MAPREDUCE-5918](https://issues.apache.org/jira/browse/MAPREDUCE-5918) is a bug where an instance of a decompressor ends up getting placed into a pool multiple times. Since the pool is backed by a list instead of a set, this can lead to the same decompressor being used in different places at the same time, which is not safe because those decompressors will overwrite each other's buffers. Sometimes this buffer sharing will lead to exceptions, but other times it might silently result in invalid / garbled input.
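
A toy sketch of why a list-backed pool misbehaves under a double return (illustrative only, not the Hadoop CodecPool code):

```scala
import scala.collection.mutable.ListBuffer

final class Decompressor(val id: Int)

object ListBackedPoolDemo {
  private val pool = ListBuffer.empty[Decompressor]

  def returnToPool(d: Decompressor): Unit = pool += d // no duplicate check
  def borrow(): Option[Decompressor] =
    if (pool.isEmpty) None else Some(pool.remove(0))

  def main(args: Array[String]): Unit = {
    val d = new Decompressor(1)
    returnToPool(d)  // first close returns the decompressor
    returnToPool(d)  // a second close returns the *same* instance again
    val a = borrow() // Some(d)
    val b = borrow() // Some(d) again: two readers now share one buffer
    println(a.exists(x => b.exists(_ eq x))) // prints true
  }
}
```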

That Hadoop bug is fixed in Hadoop 2.7 but is still present in many Hadoop versions that we wish to support. As a result, I think that we should try to work around this issue in Spark via defensive programming to prevent RecordReaders from being closed multiple times.

So far, I've had a hard time coming up with explanations of exactly how double-`close()`s occur in practice, but I do have a couple of explanations that work on paper.

For instance, it looks like #7424, added in 1.5, introduces at least one extremely rare corner-case path where Spark could double-close() a LineRecordReader instance in a way that triggers the bug. Here are the steps involved in the bad execution that I brainstormed up:

* [The task has finished reading input, so we call close()](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L168).
* [While handling the close call and trying to close the reader, reader.close() throws an exception](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L190).
* We don't set `reader = null` after handling this exception, so the [TaskCompletionListener also ends up calling NewHadoopRDD.close()](https://github.com/apache/spark/blob/v1.5.1/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L156), which, in turn, closes the record reader again.

In this hypothetical situation, `LineRecordReader.close()` could [fail with an exception if its InputStream failed to close](https://github.com/apache/hadoop/blob/release-1.2.1/src/mapred/org/apache/hadoop/mapred/LineRecordReader.java#L212).
I googled for "Exception in RecordReader.close()" and it looks like it's possible for a closed Hadoop FileSystem to trigger an error there: [SPARK-757](https://issues.apache.org/jira/browse/SPARK-757), [SPARK-2491](https://issues.apache.org/jira/browse/SPARK-2491)

Looking at [SPARK-3052](https://issues.apache.org/jira/browse/SPARK-3052), it seems like it's possible to get spurious exceptions there when there is an error reading from Hadoop. If the Hadoop FileSystem were to get into an error state _right_ after reading the last record then it looks like we could hit the bug here in 1.5.

## The fix

This patch guards against these issues by modifying `HadoopRDD.close()` and `NewHadoopRDD.close()` so that they set `reader = null` even if an exception occurs in the `reader.close()` call. In addition, I modified `NextIterator.closeIfNeeded()` to guard against double-close if the first `close()` call throws an exception.
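
A sketch of that defensive pattern, using illustrative names rather than the actual HadoopRDD/NextIterator code:

```scala
import java.io.Closeable

class ReaderHolder(private var reader: Closeable) {
  private var closed = false

  def close(): Unit = {
    if (reader != null) {
      val r = reader
      reader = null // drop the reference first, so a failed close() is never retried
      try {
        r.close()
      } catch {
        case e: Exception =>
          System.err.println(s"Exception in RecordReader.close(): $e") // log and move on
      }
    }
  }

  def closeIfNeeded(): Unit = {
    if (!closed) {
      closed = true // flip before calling close(), so even a throwing close()
      close()       // cannot be attempted a second time from this path
    }
  }
}
```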

I don't have an easy way to test this, since I haven't been able to reproduce the bug that prompted this patch, but these changes seem safe and seem to rule out the on-paper reproductions that I was able to brainstorm up.

Author: Josh Rosen <[email protected]>

Closes #9382 from JoshRosen/hadoop-decompressor-pooling-fix and squashes the following commits:

5ec97d7 [Josh Rosen] Add SqlNewHadoopRDD.unsetInputFileName() that I accidentally deleted.
ae46cf4 [Josh Rosen] Merge remote-tracking branch 'origin/master' into hadoop-decompressor-pooling-fix
087aa63 [Josh Rosen] Guard against double-close() of RecordReaders.

(cherry picked from commit ac4118d)
Signed-off-by: Josh Rosen <[email protected]>
asfgit pushed a commit that referenced this pull request Oct 31, 2015
kiszk pushed a commit to kiszk/spark-gpu that referenced this pull request Dec 26, 2015
@viirya deleted the close_reader branch December 27, 2023 18:31