
Commit 7947c18

increase slack size for akka
1 parent 4ab696a
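
In short: when Spark ships a serialized task or task result through Akka, it must leave some slack below spark.akka.frameSize for the rest of the message (envelope, headers, and other fields). This commit grows that slack from a hard-coded 1024 bytes to a shared constant, AkkaUtils.reservedSizeBytes (200 KiB), and threads it through the two size checks and the error message below. A minimal standalone sketch of the check, assuming the default frame size of 10 MiB rather than reading a live SparkConf:

// Illustrative sketch only; the names mirror the diff, the values are the defaults.
object SlackCheck {
  val akkaFrameSize: Int = 10 * 1024 * 1024 // spark.akka.frameSize default (10 MiB)
  val reservedSizeBytes: Int = 200 * 1024   // was a flat 1024 bytes before this commit

  /** True if a payload of this size still fits in one Akka frame with slack to spare. */
  def fitsInFrame(payloadBytes: Int): Boolean =
    payloadBytes < akkaFrameSize - reservedSizeBytes
}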

4 files changed: +11 -7 lines changed

4 files changed

+11
-7
lines changed

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -112,7 +112,7 @@ private[spark] object CoarseGrainedExecutorBackend {
     // Bootstrap to fetch the driver's Spark properties.
     val executorConf = new SparkConf
     val (fetcher, _) = AkkaUtils.createActorSystem(
-      "driverConfFetcher", hostname, 0, executorConf, new SecurityManager(executorConf))
+      "driverPropsFetcher", hostname, 0, executorConf, new SecurityManager(executorConf))
     val driver = fetcher.actorSelection(driverUrl)
     val timeout = new Timeout(5, TimeUnit.MINUTES)
     val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 1 deletion
@@ -212,7 +212,7 @@ private[spark] class Executor(
         val serializedDirectResult = ser.serialize(directResult)
         logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
         val serializedResult = {
-          if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
+          if (serializedDirectResult.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
             logInfo("Storing result for " + taskId + " in local BlockManager")
             val blockId = TaskResultBlockId(taskId)
             env.blockManager.putBytes(
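
For context, a hedged sketch of the branch this threshold guards, condensed from the surrounding Executor code (the String results stand in for the real direct/indirect result types, which this diff does not show):

import java.nio.ByteBuffer

// Simplified: a result that would overflow the Akka frame (payload plus
// reserved overhead) is parked in the BlockManager and fetched by reference;
// anything smaller rides directly in the status-update message.
def chooseResultPath(result: ByteBuffer, akkaFrameSize: Int, reservedSizeBytes: Int): String =
  if (result.limit >= akkaFrameSize - reservedSizeBytes) {
    "indirect: store bytes in the BlockManager, send only the block id"
  } else {
    "direct: send the serialized bytes inline"
  }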

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 6 additions & 5 deletions
@@ -145,14 +145,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       for (task <- tasks.flatten) {
         val ser = SparkEnv.get.closureSerializer.newInstance()
         val serializedTask = ser.serialize(task)
-        if (serializedTask.limit >= akkaFrameSize - 1024) {
+        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
           val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
           scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
             try {
-              var msg = "Serialized task %s:%d was %d bytes which " +
-                "exceeds spark.akka.frameSize (%d bytes). " +
-                "Consider using broadcast variables for large values."
-              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize)
+              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
+                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
+                "spark.akka.frameSize or using broadcast variables for large values."
+              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
+                AkkaUtils.reservedSizeBytes)
               taskSet.abort(msg)
             } catch {
               case e: Exception => logError("Exception in error callback", e)
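
The reworked message takes five format arguments instead of four. A quick standalone check of the formatting, with made-up values for the task id, index, and payload size:

val (taskId, index, taskBytes) = (42L, 3, 10700000) // example values only
val akkaFrameSize = 10 * 1024 * 1024
val reservedSizeBytes = 200 * 1024
val template = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
  "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
  "spark.akka.frameSize or using broadcast variables for large values."
println(template.format(taskId, index, taskBytes, akkaFrameSize, reservedSizeBytes))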

core/src/main/scala/org/apache/spark/util/AkkaUtils.scala

Lines changed: 3 additions & 0 deletions
@@ -121,4 +121,7 @@ private[spark] object AkkaUtils extends Logging {
   def maxFrameSizeBytes(conf: SparkConf): Int = {
     conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
   }
+
+  /** Space reserved for extra data in an Akka message besides serialized task or task result. */
+  val reservedSizeBytes = 200 * 1024
 }
