From 281d7c9f83344cf107ed946635eda7be922e0267 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 14 Mar 2014 20:18:35 -0700
Subject: [PATCH 1/4] Throw exception on spark.akka.frameSize exceeded + Unit
 tests

---
 .../org/apache/spark/MapOutputTracker.scala   | 12 +++-
 .../scala/org/apache/spark/SparkEnv.scala     |  2 +-
 .../org/apache/spark/AkkaUtilsSuite.scala     | 10 +--
 .../apache/spark/MapOutputTrackerSuite.scala  | 70 +++++++++++++------
 4 files changed, 66 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 5968973132942..4676fb0a930aa 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -35,13 +35,21 @@ private[spark] case class GetMapOutputStatuses(shuffleId: Int)
   extends MapOutputTrackerMessage
 private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
 
-private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster)
+private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
   extends Actor with Logging {
+  val maxAkkaFrameSize = conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024 // MB
+
   def receive = {
     case GetMapOutputStatuses(shuffleId: Int) =>
       val hostPort = sender.path.address.hostPort
       logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
-      sender ! tracker.getSerializedMapOutputStatuses(shuffleId)
+      val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
+      val serializedSize = mapOutputStatuses.size
+      if (serializedSize > maxAkkaFrameSize) {
+        throw new SparkException(
+          "spark.akka.frameSize exceeded! Map output statuses were %d bytes".format(serializedSize))
+      }
+      sender ! mapOutputStatuses
 
     case StopMapOutputTracker =>
       logInfo("MapOutputTrackerActor stopped!")
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 5e43b5198422c..26c362d0fea7b 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -186,7 +186,7 @@ object SparkEnv extends Logging {
     }
     mapOutputTracker.trackerActor = registerOrLookup(
       "MapOutputTracker",
-      new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]))
+      new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
 
     val shuffleFetcher = instantiateClass[ShuffleFetcher](
       "spark.shuffle.fetcher", "org.apache.spark.BlockStoreShuffleFetcher")
diff --git a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
index cd054c1f684ab..d2e303d81c4c8 100644
--- a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
@@ -45,12 +45,12 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
 
     val badconf = new SparkConf
     badconf.set("spark.authenticate", "true")
     badconf.set("spark.authenticate.secret", "bad")
-    val securityManagerBad = new SecurityManager(badconf);
+    val securityManagerBad = new SecurityManager(badconf)
 
     assert(securityManagerBad.isAuthenticationEnabled() === true)
 
@@ -84,7 +84,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
 
     val badconf = new SparkConf
     badconf.set("spark.authenticate", "false")
@@ -136,7 +136,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
 
     val goodconf = new SparkConf
     goodconf.set("spark.authenticate", "true")
@@ -189,7 +189,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
 
     val badconf = new SparkConf
     badconf.set("spark.authenticate", "false")
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 8efa072a97911..d043d556d99da 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -51,14 +51,16 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   test("master start and stop") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
-    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
+    tracker.trackerActor =
+      actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
     tracker.stop()
   }
 
   test("master register and fetch") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
-    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
+    tracker.trackerActor =
+      actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
     tracker.registerShuffle(10, 2)
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -77,7 +79,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   test("master register and unregister and fetch") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
-    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
+    tracker.trackerActor =
+      actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
     tracker.registerShuffle(10, 2)
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -97,23 +100,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   }
 
   test("remote fetch") {
-    val hostname = "localhost"
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf,
-      securityManager = new SecurityManager(conf))
-    System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
-
-    val masterTracker = new MapOutputTrackerMaster(conf)
-    masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
-
-    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf,
-      securityManager = new SecurityManager(conf))
-    val slaveTracker = new MapOutputTracker(conf)
-    val selection = slaveSystem.actorSelection(
-      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
-    val timeout = AkkaUtils.lookupTimeout(conf)
-    slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
-
+    val (masterTracker, slaveTracker) = setUpMasterSlaveSystem(conf)
     masterTracker.registerShuffle(10, 1)
     masterTracker.incrementEpoch()
     slaveTracker.updateEpoch(masterTracker.getEpoch)
@@ -136,4 +123,47 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     // failure should be cached
     intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
   }
+
+  test("remote fetch exceeding akka frame size") {
+    val newConf = new SparkConf
+    newConf.set("spark.akka.frameSize", "1")
+    newConf.set("spark.akka.askTimeout", "1") // Fail fast
+    val (masterTracker, slaveTracker) = setUpMasterSlaveSystem(newConf)
+
+    // Frame size should be ~123B, and no exception should be thrown
+    masterTracker.registerShuffle(10, 1)
+    masterTracker.registerMapOutput(10, 0, new MapStatus(
+      BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0)))
+    slaveTracker.getServerStatuses(10, 0)
+
+    // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception
+    masterTracker.registerShuffle(20, 100)
+    (0 until 100).foreach { i =>
+      masterTracker.registerMapOutput(20, i, new MapStatus(
+        BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
+    }
+    intercept[SparkException] { slaveTracker.getServerStatuses(20, 0) }
+  }
+
+  private def setUpMasterSlaveSystem(conf: SparkConf) = {
+    val hostname = "localhost"
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf,
+      securityManager = new SecurityManager(conf))
+
+    // Will be cleared by LocalSparkContext
+    System.setProperty("spark.driver.port", boundPort.toString)
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    masterTracker.trackerActor = actorSystem.actorOf(
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+
+    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf,
+      securityManager = new SecurityManager(conf))
+    val slaveTracker = new MapOutputTracker(conf)
+    val selection = slaveSystem.actorSelection(
+      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+    (masterTracker, slaveTracker)
+  }
 }
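
The heart of patch 1 is the guard in MapOutputTrackerMasterActor: the configured spark.akka.frameSize (denominated in MB) is converted to bytes once, and the serialized map output statuses are measured against that limit before the reply is sent. Failing fast on the master matters because an Akka message larger than maximum-frame-size is dropped by the remote transport rather than delivered, so the requesting executor would otherwise see only an opaque ask timeout. In isolation the check looks roughly like this (a standalone sketch with simplified names; FrameSizeCheck is illustrative, not a Spark class):

    object FrameSizeCheck {
      val frameSizeMb = 10                              // default of spark.akka.frameSize
      val maxFrameSizeBytes = frameSizeMb * 1024 * 1024 // config is in MB, the check is in bytes

      def send(payload: Array[Byte]): Unit = {
        if (payload.length > maxFrameSizeBytes) {
          // Fail loudly instead of letting the transport drop the message
          throw new RuntimeException(
            "Frame size exceeded! Payload was %d bytes".format(payload.length))
        }
        // ... hand the payload to the transport here ...
      }

      def main(args: Array[String]): Unit = {
        send(Array.fill[Byte](123)(0))                  // fine: far below 10MB
        try send(Array.fill[Byte](11 * 1024 * 1024)(0)) // throws: 11MB > 10MB
        catch { case e: RuntimeException => println(e.getMessage) }
      }
    }
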
From c9b610958da9fa01b34936ee4283bf266aec66bc Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Sat, 15 Mar 2014 11:31:48 -0700
Subject: [PATCH 2/4] Simplify test + make access to akka frame size more
 modular

---
 .../org/apache/spark/MapOutputTracker.scala   |  2 +-
 .../org/apache/spark/util/AkkaUtils.scala     |  7 ++-
 .../apache/spark/MapOutputTrackerSuite.scala  | 56 ++++++++++---------
 3 files changed, 36 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 4676fb0a930aa..127d1195226b7 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -37,7 +37,7 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
 private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
   extends Actor with Logging {
-  val maxAkkaFrameSize = conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024 // MB
+  val maxAkkaFrameSize = AkkaUtils.maxFrameSize(conf) * 1024 * 1024 // MB
 
   def receive = {
     case GetMapOutputStatuses(shuffleId: Int) =>
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index a6c9a9aaba8eb..a466102333267 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -49,7 +49,7 @@ private[spark] object AkkaUtils extends Logging {
 
     val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
 
-    val akkaFrameSize = conf.getInt("spark.akka.frameSize", 10)
+    val akkaFrameSize = maxFrameSize(conf)
     val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
     val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
     if (!akkaLogLifecycleEvents) {
@@ -121,4 +121,9 @@ private[spark] object AkkaUtils extends Logging {
   def lookupTimeout(conf: SparkConf): FiniteDuration = {
     Duration.create(conf.get("spark.akka.lookupTimeout", "30").toLong, "seconds")
   }
+
+  /** Returns the default max frame size for Akka messages in MB. */
+  def maxFrameSize(conf: SparkConf): Int = {
+    conf.getInt("spark.akka.frameSize", 10)
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index d043d556d99da..dc70576d82419 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import scala.concurrent.Await
 
 import akka.actor._
+import akka.testkit.TestActorRef
 import org.scalatest.FunSuite
 
 import org.apache.spark.scheduler.MapStatus
@@ -100,7 +101,25 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   }
 
   test("remote fetch") {
-    val (masterTracker, slaveTracker) = setUpMasterSlaveSystem(conf)
+    val hostname = "localhost"
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf,
+      securityManager = new SecurityManager(conf))
+
+    // Will be cleared by LocalSparkContext
+    System.setProperty("spark.driver.port", boundPort.toString)
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    masterTracker.trackerActor = actorSystem.actorOf(
+      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
+
+    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf,
+      securityManager = new SecurityManager(conf))
+    val slaveTracker = new MapOutputTracker(conf)
+    val selection = slaveSystem.actorSelection(
+      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+
     masterTracker.registerShuffle(10, 1)
     masterTracker.incrementEpoch()
     slaveTracker.updateEpoch(masterTracker.getEpoch)
@@ -113,7 +132,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     masterTracker.incrementEpoch()
     slaveTracker.updateEpoch(masterTracker.getEpoch)
     assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
-           Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
+      Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
 
     masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
     masterTracker.incrementEpoch()
@@ -128,13 +147,18 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val newConf = new SparkConf
     newConf.set("spark.akka.frameSize", "1")
     newConf.set("spark.akka.askTimeout", "1") // Fail fast
-    val (masterTracker, slaveTracker) = setUpMasterSlaveSystem(newConf)
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    val actorSystem = ActorSystem("test")
+    val actorRef = TestActorRef[MapOutputTrackerMasterActor](
+      new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
+    val masterActor = actorRef.underlyingActor
 
     // Frame size should be ~123B, and no exception should be thrown
     masterTracker.registerShuffle(10, 1)
     masterTracker.registerMapOutput(10, 0, new MapStatus(
       BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0)))
-    slaveTracker.getServerStatuses(10, 0)
+    masterActor.receive(GetMapOutputStatuses(10))
 
     // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception
     masterTracker.registerShuffle(20, 100)
@@ -142,28 +166,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
       masterTracker.registerMapOutput(20, i, new MapStatus(
         BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
     }
-    intercept[SparkException] { slaveTracker.getServerStatuses(20, 0) }
-  }
-
-  private def setUpMasterSlaveSystem(conf: SparkConf) = {
-    val hostname = "localhost"
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf,
-      securityManager = new SecurityManager(conf))
-
-    // Will be cleared by LocalSparkContext
-    System.setProperty("spark.driver.port", boundPort.toString)
-
-    val masterTracker = new MapOutputTrackerMaster(conf)
-    masterTracker.trackerActor = actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
-
-    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf,
-      securityManager = new SecurityManager(conf))
-    val slaveTracker = new MapOutputTracker(conf)
-    val selection = slaveSystem.actorSelection(
-      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
-    val timeout = AkkaUtils.lookupTimeout(conf)
-    slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
-    (masterTracker, slaveTracker)
+    intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
   }
 }
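
The interesting move in patch 2 is the test simplification: rather than standing up a real master/slave actor pair and fetching over TCP, the test drives the actor through akka-testkit's TestActorRef, which exposes the underlying actor instance so receive can be invoked directly on the test thread; an exception thrown inside receive then propagates to the test itself instead of being swallowed by actor supervision. A minimal sketch of that pattern, assuming akka-testkit is on the test classpath (ThrowingActor is illustrative, not a Spark class):

    import akka.actor.{Actor, ActorSystem}
    import akka.testkit.TestActorRef

    class ThrowingActor extends Actor {
      def receive = {
        case "boom" => throw new IllegalStateException("frame size exceeded")
      }
    }

    object TestActorRefSketch extends App {
      implicit val system = ActorSystem("test")
      val ref = TestActorRef[ThrowingActor](new ThrowingActor)
      // receive is applied synchronously, so the exception lands in the test
      try ref.underlyingActor.receive("boom")
      catch { case e: IllegalStateException => println("caught: " + e.getMessage) }
      system.shutdown()
    }
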
From 9b66a8c5955a1d72d2ae7b74ba4791af78cf9a45 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Sat, 15 Mar 2014 17:24:00 -0700
Subject: [PATCH 3/4] Move around akka frame size (minor)

---
 .../scala/org/apache/spark/MapOutputTracker.scala    |  2 +-
 .../scala/org/apache/spark/executor/Executor.scala   | 12 +++++-------
 .../main/scala/org/apache/spark/util/AkkaUtils.scala | 10 +++++-----
 3 files changed, 11 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 127d1195226b7..df98ebb08dbda 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -37,7 +37,7 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
 private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
   extends Actor with Logging {
-  val maxAkkaFrameSize = AkkaUtils.maxFrameSize(conf) * 1024 * 1024 // MB
+  val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
   def receive = {
     case GetMapOutputStatuses(shuffleId: Int) =>
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e69f6f72d3275..07514c63377af 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler.
@@ -118,11 +118,9 @@ private[spark] class Executor(
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
   Thread.currentThread.setContextClassLoader(replClassLoader)
 
-  // Akka's message frame size. If task result is bigger than this, we use the block manager
-  // to send the result back.
-  private val akkaFrameSize = {
-    env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
-  }
+  // Akka's message frame size in bytes. If task result is bigger than this, we use the block
+  // manager to send the result back.
+  private val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
   // Start worker thread pool
   val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
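
Patch 3 settles the unit question: spark.akka.frameSize stays MB-denominated for users, AkkaUtils.maxFrameSizeBytes performs the MB-to-bytes conversion in exactly one place, and every consumer (the tracker actor's guard, the executor's result-size check, and Akka's own maximum-frame-size setting, now written as a byte count with HOCON's "b" unit) works in bytes. A sketch of the convention (simplified names, not the Spark code):

    object FrameSizeUnits {
      def maxFrameSizeBytes(frameSizeMb: Int): Int = frameSizeMb * 1024 * 1024

      def main(args: Array[String]): Unit = {
        val bytes = maxFrameSizeBytes(10)
        // HOCON accepts a byte count with the "b" unit, which the patch now
        // emits in place of the earlier "${akkaFrameSize}MiB" form:
        println(s"akka.remote.netty.tcp.maximum-frame-size = $bytes b")
        // prints: akka.remote.netty.tcp.maximum-frame-size = 10485760 b
      }
    }
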
@@ -239,7 +237,7 @@ private[spark] class Executor(
         val serializedDirectResult = ser.serialize(directResult)
         logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
         val serializedResult = {
-          if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
+          if (serializedDirectResult.limit >= maxAkkaFrameSize - 1024) {
             logInfo("Storing result for " + taskId + " in local BlockManager")
             val blockId = TaskResultBlockId(taskId)
             env.blockManager.putBytes(
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index a466102333267..d9fb4eb6f49b2 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -49,7 +49,7 @@ private[spark] object AkkaUtils extends Logging {
 
     val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
 
-    val akkaFrameSize = maxFrameSize(conf)
+    val akkaFrameSize = maxFrameSizeBytes(conf)
     val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
     val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
     if (!akkaLogLifecycleEvents) {
@@ -92,7 +92,7 @@ private[spark] object AkkaUtils extends Logging {
       |akka.remote.netty.tcp.port = $port
       |akka.remote.netty.tcp.tcp-nodelay = on
       |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
-      |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
+      |akka.remote.netty.tcp.maximum-frame-size = $akkaFrameSize b
       |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
       |akka.actor.default-dispatcher.throughput = $akkaBatchSize
       |akka.log-config-on-start = $logAkkaConfig
@@ -122,8 +122,8 @@ private[spark] object AkkaUtils extends Logging {
     Duration.create(conf.get("spark.akka.lookupTimeout", "30").toLong, "seconds")
   }
 
-  /** Returns the default max frame size for Akka messages in MB. */
-  def maxFrameSize(conf: SparkConf): Int = {
-    conf.getInt("spark.akka.frameSize", 10)
+  /** Returns the default max frame size for Akka messages in bytes. */
+  def maxFrameSizeBytes(conf: SparkConf): Int = {
+    conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
   }
 }

From 06bf2da11b578d245fc7ccf7507e549b2227baeb Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Sat, 15 Mar 2014 17:26:40 -0700
Subject: [PATCH 4/4] Add akka frame size to exception messages

---
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index df98ebb08dbda..7acb7f8825108 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -47,7 +47,8 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
       val serializedSize = mapOutputStatuses.size
       if (serializedSize > maxAkkaFrameSize) {
         throw new SparkException(
-          "spark.akka.frameSize exceeded! Map output statuses were %d bytes".format(serializedSize))
+          "spark.akka.frameSize of %d bytes exceeded! ".format(maxAkkaFrameSize) +
+          "Map output statuses were %d bytes".format(serializedSize))
       }
       sender ! mapOutputStatuses
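
Patch 4 makes the failure actionable by reporting the configured limit alongside the offending size. With the 1 MB limit from the unit test, the resulting message would look like this (the payload size below is hypothetical):

    object FrameSizeMessage extends App {
      val maxAkkaFrameSize = 1 * 1024 * 1024 // 1MB limit, as in the unit test
      val serializedSize = 1149728           // hypothetical oversized payload
      val msg =
        "spark.akka.frameSize of %d bytes exceeded! ".format(maxAkkaFrameSize) +
        "Map output statuses were %d bytes".format(serializedSize)
      println(msg)
      // prints: spark.akka.frameSize of 1048576 bytes exceeded! Map output statuses were 1149728 bytes
    }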