Skip to content

Commit a7aed6c

Browse files
committed
add Tachyon migration code
1 parent 186de31 commit a7aed6c

21 files changed

+260
-365
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,9 +247,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
247247
private[spark] def eventLogDir: Option[URI] = _eventLogDir
248248
private[spark] def eventLogCodec: Option[String] = _eventLogCodec
249249

250-
// Generate the random name for a temp folder in Tachyon
250+
// Generate a random name for a temp folder in the off-heap store
251251
// A random UUID (not a timestamp) is used as the suffix so the folder name is unique
252-
val tachyonFolderName = "spark-" + randomUUID.toString()
252+
val offHeapFolderName = "spark-" + randomUUID.toString()
253+
conf.set("spark.offHeapStore.folderName", offHeapFolderName)
253254

254255
def isLocal: Boolean = (master == "local" || master.startsWith("local["))
255256

core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ object ExecutorExitCode {
3333
/** DiskStore failed to create a local temporary directory after many attempts. */
3434
val DISK_STORE_FAILED_TO_CREATE_DIR = 53
3535

36-
/** TachyonStore failed to initialize after many attempts. */
37-
val TACHYON_STORE_FAILED_TO_INITIALIZE = 54
36+
/** OffHeapStore failed to initialize after many attempts. */
37+
val OFFHEAP_STORE_FAILED_TO_INITIALIZE = 54
3838

39-
/** TachyonStore failed to create a local temporary directory after many attempts. */
40-
val TACHYON_STORE_FAILED_TO_CREATE_DIR = 55
39+
/** OffHeapStore failed to create a local temporary directory after many attempts. */
40+
val OFFHEAP_STORE_FAILED_TO_CREATE_DIR = 55
4141

4242
def explainExitCode(exitCode: Int): String = {
4343
exitCode match {
@@ -46,9 +46,9 @@ object ExecutorExitCode {
4646
case OOM => "OutOfMemoryError"
4747
case DISK_STORE_FAILED_TO_CREATE_DIR =>
4848
"Failed to create local directory (bad spark.local.dir?)"
49-
case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize."
50-
case TACHYON_STORE_FAILED_TO_CREATE_DIR =>
51-
"TachyonStore failed to create a local temporary directory."
49+
case OFFHEAP_STORE_FAILED_TO_INITIALIZE => "OffHeap Store failed to initialize."
50+
case OFFHEAP_STORE_FAILED_TO_CREATE_DIR =>
51+
"OffHeap Store failed to create a local temporary directory."
5252
case _ =>
5353
"Unknown executor exit code (" + exitCode + ")" + (
5454
if (exitCode > 128) {

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1475,9 +1475,9 @@ abstract class RDD[T: ClassTag](
14751475

14761476
val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
14771477
val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info =>
1478-
" CachedPartitions: %d; MemorySize: %s; TachyonSize: %s; DiskSize: %s".format(
1478+
" CachedPartitions: %d; MemorySize: %s; OffHeap: %s; DiskSize: %s".format(
14791479
info.numCachedPartitions, bytesToString(info.memSize),
1480-
bytesToString(info.tachyonSize), bytesToString(info.diskSize)))
1480+
bytesToString(info.offHeapSize), bytesToString(info.diskSize)))
14811481

14821482
s"$rdd [$persistence]" +: storageInfo
14831483
}

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 23 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -78,19 +78,10 @@ private[spark] class BlockManager(
7878
private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
7979

8080
// Actual storage of where blocks are kept
81-
private var tachyonInitialized = false
81+
private var offHeapInitialized = false
8282
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
8383
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
84-
private[spark] lazy val tachyonStore: TachyonStore = {
85-
val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
86-
val appFolderName = conf.get("spark.tachyonStore.folderName")
87-
val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"
88-
val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998")
89-
val tachyonBlockManager =
90-
new TachyonBlockManager(this, tachyonStorePath, tachyonMaster)
91-
tachyonInitialized = true
92-
new TachyonStore(this, tachyonBlockManager)
93-
}
84+
// Off-heap store, created lazily on first access.
// The initializer flips `offHeapInitialized` so that the block-removal and
// clear paths, which are guarded by that flag, touch the store only after it
// has been created. Without this assignment the flag would stay false and
// off-heap blocks would never be removed or cleared.
private[spark] lazy val offHeapStore: OffHeapStore = {
  offHeapInitialized = true
  new OffHeapStore(this, executorId)
}
9485

9586
private[spark]
9687
val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
@@ -376,10 +367,10 @@ private[spark] class BlockManager(
376367
if (info.tellMaster) {
377368
val storageLevel = status.storageLevel
378369
val inMemSize = Math.max(status.memSize, droppedMemorySize)
379-
val inTachyonSize = status.tachyonSize
370+
val inOffHeapSize = status.offHeapSize
380371
val onDiskSize = status.diskSize
381372
master.updateBlockInfo(
382-
blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize)
373+
blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inOffHeapSize)
383374
} else {
384375
true
385376
}
@@ -397,15 +388,15 @@ private[spark] class BlockManager(
397388
BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
398389
case level =>
399390
val inMem = level.useMemory && memoryStore.contains(blockId)
400-
val inTachyon = level.useOffHeap && tachyonStore.contains(blockId)
391+
val inOffHeap = level.useOffHeap && offHeapStore.contains(blockId)
401392
val onDisk = level.useDisk && diskStore.contains(blockId)
402393
val deserialized = if (inMem) level.deserialized else false
403-
val replication = if (inMem || inTachyon || onDisk) level.replication else 1
404-
val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, replication)
394+
val replication = if (inMem || inOffHeap || onDisk) level.replication else 1
395+
val storageLevel = StorageLevel(onDisk, inMem, inOffHeap, deserialized, replication)
405396
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
406-
val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L
397+
val offHeapSize = if (inOffHeap) offHeapStore.getSize(blockId) else 0L
407398
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
408-
BlockStatus(storageLevel, memSize, diskSize, tachyonSize)
399+
BlockStatus(storageLevel, memSize, diskSize, offHeapSize)
409400
}
410401
}
411402
}
@@ -485,11 +476,11 @@ private[spark] class BlockManager(
485476
}
486477
}
487478

488-
// Look for the block in Tachyon
479+
// Look for the block in offheap
489480
if (level.useOffHeap) {
490-
logDebug(s"Getting block $blockId from tachyon")
491-
if (tachyonStore.contains(blockId)) {
492-
tachyonStore.getBytes(blockId) match {
481+
logDebug(s"Getting block $blockId from offheap")
482+
if (offHeapStore.contains(blockId)) {
483+
offHeapStore.getBytes(blockId) match {
493484
case Some(bytes) =>
494485
if (!asBlockResult) {
495486
return Some(bytes)
@@ -498,7 +489,7 @@ private[spark] class BlockManager(
498489
dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
499490
}
500491
case None =>
501-
logDebug(s"Block $blockId not found in tachyon")
492+
logDebug(s"Block $blockId not found in offHeap")
502493
}
503494
}
504495
}
@@ -766,8 +757,8 @@ private[spark] class BlockManager(
766757
// We will drop it to disk later if the memory store can't hold it.
767758
(true, memoryStore)
768759
} else if (putLevel.useOffHeap) {
769-
// Use tachyon for off-heap storage
770-
(false, tachyonStore)
760+
// Use off-heap storage
761+
(false, offHeapStore)
771762
} else if (putLevel.useDisk) {
772763
// Don't get back the bytes from put unless we replicate them
773764
(putLevel.replication > 1, diskStore)
@@ -802,7 +793,7 @@ private[spark] class BlockManager(
802793

803794
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
804795
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
805-
// Now that the block is in either the memory, tachyon, or disk store,
796+
// Now that the block is in either the memory, offheap, or disk store,
806797
// let other threads read it, and tell the master about it.
807798
marked = true
808799
putBlockInfo.markReady(size)
@@ -1099,10 +1090,10 @@ private[spark] class BlockManager(
10991090
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
11001091
val removedFromMemory = memoryStore.remove(blockId)
11011092
val removedFromDisk = diskStore.remove(blockId)
1102-
val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
1103-
if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
1093+
val removedFromOffHeap = if (offHeapInitialized) offHeapStore.remove(blockId) else false
1094+
if (!removedFromMemory && !removedFromDisk && !removedFromOffHeap) {
11041095
logWarning(s"Block $blockId could not be removed as it was not found in either " +
1105-
"the disk, memory, or tachyon store")
1096+
"the disk, memory, or offheap store")
11061097
}
11071098
blockInfo.remove(blockId)
11081099
if (tellMaster && info.tellMaster) {
@@ -1136,7 +1127,7 @@ private[spark] class BlockManager(
11361127
val level = info.level
11371128
if (level.useMemory) { memoryStore.remove(id) }
11381129
if (level.useDisk) { diskStore.remove(id) }
1139-
if (level.useOffHeap) { tachyonStore.remove(id) }
1130+
if (level.useOffHeap) { offHeapStore.remove(id) }
11401131
iterator.remove()
11411132
logInfo(s"Dropped block $id")
11421133
}
@@ -1216,8 +1207,8 @@ private[spark] class BlockManager(
12161207
blockInfo.clear()
12171208
memoryStore.clear()
12181209
diskStore.clear()
1219-
if (tachyonInitialized) {
1220-
tachyonStore.clear()
1210+
if (offHeapInitialized) {
1211+
offHeapStore.clear()
12211212
}
12221213
metadataCleaner.cancel()
12231214
broadcastCleaner.cancel()

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ class BlockManagerMaster(
5454
storageLevel: StorageLevel,
5555
memSize: Long,
5656
diskSize: Long,
57-
tachyonSize: Long): Boolean = {
57+
externalBlockStoreSize: Long): Boolean = {
5858
val res = driverEndpoint.askWithRetry[Boolean](
59-
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
59+
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, externalBlockStoreSize))
6060
logDebug(s"Updated info of block $blockId")
6161
res
6262
}

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ class BlockManagerMasterEndpoint(
6060
context.reply(true)
6161

6262
case UpdateBlockInfo(
63-
blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) =>
63+
blockManagerId, blockId, storageLevel, deserializedSize, size, offHeapSize) =>
6464
context.reply(updateBlockInfo(
65-
blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize))
65+
blockManagerId, blockId, storageLevel, deserializedSize, size, offHeapSize))
6666

6767
case GetLocations(blockId) =>
6868
context.reply(getLocations(blockId))
@@ -314,7 +314,7 @@ class BlockManagerMasterEndpoint(
314314
storageLevel: StorageLevel,
315315
memSize: Long,
316316
diskSize: Long,
317-
tachyonSize: Long): Boolean = {
317+
offHeapSize: Long): Boolean = {
318318

319319
if (!blockManagerInfo.contains(blockManagerId)) {
320320
if (blockManagerId.isDriver && !isLocal) {
@@ -332,7 +332,7 @@ class BlockManagerMasterEndpoint(
332332
}
333333

334334
blockManagerInfo(blockManagerId).updateBlockInfo(
335-
blockId, storageLevel, memSize, diskSize, tachyonSize)
335+
blockId, storageLevel, memSize, diskSize, offHeapSize)
336336

337337
var locations: mutable.HashSet[BlockManagerId] = null
338338
if (blockLocations.containsKey(blockId)) {
@@ -396,8 +396,8 @@ case class BlockStatus(
396396
storageLevel: StorageLevel,
397397
memSize: Long,
398398
diskSize: Long,
399-
tachyonSize: Long) {
400-
def isCached: Boolean = memSize + diskSize + tachyonSize > 0
399+
offHeapSize: Long) {
400+
def isCached: Boolean = memSize + diskSize + offHeapSize > 0
401401
}
402402

403403
@DeveloperApi
@@ -429,7 +429,7 @@ private[spark] class BlockManagerInfo(
429429
storageLevel: StorageLevel,
430430
memSize: Long,
431431
diskSize: Long,
432-
tachyonSize: Long) {
432+
offHeapSize: Long) {
433433

434434
updateLastSeenMs()
435435

@@ -445,9 +445,9 @@ private[spark] class BlockManagerInfo(
445445
}
446446

447447
if (storageLevel.isValid) {
448-
/* isValid means it is either stored in-memory, on-disk or on-Tachyon.
448+
/* isValid means it is either stored in-memory, on-disk or on-OffHeap.
449449
* The memSize here indicates the data size in or dropped from memory,
450-
* tachyonSize here indicates the data size in or dropped from Tachyon,
450+
* offHeapSize here indicates the data size in or dropped from offHeap,
451451
* and the diskSize here indicates the data size in or dropped to disk.
452452
* memSize and diskSize can both be larger than 0 when a block is dropped from memory to disk.
453453
* Therefore, a safe way to set BlockStatus is to record each size only under the storage mode it applies to. */
@@ -464,9 +464,9 @@ private[spark] class BlockManagerInfo(
464464
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
465465
}
466466
if (storageLevel.useOffHeap) {
467-
_blocks.put(blockId, BlockStatus(storageLevel, 0, 0, tachyonSize))
468-
logInfo("Added %s on tachyon on %s (size: %s)".format(
469-
blockId, blockManagerId.hostPort, Utils.bytesToString(tachyonSize)))
467+
_blocks.put(blockId, BlockStatus(storageLevel, 0, 0, offHeapSize))
468+
logInfo("Added %s on offHeap on %s (size: %s)".format(
469+
blockId, blockManagerId.hostPort, Utils.bytesToString(offHeapSize)))
470470
}
471471
} else if (_blocks.containsKey(blockId)) {
472472
// If isValid is not true, drop the block.
@@ -482,8 +482,8 @@ private[spark] class BlockManagerInfo(
482482
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
483483
}
484484
if (blockStatus.storageLevel.useOffHeap) {
485-
logInfo("Removed %s on %s on tachyon (size: %s)".format(
486-
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.tachyonSize)))
485+
logInfo("Removed %s on %s on offheap (size: %s)".format(
486+
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.offHeapSize)))
487487
}
488488
}
489489
}

core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ private[spark] object BlockManagerMessages {
6060
var storageLevel: StorageLevel,
6161
var memSize: Long,
6262
var diskSize: Long,
63-
var tachyonSize: Long)
63+
var offHeapSize: Long)
6464
extends ToBlockManagerMaster
6565
with Externalizable {
6666

@@ -72,7 +72,7 @@ private[spark] object BlockManagerMessages {
7272
storageLevel.writeExternal(out)
7373
out.writeLong(memSize)
7474
out.writeLong(diskSize)
75-
out.writeLong(tachyonSize)
75+
out.writeLong(offHeapSize)
7676
}
7777

7878
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -81,7 +81,7 @@ private[spark] object BlockManagerMessages {
8181
storageLevel = StorageLevel(in)
8282
memSize = in.readLong()
8383
diskSize = in.readLong()
84-
tachyonSize = in.readLong()
84+
offHeapSize = in.readLong()
8585
}
8686
}
8787

core/src/main/scala/org/apache/spark/storage/OffHeapBlockManager.scala

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ trait OffHeapBlockManager {
3636
*
3737
* @throws java.io.IOException when FS init failure.
3838
*/
39-
def init(blockManager: BlockManager, executorId: String)
39+
def init(blockManager: BlockManager, executorId: String): Unit
4040

4141
/**
4242
* remove the cache from offheap
@@ -87,24 +87,23 @@ trait OffHeapBlockManager {
8787

8888
object OffHeapBlockManager extends Logging{
8989
val MAX_DIR_CREATION_ATTEMPTS = 10
90-
val subDirsPerDir = 64
91-
def create(blockManager: BlockManager,
92-
executorId: String): Option[OffHeapBlockManager] = {
93-
val sNames = blockManager.conf.getOption("spark.offHeapStore.blockManager")
94-
sNames match {
95-
case Some(name) =>
96-
try {
97-
val instance = Class.forName(name)
98-
.newInstance()
99-
.asInstanceOf[OffHeapBlockManager]
100-
instance.setup(blockManager, executorId)
101-
Some(instance)
102-
} catch {
103-
case NonFatal(t) =>
104-
logError("Cannot initialize offHeap store")
105-
None
106-
}
107-
case None => None
108-
}
90+
val SUB_DIRS_PER_DIR = "64"
91+
def create(blockManager: BlockManager, executorId: String): Option[OffHeapBlockManager] = {
92+
val sNames = blockManager.conf.getOption("spark.offHeapStore.blockManager")
93+
sNames match {
94+
case Some(name) =>
95+
try {
96+
val instance = Class.forName(name)
97+
.newInstance()
98+
.asInstanceOf[OffHeapBlockManager]
99+
instance.setup(blockManager, executorId)
100+
Some(instance)
101+
} catch {
102+
case NonFatal(t) =>
103+
logError("Cannot initialize offHeap store")
104+
None
105+
}
106+
case None => None
107+
}
109108
}
110109
}

0 commit comments

Comments
 (0)