Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerBlockManagerAdded(blockManagerId: BlockManagerId, maxMem: Long)
case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long)
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockManagerId)
extends SparkListenerEvent

@DeveloperApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
blockLocations.remove(blockId)
}
}
listenerBus.post(SparkListenerBlockManagerRemoved(blockManagerId))
listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
}

private def expireDeadHosts() {
Expand Down Expand Up @@ -325,6 +325,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}

private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
val time = System.currentTimeMillis()
if (!blockManagerInfo.contains(id)) {
blockManagerIdByExecutor.get(id.executorId) match {
case Some(manager) =>
Expand All @@ -340,9 +341,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
id.hostPort, Utils.bytesToString(maxMemSize)))

blockManagerInfo(id) =
new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor)
new BlockManagerInfo(id, time, maxMemSize, slaveActor)
}
listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize))
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
}

private def updateBlockInfo(
Expand Down
12 changes: 8 additions & 4 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,15 @@ private[spark] object JsonProtocol {
val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId)
("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~
("Block Manager ID" -> blockManagerId) ~
("Maximum Memory" -> blockManagerAdded.maxMem)
("Maximum Memory" -> blockManagerAdded.maxMem) ~
("Timestamp" -> blockManagerAdded.time)
}

def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = {
val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId)
("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~
("Block Manager ID" -> blockManagerId)
("Block Manager ID" -> blockManagerId) ~
("Timestamp" -> blockManagerRemoved.time)
}

def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = {
Expand Down Expand Up @@ -466,12 +468,14 @@ private[spark] object JsonProtocol {
def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = {
val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
val maxMem = (json \ "Maximum Memory").extract[Long]
SparkListenerBlockManagerAdded(blockManagerId, maxMem)
val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
SparkListenerBlockManagerAdded(time, blockManagerId, maxMem)
}

def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = {
val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
SparkListenerBlockManagerRemoved(blockManagerId)
val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
SparkListenerBlockManagerRemoved(time, blockManagerId)
}

def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,34 +36,34 @@ class StorageStatusListenerSuite extends FunSuite {

// Block manager add
assert(listener.executorIdToStorageStatus.size === 0)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
assert(listener.executorIdToStorageStatus.size === 1)
assert(listener.executorIdToStorageStatus.get("big").isDefined)
assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1)
assert(listener.executorIdToStorageStatus("big").maxMem === 1000L)
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
assert(listener.executorIdToStorageStatus.size === 2)
assert(listener.executorIdToStorageStatus.get("fat").isDefined)
assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2)
assert(listener.executorIdToStorageStatus("fat").maxMem === 2000L)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)

// Block manager remove
listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1))
listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm1))
assert(listener.executorIdToStorageStatus.size === 1)
assert(!listener.executorIdToStorageStatus.get("big").isDefined)
assert(listener.executorIdToStorageStatus.get("fat").isDefined)
listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm2))
listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm2))
assert(listener.executorIdToStorageStatus.size === 0)
assert(!listener.executorIdToStorageStatus.get("big").isDefined)
assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
}

test("task end without updated blocks") {
val listener = new StorageStatusListener
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
val taskMetrics = new TaskMetrics

// Task end with no updated blocks
Expand All @@ -79,8 +79,8 @@ class StorageStatusListenerSuite extends FunSuite {

test("task end with updated blocks") {
val listener = new StorageStatusListener
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
val taskMetrics1 = new TaskMetrics
val taskMetrics2 = new TaskMetrics
val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
Expand Down Expand Up @@ -128,7 +128,7 @@ class StorageStatusListenerSuite extends FunSuite {

test("unpersist RDD") {
val listener = new StorageStatusListener
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
val taskMetrics1 = new TaskMetrics
val taskMetrics2 = new TaskMetrics
val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
val myRddInfo1 = rddInfo1
val myRddInfo2 = rddInfo2
val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L))
bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
assert(storageListener._rddInfoMap.size === 3)
assert(storageListener.rddInfoList.size === 0) // not cached
Expand Down Expand Up @@ -175,7 +175,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L, 0L))
taskMetrics0.updatedBlocks = Some(Seq(block0))
taskMetrics1.updatedBlocks = Some(Seq(block1))
bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L))
bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
assert(storageListener.rddInfoList.size === 0)
bus.postToAll(SparkListenerTaskEnd(0, 0, "big", Success, taskInfo, taskMetrics0))
Expand Down
37 changes: 33 additions & 4 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import java.util.Properties

