
Commit 8c29707

Merge branch 'master' into SPARK-8103
2 parents: 89a59b6 + 424b007

File tree: 79 files changed (+1137 / -495 lines)


.rat-excludes

Lines changed: 1 addition & 0 deletions

@@ -28,6 +28,7 @@ spark-env.sh
 spark-env.cmd
 spark-env.sh.template
 log4j-defaults.properties
+log4j-defaults-repl.properties
 bootstrap-tooltip.js
 jquery-1.11.1.min.js
 d3.min.js
core/src/main/resources/org/apache/spark/log4j-defaults-repl.properties

Lines changed: 12 additions & 0 deletions

@@ -0,0 +1,12 @@
+# Set everything to be logged to the console
+log4j.rootCategory=WARN, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.spark-project.jetty=WARN
+log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

core/src/main/scala/org/apache/spark/Logging.scala

Lines changed: 19 additions & 7 deletions

@@ -121,13 +121,25 @@ trait Logging {
     if (usingLog4j12) {
       val log4j12Initialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
       if (!log4j12Initialized) {
-        val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
-        Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
-          case Some(url) =>
-            PropertyConfigurator.configure(url)
-            System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
-          case None =>
-            System.err.println(s"Spark was unable to load $defaultLogProps")
+        if (Utils.isInInterpreter) {
+          val replDefaultLogProps = "org/apache/spark/log4j-defaults-repl.properties"
+          Option(Utils.getSparkClassLoader.getResource(replDefaultLogProps)) match {
+            case Some(url) =>
+              PropertyConfigurator.configure(url)
+              System.err.println(s"Using Spark's repl log4j profile: $replDefaultLogProps")
+              System.err.println("To adjust logging level use sc.setLogLevel(\"INFO\")")
+            case None =>
+              System.err.println(s"Spark was unable to load $replDefaultLogProps")
+          }
+        } else {
+          val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
+          Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
+            case Some(url) =>
+              PropertyConfigurator.configure(url)
+              System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
+            case None =>
+              System.err.println(s"Spark was unable to load $defaultLogProps")
+          }
         }
       }
     }
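
The repl profile above starts spark-shell sessions at WARN, and the printed hint shows the runtime override. A minimal spark-shell usage sketch (sc is the SparkContext the shell already provides):

// In spark-shell with the new repl profile, only WARN and above are printed by default.
sc.setLogLevel("INFO")   // switch this session back to the older, more verbose default
sc.setLogLevel("WARN")   // return to the quiet repl default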

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 48 additions & 1 deletion

@@ -21,7 +21,7 @@ import java.io._
 import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
-import scala.collection.mutable.{HashSet, Map}
+import scala.collection.mutable.{HashMap, HashSet, Map}
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
@@ -284,6 +284,53 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
   }
 
+  /**
+   * Return a list of locations that each have fraction of map output greater than the specified
+   * threshold.
+   *
+   * @param shuffleId id of the shuffle
+   * @param reducerId id of the reduce task
+   * @param numReducers total number of reducers in the shuffle
+   * @param fractionThreshold fraction of total map output size that a location must have
+   *                          for it to be considered large.
+   *
+   * This method is not thread-safe.
+   */
+  def getLocationsWithLargestOutputs(
+      shuffleId: Int,
+      reducerId: Int,
+      numReducers: Int,
+      fractionThreshold: Double)
+    : Option[Array[BlockManagerId]] = {
+
+    if (mapStatuses.contains(shuffleId)) {
+      val statuses = mapStatuses(shuffleId)
+      if (statuses.nonEmpty) {
+        // HashMap to add up sizes of all blocks at the same location
+        val locs = new HashMap[BlockManagerId, Long]
+        var totalOutputSize = 0L
+        var mapIdx = 0
+        while (mapIdx < statuses.length) {
+          val status = statuses(mapIdx)
+          val blockSize = status.getSizeForBlock(reducerId)
+          if (blockSize > 0) {
+            locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
+            totalOutputSize += blockSize
+          }
+          mapIdx = mapIdx + 1
+        }
+        val topLocs = locs.filter { case (loc, size) =>
+          size.toDouble / totalOutputSize >= fractionThreshold
+        }
+        // Return if we have any locations which satisfy the required threshold
+        if (topLocs.nonEmpty) {
+          return Some(topLocs.map(_._1).toArray)
+        }
+      }
+    }
+    None
+  }
+
   def incrementEpoch() {
     epochLock.synchronized {
       epoch += 1
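
As a standalone illustration of the threshold logic in getLocationsWithLargestOutputs (not Spark code: plain host strings stand in for BlockManagerId, and the per-reducer block sizes are given directly), block sizes are summed per location and only locations holding at least the requested fraction of the total are kept:

import scala.collection.mutable

object LargestOutputsSketch {
  // Sum per-location block sizes for one reducer and keep locations that hold
  // at least `fractionThreshold` of the total output, mirroring the method above.
  def largestOutputs(
      blockSizesByLocation: Seq[(String, Long)],
      fractionThreshold: Double): Option[Array[String]] = {
    val locs = new mutable.HashMap[String, Long]
    var totalOutputSize = 0L
    for ((loc, size) <- blockSizesByLocation if size > 0) {
      locs(loc) = locs.getOrElse(loc, 0L) + size
      totalOutputSize += size
    }
    val topLocs = locs.filter { case (_, size) =>
      size.toDouble / totalOutputSize >= fractionThreshold
    }
    if (topLocs.nonEmpty) Some(topLocs.keys.toArray) else None
  }

  def main(args: Array[String]): Unit = {
    // host1 holds 60% and host2 holds 30% of the 1000 bytes destined for this
    // reducer, so with a 0.2 threshold both qualify; host3 at 10% does not.
    val sizes = Seq(("host1", 600L), ("host2", 300L), ("host3", 100L))
    println(largestOutputs(sizes, 0.2).map(_.sorted.mkString(", ")))   // Some(host1, host2)
  }
}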

core/src/main/scala/org/apache/spark/api/r/RBackend.scala

Lines changed: 3 additions & 2 deletions

@@ -29,7 +29,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder
 import io.netty.handler.codec.bytes.{ByteArrayDecoder, ByteArrayEncoder}
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
 
 /**
  * Netty-based backend server that is used to communicate between R and Java.
@@ -41,7 +41,8 @@ private[spark] class RBackend {
   private[this] var bossGroup: EventLoopGroup = null
 
   def init(): Int = {
-    bossGroup = new NioEventLoopGroup(2)
+    val conf = new SparkConf()
+    bossGroup = new NioEventLoopGroup(conf.getInt("spark.r.numRBackendThreads", 2))
     val workerGroup = bossGroup
     val handler = new RBackendHandler(this)
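
The backend reads the new spark.r.numRBackendThreads setting from a freshly constructed SparkConf, which picks up spark.* JVM system properties (spark-submit also forwards spark-defaults.conf entries that way). A small sketch of the lookup and its fallback, assuming Spark is on the classpath:

import org.apache.spark.SparkConf

object RBackendThreadsCheck {
  def main(args: Array[String]): Unit = {
    // Launch with e.g. -Dspark.r.numRBackendThreads=4 to override the default of 2.
    val conf = new SparkConf()
    val numThreads = conf.getInt("spark.r.numRBackendThreads", 2)
    println(s"RBackend would size its Netty event loop group with $numThreads threads")
  }
}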

core/src/main/scala/org/apache/spark/api/r/SerDe.scala

Lines changed: 13 additions & 4 deletions

@@ -18,7 +18,7 @@
 package org.apache.spark.api.r
 
 import java.io.{DataInputStream, DataOutputStream}
-import java.sql.{Date, Time}
+import java.sql.{Timestamp, Date, Time}
 
 import scala.collection.JavaConversions._
 
@@ -107,9 +107,12 @@ private[spark] object SerDe {
     Date.valueOf(readString(in))
   }
 
-  def readTime(in: DataInputStream): Time = {
-    val t = in.readDouble()
-    new Time((t * 1000L).toLong)
+  def readTime(in: DataInputStream): Timestamp = {
+    val seconds = in.readDouble()
+    val sec = Math.floor(seconds).toLong
+    val t = new Timestamp(sec * 1000L)
+    t.setNanos(((seconds - sec) * 1e9).toInt)
+    t
   }
 
   def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
@@ -227,6 +230,9 @@ private[spark] object SerDe {
       case "java.sql.Time" =>
         writeType(dos, "time")
        writeTime(dos, value.asInstanceOf[Time])
+      case "java.sql.Timestamp" =>
+        writeType(dos, "time")
+        writeTime(dos, value.asInstanceOf[Timestamp])
       case "[B" =>
         writeType(dos, "raw")
         writeBytes(dos, value.asInstanceOf[Array[Byte]])
@@ -289,6 +295,9 @@ private[spark] object SerDe {
     out.writeDouble(value.getTime.toDouble / 1000.0)
   }
 
+  def writeTime(out: DataOutputStream, value: Timestamp): Unit = {
+    out.writeDouble((value.getTime / 1000).toDouble + value.getNanos.toDouble / 1e9)
+  }
 
   // NOTE: Only works for ASCII right now
   def writeString(out: DataOutputStream, value: String): Unit = {
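
Timestamps now cross the R/JVM boundary as fractional seconds in a Double. A self-contained sketch of the same conversion, to show the split into whole seconds plus nanoseconds and the round trip (standard library only; the helper names are illustrative, not SerDe's):

import java.sql.Timestamp

object TimestampRoundTripSketch {
  // Same formula as the new writeTime(Timestamp): whole seconds plus fractional seconds.
  def toSeconds(t: Timestamp): Double =
    (t.getTime / 1000).toDouble + t.getNanos.toDouble / 1e9

  // Same steps as the new readTime: floor to whole seconds, put the remainder in nanos.
  def fromSeconds(seconds: Double): Timestamp = {
    val sec = Math.floor(seconds).toLong
    val t = new Timestamp(sec * 1000L)
    t.setNanos(((seconds - sec) * 1e9).toInt)
    t
  }

  def main(args: Array[String]): Unit = {
    val original = Timestamp.valueOf("2015-06-10 12:34:56.789")
    val restored = fromSeconds(toSeconds(original))
    // Prints the same instant, up to the precision a Double of epoch seconds can carry.
    println(s"$original -> $restored")
  }
}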

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 17 additions & 60 deletions

@@ -324,55 +324,20 @@ object SparkSubmit {
         // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
         args.mainClass = "org.apache.spark.deploy.PythonRunner"
         args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
-        args.files = mergeFileLists(args.files, args.primaryResource)
+        if (clusterManager != YARN) {
+          // The YARN backend distributes the primary file differently, so don't merge it.
+          args.files = mergeFileLists(args.files, args.primaryResource)
+        }
+      }
+      if (clusterManager != YARN) {
+        // The YARN backend handles python files differently, so don't merge the lists.
+        args.files = mergeFileLists(args.files, args.pyFiles)
       }
-      args.files = mergeFileLists(args.files, args.pyFiles)
       if (args.pyFiles != null) {
        sysProps("spark.submit.pyFiles") = args.pyFiles
       }
     }
 
-    // In yarn mode for a python app, add pyspark archives to files
-    // that can be distributed with the job
-    if (args.isPython && clusterManager == YARN) {
-      var pyArchives: String = null
-      val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH")
-      if (pyArchivesEnvOpt.isDefined) {
-        pyArchives = pyArchivesEnvOpt.get
-      } else {
-        if (!sys.env.contains("SPARK_HOME")) {
-          printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
-        }
-        val pythonPath = new ArrayBuffer[String]
-        for (sparkHome <- sys.env.get("SPARK_HOME")) {
-          val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
-          val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
-          if (!pyArchivesFile.exists()) {
-            printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
-          }
-          val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
-          if (!py4jFile.exists()) {
-            printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
-              "in yarn mode.")
-          }
-          pythonPath += pyArchivesFile.getAbsolutePath()
-          pythonPath += py4jFile.getAbsolutePath()
-        }
-        pyArchives = pythonPath.mkString(",")
-      }
-
-      pyArchives = pyArchives.split(",").map { localPath =>
-        val localURI = Utils.resolveURI(localPath)
-        if (localURI.getScheme != "local") {
-          args.files = mergeFileLists(args.files, localURI.toString)
-          new Path(localPath).getName
-        } else {
-          localURI.getPath
-        }
-      }.mkString(File.pathSeparator)
-      sysProps("spark.submit.pyArchives") = pyArchives
-    }
-
     // If we're running a R app, set the main class to our specific R runner
     if (args.isR && deployMode == CLIENT) {
       if (args.primaryResource == SPARKR_SHELL) {
@@ -386,19 +351,10 @@ object SparkSubmit {
       }
     }
 
-    if (isYarnCluster) {
-      // In yarn-cluster mode for a python app, add primary resource and pyFiles to files
-      // that can be distributed with the job
-      if (args.isPython) {
-        args.files = mergeFileLists(args.files, args.primaryResource)
-        args.files = mergeFileLists(args.files, args.pyFiles)
-      }
-
+    if (isYarnCluster && args.isR) {
       // In yarn-cluster mode for a R app, add primary resource to files
       // that can be distributed with the job
-      if (args.isR) {
-        args.files = mergeFileLists(args.files, args.primaryResource)
-      }
+      args.files = mergeFileLists(args.files, args.primaryResource)
     }
 
     // Special flag to avoid deprecation warnings at the client
@@ -515,17 +471,18 @@ object SparkSubmit {
       }
     }
 
+    // Let YARN know it's a pyspark app, so it distributes needed libraries.
+    if (clusterManager == YARN && args.isPython) {
+      sysProps.put("spark.yarn.isPython", "true")
+    }
+
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
       if (args.isPython) {
-        val mainPyFile = new Path(args.primaryResource).getName
-        childArgs += ("--primary-py-file", mainPyFile)
+        childArgs += ("--primary-py-file", args.primaryResource)
         if (args.pyFiles != null) {
-          // These files will be distributed to each machine's working directory, so strip the
-          // path prefix
-          val pyFilesNames = args.pyFiles.split(",").map(p => (new Path(p)).getName).mkString(",")
-          childArgs += ("--py-files", pyFilesNames)
+          childArgs += ("--py-files", args.pyFiles)
        }
        childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
      } else if (args.isR) {
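
mergeFileLists itself is not part of this diff; as a hypothetical stand-in for how the calls above behave, it concatenates comma-separated file lists while skipping null or empty arguments (the real SparkSubmit helper may differ in details such as its treatment of an all-empty result):

object MergeFileListsSketch {
  // Hypothetical re-creation for illustration only; not the SparkSubmit implementation.
  def mergeFileLists(lists: String*): String =
    lists.filter(s => s != null && s.nonEmpty).mkString(",")

  def main(args: Array[String]): Unit = {
    // e.g. merging args.files with args.pyFiles when the cluster manager is not YARN
    println(mergeFileLists("app.py,utils.py", null, "deps.zip"))   // app.py,utils.py,deps.zip
  }
}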

core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala

Lines changed: 3 additions & 4 deletions

@@ -95,10 +95,9 @@ private[spark] object RDDOperationScope extends Logging {
   private[spark] def withScope[T](
       sc: SparkContext,
       allowNesting: Boolean = false)(body: => T): T = {
-    val stackTrace = Thread.currentThread.getStackTrace().tail // ignore "Thread#getStackTrace"
-    val ourMethodName = stackTrace(1).getMethodName // i.e. withScope
-    // Climb upwards to find the first method that's called something different
-    val callerMethodName = stackTrace
+    val ourMethodName = "withScope"
+    val callerMethodName = Thread.currentThread.getStackTrace()
+      .dropWhile(_.getMethodName != ourMethodName)
       .find(_.getMethodName != ourMethodName)
       .map(_.getMethodName)
       .getOrElse {
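
The rewritten lookup no longer assumes withScope sits at a fixed depth in the stack trace; it skips frames until the first withScope frame, then takes the first frame with a different name. A self-contained sketch of the same dropWhile/find pattern, using a local method name in place of withScope:

object CallerNameSketch {
  // Find the name of the method that called this one, regardless of how many
  // frames (getStackTrace itself, wrappers, etc.) sit above it on the stack.
  def callerMethodName(): String = {
    val ourMethodName = "callerMethodName"
    Thread.currentThread.getStackTrace
      .dropWhile(_.getMethodName != ourMethodName)
      .find(_.getMethodName != ourMethodName)
      .map(_.getMethodName)
      .getOrElse("<unknown>")
  }

  def someNamedOperation(): String = callerMethodName()

  def main(args: Array[String]): Unit = {
    println(someNamedOperation())   // prints "someNamedOperation"
  }
}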

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 34 additions & 3 deletions

@@ -137,6 +137,22 @@ class DAGScheduler(
   private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
   taskScheduler.setDAGScheduler(this)
 
+  // Flag to control if reduce tasks are assigned preferred locations
+  private val shuffleLocalityEnabled =
+    sc.getConf.getBoolean("spark.shuffle.reduceLocality.enabled", true)
+  // Number of map and reduce tasks above which we do not assign preferred locations
+  // based on map output sizes. We limit the size of jobs for which we assign preferred
+  // locations, as computing the top locations by size becomes expensive.
+  private[this] val SHUFFLE_PREF_MAP_THRESHOLD = 1000
+  // NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
+  private[this] val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
+
+  // Fraction of total map output that must be at a location for it to be considered as a
+  // preferred location for a reduce task. Making this larger will focus on fewer locations
+  // where most data can be read locally, but may lead to more delay in scheduling if those
+  // locations are busy.
+  private[scheduler] val REDUCER_PREF_LOCS_FRACTION = 0.2
+
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     eventProcessLoop.post(BeginEvent(task, taskInfo))
 
@@ -1399,17 +1415,32 @@ class DAGScheduler(
     if (rddPrefs.nonEmpty) {
       return rddPrefs.map(TaskLocation(_))
     }
-    // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
-    // that has any placement preferences. Ideally we would choose based on transfer sizes,
-    // but this will do for now.
+
     rdd.dependencies.foreach {
       case n: NarrowDependency[_] =>
+        // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
+        // that has any placement preferences. Ideally we would choose based on transfer sizes,
+        // but this will do for now.
         for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }
+      case s: ShuffleDependency[_, _, _] =>
+        // For shuffle dependencies, pick locations which have at least REDUCER_PREF_LOCS_FRACTION
+        // of data as preferred locations
+        if (shuffleLocalityEnabled &&
+            rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
+            s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
+          // Get the preferred map output locations for this reducer
+          val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
+            partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION)
+          if (topLocsForReducer.nonEmpty) {
+            return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
+          }
+        }
+
      case _ =>
    }
    Nil
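
The reducer locality preference is on by default and gated by spark.shuffle.reduceLocality.enabled, so it can be switched off per application, for example when the 0.2 fraction concentrates reduce tasks on a few busy hosts. A brief sketch (local master only so the snippet runs on its own):

import org.apache.spark.{SparkConf, SparkContext}

object ReduceLocalityToggle {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")   // local master just to keep the sketch self-contained
      .setAppName("reduce-locality-off")
      .set("spark.shuffle.reduceLocality.enabled", "false")   // default is true
    val sc = new SparkContext(conf)
    // Shuffles run through this context get no reducer placement preferences.
    val numGroups = sc.parallelize(1 to 1000).map(x => (x % 10, x)).reduceByKey(_ + _).count()
    println(numGroups)
    sc.stop()
  }
}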
