From 4e0882188ae90b71fa75f83f248a313d21afa042 Mon Sep 17 00:00:00 2001 From: Guoqiang Li Date: Sun, 8 Jan 2017 19:18:59 +0800 Subject: [PATCH 01/14] Move task serialization from the TaskSetManager to the CoarseGrainedSchedulerBackend --- .../org/apache/spark/executor/Executor.scala | 4 +- .../apache/spark/scheduler/DAGScheduler.scala | 5 +++ .../spark/scheduler/TaskDescription.scala | 35 +++++++++++++++- .../spark/scheduler/TaskSchedulerImpl.scala | 26 ++++-------- .../spark/scheduler/TaskSetManager.scala | 28 ++----------- .../scala/org/apache/spark/util/Utils.scala | 7 +++- .../spark/scheduler/DAGSchedulerSuite.scala | 27 ++++++++++++ .../scheduler/TaskSchedulerImplSuite.scala | 23 ----------- .../spark/scheduler/TaskSetManagerSuite.scala | 41 ------------------- 9 files changed, 86 insertions(+), 110 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 975a6e4eeb33a..1128fce160689 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -287,8 +287,8 @@ private[spark] class Executor( Executor.taskDeserializationProps.set(taskDescription.properties) updateDependencies(taskDescription.addedFiles, taskDescription.addedJars) - task = ser.deserialize[Task[Any]]( - taskDescription.serializedTask, Thread.currentThread.getContextClassLoader) + task = Utils.deserialize(taskDescription.serializedTask, + Thread.currentThread.getContextClassLoader).asInstanceOf[Task[Any]] task.localProperties = taskDescription.properties task.setTaskMemoryManager(taskMemoryManager) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 692ed8083475c..1aed8095e3d26 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -992,6 +992,11 @@ class DAGScheduler( JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) } + if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) { + logWarning(s"Stage ${stage.id} contains a task of very large size " + + s"(${taskBinaryBytes.length / 1024} KB). The maximum recommended task size is " + + s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") + } taskBinary = sc.broadcast(taskBinaryBytes) } catch { // In the case of a failure during serialization, abort the stage. 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index 78aa5c40010cc..31924766d57ca 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -23,7 +23,10 @@ import java.util.Properties import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap, Map} +import scala.util.control.NonFatal +import org.apache.spark.internal.Logging +import org.apache.spark.TaskNotSerializableException import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils} /** @@ -52,7 +55,36 @@ private[spark] class TaskDescription( val addedFiles: Map[String, Long], val addedJars: Map[String, Long], val properties: Properties, - val serializedTask: ByteBuffer) { + private var serializedTask_ : ByteBuffer, + private var task_ : Task[_] = null) extends Logging { + + def this( + taskId: Long, + attemptNumber: Int, + executorId: String, + name: String, + index: Int, // Index within this task's TaskSet + addedFiles: Map[String, Long], + addedJars: Map[String, Long], + properties: Properties, + task: Task[_]) { + this(taskId, attemptNumber, executorId, name, index, + addedFiles, addedJars, properties, null, task) + } + + lazy val serializedTask: ByteBuffer = { + if (serializedTask_ == null) { + serializedTask_ = try { + ByteBuffer.wrap(Utils.serialize(task_)) + } catch { + case NonFatal(e) => + val msg = s"Failed to serialize task $taskId, not attempting to retry it." + logError(msg, e) + throw new TaskNotSerializableException(e) + } + } + serializedTask_ + } override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index) } @@ -66,6 +98,7 @@ private[spark] object TaskDescription { } } + @throws[TaskNotSerializableException] def encode(taskDescription: TaskDescription): ByteBuffer = { val bytesOut = new ByteBufferOutputStream(4096) val dataOut = new DataOutputStream(bytesOut) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index bfbcfa1aa386f..9c5fcf14941ea 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -277,23 +277,15 @@ private[spark] class TaskSchedulerImpl private[scheduler]( val execId = shuffledOffers(i).executorId val host = shuffledOffers(i).host if (availableCpus(i) >= CPUS_PER_TASK) { - try { - for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { - tasks(i) += task - val tid = task.taskId - taskIdToTaskSetManager(tid) = taskSet - taskIdToExecutorId(tid) = execId - executorIdToRunningTaskIds(execId).add(tid) - availableCpus(i) -= CPUS_PER_TASK - assert(availableCpus(i) >= 0) - launchedTask = true - } - } catch { - case e: TaskNotSerializableException => - logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") - // Do not offer resources for this task, but don't throw an error to allow other - // task sets to be submitted. 
- return launchedTask + for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { + tasks(i) += task + val tid = task.taskId + taskIdToTaskSetManager(tid) = taskSet + taskIdToExecutorId(tid) = execId + executorIdToRunningTaskIds(execId).add(tid) + availableCpus(i) -= CPUS_PER_TASK + assert(availableCpus(i) >= 0) + launchedTask = true } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 3b25513bea057..c332ec7ede9c2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -65,7 +65,6 @@ private[spark] class TaskSetManager( // Serializer for closures and tasks. val env = SparkEnv.get - val ser = env.closureSerializer.newInstance() val tasks = taskSet.tasks val numTasks = tasks.length @@ -182,8 +181,6 @@ private[spark] class TaskSetManager( override def schedulingMode: SchedulingMode = SchedulingMode.NONE - var emittedTaskSizeWarning = false - /** Add a task to all the pending-task lists that it should be on. */ private def addPendingTask(index: Int) { for (loc <- tasks(index).preferredLocations) { @@ -413,7 +410,6 @@ private[spark] class TaskSetManager( * @param host the host Id of the offered resource * @param maxLocality the maximum locality we want to schedule the tasks at */ - @throws[TaskNotSerializableException] def resourceOffer( execId: String, host: String, @@ -454,25 +450,7 @@ private[spark] class TaskSetManager( currentLocalityIndex = getLocalityIndex(taskLocality) lastLaunchTime = curTime } - // Serialize and return the task - val serializedTask: ByteBuffer = try { - ser.serialize(task) - } catch { - // If the task cannot be serialized, then there's no point to re-attempt the task, - // as it will always fail. So just abort the whole task-set. - case NonFatal(e) => - val msg = s"Failed to serialize task $taskId, not attempting to retry it." - logError(msg, e) - abort(s"$msg Exception during serialization: $e") - throw new TaskNotSerializableException(e) - } - if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && - !emittedTaskSizeWarning) { - emittedTaskSizeWarning = true - logWarning(s"Stage ${task.stageId} contains a task of very large size " + - s"(${serializedTask.limit / 1024} KB). 
The maximum recommended task size is " + - s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") - } + addRunningTask(taskId) // We used to log the time it takes to serialize the task, but task size is already @@ -480,7 +458,7 @@ private[spark] class TaskSetManager( // val timeTaken = clock.getTime() - startTime val taskName = s"task ${info.id} in stage ${taskSet.id}" logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " + - s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)") + s"partition ${task.partitionId}, $taskLocality)") sched.dagScheduler.taskStarted(task, info) new TaskDescription( @@ -492,7 +470,7 @@ private[spark] class TaskSetManager( sched.sc.addedFiles, sched.sc.addedJars, task.localProperties, - serializedTask) + task) } } else { None diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 10e5233679562..b0403fe9a0097 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -150,7 +150,12 @@ private[spark] object Utils extends Logging { /** Deserialize an object using Java serialization and the given ClassLoader */ def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = { - val bis = new ByteArrayInputStream(bytes) + deserialize(ByteBuffer.wrap(bytes), loader) + } + + /** Deserialize an object using Java serialization and the given ClassLoader */ + def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = { + val bis = new ByteBufferInputStream(bytes) val ois = new ObjectInputStream(bis) { override def resolveClass(desc: ObjectStreamClass): Class[_] = { // scalastyle:off classforname diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 8eaf9dfcf49b1..a00342facd72c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler +import java.io.{IOException, NotSerializableException, ObjectInputStream, ObjectOutputStream} import java.util.Properties import java.util.concurrent.atomic.AtomicBoolean @@ -517,6 +518,32 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } + test("unserializable partition") { + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new Partitioner { + override def numPartitions = 1 + + override def getPartition(key: Any) = 1 + + @throws(classOf[IOException]) + private def writeObject(out: ObjectOutputStream): Unit = { + throw new NotSerializableException() + } + + @throws(classOf[IOException]) + private def readObject(in: ObjectInputStream): Unit = {} + }) + + // Submit a map stage by itself + submitMapStage(shuffleDep) + assert(failure.getMessage.startsWith( + "Job aborted due to stage failure: Task not serializable")) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + assert(sparkListener.failedStages.contains(0)) + assert(sparkListener.failedStages.size === 1) + assertDataStructuresEmpty() + } + test("trivial job failure") { submit(new MyRDD(sc, 1, Nil), Array(0)) failed(taskSets(0), "some failure") diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 
9ae0bcd9b8860..5edd1992002c1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -178,29 +178,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!failedTaskSet) } - test("Scheduler does not crash when tasks are not serializable") { - val taskCpus = 2 - val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString) - val numFreeCores = 1 - val taskSet = new TaskSet( - Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null) - val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", taskCpus), - new WorkerOffer("executor1", "host1", numFreeCores)) - taskScheduler.submitTasks(taskSet) - var taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten - assert(0 === taskDescriptions.length) - assert(failedTaskSet) - assert(failedTaskSetReason.contains("Failed to serialize task")) - - // Now check that we can still submit tasks - // Even if one of the task sets has not-serializable tasks, the other task set should - // still be processed without error - taskScheduler.submitTasks(FakeTask.createTaskSet(1)) - taskScheduler.submitTasks(taskSet) - taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten - assert(taskDescriptions.map(_.executorId) === Seq("executor0")) - } - test("refuse to schedule concurrent attempts for the same stage (SPARK-8103)") { val taskScheduler = setupScheduler() val attempt1 = FakeTask.createTaskSet(1, 0) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index d03a0c990a02b..04f7a99d95289 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -598,47 +598,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1) } - test("do not emit warning when serialized task is small") { - sc = new SparkContext("local", "test") - sched = new FakeTaskScheduler(sc, ("exec1", "host1")) - val taskSet = FakeTask.createTaskSet(1) - val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) - - assert(!manager.emittedTaskSizeWarning) - - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) - - assert(!manager.emittedTaskSizeWarning) - } - - test("emit warning when serialized task is large") { - sc = new SparkContext("local", "test") - sched = new FakeTaskScheduler(sc, ("exec1", "host1")) - - val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null) - val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) - - assert(!manager.emittedTaskSizeWarning) - - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) - - assert(manager.emittedTaskSizeWarning) - } - - test("Not serializable exception thrown if the task cannot be serialized") { - sc = new SparkContext("local", "test") - sched = new FakeTaskScheduler(sc, ("exec1", "host1")) - - val taskSet = new TaskSet( - Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null) - val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) - - intercept[TaskNotSerializableException] { - manager.resourceOffer("exec1", "host1", ANY) - } - assert(manager.isZombie) - } - 
test("abort the job if total size of results is too large") { val conf = new SparkConf().set("spark.driver.maxResultSize", "2m") sc = new SparkContext("local", "test", conf) From cb738e5b4f17445babff4fe68f0076bd802738b5 Mon Sep 17 00:00:00 2001 From: Guoqiang Li Date: Wed, 11 Jan 2017 14:05:53 +0800 Subject: [PATCH 02/14] review commits --- .../spark/scheduler/TaskDescription.scala | 17 ++++++++++++----- .../apache/spark/scheduler/TaskSetManager.scala | 2 ++ .../cluster/CoarseGrainedSchedulerBackend.scala | 10 ++++++++++ .../spark/scheduler/TaskSetManagerSuite.scala | 13 +++++++++++++ 4 files changed, 37 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index 31924766d57ca..66e4f0d1e0394 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -55,8 +55,7 @@ private[spark] class TaskDescription( val addedFiles: Map[String, Long], val addedJars: Map[String, Long], val properties: Properties, - private var serializedTask_ : ByteBuffer, - private var task_ : Task[_] = null) extends Logging { + private var serializedTask_ : ByteBuffer) extends Logging { def this( taskId: Long, @@ -68,12 +67,20 @@ private[spark] class TaskDescription( addedJars: Map[String, Long], properties: Properties, task: Task[_]) { - this(taskId, attemptNumber, executorId, name, index, - addedFiles, addedJars, properties, null, task) + this(taskId, attemptNumber, executorId, name, index, + addedFiles, addedJars, properties, null.asInstanceOf[ByteBuffer]) + task_ = task } - lazy val serializedTask: ByteBuffer = { + private var task_ : Task[_] = null + + def serializedTask: ByteBuffer = { if (serializedTask_ == null) { + // This is where we serialize the task on the driver before sending it to the executor. + // This is not done when creating the TaskDescription so we can postpone this serialization + // to later in the scheduling process -- particularly, + // so it can happen in another thread by the CoarseGrainedSchedulerBackend. + // On the executors, this will already be populated by decode serializedTask_ = try { ByteBuffer.wrap(Utils.serialize(task_)) } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index c332ec7ede9c2..fd84ce58d60d6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -181,6 +181,8 @@ private[spark] class TaskSetManager( override def schedulingMode: SchedulingMode = SchedulingMode.NONE + var emittedTaskSizeWarning = false + /** Add a task to all the pending-task lists that it should be on. 
*/ private def addPendingTask(index: Int) { for (loc <- tasks(index).preferredLocations) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 94abe30bb12f2..ae36b3c9af016 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -258,6 +258,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { val serializedTask = TaskDescription.encode(task) + if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) { + scheduler.taskIdToTaskSetManager.get(task.taskId).filterNot(_.emittedTaskSizeWarning). + foreach { taskSetMgr => + taskSetMgr.emittedTaskSizeWarning = true + val stageId = taskSetMgr.taskSet.stageId + logWarning(s"Stage $stageId contains a task of very large size " + + s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + + s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") + } + } if (serializedTask.limit >= maxRpcMessageSize) { scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => try { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 04f7a99d95289..41485277aea70 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -598,6 +598,19 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1) } + test("do not emit warning when serialized task is small") { + sc = new SparkContext("local", "test") + sched = new FakeTaskScheduler(sc, ("exec1", "host1")) + val taskSet = FakeTask.createTaskSet(1) + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) + + assert(!manager.emittedTaskSizeWarning) + + assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) + + assert(!manager.emittedTaskSizeWarning) + } + test("abort the job if total size of results is too large") { val conf = new SparkConf().set("spark.driver.maxResultSize", "2m") sc = new SparkContext("local", "test", conf) From 13ab2bf4673b39d80be87b0125e07b43a9a62ecb Mon Sep 17 00:00:00 2001 From: Guoqiang Li Date: Fri, 13 Jan 2017 10:10:03 +0800 Subject: [PATCH 03/14] add test "Scheduler aborts stages that have unserializable partition" --- .../CoarseGrainedSchedulerBackend.scala | 72 ++++++++++++------- .../CoarseGrainedSchedulerBackendSuite.scala | 43 ++++++++++- .../spark/scheduler/DAGSchedulerSuite.scala | 2 +- 3 files changed, 87 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index ae36b3c9af016..fc58b993c558b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -23,7 +23,7 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import 
scala.concurrent.Future -import scala.concurrent.duration.Duration +import scala.util.control.NonFatal import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState} import org.apache.spark.internal.Logging @@ -31,6 +31,7 @@ import org.apache.spark.rpc._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.ENDPOINT_NAME +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.abortTaskSetManager import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils} /** @@ -257,42 +258,45 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { - val serializedTask = TaskDescription.encode(task) - if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) { - scheduler.taskIdToTaskSetManager.get(task.taskId).filterNot(_.emittedTaskSizeWarning). - foreach { taskSetMgr => - taskSetMgr.emittedTaskSizeWarning = true - val stageId = taskSetMgr.taskSet.stageId - logWarning(s"Stage $stageId contains a task of very large size " + - s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + - s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") - } + val serializedTask = try { + TaskDescription.encode(task) + } catch { + case NonFatal(e) => + abortTaskSetManager(scheduler, task.taskId, + s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e)) + null } - if (serializedTask.limit >= maxRpcMessageSize) { - scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => - try { - var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + - "spark.rpc.message.maxSize (%d bytes). Consider increasing " + - "spark.rpc.message.maxSize or using broadcast variables for large values." - msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize) - taskSetMgr.abort(msg) - } catch { - case e: Exception => logError("Exception in error callback", e) - } + + if (serializedTask != null && serializedTask.limit >= maxRpcMessageSize) { + val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + + "spark.rpc.message.maxSize (%d bytes). Consider increasing " + + "spark.rpc.message.maxSize or using broadcast variables for large values." + abortTaskSetManager(scheduler, task.taskId, + msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)) + } else if (serializedTask != null) { + if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) { + scheduler.taskIdToTaskSetManager.get(task.taskId).filterNot(_.emittedTaskSizeWarning). + foreach { taskSetMgr => + taskSetMgr.emittedTaskSizeWarning = true + val stageId = taskSetMgr.taskSet.stageId + logWarning(s"Stage $stageId contains a task of very large size " + + s"(${serializedTask.limit / 1024} KB). 
The maximum recommended task size is " + + s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") + } } - } - else { val executorData = executorDataMap(task.executorId) executorData.freeCores -= scheduler.CPUS_PER_TASK - logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " + - s"${executorData.executorHost}.") + logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} " + + s" hostname: ${executorData.executorHost}.") executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) } + } } + // Remove a disconnected slave from the cluster private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = { logDebug(s"Asked to remove executor $executorId with reason $reason") @@ -631,6 +635,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } -private[spark] object CoarseGrainedSchedulerBackend { +private[spark] object CoarseGrainedSchedulerBackend extends Logging { val ENDPOINT_NAME = "CoarseGrainedScheduler" + // abort TaskSetManager without exception + def abortTaskSetManager( + scheduler: TaskSchedulerImpl, + taskId: Long, + msg: => String, + exception: Option[Throwable] = None): Unit = { + scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr => + try { + taskSetMgr.abort(msg, exception) + } catch { + case e: Exception => logError("Exception in error callback", e) + } + } + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 04cccc67e328e..c309ea713e600 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -17,11 +17,37 @@ package org.apache.spark.scheduler -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite} +import java.io.{IOException, NotSerializableException, ObjectInputStream, ObjectOutputStream} + +import org.apache.spark._ +import org.apache.spark.rdd.RDD import org.apache.spark.util.{RpcUtils, SerializableBuffer} -class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext { +class NotSerializablePartitionRDD( + sc: SparkContext, + numPartitions: Int) extends RDD[(Int, Int)](sc, Nil) with Serializable { + + override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = + throw new RuntimeException("should not be reached") + + override def getPartitions: Array[Partition] = (0 until numPartitions).map(i => new Partition { + override def index: Int = i + + @throws(classOf[IOException]) + private def writeObject(out: ObjectOutputStream): Unit = { + throw new NotSerializableException() + } + + @throws(classOf[IOException]) + private def readObject(in: ObjectInputStream): Unit = {} + }).toArray + override def getPreferredLocations(partition: Partition): Seq[String] = Nil + + override def toString: String = "DAGSchedulerSuiteRDD " + id +} + +class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext { test("serialized task larger than max RPC message size") { val conf = new SparkConf conf.set("spark.rpc.message.maxSize", "1") @@ -38,4 +64,17 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo assert(smaller.size === 4) } + test("Scheduler aborts stages that have unserializable partition") { + val conf = new 
SparkConf() + .setMaster("local-cluster[2, 1, 1024]") + .setAppName("test") + .set("spark.dynamicAllocation.testing", "true") + sc = new SparkContext(conf) + val myRDD = new NotSerializablePartitionRDD(sc, 2) + val e = intercept[SparkException] { + myRDD.count() + } + assert(e.getMessage.contains("Failed to serialize task")) + + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index a00342facd72c..ae98ee7cdbc12 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -518,7 +518,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } - test("unserializable partition") { + test("unserializable partitioner") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new Partitioner { override def numPartitions = 1 From 543a454c3cc8853253dbcffa61f7d7612e1a3d23 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 13 Jan 2017 15:42:44 -0600 Subject: [PATCH 04/14] refactor --- .../CoarseGrainedExecutorBackend.scala | 4 +-- .../org/apache/spark/executor/Executor.scala | 12 ++++++--- .../spark/scheduler/TaskDescription.scala | 27 ++++++++----------- .../CoarseGrainedSchedulerBackend.scala | 2 +- .../local/LocalSchedulerBackend.scala | 2 +- .../apache/spark/executor/ExecutorSuite.scala | 19 ++++++------- .../scheduler/TaskDescriptionSuite.scala | 7 +++-- 7 files changed, 36 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index b376ecd301eab..8f4909ea2939e 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -92,9 +92,9 @@ private[spark] class CoarseGrainedExecutorBackend( if (executor == null) { exitExecutor(1, "Received LaunchTask command but executor was null") } else { - val taskDesc = TaskDescription.decode(data.value) + val (taskDesc, serializedTask) = TaskDescription.decode(data.value) logInfo("Got assigned task " + taskDesc.taskId) - executor.launchTask(this, taskDesc) + executor.launchTask(this, taskDesc, serializedTask) } case KillTask(taskId, _, interruptThread) => diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 1128fce160689..54449b1bcd2c4 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -150,8 +150,11 @@ private[spark] class Executor( private[executor] def numRunningTasks: Int = runningTasks.size() - def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = { - val tr = new TaskRunner(context, taskDescription) + def launchTask( + context: ExecutorBackend, + taskDescription: TaskDescription, + serializedTask: ByteBuffer): Unit = { + val tr = new TaskRunner(context, taskDescription, serializedTask) runningTasks.put(taskDescription.taskId, tr) threadPool.execute(tr) } @@ -208,7 +211,8 @@ private[spark] class Executor( class TaskRunner( execBackend: ExecutorBackend, - private val taskDescription: TaskDescription) + private val taskDescription: TaskDescription, + private val 
serializedTask: ByteBuffer) extends Runnable { val taskId = taskDescription.taskId @@ -287,7 +291,7 @@ private[spark] class Executor( Executor.taskDeserializationProps.set(taskDescription.properties) updateDependencies(taskDescription.addedFiles, taskDescription.addedJars) - task = Utils.deserialize(taskDescription.serializedTask, + task = Utils.deserialize(serializedTask, Thread.currentThread.getContextClassLoader).asInstanceOf[Task[Any]] task.localProperties = taskDescription.properties task.setTaskMemoryManager(taskMemoryManager) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index 66e4f0d1e0394..175a9e71d6715 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -54,8 +54,7 @@ private[spark] class TaskDescription( val index: Int, // Index within this task's TaskSet val addedFiles: Map[String, Long], val addedJars: Map[String, Long], - val properties: Properties, - private var serializedTask_ : ByteBuffer) extends Logging { + val properties: Properties) extends Logging { def this( taskId: Long, @@ -68,20 +67,16 @@ private[spark] class TaskDescription( properties: Properties, task: Task[_]) { this(taskId, attemptNumber, executorId, name, index, - addedFiles, addedJars, properties, null.asInstanceOf[ByteBuffer]) + addedFiles, addedJars, properties) task_ = task } - private var task_ : Task[_] = null - - def serializedTask: ByteBuffer = { - if (serializedTask_ == null) { + def serializedTask: ByteBuffer = { // This is where we serialize the task on the driver before sending it to the executor. // This is not done when creating the TaskDescription so we can postpone this serialization // to later in the scheduling process -- particularly, // so it can happen in another thread by the CoarseGrainedSchedulerBackend. - // On the executors, this will already be populated by decode - serializedTask_ = try { + try { ByteBuffer.wrap(Utils.serialize(task_)) } catch { case NonFatal(e) => @@ -90,8 +85,8 @@ private[spark] class TaskDescription( throw new TaskNotSerializableException(e) } } - serializedTask_ - } + + private var task_ : Task[_] = null override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index) } @@ -106,7 +101,7 @@ private[spark] object TaskDescription { } @throws[TaskNotSerializableException] - def encode(taskDescription: TaskDescription): ByteBuffer = { + def encode(taskDescription: TaskDescription, serializedTask: ByteBuffer): ByteBuffer = { val bytesOut = new ByteBufferOutputStream(4096) val dataOut = new DataOutputStream(bytesOut) @@ -130,7 +125,7 @@ private[spark] object TaskDescription { } // Write the task. The task is already serialized, so write it directly to the byte buffer. - Utils.writeByteBuffer(taskDescription.serializedTask, bytesOut) + Utils.writeByteBuffer(serializedTask, bytesOut) dataOut.close() bytesOut.close() @@ -146,7 +141,7 @@ private[spark] object TaskDescription { map } - def decode(byteBuffer: ByteBuffer): TaskDescription = { + def decode(byteBuffer: ByteBuffer): (TaskDescription, ByteBuffer) = { val dataIn = new DataInputStream(new ByteBufferInputStream(byteBuffer)) val taskId = dataIn.readLong() val attemptNumber = dataIn.readInt() @@ -170,7 +165,7 @@ private[spark] object TaskDescription { // Create a sub-buffer for the serialized task into its own buffer (to be deserialized later). 
val serializedTask = byteBuffer.slice() - new TaskDescription(taskId, attemptNumber, executorId, name, index, taskFiles, taskJars, - properties, serializedTask) + (new TaskDescription(taskId, attemptNumber, executorId, name, index, taskFiles, taskJars, + properties), serializedTask) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index fc58b993c558b..87a2dd7866e0d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -259,7 +259,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { val serializedTask = try { - TaskDescription.encode(task) + TaskDescription.encode(task, task.serializedTask) } catch { case NonFatal(e) => abortTaskSetManager(scheduler, task.taskId, diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala index 625f998cd4608..fc760a8ec0912 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala @@ -84,7 +84,7 @@ private[spark] class LocalEndpoint( val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) for (task <- scheduler.resourceOffers(offers).flatten) { freeCores -= scheduler.CPUS_PER_TASK - executor.launchTask(executorBackend, task) + executor.launchTask(executorBackend, task, task.serializedTask) } } } diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index b743ff5376c49..f787b541c3567 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -49,7 +49,7 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug val serializer = new JavaSerializer(conf) val env = createMockEnv(conf, serializer) val serializedTask = serializer.newInstance().serialize(new FakeTask(0, 0)) - val taskDescription = createFakeTaskDescription(serializedTask) + val taskDescription = createFakeTaskDescription() // we use latches to force the program to run in this order: // +-----------------------------+---------------------------------------+ @@ -99,7 +99,7 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug try { executor = new Executor("id", "localhost", env, userClassPath = Nil, isLocal = true) // the task will be launched in a dedicated worker thread - executor.launchTask(mockExecutorBackend, taskDescription) + executor.launchTask(mockExecutorBackend, taskDescription, serializedTask) if (!executorSuiteHelper.latch1.await(5, TimeUnit.SECONDS)) { fail("executor did not send first status update in time") @@ -128,9 +128,9 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug val serializer = new JavaSerializer(conf) val env = createMockEnv(conf, serializer) val serializedTask = serializer.newInstance().serialize(new NonDeserializableTask) - val taskDescription = createFakeTaskDescription(serializedTask) + val taskDescription = 
createFakeTaskDescription() - val failReason = runTaskAndGetFailReason(taskDescription) + val failReason = runTaskAndGetFailReason(taskDescription, serializedTask) failReason match { case ef: ExceptionFailure => assert(ef.exception.isDefined) @@ -155,7 +155,7 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug mockEnv } - private def createFakeTaskDescription(serializedTask: ByteBuffer): TaskDescription = { + private def createFakeTaskDescription(): TaskDescription = { new TaskDescription( taskId = 0, attemptNumber = 0, @@ -164,17 +164,18 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug index = 0, addedFiles = Map[String, Long](), addedJars = Map[String, Long](), - properties = new Properties, - serializedTask) + properties = new Properties) } - private def runTaskAndGetFailReason(taskDescription: TaskDescription): TaskFailedReason = { + private def runTaskAndGetFailReason( + taskDescription: TaskDescription, + serializedTask: ByteBuffer): TaskFailedReason = { val mockBackend = mock[ExecutorBackend] var executor: Executor = null try { executor = new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil, isLocal = true) // the task will be launched in a dedicated worker thread - executor.launchTask(mockBackend, taskDescription) + executor.launchTask(mockBackend, taskDescription, serializedTask) eventually(timeout(5 seconds), interval(10 milliseconds)) { assert(executor.numRunningTasks === 0) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala index 9f1fe0515732e..cac74f317e304 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala @@ -48,12 +48,11 @@ class TaskDescriptionSuite extends SparkFunSuite { index = 19, originalFiles, originalJars, - originalProperties, - taskBuffer + originalProperties ) - val serializedTaskDescription = TaskDescription.encode(originalTaskDescription) - val decodedTaskDescription = TaskDescription.decode(serializedTaskDescription) + val serializedTaskDescription = TaskDescription.encode(originalTaskDescription, taskBuffer) + val (decodedTaskDescription, _) = TaskDescription.decode(serializedTaskDescription) // Make sure that all of the fields in the decoded task description match the original. 
assert(decodedTaskDescription.taskId === originalTaskDescription.taskId) From 6f042062fbea721856fc037baf1c87c79e8703b5 Mon Sep 17 00:00:00 2001 From: Guoqiang Li Date: Mon, 16 Jan 2017 09:10:44 +0800 Subject: [PATCH 05/14] create all the serialized tasks to make sure they all work --- .../CoarseGrainedSchedulerBackend.scala | 51 ++++++++++--------- .../local/LocalSchedulerBackend.scala | 24 +++++++-- .../CoarseGrainedSchedulerBackendSuite.scala | 8 +-- .../scheduler/TaskDescriptionSuite.scala | 4 +- .../spark/executor/MesosExecutorBackend.scala | 4 +- .../MesosFineGrainedSchedulerBackend.scala | 45 ++++++++++------ ...esosFineGrainedSchedulerBackendSuite.scala | 6 +-- 7 files changed, 88 insertions(+), 54 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 87a2dd7866e0d..7dfba4c9ef4d3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler.cluster +import java.nio.ByteBuffer import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import javax.annotation.concurrent.GuardedBy @@ -257,24 +258,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { - for (task <- tasks.flatten) { - val serializedTask = try { - TaskDescription.encode(task, task.serializedTask) - } catch { - case NonFatal(e) => + val serializedTasks = tasks.flatten.map { task => + var serializedTask: ByteBuffer = null + try { + serializedTask = TaskDescription.encode(task, task.serializedTask) + if (serializedTask.limit >= maxRpcMessageSize) { + val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + + "spark.rpc.message.maxSize (%d bytes). Consider increasing " + + "spark.rpc.message.maxSize or using broadcast variables for large values." abortTaskSetManager(scheduler, task.taskId, - s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e)) - null - } - - if (serializedTask != null && serializedTask.limit >= maxRpcMessageSize) { - val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + - "spark.rpc.message.maxSize (%d bytes). Consider increasing " + - "spark.rpc.message.maxSize or using broadcast variables for large values." - abortTaskSetManager(scheduler, task.taskId, - msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)) - } else if (serializedTask != null) { - if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) { + msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)) + serializedTask = null + } else if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) { scheduler.taskIdToTaskSetManager.get(task.taskId).filterNot(_.emittedTaskSizeWarning). 
foreach { taskSetMgr => taskSetMgr.emittedTaskSizeWarning = true @@ -284,6 +279,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") } } + } catch { + case NonFatal(e) => + abortTaskSetManager(scheduler, task.taskId, + s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e)) + } + (task, serializedTask) + } + + if (!serializedTasks.exists(b => b._2 eq null)) { + for ((task, serializedTask) <- serializedTasks) { val executorData = executorDataMap(task.executorId) executorData.freeCores -= scheduler.CPUS_PER_TASK @@ -292,11 +297,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) } - } } - // Remove a disconnected slave from the cluster private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = { logDebug(s"Asked to remove executor $executorId with reason $reason") @@ -639,11 +642,11 @@ private[spark] object CoarseGrainedSchedulerBackend extends Logging { val ENDPOINT_NAME = "CoarseGrainedScheduler" // abort TaskSetManager without exception def abortTaskSetManager( - scheduler: TaskSchedulerImpl, - taskId: Long, - msg: => String, - exception: Option[Throwable] = None): Unit = { - scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr => + scheduler: TaskSchedulerImpl, + taskId: Long, + msg: => String, + exception: Option[Throwable] = None): Unit = { + scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr => try { taskSetMgr.abort(msg, exception) } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala index fc760a8ec0912..73d32d4a276db 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala @@ -21,6 +21,8 @@ import java.io.File import java.net.URL import java.nio.ByteBuffer +import scala.util.control.NonFatal + import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.{Executor, ExecutorBackend} @@ -28,7 +30,9 @@ import org.apache.spark.internal.Logging import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.abortTaskSetManager import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.util.RpcUtils private case class ReviveOffers() @@ -58,6 +62,7 @@ private[spark] class LocalEndpoint( private val executor = new Executor( localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true) + private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(SparkEnv.get.conf) override def receive: PartialFunction[Any, Unit] = { case ReviveOffers => @@ -82,9 +87,22 @@ private[spark] class LocalEndpoint( def reviveOffers() { val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) - for (task <- scheduler.resourceOffers(offers).flatten) { - freeCores -= scheduler.CPUS_PER_TASK - executor.launchTask(executorBackend, task, task.serializedTask) + val serializedTasks = scheduler.resourceOffers(offers).flatten.map { 
task => + var serializedTask: ByteBuffer = null + try { + serializedTask = task.serializedTask + } catch { + case NonFatal(e) => + abortTaskSetManager(scheduler, task.taskId, + s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e)) + } + (task, serializedTask) + } + if (!serializedTasks.exists(b => b._2 eq null)) { + for ((taskDesc, serializedTask) <- serializedTasks) { + freeCores -= scheduler.CPUS_PER_TASK + executor.launchTask(executorBackend, taskDesc, serializedTask) + } } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index c309ea713e600..518670486b2e6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -24,8 +24,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.util.{RpcUtils, SerializableBuffer} class NotSerializablePartitionRDD( - sc: SparkContext, - numPartitions: Int) extends RDD[(Int, Int)](sc, Nil) with Serializable { + sc: SparkContext, + numPartitions: Int) extends RDD[(Int, Int)](sc, Nil) with Serializable { override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = throw new RuntimeException("should not be reached") @@ -75,6 +75,8 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo myRDD.count() } assert(e.getMessage.contains("Failed to serialize task")) - + assertResult(10) { + sc.parallelize(1 to 10).count() + } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala index cac74f317e304..6cd7d49e44be1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala @@ -52,7 +52,7 @@ class TaskDescriptionSuite extends SparkFunSuite { ) val serializedTaskDescription = TaskDescription.encode(originalTaskDescription, taskBuffer) - val (decodedTaskDescription, _) = TaskDescription.decode(serializedTaskDescription) + val (decodedTaskDescription, serializedTask) = TaskDescription.decode(serializedTaskDescription) // Make sure that all of the fields in the decoded task description match the original. 
assert(decodedTaskDescription.taskId === originalTaskDescription.taskId) @@ -63,6 +63,6 @@ class TaskDescriptionSuite extends SparkFunSuite { assert(decodedTaskDescription.addedFiles.equals(originalFiles)) assert(decodedTaskDescription.addedJars.equals(originalJars)) assert(decodedTaskDescription.properties.equals(originalTaskDescription.properties)) - assert(decodedTaskDescription.serializedTask.equals(taskBuffer)) + assert(serializedTask.equals(taskBuffer)) } } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index b252539782580..29711a17097ca 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -85,12 +85,12 @@ private[spark] class MesosExecutorBackend } override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) { - val taskDescription = TaskDescription.decode(taskInfo.getData.asReadOnlyByteBuffer()) + val (taskDesc, serializedTask) = TaskDescription.decode(taskInfo.getData.asReadOnlyByteBuffer()) if (executor == null) { logError("Received launchTask but executor was null") } else { SparkHadoopUtil.get.runAsSparkUser { () => - executor.launchTask(this, taskDescription) + executor.launchTask(this, taskDesc, serializedTask) } } } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index 7e561916a71e2..b76f17e0281b5 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -18,10 +18,12 @@ package org.apache.spark.scheduler.cluster.mesos import java.io.File +import java.nio.ByteBuffer import java.util.{ArrayList => JArrayList, Collections, List => JList} import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap, HashSet} +import scala.util.control.NonFatal import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _} import org.apache.mesos.protobuf.ByteString @@ -29,6 +31,7 @@ import org.apache.mesos.protobuf.ByteString import org.apache.spark.{SparkContext, SparkException, TaskState} import org.apache.spark.executor.MesosExecutorBackend import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.abortTaskSetManager import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.util.Utils @@ -293,22 +296,32 @@ private[spark] class MesosFineGrainedSchedulerBackend( val slavesIdsOfAcceptedOffers = HashSet[String]() // Call into the TaskSchedulerImpl - val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty) - acceptedOffers - .foreach { offer => - offer.foreach { taskDesc => - val slaveId = taskDesc.executorId - slavesIdsOfAcceptedOffers += slaveId - taskIdToSlaveId(taskDesc.taskId) = slaveId - val (mesosTask, remainingResources) = createMesosTask( - taskDesc, - slaveIdToResources(slaveId), - slaveId) - mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) - .add(mesosTask) - slaveIdToResources(slaveId) = remainingResources - } + val 
serializedTasks = scheduler.resourceOffers(workerOffers).flatten.map { task => + var serializedTask: ByteBuffer = null + try { + serializedTask = task.serializedTask + } catch { + case NonFatal(e) => + abortTaskSetManager(scheduler, task.taskId, + s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e)) } + (task, serializedTask) + } + if (!serializedTasks.exists(b => b._2 eq null)) { + serializedTasks.foreach { case (taskDesc, _) => + val slaveId = taskDesc.executorId + slavesIdsOfAcceptedOffers += slaveId + taskIdToSlaveId(taskDesc.taskId) = slaveId + val (mesosTask, remainingResources) = createMesosTask( + taskDesc, + slaveIdToResources(slaveId), + slaveId) + mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) + .add(mesosTask) + slaveIdToResources(slaveId) = remainingResources + + } + } // Reply to the offers val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? @@ -351,7 +364,7 @@ private[spark] class MesosFineGrainedSchedulerBackend( .setExecutor(executorInfo) .setName(task.name) .addAllResources(cpuResources.asJava) - .setData(ByteString.copyFrom(TaskDescription.encode(task))) + .setData(ByteString.copyFrom(TaskDescription.encode(task, task.serializedTask))) .build() (taskInfo, finalResources.asJava) } diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala index 4ee85b91830a9..03f0cf7ecb3b1 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala @@ -255,8 +255,7 @@ class MesosFineGrainedSchedulerBackendSuite index = 0, addedFiles = mutable.Map.empty[String, Long], addedJars = mutable.Map.empty[String, Long], - properties = new Properties(), - ByteBuffer.wrap(new Array[Byte](0))) + properties = new Properties()) when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc))) when(taskScheduler.CPUS_PER_TASK).thenReturn(2) @@ -363,8 +362,7 @@ class MesosFineGrainedSchedulerBackendSuite index = 0, addedFiles = mutable.Map.empty[String, Long], addedJars = mutable.Map.empty[String, Long], - properties = new Properties(), - ByteBuffer.wrap(new Array[Byte](0))) + properties = new Properties()) when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc))) when(taskScheduler.CPUS_PER_TASK).thenReturn(1) From 0c8eed759b7fd81d52851d322c9367d7a1f7908c Mon Sep 17 00:00:00 2001 From: Guoqiang Li Date: Sun, 22 Jan 2017 11:28:04 +0800 Subject: [PATCH 06/14] review commits --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 5 +++-- .../apache/spark/scheduler/local/LocalSchedulerBackend.scala | 1 - 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 7dfba4c9ef4d3..0ab423c8e02fd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -640,13 +640,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, 
val rpcEnv: Rp
 private[spark] object CoarseGrainedSchedulerBackend extends Logging {
   val ENDPOINT_NAME = "CoarseGrainedScheduler"

+  // abort TaskSetManager without exception
-  def abortTaskSetManager(
+  private[scheduler] def abortTaskSetManager(
       scheduler: TaskSchedulerImpl,
       taskId: Long,
       msg: => String,
       exception: Option[Throwable] = None): Unit = {
-    scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr =>
+    scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr =>
       try {
         taskSetMgr.abort(msg, exception)
       } catch {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
index 73d32d4a276db..8b8934eaba9b9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
@@ -62,7 +62,6 @@ private[spark] class LocalEndpoint(
   private val executor = new Executor(
     localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)
-  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(SparkEnv.get.conf)
   override def receive: PartialFunction[Any, Unit] = {
     case ReviveOffers =>

From 0cdfeb0b0584357e83d5bddff2089619fac69db4 Mon Sep 17 00:00:00 2001
From: Guoqiang Li
Date: Wed, 25 Jan 2017 09:09:20 +0800
Subject: [PATCH 07/14] add lock on the scheduler object

---
 .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0ab423c8e02fd..1342256eee7de 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -646,7 +646,7 @@ private[spark] object CoarseGrainedSchedulerBackend extends Logging {
       scheduler: TaskSchedulerImpl,
       taskId: Long,
       msg: => String,
-      exception: Option[Throwable] = None): Unit = {
+      exception: Option[Throwable] = None): Unit = scheduler.synchronized {
     scheduler.taskIdToTaskSetManager.get(taskId).foreach { taskSetMgr =>
       try {
         taskSetMgr.abort(msg, exception)

From 9ebaba297079abee84a228a5ce12a6d3ed39309b Mon Sep 17 00:00:00 2001
From: Kay Ousterhout
Date: Mon, 6 Feb 2017 16:38:48 -0800
Subject: [PATCH 08/14] Consolidate TaskDescription constructors.

This commit also does all task serialization in the encode() method, so now
the encode() method just takes the TaskDescription as an input parameter.
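To make the new shape easier to follow, here is a minimal, self-contained sketch
of the idea (illustration only: the Demo* names are hypothetical stand-ins, plain
JDK serialization stands in for Utils.serialize, and almost all fields of the real
TaskDescription are omitted):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    // Stand-in for the real Task class, trimmed to a single serializable field.
    class DemoTask(val partitionId: Int) extends Serializable

    // Trimmed-down TaskDescription: the Task is only held on the driver and is
    // @transient, so it never travels with the description itself.
    class DemoTaskDescription(
        val taskId: Long,
        val name: String,
        @transient private val task: DemoTask) {

      // Serialization is deferred until encode() asks for it, so it can happen late
      // in the scheduling path (e.g. in the scheduler backend) rather than in
      // TaskSetManager.resourceOffer().
      def serializeTask(): ByteBuffer = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        try out.writeObject(task) finally out.close()
        ByteBuffer.wrap(bytes.toByteArray)
      }
    }

    object DemoTaskDescription {
      // encode() now takes only the description and performs the serialization itself.
      def encode(desc: DemoTaskDescription): ByteBuffer = desc.serializeTask()
    }

    object EncodeDemo extends App {
      val desc = new DemoTaskDescription(1L, "task 1.0 in stage 0.0", new DemoTask(0))
      println(s"encoded task is ${DemoTaskDescription.encode(desc).limit()} bytes")
    }

In the real change, encode() also writes the added files, jars and properties before
the serialized task bytes, and a serialization failure surfaces as a
TaskNotSerializableException rather than a plain exception.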
--- .../spark/scheduler/TaskDescription.scala | 59 ++++++++----------- .../CoarseGrainedSchedulerBackend.scala | 2 +- .../local/LocalSchedulerBackend.scala | 2 +- .../scheduler/TaskDescriptionSuite.scala | 12 ++-- 4 files changed, 33 insertions(+), 42 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index 175a9e71d6715..fc6739a964206 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -54,40 +54,27 @@ private[spark] class TaskDescription( val index: Int, // Index within this task's TaskSet val addedFiles: Map[String, Long], val addedJars: Map[String, Long], - val properties: Properties) extends Logging { - - def this( - taskId: Long, - attemptNumber: Int, - executorId: String, - name: String, - index: Int, // Index within this task's TaskSet - addedFiles: Map[String, Long], - addedJars: Map[String, Long], - properties: Properties, - task: Task[_]) { - this(taskId, attemptNumber, executorId, name, index, - addedFiles, addedJars, properties) - task_ = task - } - - def serializedTask: ByteBuffer = { - // This is where we serialize the task on the driver before sending it to the executor. - // This is not done when creating the TaskDescription so we can postpone this serialization - // to later in the scheduling process -- particularly, - // so it can happen in another thread by the CoarseGrainedSchedulerBackend. - try { - ByteBuffer.wrap(Utils.serialize(task_)) - } catch { - case NonFatal(e) => - val msg = s"Failed to serialize task $taskId, not attempting to retry it." - logError(msg, e) - throw new TaskNotSerializableException(e) - } + val properties: Properties, + // Task object corresponding to the TaskDescription. This is only defined on the master; on + // the worker, the Task object is handled separately from the TaskDescription so that it can + // deserialized after the TaskDescription is deserialized. + @transient private val task: Task[_] = null) extends Logging { + + /** + * Serializes the task for this TaskDescription and returns the serialized task. + * + * This method should only be used on the master (to serialize a task to send to a worker). + */ + def serializeTask(): ByteBuffer = { + try { + ByteBuffer.wrap(Utils.serialize(task)) + } catch { + case NonFatal(e) => + val msg = s"Failed to serialize task ${taskId}." + logError(msg, e) + throw new TaskNotSerializableException(e) } - - private var task_ : Task[_] = null - + } override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index) } @@ -101,7 +88,7 @@ private[spark] object TaskDescription { } @throws[TaskNotSerializableException] - def encode(taskDescription: TaskDescription, serializedTask: ByteBuffer): ByteBuffer = { + def encode(taskDescription: TaskDescription): ByteBuffer = { val bytesOut = new ByteBufferOutputStream(4096) val dataOut = new DataOutputStream(bytesOut) @@ -124,8 +111,8 @@ private[spark] object TaskDescription { dataOut.writeUTF(value) } - // Write the task. The task is already serialized, so write it directly to the byte buffer. - Utils.writeByteBuffer(serializedTask, bytesOut) + // Serialize and write the task. 
+ Utils.writeByteBuffer(taskDescription.serializeTask(), bytesOut) dataOut.close() bytesOut.close() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 1342256eee7de..b839c65c40336 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -261,7 +261,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val serializedTasks = tasks.flatten.map { task => var serializedTask: ByteBuffer = null try { - serializedTask = TaskDescription.encode(task, task.serializedTask) + serializedTask = TaskDescription.encode(task) if (serializedTask.limit >= maxRpcMessageSize) { val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + "spark.rpc.message.maxSize (%d bytes). Consider increasing " + diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala index 8b8934eaba9b9..5e566c75246b9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala @@ -89,7 +89,7 @@ private[spark] class LocalEndpoint( val serializedTasks = scheduler.resourceOffers(offers).flatten.map { task => var serializedTask: ByteBuffer = null try { - serializedTask = task.serializedTask + serializedTask = task.serializeTask } catch { case NonFatal(e) => abortTaskSetManager(scheduler, task.taskId, diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala index 6cd7d49e44be1..d8007dde18c65 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala @@ -37,7 +37,6 @@ class TaskDescriptionSuite extends SparkFunSuite { originalProperties.put("property1", "18") originalProperties.put("property2", "test value") - // Create a dummy byte buffer for the task. val taskBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)) val originalTaskDescription = new TaskDescription( @@ -48,10 +47,15 @@ class TaskDescriptionSuite extends SparkFunSuite { index = 19, originalFiles, originalJars, - originalProperties - ) + originalProperties, + // Pass in null for the task, because we override the serialize method below anyway (which + // is the only time task is used). + task = null + ) { + override def serializeTask() = taskBuffer + } - val serializedTaskDescription = TaskDescription.encode(originalTaskDescription, taskBuffer) + val serializedTaskDescription = TaskDescription.encode(originalTaskDescription) val (decodedTaskDescription, serializedTask) = TaskDescription.decode(serializedTaskDescription) // Make sure that all of the fields in the decoded task description match the original. 
From 7d6e7a609f289a1704b773aba766d9276dd822ad Mon Sep 17 00:00:00 2001 From: Guoqiang Li Date: Tue, 7 Feb 2017 23:21:11 +0800 Subject: [PATCH 09/14] Refactor the taskDesc serialization code --- .../CoarseGrainedSchedulerBackend.scala | 101 ++++++++++++------ .../local/LocalSchedulerBackend.scala | 25 ++--- .../MesosFineGrainedSchedulerBackend.scala | 38 +++---- 3 files changed, 92 insertions(+), 72 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index b839c65c40336..93538e8d9220c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -32,7 +32,7 @@ import org.apache.spark.rpc._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.ENDPOINT_NAME -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.abortTaskSetManager +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.prepareSerializedTask import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils} /** @@ -258,43 +258,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { - val serializedTasks = tasks.flatten.map { task => - var serializedTask: ByteBuffer = null - try { - serializedTask = TaskDescription.encode(task) - if (serializedTask.limit >= maxRpcMessageSize) { - val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + - "spark.rpc.message.maxSize (%d bytes). Consider increasing " + - "spark.rpc.message.maxSize or using broadcast variables for large values." - abortTaskSetManager(scheduler, task.taskId, - msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)) - serializedTask = null - } else if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) { - scheduler.taskIdToTaskSetManager.get(task.taskId).filterNot(_.emittedTaskSizeWarning). - foreach { taskSetMgr => - taskSetMgr.emittedTaskSizeWarning = true - val stageId = taskSetMgr.taskSet.stageId - logWarning(s"Stage $stageId contains a task of very large size " + - s"(${serializedTask.limit / 1024} KB). 
The maximum recommended task size is " + - s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") - } - } - } catch { - case NonFatal(e) => - abortTaskSetManager(scheduler, task.taskId, - s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e)) - } - (task, serializedTask) - } - - if (!serializedTasks.exists(b => b._2 eq null)) { - for ((task, serializedTask) <- serializedTasks) { + val abortTaskSet = new HashSet[TaskSetManager]() + for (task <- tasks.flatten) { + val serializedTask = prepareSerializedTask(scheduler, task, + abortTaskSet, maxRpcMessageSize) + if (serializedTask != null) { val executorData = executorDataMap(task.executorId) executorData.freeCores -= scheduler.CPUS_PER_TASK - logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} " + s" hostname: ${executorData.executorHost}.") - executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) } } @@ -642,7 +614,7 @@ private[spark] object CoarseGrainedSchedulerBackend extends Logging { val ENDPOINT_NAME = "CoarseGrainedScheduler" // abort TaskSetManager without exception - private[scheduler] def abortTaskSetManager( + private def abortTaskSetManager( scheduler: TaskSchedulerImpl, taskId: Long, msg: => String, @@ -655,4 +627,63 @@ private[spark] object CoarseGrainedSchedulerBackend extends Logging { } } } + + private def isZombieTaskSetManager( + scheduler: TaskSchedulerImpl, + taskId: Long): Unit = scheduler.synchronized { + !scheduler.taskIdToTaskSetManager.get(taskId).exists(_.isZombie) + } + + private def getTaskSetManager( + scheduler: TaskSchedulerImpl, + taskId: Long): Option[TaskSetManager] = scheduler.synchronized { + scheduler.taskIdToTaskSetManager.get(taskId) + } + + private[scheduler] def prepareSerializedTask( + scheduler: TaskSchedulerImpl, + task: TaskDescription, + abortSet: HashSet[TaskSetManager], + maxRpcMessageSize: Long): ByteBuffer = { + var serializedTask: ByteBuffer = null + if (abortSet.isEmpty || !getTaskSetManager(scheduler, task.taskId).exists(_.isZombie)) { + try { + serializedTask = TaskDescription.encode(task) + } catch { + case NonFatal(e) => + abortTaskSetManager(scheduler, task.taskId, + s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e)) + scheduler.taskIdToTaskSetManager.get(task.taskId).foreach(t => abortSet.add(t)) + } + } + + if (serializedTask != null && serializedTask.limit >= maxRpcMessageSize) { + val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + + "spark.rpc.message.maxSize (%d bytes). Consider increasing " + + "spark.rpc.message.maxSize or using broadcast variables for large values." + abortTaskSetManager(scheduler, task.taskId, + msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)) + getTaskSetManager(scheduler, task.taskId).foreach(t => abortSet.add(t)) + serializedTask = null + } else if (serializedTask != null) { + emittedTaskSizeWarning(scheduler, serializedTask, task.taskId) + } + serializedTask + } + + private def emittedTaskSizeWarning( + scheduler: TaskSchedulerImpl, + serializedTask: ByteBuffer, + taskId: Long): Unit = { + if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) { + getTaskSetManager(scheduler, taskId).filterNot(_.emittedTaskSizeWarning). + foreach { taskSetMgr => + taskSetMgr.emittedTaskSizeWarning = true + val stageId = taskSetMgr.taskSet.stageId + logWarning(s"Stage $stageId contains a task of very large size " + + s"(${serializedTask.limit / 1024} KB). 
The maximum recommended task size is " + + s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") + } + } + } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala index 5e566c75246b9..0f1d8d8fe7542 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala @@ -21,7 +21,7 @@ import java.io.File import java.net.URL import java.nio.ByteBuffer -import scala.util.control.NonFatal +import scala.collection.mutable.HashSet import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskState} import org.apache.spark.TaskState.TaskState @@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.abortTaskSetManager +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.prepareSerializedTask import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.util.RpcUtils @@ -63,6 +63,8 @@ private[spark] class LocalEndpoint( private val executor = new Executor( localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true) + private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(SparkEnv.get.conf) + override def receive: PartialFunction[Any, Unit] = { case ReviveOffers => reviveOffers() @@ -86,20 +88,13 @@ private[spark] class LocalEndpoint( def reviveOffers() { val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) - val serializedTasks = scheduler.resourceOffers(offers).flatten.map { task => - var serializedTask: ByteBuffer = null - try { - serializedTask = task.serializeTask - } catch { - case NonFatal(e) => - abortTaskSetManager(scheduler, task.taskId, - s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e)) - } - (task, serializedTask) - } - if (!serializedTasks.exists(b => b._2 eq null)) { - for ((taskDesc, serializedTask) <- serializedTasks) { + val abortTaskSet = new HashSet[TaskSetManager]() + for (task <- scheduler.resourceOffers(offers).flatten) { + val buffer = prepareSerializedTask(scheduler, task, + abortTaskSet, maxRpcMessageSize) + if (buffer != null) { freeCores -= scheduler.CPUS_PER_TASK + val (taskDesc, serializedTask) = TaskDescription.decode(buffer) executor.launchTask(executorBackend, taskDesc, serializedTask) } } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index b76f17e0281b5..e0a720b05ebec 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -23,7 +23,6 @@ import java.util.{ArrayList => JArrayList, Collections, List => JList} import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap, HashSet} -import scala.util.control.NonFatal import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _} 
import org.apache.mesos.protobuf.ByteString @@ -31,9 +30,9 @@ import org.apache.mesos.protobuf.ByteString import org.apache.spark.{SparkContext, SparkException, TaskState} import org.apache.spark.executor.MesosExecutorBackend import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.abortTaskSetManager +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.prepareSerializedTask import org.apache.spark.scheduler.cluster.ExecutorInfo -import org.apache.spark.util.Utils +import org.apache.spark.util.{RpcUtils, Utils} /** * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a @@ -70,6 +69,8 @@ private[spark] class MesosFineGrainedSchedulerBackend( private val rejectOfferDurationForUnmetConstraints = getRejectOfferDurationForUnmetConstraints(sc) + private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(sc.conf) + @volatile var appId: String = _ override def start() { @@ -294,32 +295,24 @@ private[spark] class MesosFineGrainedSchedulerBackend( val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]] val slavesIdsOfAcceptedOffers = HashSet[String]() - + val abortTaskSet = new HashSet[TaskSetManager]() // Call into the TaskSchedulerImpl - val serializedTasks = scheduler.resourceOffers(workerOffers).flatten.map { task => - var serializedTask: ByteBuffer = null - try { - serializedTask = task.serializedTask - } catch { - case NonFatal(e) => - abortTaskSetManager(scheduler, task.taskId, - s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e)) - } - (task, serializedTask) - } - if (!serializedTasks.exists(b => b._2 eq null)) { - serializedTasks.foreach { case (taskDesc, _) => - val slaveId = taskDesc.executorId + val acceptedOffers = scheduler.resourceOffers(workerOffers).flatten + for (task <- acceptedOffers) { + val serializedTask = prepareSerializedTask(scheduler, task, + abortTaskSet, maxRpcMessageSize) + if (serializedTask != null) { + val slaveId = task.executorId slavesIdsOfAcceptedOffers += slaveId - taskIdToSlaveId(taskDesc.taskId) = slaveId + taskIdToSlaveId(task.taskId) = slaveId val (mesosTask, remainingResources) = createMesosTask( - taskDesc, + task, + serializedTask, slaveIdToResources(slaveId), slaveId) mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) .add(mesosTask) slaveIdToResources(slaveId) = remainingResources - } } @@ -347,6 +340,7 @@ private[spark] class MesosFineGrainedSchedulerBackend( /** Turn a Spark TaskDescription into a Mesos task and also resources unused by the task */ def createMesosTask( task: TaskDescription, + serializedTask: ByteBuffer, resources: JList[Resource], slaveId: String): (MesosTaskInfo, JList[Resource]) = { val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build() @@ -364,7 +358,7 @@ private[spark] class MesosFineGrainedSchedulerBackend( .setExecutor(executorInfo) .setName(task.name) .addAllResources(cpuResources.asJava) - .setData(ByteString.copyFrom(TaskDescription.encode(task, task.serializedTask))) + .setData(ByteString.copyFrom(serializedTask)) .build() (taskInfo, finalResources.asJava) } From fa2c3493d5c0b3ae65de2a990678284303dd9567 Mon Sep 17 00:00:00 2001 From: Guoqiang Li Date: Fri, 10 Feb 2017 00:58:15 +0800 Subject: [PATCH 10/14] Add ut: serialization task errors do not affect each other --- .../CoarseGrainedSchedulerBackend.scala | 5 ++ .../CoarseGrainedSchedulerBackendSuite.scala | 63 ++++++++++++++++++- 2 files changed, 67 insertions(+), 1 deletion(-) diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 93538e8d9220c..b26f83fc6cf04 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -197,6 +197,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         makeOffers()
       }

+    // Only be used for testing.
+    case ReviveOffers =>
+      makeOffers()
+      context.reply(true)
+
     case StopDriver =>
       context.reply(true)
       stop()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 518670486b2e6..471e8358fdc51 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -18,12 +18,21 @@ package org.apache.spark.scheduler
 import java.io.{IOException, NotSerializableException, ObjectInputStream, ObjectOutputStream}
+import java.util.Properties
+
+import scala.collection.mutable
+
+import org.mockito.Matchers._
+import org.mockito.Mockito._

 import org.apache.spark._
 import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, ReviveOffers}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.{RpcUtils, SerializableBuffer}

-class NotSerializablePartitionRDD(
+private[spark] class NotSerializablePartitionRDD(
     sc: SparkContext,
     numPartitions: Int) extends RDD[(Int, Int)](sc, Nil) with Serializable {
@@ -79,4 +88,56 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
       sc.parallelize(1 to 10).count()
     }
   }
+
+  test("serialization task errors do not affect each other") {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test")
+    sc = new SparkContext(conf)
+    val rpcEnv = sc.env.rpcEnv
+
+    val endpointRef = mock(classOf[RpcEndpointRef])
+    val mockAddress = mock(classOf[RpcAddress])
+    when(endpointRef.address).thenReturn(mockAddress)
+    val message = RegisterExecutor("1", endpointRef, "localhost", 4, Map.empty)
+
+    val taskScheduler = mock(classOf[TaskSchedulerImpl])
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
+    when(taskScheduler.sc).thenReturn(sc)
+    when(taskScheduler.mapOutputTracker).thenReturn(sc.env.mapOutputTracker)
+    val taskIdToTaskSetManager = new mutable.HashMap[Long, TaskSetManager]
+    when(taskScheduler.taskIdToTaskSetManager).thenReturn(taskIdToTaskSetManager)
+    val dagScheduler = mock(classOf[DAGScheduler])
+    when(taskScheduler.dagScheduler).thenReturn(dagScheduler)
+    val taskSet1 = FakeTask.createTaskSet(1)
+    val taskSet2 = FakeTask.createTaskSet(1)
+    taskSet1.tasks(0) = new NotSerializableFakeTask(1, 0)
+
+    def createTaskDescription(taskId: Long, task: Task[_]): TaskDescription = {
+      new TaskDescription(
+        taskId = taskId,
+        attemptNumber = 0,
+        executorId = "1",
+        name = "localhost",
+        index = 0,
+        addedFiles = mutable.Map.empty[String, Long],
+        addedJars = mutable.Map.empty[String, Long],
+        properties = new Properties(),
+        task = task)
+    }
+
+    when(taskScheduler.resourceOffers(any[IndexedSeq[WorkerOffer]])).thenReturn(Seq(Seq(
+      createTaskDescription(1,
taskSet1.tasks.head), + createTaskDescription(2, taskSet2.tasks.head)))) + taskIdToTaskSetManager(1L) = new TaskSetManager(taskScheduler, taskSet1, 1) + taskIdToTaskSetManager(2L) = new TaskSetManager(taskScheduler, taskSet2, 1) + + val backend = new CoarseGrainedSchedulerBackend(taskScheduler, rpcEnv) + backend.start() + backend.driverEndpoint.askWithRetry[Boolean](message) + backend.driverEndpoint.askWithRetry[Boolean](ReviveOffers) + assert(taskIdToTaskSetManager(1L).isZombie === true) + assert(taskIdToTaskSetManager(2L).isZombie === false) + backend.stop() + } } From 58f9b13765c66702edd94dbeb6f69d2a931c5a2b Mon Sep 17 00:00:00 2001 From: Guoqiang Li Date: Sun, 26 Feb 2017 01:17:03 +0800 Subject: [PATCH 11/14] askWithRetry => askSync --- .../spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 471e8358fdc51..93520c949c2ff 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -134,8 +134,8 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo val backend = new CoarseGrainedSchedulerBackend(taskScheduler, rpcEnv) backend.start() - backend.driverEndpoint.askWithRetry[Boolean](message) - backend.driverEndpoint.askWithRetry[Boolean](ReviveOffers) + backend.driverEndpoint.askSync[Boolean](message) + backend.driverEndpoint.askSync[Boolean](ReviveOffers) assert(taskIdToTaskSetManager(1L).isZombie === true) assert(taskIdToTaskSetManager(2L).isZombie === false) backend.stop() From 550ec11c9d5ff1ccfe6d84194856af3a4c7d86a1 Mon Sep 17 00:00:00 2001 From: Guoqiang Li Date: Mon, 27 Feb 2017 18:19:13 +0800 Subject: [PATCH 12/14] fix the import ordering in TaskDescription.scala --- .../main/scala/org/apache/spark/scheduler/TaskDescription.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index fc6739a964206..915105741ada8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -25,8 +25,8 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap, Map} import scala.util.control.NonFatal -import org.apache.spark.internal.Logging import org.apache.spark.TaskNotSerializableException +import org.apache.spark.internal.Logging import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils} /** From 9b80c254e9617359902619eb41a243e94da0ccbb Mon Sep 17 00:00:00 2001 From: Guoqiang Li Date: Wed, 1 Mar 2017 16:06:39 +0800 Subject: [PATCH 13/14] review commits --- .../apache/spark/scheduler/DAGScheduler.scala | 5 -- .../spark/scheduler/TaskDescription.scala | 13 ++--- .../CoarseGrainedSchedulerBackend.scala | 53 ++++++++----------- .../local/LocalSchedulerBackend.scala | 2 + .../apache/spark/executor/ExecutorSuite.scala | 4 +- .../CoarseGrainedSchedulerBackendSuite.scala | 8 ++- 6 files changed, 39 insertions(+), 46 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 
1aed8095e3d26..692ed8083475c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -992,11 +992,6 @@ class DAGScheduler(
           JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
       }

-      if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) {
-        logWarning(s"Stage ${stage.id} contains a task of very large size " +
-          s"(${taskBinaryBytes.length / 1024} KB). The maximum recommended task size is " +
-          s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
-      }
       taskBinary = sc.broadcast(taskBinaryBytes)
     } catch {
       // In the case of a failure during serialization, abort the stage.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
index 915105741ada8..c3c5ec9e0e1ce 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
@@ -55,22 +55,22 @@ private[spark] class TaskDescription(
     val addedFiles: Map[String, Long],
     val addedJars: Map[String, Long],
     val properties: Properties,
-    // Task object corresponding to the TaskDescription. This is only defined on the master; on
-    // the worker, the Task object is handled separately from the TaskDescription so that it can
-    // deserialized after the TaskDescription is deserialized.
+    // Task object corresponding to the TaskDescription. This is only defined on the driver; on
+    // the executor, the Task object is handled separately from the TaskDescription so that it can
+    // be deserialized after the TaskDescription is deserialized.
     @transient private val task: Task[_] = null) extends Logging {

   /**
    * Serializes the task for this TaskDescription and returns the serialized task.
    *
-   * This method should only be used on the master (to serialize a task to send to a worker).
+   * This method should only be used on the driver (to serialize a task to send to an executor).
    */
   def serializeTask(): ByteBuffer = {
     try {
       ByteBuffer.wrap(Utils.serialize(task))
     } catch {
       case NonFatal(e) =>
-        val msg = s"Failed to serialize task ${taskId}."
+        val msg = s"Failed to serialize task $taskId."
         logError(msg, e)
         throw new TaskNotSerializableException(e)
     }
@@ -153,6 +153,7 @@ private[spark] object TaskDescription {
     val serializedTask = byteBuffer.slice()
     (new TaskDescription(taskId, attemptNumber, executorId, name, index, taskFiles, taskJars,
-      properties), serializedTask)
+      properties),
+      serializedTask)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b26f83fc6cf04..cdcb88931e7d8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -197,11 +197,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         makeOffers()
       }

-    // Only be used for testing.
- case ReviveOffers => - makeOffers() - context.reply(true) - case StopDriver => context.reply(true) stop() @@ -263,15 +258,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { - val abortTaskSet = new HashSet[TaskSetManager]() + val abortedTaskSets = new HashSet[TaskSetManager]() for (task <- tasks.flatten) { val serializedTask = prepareSerializedTask(scheduler, task, - abortTaskSet, maxRpcMessageSize) + abortedTaskSets, maxRpcMessageSize) if (serializedTask != null) { val executorData = executorDataMap(task.executorId) executorData.freeCores -= scheduler.CPUS_PER_TASK - logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} " + - s" hostname: ${executorData.executorHost}.") + logInfo(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " + + s"${executorData.executorHost}, serializedTask: ${serializedTask.limit} bytes.") executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) } } @@ -628,37 +623,33 @@ private[spark] object CoarseGrainedSchedulerBackend extends Logging { try { taskSetMgr.abort(msg, exception) } catch { - case e: Exception => logError("Exception in error callback", e) + case e: Exception => logError("Exception while aborting taskset", e) } } } - private def isZombieTaskSetManager( - scheduler: TaskSchedulerImpl, - taskId: Long): Unit = scheduler.synchronized { - !scheduler.taskIdToTaskSetManager.get(taskId).exists(_.isZombie) - } - private def getTaskSetManager( - scheduler: TaskSchedulerImpl, - taskId: Long): Option[TaskSetManager] = scheduler.synchronized { + scheduler: TaskSchedulerImpl, + taskId: Long): Option[TaskSetManager] = scheduler.synchronized { scheduler.taskIdToTaskSetManager.get(taskId) } private[scheduler] def prepareSerializedTask( - scheduler: TaskSchedulerImpl, - task: TaskDescription, - abortSet: HashSet[TaskSetManager], - maxRpcMessageSize: Long): ByteBuffer = { + scheduler: TaskSchedulerImpl, + task: TaskDescription, + abortedTaskSets: HashSet[TaskSetManager], + maxRpcMessageSize: Long): ByteBuffer = { var serializedTask: ByteBuffer = null - if (abortSet.isEmpty || !getTaskSetManager(scheduler, task.taskId).exists(_.isZombie)) { + getTaskSetManager(scheduler, task.taskId).foreach { taskSetManager => try { - serializedTask = TaskDescription.encode(task) + if (!taskSetManager.isZombie && !abortedTaskSets.contains(taskSetManager)) { + serializedTask = TaskDescription.encode(task) + } } catch { case NonFatal(e) => abortTaskSetManager(scheduler, task.taskId, s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e)) - scheduler.taskIdToTaskSetManager.get(task.taskId).foreach(t => abortSet.add(t)) + abortedTaskSets.add(taskSetManager) } } @@ -668,18 +659,18 @@ private[spark] object CoarseGrainedSchedulerBackend extends Logging { "spark.rpc.message.maxSize or using broadcast variables for large values." 
abortTaskSetManager(scheduler, task.taskId, msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)) - getTaskSetManager(scheduler, task.taskId).foreach(t => abortSet.add(t)) + getTaskSetManager(scheduler, task.taskId).foreach(t => abortedTaskSets.add(t)) serializedTask = null } else if (serializedTask != null) { - emittedTaskSizeWarning(scheduler, serializedTask, task.taskId) + maybeEmitTaskSizeWarning(scheduler, serializedTask, task.taskId) } serializedTask } - private def emittedTaskSizeWarning( - scheduler: TaskSchedulerImpl, - serializedTask: ByteBuffer, - taskId: Long): Unit = { + private def maybeEmitTaskSizeWarning( + scheduler: TaskSchedulerImpl, + serializedTask: ByteBuffer, + taskId: Long): Unit = { if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) { getTaskSetManager(scheduler, taskId).filterNot(_.emittedTaskSizeWarning). foreach { taskSetMgr => diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala index 0f1d8d8fe7542..8f44b7def2db6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala @@ -90,6 +90,8 @@ private[spark] class LocalEndpoint( val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) val abortTaskSet = new HashSet[TaskSetManager]() for (task <- scheduler.resourceOffers(offers).flatten) { + // make sure the task is serializable, + // so that it can be launched in a distributed environment. val buffer = prepareSerializedTask(scheduler, task, abortTaskSet, maxRpcMessageSize) if (buffer != null) { diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index f787b541c3567..26399f6bd9973 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -168,8 +168,8 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug } private def runTaskAndGetFailReason( - taskDescription: TaskDescription, - serializedTask: ByteBuffer): TaskFailedReason = { + taskDescription: TaskDescription, + serializedTask: ByteBuffer): TaskFailedReason = { val mockBackend = mock[ExecutorBackend] var executor: Executor = null try { diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 93520c949c2ff..1b5e8ca68b9be 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -28,7 +28,7 @@ import org.mockito.Mockito._ import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, ReviveOffers} +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RetrieveSparkAppConfig} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.{RpcUtils, SerializableBuffer} @@ -135,7 +135,11 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo val backend = new 
CoarseGrainedSchedulerBackend(taskScheduler, rpcEnv)
     backend.start()
     backend.driverEndpoint.askSync[Boolean](message)
-    backend.driverEndpoint.askSync[Boolean](ReviveOffers)
+    backend.reviveOffers()
+    // Make sure that the ReviveOffers message has been processed: the driver endpoint
+    // handles messages one at a time, so this synchronous call only returns once the
+    // ReviveOffers above has been handled. Update this test if that assumption changes.
+    backend.driverEndpoint.askSync[Any](RetrieveSparkAppConfig)
     assert(taskIdToTaskSetManager(1L).isZombie === true)
     assert(taskIdToTaskSetManager(2L).isZombie === false)
     backend.stop()

From b2b1eec3c41873eb217cf041f3cf6d71d4cfa265 Mon Sep 17 00:00:00 2001
From: Guoqiang Li
Date: Wed, 1 Mar 2017 21:53:57 +0800
Subject: [PATCH 14/14] fix NPE in CoarseGrainedSchedulerBackend.scala

---
 .../CoarseGrainedSchedulerBackend.scala | 21 ++++++++++---------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index cdcb88931e7d8..e16575d35f636 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -640,17 +640,18 @@ private[spark] object CoarseGrainedSchedulerBackend extends Logging {
       abortedTaskSets: HashSet[TaskSetManager],
       maxRpcMessageSize: Long): ByteBuffer = {
     var serializedTask: ByteBuffer = null
-    getTaskSetManager(scheduler, task.taskId).foreach { taskSetManager =>
-      try {
-        if (!taskSetManager.isZombie && !abortedTaskSets.contains(taskSetManager)) {
-          serializedTask = TaskDescription.encode(task)
-        }
-      } catch {
-        case NonFatal(e) =>
-          abortTaskSetManager(scheduler, task.taskId,
-            s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e))
-          abortedTaskSets.add(taskSetManager)
+
+    try {
+      if (abortedTaskSets.isEmpty ||
+        !getTaskSetManager(scheduler, task.taskId).
+          exists(t => t.isZombie || abortedTaskSets.contains(t))) {
+        serializedTask = TaskDescription.encode(task)
       }
+    } catch {
+      case NonFatal(e) =>
+        abortTaskSetManager(scheduler, task.taskId,
+          s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e))
+        getTaskSetManager(scheduler, task.taskId).foreach(abortedTaskSets.add)
     }

     if (serializedTask != null && serializedTask.limit >= maxRpcMessageSize) {