Commit fd849b0

Merge remote-tracking branch 'apache/master' into scala-2.11-prashant
2 parents: 37d972c + d4fa04e

42 files changed: +788 −314 lines

core/src/main/resources/org/apache/spark/ui/static/webui.css

Lines changed: 14 additions & 0 deletions
@@ -120,6 +120,20 @@ pre {
   border: none;
 }
 
+.stacktrace-details {
+  max-height: 300px;
+  overflow-y: auto;
+  margin: 0;
+  transition: max-height 0.5s ease-out, padding 0.5s ease-out;
+}
+
+.stacktrace-details.collapsed {
+  max-height: 0;
+  padding-top: 0;
+  padding-bottom: 0;
+  border: none;
+}
+
 span.expand-additional-metrics {
   cursor: pointer;
 }

core/src/main/scala/org/apache/spark/TaskEndReason.scala

Lines changed: 34 additions & 1 deletion
@@ -83,15 +83,48 @@ case class FetchFailed(
  * :: DeveloperApi ::
  * Task failed due to a runtime exception. This is the most common failure case and also captures
  * user program exceptions.
+ *
+ * `stackTrace` contains the stack trace of the exception itself. It still exists for backward
+ * compatibility. It's better to use `this(e: Throwable, metrics: Option[TaskMetrics])` to
+ * create `ExceptionFailure` as it will handle the backward compatibility properly.
+ *
+ * `fullStackTrace` is a better representation of the stack trace because it contains the whole
+ * stack trace including the exception and its causes
  */
 @DeveloperApi
 case class ExceptionFailure(
     className: String,
     description: String,
     stackTrace: Array[StackTraceElement],
+    fullStackTrace: String,
     metrics: Option[TaskMetrics])
   extends TaskFailedReason {
-  override def toErrorString: String = Utils.exceptionString(className, description, stackTrace)
+
+  private[spark] def this(e: Throwable, metrics: Option[TaskMetrics]) {
+    this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e), metrics)
+  }
+
+  override def toErrorString: String =
+    if (fullStackTrace == null) {
+      // fullStackTrace is added in 1.2.0
+      // If fullStackTrace is null, use the old error string for backward compatibility
+      exceptionString(className, description, stackTrace)
+    } else {
+      fullStackTrace
+    }
+
+  /**
+   * Return a nice string representation of the exception, including the stack trace.
+   * Note: It does not include the exception's causes, and is only used for backward compatibility.
+   */
+  private def exceptionString(
+      className: String,
+      description: String,
+      stackTrace: Array[StackTraceElement]): String = {
+    val desc = if (description == null) "" else description
+    val st = if (stackTrace == null) "" else stackTrace.map(" " + _).mkString("\n")
+    s"$className: $desc\n$st"
+  }
 }
 
 /**
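The interesting part of this change is the fallback in `toErrorString`: records written by pre-1.2.0 versions deserialize with `fullStackTrace == null`, so the old message-plus-frames rendering is kept purely for backward compatibility. A minimal standalone sketch of that pattern follows; `FailureRecord` and `fullString` are illustrative stand-ins, not Spark's actual `ExceptionFailure` and `Utils.exceptionString`.

object ExceptionFailureSketch {

  // Illustrative stand-in for the real case class: a newly added field may be
  // null when the record was written by an older version, so rendering must
  // fall back to the old single-exception format.
  case class FailureRecord(
      className: String,
      description: String,
      stackTrace: Array[StackTraceElement],
      fullStackTrace: String) {

    // Mirrors the auxiliary constructor added in this commit.
    def this(e: Throwable) =
      this(e.getClass.getName, e.getMessage, e.getStackTrace,
        ExceptionFailureSketch.fullString(e))

    def toErrorString: String =
      if (fullStackTrace == null) {
        // Old-format record: rebuild the message + frames string, no causes.
        val desc = if (description == null) "" else description
        val st = if (stackTrace == null) "" else stackTrace.map("  " + _).mkString("\n")
        s"$className: $desc\n$st"
      } else {
        fullStackTrace // new-format record already carries the whole cause chain
      }
  }

  // Illustrative replacement for Utils.exceptionString: render exception and causes.
  def fullString(e: Throwable): String = {
    val sw = new java.io.StringWriter()
    e.printStackTrace(new java.io.PrintWriter(sw, true))
    sw.toString
  }

  def main(args: Array[String]): Unit = {
    val wrapped = new RuntimeException("task failed", new IllegalStateException("root cause"))
    println(new FailureRecord(wrapped).toErrorString)   // includes the "Caused by:" section
    println(FailureRecord(wrapped.getClass.getName, wrapped.getMessage,
      wrapped.getStackTrace, null).toErrorString)       // old format, causes dropped
  }
}

The second println shows what an old-format record degrades to: the wrapped cause is lost, which is exactly why the commit steers callers toward the `Throwable`-based constructor.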

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 1 deletion
@@ -263,7 +263,7 @@ private[spark] class Executor(
             m.executorRunTime = serviceTime
             m.jvmGCTime = gcTime - startGCTime
           }
-          val reason = ExceptionFailure(t.getClass.getName, t.getMessage, t.getStackTrace, metrics)
+          val reason = new ExceptionFailure(t, metrics)
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
           // Don't forcibly exit unless the exception was inherently fatal, to avoid

core/src/main/scala/org/apache/spark/network/BlockTransferService.scala

Lines changed: 3 additions & 1 deletion
@@ -73,6 +73,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
   def uploadBlock(
       hostname: String,
       port: Int,
+      execId: String,
       blockId: BlockId,
       blockData: ManagedBuffer,
       level: StorageLevel): Future[Unit]
@@ -110,9 +111,10 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
   def uploadBlockSync(
       hostname: String,
       port: Int,
+      execId: String,
       blockId: BlockId,
       blockData: ManagedBuffer,
       level: StorageLevel): Unit = {
-    Await.result(uploadBlock(hostname, port, blockId, blockData, level), Duration.Inf)
+    Await.result(uploadBlock(hostname, port, execId, blockId, blockData, level), Duration.Inf)
  }
 }
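The new `execId` parameter is simply threaded from the blocking wrapper into the asynchronous call, which `uploadBlockSync` then awaits. A small standalone sketch of that async-call-plus-blocking-wrapper shape; `UploadService`, `upload`, and `uploadSync` are illustrative names, not Spark APIs.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Illustrative sketch: an asynchronous upload API plus a blocking convenience
// wrapper that forwards every argument (including the executor id) and waits
// on the returned Future, mirroring uploadBlock / uploadBlockSync.
trait UploadService {
  def upload(hostname: String, port: Int, execId: String, blockId: String,
      data: Array[Byte]): Future[Unit]

  def uploadSync(hostname: String, port: Int, execId: String, blockId: String,
      data: Array[Byte]): Unit = {
    Await.result(upload(hostname, port, execId, blockId, data), Duration.Inf)
  }
}

object UploadServiceDemo {
  def main(args: Array[String]): Unit = {
    val service = new UploadService {
      def upload(hostname: String, port: Int, execId: String, blockId: String,
          data: Array[Byte]): Future[Unit] =
        Future { println(s"uploading $blockId (${data.length} bytes) as executor $execId") }
    }
    service.uploadSync("localhost", 7077, "exec-1", "rdd_0_0", Array[Byte](1, 2, 3))
  }
}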

core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala

Lines changed: 12 additions & 19 deletions
@@ -26,18 +26,10 @@ import org.apache.spark.network.BlockDataManager
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
 import org.apache.spark.network.server.{OneForOneStreamManager, RpcHandler, StreamManager}
-import org.apache.spark.network.shuffle.ShuffleStreamHandle
+import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, OpenBlocks, StreamHandle, UploadBlock}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{BlockId, StorageLevel}
 
-object NettyMessages {
-  /** Request to read a set of blocks. Returns [[ShuffleStreamHandle]] to identify the stream. */
-  case class OpenBlocks(blockIds: Seq[BlockId])
-
-  /** Request to upload a block with a certain StorageLevel. Returns nothing (empty byte array). */
-  case class UploadBlock(blockId: BlockId, blockData: Array[Byte], level: StorageLevel)
-}
-
 /**
  * Serves requests to open blocks by simply registering one chunk per block requested.
  * Handles opening and uploading arbitrary BlockManager blocks.
@@ -50,28 +42,29 @@ class NettyBlockRpcServer(
     blockManager: BlockDataManager)
   extends RpcHandler with Logging {
 
-  import NettyMessages._
-
   private val streamManager = new OneForOneStreamManager()
 
   override def receive(
       client: TransportClient,
       messageBytes: Array[Byte],
      responseContext: RpcResponseCallback): Unit = {
-    val ser = serializer.newInstance()
-    val message = ser.deserialize[AnyRef](ByteBuffer.wrap(messageBytes))
+    val message = BlockTransferMessage.Decoder.fromByteArray(messageBytes)
     logTrace(s"Received request: $message")
 
    message match {
-      case OpenBlocks(blockIds) =>
-        val blocks: Seq[ManagedBuffer] = blockIds.map(blockManager.getBlockData)
+      case openBlocks: OpenBlocks =>
+        val blocks: Seq[ManagedBuffer] =
+          openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
        val streamId = streamManager.registerStream(blocks.iterator)
        logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
-        responseContext.onSuccess(
-          ser.serialize(new ShuffleStreamHandle(streamId, blocks.size)).array())
+        responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteArray)
 
-      case UploadBlock(blockId, blockData, level) =>
-        blockManager.putBlockData(blockId, new NioManagedBuffer(ByteBuffer.wrap(blockData)), level)
+      case uploadBlock: UploadBlock =>
+        // StorageLevel is serialized as bytes using our JavaSerializer.
+        val level: StorageLevel =
+          serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata))
+        val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
+        blockManager.putBlockData(BlockId(uploadBlock.blockId), data, level)
        responseContext.onSuccess(new Array[Byte](0))
    }
  }
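Instead of Java-deserializing ad-hoc case classes, the server now decodes the typed `BlockTransferMessage` wire format and branches on the concrete message class; only the `StorageLevel` still travels as Java-serialized bytes inside the upload message's metadata. A standalone sketch of that decode-then-match dispatch, using simplified stand-in message types rather than the real spark-network-shuffle protocol classes:

import java.nio.ByteBuffer

// Simplified stand-ins for the typed protocol messages decoded from the wire.
sealed trait TransferMessage
case class OpenBlocksMsg(appId: String, execId: String, blockIds: Array[String])
  extends TransferMessage
case class UploadBlockMsg(appId: String, execId: String, blockId: String,
    metadata: Array[Byte], blockData: Array[Byte]) extends TransferMessage

object RpcDispatchSketch {
  // Sketch of the receive() dispatch: decode once, then branch on message type.
  def handle(message: TransferMessage): String = message match {
    case open: OpenBlocksMsg =>
      // Register one chunk per requested block and hand back a stream handle.
      s"stream with ${open.blockIds.length} chunks for ${open.execId}"
    case upload: UploadBlockMsg =>
      // Only the storage level rides along as opaque (Java-serialized) metadata bytes;
      // the block payload itself is wrapped in a ByteBuffer and stored.
      val payload = ByteBuffer.wrap(upload.blockData)
      s"stored ${upload.blockId} (${payload.remaining()} bytes)"
  }

  def main(args: Array[String]): Unit = {
    println(handle(OpenBlocksMsg("app-1", "exec-1", Array("shuffle_0_0_0", "shuffle_0_1_0"))))
    println(handle(UploadBlockMsg("app-1", "exec-1", "rdd_0_0",
      metadata = Array[Byte](0), blockData = Array[Byte](1, 2, 3, 4))))
  }
}

The design point the diff illustrates is that the message format is now owned by the network layer (the `network.shuffle.protocol` package) rather than by this Scala file's private `NettyMessages` object.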

core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala

Lines changed: 10 additions & 5 deletions
@@ -24,10 +24,10 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.network.client.{TransportClientBootstrap, RpcResponseCallback, TransportClientFactory}
-import org.apache.spark.network.netty.NettyMessages.{OpenBlocks, UploadBlock}
 import org.apache.spark.network.sasl.{SaslRpcHandler, SaslClientBootstrap}
 import org.apache.spark.network.server._
 import org.apache.spark.network.shuffle.{RetryingBlockFetcher, BlockFetchingListener, OneForOneBlockFetcher}
+import org.apache.spark.network.shuffle.protocol.UploadBlock
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.util.Utils
@@ -46,6 +46,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
   private[this] var transportContext: TransportContext = _
   private[this] var server: TransportServer = _
   private[this] var clientFactory: TransportClientFactory = _
+  private[this] var appId: String = _
 
   override def init(blockDataManager: BlockDataManager): Unit = {
     val (rpcHandler: RpcHandler, bootstrap: Option[TransportClientBootstrap]) = {
@@ -60,6 +61,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
     transportContext = new TransportContext(transportConf, rpcHandler)
     clientFactory = transportContext.createClientFactory(bootstrap.toList)
     server = transportContext.createServer()
+    appId = conf.getAppId
     logInfo("Server created on " + server.getPort)
   }
 
@@ -74,8 +76,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
     val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
       override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
         val client = clientFactory.createClient(host, port)
-        new OneForOneBlockFetcher(client, blockIds.toArray, listener)
-          .start(OpenBlocks(blockIds.map(BlockId.apply)))
+        new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener).start()
       }
     }
 
@@ -101,12 +102,17 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
   override def uploadBlock(
       hostname: String,
       port: Int,
+      execId: String,
       blockId: BlockId,
       blockData: ManagedBuffer,
       level: StorageLevel): Future[Unit] = {
     val result = Promise[Unit]()
     val client = clientFactory.createClient(hostname, port)
 
+    // StorageLevel is serialized as bytes using our JavaSerializer. Everything else is encoded
+    // using our binary protocol.
+    val levelBytes = serializer.newInstance().serialize(level).array()
+
     // Convert or copy nio buffer into array in order to serialize it.
     val nioBuffer = blockData.nioByteBuffer()
     val array = if (nioBuffer.hasArray) {
@@ -117,8 +123,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
       data
     }
 
-    val ser = serializer.newInstance()
-    client.sendRpc(ser.serialize(new UploadBlock(blockId, array, level)).array(),
+    client.sendRpc(new UploadBlock(appId, execId, blockId.toString, levelBytes, array).toByteArray,
       new RpcResponseCallback {
        override def onSuccess(response: Array[Byte]): Unit = {
          logTrace(s"Successfully uploaded block $blockId")
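`uploadBlock` returns a `Future[Unit]` while `sendRpc` reports completion through a callback, so the method bridges the two with a `Promise` that the callback completes. A standalone sketch of that callback-to-Future bridge; `RpcCallback` and `sendRpc` below are illustrative stand-ins for the Netty transport API, and the message encoding is a placeholder.

import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object UploadBridgeSketch {
  // Illustrative stand-in for the Netty-style callback interface.
  trait RpcCallback {
    def onSuccess(response: Array[Byte]): Unit
    def onFailure(e: Throwable): Unit
  }

  // Illustrative stand-in for client.sendRpc: always succeeds immediately.
  def sendRpc(message: Array[Byte], callback: RpcCallback): Unit =
    callback.onSuccess(new Array[Byte](0))

  // Sketch of the uploadBlock pattern: complete a Promise from the RPC callback
  // so callers get an ordinary Future[Unit].
  def uploadBlock(blockId: String, levelBytes: Array[Byte], data: Array[Byte]): Future[Unit] = {
    val result = Promise[Unit]()
    val message = levelBytes ++ data // placeholder for the encoded upload message
    sendRpc(message, new RpcCallback {
      override def onSuccess(response: Array[Byte]): Unit = result.success(())
      override def onFailure(e: Throwable): Unit = result.failure(e)
    })
    result.future
  }

  def main(args: Array[String]): Unit = {
    uploadBlock("rdd_0_0", Array[Byte](0), Array[Byte](1, 2, 3)).onComplete {
      case Success(_) => println("upload acknowledged")
      case Failure(e) => println(s"upload failed: $e")
    }
    Thread.sleep(100) // give the completion handler a moment to print before the JVM exits
  }
}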

core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala

Lines changed: 1 addition & 0 deletions
@@ -137,6 +137,7 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
   override def uploadBlock(
       hostname: String,
       port: Int,
+      execId: String,
       blockId: BlockId,
       blockData: ManagedBuffer,
       level: StorageLevel)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 2 additions & 2 deletions
@@ -1063,7 +1063,7 @@ class DAGScheduler(
         if (runningStages.contains(failedStage)) {
           logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
             s"due to a fetch failure from $mapStage (${mapStage.name})")
-          markStageAsFinished(failedStage, Some("Fetch failure: " + failureMessage))
+          markStageAsFinished(failedStage, Some(failureMessage))
           runningStages -= failedStage
         }
 
@@ -1094,7 +1094,7 @@ class DAGScheduler(
           handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
         }
 
-      case ExceptionFailure(className, description, stackTrace, metrics) =>
+      case ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics) =>
        // Do nothing here, left up to the TaskScheduler to decide how to handle user failures
 
       case TaskResultLost =>

core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala

Lines changed: 14 additions & 3 deletions
@@ -32,10 +32,21 @@ private[spark] class FetchFailedException(
     shuffleId: Int,
     mapId: Int,
     reduceId: Int,
-    message: String)
-  extends Exception(message) {
+    message: String,
+    cause: Throwable = null)
+  extends Exception(message, cause) {
+
+  def this(
+      bmAddress: BlockManagerId,
+      shuffleId: Int,
+      mapId: Int,
+      reduceId: Int,
+      cause: Throwable) {
+    this(bmAddress, shuffleId, mapId, reduceId, cause.getMessage, cause)
+  }
 
-  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
+  def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId,
+    Utils.exceptionString(this))
 }
 
 /**
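By accepting the original exception as a `cause` instead of flattening it to a string at the throw site (see the BlockStoreShuffleFetcher change below), the full chain survives until `toTaskEndReason`, where `Utils.exceptionString(this)` renders the fetch failure together with whatever caused it. A standalone sketch of the cause-preserving constructor pattern, with illustrative class and helper names:

object FetchFailureSketch {
  // Illustrative stand-in for FetchFailedException: the auxiliary constructor
  // reuses the cause's message and keeps the cause itself on the chain.
  class FetchFailed(
      val mapId: Int,
      val reduceId: Int,
      message: String,
      cause: Throwable = null)
    extends Exception(message, cause) {

    def this(mapId: Int, reduceId: Int, cause: Throwable) =
      this(mapId, reduceId, cause.getMessage, cause)

    // Analogous to toTaskEndReason: render the whole chain, not just this message.
    def toErrorString: String = {
      val sw = new java.io.StringWriter()
      printStackTrace(new java.io.PrintWriter(sw, true))
      sw.toString
    }
  }

  def main(args: Array[String]): Unit = {
    val root = new java.io.IOException("connection reset by peer")
    val failure = new FetchFailed(3, 7, root)
    // The rendered string includes a "Caused by: java.io.IOException ..." section,
    // which a message-only constructor would have dropped.
    println(failure.toErrorString)
  }
}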

core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala

Lines changed: 2 additions & 3 deletions
@@ -25,7 +25,7 @@ import org.apache.spark._
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
-import org.apache.spark.util.{CompletionIterator, Utils}
+import org.apache.spark.util.CompletionIterator
 
 private[hash] object BlockStoreShuffleFetcher extends Logging {
   def fetch[T](
@@ -64,8 +64,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
       blockId match {
         case ShuffleBlockId(shufId, mapId, _) =>
           val address = statuses(mapId.toInt)._1
-          throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId,
-            Utils.exceptionString(e))
+          throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
         case _ =>
           throw new SparkException(
             "Failed to get block " + blockId + ", which is not a shuffle block", e)
