Skip to content

Commit 2acb177

Browse files
committed
Move getNarrowAncestors to RDD.scala
1 parent bfe83f0 commit 2acb177

File tree

2 files changed

+16
-25
lines changed

2 files changed

+16
-25
lines changed

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,21 @@ abstract class RDD[T: ClassTag](
235235
}
236236
}
237237

238+
/**
239+
* Return the ancestors of the given RDD that are related to it only through a sequence of
240+
* narrow dependencies. This traverses the given RDD's dependency tree using DFS.
241+
*/
242+
private[spark] def getNarrowAncestors(
243+
ancestors: ArrayBuffer[RDD[_]] = ArrayBuffer.empty): Seq[RDD[_]] = {
244+
val narrowDependencies = dependencies.collect { case d: NarrowDependency[_] => d }
245+
val narrowParents = narrowDependencies.map(_.rdd)
246+
narrowParents.foreach { parent =>
247+
ancestors += parent
248+
parent.getNarrowAncestors(ancestors)
249+
}
250+
ancestors
251+
}
252+
238253
/**
239254
* Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
240255
*/

core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,7 @@
1717

1818
package org.apache.spark.scheduler
1919

20-
import scala.collection.mutable.ArrayBuffer
21-
22-
import org.apache.spark.NarrowDependency
2320
import org.apache.spark.annotation.DeveloperApi
24-
import org.apache.spark.rdd.RDD
2521
import org.apache.spark.storage.RDDInfo
2622

2723
/**
@@ -54,28 +50,8 @@ private[spark] object StageInfo {
5450
* sequence of narrow dependencies should also be associated with this Stage.
5551
*/
5652
def fromStage(stage: Stage): StageInfo = {
57-
val ancestorRddInfos = getNarrowAncestors(stage.rdd).map(RDDInfo.fromRdd)
53+
val ancestorRddInfos = stage.rdd.getNarrowAncestors().map(RDDInfo.fromRdd)
5854
val rddInfos = ancestorRddInfos ++ Seq(RDDInfo.fromRdd(stage.rdd))
5955
new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos)
6056
}
61-
62-
/**
63-
* Return the ancestors of the given RDD that are related to it only through a sequence of
64-
* narrow dependencies. This traverses the given RDD's dependency tree using DFS.
65-
*/
66-
private def getNarrowAncestors(
67-
rdd: RDD[_],
68-
ancestors: ArrayBuffer[RDD[_]] = ArrayBuffer.empty): Seq[RDD[_]] = {
69-
val narrowParents = getNarrowDependencies(rdd).map(_.rdd)
70-
narrowParents.foreach { parent =>
71-
ancestors += parent
72-
getNarrowAncestors(parent, ancestors)
73-
}
74-
ancestors
75-
}
76-
77-
/** Return the narrow dependencies of the given RDD. */
78-
private def getNarrowDependencies(rdd: RDD[_]): Seq[NarrowDependency[_]] = {
79-
rdd.dependencies.collect { case d: NarrowDependency[_] => d }
80-
}
8157
}

0 commit comments

Comments
 (0)