
Commit fc6a3e2

Author: Marcelo Vanzin (committed)

Merge branch 'master' into SPARK-4924

Conflicts:
    bin/spark-submit
    bin/spark-submit2.cmd

2 parents: f26556b + 8782eb9

21 files changed: +343 additions, −82 deletions

core/src/main/scala/org/apache/spark/TaskNotSerializableException.scala

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * Exception thrown when a task cannot be serialized.
+ */
+private[spark] class TaskNotSerializableException(error: Throwable) extends Exception(error)

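The point of the new exception type is to let a serialization failure surface as a specific, catchable error instead of an unhandled crash. A minimal sketch of the raise-and-handle pattern (illustration only, not part of the commit; it mirrors the TaskSetManager and TaskSchedulerImpl changes further down, and serializeOrFail/offerSafely are made-up names):

package org.apache.spark

import scala.util.control.NonFatal

// Sketch only: how spark-internal code might raise and handle the new exception.
object TaskSerializationSketch {
  // Producer side: wrap whatever went wrong during serialization in the specific type.
  def serializeOrFail[T](serialize: => T): T =
    try serialize catch {
      case NonFatal(e) => throw new TaskNotSerializableException(e)
    }

  // Consumer side: catch the specific type and keep scheduling other work.
  def offerSafely(offer: => Unit): Unit =
    try offer catch {
      case _: TaskNotSerializableException =>
        // Skip this task set; others can still be scheduled.
    }
}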
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 2 additions & 1 deletion
@@ -392,7 +392,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     | --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
     | --num-executors NUM Number of executors to launch (Default: 2).
     | --archives ARCHIVES Comma separated list of archives to be extracted into the
-    | working directory of each executor.""".stripMargin
+    | working directory of each executor.
+    """.stripMargin
     )
     SparkSubmit.exitFn()
   }

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 0 additions & 20 deletions
@@ -866,26 +866,6 @@ class DAGScheduler(
     }
 
     if (tasks.size > 0) {
-      // Preemptively serialize a task to make sure it can be serialized. We are catching this
-      // exception here because it would be fairly hard to catch the non-serializable exception
-      // down the road, where we have several different implementations for local scheduler and
-      // cluster schedulers.
-      //
-      // We've already serialized RDDs and closures in taskBinary, but here we check for all other
-      // objects such as Partition.
-      try {
-        closureSerializer.serialize(tasks.head)
-      } catch {
-        case e: NotSerializableException =>
-          abortStage(stage, "Task not serializable: " + e.toString)
-          runningStages -= stage
-          return
-        case NonFatal(e) => // Other exceptions, such as IllegalArgumentException from Kryo.
-          abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
-          runningStages -= stage
-          return
-      }
-
       logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
       stage.pendingTasks ++= tasks
       logDebug("New pending tasks: " + stage.pendingTasks)

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 37 additions & 17 deletions
@@ -31,6 +31,7 @@ import scala.util.Random
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.scheduler.TaskLocality.TaskLocality
 import org.apache.spark.util.Utils
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
@@ -209,6 +210,40 @@ private[spark] class TaskSchedulerImpl(
       .format(manager.taskSet.id, manager.parent.name))
   }
 
+  private def resourceOfferSingleTaskSet(
+      taskSet: TaskSetManager,
+      maxLocality: TaskLocality,
+      shuffledOffers: Seq[WorkerOffer],
+      availableCpus: Array[Int],
+      tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
+    var launchedTask = false
+    for (i <- 0 until shuffledOffers.size) {
+      val execId = shuffledOffers(i).executorId
+      val host = shuffledOffers(i).host
+      if (availableCpus(i) >= CPUS_PER_TASK) {
+        try {
+          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
+            tasks(i) += task
+            val tid = task.taskId
+            taskIdToTaskSetId(tid) = taskSet.taskSet.id
+            taskIdToExecutorId(tid) = execId
+            executorsByHost(host) += execId
+            availableCpus(i) -= CPUS_PER_TASK
+            assert(availableCpus(i) >= 0)
+            launchedTask = true
+          }
+        } catch {
+          case e: TaskNotSerializableException =>
+            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
+            // Do not offer resources for this task, but don't throw an error to allow other
+            // task sets to be submitted.
+            return launchedTask
+        }
+      }
+    }
+    return launchedTask
+  }
+
   /**
    * Called by cluster manager to offer resources on slaves. We respond by asking our active task
    * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
@@ -251,23 +286,8 @@ private[spark] class TaskSchedulerImpl(
     var launchedTask = false
     for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
       do {
-        launchedTask = false
-        for (i <- 0 until shuffledOffers.size) {
-          val execId = shuffledOffers(i).executorId
-          val host = shuffledOffers(i).host
-          if (availableCpus(i) >= CPUS_PER_TASK) {
-            for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
-              tasks(i) += task
-              val tid = task.taskId
-              taskIdToTaskSetId(tid) = taskSet.taskSet.id
-              taskIdToExecutorId(tid) = execId
-              executorsByHost(host) += execId
-              availableCpus(i) -= CPUS_PER_TASK
-              assert(availableCpus(i) >= 0)
-              launchedTask = true
-            }
-          }
-        }
+        launchedTask = resourceOfferSingleTaskSet(
+          taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
       } while (launchedTask)
     }
 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 14 additions & 4 deletions
@@ -18,12 +18,14 @@
 package org.apache.spark.scheduler
 
 import java.io.NotSerializableException
+import java.nio.ByteBuffer
 import java.util.Arrays
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
 import scala.math.{min, max}
+import scala.util.control.NonFatal
 
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
@@ -417,6 +419,7 @@ private[spark] class TaskSetManager(
    * @param host the host Id of the offered resource
    * @param maxLocality the maximum locality we want to schedule the tasks at
    */
+  @throws[TaskNotSerializableException]
   def resourceOffer(
       execId: String,
       host: String,
@@ -456,10 +459,17 @@ private[spark] class TaskSetManager(
       }
       // Serialize and return the task
       val startTime = clock.getTime()
-      // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
-      // we assume the task can be serialized without exceptions.
-      val serializedTask = Task.serializeWithDependencies(
-        task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+      val serializedTask: ByteBuffer = try {
+        Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+      } catch {
+        // If the task cannot be serialized, then there's no point to re-attempt the task,
+        // as it will always fail. So just abort the whole task-set.
+        case NonFatal(e) =>
+          val msg = s"Failed to serialize task $taskId, not attempting to retry it."
+          logError(msg, e)
+          abort(s"$msg Exception during serialization: $e")
+          throw new TaskNotSerializableException(e)
+      }
       if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
           !emittedTaskSizeWarning) {
         emittedTaskSizeWarning = true

core/src/test/scala/org/apache/spark/SharedSparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
   var conf = new SparkConf(false)
 
   override def beforeAll() {
-    _sc = new SparkContext("local", "test", conf)
+    _sc = new SparkContext("local[4]", "test", conf)
     super.beforeAll()
   }
 

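The only change to the shared test context is the master URL, but it is what makes the new regression test below meaningful: per the comment in the RDDSuite test, the SPARK-4349 hang only reproduced when the context had more task threads than non-serializable objects in the RDD. A small sketch of the standard local-mode master strings (illustration only; the app name and RDD are made up):

import org.apache.spark.{SparkConf, SparkContext}

object LocalMasterSketch {
  def main(args: Array[String]): Unit = {
    // "local"    -> a single worker thread, tasks run one at a time
    // "local[4]" -> four worker threads, up to four tasks run concurrently
    val sc = new SparkContext("local[4]", "local-master-sketch", new SparkConf(false))
    try {
      println(sc.parallelize(1 to 8, 4).map(_ * 2).sum())
    } finally {
      sc.stop()
    }
  }
}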
core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 21 additions & 0 deletions
@@ -17,6 +17,10 @@
 
 package org.apache.spark.rdd
 
+import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
+
+import com.esotericsoftware.kryo.KryoException
+
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
@@ -887,6 +891,23 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(ancestors6.count(_.isInstanceOf[CyclicalDependencyRDD[_]]) === 3)
   }
 
+  test("task serialization exception should not hang scheduler") {
+    class BadSerializable extends Serializable {
+      @throws(classOf[IOException])
+      private def writeObject(out: ObjectOutputStream): Unit = throw new KryoException("Bad serialization")
+
+      @throws(classOf[IOException])
+      private def readObject(in: ObjectInputStream): Unit = {}
+    }
+    // Note that in the original bug, SPARK-4349, which this test verifies, the job would only
+    // hang if there were more threads in the SparkContext than objects in this sequence.
+    intercept[Throwable] {
+      sc.parallelize(Seq(new BadSerializable, new BadSerializable)).collect
+    }
+    // Check that the context has not crashed
+    sc.parallelize(1 to 100).map(x => x*2).collect
+  }
+
   /** A contrived RDD that allows the manual addition of dependencies after creation. */
   private class CyclicalDependencyRDD[T: ClassTag] extends RDD[T](sc, Nil) {
     private val mutableDependencies: ArrayBuffer[Dependency[_]] = ArrayBuffer.empty
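BadSerializable above deliberately abuses the private writeObject/readObject hooks of Java serialization to fail every time. For contrast, a well-behaved version of the same hooks (a sketch, not part of the commit) simply delegates to the default machinery, so an RDD of such objects serializes and collects normally:

import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

// Sketch: the same private serialization hooks as BadSerializable,
// but delegating to the default behavior instead of throwing.
class WellBehavedSerializable(val value: Int) extends Serializable {
  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = out.defaultWriteObject()

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = in.defaultReadObject()
}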
core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
+
+import org.apache.spark.TaskContext
+
+/**
+ * A Task implementation that fails to serialize.
+ */
+private[spark] class NotSerializableFakeTask(myId: Int, stageId: Int) extends Task[Array[Byte]](stageId, 0) {
+  override def runTask(context: TaskContext): Array[Byte] = Array.empty[Byte]
+  override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]()
+
+  @throws(classOf[IOException])
+  private def writeObject(out: ObjectOutputStream): Unit = {
+    if (stageId == 0) {
+      throw new IllegalStateException("Cannot serialize")
+    }
+  }
+
+  @throws(classOf[IOException])
+  private def readObject(in: ObjectInputStream): Unit = {}
+}

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 30 additions & 0 deletions
@@ -100,4 +100,34 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
     assert(1 === taskDescriptions.length)
     assert("executor0" === taskDescriptions(0).executorId)
   }
+
+  test("Scheduler does not crash when tasks are not serializable") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskCpus = 2
+
+    sc.conf.set("spark.task.cpus", taskCpus.toString)
+    val taskScheduler = new TaskSchedulerImpl(sc)
+    taskScheduler.initialize(new FakeSchedulerBackend)
+    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+    val dagScheduler = new DAGScheduler(sc, taskScheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+      override def executorAdded(execId: String, host: String) {}
+    }
+    val numFreeCores = 1
+    taskScheduler.setDAGScheduler(dagScheduler)
+    var taskSet = new TaskSet(Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
+    val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
+      new WorkerOffer("executor1", "host1", numFreeCores))
+    taskScheduler.submitTasks(taskSet)
+    var taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
+    assert(0 === taskDescriptions.length)
+
+    // Now check that we can still submit tasks
+    // Even if one task set has non-serializable tasks, the other task set should still be processed without error
+    taskScheduler.submitTasks(taskSet)
+    taskScheduler.submitTasks(FakeTask.createTaskSet(1))
+    taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
+    assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
+  }
+
 }

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 14 additions & 0 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.scheduler
 
+import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
 import java.util.Random
 
 import scala.collection.mutable.ArrayBuffer
@@ -563,6 +564,19 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     assert(manager.emittedTaskSizeWarning)
   }
 
+  test("Not serializable exception thrown if the task cannot be serialized") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+
+    val taskSet = new TaskSet(Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+
+    intercept[TaskNotSerializableException] {
+      manager.resourceOffer("exec1", "host1", ANY)
+    }
+    assert(manager.isZombie)
+  }
+
   test("abort the job if total size of results is too large") {
     val conf = new SparkConf().set("spark.driver.maxResultSize", "2m")
     sc = new SparkContext("local", "test", conf)