-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-12817] Add BlockManager.getOrElseUpdate and remove CacheManager #11436
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Previously these methods would downgrade the exclusive write lock to a shared read lock, but this behavior is only needed in one place (CacheManager) and I'm planning to replace that with a BlockManager getOrElseUpdate method, so it makes sense to make lock downgrading the exception rather than the common case.
| }.cache() | ||
| } | ||
|
|
||
| test("get uncached rdd") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Open question: which test cases do we need to preserve in the new code? Should I test this more end-to-end to make sure that caching still works (i.e. doesn't recompute + stores blocks as side effect), or is that handled at a higher level of the stack in existing tests?
|
much simpler! |
|
Test build #52200 has finished for PR 11436 at commit
|
|
There's one corner-case which this patch doesn't handle correctly: if we are trying to cache a block, start unrolling the iterator, and then conclude that the iterator won't fit in memory and we can't drop it down to disk, then we need to have a way to keep the values that we've already computed. TL;DR: the memoryStore itself now needs a |
|
Alright, took another shot at addressing the correctness issue. Will address other comments after tests pass. |
| if (bytesAfterPut == null) { | ||
| if (valuesAfterPut == null) { | ||
| throw new SparkException( | ||
| "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/cc @tdas, the goal of this change is to avoid attempting to replicate deserialized, memory-only blocks if their initial cache / persist fails due to a lack of memory.
For the purposes of this patch, we need to do this to prevent the iterator from being consumed so that it can be passed back to the caller. More generally, though, I think that we should have this change to avoid OOMs by trying to serialize an entire partition which was too large to be stored.
|
Test build #52212 has finished for PR 11436 at commit
|
|
Test build #52226 has finished for PR 11436 at commit
|
| } | ||
| } | ||
|
|
||
| /** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add that this is only called on the executors? (otherwise people might wonder why we don't just use sc.env instead of SparkEnv.get)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
@JoshRosen This looks good, but still somewhat complex. In particular, there are a few unintuitive signatures in |
| existingMetrics.incBytesReadInternal(blockResult.bytes) | ||
| new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) { | ||
| override def next(): T = { | ||
| existingMetrics.incRecordsReadInternal(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
crazy idea: what if we push all of this metrics code into getOrElseUpdate? Then we don't need the complicated left or right signature anymore and we can contain all the complexity within BlockManager itself. We might need to pass in TaskContext but I think it's pretty doable to make getOrElseUpdate return an Iterator[T] instead. Or just do TaskContext.get there; doPut already does that anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I considered this but had some reservations due to the fact that we might not want to consider every read of a BlockManager iterator to be a read of an input record (since we also read from iterators in the implementation of getSingle() and in a couple of other places, and those methods are used for things like TorrentBroadcast blocks which I don't think we want to treat as input records in for the purposes of these metrics).
On the other hand, this getOrElseUpdate method really only makes sense in the context of caching code anyways, so maybe it's fine to couple its semantics a little more tightly to its only current use.
|
LGTM. There are still a few clunky interfaces but there's not much we can do about them without doing a bigger refactor, which is outside the scope. I think the main confusing bit is now resolved so this is just waiting for tests. |
|
Test build #52269 has finished for PR 11436 at commit
|
|
Test build #52271 has finished for PR 11436 at commit
|
|
Yeah, I wish there was a straightforward way to simplify this code, but I think that a lot of the remaining complexity is pretty inherent in the current requirements / behaviors. I'm going to merge this in a little bit. |
|
Test build #52294 has finished for PR 11436 at commit
|
|
Merged into master. |
## Motivation As a pre-requisite to off-heap caching of blocks, we need a mechanism to prevent pages / blocks from being evicted while they are being read. With on-heap objects, evicting a block while it is being read merely leads to memory-accounting problems (because we assume that an evicted block is a candidate for garbage-collection, which will not be true during a read), but with off-heap memory this will lead to either data corruption or segmentation faults. ## Changes ### BlockInfoManager and reader/writer locks This patch adds block-level read/write locks to the BlockManager. It introduces a new `BlockInfoManager` component, which is contained within the `BlockManager`, holds the `BlockInfo` objects that the `BlockManager` uses for tracking block metadata, and exposes APIs for locking blocks in either shared read or exclusive write modes. `BlockManager`'s `get*()` and `put*()` methods now implicitly acquire the necessary locks. After a `get()` call successfully retrieves a block, that block is locked in a shared read mode. A `put()` call will block until it acquires an exclusive write lock. If the write succeeds, the write lock will be downgraded to a shared read lock before returning to the caller. This `put()` locking behavior allows us store a block and then immediately turn around and read it without having to worry about it having been evicted between the write and the read, which will allow us to significantly simplify `CacheManager` in the future (see #10748). See `BlockInfoManagerSuite`'s test cases for a more detailed specification of the locking semantics. ### Auto-release of locks at the end of tasks Our locking APIs support explicit release of locks (by calling `unlock()`), but it's not always possible to guarantee that locks will be released prior to the end of the task. One reason for this is our iterator interface: since our iterators don't support an explicit `close()` operator to signal that no more records will be consumed, operations like `take()` or `limit()` don't have a good means to release locks on their input iterators' blocks. Another example is broadcast variables, whose block locks can only be released at the end of the task. To address this, `BlockInfoManager` uses a pair of maps to track the set of locks acquired by each task. Lock acquisitions automatically record the current task attempt id by obtaining it from `TaskContext`. When a task finishes, code in `Executor` calls `BlockInfoManager.unlockAllLocksForTask(taskAttemptId)` to free locks. ### Locking and the MemoryStore In order to prevent in-memory blocks from being evicted while they are being read, the `MemoryStore`'s `evictBlocksToFreeSpace()` method acquires write locks on blocks which it is considering as candidates for eviction. These lock acquisitions are non-blocking, so a block which is being read will not be evicted. By holding write locks until the eviction is performed or skipped (in case evicting the blocks would not free enough memory), we avoid a race where a new reader starts to read a block after the block has been marked as an eviction candidate but before it has been removed. ### Locking and remote block transfer This patch makes small changes to to block transfer and network layer code so that locks acquired by the BlockTransferService are released as soon as block transfer messages are consumed and released by Netty. This builds on top of #11193, a bug fix related to freeing of network layer ManagedBuffers. ## FAQ - **Why not use Java's built-in [`ReadWriteLock`](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReadWriteLock.html)?** Our locks operate on a per-task rather than per-thread level. Under certain circumstances a task may consist of multiple threads, so using `ReadWriteLock` would mean that we might call `unlock()` from a thread which didn't hold the lock in question, an operation which has undefined semantics. If we could rely on Java 8 classes, we might be able to use [`StampedLock`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/StampedLock.html) to work around this issue. - **Why not detect "leaked" locks in tests?**: See above notes about `take()` and `limit`. Author: Josh Rosen <[email protected]> Closes #10705 from JoshRosen/pin-pages.
CacheManager directly calls MemoryStore.unrollSafely() and has its own logic for handling graceful fallback to disk when cached data does not fit in memory. However, this logic also exists inside of the MemoryStore itself, so this appears to be unnecessary duplication. Thanks to the addition of block-level read/write locks in apache#10705, we can refactor the code to remove the CacheManager and replace it with an atomic `BlockManager.getOrElseUpdate()` method. This pull request replaces / subsumes apache#10748. /cc andrewor14 and nongli for review. Note that this changes the locking semantics of a couple of internal BlockManager methods (`doPut()` and `lockNewBlockForWriting`), so please pay attention to the Scaladoc changes and new test cases for those methods. Author: Josh Rosen <[email protected]> Closes apache#11436 from JoshRosen/remove-cachemanager.
CacheManager directly calls MemoryStore.unrollSafely() and has its own logic for handling graceful fallback to disk when cached data does not fit in memory. However, this logic also exists inside of the MemoryStore itself, so this appears to be unnecessary duplication.
Thanks to the addition of block-level read/write locks in #10705, we can refactor the code to remove the CacheManager and replace it with an atomic
BlockManager.getOrElseUpdate()method.This pull request replaces / subsumes #10748.
/cc @andrewor14 and @nongli for review. Note that this changes the locking semantics of a couple of internal BlockManager methods (
doPut()andlockNewBlockForWriting), so please pay attention to the Scaladoc changes and new test cases for those methods.