Conversation

@JoshRosen
Contributor

TL;DR: We can rule out one rare but potential cause of input stream corruption via defensive programming.

Background

MAPREDUCE-5918 is a bug where an instance of a decompressor ends up getting placed into a pool multiple times. Since the pool is backed by a list instead of a set, this can lead to the same decompressor being used in different places at the same time, which is not safe because those decompressors will overwrite each other's buffers. Sometimes this buffer sharing will lead to exceptions, but other times it might silently result in invalid / garbled input.
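
To make that failure mode concrete, here is a minimal sketch of why a list-backed pool is unsafe. This is illustrative only (it is not Hadoop's actual CodecPool code, and the names below are made up): returning the same decompressor twice stores two references to one instance, so two later borrowers end up sharing it.

  import scala.collection.mutable.ListBuffer

  class Decompressor(val id: Int)

  object ListBackedPool {
    private val pool = ListBuffer.empty[Decompressor]  // a Set here would collapse duplicates

    def returnDecompressor(d: Decompressor): Unit = pool += d

    def borrowDecompressor(): Option[Decompressor] =
      if (pool.isEmpty) None else Some(pool.remove(0))
  }

  object PoolDemo extends App {
    val d = new Decompressor(1)
    ListBackedPool.returnDecompressor(d)
    ListBackedPool.returnDecompressor(d)  // double return, e.g. caused by a double-close()
    val first = ListBackedPool.borrowDecompressor().get
    val second = ListBackedPool.borrowDecompressor().get
    println(first eq second)  // true: two "independent" consumers now share one instance
  }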

That Hadoop bug is fixed in Hadoop 2.7 but is still present in many Hadoop versions that we wish to support. As a result, I think that we should try to work around this issue in Spark via defensive programming to prevent RecordReaders from being closed multiple times.

So far, I've had a hard time coming up with explanations of exactly how double-close()s occur in practice, but I do have a couple of explanations that work on paper.

For instance, it looks like #7424, added in 1.5, introduces at least one extremely rare corner-case path where Spark could double-close() a LineRecordReader instance in a way that triggers the bug. Here are the steps involved in the bad execution that I brainstormed up:

  • The task has finished reading input, so we call close() (NewHadoopRDD.scala, line 168 in v1.5.1).
  • While handling the close call and trying to close the reader, reader.close() throws an exception (NewHadoopRDD.scala, line 190 in v1.5.1).
  • We don't set reader = null after handling this exception, so the TaskCompletionListener also ends up calling NewHadoopRDD.close() (NewHadoopRDD.scala, line 156 in v1.5.1), which, in turn, closes the record reader again.

In this hypothetical situation, LineRecordReader.close() could fail with an exception if its InputStream failed to close.
I googled for "Exception in RecordReader.close()" and it looks like it's possible for a closed Hadoop FileSystem to trigger an error there: SPARK-757, SPARK-2491

Looking at SPARK-3052, it seems like it's possible to get spurious exceptions there when there is an error reading from Hadoop. If the Hadoop FileSystem were to get into an error state right after reading the last record then it looks like we could hit the bug here in 1.5.
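
To illustrate the failure path in the steps above, here is a simplified, hypothetical sketch. It is not the real HadoopRDD/NewHadoopRDD code and every name is made up: the point is that when reader.close() throws, the reader reference is never cleared, so the second close attempt reaches the same RecordReader.

  class FlakyRecordReader {
    var closes = 0
    def close(): Unit = {
      closes += 1
      if (closes == 1) throw new java.io.IOException("underlying stream failed to close")
    }
  }

  class PreFixCloser(private var reader: FlakyRecordReader) {
    def close(): Unit = {
      try {
        if (reader != null) {
          reader.close()  // throws on the first call...
          reader = null   // ...so this line is never reached
        }
      } catch {
        case e: Exception => println(s"Exception in RecordReader.close(): $e")  // logged and swallowed
      }
    }
  }

  object DoubleCloseDemo extends App {
    val reader = new FlakyRecordReader
    val rddCloser = new PreFixCloser(reader)
    rddCloser.close()       // close #1: the iterator has run out of records
    rddCloser.close()       // close #2: the task-completion listener fires; reader is still set
    println(reader.closes)  // 2 -- the same RecordReader was closed twice
  }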

The fix

This patch guards against these issues by modifying HadoopRDD.close() and NewHadoopRDD.close() so that they set reader = null even if an exception occurs in the reader.close() call. In addition, I modified NextIterator.closeIfNeeded() to guard against double-close if the first close() call throws an exception.
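
Roughly, the defensive pattern looks like the following sketch. It is simplified and not the exact Spark diff (the real change also deals with logging and shutdown handling), but it shows the two guards: clear the reader reference no matter what, and flip the closed flag before calling close().

  import java.io.Closeable

  class GuardedCloser(private var reader: Closeable) {
    def close(): Unit = {
      if (reader != null) {
        try {
          reader.close()
        } catch {
          case e: Exception => println(s"Exception in RecordReader.close(): $e")  // log and continue
        } finally {
          reader = null  // always clear the reference, so any later close() is a no-op
        }
      }
    }
  }

  // The closeIfNeeded() guard: set the flag *before* calling close(), so an exception
  // thrown from close() cannot leave the iterator eligible for a second close.
  trait SafeCloseIfNeeded {
    private var closed = false
    protected def close(): Unit

    def closeIfNeeded(): Unit = {
      if (!closed) {
        closed = true
        close()
      }
    }
  }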

I don't have an easy way to test this, since I haven't been able to reproduce the bug that prompted this patch, but these changes seem safe and seem to rule out the on-paper reproductions that I was able to brainstorm up.

@JoshRosen
Contributor Author

/cc @yhuai @marmbrus

@yhuai
Contributor

yhuai commented Oct 30, 2015

LGTM.

@harishreedharan
Contributor

+1.

@SparkQA

SparkQA commented Oct 30, 2015

Test build #44694 has finished for PR 9382 at commit 087aa63.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@JoshRosen
Contributor Author

Flaky SQL test?

@SparkQA

SparkQA commented Oct 30, 2015

Test build #44703 has finished for PR 9382 at commit 087aa63.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented Oct 30, 2015

test this please

@SparkQA

SparkQA commented Oct 31, 2015

Test build #44716 has finished for PR 9382 at commit 087aa63.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented Oct 31, 2015

@JoshRosen
Contributor Author

@yhuai, here's the source code of the failing test:

  test("InputFileName") {
    withTempPath { dir =>
      val data = sparkContext.parallelize(0 to 10).toDF("id")
      data.write.parquet(dir.getCanonicalPath)
      val answer = sqlContext.read.parquet(dir.getCanonicalPath).select(inputFileName())
        .head.getString(0)
      assert(answer.contains(dir.getCanonicalPath))

      checkAnswer(data.select(inputFileName()).limit(1), Row(""))
    }
  }

Is the original assertion supposed to be checking that inputFileName() returns an empty string for data frames that were created by sc.parallelize()?

@JoshRosen
Contributor Author

Whoops, this is why the test was failing.

@JoshRosen
Contributor Author

Spotted the problem: I accidentally deleted a SqlNewHadoopRDD.unsetInputFileName() call.
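
For context on why dropping that call broke the test, the input-file-name bookkeeping follows a set/clear pattern around the reader's lifetime. The sketch below is illustrative only: the holder object and method names are assumptions (in 1.5 this thread-local state lives inside SqlNewHadoopRDD itself), but the shape is the same: if unsetInputFileName() is skipped, a stale file name leaks into later, non-file-based reads instead of the expected "".

  object InputFileNameHolder {
    private val current = new ThreadLocal[String] {
      override def initialValue(): String = ""  // "no input file" default, matching Row("") in the test
    }
    def setInputFileName(file: String): Unit = current.set(file)
    def unsetInputFileName(): Unit = current.remove()
    def getInputFileName(): String = current.get()
  }

  object ReaderLifecycle {
    // Set the name when a file split is opened and clear it when the reader closes;
    // skipping the clear step is exactly the bug that made the test fail.
    def readSplit(path: String)(readRecords: => Unit): Unit = {
      InputFileNameHolder.setInputFileName(path)
      try {
        readRecords  // inputFileName() evaluates to `path` while records are being read
      } finally {
        InputFileNameHolder.unsetInputFileName()
      }
    }
  }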

@JoshRosen
Contributor Author

By the way, this use of nextIterator() explains why this bug was able to occur on 1.4.x: the first close() call, which throws an exception, comes from NextIterator, and the second comes from the task completion listener.

@JoshRosen
Contributor Author

Due to the NextIterator case in HadoopRDD that I pointed out up-thread, this bug potentially impacts all released versions of Spark. Therefore, I think we should fix this in 1.5.2, 1.6.0, 1.4.2, and 1.3.2.

@SparkQA

SparkQA commented Oct 31, 2015

Test build #44727 has finished for PR 9382 at commit 5ec97d7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Alright, fixed the tests and proofread this a second time, so I'm going to merge this to master and branch-1.5.

asfgit pushed a commit that referenced this pull request Oct 31, 2015

Author: Josh Rosen <[email protected]>

Closes #9382 from JoshRosen/hadoop-decompressor-pooling-fix and squashes the following commits:

5ec97d7 [Josh Rosen] Add SqlNewHadoopRDD.unsetInputFileName() that I accidentally deleted.
ae46cf4 [Josh Rosen] Merge remote-tracking branch 'origin/master' into hadoop-decompressor-pooling-fix
087aa63 [Josh Rosen] Guard against double-close() of RecordReaders.

(cherry picked from commit ac4118d)
Signed-off-by: Josh Rosen <[email protected]>
asfgit closed this in ac4118d Oct 31, 2015
JoshRosen deleted the hadoop-decompressor-pooling-fix branch October 31, 2015 17:49
asfgit pushed a commit that referenced this pull request Nov 2, 2015
….4 backport)

This is a branch-1.4 backport of #9382, a fix for SPARK-11424.

Author: Josh Rosen <[email protected]>

Closes #9388 from JoshRosen/hadoop-decompressor-pooling-fix-branch-1.4.
JoshRosen added a commit to JoshRosen/spark that referenced this pull request Nov 3, 2015
….4 backport)

This is a branch-1.4 backport of apache#9382, a fix for SPARK-11424.

Author: Josh Rosen <[email protected]>

Closes apache#9388 from JoshRosen/hadoop-decompressor-pooling-fix-branch-1.4.
asfgit pushed a commit that referenced this pull request Nov 3, 2015
….3 backport)

This is a branch-1.3 backport of #9382, a fix for SPARK-11424.

Author: Josh Rosen <[email protected]>

Closes #9423 from JoshRosen/hadoop-decompressor-pooling-fix-branch-1.3.