
Conversation


@rxin rxin commented Jan 14, 2016

This pull request removes the external block store API. This is rarely used, and the file system interface is actually a better, more standard way to interact with external storage systems.

There are some other things to remove as well, as pointed out by @JoshRosen. We will do those in follow-up pull requests.

Contributor

Is this comment out-of-date now that we've removed the external block store API?


SparkQA commented Jan 14, 2016

Test build #49373 has finished for PR 10752 at commit dabcf21.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PageView(val url: String, val status: Int, val zipCode: Int, val userID: Int)
    • class FlumeEventServer(receiver: FlumeReceiver) extends AvroSourceProtocol
    • abstract class Covariance(left: Expression, right: Expression) extends ImperativeAggregate
    • case class CovSample(
    • case class CovPopulation(
    • case class CaseWhen(branches: Seq[(Expression, Expression)], elseValue: Option[Expression] = None)
    • public abstract class ColumnVector
    • public final class ColumnarBatch
    • public final class Row extends InternalRow
    • public final class OffHeapColumnVector extends ColumnVector
    • public final class OnHeapColumnVector extends ColumnVector
    • class SparkOptimizer(val sqlContext: SQLContext)
    • class ObjectInputStreamWithLoader(_inputStream: InputStream, loader: ClassLoader)
    • class ConstantInputDStream[T: ClassTag](_ssc: StreamingContext, rdd: RDD[T])
    • abstract class InputDStream[T: ClassTag] (_ssc: StreamingContext)
    • abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)

@JoshRosen
Contributor

There's some documentation which needs to be updated:

  • Remove spark.externalBlockStore.url and the other spark.externalBlockStore.* configurations from the configuration page.
  • Update the documentation for the OFF_HEAP storage level.
  • There are still occurrences of the string "external block store" in BlockManager comments / log messages.
  • The UI still contains text which references the external block store.

In addition, there are a few more places where dead code can be deleted:

  • In ExecutorExitCode, remove EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE and EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR.
  • The RDDInfo @DeveloperApi class should be updated to either deprecate the external block store size fields or to remove them (a deprecation sketch follows this list).
  • Log messages can be updated to omit the external block store size information, since it will always be zero now.
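
A minimal sketch of the deprecation option mentioned above, assuming hypothetical field names (the actual RDDInfo fields and messages would follow Spark's conventions):

```scala
import org.apache.spark.annotation.DeveloperApi

// Hypothetical sketch: deprecate the now-always-zero size field instead of
// removing it outright. Class and field names here are illustrative.
@DeveloperApi
class RDDInfoSketch(val memSize: Long, val diskSize: Long) {
  @deprecated("external block store support was removed", "2.0.0")
  val externalBlockStoreSize: Long = 0L
}
```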


SparkQA commented Jan 14, 2016

Test build #49377 has finished for PR 10752 at commit ae0e60e.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jan 14, 2016

Test build #49380 has finished for PR 10752 at commit 34ffce7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jan 14, 2016

Test build #49390 has finished for PR 10752 at commit 71ddeac.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin rxin changed the title [SPARK-12667] Remove block manager's internal "external block store" API - WIP [SPARK-12667] Remove block manager's internal "external block store" API Jan 14, 2016
Contributor

Why explicitly specify the fields here? Type safety?

Contributor Author

Yeah, it's too easy to swap memSize / diskSize.
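
As a hypothetical illustration of the point (not the actual code under review): when two parameters share the same type, named arguments keep a silent swap from compiling unnoticed.

```scala
// Hypothetical example: memSize and diskSize are both Long, so positional
// arguments can be transposed without any compiler error.
case class BlockSizes(memSize: Long, diskSize: Long)

val explicit = BlockSizes(memSize = 512L, diskSize = 2048L) // intent is visible
val risky    = BlockSizes(2048L, 512L)                      // compiles, possibly swapped
```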

@andrewor14
Contributor

LGTM, pretty straightforward change. I confirmed that the JSON parts are backward compatible.

@JoshRosen
Contributor

LGTM as well. I'm going to merge this now in order to unblock progress on other PRs that touch BlockManager. Let's defer any smaller changes, such as the doc updates, to follow-ups.

@asfgit asfgit closed this in ad1503f Jan 15, 2016
asfgit pushed a commit that referenced this pull request Apr 1, 2016
This patch adds support for caching blocks in the executor processes using direct / off-heap memory.

## User-facing changes

**Updated semantics of `OFF_HEAP` storage level**: In Spark 1.x, the `OFF_HEAP` storage level indicated that an RDD should be cached in Tachyon. Spark 2.x removed the external block store API that Tachyon caching was based on (see #10752 / SPARK-12667), so `OFF_HEAP` became an alias for `MEMORY_ONLY_SER`. As of this patch, `OFF_HEAP` means "serialized and cached in off-heap memory or on disk". Via the `StorageLevel` constructor, `useOffHeap` can be set when `serialized == true`, which allows constructing custom storage levels that support replication.
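
A short sketch of the new semantics using Spark's public `StorageLevel` API (the helper function is ours; `StorageLevel.OFF_HEAP` and the five-argument factory are the public API):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// OFF_HEAP now means "serialized, cached in off-heap memory, spilling to disk".
def cacheOffHeap[T](rdd: RDD[T]): RDD[T] =
  rdd.persist(StorageLevel.OFF_HEAP)

// A custom off-heap level with 2x replication built via the public factory;
// deserialized stays false because off-heap storage is always serialized.
val replicatedOffHeap = StorageLevel(
  useDisk = true, useMemory = true, useOffHeap = true,
  deserialized = false, replication = 2)
```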

**Storage UI reporting**: the storage UI will now report whether in-memory blocks are stored on- or off-heap.

**Only supported by UnifiedMemoryManager**: for simplicity, this feature is only supported when the default UnifiedMemoryManager is used; applications which use the legacy memory manager (`spark.memory.useLegacyMode=true`) are not currently able to allocate off-heap storage memory, so using off-heap caching will fail with an error when legacy memory management is enabled. Given that we plan to eventually remove the legacy memory manager, this is not a significant restriction.

**Memory management policies:** the policies for dividing available memory between execution and storage are the same for both on- and off-heap memory. For off-heap memory, the total amount of memory available for use by Spark is controlled by `spark.memory.offHeap.size`, which is an absolute size. Off-heap storage memory obeys `spark.memory.storageFraction` in order to control the amount of unevictable storage memory. For example, if `spark.memory.offHeap.size` is 1 gigabyte and Spark uses the default `storageFraction` of 0.5, then up to 500 megabytes of off-heap cached blocks will be protected from eviction due to execution memory pressure. If necessary, we can split `spark.memory.storageFraction` into separate on- and off-heap configurations, but this doesn't seem necessary now and can be done later without any breaking changes.
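
The arithmetic in the example above, as a quick sketch:

```scala
// spark.memory.offHeap.size = 1g, spark.memory.storageFraction = 0.5 (default)
val offHeapSize     = 1024L * 1024 * 1024
val storageFraction = 0.5
val unevictableOffHeapStorage = (offHeapSize * storageFraction).toLong
// => 536870912 bytes (512 MiB; the text above rounds this to "500 megabytes")
```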

**Use of off-heap memory does not imply use of off-heap execution (or vice-versa)**: for now, the settings controlling the use of off-heap execution memory (`spark.memory.offHeap.enabled`) and off-heap caching are completely independent, so Spark SQL can be configured to use off-heap memory for execution while continuing to cache blocks on-heap. If desired, we can change this in a follow-up patch so that `spark.memory.offHeap.enabled` affects the default storage level for cached SQL tables.
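
A minimal sketch of that independence, using the real configuration keys (master and app name are placeholders): off-heap execution memory is enabled, while caching stays on-heap unless a job explicitly persists with `OFF_HEAP`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setMaster("local[2]").setAppName("off-heap-independence-sketch")
  .set("spark.memory.offHeap.enabled", "true") // off-heap *execution* memory
  .set("spark.memory.offHeap.size", "512m")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(1 to 1000)
rdd.persist(StorageLevel.MEMORY_ONLY) // cached on-heap despite off-heap execution
// rdd.persist(StorageLevel.OFF_HEAP) // opting in to off-heap caching is separate
println(rdd.count())
sc.stop()
```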

## Internal changes

- Rename `ByteArrayChunkOutputStream` to `ChunkedByteBufferOutputStream`
  - It now returns a `ChunkedByteBuffer` instead of an array of byte arrays.
  - Its constructor now accepts an `allocator` function which is called to allocate `ByteBuffer`s. This lets us control whether it allocates regular ByteBuffers or off-heap DirectByteBuffers (this pattern is sketched after this list).
  - Because block serialization is now performed during the unroll process, a `ChunkedByteBufferOutputStream` which is configured with a `DirectByteBuffer` allocator will use off-heap memory for both unroll and storage memory.
- The `MemoryStore`'s `MemoryEntry` objects now track whether blocks are stored on- or off-heap.
  - `evictBlocksToFreeSpace()` now accepts a `MemoryMode` parameter so that we don't try to evict off-heap blocks in response to on-heap memory pressure (or vice-versa).
- Make sure that off-heap buffers are properly de-allocated during MemoryStore eviction.
- The JVM limits the total size of allocated direct byte buffers using the `-XX:MaxDirectMemorySize` flag and the default tends to be fairly low (< 512 megabytes in some JVMs). To work around this limitation, this patch adds a custom DirectByteBuffer allocator which ignores this memory limit.
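
A sketch of the allocator-function pattern and the direct-allocation workaround described above. All names here are illustrative, not Spark's actual internals, and the reflective trick is a generally known JVM technique that assumes a pre-Java-9-style JDK or appropriate `--add-opens` flags:

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Allocator-function pattern: the stream is handed a function that produces
// each chunk, so the caller decides between on-heap and off-heap storage.
class ChunkedOutputSketch(chunkSize: Int, allocator: Int => ByteBuffer) {
  private val chunks = ArrayBuffer.empty[ByteBuffer]
  def newChunk(): ByteBuffer = { val c = allocator(chunkSize); chunks += c; c }
}

val onHeap  = new ChunkedOutputSketch(4096, ByteBuffer.allocate _)
val offHeap = new ChunkedOutputSketch(4096, ByteBuffer.allocateDirect _)

// ByteBuffer.allocateDirect counts against -XX:MaxDirectMemorySize. One known
// way around the limit is to pair Unsafe.allocateMemory with DirectByteBuffer's
// private (long, int) constructor via reflection:
val unsafe: sun.misc.Unsafe = {
  val f = classOf[sun.misc.Unsafe].getDeclaredField("theUnsafe")
  f.setAccessible(true)
  f.get(null).asInstanceOf[sun.misc.Unsafe]
}

def allocateUntrackedDirectBuffer(size: Int): ByteBuffer = {
  val ctor = Class.forName("java.nio.DirectByteBuffer")
    .getDeclaredConstructor(classOf[Long], classOf[Int])
  ctor.setAccessible(true)
  val address = unsafe.allocateMemory(size) // not counted by MaxDirectMemorySize
  // Buffers created this way carry no cleaner, so the caller must eventually
  // release the memory with unsafe.freeMemory(address).
  ctor.newInstance(Long.box(address), Int.box(size)).asInstanceOf[ByteBuffer]
}
```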

Author: Josh Rosen <[email protected]>

Closes #11805 from JoshRosen/off-heap-caching.