27 changes: 0 additions & 27 deletions core/pom.xml
@@ -267,33 +267,6 @@
<artifactId>oro</artifactId>
<version>${oro.version}</version>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-client</artifactId>
<version>0.8.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</exclusion>
<exclusion>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-underfs-glusterfs</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.seleniumhq.selenium</groupId>
<artifactId>selenium-java</artifactId>
6 changes: 0 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -243,10 +243,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
private[spark] def eventLogDir: Option[URI] = _eventLogDir
private[spark] def eventLogCodec: Option[String] = _eventLogCodec

// Generate the random name for a temp folder in external block store.
// Add a timestamp as the suffix here to make it more safe
val externalBlockStoreFolderName = "spark-" + randomUUID.toString()

def isLocal: Boolean = (master == "local" || master.startsWith("local["))

/**
@@ -423,8 +419,6 @@
}
}

_conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName)

if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")

// "_jobProgressListener" should be set up before creating SparkEnv because when creating
core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
@@ -19,7 +19,7 @@ package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark.{Logging, SparkEnv, SparkException, TaskContext}
import org.apache.spark.{Logging, SparkEnv, TaskContext}
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.util.Utils

@@ -72,12 +72,6 @@ private[spark] object LocalRDDCheckpointData {
* This method is idempotent.
*/
def transformStorageLevel(level: StorageLevel): StorageLevel = {
// If this RDD is to be cached off-heap, fail fast since we cannot provide any
// correctness guarantees about subsequent computations after the first one
if (level.useOffHeap) {
throw new SparkException("Local checkpointing is not compatible with off-heap caching.")
}

StorageLevel(useDisk = true, level.useMemory, level.deserialized, level.replication)
}
}
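
Side note on the hunk above: transformStorageLevel is documented as idempotent, and that property survives this change since the transform now only forces useDisk on. A hedged sketch of a check (hypothetical test code, not part of this PR; it assumes a class placed in the org.apache.spark package, because the helper is private[spark]):

```scala
package org.apache.spark

import org.apache.spark.rdd.LocalRDDCheckpointData
import org.apache.spark.storage.StorageLevel

// Hypothetical check: the transform only forces useDisk on, so applying
// it twice yields the same StorageLevel as applying it once.
object TransformIdempotenceSketch {
  def main(args: Array[String]): Unit = {
    val once  = LocalRDDCheckpointData.transformStorageLevel(StorageLevel.MEMORY_ONLY)
    val twice = LocalRDDCheckpointData.transformStorageLevel(once)
    assert(once.useDisk)   // disk is always enabled by the transform
    assert(once == twice)  // idempotent, as the doc comment promises
  }
}
```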
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -85,8 +85,6 @@ class JobData private[spark](
val numSkippedStages: Int,
val numFailedStages: Int)

// Q: should Tachyon size go in here as well? currently the UI only shows it on the overall storage
// page ... does anybody pay attention to it?
class RDDStorageInfo private[spark](
val id: Int,
val name: String,
55 changes: 8 additions & 47 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -83,13 +83,8 @@ private[spark] class BlockManager(
ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))

// Actual storage of where blocks are kept
private var externalBlockStoreInitialized = false
private[spark] val memoryStore = new MemoryStore(this, memoryManager)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
private[spark] lazy val externalBlockStore: ExternalBlockStore = {
externalBlockStoreInitialized = true
new ExternalBlockStore(this, executorId)
}
memoryManager.setMemoryStore(memoryStore)

// Note: depending on the memory manager, `maxStorageMemory` may actually vary over time.
@@ -313,8 +308,7 @@ private[spark] class BlockManager(
blockInfo.asScala.get(blockId).map { info =>
val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
// Assume that block is not in external block store
BlockStatus(info.level, memSize, diskSize, 0L)
BlockStatus(info.level, memSize = memSize, diskSize = diskSize)
Contributor: why explicitly specify the fields here? type safety?

Contributor (author): Yea - it is too easy to swap memSize / diskSize.
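To illustrate the point with a standalone sketch (hypothetical types, not code from this PR): transposing two same-typed positional arguments still compiles, while named arguments make the mapping explicit.

```scala
// Two parameters of the same type are easy to transpose at positional call sites.
object NamedArgsSketch extends App {
  case class Status(memSize: Long, diskSize: Long)

  val memSize  = 1024L
  val diskSize = 4096L

  val wrong = Status(diskSize, memSize)                      // compiles, values silently swapped
  val right = Status(memSize = memSize, diskSize = diskSize) // intent is explicit

  assert(right.memSize == 1024L && right.diskSize == 4096L)
  assert(wrong.memSize == 4096L)                             // the bug the named form avoids
}
```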
}
}

@@ -363,10 +357,8 @@ private[spark] class BlockManager(
if (info.tellMaster) {
val storageLevel = status.storageLevel
val inMemSize = Math.max(status.memSize, droppedMemorySize)
val inExternalBlockStoreSize = status.externalBlockStoreSize
val onDiskSize = status.diskSize
master.updateBlockInfo(
blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inExternalBlockStoreSize)
master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
} else {
true
}
@@ -381,20 +373,17 @@
info.synchronized {
info.level match {
case null =>
BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
val deserialized = if (inMem) level.deserialized else false
val replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1
val replication = if (inMem || onDisk) level.replication else 1
val storageLevel =
StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication)
StorageLevel(onDisk, inMem, deserialized, replication)
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
val externalBlockStoreSize =
if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0L
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
BlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize)
BlockStatus(storageLevel, memSize, diskSize)
}
}
}
@@ -475,25 +464,6 @@
}
}

// Look for the block in external block store
if (level.useOffHeap) {
logDebug(s"Getting block $blockId from ExternalBlockStore")
if (externalBlockStore.contains(blockId)) {
val result = if (asBlockResult) {
externalBlockStore.getValues(blockId)
.map(new BlockResult(_, DataReadMethod.Memory, info.size))
} else {
externalBlockStore.getBytes(blockId)
}
result match {
case Some(values) =>
return result
case None =>
logDebug(s"Block $blockId not found in ExternalBlockStore")
}
}
}

// Look for block on disk, potentially storing it back in memory if required
if (level.useDisk) {
logDebug(s"Getting block $blockId from disk")
@@ -786,9 +756,6 @@ private[spark] class BlockManager(
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
(true, memoryStore)
} else if (putLevel.useOffHeap) {
// Use external block store
(false, externalBlockStore)
} else if (putLevel.useDisk) {
// Don't get back the bytes from put unless we replicate them
(putLevel.replication > 1, diskStore)
@@ -909,8 +876,7 @@ private[spark] class BlockManager(
val peersForReplication = new ArrayBuffer[BlockManagerId]
val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
val tLevel = StorageLevel(
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
val startTime = System.currentTimeMillis
val random = new Random(blockId.hashCode)

@@ -1120,9 +1086,7 @@ private[spark] class BlockManager(
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
val removedFromExternalBlockStore =
if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
if (!removedFromMemory && !removedFromDisk) {
logWarning(s"Block $blockId could not be removed as it was not found in either " +
"the disk, memory, or external block store")
}
@@ -1212,9 +1176,6 @@ private[spark] class BlockManager(
blockInfo.clear()
memoryStore.clear()
diskStore.clear()
if (externalBlockStoreInitialized) {
externalBlockStore.clear()
}
futureExecutionContext.shutdownNow()
logInfo("BlockManager stopped")
}
core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -54,11 +54,9 @@ class BlockManagerMaster(
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
externalBlockStoreSize: Long): Boolean = {
diskSize: Long): Boolean = {
val res = driverEndpoint.askWithRetry[Boolean](
UpdateBlockInfo(blockManagerId, blockId, storageLevel,
memSize, diskSize, externalBlockStoreSize))
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize))
logDebug(s"Updated info of block $blockId")
res
}
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -59,10 +59,9 @@ class BlockManagerMasterEndpoint(
register(blockManagerId, maxMemSize, slaveEndpoint)
context.reply(true)

case _updateBlockInfo @ UpdateBlockInfo(
blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
context.reply(updateBlockInfo(
blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize))
case _updateBlockInfo @
UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))

case GetLocations(blockId) =>
@@ -325,8 +324,7 @@ class BlockManagerMasterEndpoint(
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
externalBlockStoreSize: Long): Boolean = {
diskSize: Long): Boolean = {

if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.isDriver && !isLocal) {
@@ -343,8 +341,7 @@
return true
}

blockManagerInfo(blockManagerId).updateBlockInfo(
blockId, storageLevel, memSize, diskSize, externalBlockStoreSize)
blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)

var locations: mutable.HashSet[BlockManagerId] = null
if (blockLocations.containsKey(blockId)) {
@@ -404,17 +401,13 @@
}

@DeveloperApi
case class BlockStatus(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
externalBlockStoreSize: Long) {
def isCached: Boolean = memSize + diskSize + externalBlockStoreSize > 0
case class BlockStatus(storageLevel: StorageLevel, memSize: Long, diskSize: Long) {
def isCached: Boolean = memSize + diskSize > 0
}

@DeveloperApi
object BlockStatus {
def empty: BlockStatus = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
def empty: BlockStatus = BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
}

private[spark] class BlockManagerInfo(
@@ -443,8 +436,7 @@
blockId: BlockId,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
externalBlockStoreSize: Long) {
diskSize: Long) {

updateLastSeenMs()

@@ -468,25 +460,19 @@
* Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
var blockStatus: BlockStatus = null
if (storageLevel.useMemory) {
blockStatus = BlockStatus(storageLevel, memSize, 0, 0)
blockStatus = BlockStatus(storageLevel, memSize = memSize, diskSize = 0)
_blocks.put(blockId, blockStatus)
_remainingMem -= memSize
logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
Utils.bytesToString(_remainingMem)))
}
if (storageLevel.useDisk) {
blockStatus = BlockStatus(storageLevel, 0, diskSize, 0)
blockStatus = BlockStatus(storageLevel, memSize = 0, diskSize = diskSize)
_blocks.put(blockId, blockStatus)
logInfo("Added %s on disk on %s (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
}
if (storageLevel.useOffHeap) {
blockStatus = BlockStatus(storageLevel, 0, 0, externalBlockStoreSize)
_blocks.put(blockId, blockStatus)
logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))
}
if (!blockId.isBroadcast && blockStatus.isCached) {
_cachedBlocks += blockId
}
@@ -504,11 +490,6 @@
logInfo("Removed %s on %s on disk (size: %s)".format(
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
}
if (blockStatus.storageLevel.useOffHeap) {
logInfo("Removed %s on %s on externalBlockStore (size: %s)".format(
blockId, blockManagerId.hostPort,
Utils.bytesToString(blockStatus.externalBlockStoreSize)))
}
}
}

core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -63,20 +63,18 @@ private[spark] object BlockManagerMessages {
var blockId: BlockId,
var storageLevel: StorageLevel,
var memSize: Long,
var diskSize: Long,
var externalBlockStoreSize: Long)
var diskSize: Long)
extends ToBlockManagerMaster
with Externalizable {

def this() = this(null, null, null, 0, 0, 0) // For deserialization only
def this() = this(null, null, null, 0, 0) // For deserialization only

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
blockManagerId.writeExternal(out)
out.writeUTF(blockId.name)
storageLevel.writeExternal(out)
out.writeLong(memSize)
out.writeLong(diskSize)
out.writeLong(externalBlockStoreSize)
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -85,7 +83,6 @@
storageLevel = StorageLevel(in)
memSize = in.readLong()
diskSize = in.readLong()
externalBlockStoreSize = in.readLong()
}
}

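A note on the UpdateBlockInfo hunk above: java.io.Externalizable requires writeExternal and readExternal to handle the same fields in the same order, which is why externalBlockStoreSize is removed from the primary constructor, the no-arg deserialization constructor, and both methods in a single change. A minimal sketch of that invariant (hypothetical class, not Spark's):

```scala
import java.io.{Externalizable, ObjectInput, ObjectOutput}

// Simplified stand-in for a message like UpdateBlockInfo.
class SizeInfo(var memSize: Long, var diskSize: Long) extends Externalizable {
  def this() = this(0L, 0L) // no-arg constructor required for deserialization

  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeLong(memSize)
    out.writeLong(diskSize)
    // A field dropped here must also be dropped from readExternal below,
    // or every subsequent read comes back misaligned.
  }

  override def readExternal(in: ObjectInput): Unit = {
    memSize = in.readLong()
    diskSize = in.readLong()
  }
}
```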
core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala
@@ -26,8 +26,7 @@ private[spark] case class BlockUIData(
location: String,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
externalBlockStoreSize: Long)
diskSize: Long)

/**
* The aggregated status of stream blocks in an executor
@@ -41,8 +40,6 @@ private[spark] case class ExecutorStreamBlockStatus(

def totalDiskSize: Long = blocks.map(_.diskSize).sum

def totalExternalBlockStoreSize: Long = blocks.map(_.externalBlockStoreSize).sum

def numStreamBlocks: Int = blocks.size

}
@@ -62,7 +59,6 @@ private[spark] class BlockStatusListener extends SparkListener {
val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
val memSize = blockUpdated.blockUpdatedInfo.memSize
val diskSize = blockUpdated.blockUpdatedInfo.diskSize
val externalBlockStoreSize = blockUpdated.blockUpdatedInfo.externalBlockStoreSize

synchronized {
// Drop the update info if the block manager is not registered
@@ -74,8 +70,7 @@
blockManagerId.hostPort,
storageLevel,
memSize,
diskSize,
externalBlockStoreSize)
diskSize)
)
} else {
// If isValid is not true, it means we should drop the block.