
Commit 121f4c7

witgo authored and markhamstra committed
[SPARK-1712]: TaskDescription instance is too big causes Spark to hang
Author: witgo <[email protected]>

Closes apache#694 from witgo/SPARK-1712_new and squashes the following commits:

0f52483 [witgo] review commit
83ce29b [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
52e6752 [witgo] reset test SparkContext
63636b6 [witgo] review commit
44a59ee [witgo] review commit
3b6d48c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
926bd6a [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
9a5cfad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
03cc562 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
b0930b0 [witgo] review commit
b1174bd [witgo] merge master
f76679b [witgo] merge master
689495d [witgo] fix scala style bug
1d35c3c [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
062c182 [witgo] fix small bug for code style
0a428cf [witgo] add unit tests
158b2dc [witgo] review commit
4afe71d [witgo] review commit
9e4ffa7 [witgo] review commit
1d35c7d [witgo] fix hang
7965580 [witgo] fix Statement order
0e29eac [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
3ea1ca1 [witgo] remove duplicate serialize
743a7ad [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
86e2048 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1712_new
2a89adc [witgo] SPARK-1712: TaskDescription instance is too big causes Spark to hang

Conflicts:
	core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
	core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
1 parent 2ef10ba commit 121f4c7

File tree

4 files changed: +73 −8 lines

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 6 additions & 3 deletions

@@ -22,11 +22,12 @@ import java.nio.ByteBuffer
 import akka.actor._
 import akka.remote._
 
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv, Logging}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.util.{AkkaUtils, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
@@ -60,12 +61,14 @@ private[spark] class CoarseGrainedExecutorBackend(
       logError("Slave registration failed: " + message)
       System.exit(1)
 
-    case LaunchTask(taskDesc) =>
-      logInfo("Got assigned task " + taskDesc.taskId)
+    case LaunchTask(data) =>
       if (executor == null) {
         logError("Received LaunchTask command but executor was null")
         System.exit(1)
       } else {
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        val taskDesc = ser.deserialize[TaskDescription](data.value)
+        logInfo("Got assigned task " + taskDesc.taskId)
         executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
       }
 
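For context, a minimal self-contained sketch of the deserialize-and-launch step above, with plain java.io serialization standing in for SparkEnv.get.closureSerializer; DemoTask is a hypothetical stand-in for TaskDescription.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Hypothetical stand-in for TaskDescription: an id plus an opaque payload.
case class DemoTask(taskId: Long, payload: Array[Byte])

object ExecutorSideSketch {
  // Mirrors ser.deserialize[TaskDescription](data.value): rebuild the task
  // from the raw ByteBuffer carried by the LaunchTask message.
  def deserializeTask(data: ByteBuffer): DemoTask = {
    val bytes = new Array[Byte](data.remaining())
    data.duplicate().get(bytes)
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[DemoTask] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // Round-trip: serialize on the "driver" side, deserialize on the "executor".
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    out.writeObject(DemoTask(42L, Array[Byte](1, 2, 3)))
    out.close()
    val task = deserializeTask(ByteBuffer.wrap(bos.toByteArray))
    println("Got assigned task " + task.taskId) // Got assigned task 42
  }
}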

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 1 addition & 1 deletion

@@ -29,7 +29,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
 private[spark] object CoarseGrainedClusterMessages {
 
   // Driver to executors
-  case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
+  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
 
   case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
     extends CoarseGrainedClusterMessage
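LaunchTask now carries a pre-serialized SerializableBuffer instead of the TaskDescription itself. As a rough illustration only (Spark's actual org.apache.spark.util.SerializableBuffer streams through NIO channels and also handles direct buffers), a wrapper of this kind lets a transient ByteBuffer survive Java serialization:

import java.io.{ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Simplified sketch, assuming a modest heap buffer; copies through a
// byte array for clarity rather than using NIO channels.
class SerializableByteBufferSketch(@transient private var buf: ByteBuffer)
  extends Serializable {

  def value: ByteBuffer = buf

  private def writeObject(out: ObjectOutputStream): Unit = {
    val copy = buf.duplicate() // leave the caller's position untouched
    val bytes = new Array[Byte](copy.remaining())
    copy.get(bytes)
    out.writeInt(bytes.length)
    out.write(bytes)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    val bytes = new Array[Byte](in.readInt())
    in.readFully(bytes)
    buf = ByteBuffer.wrap(bytes)
  }
}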

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 23 additions & 4 deletions

@@ -27,12 +27,12 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
-import org.apache.spark.{SparkException, Logging, TaskState}
+import org.apache.spark.{SparkException, SparkEnv, Logging, TaskState}
 import org.apache.spark.{Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.{TaskSchedulerImpl, SchedulerBackend, SlaveLost, TaskDescription,
   WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
 
 /**
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -50,6 +50,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   var totalCoreCount = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
+  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]
@@ -139,8 +140,26 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     // Launch tasks returned by a set of resource offers
     def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
-        freeCores(task.executorId) -= 1
-        executorActor(task.executorId) ! LaunchTask(task)
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        val serializedTask = ser.serialize(task)
+        if (serializedTask.limit >= akkaFrameSize - 1024) {
+          val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
+          scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
+            try {
+              var msg = "Serialized task %s:%d was %d bytes which " +
+                "exceeds spark.akka.frameSize (%d bytes). " +
+                "Consider using broadcast variables for large values."
+              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize)
+              taskSet.abort(msg)
+            } catch {
+              case e: Exception => logError("Exception in error callback", e)
+            }
+          }
+        }
+        else {
+          freeCores(task.executorId) -= 1
+          executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
+        }
       }
     }
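The heart of the fix is this size check before the actor send. Below is a self-contained sketch of the guard, with java.io serialization standing in for the closure serializer and a 10 MB frame assumed (the usual spark.akka.frameSize default of this era); the 1024-byte margin mirrors the patch and leaves headroom for Akka's own message framing.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer

object FrameSizeGuardSketch {
  // Assumption: spark.akka.frameSize of 10 MB.
  val akkaFrameSize: Int = 10 * 1024 * 1024

  def serialize(obj: AnyRef): ByteBuffer = {
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    out.writeObject(obj)
    out.close()
    ByteBuffer.wrap(bos.toByteArray)
  }

  // Same predicate as the patch: reject anything within 1024 bytes of the frame.
  def tooBig(serializedTask: ByteBuffer): Boolean =
    serializedTask.limit >= akkaFrameSize - 1024

  def main(args: Array[String]): Unit = {
    val small = serialize("a small task")
    val large = serialize(new Array[Byte](akkaFrameSize)) // serializes past the limit
    println(tooBig(small)) // false -> would be sent as LaunchTask
    println(tooBig(large)) // true  -> the task set would be aborted with a clear error
  }
}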

core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala

Lines changed: 43 additions & 0 deletions

@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkContext}
+import org.apache.spark.util.{SerializableBuffer, AkkaUtils}
+
+import org.scalatest.FunSuite
+
+class CoarseGrainedSchedulerBackendSuite extends FunSuite with LocalSparkContext {
+
+  test("serialized task larger than akka frame size") {
+    val conf = new SparkConf
+    conf.set("spark.akka.frameSize","1")
+    conf.set("spark.default.parallelism","1")
+    sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf)
+    val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
+    val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
+    val larger = sc.parallelize(Seq(buffer))
+    val thrown = intercept[SparkException] {
+      larger.collect()
+    }
+    assert(thrown.getMessage.contains("Consider using broadcast variables for large values"))
+    val smaller = sc.parallelize(1 to 4).collect()
+    assert(smaller.size === 4)
+  }
+
+}
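This new suite pins spark.akka.frameSize to 1 MB and asserts that an oversized task now aborts with the "Consider using broadcast variables" message instead of hanging. For completeness, a sketch of that suggested workaround, assuming a local SparkContext (names are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastWorkaroundSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("broadcast-workaround"))

    // A large value captured directly in a closure is serialized into every
    // task, which is exactly what can trip the frame-size check in this patch.
    val lookup: Map[Int, String] = (1 to 100000).map(i => i -> ("v" + i)).toMap

    // Broadcasting ships the value to each executor once, keeping tasks small.
    val bcLookup = sc.broadcast(lookup)
    val hits = sc.parallelize(1 to 10).map(i => bcLookup.value(i)).collect()
    println(hits.mkString(", "))

    sc.stop()
  }
}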
