Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ private[spark] class Executor(
private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)

// Akka's message frame size. If task result is bigger than this, we use the block manager
// to send the result back.
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

// Start worker thread pool
val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")

Expand Down Expand Up @@ -212,7 +208,12 @@ private[spark] class Executor(
val serializedDirectResult = ser.serialize(directResult)
logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
val serializedResult = {
if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
// TODO: [SPARK-1112] We use the min frame size to determine whether to use Akka to send
// the task result or block manager. Since this is via the backend, whose actor system is
// initialized before receiving the Spark conf, and hence it does not know
// `spark.akka.frameSize`. A temporary solution is using the min frame size.
// [SPARK-2156] We subtract 200K to leave some space for other data in the Akka message.
if (serializedDirectResult.limit >= AkkaUtils.minFrameSizeBytes - 200 * 1024) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

200 KB may not enough , Its value should increase as the serialized DirectResult becomes larger .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested with a double array of size very close to 10 * 1024 * 1024. The akka message overhead is about 30-60K. This PR doesn't fix the issues with receiving new tasks from the driver that are bigger than 10MB. @pwendell is working on it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 200K and should we change the similar code that does this for sending task closures (which also subtracts 1024)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We computed the difference between the size of the akka message and the size of serialized task result (~10M). The difference is smaller than 60K. I set 200K to be safe. Could you point me to the places where we use 1024?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kayousterhout I saw the line in CoarseGrainedSchedulerBackend. Should the overhead be bounded by a fixed size or proportional to the message size?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re: your last question, I have no idea...you definitely know more about this than I do at this point. Using the higher, 200K value seems like a safe alternative to what's currently there. It would be great to add this constant in AkkaUtils so we don't need to manually track this down if it changes again in the future.

logInfo("Storing result for " + taskId + " in local BlockManager")
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ private[spark] object AkkaUtils extends Logging {

/** Returns the configured max frame size for Akka messages in bytes. */
def maxFrameSizeBytes(conf: SparkConf): Int = {
conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
math.max(conf.getInt("spark.akka.frameSize", 0) * 1024 * 1024, minFrameSizeBytes)
}

/** The minimum value of the max frame size for Akka messages in bytes. */
val minFrameSizeBytes = 10 * 1024 * 1024
}
21 changes: 12 additions & 9 deletions core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {

test("remote fetch exceeds akka frame size") {
val newConf = new SparkConf
newConf.set("spark.akka.frameSize", "1")
newConf.set("spark.akka.askTimeout", "1") // Fail fast

val masterTracker = new MapOutputTrackerMaster(conf)
Expand All @@ -188,14 +187,18 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
val masterActor = actorRef.underlyingActor

// Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
// Note that the size is hand-selected here because map output statuses are compressed before
// being sent.
masterTracker.registerShuffle(20, 100)
(0 until 100).foreach { i =>
masterTracker.registerMapOutput(20, i, new MapStatus(
BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
// Frame size should be 2 * AkkaUtils.minFrameSizeBytes, and MapOutputTrackerMasterActor should
// throw exception.
val shuffleId = 20
val numMaps = 2
val data = new Array[Byte](AkkaUtils.minFrameSizeBytes)
val random = new java.util.Random(0)
random.nextBytes(data) // Make it hard to compress.
masterTracker.registerShuffle(shuffleId, numMaps)
(0 until numMaps).foreach { i =>
masterTracker.registerMapOutput(shuffleId, i, new MapStatus(
BlockManagerId("999", "mps", 1000, 0), data))
}
intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(shuffleId)) }
}
}