
Commit 30db8ba

Author: Davies Liu

Merge branch 'master' of github.com:apache/spark into internal_row

Conflicts:
  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
  sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
  sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
  sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
  sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
  sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
  sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala

2 parents: 7cbced8 + 8860405

File tree: 114 files changed (+1717, -3189 lines)


core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js

Lines changed: 3 additions & 2 deletions
@@ -140,7 +140,8 @@ function renderDagViz(forJob) {
 
   // Find cached RDDs and mark them as such
   metadataContainer().selectAll(".cached-rdd").each(function(v) {
-    var nodeId = VizConstants.nodePrefix + d3.select(this).text();
+    var rddId = d3.select(this).text().trim();
+    var nodeId = VizConstants.nodePrefix + rddId;
     svg.selectAll("g." + nodeId).classed("cached", true);
   });
 
@@ -150,7 +151,7 @@ function renderDagViz(forJob) {
 /* Render the RDD DAG visualization on the stage page. */
 function renderDagVizForStage(svgContainer) {
   var metadata = metadataContainer().select(".stage-metadata");
-  var dot = metadata.select(".dot-file").text();
+  var dot = metadata.select(".dot-file").text().trim();
   var containerId = VizConstants.graphPrefix + metadata.attr("stage-id");
   var container = svgContainer.append("g").attr("id", containerId);
   renderDot(dot, container, false);

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 48 additions & 1 deletion
@@ -21,7 +21,7 @@ import java.io._
 import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
-import scala.collection.mutable.{HashSet, Map}
+import scala.collection.mutable.{HashMap, HashSet, Map}
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
@@ -284,6 +284,53 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
   }
 
+  /**
+   * Return a list of locations that each have fraction of map output greater than the specified
+   * threshold.
+   *
+   * @param shuffleId id of the shuffle
+   * @param reducerId id of the reduce task
+   * @param numReducers total number of reducers in the shuffle
+   * @param fractionThreshold fraction of total map output size that a location must have
+   *                          for it to be considered large.
+   *
+   * This method is not thread-safe.
+   */
+  def getLocationsWithLargestOutputs(
+      shuffleId: Int,
+      reducerId: Int,
+      numReducers: Int,
+      fractionThreshold: Double)
+    : Option[Array[BlockManagerId]] = {
+
+    if (mapStatuses.contains(shuffleId)) {
+      val statuses = mapStatuses(shuffleId)
+      if (statuses.nonEmpty) {
+        // HashMap to add up sizes of all blocks at the same location
+        val locs = new HashMap[BlockManagerId, Long]
+        var totalOutputSize = 0L
+        var mapIdx = 0
+        while (mapIdx < statuses.length) {
+          val status = statuses(mapIdx)
+          val blockSize = status.getSizeForBlock(reducerId)
+          if (blockSize > 0) {
+            locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
+            totalOutputSize += blockSize
+          }
+          mapIdx = mapIdx + 1
+        }
+        val topLocs = locs.filter { case (loc, size) =>
+          size.toDouble / totalOutputSize >= fractionThreshold
+        }
+        // Return if we have any locations which satisfy the required threshold
+        if (topLocs.nonEmpty) {
+          return Some(topLocs.map(_._1).toArray)
+        }
+      }
+    }
+    None
+  }
+
   def incrementEpoch() {
     epochLock.synchronized {
       epoch += 1
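
For reference, a minimal usage sketch of the new method, mirroring the call exercised in the MapOutputTrackerSuite test further below. The tracker instance is an illustrative placeholder, not part of this diff; the ids and threshold match the test.

// `tracker` is assumed to be an existing MapOutputTrackerMaster with registered map outputs.
// Ask which locations hold at least 20% of the map output destined for reducer 0 of shuffle 10;
// None is returned when no single location reaches the threshold.
val preferredLocs: Option[Array[BlockManagerId]] =
  tracker.getLocationsWithLargestOutputs(
    shuffleId = 10,
    reducerId = 0,
    numReducers = 1,
    fractionThreshold = 0.2)
preferredLocs.foreach(_.foreach(loc => println(s"prefer host ${loc.host}")))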

core/src/main/scala/org/apache/spark/api/r/SerDe.scala

Lines changed: 13 additions & 4 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.api.r
 
 import java.io.{DataInputStream, DataOutputStream}
-import java.sql.{Date, Time}
+import java.sql.{Timestamp, Date, Time}
 
 import scala.collection.JavaConversions._
 
@@ -107,9 +107,12 @@ private[spark] object SerDe {
     Date.valueOf(readString(in))
   }
 
-  def readTime(in: DataInputStream): Time = {
-    val t = in.readDouble()
-    new Time((t * 1000L).toLong)
+  def readTime(in: DataInputStream): Timestamp = {
+    val seconds = in.readDouble()
+    val sec = Math.floor(seconds).toLong
+    val t = new Timestamp(sec * 1000L)
+    t.setNanos(((seconds - sec) * 1e9).toInt)
+    t
   }
 
   def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
 
@@ -227,6 +230,9 @@ private[spark] object SerDe {
        case "java.sql.Time" =>
          writeType(dos, "time")
          writeTime(dos, value.asInstanceOf[Time])
+       case "java.sql.Timestamp" =>
+         writeType(dos, "time")
+         writeTime(dos, value.asInstanceOf[Timestamp])
        case "[B" =>
          writeType(dos, "raw")
          writeBytes(dos, value.asInstanceOf[Array[Byte]])
 
@@ -289,6 +295,9 @@ private[spark] object SerDe {
     out.writeDouble(value.getTime.toDouble / 1000.0)
   }
 
+  def writeTime(out: DataOutputStream, value: Timestamp): Unit = {
+    out.writeDouble((value.getTime / 1000).toDouble + value.getNanos.toDouble / 1e9)
+  }
 
   // NOTE: Only works for ASCII right now
   def writeString(out: DataOutputStream, value: String): Unit = {
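
The time handling above boils down to one numeric convention: R sends times as fractional seconds since the epoch, and the JVM side splits that double into whole seconds (milliseconds for the Timestamp constructor) plus a nanosecond remainder. A self-contained sketch of that arithmetic, outside the SerDe streams; the helper names are illustrative, not part of the diff.

import java.sql.Timestamp

// Fractional epoch seconds -> Timestamp, keeping sub-second precision in the nanos field.
def secondsToTimestamp(seconds: Double): Timestamp = {
  val sec = Math.floor(seconds).toLong
  val t = new Timestamp(sec * 1000L)
  t.setNanos(((seconds - sec) * 1e9).toInt)
  t
}

// Timestamp -> fractional epoch seconds, the inverse used when writing back to R.
def timestampToSeconds(t: Timestamp): Double =
  (t.getTime / 1000).toDouble + t.getNanos.toDouble / 1e9

// 1.5 seconds past the epoch round-trips exactly.
assert(timestampToSeconds(secondsToTimestamp(1.5)) == 1.5)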

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 34 additions & 3 deletions
@@ -137,6 +137,22 @@ class DAGScheduler(
   private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
   taskScheduler.setDAGScheduler(this)
 
+  // Flag to control if reduce tasks are assigned preferred locations
+  private val shuffleLocalityEnabled =
+    sc.getConf.getBoolean("spark.shuffle.reduceLocality.enabled", true)
+  // Number of map and reduce tasks above which we do not assign preferred locations
+  // based on map output sizes. We limit the size of jobs for which we assign preferred
+  // locations as computing the top locations by size becomes expensive.
+  private[this] val SHUFFLE_PREF_MAP_THRESHOLD = 1000
+  // NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
+  private[this] val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
+
+  // Fraction of total map output that must be at a location for it to be considered as a
+  // preferred location for a reduce task.
+  // Making this larger will focus on fewer locations where most data can be read locally, but
+  // may lead to more delay in scheduling if those locations are busy.
+  private[scheduler] val REDUCER_PREF_LOCS_FRACTION = 0.2
+
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     eventProcessLoop.post(BeginEvent(task, taskInfo))
 
@@ -1384,17 +1400,32 @@
     if (rddPrefs.nonEmpty) {
       return rddPrefs.map(TaskLocation(_))
     }
-    // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
-    // that has any placement preferences. Ideally we would choose based on transfer sizes,
-    // but this will do for now.
+
     rdd.dependencies.foreach {
       case n: NarrowDependency[_] =>
+        // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
+        // that has any placement preferences. Ideally we would choose based on transfer sizes,
+        // but this will do for now.
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }
+      case s: ShuffleDependency[_, _, _] =>
+        // For shuffle dependencies, pick locations which have at least REDUCER_PREF_LOCS_FRACTION
+        // of the data as preferred locations
+        if (shuffleLocalityEnabled &&
+            rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
+            s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
+          // Get the preferred map output locations for this reducer
+          val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
+            partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION)
+          if (topLocsForReducer.nonEmpty) {
+            return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
+          }
+        }
+
      case _ =>
    }
    Nil
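
The new behavior is gated by the spark.shuffle.reduceLocality.enabled flag read above, which defaults to true. A hedged sketch of opting out from application code; the app name and master are placeholders.

import org.apache.spark.{SparkConf, SparkContext}

// With the flag off, the ShuffleDependency case above is skipped and reduce tasks
// get no shuffle-based preferred locations, regardless of job size.
val conf = new SparkConf()
  .setAppName("ExampleApp")
  .setMaster("local[2]")
  .set("spark.shuffle.reduceLocality.enabled", "false")
val sc = new SparkContext(conf)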

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

Lines changed: 35 additions & 0 deletions
@@ -205,4 +205,39 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     // masterTracker.stop() // this throws an exception
     rpcEnv.shutdown()
   }
+
+  test("getLocationsWithLargestOutputs with multiple outputs in same machine") {
+    val rpcEnv = createRpcEnv("test")
+    val tracker = new MapOutputTrackerMaster(conf)
+    tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+      new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
+    // Setup 3 map tasks
+    // on hostA with output size 2
+    // on hostA with output size 2
+    // on hostB with output size 3
+    tracker.registerShuffle(10, 3)
+    tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
+        Array(2L)))
+    tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("a", "hostA", 1000),
+        Array(2L)))
+    tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000),
+        Array(3L)))
+
+    // When the threshold is 50%, only host A should be returned as a preferred location
+    // as it has 4 out of 7 bytes of output.
+    val topLocs50 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.5)
+    assert(topLocs50.nonEmpty)
+    assert(topLocs50.get.size === 1)
+    assert(topLocs50.get.head === BlockManagerId("a", "hostA", 1000))
+
+    // When the threshold is 20%, both hosts should be returned as preferred locations.
+    val topLocs20 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.2)
+    assert(topLocs20.nonEmpty)
+    assert(topLocs20.get.size === 2)
+    assert(topLocs20.get.toSet ===
+      Seq(BlockManagerId("a", "hostA", 1000), BlockManagerId("b", "hostB", 1000)).toSet)
+
+    tracker.stop()
+    rpcEnv.shutdown()
+  }
 }

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 60 additions & 16 deletions
@@ -490,8 +490,8 @@ class DAGSchedulerSuite
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", 1)),
-      (Success, makeMapStatus("hostB", 1))))
+      (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
+      (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
     // the 2nd ResultTask failed
     complete(taskSets(1), Seq(
       (Success, 42),
 
@@ -501,7 +501,7 @@ class DAGSchedulerSuite
     // ask the scheduler to try it again
     scheduler.resubmitFailedStages()
     // have the 2nd attempt pass
-    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
     // we can see both result blocks now
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
       Array("hostA", "hostB"))
 
@@ -517,8 +517,8 @@ class DAGSchedulerSuite
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", 1)),
-      (Success, makeMapStatus("hostB", 1))))
+      (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
+      (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
     // The MapOutputTracker should know about both map output locations.
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
       Array("hostA", "hostB"))
 
@@ -560,18 +560,18 @@ class DAGSchedulerSuite
     assert(newEpoch > oldEpoch)
     val taskSet = taskSets(0)
     // should be ignored for being too old
-    runEvent(CompletionEvent(
-      taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
+      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
     // should work because it's a non-failed host
-    runEvent(CompletionEvent(
-      taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB",
+      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
     // should be ignored for being too old
-    runEvent(CompletionEvent(
-      taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
+      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
     // should work because it's a new epoch
     taskSet.tasks(1).epoch = newEpoch
-    runEvent(CompletionEvent(
-      taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA",
+      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
       Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
     complete(taskSets(1), Seq((Success, 42), (Success, 43)))
 
@@ -800,19 +800,63 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }
 
+  test("reduce tasks should be placed locally with map output") {
+    // Create a shuffleMapRdd with 1 partition
+    val shuffleMapRdd = new MyRDD(sc, 1, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+    submit(reduceRdd, Array(0))
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 1))))
+    assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
+      Array(makeBlockManagerId("hostA")))
+
+    // Reducer should run on the same host that the map task ran on
+    val reduceTaskSet = taskSets(1)
+    assertLocations(reduceTaskSet, Seq(Seq("hostA")))
+    complete(reduceTaskSet, Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty
+  }
+
+  test("reduce task locality preferences should only include machines with largest map outputs") {
+    val numMapTasks = 4
+    // Create a shuffleMapRdd with more partitions
+    val shuffleMapRdd = new MyRDD(sc, numMapTasks, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+    submit(reduceRdd, Array(0))
+
+    val statuses = (1 to numMapTasks).map { i =>
+      (Success, makeMapStatus("host" + i, 1, (10*i).toByte))
+    }
+    complete(taskSets(0), statuses)
+
+    // Reducer should prefer the last 3 hosts as they have 20%, 30% and 40% of data
+    val hosts = (1 to numMapTasks).map(i => "host" + i).reverse.take(numMapTasks - 1)
+
+    val reduceTaskSet = taskSets(1)
+    assertLocations(reduceTaskSet, Seq(hosts))
+    complete(reduceTaskSet, Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
    */
   private def assertLocations(taskSet: TaskSet, hosts: Seq[Seq[String]]) {
     assert(hosts.size === taskSet.tasks.size)
     for ((taskLocs, expectedLocs) <- taskSet.tasks.map(_.preferredLocations).zip(hosts)) {
-      assert(taskLocs.map(_.host) === expectedLocs)
+      assert(taskLocs.map(_.host).toSet === expectedLocs.toSet)
     }
   }
 
-  private def makeMapStatus(host: String, reduces: Int): MapStatus =
-    MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(2))
+  private def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
+    MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))
 
   private def makeBlockManagerId(host: String): BlockManagerId =
     BlockManagerId("exec-" + host, host, 12345)
