
Commit 5c9581f

Merge branch 'master' into SPARK-1930
2 parents 9a6bcf2 + 9ecc40d

50 files changed: +2722 additions, -1655 deletions

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 2 deletions
@@ -76,8 +76,8 @@ class SparkContext(config: SparkConf) extends Logging {
    * :: DeveloperApi ::
    * Alternative constructor for setting preferred locations where Spark will create executors.
    *
-   * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
-   * be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
+   * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on.
+   * Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
    * from a list of input files or InputFormats for the application.
    */
   @DeveloperApi

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 12 additions & 8 deletions
@@ -61,17 +61,23 @@ private[spark] class ExecutorRunner(
     // Shutdown hook that kills actors on shutdown.
     shutdownHook = new Thread() {
       override def run() {
-        killProcess()
+        killProcess(Some("Worker shutting down"))
       }
     }
     Runtime.getRuntime.addShutdownHook(shutdownHook)
   }
 
-  private def killProcess() {
+  /**
+   * kill executor process, wait for exit and notify worker to update resource status
+   *
+   * @param message the exception message which caused the executor's death
+   */
+  private def killProcess(message: Option[String]) {
     if (process != null) {
       logInfo("Killing process!")
       process.destroy()
-      process.waitFor()
+      val exitCode = process.waitFor()
+      worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode))
     }
   }
@@ -82,7 +88,6 @@ private[spark] class ExecutorRunner(
       workerThread.interrupt()
       workerThread = null
       state = ExecutorState.KILLED
-      worker ! ExecutorStateChanged(appId, execId, state, None, None)
       Runtime.getRuntime.removeShutdownHook(shutdownHook)
     }
   }
@@ -148,14 +153,13 @@ private[spark] class ExecutorRunner(
     } catch {
       case interrupted: InterruptedException => {
         logInfo("Runner thread for executor " + fullId + " interrupted")
-        killProcess()
+        state = ExecutorState.KILLED
+        killProcess(None)
       }
       case e: Exception => {
         logError("Error running executor", e)
-        killProcess()
         state = ExecutorState.FAILED
-        val message = e.getClass + ": " + e.getMessage
-        worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)
+        killProcess(Some(e.toString))
       }
     }
   }
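For reference, the worker notification that killProcess now sends combines the failure message with the process exit code. A rough sketch of that message's shape, with field names assumed from the call sites above (it lives in Spark's DeployMessages, not in this diff):

  // Hedged sketch of the notification implied by the calls above; not part of the commit.
  case class ExecutorStateChanged(
      appId: String,               // application the executor belongs to
      execId: Int,                 // executor id within that application
      state: ExecutorState.Value,  // e.g. KILLED or FAILED, set before killProcess is called
      message: Option[String],     // reason for the death, e.g. Some(e.toString)
      exitStatus: Option[Int])     // exit code returned by process.waitFor()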

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 4 additions & 0 deletions
@@ -317,10 +317,14 @@ private[spark] class Worker(
       state match {
         case DriverState.ERROR =>
           logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
+        case DriverState.FAILED =>
+          logWarning(s"Driver $driverId exited with failure")
         case DriverState.FINISHED =>
           logInfo(s"Driver $driverId exited successfully")
         case DriverState.KILLED =>
           logInfo(s"Driver $driverId was killed by user")
+        case _ =>
+          logDebug(s"Driver $driverId changed state to $state")
       }
       masterLock.synchronized {
         master ! DriverStateChanged(driverId, state, exception)

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 6 additions & 3 deletions
@@ -22,11 +22,12 @@ import java.nio.ByteBuffer
 import akka.actor._
 import akka.remote._
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.util.{AkkaUtils, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
@@ -61,12 +62,14 @@ private[spark] class CoarseGrainedExecutorBackend(
       logError("Slave registration failed: " + message)
       System.exit(1)
 
-    case LaunchTask(taskDesc) =>
-      logInfo("Got assigned task " + taskDesc.taskId)
+    case LaunchTask(data) =>
       if (executor == null) {
         logError("Received LaunchTask command but executor was null")
         System.exit(1)
       } else {
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        val taskDesc = ser.deserialize[TaskDescription](data.value)
+        logInfo("Got assigned task " + taskDesc.taskId)
         executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
       }
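Read together with the scheduler-side change further below, LaunchTask now carries serialized task bytes instead of a TaskDescription object. A minimal sketch of the round trip, restating only what the two call sites do (not itself part of the commit), assuming an initialized SparkEnv:

  import java.nio.ByteBuffer
  import org.apache.spark.SparkEnv
  import org.apache.spark.scheduler.TaskDescription
  import org.apache.spark.util.SerializableBuffer

  // Driver side: serialize the TaskDescription and wrap the ByteBuffer so it can
  // travel through Akka; executor side: unwrap and deserialize before launching.
  def roundTrip(task: TaskDescription): TaskDescription = {
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val bytes: ByteBuffer = ser.serialize(task)          // driver: payload of LaunchTask
    val onTheWire = new SerializableBuffer(bytes)
    ser.deserialize[TaskDescription](onTheWire.value)    // executor: recover the task
  }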

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
 private[spark] object CoarseGrainedClusterMessages {
 
   // Driver to executors
-  case class LaunchTask(task: TaskDescription) extends CoarseGrainedClusterMessage
+  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
 
   case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
     extends CoarseGrainedClusterMessage

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 23 additions & 4 deletions
@@ -27,10 +27,10 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
-import org.apache.spark.{Logging, SparkException, TaskState}
+import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
 
 /**
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -48,6 +48,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   var totalCoreCount = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
+  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]
@@ -140,8 +141,26 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     // Launch tasks returned by a set of resource offers
     def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
-        freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
-        executorActor(task.executorId) ! LaunchTask(task)
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        val serializedTask = ser.serialize(task)
+        if (serializedTask.limit >= akkaFrameSize - 1024) {
+          val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
+          scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
+            try {
+              var msg = "Serialized task %s:%d was %d bytes which " +
+                "exceeds spark.akka.frameSize (%d bytes). " +
+                "Consider using broadcast variables for large values."
+              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize)
+              taskSet.abort(msg)
+            } catch {
+              case e: Exception => logError("Exception in error callback", e)
+            }
+          }
+        }
+        else {
+          freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
+          executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
+        }
       }
     }
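As a worked example of the size check introduced above (a sketch, not part of the commit, assuming the default spark.akka.frameSize of 10 MB that AkkaUtils.maxFrameSizeBytes converts to bytes):

  // Assumed default: spark.akka.frameSize = 10 (MB) => 10 * 1024 * 1024 bytes.
  val akkaFrameSize = 10 * 1024 * 1024       // 10485760 bytes
  val reservedOverhead = 1024                // headroom the check above leaves for the message envelope
  def taskFitsInFrame(serializedTaskSize: Int): Boolean =
    serializedTaskSize < akkaFrameSize - reservedOverhead
  // A task serialized to 10485760 - 1024 = 10484736 bytes or more is rejected and its
  // TaskSet aborted with the "Consider using broadcast variables" message.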

core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala (new file)

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkContext}
+import org.apache.spark.util.{SerializableBuffer, AkkaUtils}
+
+import org.scalatest.FunSuite
+
+class CoarseGrainedSchedulerBackendSuite extends FunSuite with LocalSparkContext {
+
+  test("serialized task larger than akka frame size") {
+    val conf = new SparkConf
+    conf.set("spark.akka.frameSize","1")
+    conf.set("spark.default.parallelism","1")
+    sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf)
+    val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
+    val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
+    val larger = sc.parallelize(Seq(buffer))
+    val thrown = intercept[SparkException] {
+      larger.collect()
+    }
+    assert(thrown.getMessage.contains("Consider using broadcast variables for large values"))
+    val smaller = sc.parallelize(1 to 4).collect()
+    assert(smaller.size === 4)
+  }
+
+}
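The abort message the test asserts on points users to broadcast variables; a hedged sketch of that workaround (illustrative names, not from the commit), which ships a large value to executors once instead of inside every serialized task closure:

  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("broadcast-instead-of-closure"))
  // Hypothetical large lookup table; capturing it directly in the map closure
  // would have to fit inside a single Akka frame for every task.
  val bigLookup: Map[Int, String] = (1 to 1000000).map(i => i -> i.toString).toMap
  val bcLookup = sc.broadcast(bigLookup)          // distributed out of band, not per task
  val result = sc.parallelize(1 to 100)
    .map(i => bcLookup.value.getOrElse(i, ""))    // tasks only capture the small broadcast handle
    .collect()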

docs/_layouts/global.html

Lines changed: 12 additions & 6 deletions
@@ -9,6 +9,11 @@
     <title>{{ page.title }} - Spark {{site.SPARK_VERSION_SHORT}} Documentation</title>
     <meta name="description" content="">
 
+    {% if page.redirect %}
+      <meta http-equiv="refresh" content="0; url={{page.redirect}}">
+      <link rel="canonical" href="{{page.redirect}}" />
+    {% endif %}
+
     <link rel="stylesheet" href="css/bootstrap.min.css">
     <style>
       body {
@@ -61,15 +66,13 @@
       <a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a>
       <ul class="dropdown-menu">
         <li><a href="quick-start.html">Quick Start</a></li>
-        <li><a href="scala-programming-guide.html">Spark in Scala</a></li>
-        <li><a href="java-programming-guide.html">Spark in Java</a></li>
-        <li><a href="python-programming-guide.html">Spark in Python</a></li>
+        <li><a href="programming-guide.html">Spark Programming Guide</a></li>
         <li class="divider"></li>
         <li><a href="streaming-programming-guide.html">Spark Streaming</a></li>
         <li><a href="sql-programming-guide.html">Spark SQL</a></li>
         <li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li>
-        <li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li>
         <li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li>
+        <li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li>
       </ul>
     </li>
@@ -86,6 +89,8 @@
       <a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
       <ul class="dropdown-menu">
         <li><a href="cluster-overview.html">Overview</a></li>
+        <li><a href="submitting-applications.html">Submitting Applications</a></li>
+        <li class="divider"></li>
         <li><a href="ec2-scripts.html">Amazon EC2</a></li>
         <li><a href="spark-standalone.html">Standalone Mode</a></li>
         <li><a href="running-on-mesos.html">Mesos</a></li>
@@ -99,9 +104,10 @@
         <li><a href="configuration.html">Configuration</a></li>
         <li><a href="monitoring.html">Monitoring</a></li>
         <li><a href="tuning.html">Tuning Guide</a></li>
-        <li><a href="hadoop-third-party-distributions.html">Running with CDH/HDP</a></li>
-        <li><a href="hardware-provisioning.html">Hardware Provisioning</a></li>
         <li><a href="job-scheduling.html">Job Scheduling</a></li>
+        <li><a href="security.html">Security</a></li>
+        <li><a href="hardware-provisioning.html">Hardware Provisioning</a></li>
+        <li><a href="hadoop-third-party-distributions.html">3<sup>rd</sup>-Party Hadoop Distros</a></li>
         <li class="divider"></li>
         <li><a href="building-with-maven.html">Building Spark with Maven</a></li>
         <li><a href="https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark">Contributing to Spark</a></li>

docs/bagel-programming-guide.md

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@ To use Bagel in your program, add the following SBT or Maven dependency:
 
 # Programming Model
 
-Bagel operates on a graph represented as a [distributed dataset](scala-programming-guide.html) of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.
+Bagel operates on a graph represented as a [distributed dataset](programming-guide.html) of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.
 
 For example, we can use Bagel to implement PageRank. Here, vertices represent pages, edges represent links between pages, and messages represent shares of PageRank sent to the pages that a particular page links to.
