
Commit 1d2d563

Author: Ilya Ganelin (committed)
Updated to use Scala queues instead of Google queues

1 parent a32f0ac · commit 1d2d563

File tree: 5 files changed, +44 / -47 lines


core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 9 additions & 11 deletions
@@ -810,7 +810,7 @@ class DAGScheduler(
    * Failure: String - The reason for the failure.
    *
    */
-  def tryToSerializeRddDeps(rdd: RDD[_]): Array[RDDTrace] = {
+  private[spark] def tryToSerializeRddDeps(rdd: RDD[_]): Array[RDDTrace] = {
     SerializationHelper.tryToSerializeRddAndDeps(closureSerializer, rdd)
   }
@@ -869,20 +869,10 @@ class DAGScheduler(
     // where the JobConf/Configuration object is not thread-safe.
     var taskBinary: Broadcast[Array[Byte]] = null
 
-    // Check if RDD serialization debugging is enabled
-    val debugSerialization: Boolean = sc.getConf.getBoolean("spark.serializer.debug", false)
-
     try {
       // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
       // For ResultTask, serialize and broadcast (rdd, func).
 
-      // Before serialization print out the RDD and its references.
-      if (debugSerialization) {
-        SerializationHelper
-          .tryToSerializeRdd(closureSerializer, stage.rdd)
-          .fold(l => logDebug(l), r => {})
-      }
-
       val taskBinaryBytes: Array[Byte] =
         if (stage.isShuffleMap) {
           closureSerializer.serialize((stage.rdd, stage.shuffleDep.get) : AnyRef).array()
@@ -893,10 +883,18 @@
     } catch {
       // In the case of a failure during serialization, abort the stage.
       case e: NotSerializableException =>
+        SerializationHelper
+          .tryToSerializeRdd(closureSerializer, stage.rdd)
+          .fold(l => logDebug(l), r => {})
+
         abortStage(stage, "Task not serializable: " + e.toString)
         runningStages -= stage
         return
       case NonFatal(e) =>
+        SerializationHelper
+          .tryToSerializeRdd(closureSerializer, stage.rdd)
+          .fold(l => logDebug(l), r => {})
+
        abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
        runningStages -= stage
        return

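The behavioral change in DAGScheduler: the serialization trace is no longer gated behind the spark.serializer.debug flag; the fold over tryToSerializeRdd now runs only inside the catch blocks, i.e. only after serialization has actually failed. Below is a minimal, self-contained sketch of that Either/fold pattern; trySerialize and the println-based logger are illustrative stand-ins, not Spark's actual SerializationHelper or logDebug.

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    object TraceOnFailureSketch {
      // Stand-in for SerializationHelper.tryToSerializeRdd: Right(bytes) on
      // success, Left(human-readable trace) on failure.
      def trySerialize(obj: AnyRef): Either[String, ByteBuffer] =
        try {
          val bos = new ByteArrayOutputStream()
          val oos = new ObjectOutputStream(bos)
          oos.writeObject(obj)
          oos.flush()
          Right(ByteBuffer.wrap(bos.toByteArray))
        } catch {
          case e: Exception => Left(s"Could not serialize ${obj.getClass.getName}: $e")
        }

      // The same fold shape used in the catch blocks above: log the trace only
      // when serialization failed, do nothing on success.
      def logTraceIfBroken(obj: AnyRef): Unit =
        trySerialize(obj).fold(trace => println(s"DEBUG: $trace"), _ => ())

      def main(args: Array[String]): Unit = {
        logTraceIfBroken("a plain string")   // String is serializable -> nothing printed
        logTraceIfBroken(new Thread())       // Thread is not serializable -> trace printed
      }
    }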
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 7 additions & 12 deletions
@@ -464,23 +464,18 @@ private[spark] class TaskSetManager(
       // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
       // we assume the task can be serialized without exceptions.
 
-        // Check if serialization debugging is enabled
-        val debugSerialization: Boolean = sched.sc.getConf.
-          getBoolean("spark.serializer.debug", false)
-
-        if (debugSerialization) {
+          Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+        } catch {
+          // If the task cannot be serialized, then there's no point to re-attempt the task,
+          // as it will always fail. So just abort the whole task-set and print a serialization
+          // trace to help identify the failure point.
+          case NonFatal(e) =>
           SerializationHelper.tryToSerialize(ser, task).fold (
             l => logDebug("Un-serializable reference trace for " +
               task.toString + ":\n" + l),
             r => {}
           )
-        }
-
-        Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
-      } catch {
-        // If the task cannot be serialized, then there's no point to re-attempt the task,
-        // as it will always fail. So just abort the whole task-set.
-        case NonFatal(e) =>
+
           val msg = s"Failed to serialize task $taskId, not attempting to retry it."
           logError(msg, e)
           abort(s"$msg Exception during serialization: $e")

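In both the DAGScheduler and TaskSetManager paths, the catch clause matches NonFatal(e) rather than a bare Throwable, so JVM-level errors are not swallowed by the abort-and-trace path. A small standalone sketch of that distinction (the messages are illustrative only):

    import scala.util.control.NonFatal

    object NonFatalSketch {
      def attempt(work: () => Unit): Unit =
        try {
          work()
        } catch {
          // Matches ordinary exceptions (including NotSerializableException),
          // but deliberately lets OutOfMemoryError, InterruptedException, etc. propagate.
          case NonFatal(e) => println(s"aborting task set: $e")
        }

      def main(args: Array[String]): Unit = {
        attempt(() => throw new java.io.NotSerializableException("MyClosure"))  // caught
        // attempt(() => throw new OutOfMemoryError("boom"))                    // would NOT be caught
      }
    }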
core/src/main/scala/org/apache/spark/util/ObjectWalker.scala

Lines changed: 19 additions & 13 deletions
@@ -18,8 +18,6 @@ package org.apache.spark.util
 
 import java.lang.reflect.{Modifier, Field}
 
-import com.google.common.collect.Queues
-
 import scala.collection.mutable
 
 
@@ -30,7 +28,7 @@ import scala.collection.mutable
  * This code is based on code written by Josh Rosen found here:
  * https://gist.github.com/JoshRosen/d6a8972c99992e97d040
  */
-object ObjectWalker {
+private[spark] object ObjectWalker {
   def isTransient(field: Field): Boolean = Modifier.isTransient(field.getModifiers)
   def isStatic(field: Field): Boolean = Modifier.isStatic(field.getModifiers)
   def isPrimitive(field: Field): Boolean = field.getType.isPrimitive
@@ -50,13 +48,13 @@ object ObjectWalker
    */
   def buildRefGraph(rootObj: AnyRef): mutable.LinkedList[AnyRef] = {
     val visitedRefs = mutable.Set[AnyRef]()
-    val toVisit = Queues.newArrayDeque[AnyRef]()
+    val toVisit = new mutable.Queue[AnyRef]()
     var results = mutable.LinkedList[AnyRef]()
 
-    toVisit.add(rootObj)
+    toVisit += rootObj
 
-    while (!toVisit.isEmpty) {
-      val obj : AnyRef = toVisit.pollFirst()
+    while (toVisit.nonEmpty) {
+      val obj : AnyRef = toVisit.dequeue()
       // Store the last parent reference to enable quick retrieval of the path to a broken node
 
       if (!visitedRefs.contains(obj)) {
@@ -66,26 +64,34 @@ object ObjectWalker
         // Extract all the fields from the object that would be serialized. Transient and
         // static references are not serialized and primitive variables will always be serializable
         // and will not contain further references.
-
-        for (field <- getAllFields(obj.getClass)
-          .filterNot(isStatic)
-          .filterNot(isTransient)
-          .filterNot(isPrimitive)) {
+        for (field <- getFieldsToTest(obj)) {
           // Extract the field object and pass to the visitor
           val originalAccessibility = field.isAccessible
           field.setAccessible(true)
           val fieldObj = field.get(obj)
           field.setAccessible(originalAccessibility)
 
           if (fieldObj != null) {
-            toVisit.add(fieldObj)
+            toVisit += fieldObj
           }
         }
       }
     }
     results
   }
 
+  /**
+   * Get the serializable fields from an object reference.
+   * @param obj - Reference to the object for which to generate a serialization trace
+   * @return a new Set containing the serializable fields of the object
+   */
+  def getFieldsToTest(obj: AnyRef): mutable.Set[Field] = {
+    getAllFields(obj.getClass)
+      .filterNot(isStatic)
+      .filterNot(isTransient)
+      .filterNot(isPrimitive)
+  }
+
   /**
    * Get all fields (including private ones) from this class and its superclasses.
    * @param cls - The class from which to retrieve fields

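The mechanical part of this change is a one-for-one replacement of the Guava deque calls with scala.collection.mutable.Queue operations. A minimal sketch of the mapping, using a throwaway string queue (the old Guava calls are shown in the comments):

    import scala.collection.mutable

    object QueueSwapSketch {
      def main(args: Array[String]): Unit = {
        val toVisit = new mutable.Queue[AnyRef]()  // was: Queues.newArrayDeque[AnyRef]()
        toVisit += "root"                          // was: toVisit.add(rootObj)
        while (toVisit.nonEmpty) {                 // was: !toVisit.isEmpty
          val obj = toVisit.dequeue()              // was: toVisit.pollFirst()
          println(obj)                             // FIFO order preserved
        }
      }
    }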
core/src/main/scala/org/apache/spark/util/RDDWalker.scala

Lines changed: 5 additions & 7 deletions
@@ -17,8 +17,6 @@
 
 package org.apache.spark.util
 
-import com.google.common.collect.Queues
-
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.language.existentials
@@ -30,7 +28,7 @@ import org.apache.spark.rdd.RDD
  * accomplished by walking the object graph linking these RDDs. This is useful for debugging
  * internal RDD references. See SPARK-3694.
  */
-object RDDWalker {
+private[spark] object RDDWalker {
   /**
    * Traverse the dependencies of the RDD and store them within an Array along with their depths.
    * Return this data structure and subsequently process it.
@@ -40,20 +38,20 @@ object RDDWalker
    */
   def walk(rddToWalk : RDD[_]): Array[(RDD[_], Int)] = {
 
-    val walkQueue = Queues.newArrayDeque[(RDD[_], Int)]()
+    val walkQueue = new mutable.Queue[(RDD[_], Int)]()
     val visited = mutable.Set[RDD[_]]()
 
     // Keep track of both the RDD and its depth in the traversal graph.
     val results = new ArrayBuffer[(RDD[_], Int)]()
     // Implement as a queue to perform a BFS
-    walkQueue.addFirst(rddToWalk,0)
+    walkQueue += ((rddToWalk,0))
 
     while (!walkQueue.isEmpty) {
       // Pop from the queue
-      val (rddToProcess : RDD[_], depth:Int) = walkQueue.pollFirst()
+      val (rddToProcess : RDD[_], depth:Int) = walkQueue.dequeue()
      if (!visited.contains(rddToProcess)) {
        visited.add(rddToProcess)
-        rddToProcess.dependencies.foreach(s => walkQueue.addFirst(s.rdd, depth + 1))
+        rddToProcess.dependencies.foreach(s => walkQueue += ((s.rdd, depth + 1)))
        results.append((rddToProcess, depth))
      }
    }

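One behavioral nuance worth noting: the old code enqueued with addFirst and removed with pollFirst, which is LIFO (depth-first in effect), whereas += followed by dequeue is FIFO, so the traversal now matches the "perform a BFS" comment. Below is a self-contained sketch of the same (node, depth) walk over an ordinary map standing in for rdd.dependencies; all names are illustrative, not Spark's API.

    import scala.collection.mutable
    import scala.collection.mutable.ArrayBuffer

    object DepthWalkSketch {
      // FIFO walk tracking (node, depth); `children` stands in for rdd.dependencies.
      def walk[A](root: A)(children: A => Seq[A]): Array[(A, Int)] = {
        val queue   = new mutable.Queue[(A, Int)]()
        val visited = mutable.Set[A]()
        val results = new ArrayBuffer[(A, Int)]()
        queue += ((root, 0))
        while (queue.nonEmpty) {
          val (node, depth) = queue.dequeue()
          if (!visited.contains(node)) {
            visited.add(node)
            children(node).foreach(c => queue += ((c, depth + 1)))
            results.append((node, depth))
          }
        }
        results.toArray
      }

      def main(args: Array[String]): Unit = {
        val deps = Map("a" -> Seq("b", "c"), "b" -> Seq("d"), "c" -> Seq(), "d" -> Seq())
        println(walk("a")(deps).mkString(", "))  // (a,0), (b,1), (c,1), (d,2)
      }
    }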
core/src/main/scala/org/apache/spark/util/SerializationHelper.scala

Lines changed: 4 additions & 4 deletions
@@ -31,20 +31,20 @@ import org.apache.spark.serializer.SerializerInstance
 /**
  * This enumeration defines variables used to standardize debugging output
  */
-object SerializationState extends Enumeration {
+private[spark] object SerializationState extends Enumeration {
   type SerializationState = String
   val Failed = "Failed to serialize parent."
   val FailedDeps = "Failed to serialize dependencies."
   val Success = "Success"
 }
 
-case class RDDTrace (rdd : RDD[_], depth : Int, result : SerializationHelper.SerializedRef)
+private[spark] case class RDDTrace (rdd : RDD[_], depth : Int, result : SerializationHelper.SerializedRef)
 
 /**
  * This class is designed to encapsulate some utilities to facilitate debugging serialization
  * problems in the DAGScheduler and the TaskSetManager. See SPARK-3694.
  */
-object SerializationHelper {
+private[spark] object SerializationHelper {
   type PathToRef = mutable.LinkedList[AnyRef]
   type BrokenRef = (AnyRef, PathToRef)
   type SerializedRef = Either[String, ByteBuffer]
@@ -249,7 +249,7 @@ object SerializationHelper
 
   def refString(ref: AnyRef): String = {
     val refCode = System.identityHashCode(ref)
-    "Ref (" + ref.toString + ", Hash: " + refCode + ")"
+    "Ref (" + ref.toString + ")"
   }
 
   /**

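For context on the types touched here: SerializedRef is an alias for Either[String, ByteBuffer], so callers report a failure trace via Left and the serialized bytes via Right, and consumers typically fold over the result, as in the DAGScheduler and TaskSetManager hunks above. A small standalone sketch of that consumption (the alias is restated locally and the trace text is hypothetical):

    import java.nio.ByteBuffer

    object SerializedRefSketch {
      type SerializedRef = Either[String, ByteBuffer]

      // Turn a result into a one-line summary, mirroring the fold(l => ..., r => ...) calls above.
      def describe(ref: SerializedRef): String = ref.fold(
        trace => s"Failed to serialize parent. Trace:\n$trace",
        bytes => s"Success (${bytes.remaining()} bytes)"
      )

      def main(args: Array[String]): Unit = {
        println(describe(Left("Ref (MyClosure) -> Ref (SparkContext)")))  // hypothetical trace
        println(describe(Right(ByteBuffer.allocate(16))))
      }
    }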