Commit 0520c3c

Merge branch 'master' into sqlconf
2 parents: 7afb0ec + e428b3a

File tree: 136 files changed (+2509, -1406 lines changed)


.rat-excludes

Lines changed: 1 addition & 0 deletions

```diff
@@ -28,6 +28,7 @@ spark-env.sh
 spark-env.cmd
 spark-env.sh.template
 log4j-defaults.properties
+log4j-defaults-repl.properties
 bootstrap-tooltip.js
 jquery-1.11.1.min.js
 d3.min.js
```
core/src/main/resources/org/apache/spark/log4j-defaults-repl.properties

Lines changed: 12 additions & 0 deletions

```diff
@@ -0,0 +1,12 @@
+# Set everything to be logged to the console
+log4j.rootCategory=WARN, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.spark-project.jetty=WARN
+log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
```

core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js

Lines changed: 1 addition & 1 deletion

```diff
@@ -235,7 +235,7 @@ function renderDagVizForJob(svgContainer) {
     // them separately later. Note that we cannot draw them now because we need to
     // put these edges in a separate container that is on top of all stage graphs.
     metadata.selectAll(".incoming-edge").each(function(v) {
-      var edge = d3.select(this).text().split(","); // e.g. 3,4 => [3, 4]
+      var edge = d3.select(this).text().trim().split(","); // e.g. 3,4 => [3, 4]
      crossStageEdges.push(edge);
    });
  });
```

core/src/main/scala/org/apache/spark/Logging.scala

Lines changed: 19 additions & 7 deletions

```diff
@@ -121,13 +121,25 @@ trait Logging {
     if (usingLog4j12) {
       val log4j12Initialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
       if (!log4j12Initialized) {
-        val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
-        Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
-          case Some(url) =>
-            PropertyConfigurator.configure(url)
-            System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
-          case None =>
-            System.err.println(s"Spark was unable to load $defaultLogProps")
+        if (Utils.isInInterpreter) {
+          val replDefaultLogProps = "org/apache/spark/log4j-defaults-repl.properties"
+          Option(Utils.getSparkClassLoader.getResource(replDefaultLogProps)) match {
+            case Some(url) =>
+              PropertyConfigurator.configure(url)
+              System.err.println(s"Using Spark's repl log4j profile: $replDefaultLogProps")
+              System.err.println("To adjust logging level use sc.setLogLevel(\"INFO\")")
+            case None =>
+              System.err.println(s"Spark was unable to load $replDefaultLogProps")
+          }
+        } else {
+          val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
+          Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
+            case Some(url) =>
+              PropertyConfigurator.configure(url)
+              System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
+            case None =>
+              System.err.println(s"Spark was unable to load $defaultLogProps")
+          }
         }
       }
     }
```
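
In practice this means a 1.4+ shell starts with the root logger at WARN via the new repl profile, while standalone applications keep the existing INFO default. A brief sketch of how to get verbose output back from a running shell; the custom-configuration flag shown is one common approach, not the only one:

```scala
// Inside spark-shell, with log4j-defaults-repl.properties in effect:
sc.setLogLevel("INFO")  // valid levels: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN

// Or bypass the bundled defaults entirely by pointing log4j at your own file:
//   ./bin/spark-shell --driver-java-options "-Dlog4j.configuration=file:/path/to/log4j.properties"
```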

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 48 additions & 1 deletion

```diff
@@ -21,7 +21,7 @@ import java.io._
 import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
-import scala.collection.mutable.{HashSet, Map}
+import scala.collection.mutable.{HashMap, HashSet, Map}
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
@@ -284,6 +284,53 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
   }
 
+  /**
+   * Return a list of locations that each have fraction of map output greater than the specified
+   * threshold.
+   *
+   * @param shuffleId id of the shuffle
+   * @param reducerId id of the reduce task
+   * @param numReducers total number of reducers in the shuffle
+   * @param fractionThreshold fraction of total map output size that a location must have
+   *                          for it to be considered large.
+   *
+   * This method is not thread-safe.
+   */
+  def getLocationsWithLargestOutputs(
+      shuffleId: Int,
+      reducerId: Int,
+      numReducers: Int,
+      fractionThreshold: Double)
+    : Option[Array[BlockManagerId]] = {
+
+    if (mapStatuses.contains(shuffleId)) {
+      val statuses = mapStatuses(shuffleId)
+      if (statuses.nonEmpty) {
+        // HashMap to add up sizes of all blocks at the same location
+        val locs = new HashMap[BlockManagerId, Long]
+        var totalOutputSize = 0L
+        var mapIdx = 0
+        while (mapIdx < statuses.length) {
+          val status = statuses(mapIdx)
+          val blockSize = status.getSizeForBlock(reducerId)
+          if (blockSize > 0) {
+            locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
+            totalOutputSize += blockSize
+          }
+          mapIdx = mapIdx + 1
+        }
+        val topLocs = locs.filter { case (loc, size) =>
+          size.toDouble / totalOutputSize >= fractionThreshold
+        }
+        // Return if we have any locations which satisfy the required threshold
+        if (topLocs.nonEmpty) {
+          return Some(topLocs.map(_._1).toArray)
+        }
+      }
+    }
+    None
+  }
+
   def incrementEpoch() {
     epochLock.synchronized {
       epoch += 1
```
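
For context, the new `getLocationsWithLargestOutputs` adds up the map-output bytes destined for a single reducer per `BlockManagerId` and returns only the locations holding at least `fractionThreshold` of the total, so callers can prefer placing a reduce task near its largest inputs. A standalone sketch of that aggregation; `Loc` and `largestOutputLocs` are illustrative stand-ins, not Spark APIs:

```scala
import scala.collection.mutable

case class Loc(host: String)  // stand-in for BlockManagerId

def largestOutputLocs(
    blockSizesByLoc: Seq[(Loc, Long)],  // (map-output location, bytes destined for one reducer)
    fractionThreshold: Double): Option[Array[Loc]] = {
  val sizes = new mutable.HashMap[Loc, Long]
  var total = 0L
  // Sum block sizes per location, skipping empty blocks, just as the new method does.
  for ((loc, size) <- blockSizesByLoc if size > 0) {
    sizes(loc) = sizes.getOrElse(loc, 0L) + size
    total += size
  }
  val top = sizes.collect {
    case (loc, size) if size.toDouble / total >= fractionThreshold => loc
  }
  if (top.nonEmpty) Some(top.toArray) else None
}

// 800/1000 = 0.8 and 200/1000 = 0.2 both meet a 0.2 threshold, so both locations come back.
largestOutputLocs(Seq(Loc("a") -> 800L, Loc("b") -> 200L), fractionThreshold = 0.2)
```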

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 4 additions & 1 deletion

```diff
@@ -20,6 +20,8 @@ package org.apache.spark
 import java.io.File
 import java.net.Socket
 
+import akka.actor.ActorSystem
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.util.Properties
@@ -75,7 +77,8 @@ class SparkEnv (
     val conf: SparkConf) extends Logging {
 
   // TODO Remove actorSystem
-  val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
+  @deprecated("Actor system is no longer supported as of 1.4")
+  val actorSystem: ActorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
 
   private[spark] var isStopped = false
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
```

core/src/main/scala/org/apache/spark/api/r/RBackend.scala

Lines changed: 3 additions & 2 deletions

```diff
@@ -29,7 +29,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder
 import io.netty.handler.codec.bytes.{ByteArrayDecoder, ByteArrayEncoder}
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
 
 /**
  * Netty-based backend server that is used to communicate between R and Java.
@@ -41,7 +41,8 @@ private[spark] class RBackend {
   private[this] var bossGroup: EventLoopGroup = null
 
   def init(): Int = {
-    bossGroup = new NioEventLoopGroup(2)
+    val conf = new SparkConf()
+    bossGroup = new NioEventLoopGroup(conf.getInt("spark.r.numRBackendThreads", 2))
     val workerGroup = bossGroup
     val handler = new RBackendHandler(this)
 
```

core/src/main/scala/org/apache/spark/api/r/SerDe.scala

Lines changed: 13 additions & 4 deletions

```diff
@@ -18,7 +18,7 @@
 package org.apache.spark.api.r
 
 import java.io.{DataInputStream, DataOutputStream}
-import java.sql.{Date, Time}
+import java.sql.{Timestamp, Date, Time}
 
 import scala.collection.JavaConversions._
 
@@ -107,9 +107,12 @@ private[spark] object SerDe {
     Date.valueOf(readString(in))
   }
 
-  def readTime(in: DataInputStream): Time = {
-    val t = in.readDouble()
-    new Time((t * 1000L).toLong)
+  def readTime(in: DataInputStream): Timestamp = {
+    val seconds = in.readDouble()
+    val sec = Math.floor(seconds).toLong
+    val t = new Timestamp(sec * 1000L)
+    t.setNanos(((seconds - sec) * 1e9).toInt)
+    t
   }
 
   def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
@@ -227,6 +230,9 @@ private[spark] object SerDe {
       case "java.sql.Time" =>
         writeType(dos, "time")
         writeTime(dos, value.asInstanceOf[Time])
+      case "java.sql.Timestamp" =>
+        writeType(dos, "time")
+        writeTime(dos, value.asInstanceOf[Timestamp])
       case "[B" =>
         writeType(dos, "raw")
         writeBytes(dos, value.asInstanceOf[Array[Byte]])
@@ -289,6 +295,9 @@ private[spark] object SerDe {
     out.writeDouble(value.getTime.toDouble / 1000.0)
   }
 
+  def writeTime(out: DataOutputStream, value: Timestamp): Unit = {
+    out.writeDouble((value.getTime / 1000).toDouble + value.getNanos.toDouble / 1e9)
+  }
 
   // NOTE: Only works for ASCII right now
   def writeString(out: DataOutputStream, value: String): Unit = {
```
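
The encoding above is simply seconds since the epoch as a double, with the fractional part carrying sub-second precision. A self-contained round-trip sketch of the same arithmetic, minus the stream plumbing (helper names are illustrative):

```scala
import java.sql.Timestamp

// Mirror of the new writeTime: whole seconds plus a fractional nanosecond component.
def timestampToSeconds(t: Timestamp): Double =
  (t.getTime / 1000).toDouble + t.getNanos.toDouble / 1e9

// Mirror of the new readTime: split the double back into seconds and nanoseconds.
def secondsToTimestamp(seconds: Double): Timestamp = {
  val sec = Math.floor(seconds).toLong
  val t = new Timestamp(sec * 1000L)
  t.setNanos(((seconds - sec) * 1e9).toInt)
  t
}

val original = Timestamp.valueOf("2015-06-03 10:30:00.25")
val decoded = secondsToTimestamp(timestampToSeconds(original))
// decoded == original here: the 0.25 s fraction survives the double encoding. The old code
// mapped the value to java.sql.Time, which models a time of day rather than a full timestamp.
```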

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 17 additions & 60 deletions

```diff
@@ -324,55 +324,20 @@ object SparkSubmit {
         // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
         args.mainClass = "org.apache.spark.deploy.PythonRunner"
         args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
-        args.files = mergeFileLists(args.files, args.primaryResource)
+        if (clusterManager != YARN) {
+          // The YARN backend distributes the primary file differently, so don't merge it.
+          args.files = mergeFileLists(args.files, args.primaryResource)
+        }
+      }
+      if (clusterManager != YARN) {
+        // The YARN backend handles python files differently, so don't merge the lists.
+        args.files = mergeFileLists(args.files, args.pyFiles)
       }
-      args.files = mergeFileLists(args.files, args.pyFiles)
       if (args.pyFiles != null) {
         sysProps("spark.submit.pyFiles") = args.pyFiles
       }
     }
 
-    // In yarn mode for a python app, add pyspark archives to files
-    // that can be distributed with the job
-    if (args.isPython && clusterManager == YARN) {
-      var pyArchives: String = null
-      val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH")
-      if (pyArchivesEnvOpt.isDefined) {
-        pyArchives = pyArchivesEnvOpt.get
-      } else {
-        if (!sys.env.contains("SPARK_HOME")) {
-          printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
-        }
-        val pythonPath = new ArrayBuffer[String]
-        for (sparkHome <- sys.env.get("SPARK_HOME")) {
-          val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
-          val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
-          if (!pyArchivesFile.exists()) {
-            printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
-          }
-          val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
-          if (!py4jFile.exists()) {
-            printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
-              "in yarn mode.")
-          }
-          pythonPath += pyArchivesFile.getAbsolutePath()
-          pythonPath += py4jFile.getAbsolutePath()
-        }
-        pyArchives = pythonPath.mkString(",")
-      }
-
-      pyArchives = pyArchives.split(",").map { localPath =>
-        val localURI = Utils.resolveURI(localPath)
-        if (localURI.getScheme != "local") {
-          args.files = mergeFileLists(args.files, localURI.toString)
-          new Path(localPath).getName
-        } else {
-          localURI.getPath
-        }
-      }.mkString(File.pathSeparator)
-      sysProps("spark.submit.pyArchives") = pyArchives
-    }
-
     // If we're running a R app, set the main class to our specific R runner
     if (args.isR && deployMode == CLIENT) {
       if (args.primaryResource == SPARKR_SHELL) {
@@ -386,19 +351,10 @@ object SparkSubmit {
       }
     }
 
-    if (isYarnCluster) {
-      // In yarn-cluster mode for a python app, add primary resource and pyFiles to files
-      // that can be distributed with the job
-      if (args.isPython) {
-        args.files = mergeFileLists(args.files, args.primaryResource)
-        args.files = mergeFileLists(args.files, args.pyFiles)
-      }
-
+    if (isYarnCluster && args.isR) {
       // In yarn-cluster mode for a R app, add primary resource to files
       // that can be distributed with the job
-      if (args.isR) {
-        args.files = mergeFileLists(args.files, args.primaryResource)
-      }
+      args.files = mergeFileLists(args.files, args.primaryResource)
     }
 
     // Special flag to avoid deprecation warnings at the client
@@ -515,17 +471,18 @@ object SparkSubmit {
       }
     }
 
+    // Let YARN know it's a pyspark app, so it distributes needed libraries.
+    if (clusterManager == YARN && args.isPython) {
+      sysProps.put("spark.yarn.isPython", "true")
+    }
+
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (isYarnCluster) {
      childMainClass = "org.apache.spark.deploy.yarn.Client"
      if (args.isPython) {
-        val mainPyFile = new Path(args.primaryResource).getName
-        childArgs += ("--primary-py-file", mainPyFile)
+        childArgs += ("--primary-py-file", args.primaryResource)
        if (args.pyFiles != null) {
-          // These files will be distributed to each machine's working directory, so strip the
-          // path prefix
-          val pyFilesNames = args.pyFiles.split(",").map(p => (new Path(p)).getName).mkString(",")
-          childArgs += ("--py-files", pyFilesNames)
+          childArgs += ("--py-files", args.pyFiles)
        }
        childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
      } else if (args.isR) {
```

core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala

Lines changed: 3 additions & 4 deletions

```diff
@@ -95,10 +95,9 @@ private[spark] object RDDOperationScope extends Logging {
   private[spark] def withScope[T](
       sc: SparkContext,
       allowNesting: Boolean = false)(body: => T): T = {
-    val stackTrace = Thread.currentThread.getStackTrace().tail // ignore "Thread#getStackTrace"
-    val ourMethodName = stackTrace(1).getMethodName // i.e. withScope
-    // Climb upwards to find the first method that's called something different
-    val callerMethodName = stackTrace
+    val ourMethodName = "withScope"
+    val callerMethodName = Thread.currentThread.getStackTrace()
+      .dropWhile(_.getMethodName != ourMethodName)
       .find(_.getMethodName != ourMethodName)
       .map(_.getMethodName)
       .getOrElse {
```
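
The rewritten lookup no longer assumes `withScope` sits at a fixed offset in the stack trace; it scans for the first frame named `withScope` and then takes the first differently named frame after it as the caller. A toy version of that scan, a standalone sketch rather than the Spark helper:

```scala
// Name of the method that called `methodName`: drop frames until the first `methodName`
// frame, then take the first frame whose name differs. Extra leading frames don't matter.
def callerOf(methodName: String): String =
  Thread.currentThread.getStackTrace
    .dropWhile(_.getMethodName != methodName)
    .find(_.getMethodName != methodName)
    .map(_.getMethodName)
    .getOrElse("<unknown>")

// If bar() calls foo() and foo() calls callerOf("foo"), the result is "bar".
```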
