
Commit ae9da88

Removed unnecessary TimeStampedHashMap from DAGScheduler, added try-catches in finalize() methods, and replaced ArrayBlockingQueue with LinkedBlockingQueue to avoid blocking in Java's finalizer thread.
1 parent cb0a5a6 commit ae9da88
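The queue swap matters because every cleanup task is enqueued from finalize(): RDD.finalize() and ShuffleDependency.finalize() hand work to the ContextCleaner, and those calls run on the JVM's finalizer thread. A bounded ArrayBlockingQueue blocks the caller in put() once it fills up, while an unbounded LinkedBlockingQueue accepts elements immediately. The standalone Scala sketch below only illustrates that difference (the object name and string payloads are made up, not part of the commit):

import java.util.concurrent.{ArrayBlockingQueue, LinkedBlockingQueue}

object QueueBlockingSketch {
  def main(args: Array[String]): Unit = {
    // Bounded queue: put() blocks once capacity is reached, which is dangerous
    // when the caller is the JVM's finalizer thread.
    val bounded = new ArrayBlockingQueue[String](1)
    bounded.put("first")
    // bounded.put("second")         // would block here until the queue is drained
    println(bounded.offer("second")) // offer() refuses instead of blocking -> prints false

    // Unbounded queue (capacity Integer.MAX_VALUE): put() effectively never blocks,
    // so enqueueing cleanup work from a finalizer cannot stall finalization.
    val unbounded = new LinkedBlockingQueue[String]
    unbounded.put("first")
    unbounded.put("second")          // returns immediately
    println(unbounded.size())        // prints 2
  }
}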

5 files changed: 42 additions & 33 deletions


core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 2 additions & 3 deletions

@@ -19,7 +19,7 @@ package org.apache.spark

 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}

-import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

 import org.apache.spark.rdd.RDD

@@ -40,8 +40,7 @@ private[spark] class ContextCleaner(env: SparkEnv) extends Logging {
   private case class CleanShuffle(id: Int) extends CleaningTask
   // TODO: add CleanBroadcast

-  private val QUEUE_CAPACITY = 1000
-  private val queue = new ArrayBlockingQueue[CleaningTask](QUEUE_CAPACITY)
+  private val queue = new LinkedBlockingQueue[CleaningTask]

   protected val listeners = new ArrayBuffer[CleanerListener]
     with SynchronizedBuffer[CleanerListener]
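The hunk above only shows the queue declaration; the consumer side of ContextCleaner is unchanged and not part of this diff. As a rough, hypothetical sketch of the producer/consumer shape an unbounded queue enables (everything beyond the CleanShuffle name and the queue type is invented for illustration):

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object CleanerQueueSketch {
  case class CleanShuffle(id: Int)

  private val queue = new LinkedBlockingQueue[CleanShuffle]
  @volatile private var stopped = false

  // Producer side, safe to call from a finalizer: put() on an unbounded queue returns at once.
  def cleanShuffle(id: Int): Unit = queue.put(CleanShuffle(id))

  // Consumer side: a daemon thread polls with a timeout so it can also notice the stop flag.
  private val cleanerThread = new Thread("cleaner-sketch") {
    override def run(): Unit =
      while (!stopped) {
        val task = queue.poll(100, TimeUnit.MILLISECONDS)
        if (task != null) println(s"cleaning shuffle ${task.id}")
      }
  }
  cleanerThread.setDaemon(true)

  def start(): Unit = cleanerThread.start()
  def stop(): Unit = stopped = true
}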

core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 16 additions & 3 deletions

@@ -49,13 +49,26 @@ class ShuffleDependency[K, V](
     @transient rdd: RDD[_ <: Product2[K, V]],
     val partitioner: Partitioner,
     val serializerClass: String = null)
-  extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
+  extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) with Logging {

   val shuffleId: Int = rdd.context.newShuffleId()

   override def finalize() {
-    if (rdd != null) {
-      rdd.sparkContext.cleaner.cleanShuffle(shuffleId)
+    try {
+      if (rdd != null) {
+        rdd.sparkContext.cleaner.cleanShuffle(shuffleId)
+      }
+    } catch {
+      case t: Throwable =>
+        // Paranoia - If logError throws error as well, report to stderr.
+        try {
+          logError("Error in finalize", t)
+        } catch {
+          case _ =>
+            System.err.println("Error in finalize (and could not write to logError): " + t)
+        }
+    } finally {
+      super.finalize()
     }
   }
 }
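A note on the pattern this hunk introduces: the JVM silently ignores any exception thrown from finalize(), so without the try/catch a failure in cleanShuffle() would vanish unreported and super.finalize() would never run. A minimal, Spark-independent sketch of the same shape (ManagedResource and release() are placeholders, not commit code):

class ManagedResource {
  private def release(): Unit = {
    // cleanup that may itself throw
  }

  override def finalize(): Unit = {
    try {
      release()
    } catch {
      case t: Throwable =>
        // Report somewhere; an exception escaping finalize() is dropped by the JVM.
        try System.err.println("Error in finalize: " + t)
        catch { case _: Throwable => () }
    } finally {
      super.finalize() // always give the superclass a chance to clean up
    }
  }
}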

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 15 additions & 1 deletion

@@ -1026,6 +1026,7 @@ abstract class RDD[T: ClassTag](
   }

   def cleanup() {
+    logInfo("Cleanup called on RDD " + id)
     sc.cleaner.cleanRDD(this)
     dependencies.filter(_.isInstanceOf[ShuffleDependency[_, _]])
       .map(_.asInstanceOf[ShuffleDependency[_, _]].shuffleId)

@@ -1112,6 +1113,19 @@ abstract class RDD[T: ClassTag](
   }

   override def finalize() {
-    cleanup()
+    try {
+      cleanup()
+    } catch {
+      case t: Throwable =>
+        // Paranoia - If logError throws error as well, report to stderr.
+        try {
+          logError("Error in finalize", t)
+        } catch {
+          case _ =>
+            System.err.println("Error in finalize (and could not write to logError): " + t)
+        }
+    } finally {
+      super.finalize()
+    }
   }
 }

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 6 additions & 25 deletions

@@ -123,17 +123,17 @@ class DAGScheduler(

   private val nextStageId = new AtomicInteger(0)

-  private[scheduler] val jobIdToStageIds = new TimeStampedHashMap[Int, HashSet[Int]]
+  private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]

-  private[scheduler] val stageIdToJobIds = new TimeStampedHashMap[Int, HashSet[Int]]
+  private[scheduler] val stageIdToJobIds = new HashMap[Int, HashSet[Int]]

-  private[scheduler] val stageIdToStage = new TimeStampedHashMap[Int, Stage]
+  private[scheduler] val stageIdToStage = new HashMap[Int, Stage]

-  private[scheduler] val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
+  private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]

-  private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
+  private[spark] val stageToInfos = new HashMap[Stage, StageInfo]

-  // An async scheduler event bus. The bus should be stopped when DAGSCheduler is stopped.
+  // An async scheduler event bus. The bus should be stopped when DAGScheduler is stopped.
   private[spark] val listenerBus = new SparkListenerBus

   // Contains the locations that each RDD's partitions are cached on

@@ -159,9 +159,6 @@ class DAGScheduler(
   val activeJobs = new HashSet[ActiveJob]
   val resultStageToJob = new HashMap[Stage, ActiveJob]

-  val metadataCleaner = new MetadataCleaner(
-    MetadataCleanerType.DAG_SCHEDULER, this.cleanup, env.conf)
-
   /**
    * Starts the event processing actor. The actor has two responsibilities:
    *

@@ -1094,26 +1091,10 @@ class DAGScheduler(
       Nil
   }

-  private def cleanup(cleanupTime: Long) {
-    Map(
-      "stageIdToStage" -> stageIdToStage,
-      "shuffleToMapStage" -> shuffleToMapStage,
-      "pendingTasks" -> pendingTasks,
-      "stageToInfos" -> stageToInfos,
-      "jobIdToStageIds" -> jobIdToStageIds,
-      "stageIdToJobIds" -> stageIdToJobIds).
-      foreach { case(s, t) => {
-        val sizeBefore = t.size
-        t.clearOldValues(cleanupTime)
-        logInfo("%s %d --> %d".format(s, sizeBefore, t.size))
-      }}
-  }
-
   def stop() {
     if (eventProcessActor != null) {
       eventProcessActor ! StopDAGScheduler
     }
-    metadataCleaner.cancel()
     taskSched.stop()
     listenerBus.stop()
   }
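For readers unfamiliar with the removed utility: TimeStampedHashMap records an insertion time per key, and clearOldValues(t) evicts everything inserted before t, which is exactly what the deleted cleanup() did on the MetadataCleaner timer. With plain HashMaps, stale scheduler bookkeeping now has to be removed explicitly (presumably through the ContextCleaner / finalize() path the rest of this commit adds). A rough, Spark-independent sketch of the eviction idea that was removed (TtlMap is an invented name, not Spark's class):

import scala.collection.mutable

class TtlMap[K, V] {
  private val entries = mutable.HashMap[K, (V, Long)]()

  def put(k: K, v: V): Unit = entries(k) = (v, System.currentTimeMillis())
  def get(k: K): Option[V] = entries.get(k).map(_._1)
  def size: Int = entries.size

  // Drop every entry inserted before cutoffTime, analogous to clearOldValues(cleanupTime).
  def clearOldValues(cutoffTime: Long): Unit =
    entries.retain { case (_, (_, insertedAt)) => insertedAt >= cutoffTime }
}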

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 3 additions & 1 deletion

@@ -341,9 +341,11 @@ abstract class DStream[T: ClassTag] (
    */
   private[streaming] def clearMetadata(time: Time) {
     val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
+    logDebug("Clearing references to old RDDs: [" +
+      oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
     generatedRDDs --= oldRDDs.keys
     if (ssc.conf.getBoolean("spark.streaming.unpersist", false)) {
-      logDebug("Unpersisting old RDDs: " + oldRDDs.keys.mkString(", "))
+      logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", "))
       oldRDDs.values.foreach(_.unpersist(false))
     }
     logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
