Skip to content

Commit 4e97a50

Browse files
committed
Merge branch 'master' of github.com:apache/spark into df-cov
2 parents e3b0b85 + a9fc505 commit 4e97a50

File tree

45 files changed

+838
-406
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+838
-406
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,9 +247,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
247247
private[spark] def eventLogDir: Option[URI] = _eventLogDir
248248
private[spark] def eventLogCodec: Option[String] = _eventLogCodec
249249

250-
// Generate the random name for a temp folder in Tachyon
250+
// Generate the random name for a temp folder in external block store.
251251
// Add a random UUID as the suffix here to make the folder name safer (collision-resistant)
252-
val tachyonFolderName = "spark-" + randomUUID.toString()
252+
val externalBlockStoreFolderName = "spark-" + randomUUID.toString()
253+
@deprecated("Use externalBlockStoreFolderName instead.", "1.4.0")
254+
val tachyonFolderName = externalBlockStoreFolderName
253255

254256
def isLocal: Boolean = (master == "local" || master.startsWith("local["))
255257

@@ -386,7 +388,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
386388
}
387389
}
388390

389-
_conf.set("spark.tachyonStore.folderName", tachyonFolderName)
391+
_conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName)
390392

391393
if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
392394

core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ object ExecutorExitCode {
3333
/** DiskStore failed to create a local temporary directory after many attempts. */
3434
val DISK_STORE_FAILED_TO_CREATE_DIR = 53
3535

36-
/** TachyonStore failed to initialize after many attempts. */
37-
val TACHYON_STORE_FAILED_TO_INITIALIZE = 54
36+
/** ExternalBlockStore failed to initialize after many attempts. */
37+
val EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE = 54
3838

39-
/** TachyonStore failed to create a local temporary directory after many attempts. */
40-
val TACHYON_STORE_FAILED_TO_CREATE_DIR = 55
39+
/** ExternalBlockStore failed to create a local temporary directory after many attempts. */
40+
val EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR = 55
4141

4242
def explainExitCode(exitCode: Int): String = {
4343
exitCode match {
@@ -46,9 +46,11 @@ object ExecutorExitCode {
4646
case OOM => "OutOfMemoryError"
4747
case DISK_STORE_FAILED_TO_CREATE_DIR =>
4848
"Failed to create local directory (bad spark.local.dir?)"
49-
case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize."
50-
case TACHYON_STORE_FAILED_TO_CREATE_DIR =>
51-
"TachyonStore failed to create a local temporary directory."
49+
// TODO: replace external block store with concrete implementation name
50+
case EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE => "ExternalBlockStore failed to initialize."
51+
// TODO: replace external block store with concrete implementation name
52+
case EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR =>
53+
"ExternalBlockStore failed to create a local temporary directory."
5254
case _ =>
5355
"Unknown executor exit code (" + exitCode + ")" + (
5456
if (exitCode > 128) {

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1475,9 +1475,9 @@ abstract class RDD[T: ClassTag](
14751475

14761476
val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
14771477
val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info =>
1478-
" CachedPartitions: %d; MemorySize: %s; TachyonSize: %s; DiskSize: %s".format(
1478+
" CachedPartitions: %d; MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s".format(
14791479
info.numCachedPartitions, bytesToString(info.memSize),
1480-
bytesToString(info.tachyonSize), bytesToString(info.diskSize)))
1480+
bytesToString(info.externalBlockStoreSize), bytesToString(info.diskSize)))
14811481

14821482
s"$rdd [$persistence]" +: storageInfo
14831483
}

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 29 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -78,19 +78,11 @@ private[spark] class BlockManager(
7878
private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
7979

8080
// Actual storage of where blocks are kept
81-
private var tachyonInitialized = false
81+
private var externalBlockStoreInitialized = false
8282
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
8383
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
84-
private[spark] lazy val tachyonStore: TachyonStore = {
85-
val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
86-
val appFolderName = conf.get("spark.tachyonStore.folderName")
87-
val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"
88-
val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998")
89-
val tachyonBlockManager =
90-
new TachyonBlockManager(this, tachyonStorePath, tachyonMaster)
91-
tachyonInitialized = true
92-
new TachyonStore(this, tachyonBlockManager)
93-
}
84+
private[spark] lazy val externalBlockStore: ExternalBlockStore =
85+
new ExternalBlockStore(this, executorId)
9486

9587
private[spark]
9688
val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
@@ -320,13 +312,13 @@ private[spark] class BlockManager(
320312

321313
/**
322314
* Get the BlockStatus for the block identified by the given ID, if it exists.
323-
* NOTE: This is mainly for testing, and it doesn't fetch information from Tachyon.
315+
* NOTE: This is mainly for testing, and it doesn't fetch information from the external block store.
324316
*/
325317
def getStatus(blockId: BlockId): Option[BlockStatus] = {
326318
blockInfo.get(blockId).map { info =>
327319
val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
328320
val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
329-
// Assume that block is not in Tachyon
321+
// Assume that block is not in external block store
330322
BlockStatus(info.level, memSize, diskSize, 0L)
331323
}
332324
}
@@ -376,10 +368,10 @@ private[spark] class BlockManager(
376368
if (info.tellMaster) {
377369
val storageLevel = status.storageLevel
378370
val inMemSize = Math.max(status.memSize, droppedMemorySize)
379-
val inTachyonSize = status.tachyonSize
371+
val inExternalBlockStoreSize = status.externalBlockStoreSize
380372
val onDiskSize = status.diskSize
381373
master.updateBlockInfo(
382-
blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize)
374+
blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inExternalBlockStoreSize)
383375
} else {
384376
true
385377
}
@@ -397,15 +389,17 @@ private[spark] class BlockManager(
397389
BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
398390
case level =>
399391
val inMem = level.useMemory && memoryStore.contains(blockId)
400-
val inTachyon = level.useOffHeap && tachyonStore.contains(blockId)
392+
val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId)
401393
val onDisk = level.useDisk && diskStore.contains(blockId)
402394
val deserialized = if (inMem) level.deserialized else false
403-
val replication = if (inMem || inTachyon || onDisk) level.replication else 1
404-
val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, replication)
395+
val replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1
396+
val storageLevel =
397+
StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication)
405398
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
406-
val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L
399+
val externalBlockStoreSize =
400+
if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0L
407401
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
408-
BlockStatus(storageLevel, memSize, diskSize, tachyonSize)
402+
BlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize)
409403
}
410404
}
411405
}
@@ -485,11 +479,11 @@ private[spark] class BlockManager(
485479
}
486480
}
487481

