From 2a89adc26c85cdfb3790060a7cd91967ebec2f1b Mon Sep 17 00:00:00 2001 From: witgo Date: Wed, 7 May 2014 23:21:07 +0800 Subject: [PATCH 01/16] SPARK-1712: TaskDescription instance is too big causes Spark to hang --- .../CoarseGrainedExecutorBackend.scala | 9 ++++++--- .../cluster/CoarseGrainedClusterMessage.scala | 2 +- .../CoarseGrainedSchedulerBackend.scala | 18 +++++++++++++++--- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index e912ae8a5d3c5..18229e764bb18 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -22,11 +22,12 @@ import java.nio.ByteBuffer import akka.actor._ import akka.remote._ -import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf} import org.apache.spark.TaskState.TaskState import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ +import org.apache.spark.scheduler.TaskDescription import org.apache.spark.util.{AkkaUtils, Utils} private[spark] class CoarseGrainedExecutorBackend( @@ -61,12 +62,14 @@ private[spark] class CoarseGrainedExecutorBackend( logError("Slave registration failed: " + message) System.exit(1) - case LaunchTask(taskDesc) => - logInfo("Got assigned task " + taskDesc.taskId) + case LaunchTask(data) => if (executor == null) { logError("Received LaunchTask command but executor was null") System.exit(1) } else { + val ser = SparkEnv.get.closureSerializer.newInstance() + val taskDesc =ser.deserialize[TaskDescription](data.value) + logInfo("Got assigned task " + taskDesc.taskId) executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index ddbc74e82ac49..ca74069ef885c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -28,7 +28,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable private[spark] object CoarseGrainedClusterMessages { // Driver to executors - case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage + case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage case class KillTask(taskId: Long, executor: String, interruptThread: Boolean) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index a6d6b3d26a3c6..7b927314eef48 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -27,10 +27,10 @@ import akka.actor._ import akka.pattern.ask import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} -import org.apache.spark.{Logging, SparkException, TaskState} +import org.apache.spark.{SparkEnv, 
Logging, SparkException, TaskState} import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ -import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils} /** * A scheduler backend that waits for coarse grained executors to connect to it through Akka. @@ -48,6 +48,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A var totalCoreCount = new AtomicInteger(0) val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) + private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor { private val executorActor = new HashMap[String, ActorRef] @@ -141,7 +142,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { freeCores(task.executorId) -= scheduler.CPUS_PER_TASK - executorActor(task.executorId) ! LaunchTask(task) + val ser = SparkEnv.get.closureSerializer.newInstance() + val taskBytes = ser.serialize(task).array() + val serializedTask = ser.serialize(taskBytes) + if (serializedTask.limit >= akkaFrameSize - 1024) { + var msg = "Serialized task %s:%d were %d bytes which " + + "exceeds spark.akka.frameSize (%d bytes)." + msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize) + val exception = new SparkException(msg) + logError(msg, exception) + throw exception + } + executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask)) } } From 3ea1ca16ef940f94f9d133d9849568721440156d Mon Sep 17 00:00:00 2001 From: witgo Date: Tue, 13 May 2014 00:10:22 +0800 Subject: [PATCH 02/16] remove duplicate serialize --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 7b927314eef48..d091fbd469abc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -143,8 +143,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A for (task <- tasks.flatten) { freeCores(task.executorId) -= scheduler.CPUS_PER_TASK val ser = SparkEnv.get.closureSerializer.newInstance() - val taskBytes = ser.serialize(task).array() - val serializedTask = ser.serialize(taskBytes) + val serializedTask = ser.serialize(task) if (serializedTask.limit >= akkaFrameSize - 1024) { var msg = "Serialized task %s:%d were %d bytes which " + "exceeds spark.akka.frameSize (%d bytes)." 
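Note on the guard introduced above: after PATCH 01/02 the driver serializes each TaskDescription itself, so it can measure the message before handing it to Akka and refuse anything that cannot fit in a single frame. Below is a minimal, self-contained sketch of that check, using plain Java serialization as a stand-in for SparkEnv.get.closureSerializer; the stand-in serializer and the object name are assumptions of the sketch, while the 1024-byte envelope reserve is copied from the patch.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object FrameSizeGuard {
  // Headroom for Akka's own message envelope, mirroring the patch above.
  val akkaEnvelopeReserve = 1024

  // Stand-in for Spark's closure serializer: serialize with plain Java
  // serialization and report the resulting size in bytes.
  def serializedSize(value: AnyRef): Int = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    buffer.size()
  }

  // The condition the driver checks before sending LaunchTask.
  def exceedsFrameSize(value: AnyRef, akkaFrameSize: Int): Boolean =
    serializedSize(value) >= akkaFrameSize - akkaEnvelopeReserve
}

With spark.akka.frameSize at its 10 MB default, exceedsFrameSize(task, 10 * 1024 * 1024) is the shape of the decision launchTasks now makes; SerializableBuffer exists only so the resulting ByteBuffer survives the trip through Akka's Java serialization.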
From 79655800d19ff14dcdd813bbd2a8d295bf3dfa1d Mon Sep 17 00:00:00 2001 From: witgo Date: Wed, 14 May 2014 10:27:24 +0800 Subject: [PATCH 03/16] fix Statement order --- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index d091fbd469abc..65b2355e97da5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -141,7 +141,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A // Launch tasks returned by a set of resource offers def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { - freeCores(task.executorId) -= scheduler.CPUS_PER_TASK val ser = SparkEnv.get.closureSerializer.newInstance() val serializedTask = ser.serialize(task) if (serializedTask.limit >= akkaFrameSize - 1024) { @@ -152,6 +151,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A logError(msg, exception) throw exception } + freeCores(task.executorId) -= scheduler.CPUS_PER_TASK executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask)) } } From 1d35c7d5417a67a509df7451adb8b4effeb3d09b Mon Sep 17 00:00:00 2001 From: witgo Date: Wed, 14 May 2014 11:20:36 +0800 Subject: [PATCH 04/16] fix hang --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 65b2355e97da5..f1eb6e857af55 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -147,9 +147,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A var msg = "Serialized task %s:%d were %d bytes which " + "exceeds spark.akka.frameSize (%d bytes)." msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize) - val exception = new SparkException(msg) - logError(msg, exception) - throw exception + scheduler.error(msg) + // TODO: Need to throw an exception? + throw new SparkException(msg) } freeCores(task.executorId) -= scheduler.CPUS_PER_TASK executorActor(task.executorId) ! 
LaunchTask(new SerializableBuffer(serializedTask)) From 9e4ffa7568dfea0ef6f74cef5349e5d0cec03b8b Mon Sep 17 00:00:00 2001 From: witgo Date: Wed, 14 May 2014 11:32:55 +0800 Subject: [PATCH 05/16] review commit --- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index f1eb6e857af55..78de65df43104 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -149,7 +149,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize) scheduler.error(msg) // TODO: Need to throw an exception? - throw new SparkException(msg) + // throw new SparkException(msg) } freeCores(task.executorId) -= scheduler.CPUS_PER_TASK executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask)) From 4afe71d0413bd1214ca11e6318015cccd3faf85e Mon Sep 17 00:00:00 2001 From: witgo Date: Wed, 14 May 2014 11:40:33 +0800 Subject: [PATCH 06/16] review commit --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 78de65df43104..72d161dc86495 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -144,8 +144,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A val ser = SparkEnv.get.closureSerializer.newInstance() val serializedTask = ser.serialize(task) if (serializedTask.limit >= akkaFrameSize - 1024) { - var msg = "Serialized task %s:%d were %d bytes which " + - "exceeds spark.akka.frameSize (%d bytes)." + var msg = "Serialized task %s:%d was %d bytes which " + + "exceeds spark.akka.frameSize (%d bytes)." + + "Consider using broadcast variables for large values" msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize) scheduler.error(msg) // TODO: Need to throw an exception? From 158b2dc3d61348cbdb53506ca1e59922e8d99048 Mon Sep 17 00:00:00 2001 From: witgo Date: Wed, 14 May 2014 11:51:25 +0800 Subject: [PATCH 07/16] review commit --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 72d161dc86495..3cb3ca94ab761 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -152,8 +152,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A // TODO: Need to throw an exception? // throw new SparkException(msg) } - freeCores(task.executorId) -= scheduler.CPUS_PER_TASK - executorActor(task.executorId) ! 
LaunchTask(new SerializableBuffer(serializedTask)) + else { + freeCores(task.executorId) -= scheduler.CPUS_PER_TASK + executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask)) + } } } From 0a428cf2f871ac8da413dfbe68c4764fc64583d4 Mon Sep 17 00:00:00 2001 From: witgo Date: Wed, 14 May 2014 13:54:09 +0800 Subject: [PATCH 08/16] add unit tests --- .../CoarseGrainedSchedulerBackendSuite.scala | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala new file mode 100644 index 0000000000000..e4ecfb3bea264 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.{SparkConf, SparkException, SparkContext} +import org.apache.spark.util.{SerializableBuffer, AkkaUtils} +import org.apache.spark.SparkContext._ +import java.nio.ByteBuffer + +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} +class CoarseGrainedSchedulerBackendSuite extends FunSuite with +BeforeAndAfter with BeforeAndAfterAll { + + override def beforeAll { + System.setProperty("spark.akka.frameSize", "1") + System.setProperty("spark.default.parallelism", "1") + + } + + override def afterAll { + System.clearProperty("spark.akka.frameSize") + System.clearProperty("spark.default.parallelism") + } + + test("serialized task larger than Akka frame size") { + val conf = new SparkConf + val sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf) + val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf) + val buffer = new SerializableBuffer(ByteBuffer.allocate(2 * frameSize)) + val larger = sc.parallelize(Seq(buffer)) + val thrown = intercept[SparkException] { + larger.collect() + } + assert(thrown.getMessage.contains("Consider using broadcast variables for large values")) + val smaller = sc.parallelize(1 to 4).collect() + assert(smaller.size === 4) + } + +} From 062c182ac43e10b71597fad351a32d54a01928b4 Mon Sep 17 00:00:00 2001 From: witgo Date: Wed, 14 May 2014 14:06:50 +0800 Subject: [PATCH 09/16] fix small bug for code style --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 4 ++-- .../spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 3cb3ca94ab761..42e0a0f20133e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -145,8 +145,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A val serializedTask = ser.serialize(task) if (serializedTask.limit >= akkaFrameSize - 1024) { var msg = "Serialized task %s:%d was %d bytes which " + - "exceeds spark.akka.frameSize (%d bytes)." + - "Consider using broadcast variables for large values" + "exceeds spark.akka.frameSize (%d bytes). " + + "Consider using broadcast variables for large values." msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize) scheduler.error(msg) // TODO: Need to throw an exception? diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index e4ecfb3bea264..79bfb8d0dfe78 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -23,13 +23,13 @@ import org.apache.spark.SparkContext._ import java.nio.ByteBuffer import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} + class CoarseGrainedSchedulerBackendSuite extends FunSuite with -BeforeAndAfter with BeforeAndAfterAll { + BeforeAndAfter with BeforeAndAfterAll { override def beforeAll { System.setProperty("spark.akka.frameSize", "1") System.setProperty("spark.default.parallelism", "1") - } override def afterAll { From 689495d496984b485c4f53d86c98f6b8fbebdb61 Mon Sep 17 00:00:00 2001 From: witgo Date: Wed, 14 May 2014 14:32:15 +0800 Subject: [PATCH 10/16] fix scala style bug --- core/src/main/scala/org/apache/spark/Partitioner.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 62747960618a9..2251fbde1fd55 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -204,8 +204,9 @@ class BoundaryPartitioner[K : Ordering : ClassTag, V]( * 3 partitions, 500 keys / partition and if rdd has 700 keys, 1 partition will be entirely * empty. 
*/ - if(counts(currPartition) >= keysPerPartition) + if(counts(currPartition) >= keysPerPartition) { currPartition = (currPartition + 1) % numPartitions + } partition } From b1174bdd49aea0acf051b21655f439dd17a95724 Mon Sep 17 00:00:00 2001 From: witgo Date: Wed, 14 May 2014 15:36:46 +0800 Subject: [PATCH 11/16] merge master --- core/src/main/scala/org/apache/spark/Partitioner.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 0d80fb35d5201..9155159cf6aeb 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -156,4 +156,3 @@ class RangePartitioner[K : Ordering : ClassTag, V]( false } } - From b0930b045cc574b309d0826e4268b7e12f07f4d0 Mon Sep 17 00:00:00 2001 From: witgo Date: Wed, 14 May 2014 16:07:00 +0800 Subject: [PATCH 12/16] review commit --- .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 26daf4b10ce96..2279d77c91c89 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -68,7 +68,7 @@ private[spark] class CoarseGrainedExecutorBackend( System.exit(1) } else { val ser = SparkEnv.get.closureSerializer.newInstance() - val taskDesc =ser.deserialize[TaskDescription](data.value) + val taskDesc = ser.deserialize[TaskDescription](data.value) logInfo("Got assigned task " + taskDesc.taskId) executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask) } From 44a59eeee673f97dd32920cf13649cf36aa1e967 Mon Sep 17 00:00:00 2001 From: witgo Date: Thu, 15 May 2014 09:54:58 +0800 Subject: [PATCH 13/16] review commit --- .../CoarseGrainedSchedulerBackend.scala | 18 +++++++++++++----- .../CoarseGrainedSchedulerBackendSuite.scala | 3 +-- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 42e0a0f20133e..78c23f2ef6b7d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -144,11 +144,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A val ser = SparkEnv.get.closureSerializer.newInstance() val serializedTask = ser.serialize(task) if (serializedTask.limit >= akkaFrameSize - 1024) { - var msg = "Serialized task %s:%d was %d bytes which " + - "exceeds spark.akka.frameSize (%d bytes). " + - "Consider using broadcast variables for large values." - msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize) - scheduler.error(msg) + val taskSetId = scheduler.taskIdToTaskSetId(task.taskId) + scheduler.activeTaskSets.get(taskSetId).foreach { taskSet => + try { + var msg = "Serialized task %s:%d was %d bytes which " + + "exceeds spark.akka.frameSize (%d bytes). " + + "Consider using broadcast variables for large values." 
+ msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize) + taskSet.abort(msg) + } catch { + case e: Exception => logError("Exception in error callback", e) + } + } + // scheduler.error(msg) // TODO: Need to throw an exception? // throw new SparkException(msg) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 79bfb8d0dfe78..9cd7c85698732 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.scheduler import org.apache.spark.{SparkConf, SparkException, SparkContext} import org.apache.spark.util.{SerializableBuffer, AkkaUtils} import org.apache.spark.SparkContext._ -import java.nio.ByteBuffer import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} @@ -41,7 +40,7 @@ class CoarseGrainedSchedulerBackendSuite extends FunSuite with val conf = new SparkConf val sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf) val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf) - val buffer = new SerializableBuffer(ByteBuffer.allocate(2 * frameSize)) + val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize)) val larger = sc.parallelize(Seq(buffer)) val thrown = intercept[SparkException] { larger.collect() From 63636b6a323903b6d55e9c9dc73d54b9f6da47c3 Mon Sep 17 00:00:00 2001 From: witgo Date: Thu, 15 May 2014 10:44:00 +0800 Subject: [PATCH 14/16] review commit --- .../CoarseGrainedSchedulerBackendSuite.scala | 20 +++++-------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 9cd7c85698732..bae3cb2e96046 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -19,25 +19,15 @@ package org.apache.spark.scheduler import org.apache.spark.{SparkConf, SparkException, SparkContext} import org.apache.spark.util.{SerializableBuffer, AkkaUtils} -import org.apache.spark.SparkContext._ -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} +import org.scalatest.FunSuite -class CoarseGrainedSchedulerBackendSuite extends FunSuite with - BeforeAndAfter with BeforeAndAfterAll { +class CoarseGrainedSchedulerBackendSuite extends FunSuite { - override def beforeAll { - System.setProperty("spark.akka.frameSize", "1") - System.setProperty("spark.default.parallelism", "1") - } - - override def afterAll { - System.clearProperty("spark.akka.frameSize") - System.clearProperty("spark.default.parallelism") - } - - test("serialized task larger than Akka frame size") { + test("serialized task larger than akka frame size") { val conf = new SparkConf + conf.set("spark.akka.frameSize","1") + conf.set("spark.default.parallelism","1") val sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf) val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf) val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize)) From 52e67529647732700203f32a83c09795acffd63b Mon Sep 17 00:00:00 2001 From: witgo Date: Thu, 15 May 2014 11:29:36 +0800 Subject: [PATCH 15/16] reset test 
SparkContext --- .../scheduler/CoarseGrainedSchedulerBackendSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index bae3cb2e96046..efef9d26dadca 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -17,18 +17,18 @@ package org.apache.spark.scheduler -import org.apache.spark.{SparkConf, SparkException, SparkContext} +import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkContext} import org.apache.spark.util.{SerializableBuffer, AkkaUtils} import org.scalatest.FunSuite -class CoarseGrainedSchedulerBackendSuite extends FunSuite { +class CoarseGrainedSchedulerBackendSuite extends FunSuite with LocalSparkContext { test("serialized task larger than akka frame size") { val conf = new SparkConf conf.set("spark.akka.frameSize","1") conf.set("spark.default.parallelism","1") - val sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf) + sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf) val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf) val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize)) val larger = sc.parallelize(Seq(buffer)) From 0f524833ead09a6a46e138e59b9835018f28e69e Mon Sep 17 00:00:00 2001 From: witgo Date: Sat, 17 May 2014 10:32:17 +0800 Subject: [PATCH 16/16] review commit --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 78c23f2ef6b7d..e47a060683a2d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -156,9 +156,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A case e: Exception => logError("Exception in error callback", e) } } - // scheduler.error(msg) - // TODO: Need to throw an exception? - // throw new SparkException(msg) } else { freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
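Where the series lands: an oversized task no longer hangs the job silently; the offending task set is aborted with a message telling the user to broadcast large values instead of capturing them in closures. A sketch of that workaround from the application side, against the Spark 1.0-era API (the object name, data, and sizes here are illustrative only, not taken from the patch):

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastInsteadOfCapture {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-instead-of-capture"))

    // Captured directly in a closure, a value like this is serialized into
    // every TaskDescription and can trip the frame-size guard added above.
    val lookup: Map[Int, String] = (1 to 1000000).map(i => (i, "value-" + i)).toMap

    // Broadcast it instead: the data ships to each executor once, out of
    // band, and the task closure captures only the small broadcast handle.
    val lookupBc = sc.broadcast(lookup)
    val hits = sc.parallelize(1 to 100).map(i => lookupBc.value.getOrElse(i, "miss"))
    println(hits.count())

    sc.stop()
  }
}

Because only the handle rides in the LaunchTask message, the serialized task stays far below spark.akka.frameSize and the abort path added in PATCH 13 is never taken.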