Skip to content

Commit 67bffd3

Browse files
pwendell and aarondav
authored and committed
[SPARK-1112, 2156] (1.0 edition) Use correct akka frame size and overhead amounts.
SPARK-1112: This is a more conservative version of #1132 that doesn't change around the actor system initialization on the executor. Instead we just directly read the current frame size limit from the ActorSystem. SPARK-2156: This uses the same fix as in #1132. Author: Patrick Wendell <[email protected]> Closes #1172 from pwendell/akka-10-fix and squashes the following commits: d56297e [Patrick Wendell] Set limit in LocalBackend to preserve test expectations 9f5ed19 [Patrick Wendell] [SPARK-1112, 2156] (1.0 edition) Use correct akka frame size and overhead amounts.
1 parent 64316af commit 67bffd3

File tree

7 files changed

+33
-18
lines changed

7 files changed

+33
-18
lines changed

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ private[spark] class CoarseGrainedExecutorBackend(
3434
driverUrl: String,
3535
executorId: String,
3636
hostPort: String,
37-
cores: Int)
37+
cores: Int,
38+
actorSystem: ActorSystem)
3839
extends Actor
3940
with ExecutorBackend
4041
with Logging {
@@ -94,6 +95,9 @@ private[spark] class CoarseGrainedExecutorBackend(
9495
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
9596
driver ! StatusUpdate(executorId, taskId, state, data)
9697
}
98+
99+
override def akkaFrameSize() = actorSystem.settings.config.getBytes(
100+
"akka.remote.netty.tcp.maximum-frame-size")
97101
}
98102

99103
private[spark] object CoarseGrainedExecutorBackend {
@@ -113,7 +117,7 @@ private[spark] object CoarseGrainedExecutorBackend {
113117
val sparkHostPort = hostname + ":" + boundPort
114118
actorSystem.actorOf(
115119
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
116-
sparkHostPort, cores),
120+
sparkHostPort, cores, actorSystem),
117121
name = "Executor")
118122
workerUrl.foreach {
119123
url =>

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,6 @@ private[spark] class Executor(
9797
private val urlClassLoader = createClassLoader()
9898
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
9999

100-
// Akka's message frame size. If task result is bigger than this, we use the block manager
101-
// to send the result back.
102-
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
103-
104100
// Start worker thread pool
105101
val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
106102

@@ -211,8 +207,10 @@ private[spark] class Executor(
211207
task.metrics.getOrElse(null))
212208
val serializedDirectResult = ser.serialize(directResult)
213209
logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
210+
214211
val serializedResult = {
215-
if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
212+
if (serializedDirectResult.limit >= execBackend.akkaFrameSize() -
213+
AkkaUtils.reservedSizeBytes) {
216214
logInfo("Storing result for " + taskId + " in local BlockManager")
217215
val blockId = TaskResultBlockId(taskId)
218216
env.blockManager.putBytes(

core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,7 @@ import org.apache.spark.TaskState.TaskState
2626
*/
2727
private[spark] trait ExecutorBackend {
2828
def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
29+
30+
// Exists as a work around for SPARK-1112. This only exists in branch-1.x of Spark.
31+
def akkaFrameSize(): Long = Long.MaxValue
2932
}

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
143143
for (task <- tasks.flatten) {
144144
val ser = SparkEnv.get.closureSerializer.newInstance()
145145
val serializedTask = ser.serialize(task)
146-
if (serializedTask.limit >= akkaFrameSize - 1024) {
146+
if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
147147
val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
148148
scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
149149
try {

core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ package org.apache.spark.scheduler.local
2020
import java.nio.ByteBuffer
2121

2222
import akka.actor.{Actor, ActorRef, Props}
23-
2423
import org.apache.spark.{Logging, SparkEnv, TaskState}
2524
import org.apache.spark.TaskState.TaskState
2625
import org.apache.spark.executor.{Executor, ExecutorBackend}
2726
import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
27+
import org.apache.spark.util.AkkaUtils
2828

2929
private case class ReviveOffers()
3030

@@ -106,4 +106,8 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
106106
override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
107107
localActor ! StatusUpdate(taskId, state, serializedData)
108108
}
109+
110+
// This limit is calculated only to preserve expected behavior in tests. In reality, since this
111+
// backend sends messages over the existing actor system, there is no need to enforce a limit.
112+
override def akkaFrameSize() = AkkaUtils.maxFrameSizeBytes(scheduler.sc.getConf)
109113
}

core/src/main/scala/org/apache/spark/util/AkkaUtils.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,4 +121,7 @@ private[spark] object AkkaUtils extends Logging {
121121
def maxFrameSizeBytes(conf: SparkConf): Int = {
122122
conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
123123
}
124+
125+
/** Space reserved for extra data in an Akka message besides serialized task or task result. */
126+
val reservedSizeBytes = 200 * 1024
124127
}

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
182182

183183
test("remote fetch exceeds akka frame size") {
184184
val newConf = new SparkConf
185-
newConf.set("spark.akka.frameSize", "1")
186185
newConf.set("spark.akka.askTimeout", "1") // Fail fast
187186

188187
val masterTracker = new MapOutputTrackerMaster(conf)
@@ -191,14 +190,18 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
191190
new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
192191
val masterActor = actorRef.underlyingActor
193192

194-
// Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
195-
// Note that the size is hand-selected here because map output statuses are compressed before
196-
// being sent.
197-
masterTracker.registerShuffle(20, 100)
198-
(0 until 100).foreach { i =>
199-
masterTracker.registerMapOutput(20, i, new MapStatus(
200-
BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
193+
// Frame size should be 2 * the configured frame size, and MapOutputTrackerMasterActor should
194+
// throw exception.
195+
val shuffleId = 20
196+
val numMaps = 2
197+
val data = new Array[Byte](AkkaUtils.maxFrameSizeBytes(conf))
198+
val random = new java.util.Random(0)
199+
random.nextBytes(data) // Make it hard to compress.
200+
masterTracker.registerShuffle(shuffleId, numMaps)
201+
(0 until numMaps).foreach { i =>
202+
masterTracker.registerMapOutput(shuffleId, i, new MapStatus(
203+
BlockManagerId("999", "mps", 1000, 0), data))
201204
}
202-
intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
205+
intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(shuffleId)) }
203206
}
204207
}

0 commit comments

Comments
 (0)