Commit 05c3d90

kayousterhout authored and rxin committed
[SPARK-2185] Emit warning when task size exceeds a threshold.

This functionality was added in an earlier commit but shortly after was removed due to a bad git merge (totally my fault).

Author: Kay Ousterhout <[email protected]>

Closes apache#1149 from kayousterhout/warning_bug and squashes the following commits:

3f1bb00 [Kay Ousterhout] Fixed Json tests
462a664 [Kay Ousterhout] Removed task set name from warning message
e89b2f6 [Kay Ousterhout] Fixed Json tests.
7af424c [Kay Ousterhout] Emit warning when task size exceeds a threshold.
1 parent 3319a3e commit 05c3d90
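
For context on what the restored warning catches: every task launched for a stage ships a serialized closure to an executor, and an oversized closure (typically a large object captured from the driver) adds scheduling and network overhead on every task. The driver program below is a hypothetical illustration, not part of this commit; `LargeClosureExample` and the ~4 MB `lookup` array are invented for the sketch, while `sc.broadcast` is the standard Spark remedy.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver program (not from this commit) showing the situation the
// warning flags: a large array captured by a closure is re-serialized into
// every task, pushing each task well past TASK_SIZE_TO_WARN_KB (100 KB).
object LargeClosureExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("large-closure"))

    val lookup = Array.fill(1 << 20)(42)  // ~4 MB of Ints captured from the driver
    sc.parallelize(1 to 1000).map(i => lookup(i % lookup.length)).count()

    // The usual fix: ship the data once per executor as a broadcast variable,
    // so the serialized task stays small and the warning is not emitted.
    val bcLookup = sc.broadcast(lookup)
    sc.parallelize(1 to 1000).map(i => bcLookup.value(i % bcLookup.value.length)).count()

    sc.stop()
  }
}
```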

7 files changed: +65 −36 lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 0 additions & 13 deletions

```diff
@@ -623,16 +623,6 @@ class DAGScheduler(
   }
 
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
-    for (stage <- stageIdToStage.get(task.stageId); stageInfo <- stageToInfos.get(stage)) {
-      if (taskInfo.serializedSize > DAGScheduler.TASK_SIZE_TO_WARN * 1024 &&
-          !stageInfo.emittedTaskSizeWarning) {
-        stageInfo.emittedTaskSizeWarning = true
-        logWarning(("Stage %d (%s) contains a task of very large " +
-          "size (%d KB). The maximum recommended task size is %d KB.").format(
-          task.stageId, stageInfo.name, taskInfo.serializedSize / 1024,
-          DAGScheduler.TASK_SIZE_TO_WARN))
-      }
-    }
     listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
     submitWaitingStages()
   }
@@ -1254,7 +1244,4 @@ private[spark] object DAGScheduler {
   // The time, in millis, to wake up between polls of the completion queue in order to potentially
   // resubmit failed stages
   val POLL_TIMEOUT = 10L
-
-  // Warns the user if a stage contains a task with size greater than this value (in KB)
-  val TASK_SIZE_TO_WARN = 100
 }
```

core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala

Lines changed: 0 additions & 2 deletions

```diff
@@ -38,8 +38,6 @@ class StageInfo(
   /** If the stage failed, the reason why. */
   var failureReason: Option[String] = None
 
-  var emittedTaskSizeWarning = false
-
   def stageFailed(reason: String) {
     failureReason = Some(reason)
     completionTime = Some(System.currentTimeMillis)
```

core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala

Lines changed: 0 additions & 2 deletions

```diff
@@ -49,8 +49,6 @@ class TaskInfo(
 
   var failed = false
 
-  var serializedSize: Int = 0
-
   private[spark] def markGettingResult(time: Long = System.currentTimeMillis) {
     gettingResultTime = time
   }
```

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 15 additions & 0 deletions

```diff
@@ -166,6 +166,8 @@ private[spark] class TaskSetManager(
 
   override def schedulingMode = SchedulingMode.NONE
 
+  var emittedTaskSizeWarning = false
+
   /**
    * Add a task to all the pending-task lists that it should be on. If readding is set, we are
    * re-adding the task so only include it in each list if it's not already there.
@@ -418,6 +420,13 @@ private[spark] class TaskSetManager(
         // we assume the task can be serialized without exceptions.
         val serializedTask = Task.serializeWithDependencies(
           task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+        if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
+            !emittedTaskSizeWarning) {
+          emittedTaskSizeWarning = true
+          logWarning(s"Stage ${task.stageId} contains a task of very large size " +
+            s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
+            s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
+        }
         val timeTaken = clock.getTime() - startTime
         addRunningTask(taskId)
         logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
@@ -764,3 +773,9 @@ private[spark] class TaskSetManager(
     localityWaits = myLocalityLevels.map(getLocalityWait)
   }
 }
+
+private[spark] object TaskSetManager {
+  // The user will be warned if any stages contain a task that has a serialized size greater
+  // than this.
+  val TASK_SIZE_TO_WARN_KB = 100
+}
```
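
Why the check landed here rather than back in DAGScheduler: `TaskSetManager.resourceOffer` is where the task is actually serialized, so the `ByteBuffer` returned by `Task.serializeWithDependencies` can be measured directly via its `limit`, with no need to plumb a `serializedSize` through `TaskInfo` and listener events. The flag now also lives on the manager, so the warning fires at most once per task set. A minimal standalone sketch of that pattern (`TaskSizeWarner` and its `println` stand-in for `logWarning` are invented for illustration):

```scala
import java.nio.ByteBuffer

// Sketch of the once-per-task-set check above. ByteBuffer.limit is the number
// of readable bytes, so comparing it against the threshold in bytes measures
// exactly the payload that would be shipped to an executor.
class TaskSizeWarner(stageId: Int, thresholdKb: Int = 100) {
  private var emittedTaskSizeWarning = false  // at most one warning per task set

  def checkTaskSize(serializedTask: ByteBuffer): Unit = {
    if (serializedTask.limit > thresholdKb * 1024 && !emittedTaskSizeWarning) {
      emittedTaskSizeWarning = true
      println(s"Stage $stageId contains a task of very large size " +
        s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
        s"$thresholdKb KB.")  // stands in for logWarning
    }
  }
}
```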

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 2 additions & 8 deletions

```diff
@@ -190,8 +190,7 @@ private[spark] object JsonProtocol {
     ("Details" -> stageInfo.details) ~
     ("Submission Time" -> submissionTime) ~
     ("Completion Time" -> completionTime) ~
-    ("Failure Reason" -> failureReason) ~
-    ("Emitted Task Size Warning" -> stageInfo.emittedTaskSizeWarning)
+    ("Failure Reason" -> failureReason)
   }
 
   def taskInfoToJson(taskInfo: TaskInfo): JValue = {
@@ -205,8 +204,7 @@ private[spark] object JsonProtocol {
     ("Speculative" -> taskInfo.speculative) ~
     ("Getting Result Time" -> taskInfo.gettingResultTime) ~
     ("Finish Time" -> taskInfo.finishTime) ~
-    ("Failed" -> taskInfo.failed) ~
-    ("Serialized Size" -> taskInfo.serializedSize)
+    ("Failed" -> taskInfo.failed)
   }
 
   def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
@@ -487,13 +485,11 @@ private[spark] object JsonProtocol {
     val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
     val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
-    val emittedTaskSizeWarning = (json \ "Emitted Task Size Warning").extract[Boolean]
 
     val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos, details)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason
-    stageInfo.emittedTaskSizeWarning = emittedTaskSizeWarning
     stageInfo
   }
 
@@ -509,14 +505,12 @@ private[spark] object JsonProtocol {
     val gettingResultTime = (json \ "Getting Result Time").extract[Long]
     val finishTime = (json \ "Finish Time").extract[Long]
     val failed = (json \ "Failed").extract[Boolean]
-    val serializedSize = (json \ "Serialized Size").extract[Int]
 
     val taskInfo =
       new TaskInfo(taskId, index, attempt, launchTime, executorId, host, taskLocality, speculative)
     taskInfo.gettingResultTime = gettingResultTime
     taskInfo.finishTime = finishTime
     taskInfo.failed = failed
-    taskInfo.serializedSize = serializedSize
     taskInfo
   }
 
```
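
A compatibility note on dropping the two JSON fields: JsonProtocol reads events with json4s, and selecting known keys with `\` simply ignores fields the reader never asks for, so event logs written before this commit (which still contain "Serialized Size" and "Emitted Task Size Warning") should keep parsing once the reads above are removed. A small sketch under that assumption (`OldLogCompat` is invented for illustration):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Sketch of why old event logs remain readable: json4s key selection with \
// silently skips fields that are present in the JSON but never requested.
object OldLogCompat {
  implicit val formats: Formats = DefaultFormats

  def main(args: Array[String]): Unit = {
    // An event written before this commit still carries "Serialized Size".
    val oldJson = parse("""{"Finish Time":0,"Failed":false,"Serialized Size":0}""")
    val failed = (oldJson \ "Failed").extract[Boolean]  // extra field is ignored
    println(failed)  // false
  }
}
```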

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 41 additions & 0 deletions

```diff
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import java.util.Random
+
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable
 
@@ -83,6 +85,18 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
   }
 }
 
+/**
+ * A Task implementation that results in a large serialized task.
+ */
+class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0) {
+  val randomBuffer = new Array[Byte](TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024)
+  val random = new Random(0)
+  random.nextBytes(randomBuffer)
+
+  override def runTask(context: TaskContext): Array[Byte] = randomBuffer
+  override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]()
+}
+
 class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
   import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
 
@@ -434,6 +448,33 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
   }
 
+  test("do not emit warning when serialized task is small") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    val taskSet = FakeTask.createTaskSet(1)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+
+    assert(!manager.emittedTaskSizeWarning)
+
+    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
+
+    assert(!manager.emittedTaskSizeWarning)
+  }
+
+  test("emit warning when serialized task is large") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+
+    val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+
+    assert(!manager.emittedTaskSizeWarning)
+
+    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
+
+    assert(manager.emittedTaskSizeWarning)
+  }
+
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
     val valueSer = SparkEnv.get.serializer.newInstance()
     new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)
```
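
One subtlety in `LargeTask`: its buffer is exactly `TASK_SIZE_TO_WARN_KB * 1024` bytes and the warning uses a strict `>`, so the test relies on serialization overhead (stream headers plus the task's other fields) pushing the serialized form over the threshold. A quick standalone probe of that assumption, using plain Java serialization (`SerializedSizeProbe` is invented here; Spark's closure serializer may add a different amount of overhead):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Invented probe: serializing a buffer of exactly 100 * 1024 bytes produces
// strictly more than 100 * 1024 bytes of output, which is why LargeTask's
// payload trips the strict > comparison in TaskSetManager.
object SerializedSizeProbe {
  def main(args: Array[String]): Unit = {
    val payload = new Array[Byte](100 * 1024)
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(payload)  // stream header + array type info add overhead
    out.close()
    println(s"${bytes.size} bytes")  // a little over 102400
  }
}
```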

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 7 additions & 11 deletions

```diff
@@ -257,7 +257,6 @@ class JsonProtocolSuite extends FunSuite {
     assert(info1.numTasks === info2.numTasks)
     assert(info1.submissionTime === info2.submissionTime)
     assert(info1.completionTime === info2.completionTime)
-    assert(info1.emittedTaskSizeWarning === info2.emittedTaskSizeWarning)
     assert(info1.rddInfos.size === info2.rddInfos.size)
     (0 until info1.rddInfos.size).foreach { i =>
       assertEquals(info1.rddInfos(i), info2.rddInfos(i))
@@ -294,7 +293,6 @@ class JsonProtocolSuite extends FunSuite {
     assert(info1.gettingResultTime === info2.gettingResultTime)
     assert(info1.finishTime === info2.finishTime)
     assert(info1.failed === info2.failed)
-    assert(info1.serializedSize === info2.serializedSize)
   }
 
   private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
@@ -540,9 +538,8 @@ class JsonProtocolSuite extends FunSuite {
   private val stageSubmittedJsonString =
     """
       {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name":
-      "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details",
-      "Emitted Task Size Warning":false},"Properties":{"France":"Paris","Germany":"Berlin",
-      "Russia":"Moscow","Ukraine":"Kiev"}}
+      "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details"},"Properties":
+      {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}}
     """
 
   private val stageCompletedJsonString =
@@ -551,24 +548,23 @@ class JsonProtocolSuite extends FunSuite {
       "greetings","Number of Tasks":201,"RDD Info":[{"RDD ID":101,"Name":"mayor","Storage
       Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true,
       "Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301,
-      "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details",
-      "Emitted Task Size Warning":false}}
+      "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details"}}
     """
 
   private val taskStartJsonString =
     """
       |{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
       |"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
       |"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,
-      |"Failed":false,"Serialized Size":0}}
+      |"Failed":false}}
     """.stripMargin
 
   private val taskGettingResultJsonString =
     """
       |{"Event":"SparkListenerTaskGettingResult","Task Info":
       |  {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor",
       |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0,
-      |  "Finish Time":0,"Failed":false,"Serialized Size":0
+      |  "Finish Time":0,"Failed":false
       |  }
       |}
     """.stripMargin
@@ -580,7 +576,7 @@ class JsonProtocolSuite extends FunSuite {
       |"Task Info":{
       |  "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
       |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
-      |  "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
+      |  "Getting Result Time":0,"Finish Time":0,"Failed":false
       |},
       |"Task Metrics":{
       |  "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
@@ -620,7 +616,7 @@ class JsonProtocolSuite extends FunSuite {
       |"Task Info":{
       |  "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
      |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
-      |  "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
+      |  "Getting Result Time":0,"Finish Time":0,"Failed":false
      |},
      |"Task Metrics":{
      |  "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
```
