From e4ed074d520ca6317a8c183ea902a4d587951d8c Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 23 Jul 2015 18:42:36 -0700 Subject: [PATCH 1/9] Initial work on adaptive execution in DAGScheduler --- .../apache/spark/MapOutputStatistics.scala | 27 +++ .../org/apache/spark/MapOutputTracker.scala | 98 +++++++-- .../scala/org/apache/spark/SparkContext.scala | 20 ++ .../apache/spark/rdd/DynamicJoinedRDD.scala | 186 ++++++++++++++++++ .../org/apache/spark/rdd/ShuffledRDD2.scala | 117 +++++++++++ .../apache/spark/scheduler/ActiveJob.scala | 9 +- .../apache/spark/scheduler/DAGScheduler.scala | 109 +++++++++- .../spark/scheduler/DAGSchedulerEvent.scala | 8 + .../apache/spark/scheduler/MapStatus.scala | 14 +- .../spark/scheduler/ShuffleMapStage.scala | 5 + .../shuffle/hash/HashShuffleReader.scala | 5 +- .../shuffle/hash/HashShuffleReaderSuite.scala | 2 +- 12 files changed, 568 insertions(+), 32 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/MapOutputStatistics.scala create mode 100644 core/src/main/scala/org/apache/spark/rdd/DynamicJoinedRDD.scala create mode 100644 core/src/main/scala/org/apache/spark/rdd/ShuffledRDD2.scala diff --git a/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala b/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala new file mode 100644 index 0000000000000..64ee9ffac02d7 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.spark.annotation.DeveloperApi + +/** + * :: DeveloperApi :: + * Hold statistics about the output sizes in a map stage. + */ +@DeveloperApi +class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long]) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index a387592783850..d570cf348a677 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -18,6 +18,7 @@ package org.apache.spark import java.io._ +import java.util.Arrays import java.util.concurrent.ConcurrentHashMap import java.util.zip.{GZIPInputStream, GZIPOutputStream} @@ -132,13 +133,70 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging * describing the shuffle blocks that are stored at that block manager. 
*/ def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int) - : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = { - logDebug(s"Fetching outputs for shuffle $shuffleId, reduce $reduceId") - val startTime = System.currentTimeMillis + : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = { + getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1) + } + + /** + * Called from executors to get the server URIs and output sizes for each shuffle block that + * needs to be read from a given range of reduce partitions. + * + * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, + * and the second item is a sequence of (shuffle block id, shuffle block size) tuples + * describing the shuffle blocks that are stored at that block manager. + */ + def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) + : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = { + logDebug(s"Fetching outputs for shuffle $shuffleId, reduces $startPartition-$endPartition") + val statuses = getStatuses(shuffleId) + // Synchronize on the returned array because, on the driver, it gets mutated in place + statuses.synchronized { + return MapOutputTracker.convertMapStatuses( + shuffleId, startPartition, endPartition, statuses) + } + } + + /** + * Return statistics about all of the outputs for a given shuffle. + */ + def getStatistics(shuffleId: Int): MapOutputStatistics = { + val statuses = getStatuses(shuffleId) + // Synchronize on the returned array because, on the driver, it gets mutated in place + statuses.synchronized { + val totalSizes = new Array[Long](statuses(0).numPartitions) + for (s <- statuses) { + for (i <- 0 until totalSizes.length) { + totalSizes(i) += s.getSizeForBlock(i) + } + } + new MapOutputStatistics(shuffleId, totalSizes) + } + } + /** + * Return the location of a given map tasks's output, if present. Only callable on driver. + */ + def getMapOutputLocation(shuffleId: Int, mapId: Int): Option[BlockManagerId] = { + if (mapStatuses.contains(shuffleId)) { + val statuses = mapStatuses(shuffleId) + if (statuses.nonEmpty && statuses(mapId) != null) { + return Some(statuses(mapId).location) + } + } + None + } + + /** + * Get or fetch the array of MapStatuses for a given shuffle ID. NOTE: clients MUST synchronize + * on this array when reading it, because on the driver, we may be changing it in place. + * + * (It would be nice to remove this restriction in the future.) 
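[As an illustrative aside, not part of the patch itself: the per-partition byte counts that getStatistics() packs into MapOutputStatistics are the raw input an adaptive planner would inspect. Below is a minimal, hypothetical sketch of turning them into the kind of increasing partitionStartIndices array consumed by CoalescedPartitioner/ShuffledRDD2 later in this patch; the helper name and the 64 MB target size are assumptions, not anything defined here.]

// Hypothetical helper (sketch only): pack consecutive reduce partitions until each coalesced
// partition holds roughly `targetSize` bytes, returning increasing start indices.
def coalescePartitionStartIndices(
    stats: MapOutputStatistics,
    targetSize: Long = 64 * 1024 * 1024): Array[Int] = {
  val starts = scala.collection.mutable.ArrayBuffer(0)
  var bytesInCurrent = 0L
  for (i <- 0 until stats.bytesByPartitionId.length) {
    val size = stats.bytesByPartitionId(i)
    if (bytesInCurrent > 0 && bytesInCurrent + size > targetSize) {
      starts += i          // start a new coalesced partition at reduce partition i
      bytesInCurrent = 0L
    }
    bytesInCurrent += size
  }
  starts.toArray           // always begins with 0, e.g. Array(0, 3, 7, ...)
}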
+ */ + private def getStatuses(shuffleId: Int): Array[MapStatus] = { val statuses = mapStatuses.get(shuffleId).orNull if (statuses == null) { logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them") + val startTime = System.currentTimeMillis var fetchedStatuses: Array[MapStatus] = null fetching.synchronized { // Someone else is fetching it; wait for them to be done @@ -160,7 +218,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging } if (fetchedStatuses == null) { - // We won the race to fetch the output locs; do so + // We won the race to fetch the statuses; do so logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint) // This try-finally prevents hangs due to timeouts: try { @@ -175,22 +233,18 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging } } } - logDebug(s"Fetching map output location for shuffle $shuffleId, reduce $reduceId took " + + logDebug(s"Fetching map output statuses for shuffle $shuffleId took " + s"${System.currentTimeMillis - startTime} ms") if (fetchedStatuses != null) { - fetchedStatuses.synchronized { - return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses) - } + return fetchedStatuses } else { logError("Missing all output locations for shuffle " + shuffleId) throw new MetadataFetchFailedException( - shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId) + shuffleId, 0, "Missing all output locations for shuffle " + shuffleId) } } else { - statuses.synchronized { - return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses) - } + return statuses } } @@ -433,15 +487,16 @@ private[spark] object MapOutputTracker extends Logging { } /** - * Converts an array of MapStatuses for a given reduce ID to a sequence that, for each block - * manager ID, lists the shuffle block ids and corresponding shuffle block sizes stored at that - * block manager. + * Converts an array of MapStatuses for a given partition ID range to a sequence that, for each + * block manager ID, lists the shuffle block ids and corresponding shuffle block sizes stored at + * that block manager. * * If any of the statuses is null (indicating a missing location due to a failed mapper), * throws a FetchFailedException. * * @param shuffleId Identifier for the shuffle - * @param reduceId Identifier for the reduce task + * @param startPartition Start of partition range to fetch + * @param endPartition End of partition range to fetch (exclusive) * @param statuses List of map statuses, indexed by map ID. 
* @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, * and the second item is a sequence of (shuffle block id, shuffle block size) tuples @@ -449,7 +504,8 @@ private[spark] object MapOutputTracker extends Logging { */ private def convertMapStatuses( shuffleId: Int, - reduceId: Int, + startPartition: Int, + endPartition: Int, statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = { assert (statuses != null) val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long)]] @@ -457,10 +513,12 @@ private[spark] object MapOutputTracker extends Logging { if (status == null) { val errorMessage = s"Missing an output location for shuffle $shuffleId" logError(errorMessage) - throw new MetadataFetchFailedException(shuffleId, reduceId, errorMessage) + throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage) } else { - splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) += - ((ShuffleBlockId(shuffleId, mapId, reduceId), status.getSizeForBlock(reduceId))) + for (i <- startPartition until endPartition) { + splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) += + ((ShuffleBlockId(shuffleId, mapId, i), status.getSizeForBlock(i))) + } } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e27b3c4962221..1d545ba17fef0 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1984,6 +1984,26 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli new SimpleFutureAction(waiter, resultFunc) } + /** + * :: DeveloperApi :: + * Submit a map stage for execution. + */ + @DeveloperApi + def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C]): + SimpleFutureAction[MapOutputStatistics] = + { + assertNotStopped() + val callSite = getCallSite() + val waiter = dagScheduler.submitMapStage( + dependency, + callSite, + localProperties.get) + // TODO: It's not nice that we use JobWaiter here, but that's what SimpleFutureAction takes + new SimpleFutureAction(waiter, env.mapOutputTracker.getStatistics(dependency.shuffleId)) + } + + + /** * Cancel active jobs for the specified group. See [[org.apache.spark.SparkContext.setJobGroup]] * for more information. diff --git a/core/src/main/scala/org/apache/spark/rdd/DynamicJoinedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/DynamicJoinedRDD.scala new file mode 100644 index 0000000000000..0676447e6992d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rdd/DynamicJoinedRDD.scala @@ -0,0 +1,186 @@ +package org.apache.spark.rdd + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashMap + +import org.apache.spark._ +import org.apache.spark.annotation.DeveloperApi + +/** + * Strategy for joining each partition in a [[DynamicJoinedRDD]]. 
Possible values are: + * - BRING_TO_1: broadcast the partitions of RDD 2 and bring them to the nodes containing the + * partitions of RDD 1 to do a local broadcast join + * - BRING_TO_2: broadcast the partitions of RDD 1 and bring them to the nodes containing the + * partitions of RDD 2 to do a local broadcast join + * - SHUFFLE: shuffle the partitions of both RDDs to the same reduce task + */ +@DeveloperApi +object PartitionStrategy extends Enumeration { + type PartitionStrategy = Value + + val BRING_TO_1, BRING_TO_2, SHUFFLE = Value +} + +/** + * :: DeveloperApi :: + * An RDD that supports different join strategies by partition. It takes two ShuffleDependencies + * that should have the same partitioner and will produce the input sides of the join, as well as + * arrays describing what to do with each partition (which join strategy to use and, if shuffling + * it, which reduce partition to shuffle data to). The resulting RDD will have one partition for + * each shufflePartitonId used for shuffled partitions, plus one for each partition that data + * is brought to (when we broadcast one RDD's data and bring it to all partitions of the other). + * + * @param dep1 shuffle dependency for first RDD + * @param dep2 shuffle dependency for second RDD (must have same partitioner as dep1) + * @param partitionStrategies [[PartitionStrategy]] to use for each partition; must be same size + * as the number of partitions of the dependencies' partitioners + * @param shufflePartitionIds for partitions where the strategy is set to SHUFFLE, holds the + * destination partition ID to use (which is useful for coalescing data into fewer partitions); + * these IDs must be consecutive numbers from 0 to a maximum value, and the array of values must + * be non-decreasing (e.g. 
[0, 0, 0, 1, 1, 2, 2, 3]) + */ +@DeveloperApi +class DynamicJoinedRDD[K, V, W]( + var dep1: ShuffleDependency[K, V, V], + var dep2: ShuffleDependency[K, W, W], + val partitionStrategies: Array[PartitionStrategy.PartitionStrategy], + val shufflePartitionIds: Array[Int]) + extends RDD[(K, (V, W))](dep1.rdd.context, Nil) { + + override def getDependencies: Seq[Dependency[_]] = List(dep1, dep2) + + override protected def getPartitions: Array[Partition] = { + val parentPartitions = dep1.partitioner.numPartitions + var maxShufflePartitionId = -1 + var numBringTo1 = 0 + var numBringTo2 = 0 + for (i <- 0 until parentPartitions) { + if (partitionStrategies(i) == PartitionStrategy.BRING_TO_1) { + numBringTo1 += 1 + } else if (partitionStrategies(i) == PartitionStrategy.BRING_TO_2) { + numBringTo2 += 1 + } else { + maxShufflePartitionId = math.max(maxShufflePartitionId, shufflePartitionIds(i)) + } + } + + val numShufflePartitions = maxShufflePartitionId + 1 // Works because this starts at -1 + val totalPartitions = numShufflePartitions + (numBringTo1 + numBringTo2) * parentPartitions + + val partitions = new Array[Partition](totalPartitions) + + // Build up the partitions in this order: + // - 0 to maxShufflePartitionId: shuffled partitions + // - next numBringTo1 * parentPartitions partitions: partitions brought to RDD 1 + // - next numBringTo2 * parentPartitions partitions: partitions brought to RDD 2 + var pos = 0 + for (shufflePart <- 0 until numShufflePartitions) { + while (pos < parentPartitions && partitionStrategies(pos) != PartitionStrategy.SHUFFLE) { + pos += 1 + } + val startPos = pos + while (pos < parentPartitions && (shufflePartitionIds(pos) == shufflePart || + partitionStrategies(pos) != PartitionStrategy.SHUFFLE)) { + pos += 1 + } + val endPos = pos + partitions(shufflePart) = new DynamicJoinedRDDPartition( + shufflePart, PartitionStrategy.SHUFFLE, startPos, endPos, startPos, endPos, true) + } + + var outputPos = numShufflePartitions + for (i <- 0 until parentPartitions) { + if (partitionStrategies(i) == PartitionStrategy.BRING_TO_1) { + for (j <- 0 until parentPartitions) { + partitions(outputPos) = new DynamicJoinedRDDPartition( + outputPos, PartitionStrategy.BRING_TO_1, j, j + 1, i, i + 1, false) + outputPos += 1 + } + } else if (partitionStrategies(i) == PartitionStrategy.BRING_TO_2) { + for (j <- 0 until parentPartitions) { + partitions(outputPos) = new DynamicJoinedRDDPartition( + outputPos, PartitionStrategy.BRING_TO_2, i, i + 1, j, j + 1, true) + outputPos += 1 + } + } + } + + assert(outputPos == totalPartitions) + partitions + } + + override def compute(partition: Partition, context: TaskContext): Iterator[(K, (V, W))] = { + val part = partition.asInstanceOf[DynamicJoinedRDDPartition] + val iter1 = SparkEnv.get.shuffleManager.getReader( + dep1.shuffleHandle, part.rdd1StartPartition, part.rdd1EndPartition, context) + .read() + .asInstanceOf[Iterator[(K, V)]] + val iter2 = SparkEnv.get.shuffleManager.getReader( + dep2.shuffleHandle, part.rdd2StartPartition, part.rdd2EndPartition, context) + .read() + .asInstanceOf[Iterator[(K, W)]] + // This is a pretty slow way to join stuff, but it's just to show it + if (part.hash1) { + val map = new HashMap[K, ArrayBuffer[V]] + for (pair <- iter1) { + map.getOrElseUpdate(pair._1, new ArrayBuffer()) += pair._2 + } + iter2.flatMap { pair => + val k = pair._1 + val w = pair._2 + val values = map.getOrElse(k, Nil) + values.map(v => (k, (v, w))) + } + } else { + val map = new HashMap[K, ArrayBuffer[W]] + for (pair <- iter2) { + 
map.getOrElseUpdate(pair._1, new ArrayBuffer()) += pair._2 + } + iter1.flatMap { pair => + val k = pair._1 + val v = pair._2 + val values = map.getOrElse(k, Nil) + values.map(w => (k, (v, w))) + } + } + } + + override def getPreferredLocations(partition: Partition): Seq[String] = { + val part = partition.asInstanceOf[DynamicJoinedRDDPartition] + if (part.strategy == PartitionStrategy.BRING_TO_1) { + // TODO: should support passing loc.executorId as a preferred location too + val tracker = SparkEnv.get.mapOutputTracker + tracker.getMapOutputLocation(dep1.shuffleId, part.rdd1StartPartition).map(_.host).toList + } else if (part.strategy == PartitionStrategy.BRING_TO_2) { + // TODO: should support passing loc.executorId as a preferred location too + val tracker = SparkEnv.get.mapOutputTracker + tracker.getMapOutputLocation(dep2.shuffleId, part.rdd2StartPartition).map(_.host).toList + } else { + Nil + } + } + + override def clearDependencies() { + super.clearDependencies() + dep1 = null + dep2 = null + } +} + +/** + * A partition in [[DynamicJoinedRDD]]; contains its index in the final RDD, the ranges of + * partitions to request from rdd1 and rdd2, and whether to hash rdd1 or rdd2's tuples. + */ +class DynamicJoinedRDDPartition( + val index: Int, + val strategy: PartitionStrategy.PartitionStrategy, + val rdd1StartPartition: Int, + val rdd1EndPartition: Int, + val rdd2StartPartition: Int, + val rdd2EndPartition: Int, + val hash1: Boolean) + extends Partition { + + override def toString = + s"DynamicJoinedPartition($index,$strategy,$rdd1StartPartition,$rdd1EndPartition,$rdd2StartPartition,$rdd2EndPartition,$hash1)" +} diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD2.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD2.scala new file mode 100644 index 0000000000000..a3abb8a51868f --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD2.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import java.util.Arrays + +import org.apache.spark._ +import org.apache.spark.annotation.DeveloperApi + +/** + * A Partitioner that might group together one or more partitions from the parent. + * + * @param parent a parent partitioner + * @param partitionStartIndices indices of partitions in parent that should create new partitions + * in child (this should be an array of increasing partition IDs). For example, if we have a + * parent with 5 partitions, and partitionStartIndices is [0, 2, 4], we get three output + * partitions, corresponding to partition ranges [0, 1], [2, 3] and [4] of the parent partitioner. 
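[As an illustrative aside, not part of the patch itself: a small usage sketch of the example described above, assuming the CoalescedPartitioner defined just below and Spark's standard HashPartitioner.]

val parent = new org.apache.spark.HashPartitioner(5)
val coalesced = new CoalescedPartitioner(parent, Array(0, 2, 4))
// Parent partitions 0-1 collapse into child 0, 2-3 into child 1, and 4 into child 2.
assert(coalesced.numPartitions == 3)
// Any key is routed by the parent partitioner first, then mapped to its coalesced partition:
// coalesced.getPartition(key) == Array(0, 0, 1, 1, 2)(parent.getPartition(key))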
+ */ +@DeveloperApi +class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: Array[Int]) + extends Partitioner { + + // TODO: validate partitionStartIndices + + @transient private lazy val parentPartitionMapping: Array[Int] = { + val n = parent.numPartitions + val result = new Array[Int](n) + for (i <- 0 until partitionStartIndices.length) { + val start = partitionStartIndices(i) + val end = if (i < partitionStartIndices.length - 1) partitionStartIndices(i + 1) else n + for (j <- start until end) { + result(j) = i + } + } + result + } + + override def numPartitions: Int = partitionStartIndices.size + + override def getPartition(key: Any): Int = { + parentPartitionMapping(parent.getPartition(key)) + } + + override def equals(other: Any): Boolean = other match { + case c: CoalescedPartitioner => + c.parent == parent && Arrays.equals(c.partitionStartIndices, partitionStartIndices) + case _ => + false + } +} + +private[spark] class ShuffledRDD2Partition( + val index: Int, val startIndexInParent: Int, val endIndexInParent: Int) + extends Partition { + + override def hashCode(): Int = index +} + +/** + * :: DeveloperApi :: + * A more flexible ShuffledRDD used to debug dynamic DAG work. + * + * @param dependency a ShuffleDependency to use + * @tparam K the key class. + * @tparam V the value class. + * @tparam C the combiner class. + */ +// TODO: Make this return RDD[Product2[K, C]] or have some way to configure mutable pairs +@DeveloperApi +class ShuffledRDD2[K, V, C]( + var dependency: ShuffleDependency[K, V, C], + val partitionStartIndices: Array[Int]) + extends RDD[(K, C)](dependency.rdd.context, Nil) { + + override def getDependencies: Seq[Dependency[_]] = List(dependency) + + override val partitioner = { + Some(new CoalescedPartitioner(dependency.partitioner, partitionStartIndices)) + } + + override def getPartitions: Array[Partition] = { + val n = dependency.partitioner.numPartitions + Array.tabulate[Partition](partitionStartIndices.length) { i => + val startIndex = partitionStartIndices(i) + val endIndex = if (i < partitionStartIndices.length - 1) partitionStartIndices(i + 1) else n + new ShuffledRDD2Partition(i, startIndex, endIndex) + } + } + + override def compute(p: Partition, context: TaskContext): Iterator[(K, C)] = { + val sp = p.asInstanceOf[ShuffledRDD2Partition] + SparkEnv.get.shuffleManager.getReader( + dependency.shuffleHandle, sp.startIndexInParent, sp.endIndexInParent, context) + .read() + .asInstanceOf[Iterator[(K, C)]] + } + + override def clearDependencies() { + super.clearDependencies() + dependency = null + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala index 50a69379412d2..cb71a17f08387 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala @@ -27,14 +27,19 @@ import org.apache.spark.util.CallSite */ private[spark] class ActiveJob( val jobId: Int, - val finalStage: ResultStage, + val finalStage: Stage, val func: (TaskContext, Iterator[_]) => _, val partitions: Array[Int], val callSite: CallSite, val listener: JobListener, val properties: Properties) { - val numPartitions = partitions.length + val numPartitions = finalStage match { + case r: ResultStage => partitions.length + case m: ShuffleMapStage => m.rdd.partitions.size + } + val finished = Array.fill[Boolean](numPartitions)(false) + var numFinished = 0 } diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 09e963f5cdf60..db0c73d5b50a3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -35,7 +35,7 @@ import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.executor.TaskMetrics import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{ShuffledRDD2, DynamicJoinedRDD, RDD} import org.apache.spark.rpc.RpcTimeout import org.apache.spark.storage._ import org.apache.spark.util._ @@ -500,7 +500,12 @@ class DAGScheduler( jobIdToStageIds -= job.jobId jobIdToActiveJob -= job.jobId activeJobs -= job - job.finalStage.resultOfJob = None + job.finalStage match { + case r: ResultStage => + r.resultOfJob = None + case m: ShuffleMapStage => + m.waitingJobs = m.waitingJobs.filter(_ != job) + } } /** @@ -524,6 +529,7 @@ class DAGScheduler( val jobId = nextJobId.getAndIncrement() if (partitions.size == 0) { + // Return immediately if the job is running 0 tasks return new JobWaiter[U](this, jobId, 0, resultHandler) } @@ -575,6 +581,29 @@ class DAGScheduler( listener.awaitResult() // Will throw an exception if the job fails } + /** + * Submit a shuffle map stage and get a JobWaiter object back. The JobWaiter object can be used + * to block until the the job finishes executing or can be used to cancel the job. + */ + def submitMapStage[K, V, C]( + dependency: ShuffleDependency[K, V, C], + callSite: CallSite, + properties: Properties): JobWaiter[Any] = { + + val rdd = dependency.rdd + val jobId = nextJobId.getAndIncrement() + if (rdd.partitions.size == 0) { + // Return immediately if the job is running 0 tasks + return new JobWaiter[Any](this, jobId, 0, (i: Int, r: Any) => {}) + } + + assert(rdd.partitions.size > 0) + val waiter = new JobWaiter(this, jobId, rdd.partitions.size, (i: Int, r: Any) => {}) + eventProcessLoop.post(MapStageSubmitted( + jobId, dependency, callSite, waiter, SerializationUtils.clone(properties))) + waiter + } + /** * Cancel a job that is running or waiting in the queue. */ @@ -745,7 +774,63 @@ class DAGScheduler( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) submitStage(finalStage) } - submitWaitingStages() + submitWaitingStages() // TODO: Do we need this if finalStage is null? + } + + private[scheduler] def handleMapStageSubmitted(jobId: Int, + dependency: ShuffleDependency[_, _, _], + callSite: CallSite, + listener: JobListener, + properties: Properties) { + // Submitting this map stage might still require the creation of some parent stages, so make + // sure that happens. + var finalStage: ShuffleMapStage = null + try { + // New stage creation may throw an exception if, for example, jobs are run on a + // HadoopRDD whose underlying HDFS files have been deleted. 
+ finalStage = newOrUsedShuffleStage(dependency, jobId) + } catch { + case e: Exception => + logWarning("Creating new stage failed due to exception - job: " + jobId, e) + listener.jobFailed(e) + return + } + if (finalStage != null) { + if (finalStage.isAvailable) { + // The stage is completely finished already, so tell the listener that all tasks are done + for (i <- 0 until finalStage.numPartitions) { + listener.taskSucceeded(i, null) + } + } else { + val job = new ActiveJob(jobId, finalStage, null, null, callSite, listener, properties) + clearCacheLocs() + logInfo("Got map stage job %s (%s) with %d output partitions".format( + jobId, callSite.shortForm, dependency.rdd.partitions.size)) + logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")") + logInfo("Parents of final stage: " + finalStage.parents) + logInfo("Missing parents: " + getMissingParentStages(finalStage)) + + // Mark any finished tasks in the stage as such so the listener knows about them + for (i <- 0 until finalStage.numPartitions) { + if (finalStage.outputLocs(i).nonEmpty) { + job.finished(i) = true + job.numFinished += 1 + listener.taskSucceeded(i, null) + } + } + + val jobSubmissionTime = clock.getTimeMillis() + jobIdToActiveJob(jobId) = job + activeJobs += job + finalStage.waitingJobs = job :: finalStage.waitingJobs + val stageIds = jobIdToStageIds(jobId).toArray + val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) + listenerBus.post( + SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) + submitStage(finalStage) + } + } + submitWaitingStages() // TODO: Do we need this if finalStage is null? } /** Submits stage, but first recursively submits any missing parents. */ @@ -1058,6 +1143,21 @@ class DAGScheduler( // Note: newly runnable stages will be submitted below when we submit waiting stages } + + // Mark the task as finished in any map-stage jobs waiting on this stage + for (job <- shuffleStage.waitingJobs) { + if (!job.finished(smt.partitionId)) { + job.finished(smt.partitionId) = true + job.numFinished += 1 + job.listener.taskSucceeded(smt.partitionId, null) + // If the whole job has finished, remove it + if (job.numFinished == job.numPartitions) { + cleanupStateForJobAndIndependentStages(job) + listenerBus.post( + SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) + } + } + } } case Resubmitted => @@ -1445,6 +1545,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) + case MapStageSubmitted(jobId, dependency, callSite, listener, properties) => + dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties) + case StageCancelled(stageId) => dagScheduler.handleStageCancellation(stageId) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index f72a52e85dc15..10232f7ff3eb4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -45,6 +45,14 @@ private[scheduler] case class JobSubmitted( properties: Properties = null) extends DAGSchedulerEvent +private[scheduler] case class MapStageSubmitted( + jobId: Int, + dependency: ShuffleDependency[_, _, _], + callSite: CallSite, + listener: JobListener, // 
TODO: replace this with another class + properties: Properties = null) + extends DAGSchedulerEvent + private[scheduler] case class StageCancelled(stageId: Int) extends DAGSchedulerEvent private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index 1efce124c0a6b..8e7e7bfd6d8e0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -32,6 +32,9 @@ private[spark] sealed trait MapStatus { /** Location where this task was run. */ def location: BlockManagerId + /** Number of map output partitions */ + def numPartitions: Int + /** * Estimated size for the reduce block, in bytes. * @@ -100,6 +103,8 @@ private[spark] class CompressedMapStatus( this(loc, uncompressedSizes.map(MapStatus.compressSize)) } + override def numPartitions: Int = compressedSizes.length + override def location: BlockManagerId = loc override def getSizeForBlock(reduceId: Int): Long = { @@ -132,6 +137,7 @@ private[spark] class CompressedMapStatus( */ private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, + private[this] var totalBlocks: Int, private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: RoaringBitmap, private[this] var avgSize: Long) @@ -141,7 +147,9 @@ private[spark] class HighlyCompressedMapStatus private ( require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0, "Average size can only be zero for map stages that produced no output") - protected def this() = this(null, -1, null, -1) // For deserialization only + protected def this() = this(null, -1, -1, null, -1) // For deserialization only + + override def numPartitions: Int = totalBlocks override def location: BlockManagerId = loc @@ -155,12 +163,14 @@ private[spark] class HighlyCompressedMapStatus private ( override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { loc.writeExternal(out) + out.writeInt(totalBlocks) emptyBlocks.writeExternal(out) out.writeLong(avgSize) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { loc = BlockManagerId(in) + totalBlocks = in.readInt() emptyBlocks = new RoaringBitmap() emptyBlocks.readExternal(in) avgSize = in.readLong() @@ -194,6 +204,6 @@ private[spark] object HighlyCompressedMapStatus { } else { 0 } - new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize) + new HighlyCompressedMapStatus(loc, totalNumBlocks, numNonEmptyBlocks, emptyBlocks, avgSize) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala index 48d8d8e9c4b78..7b788c6a82ca6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.ShuffleDependency import org.apache.spark.rdd.RDD import org.apache.spark.storage.BlockManagerId @@ -41,6 +43,9 @@ private[spark] class ShuffleMapStage( def isAvailable: Boolean = numAvailableOutputs == numPartitions + /** Map-stage jobs that are waiting on this particular stage to finish, if any */ + var waitingJobs: List[ActiveJob] = Nil + val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) def addOutputLoc(partition: 
Int, status: MapStatus): Unit = { diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index 0c8f08f0f3b1b..6ce95bdcac2aa 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -33,9 +33,6 @@ private[spark] class HashShuffleReader[K, C]( mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker) extends ShuffleReader[K, C] with Logging { - require(endPartition == startPartition + 1, - "Hash shuffle currently only supports fetching one partition") - private val dep = handle.dependency /** Read the combined key-values for this reduce task */ @@ -44,7 +41,7 @@ private[spark] class HashShuffleReader[K, C]( context, blockManager.shuffleClient, blockManager, - mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition), + mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition), // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024) diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleReaderSuite.scala index 05b3afef5b839..aea9565ad9e9c 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleReaderSuite.scala @@ -115,7 +115,7 @@ class HashShuffleReaderSuite extends SparkFunSuite with LocalSparkContext { // Make a mocked MapOutputTracker for the shuffle reader to use to determine what // shuffle data to read. val mapOutputTracker = mock(classOf[MapOutputTracker]) - when(mapOutputTracker.getMapSizesByExecutorId(shuffleId, reduceId)).thenReturn { + when(mapOutputTracker.getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)).thenReturn { // Test a scenario where all data is local, to avoid creating a bunch of additional mocks // for the code to read data over the network. 
val shuffleBlockIdsAndSizes = (0 until numMaps).map { mapId => From f25f211b014ddb4121cdf9241bdfa8500985a723 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Wed, 12 Aug 2015 16:44:36 -0700 Subject: [PATCH 2/9] =?UTF-8?q?Remove=20reduce=20task=20stuff,=C2=A0clean?= =?UTF-8?q?=20up,=20add=20docs=20and=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/apache/spark/MapOutputTracker.scala | 60 ++---- .../scala/org/apache/spark/SparkContext.scala | 13 +- .../apache/spark/rdd/DynamicJoinedRDD.scala | 186 ------------------ .../org/apache/spark/rdd/ShuffledRDD2.scala | 117 ----------- .../apache/spark/scheduler/ActiveJob.scala | 29 ++- .../apache/spark/scheduler/DAGScheduler.scala | 168 +++++++++++----- .../spark/scheduler/DAGSchedulerEvent.scala | 4 +- .../apache/spark/scheduler/MapStatus.scala | 14 +- .../apache/spark/scheduler/ResultStage.scala | 17 +- .../spark/scheduler/ShuffleMapStage.scala | 18 +- .../org/apache/spark/scheduler/Stage.scala | 26 ++- .../shuffle/hash/HashShuffleReader.scala | 5 +- .../spark/scheduler/DAGSchedulerSuite.scala | 115 ++++++++++- .../shuffle/hash/HashShuffleReaderSuite.scala | 2 +- 14 files changed, 333 insertions(+), 441 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/rdd/DynamicJoinedRDD.scala delete mode 100644 core/src/main/scala/org/apache/spark/rdd/ShuffledRDD2.scala diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index d570cf348a677..cfb00913b3f94 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -134,36 +134,25 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging */ def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int) : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = { - getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1) - } - - /** - * Called from executors to get the server URIs and output sizes for each shuffle block that - * needs to be read from a given range of reduce partitions. - * - * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, - * and the second item is a sequence of (shuffle block id, shuffle block size) tuples - * describing the shuffle blocks that are stored at that block manager. - */ - def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) - : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = { - logDebug(s"Fetching outputs for shuffle $shuffleId, reduces $startPartition-$endPartition") + logDebug(s"Fetching outputs for shuffle $shuffleId, reduce $reduceId") val statuses = getStatuses(shuffleId) // Synchronize on the returned array because, on the driver, it gets mutated in place statuses.synchronized { - return MapOutputTracker.convertMapStatuses( - shuffleId, startPartition, endPartition, statuses) + return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses) } } /** * Return statistics about all of the outputs for a given shuffle. 
+ * + * @param shuffleId ID of the shuffle + * @param numPartitions number of map output partitions */ - def getStatistics(shuffleId: Int): MapOutputStatistics = { + def getStatistics(shuffleId: Int, numPartitions: Int): MapOutputStatistics = { val statuses = getStatuses(shuffleId) // Synchronize on the returned array because, on the driver, it gets mutated in place statuses.synchronized { - val totalSizes = new Array[Long](statuses(0).numPartitions) + val totalSizes = new Array[Long](numPartitions) for (s <- statuses) { for (i <- 0 until totalSizes.length) { totalSizes(i) += s.getSizeForBlock(i) @@ -173,19 +162,6 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging } } - /** - * Return the location of a given map tasks's output, if present. Only callable on driver. - */ - def getMapOutputLocation(shuffleId: Int, mapId: Int): Option[BlockManagerId] = { - if (mapStatuses.contains(shuffleId)) { - val statuses = mapStatuses(shuffleId) - if (statuses.nonEmpty && statuses(mapId) != null) { - return Some(statuses(mapId).location) - } - } - None - } - /** * Get or fetch the array of MapStatuses for a given shuffle ID. NOTE: clients MUST synchronize * on this array when reading it, because on the driver, we may be changing it in place. @@ -241,7 +217,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging } else { logError("Missing all output locations for shuffle " + shuffleId) throw new MetadataFetchFailedException( - shuffleId, 0, "Missing all output locations for shuffle " + shuffleId) + shuffleId, -1, "Missing all output locations for shuffle " + shuffleId) } } else { return statuses @@ -487,16 +463,15 @@ private[spark] object MapOutputTracker extends Logging { } /** - * Converts an array of MapStatuses for a given partition ID range to a sequence that, for each - * block manager ID, lists the shuffle block ids and corresponding shuffle block sizes stored at - * that block manager. + * Converts an array of MapStatuses for a given reduce ID to a sequence that, for each block + * manager ID, lists the shuffle block ids and corresponding shuffle block sizes stored at that + * block manager. * * If any of the statuses is null (indicating a missing location due to a failed mapper), * throws a FetchFailedException. * * @param shuffleId Identifier for the shuffle - * @param startPartition Start of partition range to fetch - * @param endPartition End of partition range to fetch (exclusive) + * @param reduceId Identifier for the reduce task * @param statuses List of map statuses, indexed by map ID. 
* @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, * and the second item is a sequence of (shuffle block id, shuffle block size) tuples @@ -504,8 +479,7 @@ private[spark] object MapOutputTracker extends Logging { */ private def convertMapStatuses( shuffleId: Int, - startPartition: Int, - endPartition: Int, + reduceId: Int, statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])] = { assert (statuses != null) val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long)]] @@ -513,12 +487,10 @@ private[spark] object MapOutputTracker extends Logging { if (status == null) { val errorMessage = s"Missing an output location for shuffle $shuffleId" logError(errorMessage) - throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage) + throw new MetadataFetchFailedException(shuffleId, reduceId, errorMessage) } else { - for (i <- startPartition until endPartition) { - splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) += - ((ShuffleBlockId(shuffleId, mapId, i), status.getSizeForBlock(i))) - } + splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) += + ((ShuffleBlockId(shuffleId, mapId, reduceId), status.getSizeForBlock(reduceId))) } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1d545ba17fef0..16defae5328ed 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1985,13 +1985,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** - * :: DeveloperApi :: - * Submit a map stage for execution. + * Submit a map stage for execution. This is currently an internal API only, but might be + * promoted to DeveloperApi in the future. */ - @DeveloperApi - def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C]): - SimpleFutureAction[MapOutputStatistics] = - { + private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C]) + : SimpleFutureAction[MapOutputStatistics] = { assertNotStopped() val callSite = getCallSite() val waiter = dagScheduler.submitMapStage( @@ -1999,7 +1997,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli callSite, localProperties.get) // TODO: It's not nice that we use JobWaiter here, but that's what SimpleFutureAction takes - new SimpleFutureAction(waiter, env.mapOutputTracker.getStatistics(dependency.shuffleId)) + new SimpleFutureAction(waiter, env.mapOutputTracker.getStatistics( + dependency.shuffleId, dependency.partitioner.numPartitions)) } diff --git a/core/src/main/scala/org/apache/spark/rdd/DynamicJoinedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/DynamicJoinedRDD.scala deleted file mode 100644 index 0676447e6992d..0000000000000 --- a/core/src/main/scala/org/apache/spark/rdd/DynamicJoinedRDD.scala +++ /dev/null @@ -1,186 +0,0 @@ -package org.apache.spark.rdd - -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashMap - -import org.apache.spark._ -import org.apache.spark.annotation.DeveloperApi - -/** - * Strategy for joining each partition in a [[DynamicJoinedRDD]]. 
Possible values are: - * - BRING_TO_1: broadcast the partitions of RDD 2 and bring them to the nodes containing the - * partitions of RDD 1 to do a local broadcast join - * - BRING_TO_2: broadcast the partitions of RDD 1 and bring them to the nodes containing the - * partitions of RDD 2 to do a local broadcast join - * - SHUFFLE: shuffle the partitions of both RDDs to the same reduce task - */ -@DeveloperApi -object PartitionStrategy extends Enumeration { - type PartitionStrategy = Value - - val BRING_TO_1, BRING_TO_2, SHUFFLE = Value -} - -/** - * :: DeveloperApi :: - * An RDD that supports different join strategies by partition. It takes two ShuffleDependencies - * that should have the same partitioner and will produce the input sides of the join, as well as - * arrays describing what to do with each partition (which join strategy to use and, if shuffling - * it, which reduce partition to shuffle data to). The resulting RDD will have one partition for - * each shufflePartitonId used for shuffled partitions, plus one for each partition that data - * is brought to (when we broadcast one RDD's data and bring it to all partitions of the other). - * - * @param dep1 shuffle dependency for first RDD - * @param dep2 shuffle dependency for second RDD (must have same partitioner as dep1) - * @param partitionStrategies [[PartitionStrategy]] to use for each partition; must be same size - * as the number of partitions of the dependencies' partitioners - * @param shufflePartitionIds for partitions where the strategy is set to SHUFFLE, holds the - * destination partition ID to use (which is useful for coalescing data into fewer partitions); - * these IDs must be consecutive numbers from 0 to a maximum value, and the array of values must - * be non-decreasing (e.g. 
[0, 0, 0, 1, 1, 2, 2, 3]) - */ -@DeveloperApi -class DynamicJoinedRDD[K, V, W]( - var dep1: ShuffleDependency[K, V, V], - var dep2: ShuffleDependency[K, W, W], - val partitionStrategies: Array[PartitionStrategy.PartitionStrategy], - val shufflePartitionIds: Array[Int]) - extends RDD[(K, (V, W))](dep1.rdd.context, Nil) { - - override def getDependencies: Seq[Dependency[_]] = List(dep1, dep2) - - override protected def getPartitions: Array[Partition] = { - val parentPartitions = dep1.partitioner.numPartitions - var maxShufflePartitionId = -1 - var numBringTo1 = 0 - var numBringTo2 = 0 - for (i <- 0 until parentPartitions) { - if (partitionStrategies(i) == PartitionStrategy.BRING_TO_1) { - numBringTo1 += 1 - } else if (partitionStrategies(i) == PartitionStrategy.BRING_TO_2) { - numBringTo2 += 1 - } else { - maxShufflePartitionId = math.max(maxShufflePartitionId, shufflePartitionIds(i)) - } - } - - val numShufflePartitions = maxShufflePartitionId + 1 // Works because this starts at -1 - val totalPartitions = numShufflePartitions + (numBringTo1 + numBringTo2) * parentPartitions - - val partitions = new Array[Partition](totalPartitions) - - // Build up the partitions in this order: - // - 0 to maxShufflePartitionId: shuffled partitions - // - next numBringTo1 * parentPartitions partitions: partitions brought to RDD 1 - // - next numBringTo2 * parentPartitions partitions: partitions brought to RDD 2 - var pos = 0 - for (shufflePart <- 0 until numShufflePartitions) { - while (pos < parentPartitions && partitionStrategies(pos) != PartitionStrategy.SHUFFLE) { - pos += 1 - } - val startPos = pos - while (pos < parentPartitions && (shufflePartitionIds(pos) == shufflePart || - partitionStrategies(pos) != PartitionStrategy.SHUFFLE)) { - pos += 1 - } - val endPos = pos - partitions(shufflePart) = new DynamicJoinedRDDPartition( - shufflePart, PartitionStrategy.SHUFFLE, startPos, endPos, startPos, endPos, true) - } - - var outputPos = numShufflePartitions - for (i <- 0 until parentPartitions) { - if (partitionStrategies(i) == PartitionStrategy.BRING_TO_1) { - for (j <- 0 until parentPartitions) { - partitions(outputPos) = new DynamicJoinedRDDPartition( - outputPos, PartitionStrategy.BRING_TO_1, j, j + 1, i, i + 1, false) - outputPos += 1 - } - } else if (partitionStrategies(i) == PartitionStrategy.BRING_TO_2) { - for (j <- 0 until parentPartitions) { - partitions(outputPos) = new DynamicJoinedRDDPartition( - outputPos, PartitionStrategy.BRING_TO_2, i, i + 1, j, j + 1, true) - outputPos += 1 - } - } - } - - assert(outputPos == totalPartitions) - partitions - } - - override def compute(partition: Partition, context: TaskContext): Iterator[(K, (V, W))] = { - val part = partition.asInstanceOf[DynamicJoinedRDDPartition] - val iter1 = SparkEnv.get.shuffleManager.getReader( - dep1.shuffleHandle, part.rdd1StartPartition, part.rdd1EndPartition, context) - .read() - .asInstanceOf[Iterator[(K, V)]] - val iter2 = SparkEnv.get.shuffleManager.getReader( - dep2.shuffleHandle, part.rdd2StartPartition, part.rdd2EndPartition, context) - .read() - .asInstanceOf[Iterator[(K, W)]] - // This is a pretty slow way to join stuff, but it's just to show it - if (part.hash1) { - val map = new HashMap[K, ArrayBuffer[V]] - for (pair <- iter1) { - map.getOrElseUpdate(pair._1, new ArrayBuffer()) += pair._2 - } - iter2.flatMap { pair => - val k = pair._1 - val w = pair._2 - val values = map.getOrElse(k, Nil) - values.map(v => (k, (v, w))) - } - } else { - val map = new HashMap[K, ArrayBuffer[W]] - for (pair <- iter2) { - 
map.getOrElseUpdate(pair._1, new ArrayBuffer()) += pair._2 - } - iter1.flatMap { pair => - val k = pair._1 - val v = pair._2 - val values = map.getOrElse(k, Nil) - values.map(w => (k, (v, w))) - } - } - } - - override def getPreferredLocations(partition: Partition): Seq[String] = { - val part = partition.asInstanceOf[DynamicJoinedRDDPartition] - if (part.strategy == PartitionStrategy.BRING_TO_1) { - // TODO: should support passing loc.executorId as a preferred location too - val tracker = SparkEnv.get.mapOutputTracker - tracker.getMapOutputLocation(dep1.shuffleId, part.rdd1StartPartition).map(_.host).toList - } else if (part.strategy == PartitionStrategy.BRING_TO_2) { - // TODO: should support passing loc.executorId as a preferred location too - val tracker = SparkEnv.get.mapOutputTracker - tracker.getMapOutputLocation(dep2.shuffleId, part.rdd2StartPartition).map(_.host).toList - } else { - Nil - } - } - - override def clearDependencies() { - super.clearDependencies() - dep1 = null - dep2 = null - } -} - -/** - * A partition in [[DynamicJoinedRDD]]; contains its index in the final RDD, the ranges of - * partitions to request from rdd1 and rdd2, and whether to hash rdd1 or rdd2's tuples. - */ -class DynamicJoinedRDDPartition( - val index: Int, - val strategy: PartitionStrategy.PartitionStrategy, - val rdd1StartPartition: Int, - val rdd1EndPartition: Int, - val rdd2StartPartition: Int, - val rdd2EndPartition: Int, - val hash1: Boolean) - extends Partition { - - override def toString = - s"DynamicJoinedPartition($index,$strategy,$rdd1StartPartition,$rdd1EndPartition,$rdd2StartPartition,$rdd2EndPartition,$hash1)" -} diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD2.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD2.scala deleted file mode 100644 index a3abb8a51868f..0000000000000 --- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD2.scala +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.rdd - -import java.util.Arrays - -import org.apache.spark._ -import org.apache.spark.annotation.DeveloperApi - -/** - * A Partitioner that might group together one or more partitions from the parent. - * - * @param parent a parent partitioner - * @param partitionStartIndices indices of partitions in parent that should create new partitions - * in child (this should be an array of increasing partition IDs). For example, if we have a - * parent with 5 partitions, and partitionStartIndices is [0, 2, 4], we get three output - * partitions, corresponding to partition ranges [0, 1], [2, 3] and [4] of the parent partitioner. 
- */ -@DeveloperApi -class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: Array[Int]) - extends Partitioner { - - // TODO: validate partitionStartIndices - - @transient private lazy val parentPartitionMapping: Array[Int] = { - val n = parent.numPartitions - val result = new Array[Int](n) - for (i <- 0 until partitionStartIndices.length) { - val start = partitionStartIndices(i) - val end = if (i < partitionStartIndices.length - 1) partitionStartIndices(i + 1) else n - for (j <- start until end) { - result(j) = i - } - } - result - } - - override def numPartitions: Int = partitionStartIndices.size - - override def getPartition(key: Any): Int = { - parentPartitionMapping(parent.getPartition(key)) - } - - override def equals(other: Any): Boolean = other match { - case c: CoalescedPartitioner => - c.parent == parent && Arrays.equals(c.partitionStartIndices, partitionStartIndices) - case _ => - false - } -} - -private[spark] class ShuffledRDD2Partition( - val index: Int, val startIndexInParent: Int, val endIndexInParent: Int) - extends Partition { - - override def hashCode(): Int = index -} - -/** - * :: DeveloperApi :: - * A more flexible ShuffledRDD used to debug dynamic DAG work. - * - * @param dependency a ShuffleDependency to use - * @tparam K the key class. - * @tparam V the value class. - * @tparam C the combiner class. - */ -// TODO: Make this return RDD[Product2[K, C]] or have some way to configure mutable pairs -@DeveloperApi -class ShuffledRDD2[K, V, C]( - var dependency: ShuffleDependency[K, V, C], - val partitionStartIndices: Array[Int]) - extends RDD[(K, C)](dependency.rdd.context, Nil) { - - override def getDependencies: Seq[Dependency[_]] = List(dependency) - - override val partitioner = { - Some(new CoalescedPartitioner(dependency.partitioner, partitionStartIndices)) - } - - override def getPartitions: Array[Partition] = { - val n = dependency.partitioner.numPartitions - Array.tabulate[Partition](partitionStartIndices.length) { i => - val startIndex = partitionStartIndices(i) - val endIndex = if (i < partitionStartIndices.length - 1) partitionStartIndices(i + 1) else n - new ShuffledRDD2Partition(i, startIndex, endIndex) - } - } - - override def compute(p: Partition, context: TaskContext): Iterator[(K, C)] = { - val sp = p.asInstanceOf[ShuffledRDD2Partition] - SparkEnv.get.shuffleManager.getReader( - dependency.shuffleHandle, sp.startIndexInParent, sp.endIndexInParent, context) - .read() - .asInstanceOf[Iterator[(K, C)]] - } - - override def clearDependencies() { - super.clearDependencies() - dependency = null - } -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala index cb71a17f08387..24e7e92f51e3d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala @@ -23,22 +23,41 @@ import org.apache.spark.TaskContext import org.apache.spark.util.CallSite /** - * Tracks information about an active job in the DAGScheduler. + * A running job in the DAGScheduler. Jobs can be of two types: a result job, which computes a + * ResultStage to execute an action, or a map-stage job, which computes the map outputs for a + * ShuffleMapStage before any downstream stages are submitted. The latter is used for adaptive + * query planning, to look at map output statistics before submitting later stages. We distinguish + * between these two types of jobs using the finalStage field of this class. 
+ * + * Jobs are only tracked for "leaf" stages that clients directly submitted, through DAGScheduler's + * submitJob or submitMapStage methods. However, either type of job may cause the execution of + * many other earlier stages (for RDDs in the DAG it depends on), and multiple jobs may share some + * of these previous stages. These dependencies are managed inside DAGScheduler. + * + * @param jobId A unique ID for this job. + * @param finalStage The stage that this job computes (either a ResultStage for an action or a + * ShuffleMapStage for submitMapStage). + * @param callSite Where this job was initiated in the user's program (shown on UI). + * @param listener A listener to notify if tasks in this job finish or the job fails. + * @param properties Scheduling properties attached to the job, such as fair scheduler pool name. */ private[spark] class ActiveJob( val jobId: Int, val finalStage: Stage, - val func: (TaskContext, Iterator[_]) => _, - val partitions: Array[Int], val callSite: CallSite, val listener: JobListener, val properties: Properties) { + /** + * Number of partitions we need to compute for this job. Note that result stages may not need + * to compute all partitions in their target RDD, for actions like first() and lookup(). + */ val numPartitions = finalStage match { - case r: ResultStage => partitions.length - case m: ShuffleMapStage => m.rdd.partitions.size + case r: ResultStage => r.partitions.length + case m: ShuffleMapStage => m.rdd.partitions.length } + /** Which partitions of the stage have finished */ val finished = Array.fill[Boolean](numPartitions)(false) var numFinished = 0 diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index db0c73d5b50a3..d850724efe5f5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -35,7 +35,7 @@ import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.executor.TaskMetrics import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} -import org.apache.spark.rdd.{ShuffledRDD2, DynamicJoinedRDD, RDD} +import org.apache.spark.rdd.RDD import org.apache.spark.rpc.RpcTimeout import org.apache.spark.storage._ import org.apache.spark.util._ @@ -47,15 +47,48 @@ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat * minimal schedule to run the job. It then submits stages as TaskSets to an underlying * TaskScheduler implementation that runs them on the cluster. * - * In addition to coming up with a DAG of stages, this class also determines the preferred + * Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with + * "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks + * in each stage, but operations with shuffle dependencies require multiple stages (one to write a + * set of map output files, and another to read those files after a barrier). In the end, every + * stage will have only shuffle dependencies on other stages, and may compute multiple operations + * inside it. The actual pipelining of these operations happens in the RDD.compute() functions of + * various RDDs (MappedRDD, FilteredRDD, etc).
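As a purely illustrative example of that pipelining, a word count splits into two stages at the reduceByKey shuffle (this assumes an existing SparkContext `sc`; the input path is a placeholder):

    // Illustrative only: the narrow operations run pipelined in one stage; the shuffle
    // introduced by reduceByKey starts a second stage.
    val lines  = sc.textFile("hdfs://namenode/input.txt")   // placeholder path
    val words  = lines.flatMap(_.split(" "))                // narrow dependency: same stage
    val pairs  = words.map(word => (word, 1))               // narrow dependency: same stage
    val counts = pairs.reduceByKey(_ + _)                   // shuffle dependency: new stage
    counts.count()                                          // action: submits a job covering both stages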
+ * + * In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred * locations to run each task on, based on the current cache status, and passes these to the * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task * a small number of times before cancelling the whole stage. * + * When looking through this code, there are several key concepts: + * + * - Jobs (represented by [[ActiveJob]]) are the top-level work items submitted to the scheduler. + * For example, when the user calls an action, like count(), a job will be submitted through + * submitJob. Each Job may require the execution of multiple stages to build intermediate data. + * + * - Stages ([[Stage]]) are sets of tasks that compute intermediate results in jobs, where each + * task computes the same function on partitions of the same RDD. Stages are separated at shuffle + * boundaries, which introduce a barrier (where we must wait for the previous stage to finish to + * fetch outputs). There are two types of stages: [[ResultStage]], for the final stage that + * executes an action, and [[ShuffleMapStage]], which writes map output files for a shuffle. + * Stages are often shared across multiple jobs, if these jobs reuse the same RDDs. + * + * - Tasks are individual units of work, each sent to one machine. + * + * - Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them + * and likewise remembers which shuffle map stages have already produced output files to avoid + * redoing the map side of a shuffle. + * + * - Preferred locations: the DAGScheduler also computes where to run each task in a stage based + * on the preferred locations of its underlying RDDs, or the location of cached or shuffle data. + * * Here's a checklist to use when making or reviewing changes to this class: * + * - All data structures should be cleared when the jobs involving them end to avoid indefinite + * accumulation of state in long-running programs. + * * - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to * include the new structure. This will help to catch memory leaks. */ @@ -295,12 +328,12 @@ class DAGScheduler( */ private def newResultStage( rdd: RDD[_], - numTasks: Int, + func: (TaskContext, Iterator[_]) => _, + partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = { val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId) - val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite) - + val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite) stageIdToStage(id) = stage updateJobIdStageIdMaps(jobId, stage) stage @@ -504,13 +537,21 @@ class DAGScheduler( case r: ResultStage => r.resultOfJob = None case m: ShuffleMapStage => - m.waitingJobs = m.waitingJobs.filter(_ != job) + m.mapStageJobs = m.mapStageJobs.filter(_ != job) } } /** - * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object + * Submit an action job to the scheduler and get a JobWaiter object back. The JobWaiter object * can be used to block until the job finishes executing or can be used to cancel the job.
+ * + * @param rdd target RDD to run tasks on + * @param func a function to run on each partition of the RDD + * @param partitions set of partitions to run on; some jobs may not want to compute on all + * partitions of the target RDD, e.g. for operations like first() + * @param callSite where in the user program this job was called + * @param resultHandler callback to pass each result to + * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name */ def submitJob[T, U]( rdd: RDD[T], @@ -542,6 +583,18 @@ class DAGScheduler( waiter } + /** + * Run an action job on the given RDD and pass all the results to the resultHandler function as + * they arrive. Throws an exception if the job fials, or returns normally if successful. + * + * @param rdd target RDD to run tasks on + * @param func a function to run on each partition of the RDD + * @param partitions set of partitions to run on; some jobs may not want to compute on all + * partitions of the target RDD, e.g. for operations like first() + * @param callSite where in the user program this job was called + * @param resultHandler callback to pass each result to + * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name + */ def runJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, @@ -565,6 +618,17 @@ class DAGScheduler( } } + /** + * Run an approximate job on the given RDD and pass all the results to an ApproximateEvaluator + * as they arrive. Returns a partial result object from the evaluator. + * + * @param rdd target RDD to run tasks on + * @param func a function to run on each partition of the RDD + * @param evaluator [[ApproximateEvaluator]] to receive the partial results + * @param callSite where in the user program this job was called + * @param timeout maximum time to wait for the job, in milliseconds + * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name + */ def runApproximateJob[T, U, R]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, @@ -582,8 +646,14 @@ class DAGScheduler( } /** - * Submit a shuffle map stage and get a JobWaiter object back. The JobWaiter object can be used - * to block until the the job finishes executing or can be used to cancel the job. + * Submit a shuffle map stage to run independently and get a JobWaiter object back. The waiter + * can be used to block until the the job finishes executing or can be used to cancel the job. + * This method is used for adaptive query planning, to run map stages and look at statistics + * about their outputs before submitting downstream stages. + * + * @param dependency the ShuffleDependency to run a map stage for + * @param callSite where in the user program this job was submitted + * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name */ def submitMapStage[K, V, C]( dependency: ShuffleDependency[K, V, C], @@ -612,6 +682,9 @@ class DAGScheduler( eventProcessLoop.post(JobCancelled(jobId)) } + /** + * Cancel all jobs in the given job group ID. + */ def cancelJobGroup(groupId: String): Unit = { logInfo("Asked to cancel job group " + groupId) eventProcessLoop.post(JobGroupCancelled(groupId)) @@ -749,7 +822,11 @@ class DAGScheduler( try { // New stage creation may throw an exception if, for example, jobs are run on a // HadoopRDD whose underlying HDFS files have been deleted. 
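The submitJob/runJob pair documented above follows a simple blocking pattern; roughly (a simplified sketch of the flow inside DAGScheduler, not the verbatim body of runJob):

    // Simplified sketch: runJob submits the job and then blocks on the returned JobWaiter.
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    waiter.awaitResult() match {
      case JobSucceeded => logInfo("Job finished: " + callSite.shortForm)
      case JobFailed(exception) => throw exception
    }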
+<<<<<<< HEAD finalStage = newResultStage(finalRDD, partitions.length, jobId, callSite) +======= + finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite) +>>>>>>> 9c0f689... Remove reduce task stuff, clean up, add docs and tests } catch { case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) @@ -757,7 +834,7 @@ class DAGScheduler( return } if (finalStage != null) { - val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) + val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) clearCacheLocs() logInfo("Got job %s (%s) with %d output partitions".format( job.jobId, callSite.shortForm, partitions.length)) @@ -774,7 +851,7 @@ class DAGScheduler( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) submitStage(finalStage) } - submitWaitingStages() // TODO: Do we need this if finalStage is null? + submitWaitingStages() } private[scheduler] def handleMapStageSubmitted(jobId: Int, @@ -788,7 +865,7 @@ class DAGScheduler( try { // New stage creation may throw an exception if, for example, jobs are run on a // HadoopRDD whose underlying HDFS files have been deleted. - finalStage = newOrUsedShuffleStage(dependency, jobId) + finalStage = getShuffleMapStage(dependency, jobId) } catch { case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) @@ -796,41 +873,42 @@ class DAGScheduler( return } if (finalStage != null) { - if (finalStage.isAvailable) { - // The stage is completely finished already, so tell the listener that all tasks are done - for (i <- 0 until finalStage.numPartitions) { + val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) + clearCacheLocs() + logInfo("Got map stage job %s (%s) with %d output partitions".format( + jobId, callSite.shortForm, dependency.rdd.partitions.size)) + logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")") + logInfo("Parents of final stage: " + finalStage.parents) + logInfo("Missing parents: " + getMissingParentStages(finalStage)) + + // Mark any finished tasks in the stage as such so the listener knows about them + for (i <- 0 until finalStage.numPartitions) { + if (finalStage.outputLocs(i).nonEmpty) { + job.finished(i) = true + job.numFinished += 1 listener.taskSucceeded(i, null) } - } else { - val job = new ActiveJob(jobId, finalStage, null, null, callSite, listener, properties) - clearCacheLocs() - logInfo("Got map stage job %s (%s) with %d output partitions".format( - jobId, callSite.shortForm, dependency.rdd.partitions.size)) - logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")") - logInfo("Parents of final stage: " + finalStage.parents) - logInfo("Missing parents: " + getMissingParentStages(finalStage)) - - // Mark any finished tasks in the stage as such so the listener knows about them - for (i <- 0 until finalStage.numPartitions) { - if (finalStage.outputLocs(i).nonEmpty) { - job.finished(i) = true - job.numFinished += 1 - listener.taskSucceeded(i, null) - } - } + } - val jobSubmissionTime = clock.getTimeMillis() - jobIdToActiveJob(jobId) = job - activeJobs += job - finalStage.waitingJobs = job :: finalStage.waitingJobs - val stageIds = jobIdToStageIds(jobId).toArray - val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) + val jobSubmissionTime = clock.getTimeMillis() + jobIdToActiveJob(jobId) = job + activeJobs += job + finalStage.mapStageJobs = job :: finalStage.mapStageJobs + val stageIds = 
jobIdToStageIds(jobId).toArray + val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) + listenerBus.post( + SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) + submitStage(finalStage) + + // If the whole job has finished, remove it + if (job.numFinished == job.numPartitions) { + markStageAsFinished(finalStage) + cleanupStateForJobAndIndependentStages(job) listenerBus.post( - SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) - submitStage(finalStage) + SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) } } - submitWaitingStages() // TODO: Do we need this if finalStage is null? + submitWaitingStages() } /** Submits stage, but first recursively submits any missing parents. */ @@ -899,7 +977,7 @@ class DAGScheduler( case s: ResultStage => val job = s.resultOfJob.get partitionsToCompute.map { id => - val p = job.partitions(id) + val p = s.partitions(id) (id, getPreferredLocs(stage.rdd, p)) }.toMap } @@ -929,7 +1007,7 @@ class DAGScheduler( case stage: ShuffleMapStage => closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array() case stage: ResultStage => - closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array() + closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array() } taskBinary = sc.broadcast(taskBinaryBytes) @@ -960,7 +1038,7 @@ class DAGScheduler( case stage: ResultStage => val job = stage.resultOfJob.get partitionsToCompute.map { id => - val p: Int = job.partitions(id) + val p: Int = stage.partitions(id) val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptId, @@ -1145,7 +1223,7 @@ class DAGScheduler( } // Mark the task as finished in any map-stage jobs waiting on this stage - for (job <- shuffleStage.waitingJobs) { + for (job <- shuffleStage.mapStageJobs) { if (!job.finished(smt.partitionId)) { job.finished(smt.partitionId) = true job.numFinished += 1 diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 10232f7ff3eb4..dda3b6cc7f960 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -35,6 +35,7 @@ import org.apache.spark.util.CallSite */ private[scheduler] sealed trait DAGSchedulerEvent +/** A result-yielding job was submitted on a target RDD */ private[scheduler] case class JobSubmitted( jobId: Int, finalRDD: RDD[_], @@ -45,11 +46,12 @@ private[scheduler] case class JobSubmitted( properties: Properties = null) extends DAGSchedulerEvent +/** A map stage as submitted to run as a separate job */ private[scheduler] case class MapStageSubmitted( jobId: Int, dependency: ShuffleDependency[_, _, _], callSite: CallSite, - listener: JobListener, // TODO: replace this with another class + listener: JobListener, properties: Properties = null) extends DAGSchedulerEvent diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index 8e7e7bfd6d8e0..1efce124c0a6b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -32,9 +32,6 @@ private[spark] sealed trait MapStatus { /** Location where this task was run. 
*/ def location: BlockManagerId - /** Number of map output partitions */ - def numPartitions: Int - /** * Estimated size for the reduce block, in bytes. * @@ -103,8 +100,6 @@ private[spark] class CompressedMapStatus( this(loc, uncompressedSizes.map(MapStatus.compressSize)) } - override def numPartitions: Int = compressedSizes.length - override def location: BlockManagerId = loc override def getSizeForBlock(reduceId: Int): Long = { @@ -137,7 +132,6 @@ private[spark] class CompressedMapStatus( */ private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, - private[this] var totalBlocks: Int, private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: RoaringBitmap, private[this] var avgSize: Long) @@ -147,9 +141,7 @@ private[spark] class HighlyCompressedMapStatus private ( require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0, "Average size can only be zero for map stages that produced no output") - protected def this() = this(null, -1, -1, null, -1) // For deserialization only - - override def numPartitions: Int = totalBlocks + protected def this() = this(null, -1, null, -1) // For deserialization only override def location: BlockManagerId = loc @@ -163,14 +155,12 @@ private[spark] class HighlyCompressedMapStatus private ( override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { loc.writeExternal(out) - out.writeInt(totalBlocks) emptyBlocks.writeExternal(out) out.writeLong(avgSize) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { loc = BlockManagerId(in) - totalBlocks = in.readInt() emptyBlocks = new RoaringBitmap() emptyBlocks.readExternal(in) avgSize = in.readLong() @@ -204,6 +194,6 @@ private[spark] object HighlyCompressedMapStatus { } else { 0 } - new HighlyCompressedMapStatus(loc, totalNumBlocks, numNonEmptyBlocks, emptyBlocks, avgSize) + new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala index bf81b9aca4810..c0451da1f0247 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala @@ -17,23 +17,30 @@ package org.apache.spark.scheduler +import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.util.CallSite /** - * The ResultStage represents the final stage in a job. + * ResultStages apply a function on some partitions of an RDD to compute the result of an action. + * The ResultStage object captures the function to execute, `func`, which will be applied to each + * partition, and the set of partition IDs, `partitions`. Some stages may not run on all partitions + * of the RDD, for actions like first() and lookup(). */ private[spark] class ResultStage( id: Int, rdd: RDD[_], - numTasks: Int, + val func: (TaskContext, Iterator[_]) => _, + val partitions: Array[Int], parents: List[Stage], firstJobId: Int, callSite: CallSite) - extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) { + extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) { - // The active job for this result stage. Will be empty if the job has already finished - // (e.g., because the job was cancelled). + /** + * The active job for this result stage. Will be empty if the job has already finished + * (e.g., because the job was cancelled). 
+ */ var resultOfJob: Option[ActiveJob] = None override def toString: String = "ResultStage " + id diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala index 7b788c6a82ca6..f7317249f51c9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala @@ -17,15 +17,21 @@ package org.apache.spark.scheduler -import scala.collection.mutable.ArrayBuffer - import org.apache.spark.ShuffleDependency import org.apache.spark.rdd.RDD import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.CallSite /** - * The ShuffleMapStage represents the intermediate stages in a job. + * ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle. + * They occur right before each shuffle operation, and might contain multiple pipelined operations + * before that (e.g. map and filter). When executed, they save map output files that can later be + * fetched by reduce tasks. The `shuffleDep` field describes the shuffle each stage is part of, + * and variables like `outputLocs` and `numAvailableOutputs` track how many map outputs are ready. + * + * ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage. + * For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that + * there can be multiple ActiveJobs trying to compute the same shuffle map stage. */ private[spark] class ShuffleMapStage( id: Int, @@ -39,13 +45,13 @@ private[spark] class ShuffleMapStage( override def toString: String = "ShuffleMapStage " + id + /** Map-stage jobs that were submitted to execute this stage independently (if any) */ + var mapStageJobs: List[ActiveJob] = Nil + var numAvailableOutputs: Int = 0 def isAvailable: Boolean = numAvailableOutputs == numPartitions - /** Map-stage jobs that are waiting on this particular stage to finish, if any */ - var waitingJobs: List[ActiveJob] = Nil - val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) def addOutputLoc(partition: Int, status: MapStatus): Unit = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index c086535782c23..b37eccbd0f7b8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -24,27 +24,33 @@ import org.apache.spark.rdd.RDD import org.apache.spark.util.CallSite /** - * A stage is a set of independent tasks all computing the same function that need to run as part + * A stage is a set of parallel tasks all computing the same function that need to run as part * of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run * by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the * DAGScheduler runs these stages in topological order. * * Each Stage can either be a shuffle map stage, in which case its tasks' results are input for - * another stage, or a result stage, in which case its tasks directly compute the action that - * initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes - * that each output partition is on. + * other stage(s), or a result stage, in which case its tasks directly compute a Spark action + * (e.g. count(), save(), etc) by running a function on an RDD. 
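Schematically, "running a function on an RDD" for a result stage means applying the `func` and `partitions` fields that this series moves onto ResultStage; a simplified sketch (not the actual ResultTask code) of computing one result partition:

    // Simplified sketch: apply the stage's func to the iterator of one target partition.
    def computeResultPartition[U](stage: ResultStage, idx: Int, context: TaskContext): U = {
      val partitionId = stage.partitions(idx)           // job partition index -> RDD partition id
      val split = stage.rdd.partitions(partitionId)
      stage.func(context, stage.rdd.iterator(split, context)).asInstanceOf[U]
    }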
For shuffle map stages, we also + * track the nodes that each output partition is on. * * Each Stage also has a firstJobId, identifying the job that first submitted the stage. When FIFO * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered * faster on failure. * - * The callSite provides a location in user code which relates to the stage. For a shuffle map - * stage, the callSite gives the user code that created the RDD being shuffled. For a result - * stage, the callSite gives the user code that executes the associated action (e.g. count()). - * - * A single stage can consist of multiple attempts. In that case, the latestInfo field will - * be updated for each attempt. + * Finally, a single stage can be re-executed in multiple attempts due to fault recovery. In that + * case, the Stage object will track multiple StageInfo objects to pass to listeners or the web UI. + * The latest one will be accessible through latestInfo. * + * @param id Unique stage ID + * @param rdd RDD that this stage runs on: for a shuffle map stage, it's the RDD we run map tasks + * on, while for a result stage, it's the target RDD that we ran an action on + * @param numTasks Total number of tasks in stage; result stages in particular may not need to + * compute all partitions, e.g. for first(), lookup(), and take(). + * @param parents List of stages that this stage depends on (through shuffle dependencies). + * @param firstJobId ID of the first job this stage was part of, for FIFO scheduling. + * @param callSite Location in the user program associated with this stage: either where the target + * RDD was created, for a shuffle map stage, or where the action for a result stage was called. */ private[scheduler] abstract class Stage( val id: Int, diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index 6ce95bdcac2aa..0c8f08f0f3b1b 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -33,6 +33,9 @@ private[spark] class HashShuffleReader[K, C]( mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker) extends ShuffleReader[K, C] with Logging { + require(endPartition == startPartition + 1, + "Hash shuffle currently only supports fetching one partition") + private val dep = handle.dependency /** Read the combined key-values for this reduce task */ @@ -41,7 +44,7 @@ private[spark] class HashShuffleReader[K, C]( context, blockManager.shuffleClient, blockManager, - mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition), + mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition), // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 1b9ff740ff530..78eda42459895 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -229,7 +229,7 @@ class DAGSchedulerSuite } } - /** Sends the rdd to the scheduler for scheduling and returns the job id. */ + /** Submits a job to the scheduler and returns the job id. 
*/ private def submit( rdd: RDD[_], partitions: Array[Int], @@ -240,6 +240,15 @@ class DAGSchedulerSuite jobId } + /** Submits a map stage to the scheduler and returns the job id. */ + private def submitMapStage( + shuffleDep: ShuffleDependency[_, _, _], + listener: JobListener = jobListener): Int = { + val jobId = scheduler.nextJobId.getAndIncrement() + runEvent(MapStageSubmitted(jobId, shuffleDep, CallSite("", ""), listener)) + jobId + } + /** Sends TaskSetFailed to the scheduler. */ private def failed(taskSet: TaskSet, message: String) { runEvent(TaskSetFailed(taskSet, message, None)) @@ -1313,6 +1322,110 @@ class DAGSchedulerSuite assert(stackTraceString.contains("org.scalatest.FunSuite")) } + test("simple map stage submission") { + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) + + // Submit a map stage by itself + submitMapStage(shuffleDep) + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostB", 1)))) + assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === + HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) + assert(results === Map(0 -> null, 1 -> null)) + results.clear() + assertDataStructuresEmpty() + + // Submit a reduce job that depends on this map stage; it should directly do the reduce + submit(reduceRdd, Array(0)) + complete(taskSets(1), Seq((Success, 42))) + assert(results === Map(0 -> 42)) + results.clear() + assertDataStructuresEmpty() + + // Check that if we submit the map stage again, no tasks run + submitMapStage(shuffleDep) + assert(results === Map(0 -> null, 1 -> null)) + assertDataStructuresEmpty() + } + + test("map stage submission with reduce stage also depending on the data") { + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) + + // Submit the map stage by itself + submitMapStage(shuffleDep) + + // Submit a reduce job that depends on this map stage + submit(reduceRdd, Array(0)) + + // Complete tasks for the map stage + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostB", 1)))) + assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === + HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) + assert(results === Map(0 -> null, 1 -> null)) + results.clear() + + // Complete tasks for the reduce stage + complete(taskSets(1), Seq((Success, 42))) + assert(results === Map(0 -> 42)) + results.clear() + assertDataStructuresEmpty() + + // Check that if we submit the map stage again, no tasks run + submitMapStage(shuffleDep) + assert(results === Map(0 -> null, 1 -> null)) + assertDataStructuresEmpty() + } + + test("map stage submission with fetch failure") { + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) + + // Submit a map stage by itself + submitMapStage(shuffleDep) + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", reduceRdd.partitions.size)), + (Success, makeMapStatus("hostB", reduceRdd.partitions.size)))) + assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === + HashSet(makeBlockManagerId("hostA"), 
makeBlockManagerId("hostB"))) + assert(results === Map(0 -> null, 1 -> null)) + results.clear() + assertDataStructuresEmpty() + + // Submit a reduce job that depends on this map stage, but where one reduce will fail a fetch + submit(reduceRdd, Array(0, 1)) + complete(taskSets(1), Seq( + (Success, 42), + (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null))) + // Ask the scheduler to try it again; TaskSet 2 will rerun the map task that we couldn't fetch + // from, then TaskSet 3 will run the reduce stage + scheduler.resubmitFailedStages() + complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size)))) + complete(taskSets(3), Seq((Success, 43))) + assert(results === Map(0 -> 42, 1 -> 43)) + results.clear() + assertDataStructuresEmpty() + + // Run another reduce job without a failure; this should just work + submit(reduceRdd, Array(0, 1)) + complete(taskSets(4), Seq( + (Success, 44), + (Success, 45))) + assert(results === Map(0 -> 44, 1 -> 45)) + results.clear() + assertDataStructuresEmpty() + } + /** * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations. * Note that this checks only the host and not the executor ID. diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleReaderSuite.scala index aea9565ad9e9c..05b3afef5b839 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleReaderSuite.scala @@ -115,7 +115,7 @@ class HashShuffleReaderSuite extends SparkFunSuite with LocalSparkContext { // Make a mocked MapOutputTracker for the shuffle reader to use to determine what // shuffle data to read. val mapOutputTracker = mock(classOf[MapOutputTracker]) - when(mapOutputTracker.getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)).thenReturn { + when(mapOutputTracker.getMapSizesByExecutorId(shuffleId, reduceId)).thenReturn { // Test a scenario where all data is local, to avoid creating a bunch of additional mocks // for the code to read data over the network. val shuffleBlockIdsAndSizes = (0 until numMaps).map { mapId => From c8fd9dbcf91683d124d4b74c036c90e93c2971ec Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 13 Aug 2015 14:16:30 -0700 Subject: [PATCH 3/9] Add a test to FailureSuite too --- .../scala/org/apache/spark/FailureSuite.scala | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala index aa50a49c50232..f58756e6f6179 100644 --- a/core/src/test/scala/org/apache/spark/FailureSuite.scala +++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala @@ -217,6 +217,27 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext { FailureSuiteState.clear() } + // Run a 3-task map stage where one task fails once. 
+ test("failure in tasks in a submitMapStage") { + sc = new SparkContext("local[1,2]", "test") + val rdd = sc.makeRDD(1 to 3, 3).map { x => + FailureSuiteState.synchronized { + FailureSuiteState.tasksRun += 1 + if (x == 1 && FailureSuiteState.tasksFailed == 0) { + FailureSuiteState.tasksFailed += 1 + throw new Exception("Intentional task failure") + } + } + (x, x) + } + val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(2)) + sc.submitMapStage(dep).get() + FailureSuiteState.synchronized { + assert(FailureSuiteState.tasksRun === 4) + } + FailureSuiteState.clear() + } + // TODO: Need to add tests with shuffle fetch failures. } From ae2e7661d9d6ac0caca662650d7b8f490b14b1c4 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 13 Aug 2015 14:57:52 -0700 Subject: [PATCH 4/9] Another test suite --- .../apache/spark/MapOutputStatistics.scala | 8 +-- .../scala/org/apache/spark/SparkContext.scala | 1 - .../apache/spark/scheduler/DAGScheduler.scala | 8 +-- .../scheduler/AdaptiveSchedulingSuite.scala | 65 +++++++++++++++++++ 4 files changed, 69 insertions(+), 13 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala diff --git a/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala b/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala index 64ee9ffac02d7..78ba7f67ee9cc 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala @@ -17,11 +17,7 @@ package org.apache.spark -import org.apache.spark.annotation.DeveloperApi - /** - * :: DeveloperApi :: - * Hold statistics about the output sizes in a map stage. + * Holds statistics about the output sizes in a map stage. May become a DeveloperApi in the future. */ -@DeveloperApi -class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long]) +private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long]) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 16defae5328ed..1ce35b1803d69 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1996,7 +1996,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli dependency, callSite, localProperties.get) - // TODO: It's not nice that we use JobWaiter here, but that's what SimpleFutureAction takes new SimpleFutureAction(waiter, env.mapOutputTracker.getStatistics( dependency.shuffleId, dependency.partitioner.numPartitions)) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index d850724efe5f5..6644fbda3e8fc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -822,11 +822,7 @@ class DAGScheduler( try { // New stage creation may throw an exception if, for example, jobs are run on a // HadoopRDD whose underlying HDFS files have been deleted. -<<<<<<< HEAD - finalStage = newResultStage(finalRDD, partitions.length, jobId, callSite) -======= finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite) ->>>>>>> 9c0f689... 
Remove reduce task stuff, clean up, add docs and tests } catch { case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) @@ -838,7 +834,7 @@ class DAGScheduler( clearCacheLocs() logInfo("Got job %s (%s) with %d output partitions".format( job.jobId, callSite.shortForm, partitions.length)) - logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")") + logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) val jobSubmissionTime = clock.getTimeMillis() @@ -877,7 +873,7 @@ class DAGScheduler( clearCacheLocs() logInfo("Got map stage job %s (%s) with %d output partitions".format( jobId, callSite.shortForm, dependency.rdd.partitions.size)) - logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")") + logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala new file mode 100644 index 0000000000000..3fe28027c3c21 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import org.apache.spark.rdd.{ShuffledRDDPartition, RDD, ShuffledRDD} +import org.apache.spark._ + +object AdaptiveSchedulingSuiteState { + var tasksRun = 0 + + def clear(): Unit = { + tasksRun = 0 + } +} + +/** A special ShuffledRDD where we can pass a ShuffleDependency object to use */ +class CustomShuffledRDD[K, V, C](@transient dep: ShuffleDependency[K, V, C]) + extends RDD[(K, C)](dep.rdd.context, Seq(dep)) { + + override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { + val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]] + SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context) + .read() + .asInstanceOf[Iterator[(K, C)]] + } + + override def getPartitions: Array[Partition] = { + Array.tabulate[Partition](dep.partitioner.numPartitions)(i => new ShuffledRDDPartition(i)) + } +} + +class AdaptiveSchedulingSuite extends SparkFunSuite with LocalSparkContext { + test("simple use of submitMapStage") { + try { + sc = new SparkContext("local[1,2]", "test") + val rdd = sc.parallelize(1 to 3, 3).map { x => + AdaptiveSchedulingSuiteState.tasksRun += 1 + (x, x) + } + val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(2)) + val shuffled = new CustomShuffledRDD[Int, Int, Int](dep) + sc.submitMapStage(dep).get() + assert(AdaptiveSchedulingSuiteState.tasksRun == 3) + assert(shuffled.collect().toSet == Set((1, 1), (2, 2), (3, 3))) + assert(AdaptiveSchedulingSuiteState.tasksRun == 3) + } finally { + AdaptiveSchedulingSuiteState.clear() + } + } +} From f644d1a285fb8048948d0cc7f3b671970fbb6100 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Wed, 2 Sep 2015 18:01:20 -0400 Subject: [PATCH 5/9] tweaks --- .../org/apache/spark/MapOutputTracker.scala | 11 +- .../scala/org/apache/spark/SparkContext.scala | 5 +- .../apache/spark/scheduler/DAGScheduler.scala | 101 +++++++++--------- .../spark/scheduler/ShuffleMapStage.scala | 2 +- 4 files changed, 57 insertions(+), 62 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index cfb00913b3f94..94eb8daa85c53 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -144,21 +144,18 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging /** * Return statistics about all of the outputs for a given shuffle. 
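These per-partition byte counts are what adaptive planning consumes: run only the map stage, look at the sizes, then choose the downstream parallelism. A sketch of that pattern (the sample data, the 64 initial partitions, and the 64 MB target are arbitrary placeholders; submitMapStage is an internal API intended for Spark's own components such as the SQL planner):

    // Sketch: run just the map side of a shuffle, then size the reduce side from its output.
    import org.apache.spark.{HashPartitioner, ShuffleDependency}

    val pairs = sc.parallelize(1 to 1000000, 100).map(i => (i % 1000, i))   // placeholder data
    val dep = new ShuffleDependency[Int, Int, Int](pairs, new HashPartitioner(64))
    val stats = sc.submitMapStage(dep).get()          // MapOutputStatistics for this shuffle
    val targetBytesPerReducer = 64L * 1024 * 1024     // arbitrary sizing target
    val numReducers = math.max(1,
      math.ceil(stats.bytesByPartitionId.sum.toDouble / targetBytesPerReducer).toInt)
    // ...then build the reduce-side RDD with numReducers partitions instead of the original 64.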
- * - * @param shuffleId ID of the shuffle - * @param numPartitions number of map output partitions */ - def getStatistics(shuffleId: Int, numPartitions: Int): MapOutputStatistics = { - val statuses = getStatuses(shuffleId) + def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = { + val statuses = getStatuses(dep.shuffleId) // Synchronize on the returned array because, on the driver, it gets mutated in place statuses.synchronized { - val totalSizes = new Array[Long](numPartitions) + val totalSizes = new Array[Long](dep.partitioner.numPartitions) for (s <- statuses) { for (i <- 0 until totalSizes.length) { totalSizes(i) += s.getSizeForBlock(i) } } - new MapOutputStatistics(shuffleId, totalSizes) + new MapOutputStatistics(dep.shuffleId, totalSizes) } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1ce35b1803d69..911ea423d487e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1996,12 +1996,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli dependency, callSite, localProperties.get) - new SimpleFutureAction(waiter, env.mapOutputTracker.getStatistics( - dependency.shuffleId, dependency.partitioner.numPartitions)) + new SimpleFutureAction(waiter, env.mapOutputTracker.getStatistics(dependency)) } - - /** * Cancel active jobs for the specified group. See [[org.apache.spark.SparkContext.setJobGroup]] * for more information. diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 6644fbda3e8fc..fb18a74b95cb5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -829,24 +829,25 @@ class DAGScheduler( listener.jobFailed(e) return } - if (finalStage != null) { - val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) - clearCacheLocs() - logInfo("Got job %s (%s) with %d output partitions".format( - job.jobId, callSite.shortForm, partitions.length)) - logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") - logInfo("Parents of final stage: " + finalStage.parents) - logInfo("Missing parents: " + getMissingParentStages(finalStage)) - val jobSubmissionTime = clock.getTimeMillis() - jobIdToActiveJob(jobId) = job - activeJobs += job - finalStage.resultOfJob = Some(job) - val stageIds = jobIdToStageIds(jobId).toArray - val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) - listenerBus.post( - SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) - submitStage(finalStage) - } + + val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) + clearCacheLocs() + logInfo("Got job %s (%s) with %d output partitions".format( + job.jobId, callSite.shortForm, partitions.length)) + logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") + logInfo("Parents of final stage: " + finalStage.parents) + logInfo("Missing parents: " + getMissingParentStages(finalStage)) + + val jobSubmissionTime = clock.getTimeMillis() + jobIdToActiveJob(jobId) = job + activeJobs += job + finalStage.resultOfJob = Some(job) + val stageIds = jobIdToStageIds(jobId).toArray + val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) + listenerBus.post( + SparkListenerJobStart(job.jobId, 
jobSubmissionTime, stageInfos, properties)) + submitStage(finalStage) + submitWaitingStages() } @@ -868,42 +869,42 @@ class DAGScheduler( listener.jobFailed(e) return } - if (finalStage != null) { - val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) - clearCacheLocs() - logInfo("Got map stage job %s (%s) with %d output partitions".format( - jobId, callSite.shortForm, dependency.rdd.partitions.size)) - logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") - logInfo("Parents of final stage: " + finalStage.parents) - logInfo("Missing parents: " + getMissingParentStages(finalStage)) - - // Mark any finished tasks in the stage as such so the listener knows about them - for (i <- 0 until finalStage.numPartitions) { - if (finalStage.outputLocs(i).nonEmpty) { - job.finished(i) = true - job.numFinished += 1 - listener.taskSucceeded(i, null) - } + + val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) + clearCacheLocs() + logInfo("Got map stage job %s (%s) with %d output partitions".format( + jobId, callSite.shortForm, dependency.rdd.partitions.size)) + logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") + logInfo("Parents of final stage: " + finalStage.parents) + logInfo("Missing parents: " + getMissingParentStages(finalStage)) + + // Mark any finished tasks in the stage as such so the listener knows about them + for (i <- 0 until finalStage.numPartitions) { + if (finalStage.outputLocs(i).nonEmpty) { + job.finished(i) = true + job.numFinished += 1 + listener.taskSucceeded(i, null) } + } - val jobSubmissionTime = clock.getTimeMillis() - jobIdToActiveJob(jobId) = job - activeJobs += job - finalStage.mapStageJobs = job :: finalStage.mapStageJobs - val stageIds = jobIdToStageIds(jobId).toArray - val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) + val jobSubmissionTime = clock.getTimeMillis() + jobIdToActiveJob(jobId) = job + activeJobs += job + finalStage.mapStageJobs = job :: finalStage.mapStageJobs + val stageIds = jobIdToStageIds(jobId).toArray + val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) + listenerBus.post( + SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) + submitStage(finalStage) + + // If the whole job has finished, remove it + if (job.numFinished == job.numPartitions) { + markStageAsFinished(finalStage) + cleanupStateForJobAndIndependentStages(job) listenerBus.post( - SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) - submitStage(finalStage) - - // If the whole job has finished, remove it - if (job.numFinished == job.numPartitions) { - markStageAsFinished(finalStage) - cleanupStateForJobAndIndependentStages(job) - listenerBus.post( - SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) - } + SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) } + submitWaitingStages() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala index f7317249f51c9..7d92960876403 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala @@ -45,7 +45,7 @@ private[spark] class ShuffleMapStage( override def toString: String = "ShuffleMapStage " + id - /** Map-stage jobs that were submitted to execute this stage independently (if any) */ + /** Running map-stage jobs that were submitted to 
execute this stage independently (if any) */ var mapStageJobs: List[ActiveJob] = Nil var numAvailableOutputs: Int = 0 From c7c9e38050f05253f12ac0de0b07ba614782748b Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Fri, 4 Sep 2015 12:04:25 -0400 Subject: [PATCH 6/9] A few more tests and docs (still want to add more) --- .../apache/spark/MapOutputStatistics.scala | 4 + .../scala/org/apache/spark/SparkContext.scala | 4 +- .../apache/spark/scheduler/DAGScheduler.scala | 79 +++++---- .../spark/scheduler/DAGSchedulerSuite.scala | 156 +++++++++++++++++- 4 files changed, 198 insertions(+), 45 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala b/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala index 78ba7f67ee9cc..f8a6f1d0d8cbb 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala @@ -19,5 +19,9 @@ package org.apache.spark /** * Holds statistics about the output sizes in a map stage. May become a DeveloperApi in the future. + * + * @param shuffleId ID of the shuffle + * @param bytesByPartitionId approximate number of output bytes for each map output partition + * (may be inexact due to use of compressed map statuses) */ private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long]) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 911ea423d487e..dee6091ce3caf 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1992,11 +1992,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli : SimpleFutureAction[MapOutputStatistics] = { assertNotStopped() val callSite = getCallSite() + var result: MapOutputStatistics = null val waiter = dagScheduler.submitMapStage( dependency, + (r: MapOutputStatistics) => { result = r }, callSite, localProperties.get) - new SimpleFutureAction(waiter, env.mapOutputTracker.getStatistics(dependency)) + new SimpleFutureAction[MapOutputStatistics](waiter, result) } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index fb18a74b95cb5..a1a4010858fc8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -45,7 +45,9 @@ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a * minimal schedule to run the job. It then submits stages as TaskSets to an underlying - * TaskScheduler implementation that runs them on the cluster. + * TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent + * tasks that can run right away based on the data that's already on the cluster (e.g. map output + * files from previous stages), though it may fail if this data becomes unavailable. * * Spark stages are created by breaking the RDD graph at shuffle boundaries. 
RDD operations with * "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks @@ -84,6 +86,19 @@ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat * - Preferred locations: the DAGScheduler also computes where to run each task in a stage based * on the preferred locations of its underlying RDDs, or the location of cached or shuffle data. * + * - Cleanup: all data structures are cleared when the running jobs that depend on them finish, + * to prevent memory leaks in a long-running application. + * + * To recover from failures, the same stage might need to run multiple times, which are called + * "attempts". If the TaskScheduler reports that a task failed because a map output file from a + * previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a + * through a CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait + * a small amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any + * lost stage(s) that compute the missing tasks. As part of this process, we might also have to + * create Stage objects for old (finished) stages where we previously cleaned up the Stage object. + * Since tasks from the old attempt of a stage could still be running, care must be taken to map + * any events received in the correct Stage object. + * * Here's a checklist to use when making or reviewing changes to this class: * * - All data structures should be cleared when the jobs involving them end to avoid indefinite @@ -652,23 +667,24 @@ class DAGScheduler( * about their outputs before submitting downstream stages. * * @param dependency the ShuffleDependency to run a map stage for + * @param callback function called with the result of the job, which in this case will be a + * single MapOutputStatistics object showing how much data was produced for each partition * @param callSite where in the user program this job was submitted * @param properties scheduler properties to attach to this job, e.g. 
fair scheduler pool name */ def submitMapStage[K, V, C]( dependency: ShuffleDependency[K, V, C], + callback: MapOutputStatistics => Unit, callSite: CallSite, - properties: Properties): JobWaiter[Any] = { + properties: Properties): JobWaiter[MapOutputStatistics] = { val rdd = dependency.rdd val jobId = nextJobId.getAndIncrement() - if (rdd.partitions.size == 0) { - // Return immediately if the job is running 0 tasks - return new JobWaiter[Any](this, jobId, 0, (i: Int, r: Any) => {}) + if (rdd.partitions.length == 0) { + throw new SparkException("Can't run submitMapStage on RDD with 0 partitions") } - assert(rdd.partitions.size > 0) - val waiter = new JobWaiter(this, jobId, rdd.partitions.size, (i: Int, r: Any) => {}) + val waiter = new JobWaiter(this, jobId, 1, (i: Int, r: MapOutputStatistics) => callback(r)) eventProcessLoop.post(MapStageSubmitted( jobId, dependency, callSite, waiter, SerializationUtils.clone(properties))) waiter @@ -878,15 +894,6 @@ class DAGScheduler( logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) - // Mark any finished tasks in the stage as such so the listener knows about them - for (i <- 0 until finalStage.numPartitions) { - if (finalStage.outputLocs(i).nonEmpty) { - job.finished(i) = true - job.numFinished += 1 - listener.taskSucceeded(i, null) - } - } - val jobSubmissionTime = clock.getTimeMillis() jobIdToActiveJob(jobId) = job activeJobs += job @@ -897,9 +904,11 @@ class DAGScheduler( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) submitStage(finalStage) - // If the whole job has finished, remove it - if (job.numFinished == job.numPartitions) { - markStageAsFinished(finalStage) + // If the whole stage has already finished, tell the listener and remove it + if (!finalStage.outputLocs.contains(Nil)) { + job.finished(0) = true + job.numFinished += 1 + listener.taskSucceeded(0, mapOutputTracker.getStatistics(dependency)) cleanupStateForJobAndIndependentStages(job) listenerBus.post( SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) @@ -1212,28 +1221,26 @@ class DAGScheduler( logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name + ") because some of its tasks had failed: " + shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty) - .map(_._2).mkString(", ")) + .map(_._2).mkString(", ")) submitStage(shuffleStage) + } else { + // Mark any map-stage jobs waiting on this stage as finished + if (shuffleStage.mapStageJobs.nonEmpty) { + val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep) + for (job <- shuffleStage.mapStageJobs) { + job.finished(0) = true + job.numFinished += 1 + job.listener.taskSucceeded(0, stats) + cleanupStateForJobAndIndependentStages(job) + listenerBus.post( + SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) + } + } } // Note: newly runnable stages will be submitted below when we submit waiting stages } - - // Mark the task as finished in any map-stage jobs waiting on this stage - for (job <- shuffleStage.mapStageJobs) { - if (!job.finished(smt.partitionId)) { - job.finished(smt.partitionId) = true - job.numFinished += 1 - job.listener.taskSucceeded(smt.partitionId, null) - // If the whole job has finished, remove it - if (job.numFinished == job.numPartitions) { - cleanupStateForJobAndIndependentStages(job) - listenerBus.post( - SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) - } - } - } - } + } case Resubmitted => logInfo("Resubmitted " + task + ", so marking it 
as still running") diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 78eda42459895..ec728c40b40aa 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -152,6 +152,14 @@ class DAGSchedulerSuite override def jobFailed(exception: Exception) = { failure = exception } } + /** A simple helper class for creating custom JobListeners */ + class SimpleListener extends JobListener { + val results = new HashMap[Int, Any] + var failure: Exception = null + override def taskSucceeded(index: Int, result: Any) = results.put(index, result) + override def jobFailed(exception: Exception) = { failure = exception } + } + before { sc = new SparkContext("local", "DAGSchedulerSuite") sparkListener.submittedStageInfos.clear() @@ -1324,7 +1332,7 @@ class DAGSchedulerSuite test("simple map stage submission") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) - val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1)) val shuffleId = shuffleDep.shuffleId val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) @@ -1335,7 +1343,7 @@ class DAGSchedulerSuite (Success, makeMapStatus("hostB", 1)))) assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) - assert(results === Map(0 -> null, 1 -> null)) + assert(results.size === 1) results.clear() assertDataStructuresEmpty() @@ -1348,13 +1356,13 @@ class DAGSchedulerSuite // Check that if we submit the map stage again, no tasks run submitMapStage(shuffleDep) - assert(results === Map(0 -> null, 1 -> null)) + assert(results.size === 1) assertDataStructuresEmpty() } test("map stage submission with reduce stage also depending on the data") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) - val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1)) val shuffleId = shuffleDep.shuffleId val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) @@ -1370,7 +1378,7 @@ class DAGSchedulerSuite (Success, makeMapStatus("hostB", 1)))) assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) - assert(results === Map(0 -> null, 1 -> null)) + assert(results.size === 1) results.clear() // Complete tasks for the reduce stage @@ -1381,13 +1389,13 @@ class DAGSchedulerSuite // Check that if we submit the map stage again, no tasks run submitMapStage(shuffleDep) - assert(results === Map(0 -> null, 1 -> null)) + assert(results.size === 1) assertDataStructuresEmpty() } test("map stage submission with fetch failure") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) - val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) val shuffleId = shuffleDep.shuffleId val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) @@ -1398,7 +1406,7 @@ class DAGSchedulerSuite (Success, makeMapStatus("hostB", reduceRdd.partitions.size)))) assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) - assert(results === Map(0 -> null, 1 -> null)) + assert(results.size === 1) results.clear() assertDataStructuresEmpty() @@ 
-1424,6 +1432,138 @@ class DAGSchedulerSuite assert(results === Map(0 -> 44, 1 -> 45)) results.clear() assertDataStructuresEmpty() + + // Resubmit the map stage; this should also just work + submitMapStage(shuffleDep) + assert(results.size === 1) + results.clear() + assertDataStructuresEmpty() + } + + /** + * In this test, we have three RDDs with shuffle dependencies, and we submit map stage jobs + * that are waiting on each one, as well as a reduce job on the last one. We test that all of + * these jobs complete even if there are some fetch failures in both shuffles. + */ + test("map stage submission with multiple shared stages and failure") { + val rdd1 = new MyRDD(sc, 2, Nil) + val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2)) + val rdd2 = new MyRDD(sc, 2, List(dep1)) + val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2)) + val rdd3 = new MyRDD(sc, 2, List(dep2)) + + val listener1 = new SimpleListener + val listener2 = new SimpleListener + val listener3 = new SimpleListener + + submitMapStage(dep1, listener1) + submitMapStage(dep2, listener2) + submit(rdd3, Array(0, 1), listener = listener3) + + // Complete the first stage + assert(taskSets(0).stageId === 0) + complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", rdd1.partitions.size)), + (Success, makeMapStatus("hostB", rdd1.partitions.size)))) + assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet === + HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) + assert(listener1.results.size === 1) + + // When attempting the second stage, show a fetch failure + assert(taskSets(1).stageId === 1) + complete(taskSets(1), Seq( + (Success, makeMapStatus("hostA", rdd2.partitions.size)), + (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null))) + scheduler.resubmitFailedStages() + assert(listener2.results.size === 0) // Second stage listener should not have a result yet + + // Stage 0 should now be running as task set 2; make its task succeed + assert(taskSets(2).stageId === 0) + complete(taskSets(2), Seq( + (Success, makeMapStatus("hostC", rdd2.partitions.size)))) + assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet === + HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB"))) + assert(listener2.results.size === 0) // Second stage listener should still not have a result + + // Stage 1 should now be running as task set 3; make its first task succeed + assert(taskSets(3).stageId === 1) + complete(taskSets(3), Seq( + (Success, makeMapStatus("hostB", rdd2.partitions.size)), + (Success, makeMapStatus("hostD", rdd2.partitions.size)))) + assert(mapOutputTracker.getMapSizesByExecutorId(dep2.shuffleId, 0).map(_._1).toSet === + HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostD"))) + assert(listener2.results.size === 1) + + // Finally, the reduce job should be running as task set 4; make it see a fetch failure, + // then make it run again and succeed + assert(taskSets(4).stageId === 2) + complete(taskSets(4), Seq( + (Success, 52), + (FetchFailed(makeBlockManagerId("hostD"), dep2.shuffleId, 0, 0, "ignored"), null))) + scheduler.resubmitFailedStages() + + // TaskSet 5 will rerun stage 1's lost task, then TaskSet 6 will rerun stage 2 + assert(taskSets(5).stageId === 1) + complete(taskSets(5), Seq( + (Success, makeMapStatus("hostE", rdd2.partitions.size)))) + complete(taskSets(6), Seq( + (Success, 53))) + assert(listener3.results === Map(0 -> 52, 1 -> 53)) + assertDataStructuresEmpty() + } + + /** + * In 
this test, we run a map stage where one of the executors fails but we still receive a + * "zombie" complete message from that executor. We want to make sure the stage is not reported + * as done until all tasks have completed. + */ + test("map stage submission with executor failure late map task completions") { + val shuffleMapRdd = new MyRDD(sc, 3, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val shuffleId = shuffleDep.shuffleId + + submitMapStage(shuffleDep) + + val oldTaskSet = taskSets(0) + runEvent(CompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2), + null, createFakeTaskInfo(), null)) + assert(results.size === 0) // Map stage job should not be complete yet + + // Pretend host A was lost + val oldEpoch = mapOutputTracker.getEpoch + runEvent(ExecutorLost("exec-hostA")) + val newEpoch = mapOutputTracker.getEpoch + assert(newEpoch > oldEpoch) + + // Suppose we also get a completed event from task 1 on the same host; this should be ignored + runEvent(CompletionEvent(oldTaskSet.tasks(1), Success, makeMapStatus("hostA", 2), + null, createFakeTaskInfo(), null)) + assert(results.size === 0) // Map stage job should not be complete yet + + // A completion from another task should work because it's a non-failed host + runEvent(CompletionEvent(oldTaskSet.tasks(2), Success, makeMapStatus("hostB", 2), + null, createFakeTaskInfo(), null)) + assert(results.size === 0) // Map stage job should not be complete yet + + // Now complete tasks in the second task set + val newTaskSet = taskSets(1) + assert(newTaskSet.tasks.size === 2) // Both tasks 0 and 1 were on hostA + runEvent(CompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2), + null, createFakeTaskInfo(), null)) + assert(results.size === 0) // Map stage job should not be complete yet + runEvent(CompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2), + null, createFakeTaskInfo(), null)) + assert(results.size === 1) // Map stage job should now finally be complete + assertDataStructuresEmpty() + + // Also test that a reduce stage using this shuffled data can immediately run + val reduceRDD = new MyRDD(sc, 2, List(shuffleDep)) + results.clear() + submit(reduceRDD, Array(0, 1)) + complete(taskSets(2), Seq((Success, 42), (Success, 43))) + assert(results === Map(0 -> 42, 1 -> 43)) + results.clear() + assertDataStructuresEmpty() } /** From 458ccf59e8d91c3ef0e30f864a7f998f7a3c3f5f Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Fri, 4 Sep 2015 15:46:31 -0400 Subject: [PATCH 7/9] Further improvements to tests --- .../spark/scheduler/DAGSchedulerSuite.scala | 27 ++++++------------- 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index ec728c40b40aa..91b35aaf5e09a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -156,8 +156,8 @@ class DAGSchedulerSuite class SimpleListener extends JobListener { val results = new HashMap[Int, Any] var failure: Exception = null - override def taskSucceeded(index: Int, result: Any) = results.put(index, result) - override def jobFailed(exception: Exception) = { failure = exception } + override def taskSucceeded(index: Int, result: Any): Unit = results.put(index, result) + override def jobFailed(exception: Exception): Unit = { failure = exception } } before { @@ 
-1333,23 +1333,19 @@ class DAGSchedulerSuite test("simple map stage submission") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1)) - val shuffleId = shuffleDep.shuffleId val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) // Submit a map stage by itself submitMapStage(shuffleDep) - complete(taskSets(0), Seq( - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostB", 1)))) - assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === - HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) + assert(results.size === 0) // No results yet + completeShuffleMapStageSuccessfully(0, 0, 1) assert(results.size === 1) results.clear() assertDataStructuresEmpty() // Submit a reduce job that depends on this map stage; it should directly do the reduce submit(reduceRdd, Array(0)) - complete(taskSets(1), Seq((Success, 42))) + completeNextResultStageWithSuccess(2, 0) assert(results === Map(0 -> 42)) results.clear() assertDataStructuresEmpty() @@ -1373,16 +1369,12 @@ class DAGSchedulerSuite submit(reduceRdd, Array(0)) // Complete tasks for the map stage - complete(taskSets(0), Seq( - (Success, makeMapStatus("hostA", 1)), - (Success, makeMapStatus("hostB", 1)))) - assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === - HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) + completeShuffleMapStageSuccessfully(0, 0, 1) assert(results.size === 1) results.clear() // Complete tasks for the reduce stage - complete(taskSets(1), Seq((Success, 42))) + completeNextResultStageWithSuccess(1, 0) assert(results === Map(0 -> 42)) results.clear() assertDataStructuresEmpty() @@ -1404,8 +1396,6 @@ class DAGSchedulerSuite complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", reduceRdd.partitions.size)), (Success, makeMapStatus("hostB", reduceRdd.partitions.size)))) - assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === - HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) assert(results.size === 1) results.clear() assertDataStructuresEmpty() @@ -1445,7 +1435,7 @@ class DAGSchedulerSuite * that are waiting on each one, as well as a reduce job on the last one. We test that all of * these jobs complete even if there are some fetch failures in both shuffles. 
*/ - test("map stage submission with multiple shared stages and failure") { + test("map stage submission with multiple shared stages and failures") { val rdd1 = new MyRDD(sc, 2, Nil) val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2)) val rdd2 = new MyRDD(sc, 2, List(dep1)) @@ -1520,7 +1510,6 @@ class DAGSchedulerSuite test("map stage submission with executor failure late map task completions") { val shuffleMapRdd = new MyRDD(sc, 3, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) - val shuffleId = shuffleDep.shuffleId submitMapStage(shuffleDep) From 3ca93dd74787b85b9e49caf38c21db1119222f05 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Fri, 4 Sep 2015 15:54:38 -0400 Subject: [PATCH 8/9] Improve comments --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 5 +++++ .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index a1a4010858fc8..8f70528d2c8a2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -684,6 +684,11 @@ class DAGScheduler( throw new SparkException("Can't run submitMapStage on RDD with 0 partitions") } + // We create a JobWaiter with only one "task", which will be marked as complete when the whole + // map stage has completed, and will be passed the MapOutputStatistics for that stage. + // This makes it easier to avoid race conditions between the user code and the map output + // tracker that might result if we told the user the stage had finished, but then they query + // the map output tracker and find that some node failures have caused the output statistics to be lost.
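As a rough, non-authoritative sketch of how this one-"task" JobWaiter is meant to be consumed (not part of the patch itself): it assumes driver-side code in the scheduler package with `dagScheduler`, `dependency`, `callSite` and `properties` already in scope, and `handleStats` is an invented callback name.

    // Hypothetical wiring of a callback through the single-task JobWaiter returned by submitMapStage.
    def handleStats(stats: MapOutputStatistics): Unit = {
      // Per-partition output sizes that adaptive planning could later act on.
      println(s"shuffle ${stats.shuffleId}: ${stats.bytesByPartitionId.mkString(", ")} bytes")
    }

    val waiter: JobWaiter[MapOutputStatistics] =
      dagScheduler.submitMapStage(dependency, handleStats, callSite, properties)

    waiter.awaitResult() match {
      case JobSucceeded => // handleStats has already been invoked exactly once, with index 0
      case JobFailed(exception) => throw exception
    }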
val waiter = new JobWaiter(this, jobId, 1, (i: Int, r: MapOutputStatistics) => callback(r)) eventProcessLoop.post(MapStageSubmitted( jobId, dependency, callSite, waiter, SerializationUtils.clone(properties))) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 91b35aaf5e09a..1c55f90ad9b44 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1359,7 +1359,6 @@ class DAGSchedulerSuite test("map stage submission with reduce stage also depending on the data") { val shuffleMapRdd = new MyRDD(sc, 2, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1)) - val shuffleId = shuffleDep.shuffleId val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) // Submit the map stage by itself From 8bfce4d674cc0aa88c67d4defff054fc41216681 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sun, 13 Sep 2015 17:15:22 -0400 Subject: [PATCH 9/9] Review comments --- .../apache/spark/scheduler/ActiveJob.scala | 4 +- .../apache/spark/scheduler/DAGScheduler.scala | 39 ++++++++++--------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala index 24e7e92f51e3d..a3d2db31301b3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala @@ -31,8 +31,8 @@ import org.apache.spark.util.CallSite * * Jobs are only tracked for "leaf" stages that clients directly submitted, through DAGScheduler's * submitJob or submitMapStage methods. However, either type of job may cause the execution of - * may other earlier stages (for RDDs in the DAG it depends on), and multiple jobs may share some - * of these previous stages. These dependencies are managed inside DAGScheduler. + * other earlier stages (for RDDs in the DAG it depends on), and multiple jobs may share some of + * these previous stages. These dependencies are managed inside DAGScheduler. * * @param jobId A unique ID for this job. * @param finalStage The stage that this job computes (either a ResultStage for an action or a diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 8f70528d2c8a2..b4f90e8347894 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -92,17 +92,17 @@ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat * To recover from failures, the same stage might need to run multiple times, which are called * "attempts". If the TaskScheduler reports that a task failed because a map output file from a * previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a - * through a CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait - * a small amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any - * lost stage(s) that compute the missing tasks. As part of this process, we might also have to - * create Stage objects for old (finished) stages where we previously cleaned up the Stage object. 
- * Since tasks from the old attempt of a stage could still be running, care must be taken to map - * any events received in the correct Stage object. + * CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait a small + * amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost + * stage(s) that compute the missing tasks. As part of this process, we might also have to create + * Stage objects for old (finished) stages where we previously cleaned up the Stage object. Since + * tasks from the old attempt of a stage could still be running, care must be taken to map any + * events received in the correct Stage object. * * Here's a checklist to use when making or reviewing changes to this class: * * - All data structures should be cleared when the jobs involving them end to avoid indefinite - * accumulation of state in long-runnin programs. + * accumulation of state in long-running programs. * * - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to * include the new structure. This will help to catch memory leaks. @@ -911,12 +911,7 @@ class DAGScheduler( // If the whole stage has already finished, tell the listener and remove it if (!finalStage.outputLocs.contains(Nil)) { - job.finished(0) = true - job.numFinished += 1 - listener.taskSucceeded(0, mapOutputTracker.getStatistics(dependency)) - cleanupStateForJobAndIndependentStages(job) - listenerBus.post( - SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) + markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency)) } submitWaitingStages() @@ -1233,12 +1228,7 @@ class DAGScheduler( if (shuffleStage.mapStageJobs.nonEmpty) { val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep) for (job <- shuffleStage.mapStageJobs) { - job.finished(0) = true - job.numFinished += 1 - job.listener.taskSucceeded(0, stats) - cleanupStateForJobAndIndependentStages(job) - listenerBus.post( - SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) + markMapStageJobAsFinished(job, stats) } } } @@ -1599,6 +1589,17 @@ class DAGScheduler( Nil } + /** Mark a map stage job as finished with the given output stats, and report to its listener. */ + def markMapStageJobAsFinished(job: ActiveJob, stats: MapOutputStatistics): Unit = { + // In map stage jobs, we only create a single "task", which is to finish all of the stage + // (including reusing any previous map outputs, etc); so we just mark task 0 as done + job.finished(0) = true + job.numFinished += 1 + job.listener.taskSucceeded(0, stats) + cleanupStateForJobAndIndependentStages(job) + listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) + } + def stop() { logInfo("Stopping DAGScheduler") messageScheduler.shutdownNow()
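For context on where this API is headed, a rough sketch (not part of the patch series) of how downstream adaptive planning might use the per-partition sizes in a MapOutputStatistics; `coalescePartitions` and `targetSize` are invented names, and the real logic would live in whatever planner consumes the map-stage job's result.

    // Illustrative only: group contiguous map-output partitions into ranges of roughly
    // `targetSize` bytes each, so that small partitions can be read together by one reduce task.
    def coalescePartitions(stats: MapOutputStatistics, targetSize: Long): Seq[(Int, Int)] = {
      val sizes = stats.bytesByPartitionId
      val ranges = scala.collection.mutable.ArrayBuffer.empty[(Int, Int)]
      var start = 0
      var bytesInRange = 0L
      for (i <- sizes.indices) {
        bytesInRange += sizes(i)
        if (bytesInRange >= targetSize || i == sizes.length - 1) {
          ranges += ((start, i + 1)) // half-open range [start, i + 1) of map-output partitions
          start = i + 1
          bytesInRange = 0L
        }
      }
      ranges
    }
    // A reduce task could then fetch each (start, end) range of map outputs as a single unit.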