27 changes: 27 additions & 0 deletions core/src/main/scala/org/apache/spark/MapOutputStatistics.scala
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

/**
* Holds statistics about the output sizes in a map stage. May become a DeveloperApi in the future.
*
* @param shuffleId ID of the shuffle
* @param bytesByPartitionId approximate number of output bytes for each map output partition
* (may be inexact due to use of compressed map statuses)
*/
private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
Contributor:
can you add a comment to make it clear that bytesByPartitionId is indexed by the shuffle-read aka reduce side? (at least, I think it is, if I'm reading the code right.) Also probably worth adding a comment that this is just an estimate of the size, and can really be off by an arbitrary amount given the approximations made in HighlyCompressedMapStatus.
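
Given that caveat, a minimal sketch of how a consumer might use these statistics, assuming (per the comment above) that `bytesByPartitionId` is indexed by reduce partition; `pickNumReducers` and `targetSize` are illustrative names, not part of this patch:

```scala
// Heuristic only: HighlyCompressedMapStatus reports approximate block sizes,
// so the total below is an estimate, not an exact byte count.
def pickNumReducers(stats: MapOutputStatistics, targetSize: Long): Int = {
  val totalBytes = stats.bytesByPartitionId.sum  // estimated total shuffle output
  math.max(1, math.ceil(totalBytes.toDouble / targetSize).toInt)
}
```

This is the kind of decision adaptive query planning makes with these numbers: pick a reduce-task count after seeing the map output sizes rather than before.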

49 changes: 38 additions & 11 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -18,6 +18,7 @@
package org.apache.spark

import java.io._
import java.util.Arrays
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

@@ -132,13 +133,43 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
* describing the shuffle blocks that are stored at that block manager.
*/
def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
      : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, reduce $reduceId")
val startTime = System.currentTimeMillis
val statuses = getStatuses(shuffleId)
// Synchronize on the returned array because, on the driver, it gets mutated in place
statuses.synchronized {
return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
}
}

/**
* Return statistics about all of the outputs for a given shuffle.
*/
def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
val statuses = getStatuses(dep.shuffleId)
// Synchronize on the returned array because, on the driver, it gets mutated in place
statuses.synchronized {
val totalSizes = new Array[Long](dep.partitioner.numPartitions)
for (s <- statuses) {
for (i <- 0 until totalSizes.length) {
totalSizes(i) += s.getSizeForBlock(i)
}
}
new MapOutputStatistics(dep.shuffleId, totalSizes)
}
}

/**
* Get or fetch the array of MapStatuses for a given shuffle ID. NOTE: clients MUST synchronize
* on this array when reading it, because on the driver, we may be changing it in place.
*
* (It would be nice to remove this restriction in the future.)
*/
private def getStatuses(shuffleId: Int): Array[MapStatus] = {
val statuses = mapStatuses.get(shuffleId).orNull
if (statuses == null) {
logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
val startTime = System.currentTimeMillis
var fetchedStatuses: Array[MapStatus] = null
fetching.synchronized {
// Someone else is fetching it; wait for them to be done
@@ -160,7 +191,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}

if (fetchedStatuses == null) {
-        // We won the race to fetch the output locs; do so
+        // We won the race to fetch the statuses; do so
logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
// This try-finally prevents hangs due to timeouts:
try {
Expand All @@ -175,22 +206,18 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}
}
}
logDebug(s"Fetching map output location for shuffle $shuffleId, reduce $reduceId took " +
logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
s"${System.currentTimeMillis - startTime} ms")

if (fetchedStatuses != null) {
-      fetchedStatuses.synchronized {
-        return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
-      }
+      return fetchedStatuses
} else {
logError("Missing all output locations for shuffle " + shuffleId)
throw new MetadataFetchFailedException(
-        shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
+        shuffleId, -1, "Missing all output locations for shuffle " + shuffleId)
}
} else {
-      statuses.synchronized {
-        return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
-      }
+      return statuses
}
}

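To make the aggregation in `getStatistics` concrete: for each reduce partition, it sums the estimated block size reported by every map task's `MapStatus`. A self-contained sketch of the same computation, with a plain `Array[Long]` per map task standing in for `MapStatus.getSizeForBlock` (that representation is an assumption for illustration):

```scala
// sizesByMapTask(m)(r) = estimated bytes map task m wrote for reduce partition r.
def aggregateSizes(sizesByMapTask: Seq[Array[Long]], numPartitions: Int): Array[Long] = {
  val totalSizes = new Array[Long](numPartitions)
  for (mapSizes <- sizesByMapTask; i <- 0 until numPartitions) {
    totalSizes(i) += mapSizes(i)  // accumulate each reduce partition across all map tasks
  }
  totalSizes
}
```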
17 changes: 17 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1984,6 +1984,23 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
new SimpleFutureAction(waiter, resultFunc)
}

/**
* Submit a map stage for execution. This is currently an internal API only, but might be
* promoted to DeveloperApi in the future.
*/
private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C])
: SimpleFutureAction[MapOutputStatistics] = {
assertNotStopped()
val callSite = getCallSite()
var result: MapOutputStatistics = null
val waiter = dagScheduler.submitMapStage(
dependency,
(r: MapOutputStatistics) => { result = r },
callSite,
localProperties.get)
new SimpleFutureAction[MapOutputStatistics](waiter, result)
}

/**
* Cancel active jobs for the specified group. See [[org.apache.spark.SparkContext.setJobGroup]]
* for more information.
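A hedged sketch of driver-side use of `submitMapStage`: run the map side of a shuffle eagerly, then inspect its output sizes before planning the reduce side. Because the method is `private[spark]`, this assumes code compiled inside the `org.apache.spark` package; the RDD and partitioner are made up for illustration:

```scala
import scala.concurrent.Await
import scala.concurrent.duration.Duration

import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkContext}

def mapStageDemo(sc: SparkContext): Unit = {
  val pairs = sc.parallelize(1 to 1000, 10).map(x => (x % 100, x))
  val dep = new ShuffleDependency[Int, Int, Int](pairs, new HashPartitioner(100))
  // Runs only the map stage; the future completes once all map tasks finish.
  val stats = Await.result(sc.submitMapStage(dep), Duration.Inf)
  println(s"estimated total shuffle bytes: ${stats.bytesByPartitionId.sum}")
}
```

Note that `SimpleFutureAction` takes its result as a by-name parameter, which is why passing the mutable `result` variable above is safe: it is not read until the waiter completes, by which point the callback has filled it in.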
34 changes: 29 additions & 5 deletions core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
@@ -23,18 +23,42 @@ import org.apache.spark.TaskContext
import org.apache.spark.util.CallSite

/**
* Tracks information about an active job in the DAGScheduler.
* A running job in the DAGScheduler. Jobs can be of two types: a result job, which computes a
* ResultStage to execute an action, or a map-stage job, which computes the map outputs for a
* ShuffleMapStage before any downstream stages are submitted. The latter is used for adaptive
* query planning, to look at map output statistics before submitting later stages. We distinguish
* between these two types of jobs using the finalStage field of this class.
*
* Jobs are only tracked for "leaf" stages that clients directly submitted, through DAGScheduler's
* submitJob or submitMapStage methods. However, either type of job may cause the execution of
* other earlier stages (for RDDs in the DAG it depends on), and multiple jobs may share some of
* these previous stages. These dependencies are managed inside DAGScheduler.
*
* @param jobId A unique ID for this job.
* @param finalStage The stage that this job computes (either a ResultStage for an action or a
* ShuffleMapStage for submitMapStage).
* @param callSite Where this job was initiated in the user's program (shown on UI).
* @param listener A listener to notify if tasks in this job finish or the job fails.
* @param properties Scheduling properties attached to the job, such as fair scheduler pool name.
*/
private[spark] class ActiveJob(
val jobId: Int,
-    val finalStage: ResultStage,
-    val func: (TaskContext, Iterator[_]) => _,
-    val partitions: Array[Int],
+    val finalStage: Stage,
val callSite: CallSite,
val listener: JobListener,
val properties: Properties) {

-  val numPartitions = partitions.length
+  /**
+   * Number of partitions we need to compute for this job. Note that result stages may not need
+   * to compute all partitions in their target RDD, for actions like first() and lookup().
+   */
+  val numPartitions = finalStage match {
+    case r: ResultStage => r.partitions.length
+    case m: ShuffleMapStage => m.rdd.partitions.length
+  }

/** Which partitions of the stage have finished */
val finished = Array.fill[Boolean](numPartitions)(false)

var numFinished = 0
}
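
A small illustrative branch on the two job types this class now models, mirroring the `finalStage` match above (the helper name and the `shuffleDep` access are assumptions about surrounding scheduler code):

```scala
def describeJob(job: ActiveJob): String = job.finalStage match {
  case r: ResultStage => s"result job computing ${r.partitions.length} partitions"
  case m: ShuffleMapStage => s"map-stage job writing shuffle ${m.shuffleDep.shuffleId}"
}
```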