Skip to content

Commit f00c851

Browse files
committed
Fix JsonProtocol compatibility
1 parent b89c258 commit f00c851

File tree

2 files changed

+35
-20
lines changed

2 files changed

+35
-20
lines changed

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,21 @@ import org.apache.spark.scheduler._
3131
import org.apache.spark.storage._
3232
import org.apache.spark._
3333

34+
/**
35+
* Serializes SparkListener events to/from JSON. This protocol provides strong backwards-
36+
* and forwards-compatibility guarantees: any version of Spark should be able to read JSON output
37+
* written by any other version, including newer versions.
38+
*
39+
* JsonProtocolSuite contains backwards-compatibility tests which check that the current version of
40+
* JsonProtocol is able to read output written by earlier versions. We do not currently have tests
41+
* for reading newer JSON output with older Spark versions.
42+
*
43+
* To ensure that we provide these guarantees, follow these rules when modifying these methods:
44+
*
45+
* - Never delete any JSON fields.
46+
* - Any new JSON fields should be optional; use `Utils.jsonOption` when reading these fields
47+
* in `*FromJson` methods.
48+
*/
3449
private[spark] object JsonProtocol {
3550
// TODO: Remove this file and put JSON serialization into each individual class.
3651

@@ -121,8 +136,8 @@ private[spark] object JsonProtocol {
121136
val properties = propertiesToJson(jobStart.properties)
122137
("Event" -> Utils.getFormattedClassName(jobStart)) ~
123138
("Job ID" -> jobStart.jobId) ~
124-
// ("Stage IDs" -> jobStart.stageIds) ~ // Removed in 1.2.0
125-
("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~ // Added in 1.2.0
139+
("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~ // Added in Spark 1.2.0
140+
("Stage IDs" -> jobStart.stageIds) ~
126141
("Properties" -> properties)
127142
}
128143

@@ -454,20 +469,13 @@ private[spark] object JsonProtocol {
454469

455470
def jobStartFromJson(json: JValue): SparkListenerJobStart = {
456471
val jobId = (json \ "Job ID").extract[Int]
472+
val stageIds = (json \ "Stage IDs").extract[List[JValue]].map(_.extract[Int])
457473
val properties = propertiesFromJson(json \ "Properties")
458-
val stageInfos = {
459-
// Prior to 1.2.0, we serialized stageIds but not stageInfos; in 1.2.0, we do the opposite.
460-
// This block of code handles backwards compatibility:
461-
val stageIds: Option[Seq[Int]] =
462-
Utils.jsonOption(json \ "Stage IDs").map(_.extract[List[JValue]].map(_.extract[Int]))
463-
if (stageIds.isDefined) { // Reading JSON written prior to 1.2.0
464-
stageIds.get.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
465-
} else { // Reading JSON written after 1.2.0
466-
Utils.jsonOption(json \ "Stage Infos")
467-
.map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse(Seq.empty)
474+
// The "Stage Infos" field was added in Spark 1.2.0
475+
val stageInfos = Utils.jsonOption(json \ "Stage Infos")
476+
.map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
477+
stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
468478
}
469-
}
470-
471479
SparkListenerJobStart(jobId, stageInfos, properties)
472480
}
473481

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ import java.util.Properties
2121

2222
import scala.collection.Map
2323

24-
import org.json4s.JsonAST.JObject
25-
import org.json4s.JsonDSL._
2624
import org.json4s.jackson.JsonMethods._
2725
import org.scalatest.FunSuite
2826

@@ -51,7 +49,8 @@ class JsonProtocolSuite extends FunSuite {
5149
makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true))
5250
val jobStart = {
5351
val stageIds = Seq[Int](1, 2, 3, 4)
54-
val stageInfos = stageIds.map(x => makeStageInfo(x, x * 200, x * 300, x * 400L, x * 500L))
52+
val stageInfos = stageIds.map(x =>
53+
makeStageInfo(x, x * 200, x * 300, x * 400L, x * 500L))
5554
SparkListenerJobStart(10, stageInfos, properties)
5655
}
5756
val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
@@ -233,11 +232,13 @@ class JsonProtocolSuite extends FunSuite {
233232
test("SparkListenerJobStart backward compatibility") {
234233
// Prior to Spark 1.2.0, SparkListenerJobStart did not have a "Stage Infos" property.
235234
val stageIds = Seq[Int](1, 2, 3, 4)
236-
val stageInfos = stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
235+
val stageInfos = stageIds.map(x => makeStageInfo(x, x * 200, x * 300, x * 400, x * 500))
236+
val dummyStageInfos =
237+
stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
237238
val jobStart = SparkListenerJobStart(10, stageInfos, properties)
238239
val oldEvent = JsonProtocol.jobStartToJson(jobStart).removeField({_._1 == "Stage Infos"})
239-
.asInstanceOf[JObject] ~ ("Stage IDs" -> stageIds)
240-
val expectedJobStart = SparkListenerJobStart(10, stageInfos, properties)
240+
val expectedJobStart =
241+
SparkListenerJobStart(10, dummyStageInfos, properties)
241242
assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldEvent))
242243
}
243244

@@ -1322,6 +1323,12 @@ class JsonProtocolSuite extends FunSuite {
13221323
| ]
13231324
| }
13241325
| ],
1326+
| "Stage IDs": [
1327+
| 1,
1328+
| 2,
1329+
| 3,
1330+
| 4
1331+
| ],
13251332
| "Properties": {
13261333
| "France": "Paris",
13271334
| "Germany": "Berlin",

0 commit comments

Comments
 (0)