diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
index 175a9e71d6715..fc6739a964206 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
@@ -54,40 +54,27 @@ private[spark] class TaskDescription(
     val index: Int, // Index within this task's TaskSet
     val addedFiles: Map[String, Long],
     val addedJars: Map[String, Long],
-    val properties: Properties) extends Logging {
-
-  def this(
-      taskId: Long,
-      attemptNumber: Int,
-      executorId: String,
-      name: String,
-      index: Int, // Index within this task's TaskSet
-      addedFiles: Map[String, Long],
-      addedJars: Map[String, Long],
-      properties: Properties,
-      task: Task[_]) {
-    this(taskId, attemptNumber, executorId, name, index,
-      addedFiles, addedJars, properties)
-    task_ = task
-  }
-
-  def serializedTask: ByteBuffer = {
-    // This is where we serialize the task on the driver before sending it to the executor.
-    // This is not done when creating the TaskDescription so we can postpone this serialization
-    // to later in the scheduling process -- particularly,
-    // so it can happen in another thread by the CoarseGrainedSchedulerBackend.
-    try {
-      ByteBuffer.wrap(Utils.serialize(task_))
-    } catch {
-      case NonFatal(e) =>
-        val msg = s"Failed to serialize task $taskId, not attempting to retry it."
-        logError(msg, e)
-        throw new TaskNotSerializableException(e)
-    }
-  }
-
-  private var task_ : Task[_] = null
-
+    val properties: Properties,
+    // Task object corresponding to the TaskDescription. This is only defined on the master; on
+    // the worker, the Task object is handled separately from the TaskDescription so that it can
+    // be deserialized after the TaskDescription is deserialized.
+    @transient private val task: Task[_] = null) extends Logging {
+
+  /**
+   * Serializes the task for this TaskDescription and returns the serialized task.
+   *
+   * This method should only be used on the master (to serialize a task to send to a worker).
+   */
+  def serializeTask(): ByteBuffer = {
+    try {
+      ByteBuffer.wrap(Utils.serialize(task))
+    } catch {
+      case NonFatal(e) =>
+        val msg = s"Failed to serialize task ${taskId}."
+        logError(msg, e)
+        throw new TaskNotSerializableException(e)
+    }
+  }
+
   override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
 }
@@ -101,7 +88,7 @@ private[spark] object TaskDescription {
   }
 
   @throws[TaskNotSerializableException]
-  def encode(taskDescription: TaskDescription, serializedTask: ByteBuffer): ByteBuffer = {
+  def encode(taskDescription: TaskDescription): ByteBuffer = {
     val bytesOut = new ByteBufferOutputStream(4096)
     val dataOut = new DataOutputStream(bytesOut)
 
@@ -124,8 +111,8 @@
       dataOut.writeUTF(value)
     }
 
-    // Write the task. The task is already serialized, so write it directly to the byte buffer.
-    Utils.writeByteBuffer(serializedTask, bytesOut)
+    // Serialize and write the task.
+    Utils.writeByteBuffer(taskDescription.serializeTask(), bytesOut)
 
     dataOut.close()
     bytesOut.close()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 4227b47ff8f52..20bbef34f0fed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -248,7 +248,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       val serializedTasks = tasks.flatten.map { task =>
         var serializedTask: ByteBuffer = null
         try {
-          serializedTask = TaskDescription.encode(task, task.serializedTask)
+          serializedTask = TaskDescription.encode(task)
           if (serializedTask.limit >= maxRpcMessageSize) {
             val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
               "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
index 8b8934eaba9b9..5e566c75246b9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
@@ -89,7 +89,7 @@ private[spark] class LocalEndpoint(
     val serializedTasks = scheduler.resourceOffers(offers).flatten.map { task =>
       var serializedTask: ByteBuffer = null
       try {
-        serializedTask = task.serializedTask
+        serializedTask = task.serializeTask()
       } catch {
         case NonFatal(e) =>
           abortTaskSetManager(scheduler, task.taskId,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
index 6cd7d49e44be1..d8007dde18c65 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
@@ -37,7 +37,6 @@ class TaskDescriptionSuite extends SparkFunSuite {
     originalProperties.put("property1", "18")
     originalProperties.put("property2", "test value")
 
-    // Create a dummy byte buffer for the task.
     val taskBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
 
     val originalTaskDescription = new TaskDescription(
@@ -48,10 +47,15 @@
       index = 19,
       originalFiles,
       originalJars,
-      originalProperties
-    )
+      originalProperties,
+      // Pass in null for the task, because we override the serializeTask method below anyway
+      // (which is the only place the task is used).
+      task = null
+    ) {
+      override def serializeTask(): ByteBuffer = taskBuffer
+    }
 
-    val serializedTaskDescription = TaskDescription.encode(originalTaskDescription, taskBuffer)
+    val serializedTaskDescription = TaskDescription.encode(originalTaskDescription)
     val (decodedTaskDescription, serializedTask) =
       TaskDescription.decode(serializedTaskDescription)
     // Make sure that all of the fields in the decoded task description match the original.
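
For context on the resulting API, here is a minimal sketch of the new round-trip, closely mirroring the updated TaskDescriptionSuite above. It assumes Spark-internal access (TaskDescription is private[spark]); the overridden serializeTask() and the trailing asserts are illustrative stand-ins, not part of this patch.

```scala
import java.nio.ByteBuffer
import java.util.Properties

// Stub out the task: `task = null` is safe here because serializeTask() is
// overridden below, and serializeTask() is the only place the task is used.
val taskBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
val taskDescription = new TaskDescription(
  taskId = 1L,
  attemptNumber = 0,
  executorId = "testExecutor",
  name = "task for test",
  index = 19,
  addedFiles = Map.empty[String, Long],
  addedJars = Map.empty[String, Long],
  properties = new Properties,
  task = null
) {
  override def serializeTask(): ByteBuffer = taskBuffer
}

// encode() now drives task serialization itself (via serializeTask()), so
// callers no longer pass a pre-serialized buffer; decode() still returns the
// TaskDescription together with the still-serialized task bytes.
val encoded: ByteBuffer = TaskDescription.encode(taskDescription)
val (decoded, serializedTask) = TaskDescription.decode(encoded)
assert(decoded.taskId == taskDescription.taskId)
assert(serializedTask.remaining() == taskBuffer.capacity())
```

This keeps task serialization off the critical path at TaskDescription construction time (it can still happen later, e.g. inside CoarseGrainedSchedulerBackend), while sparing callers from threading a separate serialized-task buffer through encode().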