75 changes: 11 additions & 64 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -75,7 +75,17 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {

// Otherwise, cache the values and keep track of any updates in block statuses
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
val cachedValues = {
Contributor Author:
This code is moved from the old putInBlockManager method.

updatedBlocks ++=
blockManager.putIterator(key, computedValues, storageLevel, tellMaster = true)
blockManager.get(key) match {
case Some(v) => v.data.asInstanceOf[Iterator[T]]
case None =>
val msg = s"Block manager failed to return cached value for $key!"
logInfo(msg)
throw new BlockException(key, msg)
}
}
val metrics = context.taskMetrics
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
@@ -126,67 +136,4 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}
}
}

/**
* Cache the values of a partition, keeping track of any updates in the storage statuses of
* other blocks along the way.
*
* The effective storage level refers to the level that actually specifies BlockManager put
* behavior, not the level originally specified by the user. This is mainly for forcing a
* MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
* while preserving the original semantics of the RDD as specified by the application.
*/
private def putInBlockManager[T](
key: BlockId,
values: Iterator[T],
level: StorageLevel,
updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {

val putLevel = effectiveStorageLevel.getOrElse(level)
if (!putLevel.useMemory) {
/*
* This RDD is not to be cached in memory, so we can just pass the computed values as an
* iterator directly to the BlockManager rather than first fully unrolling it in memory.
*/
updatedBlocks ++=
blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
blockManager.get(key) match {
case Some(v) => v.data.asInstanceOf[Iterator[T]]
case None =>
logInfo(s"Failure to store $key")
throw new BlockException(key, s"Block manager failed to return cached value for $key!")
}
} else {
/*
* This RDD is to be cached in memory. In this case we cannot pass the computed values
* to the BlockManager as an iterator and expect to read it back later. This is because
* we may end up dropping a partition from memory store before getting it back.
*
* In addition, we must be careful to not unroll the entire partition in memory at once.
* Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
* single partition. Instead, we unroll the values cautiously, potentially aborting and
* dropping the partition to disk if applicable.
*/
blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
case Left(arr) =>
// We have successfully unrolled the entire partition, so cache it in memory
updatedBlocks ++=
blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
arr.iterator.asInstanceOf[Iterator[T]]
case Right(it) =>
// There is not enough space to cache this partition in memory
val returnValues = it.asInstanceOf[Iterator[T]]
if (putLevel.useDisk) {
logWarning(s"Persisting partition $key to disk instead.")
val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
useOffHeap = false, deserialized = false, putLevel.replication)
putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
} else {
returnValues
}
}
}
}

}
Contributor Author:
This block of code is deleted because MemoryStore.putIterator seems to implement essentially the same logic, using unrollSafely and falling back to disk when necessary.
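To make the reasoning above concrete, here is a simplified, self-contained sketch of the unroll-then-fall-back behavior that MemoryStore.putIterator provides and that the deleted putInBlockManager duplicated. This is plain Scala with stand-in types, not Spark's actual classes, and the element budget is a stand-in for the byte-level accounting the real unrollSafely performs.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified model of MemoryStore.putIterator's behavior: unroll the iterator
// cautiously; if it fits, keep it in memory, otherwise fall back to disk when
// the storage level allows it (MEMORY_AND_DISK-style levels).
object UnrollFallbackSketch {
  sealed trait PutTarget
  case object Memory extends PutTarget
  case object Disk extends PutTarget
  case object NotStored extends PutTarget

  // Stand-in for unrollSafely: unroll at most `budget` elements.
  // Left = the block fit and is fully unrolled; Right = it did not fit, so hand
  // back an iterator over everything (already-unrolled values plus the rest).
  def unrollSafely(values: Iterator[Any], budget: Int): Either[Array[Any], Iterator[Any]] = {
    val buffer = new ArrayBuffer[Any]
    while (values.hasNext && buffer.size < budget) {
      buffer += values.next()
    }
    if (values.hasNext) Right(buffer.iterator ++ values) else Left(buffer.toArray)
  }

  // Mirrors the shape of putIterator: keep the block in memory if it unrolls
  // within the budget, otherwise fall back to disk when the level allows it.
  def putIterator(values: Iterator[Any], budget: Int, useDisk: Boolean): (PutTarget, Iterator[Any]) =
    unrollSafely(values, budget) match {
      case Left(arr) => (Memory, arr.iterator)
      case Right(it) => if (useDisk) (Disk, it) else (NotStored, it)
    }

  def main(args: Array[String]): Unit = {
    // A 1000-element partition with room to unroll only 100 elements falls back to disk.
    val (target, it) = putIterator(Iterator.range(0, 1000), budget = 100, useDisk = true)
    println(s"stored in: $target, first element: ${it.next()}")
  }
}
```

Because the real MemoryStore.putIterator already behaves this way, CacheManager can delegate to blockManager.putIterator unconditionally and read the cached block back with blockManager.get, as the diff above shows.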

24 changes: 3 additions & 21 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Expand Up @@ -47,7 +47,6 @@ import org.apache.spark.util._
private[spark] sealed trait BlockValues
private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues

/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
@@ -646,10 +645,9 @@ private[spark] class BlockManager(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
require(values != null, "Values is null")
doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
doPut(blockId, IteratorValues(values), level, tellMaster)
}

/**
@@ -669,20 +667,6 @@ private[spark] class BlockManager(
syncWrites, writeMetrics, blockId)
}

/**
* Put a new block of values to the block manager.
* Return a list of blocks updated as a result of this put.
*/
def putArray(
blockId: BlockId,
values: Array[Any],
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(values != null, "Values is null")
doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
}

/**
* Put a new block of serialized bytes to the block manager.
* Return a list of blocks updated as a result of this put.
@@ -803,8 +787,6 @@ private[spark] class BlockManager(
val result = data match {
case IteratorValues(iterator) =>
blockStore.putIterator(blockId, iterator, putLevel, returnValues)
case ArrayValues(array) =>
blockStore.putArray(blockId, array, putLevel, returnValues)
case ByteBufferValues(bytes) =>
bytes.rewind()
blockStore.putBytes(blockId, bytes, putLevel)
@@ -1052,7 +1034,7 @@ private[spark] class BlockManager(
logInfo(s"Writing block $blockId to disk")
data() match {
case Left(elements) =>
diskStore.putArray(blockId, elements, level, returnValues = false)
diskStore.putIterator(blockId, elements.toIterator, level, returnValues = false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}
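A note on the dropFromMemory change above: spilled values are now written with diskStore.putIterator(elements.toIterator, ...) instead of the removed diskStore.putArray. This is behavior-preserving because the removed DiskStore.putArray (visible in the DiskStore.scala diff below) was itself just a wrapper that called putIterator on values.toIterator. A minimal illustration in plain Scala, with a hypothetical stand-in for the disk write rather than Spark's stores:

```scala
object ArrayVsIteratorPut {
  // Stand-in for a disk write: record the elements in the order they are written.
  def putIterator(values: Iterator[Any]): Vector[Any] = values.toVector

  // Shape of the removed DiskStore.putArray: delegate to putIterator via toIterator.
  def putArray(values: Array[Any]): Vector[Any] = putIterator(values.toIterator)

  def main(args: Array[String]): Unit = {
    val elements: Array[Any] = Array(1, "two", 3.0)
    // Both paths write the same element sequence, which is why dropFromMemory can
    // switch from putArray(elements, ...) to putIterator(elements.toIterator, ...).
    assert(putArray(elements) == putIterator(elements.toIterator))
    println("putArray and putIterator(elements.toIterator) write the same elements")
  }
}
```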
8 changes: 0 additions & 8 deletions core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -19,8 +19,6 @@ package org.apache.spark.storage

import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.Logging

/**
@@ -43,12 +41,6 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends
level: StorageLevel,
returnValues: Boolean): PutResult

def putArray(
blockId: BlockId,
values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult

/**
* Return the size of a block in bytes.
*/
9 changes: 0 additions & 9 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

import org.apache.spark.Logging
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.Utils

/**
@@ -58,14 +57,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
PutResult(bytes.limit(), Right(bytes.duplicate()))
}

override def putArray(
blockId: BlockId,
values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
putIterator(blockId, values.toIterator, level, returnValues)
}

override def putIterator(
blockId: BlockId,
values: Iterator[Any],
core/src/main/scala/org/apache/spark/storage/ExternalBlockStore.scala
@@ -51,26 +51,11 @@ private[spark] class ExternalBlockStore(blockManager: BlockManager, executorId:
putIntoExternalBlockStore(blockId, bytes, returnValues = true)
}

override def putArray(
blockId: BlockId,
values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
putIntoExternalBlockStore(blockId, values.toIterator, returnValues)
}

override def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
putIntoExternalBlockStore(blockId, values, returnValues)
}

private def putIntoExternalBlockStore(
blockId: BlockId,
values: Iterator[_],
returnValues: Boolean): PutResult = {
logTrace(s"Attempting to put block $blockId into ExternalBlockStore")
// we should never hit here if externalBlockManager is None. Handle it anyway for safety.
try {
42 changes: 11 additions & 31 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -96,7 +96,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
putIterator(blockId, values, level, returnValues = true)
} else {
val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
tryToPut(blockId, () => bytes, bytes.limit, deserialized = false, droppedBlocks)
PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
}
}
@@ -122,23 +122,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
PutResult(size, data, droppedBlocks)
}

override def putArray(
blockId: BlockId,
values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
if (level.deserialized) {
val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
tryToPut(blockId, values, sizeEstimate, deserialized = true, droppedBlocks)
PutResult(sizeEstimate, Left(values.iterator), droppedBlocks)
} else {
val bytes = blockManager.dataSerialize(blockId, values.iterator)
tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
}
}

override def putIterator(
blockId: BlockId,
values: Iterator[Any],
@@ -170,9 +153,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
unrolledValues match {
case Left(arrayValues) =>
// Values are fully unrolled in memory, so store them as an array
val res = putArray(blockId, arrayValues, level, returnValues)
droppedBlocks ++= res.droppedBlocks
PutResult(res.size, res.data, droppedBlocks)
if (level.deserialized) {
Contributor Author:
This is an inlining of the previous putArray code.
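To spell out what that inlining does, here is a minimal, self-contained sketch of the two branches, using stand-in helpers rather than Spark's SizeEstimator.estimate and blockManager.dataSerialize. Deserialized storage levels keep the unrolled object array and record an estimated size; serialized levels turn the array into a ByteBuffer first.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer

object StoreUnrolledSketch {
  sealed trait Stored
  case class AsObjects(values: Array[Any], estimatedSize: Long) extends Stored
  case class AsBytes(bytes: ByteBuffer) extends Stored

  // Stand-in for SizeEstimator.estimate: a crude fixed per-element guess.
  def estimateSize(values: Array[Any]): Long = values.length.toLong * 16L

  // Stand-in for blockManager.dataSerialize: Java serialization into a ByteBuffer.
  def serialize(values: Array[Any]): ByteBuffer = {
    val out = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(out)
    values.foreach(v => oos.writeObject(v.asInstanceOf[AnyRef]))
    oos.close()
    ByteBuffer.wrap(out.toByteArray)
  }

  // The two branches the inlined code chooses between, keyed on level.deserialized.
  def storeUnrolled(values: Array[Any], deserialized: Boolean): Stored =
    if (deserialized) AsObjects(values, estimateSize(values)) // e.g. MEMORY_ONLY
    else AsBytes(serialize(values))                           // e.g. MEMORY_ONLY_SER

  def main(args: Array[String]): Unit = {
    val arr: Array[Any] = Array("a", 1, 2.5)
    storeUnrolled(arr, deserialized = true) match {
      case AsObjects(_, size) => println(s"kept as objects, estimated size = $size bytes")
      case other => println(other)
    }
    storeUnrolled(arr, deserialized = false) match {
      case AsBytes(bytes) => println(s"serialized to ${bytes.limit()} bytes")
      case other => println(other)
    }
  }
}
```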

val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true, droppedBlocks)
PutResult(sizeEstimate, Left(arrayValues.iterator), droppedBlocks)
} else {
val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
tryToPut(blockId, () => bytes, bytes.limit, deserialized = false, droppedBlocks)
PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
}
case Right(iteratorValues) =>
// Not enough space to unroll this block; drop to disk if applicable
if (level.useDisk && allowPersistToDisk) {
@@ -246,7 +235,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
* This method returns either an array with the contents of the entire block or an iterator
* containing the values of the block (if the array would have exceeded available memory).
*/
def unrollSafely(
private[storage] def unrollSafely(
blockId: BlockId,
values: Iterator[Any],
droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
@@ -333,15 +322,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
blockId.asRDDId.map(_.rddId)
}

private def tryToPut(
blockId: BlockId,
value: Any,
size: Long,
deserialized: Boolean,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
tryToPut(blockId, () => value, size, deserialized, droppedBlocks)
}

/**
* Try to put in a set of values, if we can free up enough space. The value should either be
* an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size