diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index baee7a216a7c..4b8fbead8f3b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -97,10 +97,6 @@ private[spark] class Executor(
   private val urlClassLoader = createClassLoader()
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
 
-  // Akka's message frame size. If task result is bigger than this, we use the block manager
-  // to send the result back.
-  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-
   // Start worker thread pool
   val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
 
@@ -212,7 +208,12 @@ private[spark] class Executor(
         val serializedDirectResult = ser.serialize(directResult)
         logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
         val serializedResult = {
-          if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
+          // TODO: [SPARK-1112] We use the min frame size to decide whether to send the task
+          // result via Akka or the block manager. The result is sent via the backend, whose
+          // actor system is initialized before it receives the Spark conf and hence does not
+          // know `spark.akka.frameSize`. Using the min frame size is a temporary solution.
+          // [SPARK-2156] We subtract 200K to leave some space for other data in the Akka message.
+          if (serializedDirectResult.limit >= AkkaUtils.minFrameSizeBytes - 200 * 1024) {
             logInfo("Storing result for " + taskId + " in local BlockManager")
             val blockId = TaskResultBlockId(taskId)
             env.blockManager.putBytes(
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index a8d12bb2a016..7c1ce7542bcc 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -119,6 +119,9 @@ private[spark] object AkkaUtils extends Logging {
 
   /** Returns the configured max frame size for Akka messages in bytes. */
   def maxFrameSizeBytes(conf: SparkConf): Int = {
-    conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
+    math.max(conf.getInt("spark.akka.frameSize", 0) * 1024 * 1024, minFrameSizeBytes)
   }
+
+  /** The minimum value of the max frame size for Akka messages in bytes. */
+  val minFrameSizeBytes = 10 * 1024 * 1024
 }
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 95ba273f16a7..c7918730969c 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -179,7 +179,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
 
   test("remote fetch exceeds akka frame size") {
     val newConf = new SparkConf
-    newConf.set("spark.akka.frameSize", "1")
     newConf.set("spark.akka.askTimeout", "1") // Fail fast
 
     val masterTracker = new MapOutputTrackerMaster(conf)
@@ -188,14 +187,18 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
       new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
     val masterActor = actorRef.underlyingActor
 
-    // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
-    // Note that the size is hand-selected here because map output statuses are compressed before
-    // being sent.
-    masterTracker.registerShuffle(20, 100)
-    (0 until 100).foreach { i =>
-      masterTracker.registerMapOutput(20, i, new MapStatus(
-        BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
+    // The message size should be about 2 * AkkaUtils.minFrameSizeBytes, so
+    // MapOutputTrackerMasterActor should throw an exception.
+    val shuffleId = 20
+    val numMaps = 2
+    val data = new Array[Byte](AkkaUtils.minFrameSizeBytes)
+    val random = new java.util.Random(0)
+    random.nextBytes(data) // Make it hard to compress.
+    masterTracker.registerShuffle(shuffleId, numMaps)
+    (0 until numMaps).foreach { i =>
+      masterTracker.registerMapOutput(shuffleId, i, new MapStatus(
+        BlockManagerId("999", "mps", 1000, 0), data))
     }
-    intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
+    intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(shuffleId)) }
   }
 }
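
For reviewers who want to sanity-check the new thresholds without building Spark, the following self-contained Scala sketch mirrors the patched logic. The object FrameSizeSketch, its Map-based conf, and the getInt helper are made-up stand-ins for this example, not Spark APIs; only the 10MB floor, the math.max clamp, and the 200K headroom come from the patch above.

object FrameSizeSketch {
  // Mirrors AkkaUtils.minFrameSizeBytes from the patch: the floor on the Akka max frame size.
  val minFrameSizeBytes: Int = 10 * 1024 * 1024

  // Hypothetical stand-in for SparkConf.getInt, backed by a plain Map (not the Spark API).
  def getInt(conf: Map[String, String], key: String, default: Int): Int =
    conf.get(key).map(_.toInt).getOrElse(default)

  // Mirrors the patched AkkaUtils.maxFrameSizeBytes: an unset or too-small
  // spark.akka.frameSize is clamped up to the minimum.
  def maxFrameSizeBytes(conf: Map[String, String]): Int =
    math.max(getInt(conf, "spark.akka.frameSize", 0) * 1024 * 1024, minFrameSizeBytes)

  // Mirrors the executor's new check: a result that would not fit in a min-size frame
  // with 200K of headroom left for the rest of the message goes through the block manager.
  def sendViaBlockManager(resultSizeBytes: Int): Boolean =
    resultSizeBytes >= minFrameSizeBytes - 200 * 1024

  def main(args: Array[String]): Unit = {
    assert(maxFrameSizeBytes(Map.empty) == 10 * 1024 * 1024)                           // default, clamped
    assert(maxFrameSizeBytes(Map("spark.akka.frameSize" -> "1")) == 10 * 1024 * 1024)  // 1MB -> 10MB
    assert(maxFrameSizeBytes(Map("spark.akka.frameSize" -> "64")) == 64 * 1024 * 1024) // unchanged
    assert(!sendViaBlockManager(1024 * 1024))       // 1MB result fits in a direct message
    assert(sendViaBlockManager(10 * 1024 * 1024))   // 10MB result goes to the block manager
  }
}

The last two assertions show the trade-off the TODO describes: until SPARK-1112 is resolved, the executor routes anything within 200K of the 10MB floor through the block manager, even when spark.akka.frameSize on the receiving side is actually larger.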
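
The test now fills the map output sizes with seeded random bytes because map output statuses are compressed before being sent, so a zero-filled array (like the old Array.fill[Byte](4000000)(0)) can shrink dramatically in transit and slip under the frame size. The standalone sketch below illustrates the effect with java.util.zip.Deflater; CompressionSketch and its compressedSize helper are invented for this illustration.

import java.util.zip.Deflater

object CompressionSketch {
  // Deflate-compress `data` and return the compressed size in bytes.
  def compressedSize(data: Array[Byte]): Int = {
    val deflater = new Deflater()
    deflater.setInput(data)
    deflater.finish()
    val buf = new Array[Byte](64 * 1024)
    var total = 0
    while (!deflater.finished()) {
      total += deflater.deflate(buf)
    }
    deflater.end()
    total
  }

  def main(args: Array[String]): Unit = {
    val n = 10 * 1024 * 1024
    val zeros = new Array[Byte](n)             // compresses to almost nothing
    val random = new Array[Byte](n)
    new java.util.Random(0).nextBytes(random)  // barely compresses, like the new test data
    println(s"zeros:  $n -> ${compressedSize(zeros)} bytes compressed")
    println(s"random: $n -> ${compressedSize(random)} bytes compressed")
  }
}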