diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index f3bd0797aa035..47faaa8639f6e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -27,10 +27,10 @@ import scala.math.max
 import scala.math.min
 
 import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, FetchFailed, Logging, Resubmitted,
-  SparkEnv, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
+  SparkEnv, SparkException, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.{Clock, SystemClock}
+import org.apache.spark.util.{AkkaUtils, Clock, SystemClock}
 
 /**
  * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
@@ -56,6 +56,7 @@ private[spark] class TaskSetManager(
 {
   val conf = sched.sc.conf
 
+  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+
   /*
    * Sometimes if an executor is dead or in an otherwise invalid state, the driver
    * does not realize right away leading to repeated task failures. If enabled,
@@ -414,6 +415,14 @@ private[spark] class TaskSetManager(
           // we assume the task can be serialized without exceptions.
           val serializedTask = Task.serializeWithDependencies(
             task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+          if (serializedTask.limit >= akkaFrameSize - 1024) {
+            val msg = ("Serialized task %s:%d was %d bytes, which exceeds " +
+              "spark.akka.frameSize (%d bytes).").format(
+              taskSet.id, index, serializedTask.limit, akkaFrameSize)
+            val exception = new SparkException(msg)
+            logError(msg, exception)
+            throw exception
+          }
           val timeTaken = clock.getTime() - startTime
           addRunningTask(taskId)
           logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
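
For context, a minimal sketch of how the new check would surface to an application. Everything in it is hypothetical and not part of the patch: the FrameSizeCheckExample object, the local master, the 1 MB spark.akka.frameSize setting, and the ~2 MB closure payload are illustrative only, and whether the SparkException is caught on the calling thread or only logged by the driver depends on how the scheduler propagates the failure out of resourceOffer.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

// Hypothetical driver program (not part of the patch) that tries to trip the
// new serialized-task-size check in TaskSetManager.resourceOffer.
object FrameSizeCheckExample {
  def main(args: Array[String]): Unit = {
    // spark.akka.frameSize is given in megabytes; keep it small so the
    // akkaFrameSize threshold is easy to exceed.
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("frame-size-check")
      .set("spark.akka.frameSize", "1")
    val sc = new SparkContext(conf)

    // Roughly 2 MB captured by the task closure, well above the 1 MB frame
    // size configured above (assuming the closure is serialized with the task).
    val bigPayload = Array.fill(2 * 1024 * 1024)(1.toByte)

    try {
      sc.parallelize(1 to 4, 4).map(i => i + bigPayload(i % bigPayload.length)).count()
    } catch {
      // With this patch an oversized task fails fast with an explicit
      // SparkException rather than being dropped silently at the Akka layer.
      case e: SparkException => println("Caught expected failure: " + e.getMessage)
    } finally {
      sc.stop()
    }
  }
}
```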