Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ private[spark] object JsonProtocol {
def storageLevelToJson(storageLevel: StorageLevel): JValue = {
("Use Disk" -> storageLevel.useDisk) ~
("Use Memory" -> storageLevel.useMemory) ~
("Use Off Heap" -> storageLevel.useOffHeap) ~
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

("Deserialized" -> storageLevel.deserialized) ~
("Replication" -> storageLevel.replication)
}
Expand Down Expand Up @@ -750,15 +751,15 @@ private[spark] object JsonProtocol {

def executorResourceRequestFromJson(json: JValue): ExecutorResourceRequest = {
val rName = (json \ "Resource Name").extract[String]
val amount = (json \ "Amount").extract[Int]
val amount = (json \ "Amount").extract[Long]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

amount is defined as a long at

@Evolving
@Since("3.1.0")
class ExecutorResourceRequest(
val resourceName: String,
val amount: Long,
val discoveryScript: String = "",
val vendor: String = "") extends Serializable {

val discoveryScript = (json \ "Discovery Script").extract[String]
val vendor = (json \ "Vendor").extract[String]
new ExecutorResourceRequest(rName, amount, discoveryScript, vendor)
}

def taskResourceRequestFromJson(json: JValue): TaskResourceRequest = {
val rName = (json \ "Resource Name").extract[String]
val amount = (json \ "Amount").extract[Int]
val amount = (json \ "Amount").extract[Double]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

amount is defined as a double at

@Evolving
@Since("3.1.0")
class TaskResourceRequest(val resourceName: String, val amount: Double)
extends Serializable {

new TaskResourceRequest(rName, amount)
}

Expand Down Expand Up @@ -1202,9 +1203,19 @@ private[spark] object JsonProtocol {
def storageLevelFromJson(json: JValue): StorageLevel = {
val useDisk = (json \ "Use Disk").extract[Boolean]
val useMemory = (json \ "Use Memory").extract[Boolean]
// The "Use Off Heap" field was added in Spark 3.4.0
val useOffHeap = jsonOption(json \ "Use Off Heap") match {
case Some(value) => value.extract[Boolean]
case None => false
}
val deserialized = (json \ "Deserialized").extract[Boolean]
val replication = (json \ "Replication").extract[Int]
StorageLevel(useDisk, useMemory, deserialized, replication)
StorageLevel(
useDisk = useDisk,
useMemory = useMemory,
useOffHeap = useOffHeap,
deserialized = deserialized,
replication = replication)
}

def blockStatusFromJson(json: JValue): BlockStatus = {
Expand Down
53 changes: 50 additions & 3 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,14 @@ class JsonProtocolSuite extends SparkFunSuite {
321L, 654L, 765L, 256912L, 123456L, 123456L, 61728L,
30364L, 15182L, 10L, 90L, 2L, 20L, 80001L)))
val rprofBuilder = new ResourceProfileBuilder()
val taskReq = new TaskResourceRequests().cpus(1).resource("gpu", 1)
val execReq =
new ExecutorResourceRequests().cores(2).resource("gpu", 2, "myscript")
val taskReq = new TaskResourceRequests()
.cpus(1)
.resource("gpu", 1)
.resource("fgpa", 0.5)
val execReq: ExecutorResourceRequests = new ExecutorResourceRequests()
.cores(2)
.resource("gpu", 2, "myscript")
.resource("myCustomResource", amount = Int.MaxValue + 1L, discoveryScript = "myscript2")
rprofBuilder.require(taskReq).require(execReq)
val resourceProfile = rprofBuilder.build
resourceProfile.setResourceProfileId(21)
Expand Down Expand Up @@ -203,6 +208,7 @@ class JsonProtocolSuite extends SparkFunSuite {
testStorageLevel(StorageLevel.MEMORY_AND_DISK_2)
testStorageLevel(StorageLevel.MEMORY_AND_DISK_SER)
testStorageLevel(StorageLevel.MEMORY_AND_DISK_SER_2)
testStorageLevel(StorageLevel.OFF_HEAP)

// JobResult
val exception = new Exception("Out of Memory! Please restock film.")
Expand Down Expand Up @@ -319,6 +325,21 @@ class JsonProtocolSuite extends SparkFunSuite {
val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
}

test("StorageLevel backward compatibility") {
// "Use Off Heap" was added in Spark 3.4.0
val level = StorageLevel(
useDisk = false,
useMemory = true,
useOffHeap = true,
deserialized = false,
replication = 1
)
val newJson = JsonProtocol.storageLevelToJson(level)
val oldJson = newJson.removeField { case (field, _) => field == "Use Off Heap" }
val newLevel = JsonProtocol.storageLevelFromJson(oldJson)
assert(newLevel.useOffHeap === false)
}

test("BlockManager events backward compatibility") {
// SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property.
val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
Expand Down Expand Up @@ -1189,6 +1210,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand Down Expand Up @@ -1437,6 +1459,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": false,
| "Replication": 2
| },
Expand Down Expand Up @@ -1563,6 +1586,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": false,
| "Replication": 2
| },
Expand Down Expand Up @@ -1689,6 +1713,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": false,
| "Replication": 2
| },
Expand Down Expand Up @@ -1722,6 +1747,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand Down Expand Up @@ -1769,6 +1795,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand All @@ -1787,6 +1814,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand Down Expand Up @@ -1834,6 +1862,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand All @@ -1852,6 +1881,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand All @@ -1870,6 +1900,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand Down Expand Up @@ -1917,6 +1948,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand All @@ -1935,6 +1967,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand All @@ -1953,6 +1986,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand All @@ -1971,6 +2005,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand Down Expand Up @@ -2291,6 +2326,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": false,
| "Replication": 2
| },
Expand Down Expand Up @@ -2489,6 +2525,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Storage Level": {
| "Use Disk": false,
| "Use Memory": true,
| "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
Expand Down Expand Up @@ -2578,6 +2615,12 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Discovery Script":"",
| "Vendor":""
| },
| "myCustomResource":{
| "Resource Name":"myCustomResource",
| "Amount": 2147483648,
| "Discovery Script": "myscript2",
| "Vendor" : ""
| },
| "gpu":{
| "Resource Name":"gpu",
| "Amount":2,
Expand All @@ -2593,6 +2636,10 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "gpu":{
| "Resource Name":"gpu",
| "Amount":1.0
| },
| "fgpa":{
| "Resource Name":"fgpa",
| "Amount":0.5
| }
| }
|}
Expand Down