
Commit 8981de1

Merge branch 'master' of github.com:apache/spark into fix-drop-events

2 parents: af19bc0 + baf9ce1

132 files changed: +3050 −730 lines


core/pom.xml
Lines changed: 2 additions & 2 deletions

@@ -192,8 +192,8 @@
     </dependency>
     <dependency>
       <groupId>org.tachyonproject</groupId>
-      <artifactId>tachyon</artifactId>
-      <version>0.4.1-thrift</version>
+      <artifactId>tachyon-client</artifactId>
+      <version>0.5.0</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
core/src/main/scala/org/apache/spark/Dependency.scala
Lines changed: 2 additions & 2 deletions

@@ -34,8 +34,8 @@ abstract class Dependency[T] extends Serializable {
 
 /**
  * :: DeveloperApi ::
- * Base class for dependencies where each partition of the parent RDD is used by at most one
- * partition of the child RDD. Narrow dependencies allow for pipelined execution.
+ * Base class for dependencies where each partition of the child RDD depends on a small number
+ * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
  */
 @DeveloperApi
 abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
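The reworded doc comment widens the NarrowDependency contract from "exactly one parent partition" to "a small number of parent partitions". A minimal sketch of a custom subclass under the new wording; the class name and the fixed fan-in of two are illustrative, not part of this commit:

```scala
import org.apache.spark.NarrowDependency
import org.apache.spark.rdd.RDD

// Hypothetical narrow dependency: child partition i reads parent partitions
// 2*i and 2*i + 1. Each child still depends on a small, statically known set
// of parent partitions, so the stages can be pipelined without a shuffle.
class PairwiseNarrowDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): Seq[Int] =
    Seq(2 * partitionId, 2 * partitionId + 1)
}
```

Spark's built-in OneToOneDependency and RangeDependency remain the one-parent special case of this contract.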
Lines changed: 46 additions & 0 deletions (new file; adds org.apache.spark.HeartbeatReceiver)

@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import akka.actor.Actor
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.scheduler.TaskScheduler
+
+/**
+ * A heartbeat from executors to the driver. This is a shared message used by several internal
+ * components to convey liveness or execution information for in-progress tasks.
+ */
+private[spark] case class Heartbeat(
+    executorId: String,
+    taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
+    blockManagerId: BlockManagerId)
+
+private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
+
+/**
+ * Lives in the driver to receive heartbeats from executors.
+ */
+private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor {
+  override def receive = {
+    case Heartbeat(executorId, taskMetrics, blockManagerId) =>
+      val response = HeartbeatResponse(
+        !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
+      sender ! response
+  }
+}
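To show how these messages are meant to be used, here is a minimal sketch of the sending side using Akka's ask pattern. The object and method names are hypothetical; the commit's executor-side wiring is not part of this hunk, and such code has to live in the org.apache.spark package because the messages are private[spark].

```scala
package org.apache.spark // Heartbeat and HeartbeatResponse are private[spark]

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId

// Hypothetical helper, not part of the commit: send one heartbeat and report
// whether the driver asked the BlockManager to re-register (for example
// because the driver no longer knows about this executor).
private[spark] object HeartbeatClientSketch {
  def sendHeartbeat(
      receiverRef: ActorRef, // ref to the driver's "HeartbeatReceiver" actor
      executorId: String,
      metrics: Array[(Long, TaskMetrics)],
      blockManagerId: BlockManagerId): Boolean = {
    implicit val timeout = Timeout(30.seconds)
    val future = receiverRef ? Heartbeat(executorId, metrics, blockManagerId)
    Await.result(future, timeout.duration)
      .asInstanceOf[HeartbeatResponse]
      .reregisterBlockManager
  }
}
```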

core/src/main/scala/org/apache/spark/Logging.scala
Lines changed: 12 additions & 11 deletions

@@ -45,10 +45,7 @@ trait Logging {
       initializeIfNecessary()
       var className = this.getClass.getName
       // Ignore trailing $'s in the class names for Scala objects
-      if (className.endsWith("$")) {
-        className = className.substring(0, className.length - 1)
-      }
-      log_ = LoggerFactory.getLogger(className)
+      log_ = LoggerFactory.getLogger(className.stripSuffix("$"))
     }
     log_
   }
@@ -110,23 +107,27 @@ trait Logging {
   }
 
   private def initializeLogging() {
-    // If Log4j is being used, but is not initialized, load a default properties file
-    val binder = StaticLoggerBinder.getSingleton
-    val usingLog4j = binder.getLoggerFactoryClassStr.endsWith("Log4jLoggerFactory")
-    val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
-    if (!log4jInitialized && usingLog4j) {
+    // Don't use a logger in here, as this is itself occurring during initialization of a logger
+    // If Log4j 1.2 is being used, but is not initialized, load a default properties file
+    val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
+    // This distinguishes the log4j 1.2 binding, currently
+    // org.slf4j.impl.Log4jLoggerFactory, from the log4j 2.0 binding, currently
+    // org.apache.logging.slf4j.Log4jLoggerFactory
+    val usingLog4j12 = "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
+    val log4j12Initialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
+    if (!log4j12Initialized && usingLog4j12) {
       val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
       Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
         case Some(url) =>
           PropertyConfigurator.configure(url)
-          log.info(s"Using Spark's default log4j profile: $defaultLogProps")
+          System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
         case None =>
           System.err.println(s"Spark was unable to load $defaultLogProps")
       }
     }
     Logging.initialized = true
 
-    // Force a call into slf4j to initialize it. Avoids this happening from mutliple threads
+    // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
     // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
     log
   }
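The stripSuffix change is behavior preserving: for a Scala object, getClass.getName carries a trailing $ that should not leak into the logger name, and stripSuffix drops it only when present. A small self-contained illustration (the object name is made up):

```scala
object StripSuffixDemo {
  // Hypothetical Scala object whose class name becomes a logger name.
  object SomeListener

  def main(args: Array[String]): Unit = {
    // For Scala objects, the runtime class name ends with '$'.
    val className = SomeListener.getClass.getName
    println(className)                          // StripSuffixDemo$SomeListener$

    // Equivalent to the old endsWith/substring branch, in one call:
    println(className.stripSuffix("$"))         // StripSuffixDemo$SomeListener
    println("PlainClassName".stripSuffix("$"))  // unchanged: PlainClassName
  }
}
```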

core/src/main/scala/org/apache/spark/SparkContext.scala
Lines changed: 13 additions & 4 deletions

@@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.mesos.MesosNativeLibrary
+import akka.actor.Props
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
@@ -307,6 +308,8 @@ class SparkContext(config: SparkConf) extends Logging {
 
   // Create and start the scheduler
   private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
+  private val heartbeatReceiver = env.actorSystem.actorOf(
+    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
   @volatile private[spark] var dagScheduler: DAGScheduler = _
   try {
     dagScheduler = new DAGScheduler(this)
@@ -455,7 +458,7 @@ class SparkContext(config: SparkConf) extends Logging {
   /** Distribute a local Scala collection to form an RDD, with one or more
     * location preferences (hostnames of Spark nodes) for each object.
    * Create a new partition for each collection item. */
-  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
+  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
     val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
     new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
   }
(the makeRDD signature line differs only in whitespace)
@@ -992,7 +995,9 @@ class SparkContext(config: SparkConf) extends Logging {
     val dagSchedulerCopy = dagScheduler
     dagScheduler = null
     if (dagSchedulerCopy != null) {
+      env.metricsSystem.report()
       metadataCleaner.cancel()
+      env.actorSystem.stop(heartbeatReceiver)
       cleaner.foreach(_.stop())
       dagSchedulerCopy.stop()
       taskScheduler = null
@@ -1453,9 +1458,9 @@ object SparkContext extends Logging {
   /** Creates a task scheduler based on a given master URL. Extracted for testing. */
   private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
     // Regular expression used for local[N] and local[*] master formats
-    val LOCAL_N_REGEX = """local\[([0-9\*]+)\]""".r
+    val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
     // Regular expression for local[N, maxRetries], used in tests with failing tasks
-    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
+    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
     // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
     val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
     // Regular expression for connecting to Spark deploy clusters
@@ -1485,8 +1490,12 @@ object SparkContext extends Logging {
       scheduler
 
     case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
+      def localCpuCount = Runtime.getRuntime.availableProcessors()
+      // local[*, M] means the number of cores on the computer with M failures
+      // local[N, M] means exactly N threads with M failures
+      val threadCount = if (threads == "*") localCpuCount else threads.toInt
       val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
-      val backend = new LocalBackend(scheduler, threads.toInt)
+      val backend = new LocalBackend(scheduler, threadCount)
       scheduler.initialize(backend)
       scheduler

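A small self-contained check of what the tightened master-URL regexes accept; the values shown are illustrative, and the patterns are reproduced from the hunk above:

```scala
object MasterRegexSketch {
  // Same patterns as the updated createTaskScheduler, copied here for illustration.
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

  def main(args: Array[String]): Unit = {
    "local[*]" match {
      case LOCAL_N_REGEX(threads) => println(s"threads = $threads")   // threads = *
    }
    "local[*, 3]" match {
      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        println(s"threads = $threads, maxFailures = $maxFailures")    // *, 3
    }
    // The old character class [0-9\*]+ also accepted mixed strings such as
    // "local[2*]"; the alternation ([0-9]+|\*) no longer does.
    println(LOCAL_N_REGEX.findFirstIn("local[2*]"))                   // None
  }
}
```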
core/src/main/scala/org/apache/spark/SparkEnv.scala
Lines changed: 1 addition & 7 deletions

@@ -193,13 +193,7 @@ object SparkEnv extends Logging {
       logInfo("Registering " + name)
       actorSystem.actorOf(Props(newActor), name = name)
     } else {
-      val driverHost: String = conf.get("spark.driver.host", "localhost")
-      val driverPort: Int = conf.getInt("spark.driver.port", 7077)
-      Utils.checkHost(driverHost, "Expected hostname")
-      val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
-      val timeout = AkkaUtils.lookupTimeout(conf)
-      logInfo(s"Connecting to $name: $url")
-      Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
+      AkkaUtils.makeDriverRef(name, conf, actorSystem)
     }
   }

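The inlined driver lookup moves behind AkkaUtils.makeDriverRef. Based only on the lines removed here, a sketch of what such a helper would contain; the object name is made up, and the body is reconstructed from this hunk rather than copied from AkkaUtils:

```scala
package org.apache.spark.util // Utils and AkkaUtils are private[spark]

import scala.concurrent.Await

import akka.actor.{ActorRef, ActorSystem}

import org.apache.spark.SparkConf

// Sketch of a driver-actor lookup equivalent to the code removed above.
private[spark] object DriverRefSketch {
  def makeDriverRef(name: String, conf: SparkConf, actorSystem: ActorSystem): ActorRef = {
    val driverHost = conf.get("spark.driver.host", "localhost")
    val driverPort = conf.getInt("spark.driver.port", 7077)
    Utils.checkHost(driverHost, "Expected hostname")
    val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
    val timeout = AkkaUtils.lookupTimeout(conf)
    Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
  }
}
```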
core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
Lines changed: 11 additions & 0 deletions

@@ -783,6 +783,17 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     sortByKey(comp, ascending)
   }
 
+  /**
+   * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
+   * `collect` or `save` on the resulting RDD will return or output an ordered list of records
+   * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
+   * order of the keys).
+   */
+  def sortByKey(ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V] = {
+    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
+    sortByKey(comp, ascending, numPartitions)
+  }
+
   /**
    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
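The new overload lets Java-API callers pick both the sort direction and the partition count while keeping natural key ordering. A minimal usage sketch; JavaPairRDD is normally driven from Java, but the Scala driver below exercises the same API, and the master URL, app name, and data are illustrative:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}

object SortByKeySketch {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext(
      new SparkConf().setMaster("local[*]").setAppName("sortByKey-sketch"))

    // Wrap a small Scala pair RDD in the Java API.
    val pairs: JavaPairRDD[String, Int] =
      JavaPairRDD.fromRDD(jsc.sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3))))

    // New overload: ascending natural order, explicit number of partitions.
    val sorted = pairs.sortByKey(true, 2)
    println(sorted.collect()) // [(a,1), (b,2), (c,3)]

    jsc.stop()
  }
}
```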

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
Lines changed: 15 additions & 5 deletions

@@ -731,19 +731,30 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
 
   val bufferSize = SparkEnv.get.conf.getInt("spark.buffer.size", 65536)
 
+  /**
+   * We try to reuse a single Socket to transfer accumulator updates, as they are all added
+   * by the DAGScheduler's single-threaded actor anyway.
+   */
+  @transient var socket: Socket = _
+
+  def openSocket(): Socket = synchronized {
+    if (socket == null || socket.isClosed) {
+      socket = new Socket(serverHost, serverPort)
+    }
+    socket
+  }
+
   override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList
 
   override def addInPlace(val1: JList[Array[Byte]], val2: JList[Array[Byte]])
-      : JList[Array[Byte]] = {
+      : JList[Array[Byte]] = synchronized {
     if (serverHost == null) {
       // This happens on the worker node, where we just want to remember all the updates
       val1.addAll(val2)
       val1
     } else {
       // This happens on the master, where we pass the updates to Python through a socket
-      val socket = new Socket(serverHost, serverPort)
-      // SPARK-2282: Immediately reuse closed sockets because we create one per task.
-      socket.setReuseAddress(true)
+      val socket = openSocket()
       val in = socket.getInputStream
       val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize))
       out.writeInt(val2.size)
@@ -757,7 +768,6 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
       if (byteRead == -1) {
         throw new SparkException("EOF reached before Python server acknowledged")
       }
-      socket.close()
       null
     }
   }
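The accumulator change swaps a socket-per-update (with SO_REUSEADDR as a workaround) for a single lazily opened, reused connection, relying on updates arriving from one thread. A standalone sketch of that pattern outside Spark; the class and method names are made up:

```scala
import java.io.{BufferedOutputStream, DataOutputStream}
import java.net.Socket

// Illustration of the lazily created, reused socket pattern adopted by
// PythonAccumulatorParam (names here are hypothetical, not Spark API).
class ReusableSocketClient(host: String, port: Int) {
  @transient private var socket: Socket = _

  // Reconnect only when there is no usable socket yet.
  private def openSocket(): Socket = synchronized {
    if (socket == null || socket.isClosed) {
      socket = new Socket(host, port)
    }
    socket
  }

  // Callers are serialized, so the single connection is never used concurrently.
  def send(payload: Array[Byte]): Unit = synchronized {
    val out = new DataOutputStream(new BufferedOutputStream(openSocket().getOutputStream))
    out.writeInt(payload.length)
    out.write(payload)
    out.flush() // leave the connection open for the next update
  }
}
```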

core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Lines changed: 2 additions & 0 deletions

@@ -154,6 +154,8 @@ private[spark] class Master(
   }
 
   override def postStop() {
+    masterMetricsSystem.report()
+    applicationMetricsSystem.report()
     // prevent the CompleteRecovery message sending to restarted master
     if (recoveryCompletionTask != null) {
       recoveryCompletionTask.cancel()

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
Lines changed: 1 addition & 0 deletions

@@ -357,6 +357,7 @@ private[spark] class Worker(
   }
 
   override def postStop() {
+    metricsSystem.report()
     registrationRetryTimer.foreach(_.cancel())
     executors.values.foreach(_.kill())
     drivers.values.foreach(_.kill())
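Both daemons now flush their metrics sinks in Akka's postStop hook so a final snapshot is emitted before the actor system shuts down. A generic sketch of the same pattern with a stand-in metrics object; the class, metric names, and reporter are illustrative, not Spark's MetricsSystem:

```scala
import akka.actor.Actor
import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

// Stand-in daemon: a registry plus a reporter that can be flushed on demand.
class DaemonSketch extends Actor {
  private val registry = new MetricRegistry
  private val reporter = ConsoleReporter.forRegistry(registry).build()
  private val heartbeats = registry.counter("daemon.heartbeats")

  override def receive = {
    case "beat" => heartbeats.inc()
  }

  override def postStop(): Unit = {
    // Emit one last snapshot before the actor and its sinks go away,
    // mirroring metricsSystem.report() in the Master/Worker postStop hooks.
    reporter.report()
  }
}
```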
