Skip to content

Commit cc43f68

Browse files
committed
Updating unit tests
1 parent c991b1b commit cc43f68

File tree

4 files changed

+44
-18
lines changed

4 files changed

+44
-18
lines changed

core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,6 @@ private object Accumulators {
290290
}
291291
}
292292

293-
def stringifyPartialValue(partialValue: Any) = "%s".format(value)
293+
def stringifyPartialValue(partialValue: Any) = "%s".format(partialValue)
294294
def stringifyValue(value: Any) = "%s".format(value)
295295
}

core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,15 @@ class AccumulableInfo (
2828
val id: Long,
2929
val name: String,
3030
val update: Option[String], // represents a partial update within a task
31-
val value: String) { }
31+
val value: String) {
32+
33+
override def equals(other: Any): Boolean = other match {
34+
case acc: AccumulableInfo =>
35+
this.id == acc.id && this.name == acc.name &&
36+
this.update == acc.update && this.value == acc.value
37+
case _ => false
38+
}
39+
}
3240

3341
object AccumulableInfo {
3442
def apply(id: Long, name: String, update: Option[String], value: String) =

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import scala.collection.Map
2525
import org.json4s.DefaultFormats
2626
import org.json4s.JsonDSL._
2727
import org.json4s.JsonAST._
28+
import org.json4s.jackson.JsonMethods._
29+
2830

2931
import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics,
3032
ShuffleWriteMetrics, TaskMetrics}
@@ -538,10 +540,10 @@ private[spark] object JsonProtocol {
538540
}
539541

540542
def accumulableInfoFromJson(json: JValue): AccumulableInfo = {
541-
val id = (json \ "id").extract[Long]
542-
val name = (json \ "name").extract[String]
543-
val update = Utils.jsonOption(json \ "update").map(_.extract[String])
544-
val value = (json \ "value").extract[String]
543+
val id = (json \ "ID").extract[Long]
544+
val name = (json \ "Name").extract[String]
545+
val update = Utils.jsonOption(json \ "Update").map(_.extract[String])
546+
val value = (json \ "Value").extract[String]
545547
AccumulableInfo(id, name, update, value)
546548
}
547549

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ class JsonProtocolSuite extends FunSuite {
132132
assert(info.accumulables.nonEmpty)
133133
val oldJson = newJson
134134
.removeField { case (field, _) => field == "Details" }
135-
.removeField { case (field, _) => field == "Accumulated Values" }
135+
.removeField { case (field, _) => field == "Accumulables" }
136136

137137
val newInfo = JsonProtocol.stageInfoFromJson(oldJson)
138138

@@ -486,20 +486,26 @@ class JsonProtocolSuite extends FunSuite {
486486
private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
487487
val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, d + i, e + i) }
488488
val stageInfo = new StageInfo(a, "greetings", b, rddInfos, "details")
489-
stageInfo.accumulables("acc1") = "val1"
490-
stageInfo.accumulables("acc2") = "val2"
489+
val (acc1, acc2) = (makeAccumulableInfo(1), makeAccumulableInfo(2))
490+
stageInfo.accumulables(acc1.id) = acc1
491+
stageInfo.accumulables(acc2.id) = acc2
491492
stageInfo
492493
}
493494

494495
private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = {
495496
val taskInfo = new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL,
496497
speculative)
497-
taskInfo.accumulables += (("acc1", "val1"))
498-
taskInfo.accumulables += (("acc1", "val1"))
499-
taskInfo.accumulables += (("acc2", "val2"))
498+
val (acc1, acc2, acc3) =
499+
(makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3))
500+
taskInfo.accumulables += acc1
501+
taskInfo.accumulables += acc2
502+
taskInfo.accumulables += acc3
500503
taskInfo
501504
}
502505

506+
private def makeAccumulableInfo(id: Int): AccumulableInfo =
507+
AccumulableInfo(id, " Accumulable " + id, Some("delta" + id), "val" + id)
508+
503509
/**
504510
* Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is
505511
* set to true) or read data from a shuffle otherwise.
@@ -554,7 +560,8 @@ class JsonProtocolSuite extends FunSuite {
554560
"""
555561
{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name":
556562
"greetings","Number of Tasks":200,"RDD Info":[],"Details":"details",
557-
"AccumulatedValues":{"acc2":"val2","acc1":"val1"}},"Properties":
563+
"Accumulables":[{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
564+
{"ID":1,"Name":"Accumulable1","Update":"delta1","Value":"val1"}]},"Properties":
558565
{"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}}
559566
"""
560567

@@ -565,15 +572,18 @@ class JsonProtocolSuite extends FunSuite {
565572
Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true,
566573
"Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301,
567574
"Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details",
568-
"AccumulatedValues":{"acc2":"val2","acc1":"val1"}}}
575+
"Accumulables":[{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
576+
{"ID":1,"Name":"Accumulable1","Update":"delta1","Value":"val1"}]}}
569577
"""
570578

571579
private val taskStartJsonString =
572580
"""
573581
|{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
574582
|"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
575583
|"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,
576-
|"Failed":false,"AccumulableUpdates":[{"acc1":"val1"},{"acc1":"val1"},{"acc2":"val2"}]}}
584+
|"Failed":false,"Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
585+
|"Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
586+
|{"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]}}
577587
""".stripMargin
578588

579589
private val taskGettingResultJsonString =
@@ -582,7 +592,9 @@ class JsonProtocolSuite extends FunSuite {
582592
| {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor",
583593
| "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0,
584594
| "Finish Time":0,"Failed":false,
585-
| "AccumulableUpdates":[{"acc1":"val1"},{"acc1":"val1"},{"acc2":"val2"}]
595+
| "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
596+
| "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
597+
| {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]
586598
| }
587599
|}
588600
""".stripMargin
@@ -595,7 +607,9 @@ class JsonProtocolSuite extends FunSuite {
595607
| "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
596608
| "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
597609
| "Getting Result Time":0,"Finish Time":0,"Failed":false,
598-
| "AccumulableUpdates":[{"acc1":"val1"},{"acc1":"val1"},{"acc2":"val2"}]
610+
| "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
611+
| "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
612+
| {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]
599613
|},
600614
|"Task Metrics":{
601615
| "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
@@ -635,7 +649,9 @@ class JsonProtocolSuite extends FunSuite {
635649
| "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
636650
| "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
637651
| "Getting Result Time":0,"Finish Time":0,"Failed":false,
638-
| "AccumulableUpdates":[{"acc1":"val1"},{"acc1":"val1"},{"acc2":"val2"}]
652+
| "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
653+
| "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
654+
| {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]
639655
|},
640656
|"Task Metrics":{
641657
| "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,

0 commit comments

Comments
 (0)