50 changes: 25 additions & 25 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger

import scala.annotation.tailrec
import scala.collection.Map
import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
import scala.concurrent.duration._
import scala.util.control.NonFatal

@@ -468,21 +468,21 @@ private[spark] class DAGScheduler(

/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
private def getMissingAncestorShuffleDependencies(
rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
rdd: RDD[_]): ListBuffer[ShuffleDependency[_, _, _]] = {
val ancestors = new ListBuffer[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
val waitingForVisit = new ListBuffer[RDD[_]]
waitingForVisit += rdd
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
val toVisit = waitingForVisit.remove(0)
Member Author: "pop" the first element

if (!visited(toVisit)) {
visited += toVisit
getShuffleDependencies(toVisit).foreach { shuffleDep =>
if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
ancestors.push(shuffleDep)
waitingForVisit.push(shuffleDep.rdd)
ancestors.prepend(shuffleDep)
waitingForVisit.prepend(shuffleDep.rdd)
} // Otherwise, the dependency and its ancestors have already been registered.
}
}
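
For readers unfamiliar with the idiom: `ArrayStack` no longer exists in Scala 2.13, so this change emulates it with a `ListBuffer`, where `prepend` plays the role of `push` and `remove(0)` the role of `pop`. A minimal, self-contained sketch of the idiom (object name and values are illustrative, not from the patch):

```scala
import scala.collection.mutable.ListBuffer

object ListBufferAsStack {
  def main(args: Array[String]): Unit = {
    val waitingForVisit = new ListBuffer[Int]
    // push: add to the front of the buffer
    waitingForVisit.prepend(1)
    waitingForVisit.prepend(2)
    waitingForVisit.prepend(3)
    // pop: remove from the front of the buffer
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.remove(0)
      println(toVisit) // prints 3, 2, 1 (same LIFO order as ArrayStack)
    }
  }
}
```

Both `prepend` and `remove(0)` are constant-time on a `ListBuffer`, so the depth-first traversal behaviour of the old stack-based code should be unchanged.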
@@ -506,17 +506,17 @@ private[spark] class DAGScheduler(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
val waitingForVisit = new ListBuffer[RDD[_]]
waitingForVisit += rdd
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
val toVisit = waitingForVisit.remove(0)
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
waitingForVisit.push(dependency.rdd)
waitingForVisit.prepend(dependency.rdd)
}
}
}
@@ -529,10 +529,10 @@
*/
private def traverseParentRDDsWithinStage(rdd: RDD[_], predicate: RDD[_] => Boolean): Boolean = {
val visited = new HashSet[RDD[_]]
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
val waitingForVisit = new ListBuffer[RDD[_]]
waitingForVisit += rdd
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
val toVisit = waitingForVisit.remove(0)
if (!visited(toVisit)) {
if (!predicate(toVisit)) {
return false
@@ -542,7 +542,7 @@
case _: ShuffleDependency[_, _, _] =>
// Not within the same stage with current rdd, do nothing.
case dependency =>
waitingForVisit.push(dependency.rdd)
waitingForVisit.prepend(dependency.rdd)
}
}
}
@@ -554,7 +554,8 @@
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new ArrayStack[RDD[_]]
val waitingForVisit = new ListBuffer[RDD[_]]
waitingForVisit += stage.rdd
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd
@@ -568,15 +569,14 @@
missing += mapStage
}
case narrowDep: NarrowDependency[_] =>
waitingForVisit.push(narrowDep.rdd)
waitingForVisit.prepend(narrowDep.rdd)
}
}
}
}
}
waitingForVisit.push(stage.rdd)
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
visit(waitingForVisit.remove(0))
}
missing.toList
}
@@ -2000,7 +2000,8 @@ private[spark] class DAGScheduler(
val visitedRdds = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new ArrayStack[RDD[_]]
val waitingForVisit = new ListBuffer[RDD[_]]
waitingForVisit += stage.rdd
def visit(rdd: RDD[_]) {
if (!visitedRdds(rdd)) {
visitedRdds += rdd
@@ -2009,17 +2010,16 @@
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
waitingForVisit.push(mapStage.rdd)
waitingForVisit.prepend(mapStage.rdd)
} // Otherwise there's no need to follow the dependency back
case narrowDep: NarrowDependency[_] =>
waitingForVisit.push(narrowDep.rdd)
waitingForVisit.prepend(narrowDep.rdd)
}
}
}
}
waitingForVisit.push(stage.rdd)
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
visit(waitingForVisit.remove(0))
}
visitedRdds.contains(target.rdd)
}
@@ -333,7 +333,7 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {

// ArrayBuffer iterator (indexable type)
d = medianKSD(
gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).to[ArrayBuffer].iterator)),
gaps(sampler.sample(ArrayBuffer(Iterator.from(0).take(20*sampleSize).toSeq: _*).iterator)),
gaps(sample(Iterator.from(0), 0.1)))
d should be < D

@@ -557,7 +557,7 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {

// ArrayBuffer iterator (indexable type)
d = medianKSD(
gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).to[ArrayBuffer].iterator)),
gaps(sampler.sample(ArrayBuffer(Iterator.from(0).take(20*sampleSize).toSeq: _*).iterator)),
gaps(sampleWR(Iterator.from(0), 0.1)))
d should be < D
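
The `.to[ArrayBuffer]` conversion used in the old test lines is 2.12-only syntax; materializing the iterator into a `Seq` and splatting it into the `ArrayBuffer` constructor compiles on both 2.12 and 2.13. A small standalone sketch of the pattern (the values are illustrative):

```scala
import scala.collection.mutable.ArrayBuffer

object IteratorToArrayBuffer {
  def main(args: Array[String]): Unit = {
    val it = Iterator.from(0).take(5)
    // the 2.12-style `.to[ArrayBuffer]` does not compile on 2.13;
    // building from a materialized Seq works on both versions
    val buf = ArrayBuffer(it.toSeq: _*)
    println(buf) // ArrayBuffer(0, 1, 2, 3, 4)
  }
}
```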

@@ -17,6 +17,7 @@

package org.apache.spark.graphx.lib

import scala.collection.{mutable, Map}
import scala.reflect.ClassTag

import org.apache.spark.graphx._
@@ -51,11 +52,14 @@ object LabelPropagation {
}
def mergeMessage(count1: Map[VertexId, Long], count2: Map[VertexId, Long])
: Map[VertexId, Long] = {
(count1.keySet ++ count2.keySet).map { i =>
// Mimics the optimization of breakOut, not present in Scala 2.13, while working in 2.12
val map = mutable.Map[VertexId, Long]()
(count1.keySet ++ count2.keySet).foreach { i =>
val count1Val = count1.getOrElse(i, 0L)
val count2Val = count2.getOrElse(i, 0L)
i -> (count1Val + count2Val)
}(collection.breakOut)
map.put(i, count1Val + count2Val)
}
map
}
def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]): VertexId = {
if (message.isEmpty) attr else message.maxBy(_._2)._1
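
`collection.breakOut` let the old one-liner build a `Map` directly while traversing the merged key set, avoiding an intermediate collection; it no longer exists in Scala 2.13, so the replacement accumulates into a `mutable.Map` explicitly. A self-contained sketch of the same merge, with `VertexId` aliased locally for illustration:

```scala
import scala.collection.{mutable, Map}

object MergeCounts {
  type VertexId = Long // stand-in for org.apache.spark.graphx.VertexId

  def mergeMessage(count1: Map[VertexId, Long],
                   count2: Map[VertexId, Long]): Map[VertexId, Long] = {
    // Accumulate into a mutable.Map so no intermediate Set of pairs is built,
    // mimicking what breakOut achieved on 2.12.
    val map = mutable.Map[VertexId, Long]()
    (count1.keySet ++ count2.keySet).foreach { i =>
      map.put(i, count1.getOrElse(i, 0L) + count2.getOrElse(i, 0L))
    }
    map
  }

  def main(args: Array[String]): Unit = {
    // keys present in both maps have their counts summed: 2L -> 4L here
    println(mergeMessage(Map(1L -> 2L, 2L -> 1L), Map(2L -> 3L)))
  }
}
```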
@@ -17,6 +17,7 @@

package org.apache.spark.graphx.lib

import scala.collection.{mutable, Map}
import scala.reflect.ClassTag

import org.apache.spark.graphx._
@@ -34,9 +35,12 @@ object ShortestPaths extends Serializable {
private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }

private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
(spmap1.keySet ++ spmap2.keySet).map {
k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
}(collection.breakOut)
// Mimics the optimization of breakOut, not present in Scala 2.13, while working in 2.12
val map = mutable.Map[VertexId, Int]()
(spmap1.keySet ++ spmap2.keySet).foreach { k =>
map.put(k, math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue)))
}
map
}

/**
@@ -24,7 +24,7 @@ import org.apache.spark.graphx.util.GraphGenerators

object GridPageRank {
def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double): Seq[(VertexId, Double)] = {
val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
val inNbrs = Array.fill(nRows * nCols)(collection.mutable.ArrayBuffer.empty[Int])
val outDegree = Array.fill(nRows * nCols)(0)
// Convert row column address into vertex ids (row major order)
def sub2ind(r: Int, c: Int): Int = r * nCols + c
@@ -194,14 +194,16 @@ private[spark] object RandomForest extends Logging with Serializable {
training the same tree in the next iteration. This focus allows us to send fewer trees to
workers on each iteration; see topNodesForGroup below.
*/
val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]

val rng = new Random()
rng.setSeed(seed)

// Allocate and queue root nodes.
val topNodes = Array.fill[LearningNode](numTrees)(LearningNode.emptyNode(nodeIndex = 1))
Range(0, numTrees).foreach(treeIndex => nodeStack.push((treeIndex, topNodes(treeIndex))))
for (treeIndex <- 0 until numTrees) {
nodeStack.prepend((treeIndex, topNodes(treeIndex)))
}

timer.stop("init")

@@ -398,7 +400,7 @@
nodesForGroup: Map[Int, Array[LearningNode]],
treeToNodeToIndexInfo: Map[Int, Map[Int, NodeIndexInfo]],
splits: Array[Array[Split]],
nodeStack: mutable.ArrayStack[(Int, LearningNode)],
nodeStack: mutable.ListBuffer[(Int, LearningNode)],
timer: TimeTracker = new TimeTracker,
nodeIdCache: Option[NodeIdCache] = None): Unit = {

@@ -639,10 +641,10 @@

// enqueue left child and right child if they are not leaves
if (!leftChildIsLeaf) {
nodeStack.push((treeIndex, node.leftChild.get))
nodeStack.prepend((treeIndex, node.leftChild.get))
}
if (!rightChildIsLeaf) {
nodeStack.push((treeIndex, node.rightChild.get))
nodeStack.prepend((treeIndex, node.rightChild.get))
}

logDebug("leftChildIndex = " + node.leftChild.get.id +
@@ -1042,8 +1044,8 @@
var partNumSamples = 0.0
var unweightedNumSamples = 0.0
featureSamples.foreach { case (sampleWeight, feature) =>
partValueCountMap(feature) = partValueCountMap.getOrElse(feature, 0.0) + sampleWeight;
partNumSamples += sampleWeight;
partValueCountMap(feature) = partValueCountMap.getOrElse(feature, 0.0) + sampleWeight
partNumSamples += sampleWeight
unweightedNumSamples += 1.0
}

@@ -1131,7 +1133,7 @@
* The feature indices are None if not subsampling features.
*/
private[tree] def selectNodesToSplit(
nodeStack: mutable.ArrayStack[(Int, LearningNode)],
nodeStack: mutable.ListBuffer[(Int, LearningNode)],
maxMemoryUsage: Long,
metadata: DecisionTreeMetadata,
rng: Random): (Map[Int, Array[LearningNode]], Map[Int, Map[Int, NodeIndexInfo]]) = {
@@ -1146,7 +1148,7 @@
// so we allow one iteration if memUsage == 0.
var groupDone = false
while (nodeStack.nonEmpty && !groupDone) {
val (treeIndex, node) = nodeStack.top
val (treeIndex, node) = nodeStack.head
// Choose subset of features for node (if subsampling).
val featureSubset: Option[Array[Int]] = if (metadata.subsamplingFeatures) {
Some(SamplingUtils.reservoirSampleAndCount(Range(0,
@@ -1157,7 +1159,7 @@
// Check if enough memory remains to add this node to the group.
val nodeMemUsage = RandomForest.aggregateSizeForNode(metadata, featureSubset) * 8L
if (memUsage + nodeMemUsage <= maxMemoryUsage || memUsage == 0) {
nodeStack.pop()
nodeStack.remove(0)
mutableNodesForGroup.getOrElseUpdate(treeIndex, new mutable.ArrayBuffer[LearningNode]()) +=
node
mutableTreeToNodeToIndexInfo
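
Same idiom on the node queue in `selectNodesToSplit`: `head` stands in for the old `ArrayStack.top` (peek without removing) and `remove(0)` for `pop`, so a node is only dequeued once it is known to fit in the memory budget. A tiny sketch of that peek-then-pop pattern, with made-up sizes and budget:

```scala
import scala.collection.mutable.ListBuffer

object PeekThenPop {
  def main(args: Array[String]): Unit = {
    val nodeStack = new ListBuffer[Int]
    Seq(3, 2, 1).foreach(size => nodeStack.prepend(size)) // buffer is now 1, 2, 3
    val maxMemoryUsage = 4
    var memUsage = 0
    // Peek with head; only remove(0) ("pop") once the node fits in the budget.
    while (nodeStack.nonEmpty && memUsage + nodeStack.head <= maxMemoryUsage) {
      memUsage += nodeStack.remove(0)
    }
    println(s"used=$memUsage, leftOver=$nodeStack") // used=3, leftOver=ListBuffer(3)
  }
}
```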
@@ -40,8 +40,6 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {

import RandomForestSuite.mapToVec

private val seed = 42

/////////////////////////////////////////////////////////////////////////////
// Tests for split calculation
/////////////////////////////////////////////////////////////////////////////
@@ -350,7 +348,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
val treeToNodeToIndexInfo = Map(0 -> Map(
topNode.id -> new RandomForest.NodeIndexInfo(0, None)
))
val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode),
nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack)

@@ -392,7 +390,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
val treeToNodeToIndexInfo = Map(0 -> Map(
topNode.id -> new RandomForest.NodeIndexInfo(0, None)
))
val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode),
nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack)

@@ -505,11 +503,11 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
val failString = s"Failed on test with:" +
s"numTrees=$numTrees, featureSubsetStrategy=$featureSubsetStrategy," +
s" numFeaturesPerNode=$numFeaturesPerNode, seed=$seed"
val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
val topNodes: Array[LearningNode] = new Array[LearningNode](numTrees)
Range(0, numTrees).foreach { treeIndex =>
topNodes(treeIndex) = LearningNode.emptyNode(nodeIndex = 1)
nodeStack.push((treeIndex, topNodes(treeIndex)))
nodeStack.prepend((treeIndex, topNodes(treeIndex)))
}
val rng = new scala.util.Random(seed = seed)
val (nodesForGroup: Map[Int, Array[LearningNode]],
@@ -57,8 +57,8 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext {

test("number of features more than 65535") {
val data1 = sc.parallelize(Array(
Vectors.dense((1 to 100000).map(_ => 2.0).to[scala.Vector].toArray),
Vectors.dense((1 to 100000).map(_ => 0.0).to[scala.Vector].toArray)
Vectors.dense(Array.fill(100000)(2.0)),
Vectors.dense(Array.fill(100000)(0.0))
), 2)

val pca = new PCA(2).fit(data1)
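
The old test data built a constant vector via `(1 to 100000).map(_ => 2.0).to[scala.Vector].toArray`, which again relies on the 2.12-only `.to[...]` syntax; `Array.fill` is shorter and compiles on both versions. A minimal sketch (plain arrays only, without the Spark `Vectors` wrapper):

```scala
object ConstantArray {
  def main(args: Array[String]): Unit = {
    // Array.fill(n)(x) allocates an Array of length n with every slot set to x.
    val twos: Array[Double] = Array.fill(100000)(2.0)
    val zeros: Array[Double] = Array.fill(100000)(0.0)
    println(twos.length + " " + zeros.sum) // 100000 0.0
  }
}
```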
@@ -125,7 +125,7 @@ private[spark] object SparkAppLauncher extends Logging {
appConf.toStringArray :+ appArguments.mainAppResource

if (appArguments.appArgs.nonEmpty) {
commandLine ++= appArguments.appArgs.to[ArrayBuffer]
commandLine ++= appArguments.appArgs
}
logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}")
ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs)
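
The explicit `.to[ArrayBuffer]` on the right-hand side of `++=` was never needed: `++=` on a mutable buffer accepts any traversable collection, so the raw `appArgs` sequence can be appended directly. A small sketch with hypothetical argument values:

```scala
import scala.collection.mutable.ArrayBuffer

object BuildCommandLine {
  def main(args: Array[String]): Unit = {
    val commandLine = ArrayBuffer("spark-submit", "--class", "org.example.Main")
    val appArgs: Seq[String] = Seq("--input", "/tmp/data") // illustrative values
    if (appArgs.nonEmpty) {
      // ++= takes any collection of Strings; no conversion to ArrayBuffer needed
      commandLine ++= appArgs
    }
    println(commandLine.mkString(" "))
  }
}
```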
@@ -41,7 +41,7 @@ class EquivalentExpressions {
}

// For each expression, the set of equivalent expressions.
private val equivalenceMap = mutable.HashMap.empty[Expr, mutable.MutableList[Expression]]
private val equivalenceMap = mutable.HashMap.empty[Expr, mutable.ArrayBuffer[Expression]]

/**
* Adds each expression to this data structure, grouping them with existing equivalent
@@ -56,7 +56,7 @@
f.get += expr
true
} else {
equivalenceMap.put(e, mutable.MutableList(expr))
equivalenceMap.put(e, mutable.ArrayBuffer(expr))
false
}
} else {
@@ -102,7 +102,7 @@
* an empty collection if there are none.
*/
def getEquivalentExprs(e: Expression): Seq[Expression] = {
equivalenceMap.getOrElse(Expr(e), mutable.MutableList())
equivalenceMap.getOrElse(Expr(e), Seq.empty)
}
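
`mutable.MutableList` was dropped in Scala 2.13, and `mutable.ArrayBuffer` gives the same append-at-the-end behaviour on both versions; returning `Seq.empty` as the fallback also avoids allocating an empty mutable collection per lookup. A self-contained sketch of the same grouping structure, keyed by `String` instead of `Expr` for illustration:

```scala
import scala.collection.mutable

object EquivalenceGroups {
  // Groups of equivalent values, keyed by some canonical form.
  private val equivalenceMap = mutable.HashMap.empty[String, mutable.ArrayBuffer[Int]]

  def addExpr(key: String, value: Int): Boolean =
    equivalenceMap.get(key) match {
      case Some(group) => group += value; true // existing group: record another member
      case None => equivalenceMap.put(key, mutable.ArrayBuffer(value)); false
    }

  def getEquivalent(key: String): Seq[Int] =
    equivalenceMap.get(key).map(_.toSeq).getOrElse(Seq.empty)

  def main(args: Array[String]): Unit = {
    addExpr("a", 1); addExpr("a", 2); addExpr("b", 3)
    println(getEquivalent("a")) // the two values grouped under "a"
    println(getEquivalent("c")) // List() for unknown keys
  }
}
```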

/**