
Commit 6df89a9

Merge remote-tracking branch 'upstream/master' into decisiontree-python-new
2 parents: 4562c08 + baf9ce1

63 files changed: +1131 / -323 lines changed


core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 2 additions & 2 deletions

@@ -34,8 +34,8 @@ abstract class Dependency[T] extends Serializable {
 
 /**
  * :: DeveloperApi ::
- * Base class for dependencies where each partition of the parent RDD is used by at most one
- * partition of the child RDD. Narrow dependencies allow for pipelined execution.
+ * Base class for dependencies where each partition of the child RDD depends on a small number
+ * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
  */
 @DeveloperApi
 abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
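
The reworded contract admits child partitions that read from more than one parent partition. A minimal sketch of a custom subclass, hypothetical and not part of this commit, in which each child partition depends on the parent partition at the same index plus its successor:

package org.apache.spark

import org.apache.spark.rdd.RDD

// Hypothetical illustration only: a narrow dependency in which each child
// partition reads from two parent partitions, something the old "at most one"
// wording did not cover.
private[spark] class PairwiseNarrowDependency[T](rdd: RDD[T])
  extends NarrowDependency[T](rdd) {

  // The parent partition ids this child partition reads from; the set is small
  // and known up front, so execution can still be pipelined.
  override def getParents(partitionId: Int): Seq[Int] =
    Seq(partitionId, partitionId + 1).filter(_ < rdd.partitions.length)
}
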
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 46 additions & 0 deletions

@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import akka.actor.Actor
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.scheduler.TaskScheduler
+
+/**
+ * A heartbeat from executors to the driver. This is a shared message used by several internal
+ * components to convey liveness or execution information for in-progress tasks.
+ */
+private[spark] case class Heartbeat(
+    executorId: String,
+    taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
+    blockManagerId: BlockManagerId)
+
+private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
+
+/**
+ * Lives in the driver to receive heartbeats from executors.
+ */
+private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor {
+  override def receive = {
+    case Heartbeat(executorId, taskMetrics, blockManagerId) =>
+      val response = HeartbeatResponse(
+        !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
+      sender ! response
+  }
+}
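
A sketch of how a caller could exercise this actor with Akka's ask pattern. Only Heartbeat, HeartbeatResponse and HeartbeatReceiver come from the file above; the wrapper object and method are invented for illustration (the executor-side caller added in this commit actually goes through AkkaUtils.askWithReply, shown further down):

package org.apache.spark

import scala.concurrent.Await

import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId

// Hypothetical helper, not part of this commit: one heartbeat round trip
// against a resolved HeartbeatReceiver ActorRef.
private[spark] object HeartbeatExample {
  def sendOneHeartbeat(
      receiver: ActorRef,
      executorId: String,
      metrics: Array[(Long, TaskMetrics)],
      blockManagerId: BlockManagerId)(implicit timeout: Timeout): Boolean = {
    val future = (receiver ? Heartbeat(executorId, metrics, blockManagerId))
      .mapTo[HeartbeatResponse]
    // A true result means the scheduler did not recognise this executor, so
    // its BlockManager should re-register with the master.
    Await.result(future, timeout.duration).reregisterBlockManager
  }
}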

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 1 deletion

@@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.mesos.MesosNativeLibrary
+import akka.actor.Props
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast

@@ -307,6 +308,8 @@ class SparkContext(config: SparkConf) extends Logging {
 
   // Create and start the scheduler
   private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
+  private val heartbeatReceiver = env.actorSystem.actorOf(
+    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
   @volatile private[spark] var dagScheduler: DAGScheduler = _
   try {
     dagScheduler = new DAGScheduler(this)

@@ -455,7 +458,7 @@ class SparkContext(config: SparkConf) extends Logging {
   /** Distribute a local Scala collection to form an RDD, with one or more
     * location preferences (hostnames of Spark nodes) for each object.
     * Create a new partition for each collection item. */
-  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
+  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
     val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
     new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
   }

@@ -992,6 +995,7 @@ class SparkContext(config: SparkConf) extends Logging {
     if (dagSchedulerCopy != null) {
       env.metricsSystem.report()
       metadataCleaner.cancel()
+      env.actorSystem.stop(heartbeatReceiver)
       cleaner.foreach(_.stop())
       dagSchedulerCopy.stop()
       taskScheduler = null
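
The makeRDD hunk above is a whitespace-only change, but the overload it touches is worth spelling out; a hypothetical usage sketch (host names are invented) in which every element becomes its own partition with the listed location preferences:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical usage of the location-preference overload of makeRDD.
object MakeRddWithPrefsExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("makeRDD-prefs"))
    val elems: Seq[(String, Seq[String])] = Seq(
      ("a", Seq("host-1")),                // invented host names
      ("b", Seq("host-2")),
      ("c", Seq("host-1", "host-2")))
    val rdd = sc.makeRDD(elems)            // one partition per element
    rdd.partitions.foreach { p =>
      // The scheduler will prefer the listed hosts for each partition.
      println(s"partition ${p.index} prefers ${rdd.preferredLocations(p)}")
    }
    sc.stop()
  }
}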

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 1 addition & 7 deletions

@@ -193,13 +193,7 @@ object SparkEnv extends Logging {
        logInfo("Registering " + name)
        actorSystem.actorOf(Props(newActor), name = name)
      } else {
-       val driverHost: String = conf.get("spark.driver.host", "localhost")
-       val driverPort: Int = conf.getInt("spark.driver.port", 7077)
-       Utils.checkHost(driverHost, "Expected hostname")
-       val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
-       val timeout = AkkaUtils.lookupTimeout(conf)
-       logInfo(s"Connecting to $name: $url")
-       Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
+       AkkaUtils.makeDriverRef(name, conf, actorSystem)
      }
    }
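
The deleted lookup is presumably what the new AkkaUtils.makeDriverRef helper now encapsulates; the sketch below is reconstructed from the removed lines and may differ from the actual helper in naming and logging:

package org.apache.spark

import scala.concurrent.Await

import akka.actor.{ActorRef, ActorSystem}

import org.apache.spark.util.{AkkaUtils, Utils}

// Reconstructed from the seven removed lines above; not copied from the
// actual AkkaUtils.makeDriverRef added by this commit.
private[spark] object DriverRefSketch {
  def makeDriverRef(name: String, conf: SparkConf, actorSystem: ActorSystem): ActorRef = {
    val driverHost: String = conf.get("spark.driver.host", "localhost")
    val driverPort: Int = conf.getInt("spark.driver.port", 7077)
    Utils.checkHost(driverHost, "Expected hostname")
    val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
    val timeout = AkkaUtils.lookupTimeout(conf)
    // Resolve the remote selection into a concrete ActorRef, failing after the
    // configured lookup timeout.
    Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
  }
}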

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 49 additions & 6 deletions

@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
 import java.util.concurrent._
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import org.apache.spark._
 import org.apache.spark.scheduler._

@@ -48,6 +48,8 @@ private[spark] class Executor(
 
   private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
 
+  @volatile private var isStopped = false
+
   // No ip or host:port - just hostname
   Utils.checkHost(slaveHostname, "Expected executed slave to be a hostname")
   // must not have port specified.

@@ -107,6 +109,8 @@
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
+  startDriverHeartbeater()
+
   def launchTask(
       context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
     val tr = new TaskRunner(context, taskId, taskName, serializedTask)

@@ -121,8 +125,10 @@
     }
   }
 
-  def stop(): Unit = {
+  def stop() {
     env.metricsSystem.report()
+    isStopped = true
+    threadPool.shutdown()
   }
 
   /** Get the Yarn approved local directories. */

@@ -141,11 +147,12 @@
   }
 
   class TaskRunner(
-      execBackend: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer)
+      execBackend: ExecutorBackend, val taskId: Long, taskName: String, serializedTask: ByteBuffer)
     extends Runnable {
 
     @volatile private var killed = false
-    @volatile private var task: Task[Any] = _
+    @volatile var task: Task[Any] = _
+    @volatile var attemptedTask: Option[Task[Any]] = None
 
     def kill(interruptThread: Boolean) {
       logInfo(s"Executor is trying to kill $taskName (TID $taskId)")

@@ -162,7 +169,6 @@
      val ser = SparkEnv.get.closureSerializer.newInstance()
      logInfo(s"Running $taskName (TID $taskId)")
      execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
-     var attemptedTask: Option[Task[Any]] = None
      var taskStart: Long = 0
      def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
      val startGCTime = gcTime

@@ -204,7 +210,6 @@
        val afterSerialization = System.currentTimeMillis()
 
        for (m <- task.metrics) {
-         m.hostname = Utils.localHostName()
          m.executorDeserializeTime = taskStart - startTime
          m.executorRunTime = taskFinish - taskStart
          m.jvmGCTime = gcTime - startGCTime

@@ -354,4 +359,42 @@
      }
    }
  }
+
+  def startDriverHeartbeater() {
+    val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    val retryAttempts = AkkaUtils.numRetries(conf)
+    val retryIntervalMs = AkkaUtils.retryWaitMs(conf)
+    val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem)
+
+    val t = new Thread() {
+      override def run() {
+        // Sleep a random interval so the heartbeats don't end up in sync
+        Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])
+
+        while (!isStopped) {
+          val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
+          for (taskRunner <- runningTasks.values()) {
+            if (!taskRunner.attemptedTask.isEmpty) {
+              Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
+                tasksMetrics += ((taskRunner.taskId, metrics))
+              }
+            }
+          }
+
+          val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
+          val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
+            retryAttempts, retryIntervalMs, timeout)
+          if (response.reregisterBlockManager) {
+            logWarning("Told to re-register on heartbeat")
+            env.blockManager.reregister()
+          }
+          Thread.sleep(interval)
+        }
+      }
+    }
+    t.setDaemon(true)
+    t.setName("Driver Heartbeater")
+    t.start()
+  }
 }
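
startDriverHeartbeater() reads its polling interval from spark.executor.heartbeatInterval (defaulting to 10000 ms) and staggers the first beat by a random fraction of it so executors don't heartbeat in lockstep. A hypothetical way to tune the interval from application code:

import org.apache.spark.{SparkConf, SparkContext}

// Example only: lower the interval read by startDriverHeartbeater() from the
// 10000 ms default to 5000 ms so in-progress TaskMetrics reach the driver sooner.
object HeartbeatIntervalExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("heartbeat-interval-example")
      .set("spark.executor.heartbeatInterval", "5000")
    val sc = new SparkContext(conf)
    // ... run jobs; executors now report roughly every 5 seconds ...
    sc.stop()
  }
}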

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 9 additions & 1 deletion

@@ -23,6 +23,14 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
 /**
  * :: DeveloperApi ::
  * Metrics tracked during the execution of a task.
+ *
+ * This class is used to house metrics both for in-progress and completed tasks. In executors,
+ * both the task thread and the heartbeat thread write to the TaskMetrics. The heartbeat thread
+ * reads it to send in-progress metrics, and the task thread reads it to send metrics along with
+ * the completed task.
+ *
+ * So, when adding new fields, take into consideration that the whole object can be serialized for
+ * shipping off at any time to consumers of the SparkListener interface.
  */
 @DeveloperApi
 class TaskMetrics extends Serializable {

@@ -143,7 +151,7 @@ class ShuffleReadMetrics extends Serializable {
   /**
    * Absolute time when this task finished reading shuffle data
    */
-  var shuffleFinishTime: Long = _
+  var shuffleFinishTime: Long = -1
 
   /**
    * Number of blocks fetched in this shuffle by this task (remote or local)
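
Changing the default from `_` (which initialises a Long to 0) to -1 lets a consumer tell "shuffle read never finished" apart from a genuine timestamp; a hypothetical consumer-side check:

import org.apache.spark.executor.ShuffleReadMetrics

// Hypothetical consumer: with the new -1 default, an unset shuffleFinishTime
// no longer masquerades as an epoch-millis timestamp of 0.
object ShuffleFinishTimeCheck {
  def shuffleFinishedAt(read: ShuffleReadMetrics): Option[Long] =
    if (read.shuffleFinishTime >= 0) Some(read.shuffleFinishTime) else None
}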

core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala

Lines changed: 2 additions & 2 deletions

@@ -49,8 +49,8 @@ private[spark] case class CoalescedRDDPartition(
   }
 
   /**
-   * Computes how many of the parents partitions have getPreferredLocation
-   * as one of their preferredLocations
+   * Computes the fraction of the parents' partitions containing preferredLocation within
+   * their getPreferredLocs.
    * @return locality of this coalesced partition between 0 and 1
    */
   def localFraction: Double = {
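
A REPL-style, simplified sketch of the fraction the reworded docstring describes; parentPreferredLocs stands in for each parent partition's getPreferredLocs, and the names here are invented rather than the class's actual fields:

// Simplified, hypothetical rendering of the documented computation.
def localFraction(parentPreferredLocs: Seq[Seq[String]],
                  preferredLocation: Option[String]): Double = {
  if (parentPreferredLocs.isEmpty) {
    0.0
  } else {
    // Count parent partitions whose preferred hosts include this coalesced
    // partition's preferred location, then normalise to [0, 1].
    val local = parentPreferredLocs.count(locs => preferredLocation.exists(locs.contains))
    local.toDouble / parentPreferredLocs.size
  }
}

// localFraction(Seq(Seq("host-1"), Seq("host-2"), Seq("host-1", "host-3")), Some("host-1"))
// == 2.0 / 3.0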
