[SPARK-13992] Add support for off-heap caching #11805
Conversation
Test build #53483 has finished for PR 11805 at commit

Test build #53819 has finished for PR 11805 at commit

Test build #53825 has finished for PR 11805 at commit
Force-pushed from bf62983 to 5a15de2
Test build #54096 has finished for PR 11805 at commit
Force-pushed from 5a15de2 to 3c122af
Test build #54125 has finished for PR 11805 at commit

Test build #54215 has finished for PR 11805 at commit
/cc @andrewor14, @nongli, @rxin, can you glance over the PR description to check whether the user-facing semantics are okay for now? We might want to change some bits later (maybe to add separate nice shorthands for off-heap-memory-only vs off-heap-memory-and-disk), but I'd like to understand whether any semantics need to change here.
Looks pretty good actually. There are some details about naming and on/off defaults we need to figure out, but that can go in later.
Test build #54239 has finished for PR 11805 at commit
…ryManager This patch extends Spark's `UnifiedMemoryManager` to add bookkeeping support for off-heap storage memory, a requirement for enabling off-heap caching (which will be done by #11805). The `MemoryManager`'s `storageMemoryPool` has been split into separate on- and off-heap pools, and the storage and unroll memory allocation methods have been updated to accept a `memoryMode` parameter to specify whether allocations should be performed on- or off-heap. In order to reduce the testing surface, the `StaticMemoryManager` does not support off-heap caching (we plan to eventually remove the `StaticMemoryManager`, so this isn't a significant limitation). Author: Josh Rosen <[email protected]> Closes #11942 from JoshRosen/off-heap-storage-memory-bookkeeping.
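The bookkeeping split described in the commit message above can be sketched roughly as follows. This is a hypothetical illustration, not Spark's actual classes: the names `StoragePoolSketch` and `MemoryManagerSketch` are invented here; only the idea of separate on-/off-heap pools plus a memory-mode parameter comes from the text.

```scala
// Hypothetical sketch (not Spark's real API) of separate on- and off-heap
// storage pools, with acquisition taking a memory mode so callers state
// which pool the allocation should be charged against.
sealed trait MemoryMode
case object OnHeap extends MemoryMode
case object OffHeap extends MemoryMode

class StoragePoolSketch(val poolSize: Long) {
  private var used = 0L
  // Returns true iff the requested bytes fit in this pool.
  def acquire(bytes: Long): Boolean =
    if (used + bytes <= poolSize) { used += bytes; true } else false
  def release(bytes: Long): Unit = { used = math.max(0L, used - bytes) }
  def memoryUsed: Long = used
}

class MemoryManagerSketch(onHeapStorage: Long, offHeapStorage: Long) {
  private val onHeapPool = new StoragePoolSketch(onHeapStorage)
  private val offHeapPool = new StoragePoolSketch(offHeapStorage)
  // Mirrors the memoryMode parameter added to the allocation methods.
  def acquireStorageMemory(bytes: Long, mode: MemoryMode): Boolean = mode match {
    case OnHeap  => onHeapPool.acquire(bytes)
    case OffHeap => offHeapPool.acquire(bytes)
  }
}
```

Because the pools are independent, an on-heap request can succeed while an identical off-heap request fails, which is exactly the bookkeeping separation the patch introduces.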
…lly serialized block.
Force-pushed from 4d9489a to df8be62
Test build #54272 has finished for PR 11805 at commit
Discovered a potentially major problem: if we bail out of a `getOrElseUpdate` call without caching the block, and then do not fully consume or dispose of the resulting values iterator, we might leak off-heap memory. This problem didn't exist before because we'd never use off-heap memory for unroll space. I may have to add some task completion callbacks to dispose of `PartiallySerializedBlock`s.
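The task-completion-callback idea mentioned above can be sketched like this. All names here are hypothetical stand-ins (not Spark's `TaskContext` API): the point is only that cleanup is registered when the block is created, so the buffer is disposed at task end even if the values iterator is never fully consumed.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of the proposed fix: a task context that runs
// registered callbacks at task completion, and a partially serialized
// block that registers its own dispose() so it cannot leak past task end.
class TaskContextSketch {
  private val callbacks = ArrayBuffer.empty[() => Unit]
  def addTaskCompletionListener(f: () => Unit): Unit = callbacks += f
  def markTaskCompleted(): Unit = callbacks.foreach(f => f())
}

class PartiallySerializedBlockSketch(ctx: TaskContextSketch) {
  var disposed = false
  // Register cleanup up front, before the caller ever sees the iterator.
  ctx.addTaskCompletionListener(() => dispose())
  def dispose(): Unit = { disposed = true }
}
```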
Alright, this should now be ready for an initial review pass. I just pushed a somewhat hacky patch to work around the direct buffer allocation limit (42d0356) and added a few simple tests. I also addressed the potential leak of direct memory in cases where partially serialized blocks' iterators aren't fully consumed. I'm sure that this probably needs a bit more work and I'll do more self-review tomorrow once I've had time to set this aside for a bit.
/cc @andrewor14 @davies for review
package org.apache.spark.streaming.rdd

import java.io.File
import java.nio.ByteBuffer
not used
@JoshRosen Overall looks good. I have not verified comprehensively whether all the things are
  val transfer = transferService
    .getOrElse(new NettyBlockTransferService(conf, securityMgr, numCores = 1))
- val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
+ val memManager = UnifiedMemoryManager(conf, numCores = 1)
great!
Forgot to mention, you'll need to update documentation for
 * and then serializing the values from the original input iterator.
 */
def finishWritingToStream(os: OutputStream): Unit = {
  ByteStreams.copy(unrolled.toInputStream(), os)
There's actually a bug here: we aren't guaranteed to free the actual off-heap memory here. I think that in my benchmarks it just so happened to be cleaned by the `sun.misc.Cleaner` that I attached to the `DirectByteBuffer` in my custom allocator. I'm going to take a closer look at this block to try to fix the leak issues here.
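One way to make leaks like the one described above visible is to track outstanding off-heap bytes in the allocator itself, rather than relying on GC-driven cleaners. The sketch below is a hypothetical bookkeeping wrapper (`TrackedAllocator` is not Spark's API): a nonzero counter at task end signals a missed `release`, instead of silent native-memory growth.

```scala
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong

// Hypothetical allocator wrapper: every direct allocation is counted, and
// release() decrements the counter. Any leak shows up as a nonzero value
// of outstandingBytes instead of invisible native-memory growth.
object TrackedAllocator {
  private val outstanding = new AtomicLong(0L)
  def allocate(size: Int): ByteBuffer = {
    outstanding.addAndGet(size.toLong)
    ByteBuffer.allocateDirect(size)
  }
  def release(buf: ByteBuffer): Unit = {
    // In a real allocator we would also run the buffer's Cleaner here so
    // the native memory is freed immediately rather than at GC time.
    outstanding.addAndGet(-buf.capacity.toLong)
  }
  def outstandingBytes: Long = outstanding.get
}
```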
Test build #54678 has finished for PR 11805 at commit

Test build #54681 has finished for PR 11805 at commit

Test build #54682 has finished for PR 11805 at commit
deep-review this please
@andrewor14 Review complete. In the future we should add tests to ensure buffers are released. @JoshRosen please file a JIRA for this issue. LGTM.
Alright, I'm going to merge this into master and will file followup JIRAs to make sure that we update the user-facing documentation and add more tests before 2.0. There are a number of questions to answer about how best to expose the relevant off-heap configurations to users, but I'd rather address them separately.
Filed https://issues.apache.org/jira/browse/SPARK-14336 as a followup.
This patch adds support for caching blocks in the executor processes using direct / off-heap memory.

**User-facing changes**
**Updated semantics of `OFF_HEAP` storage level**: In Spark 1.x, the `OFF_HEAP` storage level indicated that an RDD should be cached in Tachyon. Spark 2.x removed the external block store API that Tachyon caching was based on (see #10752 / SPARK-12667), so `OFF_HEAP` became an alias for `MEMORY_ONLY_SER`. As of this patch, `OFF_HEAP` means "serialized and cached in off-heap memory or on disk". Via the `StorageLevel` constructor, `useOffHeap` can be set if `serialized == true` and can be used to construct custom storage levels which support replication.

**Storage UI reporting**: the storage UI will now report whether in-memory blocks are stored on- or off-heap.

**Only supported by `UnifiedMemoryManager`**: for simplicity, this feature is only supported when the default `UnifiedMemoryManager` is used; applications which use the legacy memory manager (`spark.memory.useLegacyMode=true`) are not currently able to allocate off-heap storage memory, so using off-heap caching will fail with an error when legacy memory management is enabled. Given that we plan to eventually remove the legacy memory manager, this is not a significant restriction.

**Memory management policies**: the policies for dividing available memory between execution and storage are the same for both on- and off-heap memory. For off-heap memory, the total amount of memory available for use by Spark is controlled by `spark.memory.offHeap.size`, which is an absolute size. Off-heap storage memory obeys `spark.memory.storageFraction` in order to control the amount of unevictable storage memory. For example, if `spark.memory.offHeap.size` is 1 gigabyte and Spark uses the default `storageFraction` of 0.5, then up to 500 megabytes of off-heap cached blocks will be protected from eviction due to execution memory pressure. If necessary, we can split `spark.memory.storageFraction` into separate on- and off-heap configurations, but this doesn't seem necessary now and can be done later without any breaking changes.

**Use of off-heap memory does not imply use of off-heap execution (or vice-versa)**: for now, the settings controlling the use of off-heap execution memory
(`spark.memory.offHeap.enabled`) and off-heap caching are completely independent, so Spark SQL can be configured to use off-heap memory for execution while continuing to cache blocks on-heap. If desired, we can change this in a followup patch so that `spark.memory.offHeap.enabled` affects the default storage level for cached SQL tables.

**Internal changes**
- Renamed `ByteArrayChunkOutputStream` to `ChunkedByteBufferOutputStream`:
  - It now returns a `ChunkedByteBuffer` instead of an array of byte arrays.
  - It now accepts an `allocator` function which is called to allocate `ByteBuffer`s. This allows us to control whether it allocates regular ByteBuffers or off-heap DirectByteBuffers. A `ChunkedByteBufferOutputStream` which is configured with a `DirectByteBuffer` allocator will use off-heap memory for both unroll and storage memory.
- `MemoryStore`'s MemoryEntries now track whether blocks are stored on- or off-heap.
- `evictBlocksToFreeSpace()` now accepts a `MemoryMode` parameter so that we don't try to evict off-heap blocks in response to on-heap memory pressure (or vice-versa).
- The amount of direct memory that a JVM can allocate is capped by the `-XX:MaxDirectMemorySize` flag, and the default tends to be fairly low (< 512 megabytes in some JVMs). To work around this limitation, this patch adds a custom DirectByteBuffer allocator which ignores this memory limit.
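The allocator-parameterized design described above can be sketched as follows. This is a minimal illustration under assumed names (`ChunkedOutputStreamSketch` is invented here, not Spark's `ChunkedByteBufferOutputStream`): the same stream writes to on-heap or off-heap chunks depending solely on which allocator function is passed in.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of a chunked output stream with a pluggable
// allocator: pass ByteBuffer.allocate for on-heap chunks, or
// ByteBuffer.allocateDirect for off-heap chunks.
class ChunkedOutputStreamSketch(chunkSize: Int, allocator: Int => ByteBuffer) {
  private val chunks = ArrayBuffer.empty[ByteBuffer]

  // Returns the chunk being filled, allocating a fresh one when needed.
  private def current: ByteBuffer = {
    if (chunks.isEmpty || !chunks.last.hasRemaining) chunks += allocator(chunkSize)
    chunks.last
  }

  def write(b: Byte): Unit = current.put(b)

  // Flips every chunk for reading and hands them back; call once, when done.
  def toChunks: Seq[ByteBuffer] = {
    chunks.foreach(_.flip())
    chunks.toSeq
  }
}
```

Because the allocator is just a function, the caller (not the stream) decides where the memory lives, which is how a direct-buffer allocator makes both unroll and storage memory off-heap.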