[SPARK-23399][SQL] Register a task completion listener first for OrcColumnarBatchReader #20590
Conversation
```scala
val iter = new RecordReaderIterator(batchReader)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))

batchReader.initialize(fileSplit, taskAttemptContext)
```
According to the reported case, the ORC file is opened here.
But it seems that the task is killed (TaskKilled, stage cancelled) during initBatch, before its listener is registered. For a case that throws an exception at initBatch, this PR prevents the open-file leakage.
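To illustrate the ordering issue described above, here is a minimal sketch in plain Scala (no Spark dependencies): `FakeReader` and `runTask` are hypothetical stand-ins for `OrcColumnarBatchReader` and the task lifecycle, used only to show why registering the cleanup listener before initialization matters when initialization can fail midway.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for OrcColumnarBatchReader: initialize() "opens" the
// file and initBatch() fails, mimicking the OOM/killed-task case in the report.
class FakeReader {
  var opened = false
  var closed = false
  def initialize(): Unit = { opened = true }
  def initBatch(): Unit = throw new RuntimeException("alloc failed")
  def close(): Unit = { closed = true }
}

object ListenerOrdering {
  // Runs a task body, then fires completion listeners, the way a task
  // context invokes its registered completion callbacks even on failure.
  def runTask(body: ArrayBuffer[() => Unit] => Unit): Unit = {
    val listeners = ArrayBuffer.empty[() => Unit]
    try body(listeners) catch { case _: RuntimeException => () } // task failed/killed
    listeners.foreach(_.apply())                                 // completion callbacks
  }

  def main(args: Array[String]): Unit = {
    // Old order: initialize/initBatch run before the listener is registered.
    val leaky = new FakeReader
    runTask { ls =>
      leaky.initialize()
      leaky.initBatch()                  // throws before registration
      ls += (() => leaky.close())        // never reached
    }
    println(s"old order closed=${leaky.closed}")  // file stays open: leak

    // New order (this PR): register the listener first.
    val safe = new FakeReader
    runTask { ls =>
      ls += (() => safe.close())         // registered up front
      safe.initialize()
      safe.initBatch()                   // still throws...
    }
    println(s"new order closed=${safe.closed}")   // ...but close() still runs
  }
}
```

With the old ordering the reader is never closed after the failure; with the listener registered first, close() runs even though initBatch throws.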
@cloud-fan and @gatorsmile, could you take a look at this?
For the ORC library, it looks okay as long as we call close correctly.
@dongjoon-hyun Thanks for this fix! My question is: how do we know that close was not called before and is called now? Have you verified it?
Because when I tried to verify it manually on my local machine, close seems to be called even before this change. Maybe I'm missing something, or this is environment-dependent.
Test build #87349 has finished for PR 20590 at commit
Umm...
@dongjoon-hyun One question. Do you know which test case in
Looks reasonable.
I know it's hard to add a test; we need a malformed ORC file to make the reader fail midway. @dongjoon-hyun, do you think it's possible to generate such an ORC file?
Thank you for the review, @viirya, @kiszk, @cloud-fan.
```scala
      assert(e.getCause.isInstanceOf[OutOfMemoryError])
    }
  }
}
```
Hi, all.
The above test case reproduces the same leakage reported in the JIRA, and this PR fixes it. Please try this test case in IntelliJ against the master branch.
```scala
}

// This should be tested manually because it raises OOM intentionally
// in order to cause `Leaked filesystem connection`. The test suite dies, too.
```
Ah, nice trick to fail the reader midway!
But it's a little weird to have it as a unit test; shall we just put it in the PR description and say it's manually tested? This test needs to be run manually anyway...
Sure!
Test build #87384 has finished for PR 20590 at commit
Test build #87385 has finished for PR 20590 at commit
retest this please
```scala
val batchReader = new OrcColumnarBatchReader(
  enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, capacity)
val iter = new RecordReaderIterator(batchReader)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
```
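As a side note on the `Option(TaskContext.get())` wrapper in the line above: `TaskContext.get()` can return null when no task is running (e.g. on the driver), and wrapping a null in `Option(...)` yields `None`, so the listener is registered only inside a real task. A minimal sketch in plain Scala, where `get()` is a hypothetical stand-in for an off-task `TaskContext.get()`:

```scala
object OptionGuard {
  // Hypothetical stand-in for TaskContext.get() when no task is running.
  def get(): String = null

  def main(args: Array[String]): Unit = {
    var registered = false
    // Option(null) is None, so foreach does nothing: no listener is registered.
    Option(get()).foreach(_ => registered = true)
    println(s"registered=$registered") // nothing registered off-task
  }
}
```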
Could you please add a comment explaining why we put this registration here, referencing SPARK-23399? Since we would forget this investigation in the future :), the comment will help us and remind us to run the test case manually.
Sure!
Test build #87393 has finished for PR 20590 at commit
Test build #87397 has finished for PR 20590 at commit
Test build #87402 has finished for PR 20590 at commit
Retest this please.
Test build #87425 has finished for PR 20590 at commit
Hi, @cloud-fan and @gatorsmile.
Thanks, merging to master/2.3!
[SPARK-23399][SQL] Register a task completion listener first for OrcColumnarBatchReader

This PR aims to resolve an open file leakage issue reported at [SPARK-23390](https://issues.apache.org/jira/browse/SPARK-23390) by moving the listener registration position. Currently, the sequence is like the following.

1. Create `batchReader`
2. `batchReader.initialize` opens an ORC file.
3. `batchReader.initBatch` may take a long time to allocate memory in some environments and cause errors.
4. `Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))`

This PR moves 4 before 2 and 3. To sum up, the new sequence is 1 -> 4 -> 2 -> 3.

Manual. The following test case raises OOM intentionally to cause a leaked filesystem connection in the current code base. With this patch, the leakage doesn't occur.

```scala
// This should be tested manually because it raises OOM intentionally
// in order to cause `Leaked filesystem connection`.
test("SPARK-23399 Register a task completion listener first for OrcColumnarBatchReader") {
  withSQLConf(SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
    withTempDir { dir =>
      val basePath = dir.getCanonicalPath
      Seq(0).toDF("a").write.format("orc").save(new Path(basePath, "first").toString)
      Seq(1).toDF("a").write.format("orc").save(new Path(basePath, "second").toString)
      val df = spark.read.orc(
        new Path(basePath, "first").toString,
        new Path(basePath, "second").toString)
      val e = intercept[SparkException] {
        df.collect()
      }
      assert(e.getCause.isInstanceOf[OutOfMemoryError])
    }
  }
}
```

Author: Dongjoon Hyun <[email protected]>

Closes #20590 from dongjoon-hyun/SPARK-23399.

(cherry picked from commit 357babd)
Signed-off-by: Wenchen Fan <[email protected]>