24 commits
9f7ee01
Off-heap caching changes.
JoshRosen Mar 24, 2016
2c1b558
Update HistoryServer tests to reflect new StorageLevel.toString
JoshRosen Mar 25, 2016
9607220
Fix problems in ChunkedByteBufferOutputStream tests.
JoshRosen Mar 25, 2016
f8951b2
Fix missing classTag in deserialization.
JoshRosen Mar 25, 2016
f8ba8d6
Use UnifiedMemoryManager in BlockManager test suites.
JoshRosen Mar 25, 2016
df8be62
Discard extra data written by serializer close when discarding partia…
JoshRosen Mar 25, 2016
f13031e
Merge remote-tracking branch 'origin/master' into off-heap-caching
JoshRosen Mar 28, 2016
af90073
Tests and several bugfixes.
JoshRosen Mar 29, 2016
33c9d9b
Merge remote-tracking branch 'origin/master' into off-heap-caching
JoshRosen Mar 29, 2016
74427e9
Add replication tests and fix more bugs.
JoshRosen Mar 29, 2016
8a0f944
Fix accidental early release; dispose on partial read.
JoshRosen Mar 29, 2016
42d0356
Use crazy hack to allocate DirectBuffers while ignoring JVM limit.
JoshRosen Mar 29, 2016
a7e0420
Merge remote-tracking branch 'origin/master' into off-heap-caching
JoshRosen Mar 30, 2016
a5e9f0d
Indentation fix.
JoshRosen Mar 30, 2016
0a01768
Merge remote-tracking branch 'origin/master' into off-heap-caching
JoshRosen Mar 31, 2016
8a702b4
Simplify StorageLevel.toString().
JoshRosen Mar 31, 2016
7816118
Remove unused import.
JoshRosen Mar 31, 2016
047b16d
Remove old TODO.
JoshRosen Mar 31, 2016
fc1eed2
Fix style nit.
JoshRosen Mar 31, 2016
e5f30c9
Expand Platform.allocateDirectBuffer comment.
JoshRosen Mar 31, 2016
b1fd4a7
Add clarifying comment to offHeapUnrollMemoryMap.
JoshRosen Mar 31, 2016
61920a9
Use allocator in ChunkedByteBuffer.copy().
JoshRosen Mar 31, 2016
fde020f
Address possible direct buffer leak.
JoshRosen Apr 1, 2016
a604078
close() idempotency fix.
JoshRosen Apr 1, 2016
32 changes: 32 additions & 0 deletions common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -17,9 +17,12 @@

package org.apache.spark.unsafe;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;

import sun.misc.Cleaner;
import sun.misc.Unsafe;

public final class Platform {
@@ -144,6 +147,35 @@ public static long reallocateMemory(long address, long oldSize, long newSize) {
return newMemory;
}

/**
* Uses internal JDK APIs to allocate a DirectByteBuffer while ignoring the JVM's
* MaxDirectMemorySize limit (the default limit is too low and we do not want to require users
* to increase it).
*/
@SuppressWarnings("unchecked")
public static ByteBuffer allocateDirectBuffer(int size) {
try {
Class cls = Class.forName("java.nio.DirectByteBuffer");
Constructor constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
constructor.setAccessible(true);
Field cleanerField = cls.getDeclaredField("cleaner");
cleanerField.setAccessible(true);
final long memory = allocateMemory(size);
ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size);
Cleaner cleaner = Cleaner.create(buffer, new Runnable() {
@Override
public void run() {
freeMemory(memory);
}
});
cleanerField.set(buffer, cleaner);
return buffer;
} catch (Exception e) {
throwException(e);
}
throw new IllegalStateException("unreachable");
}

public static void setMemory(long address, byte value, long size) {
_UNSAFE.setMemory(address, size, value);
}
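Reviewer note: the helper above is what the off-heap caching path uses to obtain direct buffers without requiring users to raise -XX:MaxDirectMemorySize. A minimal, illustrative sketch of calling it (not part of this diff):

import java.nio.ByteBuffer
import org.apache.spark.unsafe.Platform

// Allocate a 4 MB off-heap chunk; the attached Cleaner frees the underlying
// off-heap memory once the buffer becomes unreachable, so there is no explicit free here.
val chunk: ByteBuffer = Platform.allocateDirectBuffer(4 * 1024 * 1024)
chunk.putLong(42L)
chunk.flip()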
@@ -30,7 +30,7 @@ import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BlockId, BroadcastBlockId, StorageLevel}
import org.apache.spark.util.{ByteBufferInputStream, Utils}
import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}

/**
* A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
@@ -228,12 +228,12 @@ private object TorrentBroadcast extends Logging {
blockSize: Int,
serializer: Serializer,
compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = {
val bos = new ByteArrayChunkOutputStream(blockSize)
val out: OutputStream = compressionCodec.map(c => c.compressedOutputStream(bos)).getOrElse(bos)
val cbbos = new ChunkedByteBufferOutputStream(blockSize, ByteBuffer.allocate)
val out = compressionCodec.map(c => c.compressedOutputStream(cbbos)).getOrElse(cbbos)
val ser = serializer.newInstance()
val serOut = ser.serializeStream(out)
serOut.writeObject[T](obj).close()
bos.toArrays.map(ByteBuffer.wrap)
cbbos.toChunkedByteBuffer.getChunks()
}

def unBlockifyObject[T: ClassTag](
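Reviewer note: a hedged sketch of the ChunkedByteBufferOutputStream pattern adopted in blockifyObject above (constructor and methods as they appear in this diff; the sizes are made up for illustration):

import java.nio.ByteBuffer
import org.apache.spark.util.io.ChunkedByteBufferOutputStream

// Accumulate serialized output in fixed-size on-heap chunks instead of one large array.
val out = new ChunkedByteBufferOutputStream(4 * 1024 * 1024, ByteBuffer.allocate)
out.write(new Array[Byte](10 * 1024 * 1024)) // spans three 4 MB chunks
out.close()
val chunks: Array[ByteBuffer] = out.toChunkedByteBuffer.getChunks()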
22 changes: 10 additions & 12 deletions core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -35,6 +35,11 @@ private[memory] class StorageMemoryPool(
memoryMode: MemoryMode
) extends MemoryPool(lock) with Logging {

private[this] val poolName: String = memoryMode match {
case MemoryMode.ON_HEAP => "on-heap storage"
case MemoryMode.OFF_HEAP => "off-heap storage"
}

@GuardedBy("lock")
private[this] var _memoryUsed: Long = 0L

@@ -60,7 +65,7 @@

/**
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
*
*
* @return whether all N bytes were successfully granted.
*/
def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
@@ -83,9 +88,8 @@
assert(numBytesToAcquire >= 0)
assert(numBytesToFree >= 0)
assert(memoryUsed <= poolSize)
// Once we support off-heap caching, this will need to change:
if (numBytesToFree > 0 && memoryMode == MemoryMode.ON_HEAP) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree)
if (numBytesToFree > 0) {
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
}
// NOTE: If the memory store evicts blocks, then those evictions will synchronously call
// back into this StorageMemoryPool in order to free memory. Therefore, these variables
@@ -122,14 +126,8 @@
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
if (remainingSpaceToFree > 0) {
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
val spaceFreedByEviction = {
// Once we support off-heap caching, this will need to change:
if (memoryMode == MemoryMode.ON_HEAP) {
memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
} else {
0
}
}
val spaceFreedByEviction =
memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
decrementPoolSize(spaceFreedByEviction)
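Reviewer note: per the acquireMemory contract above, each storage pool now evicts only blocks cached in its own MemoryMode. A hypothetical caller, with offHeapStoragePool, blockId, and estimatedSizeInBytes assumed to exist in scope:

// Ask the off-heap storage pool for enough space to cache the block, allowing it to
// evict other OFF_HEAP blocks if needed; returns false if the full amount cannot be granted.
val granted: Boolean = offHeapStoragePool.acquireMemory(blockId, estimatedSizeInBytes)
if (!granted) {
  // Not enough off-heap storage memory even after eviction; fall back, e.g. to disk.
}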
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.HashMap

import org.apache.spark.{Accumulator, SparkEnv, TaskContext, TaskContextImpl}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
@@ -90,7 +90,8 @@ private[spark] abstract class Task[T](
try {
Utils.tryLogNonFatalError {
// Release memory used by this thread for unrolling blocks
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
// Notify any tasks waiting for execution memory to be freed to wake up and try to
// acquire memory again. This makes impossible the scenario where a task sleeps forever
// because there are no other tasks left to notify it. Since this is safe to do but may
@@ -25,7 +25,7 @@ import scala.reflect.ClassTag
import org.apache.spark.SparkConf
import org.apache.spark.io.CompressionCodec
import org.apache.spark.storage._
import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}

/**
* Component which configures serialization and compression for various Spark components, including
@@ -128,17 +128,9 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar

/** Serializes into a chunked byte buffer. */
def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): ChunkedByteBuffer = {
val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(1024 * 1024 * 4)
dataSerializeStream(blockId, byteArrayChunkOutputStream, values)
new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap))
}

/**
* Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
* the iterator is reached.
*/
def dataDeserialize[T: ClassTag](blockId: BlockId, bytes: ChunkedByteBuffer): Iterator[T] = {
dataDeserializeStream[T](blockId, bytes.toInputStream(dispose = true))
val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
dataSerializeStream(blockId, bbos, values)
bbos.toChunkedByteBuffer
}

/**
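Reviewer note: a short sketch of the round trip implied by this change (serializerManager, blockId, and values are assumed to be in scope). Serialization now yields a ChunkedByteBuffer directly, and the removed dataDeserialize helper is replaced by dataDeserializeStream over an InputStream view of the chunks:

// Serialize an iterator into 4 MB chunks, then read it back through a stream that
// disposes of the chunks once the iterator is exhausted.
val bytes = serializerManager.dataSerialize(blockId, values.iterator)
val readBack = serializerManager.dataDeserializeStream(
  blockId, bytes.toInputStream(dispose = true))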
70 changes: 47 additions & 23 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -18,6 +18,7 @@
package org.apache.spark.storage

import java.io._
import java.nio.ByteBuffer

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.{Await, ExecutionContext, Future}
@@ -39,6 +40,7 @@ import org.apache.spark.rpc.RpcEnv
import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage.memory._
import org.apache.spark.unsafe.Platform
import org.apache.spark.util._
import org.apache.spark.util.io.ChunkedByteBuffer

@@ -372,8 +374,12 @@
val onDisk = level.useDisk && diskStore.contains(blockId)
val deserialized = if (inMem) level.deserialized else false
val replication = if (inMem || onDisk) level.replication else 1
val storageLevel =
StorageLevel(onDisk, inMem, deserialized, replication)
val storageLevel = StorageLevel(
useDisk = onDisk,
useMemory = inMem,
useOffHeap = level.useOffHeap,
deserialized = deserialized,
replication = replication)
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
BlockStatus(storageLevel, memSize, diskSize)
@@ -407,20 +413,24 @@
val iter: Iterator[Any] = if (level.deserialized) {
memoryStore.getValues(blockId).get
} else {
serializerManager.dataDeserialize(
blockId, memoryStore.getBytes(blockId).get)(info.classTag)
serializerManager.dataDeserializeStream(
blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
}
val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
} else if (level.useDisk && diskStore.contains(blockId)) {
val iterToReturn: Iterator[Any] = {
val diskBytes = diskStore.getBytes(blockId)
if (level.deserialized) {
val diskValues = serializerManager.dataDeserialize(blockId, diskBytes)(info.classTag)
val diskValues = serializerManager.dataDeserializeStream(
blockId,
diskBytes.toInputStream(dispose = true))(info.classTag)
maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
} else {
val bytes = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
serializerManager.dataDeserialize(blockId, bytes)(info.classTag)
val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
.map {_.toInputStream(dispose = false)}
.getOrElse { diskBytes.toInputStream(dispose = true) }
serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
}
}
val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
@@ -481,7 +491,8 @@
if (level.useMemory && memoryStore.contains(blockId)) {
memoryStore.getBytes(blockId).get
} else if (level.useDisk && diskStore.contains(blockId)) {
maybeCacheDiskBytesInMemory(info, blockId, level, diskStore.getBytes(blockId))
val diskBytes = diskStore.getBytes(blockId)
maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).getOrElse(diskBytes)
} else {
releaseLock(blockId)
throw new SparkException(s"Block $blockId was not found even though it's read-locked")
@@ -496,8 +507,9 @@
*/
private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
getRemoteBytes(blockId).map { data =>
new BlockResult(
serializerManager.dataDeserialize(blockId, data), DataReadMethod.Network, data.size)
val values =
serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
new BlockResult(values, DataReadMethod.Network, data.size)
}
}

@@ -745,7 +757,8 @@
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
val putSucceeded = if (level.deserialized) {
val values = serializerManager.dataDeserialize(blockId, bytes)(classTag)
val values =
serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag)
memoryStore.putIteratorAsValues(blockId, values, classTag) match {
case Right(_) => true
case Left(iter) =>
Expand All @@ -755,7 +768,7 @@ private[spark] class BlockManager(
false
}
} else {
memoryStore.putBytes(blockId, size, () => bytes)
memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
}
if (!putSucceeded && level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
@@ -893,7 +906,7 @@
}
}
} else { // !level.deserialized
memoryStore.putIteratorAsBytes(blockId, iterator(), classTag) match {
memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
case Right(s) =>
size = s
case Left(partiallySerializedValues) =>
@@ -951,40 +964,46 @@
* Attempts to cache spilled bytes read from disk into the MemoryStore in order to speed up
* subsequent reads. This method requires the caller to hold a read lock on the block.
*
* @return a copy of the bytes. The original bytes passed this method should no longer
* be used after this method returns.
* @return a copy of the bytes from the memory store if the put succeeded, otherwise None.
* If this returns bytes from the memory store then the original disk store bytes will
* automatically be disposed and the caller should not continue to use them. Otherwise,
* if this returns None then the original disk store bytes will be unaffected.
*/
private def maybeCacheDiskBytesInMemory(
blockInfo: BlockInfo,
blockId: BlockId,
level: StorageLevel,
diskBytes: ChunkedByteBuffer): ChunkedByteBuffer = {
diskBytes: ChunkedByteBuffer): Option[ChunkedByteBuffer] = {
require(!level.deserialized)
if (level.useMemory) {
// Synchronize on blockInfo to guard against a race condition where two readers both try to
// put values read from disk into the MemoryStore.
blockInfo.synchronized {
if (memoryStore.contains(blockId)) {
diskBytes.dispose()
memoryStore.getBytes(blockId).get
Some(memoryStore.getBytes(blockId).get)
} else {
val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, () => {
val allocator = level.memoryMode match {
case MemoryMode.ON_HEAP => ByteBuffer.allocate _
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}
val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, level.memoryMode, () => {
// https://issues.apache.org/jira/browse/SPARK-6076
// If the file size is bigger than the free memory, OOM will happen. So if we
// cannot put it into MemoryStore, copyForMemory should not be created. That's why
// this action is put into a `() => ChunkedByteBuffer` and created lazily.
diskBytes.copy()
diskBytes.copy(allocator)
})
if (putSucceeded) {
diskBytes.dispose()
memoryStore.getBytes(blockId).get
Some(memoryStore.getBytes(blockId).get)
} else {
diskBytes
None
}
}
}
} else {
diskBytes
None
}
}
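Reviewer note: the pattern above, condensed. Pick an allocator matching the target MemoryMode and hand putBytes a thunk, so the in-memory copy is only materialized if the memory reservation succeeds (a sketch reusing names from this diff; the surrounding variables are assumed):

import java.nio.ByteBuffer
import org.apache.spark.memory.MemoryMode
import org.apache.spark.unsafe.Platform

val allocator = level.memoryMode match {
  case MemoryMode.ON_HEAP => ByteBuffer.allocate _
  case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}
// The copy is deferred: if putBytes cannot reserve diskBytes.size bytes, the thunk is
// never invoked and no second copy of the block is allocated (see SPARK-6076).
val cached: Boolean = memoryStore.putBytes(
  blockId, diskBytes.size, level.memoryMode, () => diskBytes.copy(allocator))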

@@ -1055,7 +1074,12 @@
val peersForReplication = new ArrayBuffer[BlockManagerId]
val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
val tLevel = StorageLevel(
useDisk = level.useDisk,
useMemory = level.useMemory,
useOffHeap = level.useOffHeap,
deserialized = level.deserialized,
replication = 1)
val startTime = System.currentTimeMillis
val random = new Random(blockId.hashCode)

@@ -453,7 +453,7 @@
}

if (storageLevel.isValid) {
/* isValid means it is either stored in-memory, on-disk or on-externalBlockStore.
/* isValid means it is either stored in-memory or on-disk.
* The memSize here indicates the data size in or dropped from memory,
* externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
* and the diskSize here indicates the data size in or dropped to disk.