import scala.collection.Map

import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._
import org.scalatest.FunSuite

Expand Down Expand Up @@ -52,9 +55,9 @@ class JsonProtocolSuite extends FunSuite {
"System Properties" -> Seq(("Username", "guest"), ("Password", "guest")),
"Classpath Entries" -> Seq(("Super library", "/tmp/super_library"))
))
val blockManagerAdded = SparkListenerBlockManagerAdded(
val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
BlockManagerId("Stars", "In your multitude...", 300), 500)
val blockManagerRemoved = SparkListenerBlockManagerRemoved(
val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
BlockManagerId("Scarce", "to be counted...", 100))
val unpersistRdd = SparkListenerUnpersistRDD(12345)
val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield")
Expand Down Expand Up @@ -151,6 +154,28 @@ class JsonProtocolSuite extends FunSuite {
assert(newMetrics.inputMetrics.isEmpty)
}

test("BlockManager events backward compatibility") {
// SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property.
val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
BlockManagerId("Stars", "In your multitude...", 300), 500)
val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
BlockManagerId("Scarce", "to be counted...", 100))

val oldBmAdded = JsonProtocol.blockManagerAddedToJson(blockManagerAdded)
.removeField({ _._1 == "Timestamp" })

val deserializedBmAdded = JsonProtocol.blockManagerAddedFromJson(oldBmAdded)
assert(SparkListenerBlockManagerAdded(-1L, blockManagerAdded.blockManagerId,
blockManagerAdded.maxMem) === deserializedBmAdded)

val oldBmRemoved = JsonProtocol.blockManagerRemovedToJson(blockManagerRemoved)
.removeField({ _._1 == "Timestamp" })

val deserializedBmRemoved = JsonProtocol.blockManagerRemovedFromJson(oldBmRemoved)
assert(SparkListenerBlockManagerRemoved(-1L, blockManagerRemoved.blockManagerId) ===
deserializedBmRemoved)
}


/** -------------------------- *
| Helper test running methods |
Expand Down Expand Up @@ -242,8 +267,10 @@ class JsonProtocolSuite extends FunSuite {
assertEquals(e1.environmentDetails, e2.environmentDetails)
case (e1: SparkListenerBlockManagerAdded, e2: SparkListenerBlockManagerAdded) =>
assert(e1.maxMem === e2.maxMem)
assert(e1.time === e2.time)
assertEquals(e1.blockManagerId, e2.blockManagerId)
case (e1: SparkListenerBlockManagerRemoved, e2: SparkListenerBlockManagerRemoved) =>
assert(e1.time === e2.time)
assertEquals(e1.blockManagerId, e2.blockManagerId)
case (e1: SparkListenerUnpersistRDD, e2: SparkListenerUnpersistRDD) =>
assert(e1.rddId == e2.rddId)
Expand Down Expand Up @@ -945,7 +972,8 @@ class JsonProtocolSuite extends FunSuite {
| "Host": "In your multitude...",
| "Port": 300
| },
| "Maximum Memory": 500
| "Maximum Memory": 500,
| "Timestamp": 1
|}
"""

Expand All @@ -957,7 +985,8 @@ class JsonProtocolSuite extends FunSuite {
| "Executor ID": "Scarce",
| "Host": "to be counted...",
| "Port": 100
| }
| },
| "Timestamp": 2
|}
"""

Expand Down