488-
// Look for the block in Tachyon
482+
// Look for the block in external block store
489483
if (level.useOffHeap) {
490-
logDebug(s"Getting block $blockId from tachyon")
491-
if (tachyonStore.contains(blockId)) {
492-
tachyonStore.getBytes(blockId) match {
484+
logDebug(s"Getting block $blockId from ExternalBlockStore")
485+
if (externalBlockStore.contains(blockId)) {
486+
externalBlockStore.getBytes(blockId) match {
493487
case Some(bytes) =>
494488
if (!asBlockResult) {
495489
return Some(bytes)
@@ -498,7 +492,7 @@ private[spark] class BlockManager(
498492
dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
499493
}
500494
case None =>
501-
logDebug(s"Block $blockId not found in tachyon")
495+
logDebug(s"Block $blockId not found in externalBlockStore")
502496
}
503497
}
504498
}
@@ -766,8 +760,8 @@ private[spark] class BlockManager(
766760
// We will drop it to disk later if the memory store can't hold it.
767761
(true, memoryStore)
768762
} else if (putLevel.useOffHeap) {
769-
// Use tachyon for off-heap storage
770-
(false, tachyonStore)
763+
// Use external block store
764+
(false, externalBlockStore)
771765
} else if (putLevel.useDisk) {
772766
// Don't get back the bytes from put unless we replicate them
773767
(putLevel.replication > 1, diskStore)
@@ -802,7 +796,7 @@ private[spark] class BlockManager(
802796

803797
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
804798
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
805-
// Now that the block is in either the memory, tachyon, or disk store,
799+
// Now that the block is in either the memory, externalBlockStore, or disk store,
806800
// let other threads read it, and tell the master about it.
807801
marked = true
808802
putBlockInfo.markReady(size)
@@ -1099,10 +1093,11 @@ private[spark] class BlockManager(
10991093
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
11001094
val removedFromMemory = memoryStore.remove(blockId)
11011095
val removedFromDisk = diskStore.remove(blockId)
1102-
val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
1103-
if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
1096+
val removedFromExternalBlockStore =
1097+
if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
1098+
if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
11041099
logWarning(s"Block $blockId could not be removed as it was not found in either " +
1105-
"the disk, memory, or tachyon store")
1100+
"the disk, memory, or external block store")
11061101
}
11071102
blockInfo.remove(blockId)
11081103
if (tellMaster && info.tellMaster) {
@@ -1136,7 +1131,7 @@ private[spark] class BlockManager(
11361131
val level = info.level
11371132
if (level.useMemory) { memoryStore.remove(id) }
11381133
if (level.useDisk) { diskStore.remove(id) }
1139-
if (level.useOffHeap) { tachyonStore.remove(id) }
1134+
if (level.useOffHeap) { externalBlockStore.remove(id) }
11401135
iterator.remove()
11411136
logInfo(s"Dropped block $id")
11421137
}
@@ -1216,8 +1211,8 @@ private[spark] class BlockManager(
12161211
blockInfo.clear()
12171212
memoryStore.clear()
12181213
diskStore.clear()
1219-
if (tachyonInitialized) {
1220-
tachyonStore.clear()
1214+
if (externalBlockStoreInitialized) {
1215+
externalBlockStore.clear()
12211216
}
12221217
metadataCleaner.cancel()
12231218
broadcastCleaner.cancel()

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@ class BlockManagerMaster(
5454
storageLevel: StorageLevel,
5555
memSize: Long,
5656
diskSize: Long,
57-
tachyonSize: Long): Boolean = {
57+
externalBlockStoreSize: Long): Boolean = {
5858
val res = driverEndpoint.askWithRetry[Boolean](
59-
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
59+
UpdateBlockInfo(blockManagerId, blockId, storageLevel,
60+
memSize, diskSize, externalBlockStoreSize))
6061
logDebug(s"Updated info of block $blockId")
6162
res
6263
}

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ class BlockManagerMasterEndpoint(
6060
context.reply(true)
6161

6262
case UpdateBlockInfo(
63-
blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) =>
63+
blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
6464
context.reply(updateBlockInfo(
65-
blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize))
65+
blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize))
6666

6767
case GetLocations(blockId) =>
6868
context.reply(getLocations(blockId))
@@ -314,7 +314,7 @@ class BlockManagerMasterEndpoint(
314314
storageLevel: StorageLevel,
315315
memSize: Long,
316316
diskSize: Long,
317-
tachyonSize: Long): Boolean = {
317+
externalBlockStoreSize: Long): Boolean = {
318318

319319
if (!blockManagerInfo.contains(blockManagerId)) {
320320
if (blockManagerId.isDriver && !isLocal) {
@@ -332,7 +332,7 @@ class BlockManagerMasterEndpoint(
332332
}
333333

334334
blockManagerInfo(blockManagerId).updateBlockInfo(
335-
blockId, storageLevel, memSize, diskSize, tachyonSize)
335+
blockId, storageLevel, memSize, diskSize, externalBlockStoreSize)
336336

337337
var locations: mutable.HashSet[BlockManagerId] = null
338338
if (blockLocations.containsKey(blockId)) {
@@ -396,8 +396,8 @@ case class BlockStatus(
396396
storageLevel: StorageLevel,
397397
memSize: Long,
398398
diskSize: Long,
399-
tachyonSize: Long) {
400-
def isCached: Boolean = memSize + diskSize + tachyonSize > 0
399+
externalBlockStoreSize: Long) {
400+
def isCached: Boolean = memSize + diskSize + externalBlockStoreSize > 0
401401
}
402402

403403
@DeveloperApi
@@ -429,7 +429,7 @@ private[spark] class BlockManagerInfo(
429429
storageLevel: StorageLevel,
430430
memSize: Long,
431431
diskSize: Long,
432-
tachyonSize: Long) {
432+
externalBlockStoreSize: Long) {
433433

434434
updateLastSeenMs()
435435

@@ -445,9 +445,9 @@ private[spark] class BlockManagerInfo(
445445
}
446446

447447
if (storageLevel.isValid) {
448-
/* isValid means it is either stored in-memory, on-disk or on-Tachyon.
448+
/* isValid means it is either stored in-memory, on-disk or on-externalBlockStore.
449449
* The memSize here indicates the data size in or dropped from memory,
450-
* tachyonSize here indicates the data size in or dropped from Tachyon,
450+
* externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
451451
* and the diskSize here indicates the data size in or dropped to disk.
452452
* They can both be larger than 0 when a block is dropped from memory to disk.
453453
* Therefore, a safe way to set BlockStatus is to record only the size for the storage mode in use. */
@@ -464,9 +464,9 @@ private[spark] class BlockManagerInfo(
464464
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
465465
}
466466
if (storageLevel.useOffHeap) {
467-
_blocks.put(blockId, BlockStatus(storageLevel, 0, 0, tachyonSize))
468-
logInfo("Added %s on tachyon on %s (size: %s)".format(
469-
blockId, blockManagerId.hostPort, Utils.bytesToString(tachyonSize)))
467+
_blocks.put(blockId, BlockStatus(storageLevel, 0, 0, externalBlockStoreSize))
468+
logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(
469+
blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))
470470
}
471471
} else if (_blocks.containsKey(blockId)) {
472472
// If isValid is not true, drop the block.
@@ -482,8 +482,9 @@ private[spark] class BlockManagerInfo(
482482
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
483483
}
484484
if (blockStatus.storageLevel.useOffHeap) {
485-
logInfo("Removed %s on %s on tachyon (size: %s)".format(
486-
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.tachyonSize)))
485+
logInfo("Removed %s on %s on externalBlockStore (size: %s)".format(
486+
blockId, blockManagerId.hostPort,
487+
Utils.bytesToString(blockStatus.externalBlockStoreSize)))
487488
}
488489
}
489490
}

core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ private[spark] object BlockManagerMessages {
6060
var storageLevel: StorageLevel,
6161
var memSize: Long,
6262
var diskSize: Long,
63-
var tachyonSize: Long)
63+
var externalBlockStoreSize: Long)
6464
extends ToBlockManagerMaster
6565
with Externalizable {
6666

@@ -72,7 +72,7 @@ private[spark] object BlockManagerMessages {
7272
storageLevel.writeExternal(out)
7373
out.writeLong(memSize)
7474
out.writeLong(diskSize)
75-
out.writeLong(tachyonSize)
75+
out.writeLong(externalBlockStoreSize)
7676
}
7777

7878
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -81,7 +81,7 @@ private[spark] object BlockManagerMessages {
8181
storageLevel = StorageLevel(in)
8282
memSize = in.readLong()
8383
diskSize = in.readLong()
84-
tachyonSize = in.readLong()
84+
externalBlockStoreSize = in.readLong()
8585
}
8686
}
8787

0 commit comments

Comments
 (0)