From efdda8ef17ba180cb36cd8104f320c87f86163a9 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 5 May 2014 16:30:51 -0700 Subject: [PATCH 1/9] Add timestamps to block manager events. These are not used by the UI but are useful when analysing the logs from a spark job. --- .../org/apache/spark/scheduler/SparkListener.scala | 4 ++-- .../spark/storage/BlockManagerMasterActor.scala | 7 ++++--- .../scala/org/apache/spark/util/JsonProtocol.scala | 12 ++++++++---- .../org/apache/spark/util/JsonProtocolSuite.scala | 4 ++-- 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index d01d318633877..0553fae9da03b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -65,11 +65,11 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S extends SparkListenerEvent @DeveloperApi -case class SparkListenerBlockManagerAdded(blockManagerId: BlockManagerId, maxMem: Long) +case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long) extends SparkListenerEvent @DeveloperApi -case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId) +case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockManagerId) extends SparkListenerEvent @DeveloperApi diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index bd31e3c5a187f..5bfffc8f0c150 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -206,7 +206,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus blockLocations.remove(blockId) } } - listenerBus.post(SparkListenerBlockManagerRemoved(blockManagerId)) + listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId)) } private def expireDeadHosts() { @@ -328,6 +328,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus } private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { + val time = System.currentTimeMillis() if (!blockManagerInfo.contains(id)) { blockManagerIdByExecutor.get(id.executorId) match { case Some(manager) => @@ -343,9 +344,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus id.hostPort, Utils.bytesToString(maxMemSize))) blockManagerInfo(id) = - new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor) + new BlockManagerInfo(id, time, maxMemSize, slaveActor) } - listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize)) + listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize)) } private def updateBlockInfo( diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index bb6079154aafe..7f2702f63d54b 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -147,13 +147,15 @@ private[spark] object JsonProtocol { val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId) ("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~ ("Block Manager ID" -> blockManagerId) ~ - ("Maximum Memory" -> blockManagerAdded.maxMem) + ("Maximum Memory" -> blockManagerAdded.maxMem) ~ + ("Timestamp" -> blockManagerAdded.time) } def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = { val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId) ("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~ - ("Block Manager ID" -> blockManagerId) + ("Block Manager ID" -> blockManagerId) ~ + ("Timestamp" -> blockManagerRemoved.time) } def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = { @@ -448,12 +450,14 @@ private[spark] object JsonProtocol { def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = { val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID") val maxMem = (json \ "Maximum Memory").extract[Long] - SparkListenerBlockManagerAdded(blockManagerId, maxMem) + val time = (json \ "Timestamp").extract[Long] + SparkListenerBlockManagerAdded(time, blockManagerId, maxMem) } def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = { val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID") - SparkListenerBlockManagerRemoved(blockManagerId) + val time = (json \ "Timestamp").extract[Long] + SparkListenerBlockManagerRemoved(time,blockManagerId) } def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = { diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 9305b6d9738e1..582d9f483bc80 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -52,9 +52,9 @@ class JsonProtocolSuite extends FunSuite { "System Properties" -> Seq(("Username", "guest"), ("Password", "guest")), "Classpath Entries" -> Seq(("Super library", "/tmp/super_library")) )) - val blockManagerAdded = SparkListenerBlockManagerAdded( + val blockManagerAdded = SparkListenerBlockManagerAdded(1L, BlockManagerId("Stars", "In your multitude...", 300, 400), 500) - val blockManagerRemoved = SparkListenerBlockManagerRemoved( + val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L, BlockManagerId("Scarce", "to be counted...", 100, 200)) val unpersistRdd = SparkListenerUnpersistRDD(12345) val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield") From aca1151bd131127fcc953f07a979e998d8edfcd8 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 5 May 2014 16:46:43 -0700 Subject: [PATCH 2/9] Fix unit test to check new fields. Note that assertJsonStringEquals is not really enabled; I updated the JSON string but they're not really being tested right now (and if I enable the assert, then other tests break). Also, not sure how important are tests that check generated strings like that. --- .../scala/org/apache/spark/util/JsonProtocolSuite.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 582d9f483bc80..b381c47bb4f57 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -235,8 +235,10 @@ class JsonProtocolSuite extends FunSuite { assertEquals(e1.environmentDetails, e2.environmentDetails) case (e1: SparkListenerBlockManagerAdded, e2: SparkListenerBlockManagerAdded) => assert(e1.maxMem === e2.maxMem) + assert(e1.time == e2.time) assertEquals(e1.blockManagerId, e2.blockManagerId) case (e1: SparkListenerBlockManagerRemoved, e2: SparkListenerBlockManagerRemoved) => + assert(e1.time == e2.time) assertEquals(e1.blockManagerId, e2.blockManagerId) case (e1: SparkListenerUnpersistRDD, e2: SparkListenerUnpersistRDD) => assert(e1.rddId == e2.rddId) @@ -657,13 +659,14 @@ class JsonProtocolSuite extends FunSuite { private val blockManagerAddedJsonString = """ {"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"Stars", - "Host":"In your multitude...","Port":300,"Netty Port":400},"Maximum Memory":500} + "Host":"In your multitude...","Port":300,"Netty Port":400},"Maximum Memory":500, + "Timestamp":1} """ private val blockManagerRemovedJsonString = """ {"Event":"SparkListenerBlockManagerRemoved","Block Manager ID":{"Executor ID":"Scarce", - "Host":"to be counted...","Port":100,"Netty Port":200}} + "Host":"to be counted...","Port":100,"Netty Port":200},"Timestamp":1} """ private val unpersistRDDJsonString = From ef728248c1d5f2e8447db7ce5d21b1a6d9b45ce7 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 4 Jun 2014 10:10:25 -0700 Subject: [PATCH 3/9] Handle backwards compatibility with 1.0.0. This allows events generated by the 1.0.0 release to be read by a newer History Server. --- .../org/apache/spark/util/JsonProtocol.scala | 6 ++--- .../apache/spark/util/JsonProtocolSuite.scala | 25 +++++++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 7f2702f63d54b..9b0acc2de1eec 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -450,14 +450,14 @@ private[spark] object JsonProtocol { def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = { val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID") val maxMem = (json \ "Maximum Memory").extract[Long] - val time = (json \ "Timestamp").extract[Long] + val time = (json \ "Timestamp").extract[Option[Long]].getOrElse(-1L) SparkListenerBlockManagerAdded(time, blockManagerId, maxMem) } def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = { val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID") - val time = (json \ "Timestamp").extract[Long] - SparkListenerBlockManagerRemoved(time,blockManagerId) + val time = (json \ "Timestamp").extract[Option[Long]].getOrElse(-1L) + SparkListenerBlockManagerRemoved(time, blockManagerId) } def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = { diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index b381c47bb4f57..70a10e67c9ab2 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -21,6 +21,9 @@ import java.util.Properties import scala.collection.Map +import org.json4s.DefaultFormats +import org.json4s.JsonDSL._ +import org.json4s.JsonAST._ import org.json4s.jackson.JsonMethods._ import org.scalatest.FunSuite @@ -144,6 +147,28 @@ class JsonProtocolSuite extends FunSuite { assert(newMetrics.inputMetrics.isEmpty) } + test("BlockManager events backward compatibility") { + // SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property. + val blockManagerAdded = SparkListenerBlockManagerAdded(1L, + BlockManagerId("Stars", "In your multitude...", 300, 400), 500) + val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L, + BlockManagerId("Scarce", "to be counted...", 100, 200)) + + val oldBmAdded = JsonProtocol.blockManagerAddedToJson(blockManagerAdded) + .removeField({ _._1 == "Timestamp" }) + + val deserializedBmAdded = JsonProtocol.blockManagerAddedFromJson(oldBmAdded) + assert(SparkListenerBlockManagerAdded(-1L, blockManagerAdded.blockManagerId, + blockManagerAdded.maxMem) === deserializedBmAdded) + + val oldBmRemoved = JsonProtocol.blockManagerRemovedToJson(blockManagerRemoved) + .removeField({ _._1 == "Timestamp" }) + + val deserializedBmRemoved = JsonProtocol.blockManagerRemovedFromJson(oldBmRemoved) + assert(SparkListenerBlockManagerRemoved(-1L, blockManagerRemoved.blockManagerId) === + deserializedBmRemoved) + } + /** -------------------------- * | Helper test running methods | From b37a10f225de8a9c8037351b2c077cfdb4f873fa Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 9 Jun 2014 16:19:02 -0700 Subject: [PATCH 4/9] Use === in test assertions. --- .../test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 70a10e67c9ab2..45c51cbefeae8 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -260,10 +260,10 @@ class JsonProtocolSuite extends FunSuite { assertEquals(e1.environmentDetails, e2.environmentDetails) case (e1: SparkListenerBlockManagerAdded, e2: SparkListenerBlockManagerAdded) => assert(e1.maxMem === e2.maxMem) - assert(e1.time == e2.time) + assert(e1.time === e2.time) assertEquals(e1.blockManagerId, e2.blockManagerId) case (e1: SparkListenerBlockManagerRemoved, e2: SparkListenerBlockManagerRemoved) => - assert(e1.time == e2.time) + assert(e1.time === e2.time) assertEquals(e1.blockManagerId, e2.blockManagerId) case (e1: SparkListenerUnpersistRDD, e2: SparkListenerUnpersistRDD) => assert(e1.rddId == e2.rddId) From 45e3bf8b939f8a307a062bae0a1701d517162a33 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 24 Jun 2014 17:57:57 -0700 Subject: [PATCH 5/9] Fix unit test after merge. --- .../test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 45c51cbefeae8..73b470a92a92d 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -691,7 +691,7 @@ class JsonProtocolSuite extends FunSuite { private val blockManagerRemovedJsonString = """ {"Event":"SparkListenerBlockManagerRemoved","Block Manager ID":{"Executor ID":"Scarce", - "Host":"to be counted...","Port":100,"Netty Port":200},"Timestamp":1} + "Host":"to be counted...","Port":100,"Netty Port":200},"Timestamp":2} """ private val unpersistRDDJsonString = From d6f381ce10811bdb692a5c6f2c7851f254dd4408 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 4 Aug 2014 12:45:20 -0700 Subject: [PATCH 6/9] Update tests added after patch was created. --- .../storage/StorageStatusListenerSuite.scala | 18 +++++++++--------- .../spark/ui/storage/StorageTabSuite.scala | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala index 51fb646a3cb61..8a7883b3266dd 100644 --- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala @@ -36,13 +36,13 @@ class StorageStatusListenerSuite extends FunSuite { // Block manager add assert(listener.executorIdToStorageStatus.size === 0) - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) assert(listener.executorIdToStorageStatus.size === 1) assert(listener.executorIdToStorageStatus.get("big").isDefined) assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1) assert(listener.executorIdToStorageStatus("big").maxMem === 1000L) assert(listener.executorIdToStorageStatus("big").numBlocks === 0) - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L)) assert(listener.executorIdToStorageStatus.size === 2) assert(listener.executorIdToStorageStatus.get("fat").isDefined) assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2) @@ -50,11 +50,11 @@ class StorageStatusListenerSuite extends FunSuite { assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) // Block manager remove - listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1)) + listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm1)) assert(listener.executorIdToStorageStatus.size === 1) assert(!listener.executorIdToStorageStatus.get("big").isDefined) assert(listener.executorIdToStorageStatus.get("fat").isDefined) - listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm2)) + listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm2)) assert(listener.executorIdToStorageStatus.size === 0) assert(!listener.executorIdToStorageStatus.get("big").isDefined) assert(!listener.executorIdToStorageStatus.get("fat").isDefined) @@ -62,8 +62,8 @@ class StorageStatusListenerSuite extends FunSuite { test("task end without updated blocks") { val listener = new StorageStatusListener - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L)) val taskMetrics = new TaskMetrics // Task end with no updated blocks @@ -79,8 +79,8 @@ class StorageStatusListenerSuite extends FunSuite { test("task end with updated blocks") { val listener = new StorageStatusListener - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L)) val taskMetrics1 = new TaskMetrics val taskMetrics2 = new TaskMetrics val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L)) @@ -127,7 +127,7 @@ class StorageStatusListenerSuite extends FunSuite { test("unpersist RDD") { val listener = new StorageStatusListener - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) val taskMetrics1 = new TaskMetrics val taskMetrics2 = new TaskMetrics val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L)) diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index 6e68dcb3425aa..438aa3dbd70fe 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -107,7 +107,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { val myRddInfo1 = rddInfo1 val myRddInfo2 = rddInfo2 val stageInfo0 = new StageInfo(0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details") - bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L)) + bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) assert(storageListener._rddInfoMap.size === 3) assert(storageListener.rddInfoList.size === 0) // not cached From 7d2fe9e5c37005593f574d3895064dfa5f995d0d Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 14 Aug 2014 20:33:28 -0700 Subject: [PATCH 7/9] Review feedback. --- core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 9b0acc2de1eec..bf9f09eda32d0 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -450,7 +450,7 @@ private[spark] object JsonProtocol { def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = { val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID") val maxMem = (json \ "Maximum Memory").extract[Long] - val time = (json \ "Timestamp").extract[Option[Long]].getOrElse(-1L) + val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L) SparkListenerBlockManagerAdded(time, blockManagerId, maxMem) } From ec06218e568dd1bf638f69a878e3cdd37a1dc7f9 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 2 Sep 2014 10:25:56 -0700 Subject: [PATCH 8/9] Review feedback. --- core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index cccba3066b1ba..1fc536b096996 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -474,7 +474,7 @@ private[spark] object JsonProtocol { def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = { val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID") - val time = (json \ "Timestamp").extract[Option[Long]].getOrElse(-1L) + val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L) SparkListenerBlockManagerRemoved(time, blockManagerId) } From d5d6e66383a809963448abcafd3583e7f145c9ae Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 2 Sep 2014 10:36:25 -0700 Subject: [PATCH 9/9] Fix tests. --- .../scala/org/apache/spark/ui/storage/StorageTabSuite.scala | 2 +- .../test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index ea2d32dfe6e57..e1bc1379b5d80 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -175,7 +175,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L, 0L)) taskMetrics0.updatedBlocks = Some(Seq(block0)) taskMetrics1.updatedBlocks = Some(Seq(block1)) - bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L)) + bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) assert(storageListener.rddInfoList.size === 0) bus.postToAll(SparkListenerTaskEnd(0, 0, "big", Success, taskInfo, taskMetrics0)) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 5125c7e29cdb1..c84bafce37f70 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -157,9 +157,9 @@ class JsonProtocolSuite extends FunSuite { test("BlockManager events backward compatibility") { // SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property. val blockManagerAdded = SparkListenerBlockManagerAdded(1L, - BlockManagerId("Stars", "In your multitude...", 300, 400), 500) + BlockManagerId("Stars", "In your multitude...", 300), 500) val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L, - BlockManagerId("Scarce", "to be counted...", 100, 200)) + BlockManagerId("Scarce", "to be counted...", 100)) val oldBmAdded = JsonProtocol.blockManagerAddedToJson(blockManagerAdded) .removeField({ _._1 == "Timestamp" })