From 26ad09032bf52c8fae5ce40ed96b09616bcda30b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 29 Jun 2022 17:31:04 -0700 Subject: [PATCH] Fix multiple bugs in JsonProtocol --- .../org/apache/spark/util/JsonProtocol.scala | 17 ++++-- .../apache/spark/util/JsonProtocolSuite.scala | 53 +++++++++++++++++-- 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 0c15b13d5a13d..f0755b04bef82 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -512,6 +512,7 @@ private[spark] object JsonProtocol { def storageLevelToJson(storageLevel: StorageLevel): JValue = { ("Use Disk" -> storageLevel.useDisk) ~ ("Use Memory" -> storageLevel.useMemory) ~ + ("Use Off Heap" -> storageLevel.useOffHeap) ~ ("Deserialized" -> storageLevel.deserialized) ~ ("Replication" -> storageLevel.replication) } @@ -750,7 +751,7 @@ private[spark] object JsonProtocol { def executorResourceRequestFromJson(json: JValue): ExecutorResourceRequest = { val rName = (json \ "Resource Name").extract[String] - val amount = (json \ "Amount").extract[Int] + val amount = (json \ "Amount").extract[Long] val discoveryScript = (json \ "Discovery Script").extract[String] val vendor = (json \ "Vendor").extract[String] new ExecutorResourceRequest(rName, amount, discoveryScript, vendor) @@ -758,7 +759,7 @@ private[spark] object JsonProtocol { def taskResourceRequestFromJson(json: JValue): TaskResourceRequest = { val rName = (json \ "Resource Name").extract[String] - val amount = (json \ "Amount").extract[Int] + val amount = (json \ "Amount").extract[Double] new TaskResourceRequest(rName, amount) } @@ -1202,9 +1203,19 @@ private[spark] object JsonProtocol { def storageLevelFromJson(json: JValue): StorageLevel = { val useDisk = (json \ "Use Disk").extract[Boolean] val useMemory = (json \ "Use Memory").extract[Boolean] + // The "Use Off Heap" field was added in Spark 3.4.0 + val useOffHeap = jsonOption(json \ "Use Off Heap") match { + case Some(value) => value.extract[Boolean] + case None => false + } val deserialized = (json \ "Deserialized").extract[Boolean] val replication = (json \ "Replication").extract[Int] - StorageLevel(useDisk, useMemory, deserialized, replication) + StorageLevel( + useDisk = useDisk, + useMemory = useMemory, + useOffHeap = useOffHeap, + deserialized = deserialized, + replication = replication) } def blockStatusFromJson(json: JValue): BlockStatus = { diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index ea6267698c848..7a18223ec5bc7 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -136,9 +136,14 @@ class JsonProtocolSuite extends SparkFunSuite { 321L, 654L, 765L, 256912L, 123456L, 123456L, 61728L, 30364L, 15182L, 10L, 90L, 2L, 20L, 80001L))) val rprofBuilder = new ResourceProfileBuilder() - val taskReq = new TaskResourceRequests().cpus(1).resource("gpu", 1) - val execReq = - new ExecutorResourceRequests().cores(2).resource("gpu", 2, "myscript") + val taskReq = new TaskResourceRequests() + .cpus(1) + .resource("gpu", 1) + .resource("fgpa", 0.5) + val execReq: ExecutorResourceRequests = new ExecutorResourceRequests() + .cores(2) + .resource("gpu", 2, "myscript") + .resource("myCustomResource", amount = Int.MaxValue + 1L, discoveryScript = "myscript2") rprofBuilder.require(taskReq).require(execReq) val resourceProfile = rprofBuilder.build resourceProfile.setResourceProfileId(21) @@ -203,6 +208,7 @@ class JsonProtocolSuite extends SparkFunSuite { testStorageLevel(StorageLevel.MEMORY_AND_DISK_2) testStorageLevel(StorageLevel.MEMORY_AND_DISK_SER) testStorageLevel(StorageLevel.MEMORY_AND_DISK_SER_2) + testStorageLevel(StorageLevel.OFF_HEAP) // JobResult val exception = new Exception("Out of Memory! Please restock film.") @@ -319,6 +325,21 @@ class JsonProtocolSuite extends SparkFunSuite { val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) } + test("StorageLevel backward compatibility") { + // "Use Off Heap" was added in Spark 3.4.0 + val level = StorageLevel( + useDisk = false, + useMemory = true, + useOffHeap = true, + deserialized = false, + replication = 1 + ) + val newJson = JsonProtocol.storageLevelToJson(level) + val oldJson = newJson.removeField { case (field, _) => field == "Use Off Heap" } + val newLevel = JsonProtocol.storageLevelFromJson(oldJson) + assert(newLevel.useOffHeap === false) + } + test("BlockManager events backward compatibility") { // SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property. val blockManagerAdded = SparkListenerBlockManagerAdded(1L, @@ -1189,6 +1210,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1437,6 +1459,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": false, | "Replication": 2 | }, @@ -1563,6 +1586,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": false, | "Replication": 2 | }, @@ -1689,6 +1713,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": false, | "Replication": 2 | }, @@ -1722,6 +1747,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1769,6 +1795,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1787,6 +1814,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1834,6 +1862,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1852,6 +1881,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1870,6 +1900,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1917,6 +1948,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1935,6 +1967,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1953,6 +1986,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -1971,6 +2005,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -2291,6 +2326,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": true, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": false, | "Replication": 2 | }, @@ -2489,6 +2525,7 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Storage Level": { | "Use Disk": false, | "Use Memory": true, + | "Use Off Heap": false, | "Deserialized": true, | "Replication": 1 | }, @@ -2578,6 +2615,12 @@ private[spark] object JsonProtocolSuite extends Assertions { | "Discovery Script":"", | "Vendor":"" | }, + | "myCustomResource":{ + | "Resource Name":"myCustomResource", + | "Amount": 2147483648, + | "Discovery Script": "myscript2", + | "Vendor" : "" + | }, | "gpu":{ | "Resource Name":"gpu", | "Amount":2, @@ -2593,6 +2636,10 @@ private[spark] object JsonProtocolSuite extends Assertions { | "gpu":{ | "Resource Name":"gpu", | "Amount":1.0 + | }, + | "fgpa":{ + | "Resource Name":"fgpa", + | "Amount":0.5 | } | } |}