
Conversation

@liutang123 (Contributor) commented Jan 8, 2018

What changes were proposed in this pull request?

UnsafeExternalSorter.ChainedIterator maintains a Queue of UnsafeSorterIterator. When the getIterator function of UnsafeExternalSorter is called, UnsafeExternalSorter passes an ArrayList of UnsafeSorterSpillReader to the constructor of ChainedIterator. But each UnsafeSorterSpillReader maintains a byte array as a buffer whose capacity is more than 1 MB. When spilling happens frequently, this can cause OOM.

In this PR, I try to make the buffer allocation in UnsafeSorterSpillReader lazy.
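
For illustration only, here is a minimal sketch of the lazy-allocation pattern described above. It is not the actual Spark code; LazySpillReader, getIn(), getArr(), and readChunk() are simplified, hypothetical stand-ins.

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

// Simplified, hypothetical reader: the input stream and the ~1 MB read buffer
// are only created on first use, so an idle reader waiting in a queue costs
// almost no memory.
final class LazySpillReader {
  private static final int BUFFER_SIZE = 1024 * 1024;

  private final String file;
  private InputStream in;  // allocated lazily
  private byte[] arr;      // allocated lazily

  LazySpillReader(String file) {
    this.file = file;      // cheap: no I/O and no large allocation here
  }

  private InputStream getIn() throws IOException {
    if (in == null) {
      in = new BufferedInputStream(new FileInputStream(file), BUFFER_SIZE);
    }
    return in;
  }

  private byte[] getArr() {
    if (arr == null) {
      arr = new byte[BUFFER_SIZE];
    }
    return arr;
  }

  // Reads up to BUFFER_SIZE bytes into the lazily created buffer.
  int readChunk() throws IOException {
    return getIn().read(getArr(), 0, BUFFER_SIZE);
  }
}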

How was this patch tested?

Existing tests.

@jerryshao (Contributor)

@liutang123, can you please tell us how to reproduce your issue easily?

@liutang123 (Contributor, Author)

Hi @jerryshao, we can reproduce this issue as follows:

$ bin/spark-shell --master local --conf spark.sql.windowExec.buffer.spill.threshold=1 --driver-memory 1G 
scala>sc.range(1, 2000).toDF.registerTempTable("test_table")
scala>spark.sql("select row_number() over (partition by 1)  from test_table").collect

This will cause OOM.
The above is an extreme case.
Normally, spark.sql.windowExec.buffer.spill.threshold is 4096 by default and the buffer used in UnsafeSorterSpillReader is more than 1 MB. When a window contains more than 4,096,000 rows, UnsafeExternalSorter.ChainedIterator will have a queue containing 1,000 UnsafeSorterSpillReader(s). So the queue costs a lot of memory and is liable to cause OOM.
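
A rough back-of-envelope estimate based on the numbers above (assuming roughly 1 MB of buffer per UnsafeSorterSpillReader):

4,096,000 rows / 4,096 rows per spill = 1,000 spill readers
1,000 readers x ~1 MB buffer each ≈ 1 GB held just for read buffers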

@jerryshao (Contributor)

Thanks, let me try to reproduce it locally.

@jerryshao (Contributor)

The code here should be fine for the normal case. The problem is that there are so many spill files, which requires maintaining a lot of per-reader buffers. A lazy buffer allocation could solve this problem, IIUC. It is not related to the queue or anything else.

@liutang123 (Contributor, Author)

I think a lazy buffer allocation cannot thoroughly solve this problem, because UnsafeSorterSpillReader has a BufferedFileInputStream which will allocate off-heap memory.

@jerryshao (Contributor)

> I think a lazy buffer allocation cannot thoroughly solve this problem, because UnsafeSorterSpillReader has a BufferedFileInputStream which will allocate off-heap memory.

Can you please explain more? From my understanding, the off-heap memory in BufferedFileInputStream is the key issue for your scenario here. I don't think the logic you changed in ChainedIterator matters a lot. So a lazy allocation of the off-heap memory should be enough, IIUC.

@liutang123 (Contributor, Author)

Hi @jerryshao, I now lazily allocate both the InputStream and the byte array in UnsafeSorterSpillReader.
Would you please take a look at this when you have time?

@gatorsmile (Member)

cc @jiangxb1987

A Member left a review comment on this diff:

    baseObject = arr;
  }
- ByteStreams.readFully(in, arr, 0, recordLength);
+ ByteStreams.readFully(getIn(), arr, 0, recordLength);

Is it fine if recordLength is greater than 1024 * 1024?

@JoshRosen (Contributor) commented Jul 14, 2019

I stumbled across this PR while looking through the open Spark core PRs.

It sounds like the problem here is that we don't need to allocate the input stream and read buffer until it's actually time to read the spill, but we're currently doing that too early:

  • In getSortedIterator(), we have to construct all readers before we can return the first record because we must find the first record according to the sorted ordering and that requires looking at all spill files.
  • However, we do not have this constraint in getIterator(), which returns an unsorted iterator and is used in ExternalAppendOnlyUnsafeRowArray (which uses the sorter only for its spilling capabilities, not for sorting). In this case, we can initialize the readers one at a time, only once we actually need to read each spill.

Given this context, lazy initialization makes sense to me. However, this PR is a bit outdated and has some merge conflicts. I would be supportive of this change if the conflicts are resolved and the PR description is updated.
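
As a rough illustration of the one-at-a-time idea for the unsorted getIterator() path, here is a simplified, hypothetical sketch (not Spark's actual ChainedIterator; the Supplier-based factories are an assumption for illustration): each underlying reader, and therefore its large buffer, is only constructed once the previous spill has been fully consumed.

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.function.Supplier;

// Hypothetical lazily chained iterator: the queue holds cheap factories, and a
// reader (with its large buffer) is built only when its spill is actually read.
final class LazyChainedIterator<T> implements Iterator<T> {
  private final Queue<Supplier<Iterator<T>>> pending;
  private Iterator<T> current;

  LazyChainedIterator(Queue<Supplier<Iterator<T>>> pending) {
    this.pending = new ArrayDeque<>(pending);
  }

  @Override
  public boolean hasNext() {
    while ((current == null || !current.hasNext()) && !pending.isEmpty()) {
      current = pending.poll().get();  // the expensive reader is created only here
    }
    return current != null && current.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }
}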

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

