
Commit ecf30ee

andrewor14 authored and mateiz committed
[SPARK-1777] Prevent OOMs from single partitions
**Problem.** When caching, we currently unroll the entire RDD partition before making sure we have enough free memory. This is a common cause for OOMs especially when (1) the BlockManager has little free space left in memory, and (2) the partition is large.

**Solution.** We maintain a global memory pool of `M` bytes shared across all threads, similar to the way we currently manage memory for shuffle aggregation. Then, while we unroll each partition, periodically check if there is enough space to continue. If not, drop enough RDD blocks to ensure we have at least `M` bytes to work with, then try again. If we still don't have enough space to unroll the partition, give up and drop the block to disk directly if applicable.

**New configurations.**
- `spark.storage.bufferFraction` - the value of `M` as a fraction of the storage memory. (default: 0.2)
- `spark.storage.safetyFraction` - a margin of safety in case size estimation is slightly off. This is the equivalent of the existing `spark.shuffle.safetyFraction`. (default: 0.9)

For more detail, see the [design document](https://issues.apache.org/jira/secure/attachment/12651793/spark-1777-design-doc.pdf). Tests pending for performance and memory usage patterns.

Author: Andrew Or <[email protected]>

Closes #1165 from andrewor14/them-rdd-memories and squashes the following commits:

e77f451 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
c7c8832 [Andrew Or] Simplify logic + update a few comments
269d07b [Andrew Or] Very minor changes to tests
6645a8a [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
b7e165c [Andrew Or] Add new tests for unrolling blocks
f12916d [Andrew Or] Slightly clean up tests
71672a7 [Andrew Or] Update unrollSafely tests
369ad07 [Andrew Or] Correct ensureFreeSpace and requestMemory behavior
f4d035c [Andrew Or] Allow one thread to unroll multiple blocks
a66fbd2 [Andrew Or] Rename a few things + update comments
68730b3 [Andrew Or] Fix weird scalatest behavior
e40c60d [Andrew Or] Fix MIMA excludes
ff77aa1 [Andrew Or] Fix tests
1a43c06 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
b9a6eee [Andrew Or] Simplify locking behavior on unrollMemoryMap
ed6cda4 [Andrew Or] Formatting fix (super minor)
f9ff82e [Andrew Or] putValues -> putIterator + putArray
beb368f [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
8448c9b [Andrew Or] Fix tests
a49ba4d [Andrew Or] Do not expose unroll memory check period
69bc0a5 [Andrew Or] Always synchronize on putLock before unrollMemoryMap
3f5a083 [Andrew Or] Simplify signature of ensureFreeSpace
dce55c8 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
8288228 [Andrew Or] Synchronize put and unroll properly
4f18a3d [Andrew Or] bufferFraction -> unrollFraction
28edfa3 [Andrew Or] Update a few comments / log messages
728323b [Andrew Or] Do not synchronize every 1000 elements
5ab2329 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
129c441 [Andrew Or] Fix bug: Use toArray rather than array
9a65245 [Andrew Or] Update a few comments + minor control flow changes
57f8d85 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
abeae4f [Andrew Or] Add comment clarifying the MEMORY_AND_DISK case
3dd96aa [Andrew Or] AppendOnlyBuffer -> Vector (+ a few small changes)
f920531 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
0871835 [Andrew Or] Add an effective storage level interface to BlockManager
64e7d4c [Andrew Or] Add/modify a few comments (minor)
8af2f35 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
4f4834e [Andrew Or] Use original storage level for blocks dropped to disk
ecc8c2d [Andrew Or] Fix binary incompatibility
24185ea [Andrew Or] Avoid dropping a block back to disk if reading from disk
2b7ee66 [Andrew Or] Fix bug in SizeTracking*
9b9a273 [Andrew Or] Fix tests
20eb3e5 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
649bdb3 [Andrew Or] Document spark.storage.bufferFraction
a10b0e7 [Andrew Or] Add initial memory request threshold + rename a few things
e9c3cb0 [Andrew Or] cacheMemoryMap -> unrollMemoryMap
198e374 [Andrew Or] Unfold -> unroll
0d50155 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
d9d02a8 [Andrew Or] Remove unused param in unfoldSafely
ec728d8 [Andrew Or] Add tests for safe unfolding of blocks
22b2209 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
078eb83 [Andrew Or] Add check for hasNext in PrimitiveVector.iterator
0871535 [Andrew Or] Fix tests in BlockManagerSuite
d68f31e [Andrew Or] Safely unfold blocks for all memory puts
5961f50 [Andrew Or] Fix tests
195abd7 [Andrew Or] Refactor: move unfold logic to MemoryStore
1e82d00 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
3ce413e [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
d5dd3b4 [Andrew Or] Free buffer memory in finally
ea02eec [Andrew Or] Fix tests
b8e1d9c [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
a8704c1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
e1b8b25 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
87aa75c [Andrew Or] Fix mima excludes again (typo)
11eb921 [Andrew Or] Clarify comment (minor)
50cae44 [Andrew Or] Remove now duplicate mima exclude
7de5ef9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
df47265 [Andrew Or] Fix binary incompatibility
6d05a81 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
f94f5af [Andrew Or] Update a few comments (minor)
776aec9 [Andrew Or] Prevent OOM if a single RDD partition is too large
bbd3eea [Andrew Or] Fix CacheManagerSuite to use Array
97ea499 [Andrew Or] Change BlockManager interface to use Arrays
c12f093 [Andrew Or] Add SizeTrackingAppendOnlyBuffer and tests
1 parent f6ff2a6 commit ecf30ee
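
The unrolling strategy described in the commit message can be pictured with a short sketch. This is only an illustration: the pool size, the per-element size estimate, and all names (`UnrollSketch`, `reserve`, `memoryCheckPeriod`) are assumptions made for the example, not the actual `MemoryStore` API added by this commit (see the CacheManager and BlockManager diffs below).

```scala
import scala.collection.mutable.ArrayBuffer

object UnrollSketch {
  // Shared pool of M bytes available for unrolling across all threads
  // (cf. spark.storage.bufferFraction above). 1 MB here just for the demo.
  private val maxUnrollBytes: Long = 1L << 20
  private var usedUnrollBytes: Long = 0L

  // Try to reserve `bytes` from the shared pool. In the real implementation this is the
  // point where cached RDD blocks could be dropped to free space before giving up.
  private def reserve(bytes: Long): Boolean = synchronized {
    if (usedUnrollBytes + bytes <= maxUnrollBytes) {
      usedUnrollBytes += bytes
      true
    } else {
      false
    }
  }

  /**
   * Unroll `values` cautiously, checking the shared pool every `memoryCheckPeriod`
   * elements. Returns Left(array) if the whole partition fit, or Right(iterator) of
   * everything seen so far plus the rest, so the caller can stream it to disk instead
   * of failing with an OOM.
   */
  def unrollSafely[T](
      values: Iterator[T],
      bytesPerElement: Long = 64L,
      memoryCheckPeriod: Int = 16): Either[Array[Any], Iterator[Any]] = {
    val buffer = new ArrayBuffer[Any]
    var keepUnrolling = reserve(bytesPerElement * memoryCheckPeriod)
    var count = 0
    while (values.hasNext && keepUnrolling) {
      buffer += values.next()
      count += 1
      if (count % memoryCheckPeriod == 0) {
        keepUnrolling = reserve(bytesPerElement * memoryCheckPeriod)
      }
    }
    if (keepUnrolling) Left(buffer.toArray) else Right(buffer.iterator ++ values)
  }

  def main(args: Array[String]): Unit = {
    println(unrollSafely(Iterator.range(0, 100)).isLeft)     // small partition: fits
    println(unrollSafely(Iterator.range(0, 1000000)).isLeft) // large partition: gives up
  }
}
```

With the defaults above, the real pool would be roughly `spark.storage.bufferFraction` (0.2) of the storage memory, i.e. about 0.2 × (heap × `spark.storage.memoryFraction` × `spark.storage.safetyFraction`) bytes.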

File tree

21 files changed: +1165 / -517 lines changed


core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 51 additions & 21 deletions
@@ -17,9 +17,9 @@
 
 package org.apache.spark
 
-import scala.collection.mutable.{ArrayBuffer, HashSet}
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.executor.InputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
 
@@ -30,7 +30,7 @@ import org.apache.spark.storage._
 private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
 
   /** Keys of RDD partitions that are being computed/loaded. */
-  private val loading = new HashSet[RDDBlockId]()
+  private val loading = new mutable.HashSet[RDDBlockId]
 
   /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
   def getOrCompute[T](
@@ -118,36 +118,66 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
   }
 
   /**
-   * Cache the values of a partition, keeping track of any updates in the storage statuses
-   * of other blocks along the way.
+   * Cache the values of a partition, keeping track of any updates in the storage statuses of
+   * other blocks along the way.
+   *
+   * The effective storage level refers to the level that actually specifies BlockManager put
+   * behavior, not the level originally specified by the user. This is mainly for forcing a
+   * MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
+   * while preserving the the original semantics of the RDD as specified by the application.
    */
   private def putInBlockManager[T](
       key: BlockId,
       values: Iterator[T],
-      storageLevel: StorageLevel,
-      updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = {
-
-    if (!storageLevel.useMemory) {
-      /* This RDD is not to be cached in memory, so we can just pass the computed values
-       * as an iterator directly to the BlockManager, rather than first fully unrolling
-       * it in memory. The latter option potentially uses much more memory and risks OOM
-       * exceptions that can be avoided. */
-      updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
+      level: StorageLevel,
+      updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
+      effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
+
+    val putLevel = effectiveStorageLevel.getOrElse(level)
+    if (!putLevel.useMemory) {
+      /*
+       * This RDD is not to be cached in memory, so we can just pass the computed values as an
+       * iterator directly to the BlockManager rather than first fully unrolling it in memory.
+       */
+      updatedBlocks ++=
+        blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
       blockManager.get(key) match {
         case Some(v) => v.data.asInstanceOf[Iterator[T]]
         case None =>
           logInfo(s"Failure to store $key")
           throw new BlockException(key, s"Block manager failed to return cached value for $key!")
       }
     } else {
-      /* This RDD is to be cached in memory. In this case we cannot pass the computed values
+      /*
+       * This RDD is to be cached in memory. In this case we cannot pass the computed values
        * to the BlockManager as an iterator and expect to read it back later. This is because
-       * we may end up dropping a partition from memory store before getting it back, e.g.
-       * when the entirety of the RDD does not fit in memory. */
-      val elements = new ArrayBuffer[Any]
-      elements ++= values
-      updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
-      elements.iterator.asInstanceOf[Iterator[T]]
+       * we may end up dropping a partition from memory store before getting it back.
+       *
+       * In addition, we must be careful to not unroll the entire partition in memory at once.
+       * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
+       * single partition. Instead, we unroll the values cautiously, potentially aborting and
+       * dropping the partition to disk if applicable.
+       */
+      blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
+        case Left(arr) =>
+          // We have successfully unrolled the entire partition, so cache it in memory
+          updatedBlocks ++=
+            blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
+          arr.iterator.asInstanceOf[Iterator[T]]
+        case Right(it) =>
+          // There is not enough space to cache this partition in memory
+          logWarning(s"Not enough space to cache partition $key in memory! " +
+            s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
+          val returnValues = it.asInstanceOf[Iterator[T]]
+          if (putLevel.useDisk) {
+            logWarning(s"Persisting partition $key to disk instead.")
+            val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
+              useOffHeap = false, deserialized = false, putLevel.replication)
+            putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
+          } else {
+            returnValues
+          }
+      }
     }
   }
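
From an application's point of view, the effect of the new CacheManager path can be sketched as follows. This is a hypothetical example (the app name, data size, and master URL are made up); it simply exercises a MEMORY_AND_DISK cache, which now falls back to a disk-only put for partitions that cannot be unrolled in memory instead of risking an OOM.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CachingExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("unroll-example").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Partitions of this RDD may be too large for the memory store. With this change,
    // CacheManager unrolls each partition cautiously; if one does not fit, it is written
    // with DISK_ONLY semantics while the RDD keeps its user-specified storage level.
    val big = sc.parallelize(1 to 1000000, numSlices = 2).map(i => (i, i.toString * 10))
    big.persist(StorageLevel.MEMORY_AND_DISK)
    println(big.count())
    sc.stop()
  }
}
```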

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 1 deletion
@@ -67,7 +67,7 @@ class SparkEnv (
     val metricsSystem: MetricsSystem,
     val conf: SparkConf) extends Logging {
 
-  // A mapping of thread ID to amount of memory used for shuffle in bytes
+  // A mapping of thread ID to amount of memory, in bytes, used for shuffle aggregations
   // All accesses should be manually synchronized
   val shuffleMemoryMap = mutable.HashMap[Long, Long]()
 
core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 3 additions & 1 deletion
@@ -266,11 +266,13 @@ private[spark] class Executor(
           }
         }
       } finally {
-        // TODO: Unregister shuffle memory only for ResultTask
+        // Release memory used by this thread for shuffles
         val shuffleMemoryMap = env.shuffleMemoryMap
         shuffleMemoryMap.synchronized {
           shuffleMemoryMap.remove(Thread.currentThread().getId)
         }
+        // Release memory used by this thread for unrolling blocks
+        env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
         runningTasks.remove(taskId)
       }
     }
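
The MemoryStore side of this bookkeeping is not among the files shown here, so the snippet below is only a guess at the shape of the per-thread accounting that `releaseUnrollMemoryForThisThread()` tears down, modeled on the `shuffleMemoryMap` pattern above; the class and method names are illustrative, not part of the commit.

```scala
import scala.collection.mutable

// Hypothetical per-thread unroll accounting, analogous to shuffleMemoryMap.
// All accesses are synchronized on the map itself.
class UnrollAccounting {
  private val unrollMemoryMap = mutable.HashMap[Long, Long]()

  /** Record that the current thread has reserved `bytes` from the shared unroll pool. */
  def reserveForThisThread(bytes: Long): Unit = unrollMemoryMap.synchronized {
    val threadId = Thread.currentThread().getId
    unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + bytes
  }

  /** Called from the executor's finally block so a finished task never leaks its reservation. */
  def releaseForThisThread(): Unit = unrollMemoryMap.synchronized {
    unrollMemoryMap.remove(Thread.currentThread().getId)
  }
}
```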

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 66 additions & 44 deletions
@@ -38,7 +38,7 @@ import org.apache.spark.util._
 private[spark] sealed trait BlockValues
 private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
 private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
-private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues
+private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
 
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
@@ -71,9 +71,9 @@ private[spark] class BlockManager(
 
   // Actual storage of where blocks are kept
   private var tachyonInitialized = false
-  private[storage] val memoryStore = new MemoryStore(this, maxMemory)
-  private[storage] val diskStore = new DiskStore(this, diskBlockManager)
-  private[storage] lazy val tachyonStore: TachyonStore = {
+  private[spark] val memoryStore = new MemoryStore(this, maxMemory)
+  private[spark] val diskStore = new DiskStore(this, diskBlockManager)
+  private[spark] lazy val tachyonStore: TachyonStore = {
     val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
     val appFolderName = conf.get("spark.tachyonStore.folderName")
     val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"
@@ -463,16 +463,17 @@
           val values = dataDeserialize(blockId, bytes)
           if (level.deserialized) {
             // Cache the values before returning them
-            // TODO: Consider creating a putValues that also takes in a iterator?
-            val valuesBuffer = new ArrayBuffer[Any]
-            valuesBuffer ++= values
-            memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
-              match {
-                case Left(values2) =>
-                  return Some(new BlockResult(values2, DataReadMethod.Disk, info.size))
-                case _ =>
-                  throw new SparkException("Memory store did not return back an iterator")
-              }
+            val putResult = memoryStore.putIterator(
+              blockId, values, level, returnValues = true, allowPersistToDisk = false)
+            // The put may or may not have succeeded, depending on whether there was enough
+            // space to unroll the block. Either way, the put here should return an iterator.
+            putResult.data match {
+              case Left(it) =>
+                return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
+              case _ =>
+                // This only happens if we dropped the values back to disk (which is never)
+                throw new SparkException("Memory store did not return an iterator!")
+            }
           } else {
             return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
           }
@@ -561,13 +562,14 @@
     iter
   }
 
-  def put(
+  def putIterator(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
-      tellMaster: Boolean): Seq[(BlockId, BlockStatus)] = {
+      tellMaster: Boolean = true,
+      effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
     require(values != null, "Values is null")
-    doPut(blockId, IteratorValues(values), level, tellMaster)
+    doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
   }
 
   /**
@@ -589,13 +591,14 @@
    * Put a new block of values to the block manager.
    * Return a list of blocks updated as a result of this put.
    */
-  def put(
+  def putArray(
       blockId: BlockId,
-      values: ArrayBuffer[Any],
+      values: Array[Any],
       level: StorageLevel,
-      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+      tellMaster: Boolean = true,
+      effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
     require(values != null, "Values is null")
-    doPut(blockId, ArrayBufferValues(values), level, tellMaster)
+    doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
   }
 
   /**
@@ -606,19 +609,33 @@
       blockId: BlockId,
       bytes: ByteBuffer,
       level: StorageLevel,
-      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+      tellMaster: Boolean = true,
+      effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
     require(bytes != null, "Bytes is null")
-    doPut(blockId, ByteBufferValues(bytes), level, tellMaster)
+    doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
   }
 
+  /**
+   * Put the given block according to the given level in one of the block stores, replicating
+   * the values if necessary.
+   *
+   * The effective storage level refers to the level according to which the block will actually be
+   * handled. This allows the caller to specify an alternate behavior of doPut while preserving
+   * the original level specified by the user.
+   */
   private def doPut(
       blockId: BlockId,
       data: BlockValues,
       level: StorageLevel,
-      tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+      tellMaster: Boolean = true,
+      effectiveStorageLevel: Option[StorageLevel] = None)
+    : Seq[(BlockId, BlockStatus)] = {
 
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
+    effectiveStorageLevel.foreach { level =>
+      require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
+    }
 
     // Return value
     val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
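
A hedged illustration of the call pattern that the new `effectiveStorageLevel` parameter enables, mirroring the CacheManager fallback earlier in this commit: write a block with DISK_ONLY as the effective level while still reporting the user's original level. The helper object and method are made up for the example (and sit in `org.apache.spark.storage` only because `BlockManager` is `private[spark]`); the `putIterator` and `StorageLevel(...)` calls follow the signatures in the diff.

```scala
package org.apache.spark.storage

// Illustrative helper, not part of the commit.
object EffectiveLevelExample {
  def putAsDiskOnly(
      blockManager: BlockManager,
      key: BlockId,
      values: Iterator[Any],
      userLevel: StorageLevel): Seq[(BlockId, BlockStatus)] = {
    // Physically write to disk only, but keep the user's level for reporting semantics.
    val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
      useOffHeap = false, deserialized = false, userLevel.replication)
    blockManager.putIterator(key, values, userLevel, tellMaster = true,
      effectiveStorageLevel = Some(diskOnlyLevel))
  }
}
```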
@@ -657,13 +674,16 @@
     // Size of the block in bytes
     var size = 0L
 
+    // The level we actually use to put the block
+    val putLevel = effectiveStorageLevel.getOrElse(level)
+
     // If we're storing bytes, then initiate the replication before storing them locally.
     // This is faster as data is already serialized and ready to send.
     val replicationFuture = data match {
-      case b: ByteBufferValues if level.replication > 1 =>
+      case b: ByteBufferValues if putLevel.replication > 1 =>
         // Duplicate doesn't copy the bytes, but just creates a wrapper
         val bufferView = b.buffer.duplicate()
-        Future { replicate(blockId, bufferView, level) }
+        Future { replicate(blockId, bufferView, putLevel) }
       case _ => null
     }
 
@@ -676,18 +696,18 @@
         // returnValues - Whether to return the values put
         // blockStore - The type of storage to put these values into
         val (returnValues, blockStore: BlockStore) = {
-          if (level.useMemory) {
+          if (putLevel.useMemory) {
             // Put it in memory first, even if it also has useDisk set to true;
             // We will drop it to disk later if the memory store can't hold it.
             (true, memoryStore)
-          } else if (level.useOffHeap) {
+          } else if (putLevel.useOffHeap) {
             // Use tachyon for off-heap storage
             (false, tachyonStore)
-          } else if (level.useDisk) {
+          } else if (putLevel.useDisk) {
             // Don't get back the bytes from put unless we replicate them
-            (level.replication > 1, diskStore)
+            (putLevel.replication > 1, diskStore)
           } else {
-            assert(level == StorageLevel.NONE)
+            assert(putLevel == StorageLevel.NONE)
             throw new BlockException(
               blockId, s"Attempted to put block $blockId without specifying storage level!")
           }
@@ -696,22 +716,22 @@
         // Actually put the values
         val result = data match {
           case IteratorValues(iterator) =>
-            blockStore.putValues(blockId, iterator, level, returnValues)
-          case ArrayBufferValues(array) =>
-            blockStore.putValues(blockId, array, level, returnValues)
+            blockStore.putIterator(blockId, iterator, putLevel, returnValues)
+          case ArrayValues(array) =>
+            blockStore.putArray(blockId, array, putLevel, returnValues)
           case ByteBufferValues(bytes) =>
             bytes.rewind()
-            blockStore.putBytes(blockId, bytes, level)
+            blockStore.putBytes(blockId, bytes, putLevel)
         }
         size = result.size
         result.data match {
-          case Left (newIterator) if level.useMemory => valuesAfterPut = newIterator
+          case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
           case Right (newBytes) => bytesAfterPut = newBytes
           case _ =>
         }
 
         // Keep track of which blocks are dropped from memory
-        if (level.useMemory) {
+        if (putLevel.useMemory) {
           result.droppedBlocks.foreach { updatedBlocks += _ }
         }
 
@@ -742,7 +762,7 @@
 
     // Either we're storing bytes and we asynchronously started replication, or we're storing
     // values and need to serialize and replicate them now:
-    if (level.replication > 1) {
+    if (putLevel.replication > 1) {
       data match {
         case ByteBufferValues(bytes) =>
           if (replicationFuture != null) {
@@ -758,15 +778,15 @@
             }
             bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
           }
-          replicate(blockId, bytesAfterPut, level)
+          replicate(blockId, bytesAfterPut, putLevel)
           logDebug("Put block %s remotely took %s"
             .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
       }
     }
 
     BlockManager.dispose(bytesAfterPut)
 
-    if (level.replication > 1) {
+    if (putLevel.replication > 1) {
       logDebug("Putting block %s with replication took %s"
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     } else {
@@ -818,7 +838,7 @@
       value: Any,
       level: StorageLevel,
       tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
-    put(blockId, Iterator(value), level, tellMaster)
+    putIterator(blockId, Iterator(value), level, tellMaster)
   }
 
   /**
@@ -829,7 +849,7 @@
    */
   def dropFromMemory(
       blockId: BlockId,
-      data: Either[ArrayBuffer[Any], ByteBuffer]): Option[BlockStatus] = {
+      data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
 
     logInfo(s"Dropping block $blockId from memory")
     val info = blockInfo.get(blockId).orNull
@@ -853,7 +873,7 @@
         logInfo(s"Writing block $blockId to disk")
         data match {
           case Left(elements) =>
-            diskStore.putValues(blockId, elements, level, returnValues = false)
+            diskStore.putArray(blockId, elements, level, returnValues = false)
           case Right(bytes) =>
             diskStore.putBytes(blockId, bytes, level)
         }
@@ -1068,9 +1088,11 @@
 private[spark] object BlockManager extends Logging {
   private val ID_GENERATOR = new IdGenerator
 
+  /** Return the total amount of storage memory available. */
   private def getMaxMemory(conf: SparkConf): Long = {
     val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
-    (Runtime.getRuntime.maxMemory * memoryFraction).toLong
+    val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
+    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
   }
 
   def getHeartBeatFrequency(conf: SparkConf): Long =
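
As a rough worked example of the new formula with the defaults shown here: a 10 GB executor heap yields 10 GB × 0.6 (`spark.storage.memoryFraction`) × 0.9 (`spark.storage.safetyFraction`) ≈ 5.4 GB of storage memory, down from 6 GB without the safety margin; with the 0.2 unroll fraction described in the commit message, roughly 1.08 GB of that would be available to the shared unroll pool.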
