From 5ca1edd0d01b66265d306e552c82b49d25537579 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Wed, 18 Jun 2014 18:48:20 -0700
Subject: [PATCH 1/5] use min akka frame size to decide whether to send result
 via akka or block manager

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 5 ++++-
 core/src/main/scala/org/apache/spark/util/AkkaUtils.scala    | 5 ++++-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index baee7a216a7c..2095e044379a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -99,7 +99,10 @@ private[spark] class Executor(
 
   // Akka's message frame size. If task result is bigger than this, we use the block manager
   // to send the result back.
-  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  // TODO: [SPARK-1112] We use the min frame size to decide whether to send the task result via
+  // TODO: Akka or via the block manager. Since this goes through the backend, whose actor system
+  // TODO: is initialized before it receives the Spark conf, the safe bet is the min frame size.
+  private val akkaFrameSize = AkkaUtils.minFrameSizeBytes
 
   // Start worker thread pool
   val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index a8d12bb2a016..9816e57f65c0 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -119,6 +119,9 @@ private[spark] object AkkaUtils extends Logging {
 
   /** Returns the configured max frame size for Akka messages in bytes. */
   def maxFrameSizeBytes(conf: SparkConf): Int = {
-    conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
+    math.max(conf.getInt("spark.akka.frameSize", 0) * 1024 * 1024, minFrameSizeBytes)
   }
+
+  /** The min frame size for Akka messages in bytes. */
+  val minFrameSizeBytes = 10 * 1024 * 1024
 }
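The heart of patch 1 is the clamping in AkkaUtils.maxFrameSizeBytes: whatever `spark.akka.frameSize` says, the effective frame size never drops below minFrameSizeBytes. A minimal, self-contained Scala sketch of that behavior follows. FrameSizeSketch is an illustrative name, not Spark code, and a plain Int stands in for the conf.getInt("spark.akka.frameSize", 0) lookup.

// Sketch of the clamping behavior introduced above. The SparkConf plumbing is
// replaced by a plain Int argument (the configured size in MB).
object FrameSizeSketch {
  val minFrameSizeBytes: Int = 10 * 1024 * 1024

  def maxFrameSizeBytes(configuredMB: Int): Int =
    math.max(configuredMB * 1024 * 1024, minFrameSizeBytes)

  def main(args: Array[String]): Unit = {
    assert(maxFrameSizeBytes(0) == 10 * 1024 * 1024)   // unset: falls back to the min
    assert(maxFrameSizeBytes(1) == 10 * 1024 * 1024)   // too small: floored to the min
    assert(maxFrameSizeBytes(64) == 64 * 1024 * 1024)  // large enough: kept as configured
  }
}

The clamp also explains why the test in patch 5 stops setting `spark.akka.frameSize` to 1: values below 10 MB no longer take effect.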
From c2778313c0c90eafb501e7e1aaec725980539b8a Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Wed, 18 Jun 2014 19:09:06 -0700
Subject: [PATCH 2/5] remove akkaFrameSize variable to avoid confusion

---
 .../org/apache/spark/executor/Executor.scala | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2095e044379a..5d8c8ea63aea 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -97,13 +97,6 @@ private[spark] class Executor(
   private val urlClassLoader = createClassLoader()
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
 
-  // Akka's message frame size. If task result is bigger than this, we use the block manager
-  // to send the result back.
-  // TODO: [SPARK-1112] We use the min frame size to decide whether to send the task result via
-  // TODO: Akka or via the block manager. Since this goes through the backend, whose actor system
-  // TODO: is initialized before it receives the Spark conf, the safe bet is the min frame size.
-  private val akkaFrameSize = AkkaUtils.minFrameSizeBytes
-
   // Start worker thread pool
   val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
 
@@ -215,7 +208,13 @@ private[spark] class Executor(
         val serializedDirectResult = ser.serialize(directResult)
         logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
         val serializedResult = {
-          if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
+          // Akka's message frame size. If task result is bigger than this, we use the block manager
+          // to send the result back.
+          // TODO: [SPARK-1112] We use the min frame size to decide whether to send the task
+          // TODO: result via Akka or via the block manager. Since this goes through the backend,
+          // TODO: whose actor system is initialized before it receives the Spark conf, it does not
+          // TODO: know `spark.akka.frameSize`. A temporary solution is to use the min frame size.
+          if (serializedDirectResult.limit >= AkkaUtils.minFrameSizeBytes - 1024) {
             logInfo("Storing result for " + taskId + " in local BlockManager")
             val blockId = TaskResultBlockId(taskId)
             env.blockManager.putBytes(

From 41c85e757f50624772898e92131af7c1039a6f12 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Wed, 18 Jun 2014 19:30:32 -0700
Subject: [PATCH 3/5] leave some space for other data in akka message

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 5d8c8ea63aea..c4508a72e7bc 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -214,7 +214,8 @@ private[spark] class Executor(
           // TODO: result via Akka or via the block manager. Since this goes through the backend,
           // TODO: whose actor system is initialized before it receives the Spark conf, it does not
           // TODO: know `spark.akka.frameSize`. A temporary solution is to use the min frame size.
-          if (serializedDirectResult.limit >= AkkaUtils.minFrameSizeBytes - 1024) {
+          // [SPARK-2156] We subtract 200K to leave some space for other data in the Akka message.
+          if (serializedDirectResult.limit >= AkkaUtils.minFrameSizeBytes - 200 * 1024) {
             logInfo("Storing result for " + taskId + " in local BlockManager")
             val blockId = TaskResultBlockId(taskId)
             env.blockManager.putBytes(
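After patches 2 and 3, the executor's routing decision is a single comparison: serialized results within minFrameSizeBytes minus 200K of headroom go back through Akka, and anything larger is written to the local block manager for the driver to fetch. The following self-contained sketch restates that rule under the patch's constants; ResultDispatchSketch, ViaAkka, and ViaBlockManager are illustrative names standing in for Spark's DirectTaskResult / IndirectTaskResult paths.

// Sketch of the size-based dispatch rule from patches 2 and 3 (illustration only).
object ResultDispatchSketch {
  val minFrameSizeBytes: Int = 10 * 1024 * 1024  // AkkaUtils.minFrameSizeBytes
  val headroomBytes: Int = 200 * 1024            // space for other data in the Akka message

  sealed trait Route
  case object ViaAkka extends Route          // result embedded in the status update
  case object ViaBlockManager extends Route  // result stored locally, fetched by the driver

  def route(serializedResultSize: Int): Route =
    if (serializedResultSize >= minFrameSizeBytes - headroomBytes) ViaBlockManager
    else ViaAkka

  def main(args: Array[String]): Unit = {
    assert(route(1024) == ViaAkka)                                      // small result
    assert(route(minFrameSizeBytes - headroomBytes) == ViaBlockManager) // boundary is inclusive
    assert(route(minFrameSizeBytes) == ViaBlockManager)                 // would not fit in a frame
  }
}

The 200K headroom accounts for the rest of the status-update message that wraps the result, per the [SPARK-2156] comment.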
From ec3ec2881ef1d24b3e1ddc79d6a4d52d6aee2776 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Wed, 18 Jun 2014 21:43:21 -0700
Subject: [PATCH 4/5] address comments

---
 .../main/scala/org/apache/spark/executor/Executor.scala   | 8 +++-----
 core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 2 +-
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index c4508a72e7bc..4b8fbead8f3b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -208,12 +208,10 @@ private[spark] class Executor(
         val serializedDirectResult = ser.serialize(directResult)
         logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
         val serializedResult = {
-          // Akka's message frame size. If task result is bigger than this, we use the block manager
-          // to send the result back.
           // TODO: [SPARK-1112] We use the min frame size to decide whether to send the task
-          // TODO: result via Akka or via the block manager. Since this goes through the backend,
-          // TODO: whose actor system is initialized before it receives the Spark conf, it does not
-          // TODO: know `spark.akka.frameSize`. A temporary solution is to use the min frame size.
+          // result via Akka or via the block manager. Since this goes through the backend, whose
+          // actor system is initialized before it receives the Spark conf, it does not know
+          // `spark.akka.frameSize`. A temporary solution is to use the min frame size.
           // [SPARK-2156] We subtract 200K to leave some space for other data in the Akka message.
           if (serializedDirectResult.limit >= AkkaUtils.minFrameSizeBytes - 200 * 1024) {
             logInfo("Storing result for " + taskId + " in local BlockManager")
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 9816e57f65c0..7c1ce7542bcc 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -122,6 +122,6 @@ private[spark] object AkkaUtils extends Logging {
     math.max(conf.getInt("spark.akka.frameSize", 0) * 1024 * 1024, minFrameSizeBytes)
   }
 
-  /** The min frame size for Akka messages in bytes. */
+  /** The minimum value of the max frame size for Akka messages in bytes. */
   val minFrameSizeBytes = 10 * 1024 * 1024
 }
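The reworded comment in patch 4 carries the safety argument: the executor cannot see `spark.akka.frameSize`, but because maxFrameSizeBytes clamps every configuration to at least minFrameSizeBytes, any result that passes the executor's conservative check also fits within the driver's actual frame. A small property check of that invariant follows; maxFrameSizeBytes is restated locally with an Int in place of SparkConf rather than imported from Spark.

// Sketch: for every configured size, the clamped frame is never smaller than the
// executor's conservative threshold, so the min-frame-size check is always safe.
object FrameInvariantSketch {
  val minFrameSizeBytes: Int = 10 * 1024 * 1024

  // Local restatement of AkkaUtils.maxFrameSizeBytes (configured size in MB).
  def maxFrameSizeBytes(configuredMB: Int): Int =
    math.max(configuredMB * 1024 * 1024, minFrameSizeBytes)

  def main(args: Array[String]): Unit = {
    (0 to 128).foreach { mb =>
      assert(maxFrameSizeBytes(mb) >= minFrameSizeBytes)
    }
    println("invariant holds for 0..128 MB")
  }
}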
From 4f376a4be2467053dd4fc6346c550b7b8c9ea447 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Wed, 18 Jun 2014 22:42:04 -0700
Subject: [PATCH 5/5] fix tests

---
 .../apache/spark/MapOutputTrackerSuite.scala | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 95ba273f16a7..c7918730969c 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -179,7 +179,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
 
   test("remote fetch exceeds akka frame size") {
     val newConf = new SparkConf
-    newConf.set("spark.akka.frameSize", "1")
    newConf.set("spark.akka.askTimeout", "1") // Fail fast
 
     val masterTracker = new MapOutputTrackerMaster(conf)
@@ -188,14 +187,18 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
       new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
     val masterActor = actorRef.underlyingActor
 
-    // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
-    // Note that the size is hand-selected here because map output statuses are compressed before
-    // being sent.
-    masterTracker.registerShuffle(20, 100)
-    (0 until 100).foreach { i =>
-      masterTracker.registerMapOutput(20, i, new MapStatus(
-        BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
+    // The serialized message should be about 2 * AkkaUtils.minFrameSizeBytes, so
+    // MapOutputTrackerMasterActor should throw an exception.
+    val shuffleId = 20
+    val numMaps = 2
+    val data = new Array[Byte](AkkaUtils.minFrameSizeBytes)
+    val random = new java.util.Random(0)
+    random.nextBytes(data) // Make it hard to compress.
+    masterTracker.registerShuffle(shuffleId, numMaps)
+    (0 until numMaps).foreach { i =>
+      masterTracker.registerMapOutput(shuffleId, i, new MapStatus(
+        BlockManagerId("999", "mps", 1000, 0), data))
     }
-    intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
+    intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(shuffleId)) }
   }
 }
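Patch 5 derives the test payload from AkkaUtils.minFrameSizeBytes instead of a hand-picked 4MB of zeros, and fills it with random bytes because map output statuses are compressed before being sent: zeros would compress to almost nothing and the message would fit in the frame after all. The sketch below illustrates that effect with java.util.zip directly; the suite itself goes through Spark's serializer and compression codec, not GZIP.

import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

// Sketch: zero-filled buffers shrink dramatically under compression, while random
// bytes stay near their original size, which is what keeps the test's serialized
// map statuses above the frame-size threshold.
object CompressionSketch {
  def gzipSize(data: Array[Byte]): Int = {
    val bos = new ByteArrayOutputStream()
    val gzip = new GZIPOutputStream(bos)
    gzip.write(data)
    gzip.close() // flush the deflater before reading the size
    bos.size()
  }

  def main(args: Array[String]): Unit = {
    val n = 1024 * 1024
    val zeros = new Array[Byte](n)
    val random = new Array[Byte](n)
    val rnd = new java.util.Random(0)
    rnd.nextBytes(random)
    println(s"zeros:  $n -> ${gzipSize(zeros)} bytes")   // a few KB at most
    println(s"random: $n -> ${gzipSize(random)} bytes")  // roughly n, often slightly more
    assert(gzipSize(zeros) < n / 100)
    assert(gzipSize(random) > n / 2)
  }
}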