Skip to content

Commit 98c668a

Browse files
committed
Added failure handling and fixed unit tests.
1 parent ae05fcd commit 98c668a

File tree

5 files changed

+27
-24
lines changed

5 files changed

+27
-24
lines changed

core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,5 @@ trait BlockFetchingListener extends EventListener {
3333
/**
3434
* Called upon failures.
3535
*/
36-
def onBlockFetchFailure(exception: Exception): Unit
36+
def onBlockFetchFailure(exception: Throwable): Unit
3737
}

core/src/main/scala/org/apache/spark/network/BlockTransferService.scala

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,11 @@ abstract class BlockTransferService {
4444
def hostName: String
4545

4646
/**
47-
* Fetch a sequence of blocks from a remote node, available only after [[init]] is invoked.
47+
* Fetch a sequence of blocks from a remote node asynchronously,
48+
* available only after [[init]] is invoked.
49+
*
50+
* Note that [[BlockFetchingListener.onBlockFetchSuccess]] is called once per block,
51+
* while [[BlockFetchingListener.onBlockFetchFailure]] is called once per failure.
4852
*
4953
* This takes a sequence so the implementation can batch requests.
5054
*/
@@ -55,19 +59,17 @@ abstract class BlockTransferService {
5559
listener: BlockFetchingListener): Unit
5660

5761
/**
58-
* Fetch a single block from a remote node, available only after [[init]] is invoked.
59-
*
60-
* This is functionally equivalent to
61-
* {{{
62-
* fetchBlocks(hostName, port, Seq(blockId)).iterator().next()._2
63-
* }}}
62+
* Fetch a single block from a remote node, synchronously,
63+
* available only after [[init]] is invoked.
6464
*/
6565
def fetchBlock(hostName: String, port: Int, blockId: String): ManagedBuffer = {
6666
// TODO(rxin): Add timeout?
67+
68+
// A monitor for the thread to wait on.
6769
val lock = new Object
68-
@volatile var result: Either[ManagedBuffer, Exception] = null
70+
@volatile var result: Either[ManagedBuffer, Throwable] = null
6971
fetchBlocks(hostName, port, Seq(blockId), new BlockFetchingListener {
70-
override def onBlockFetchFailure(exception: Exception): Unit = {
72+
override def onBlockFetchFailure(exception: Throwable): Unit = {
7173
lock.synchronized {
7274
result = Right(exception)
7375
lock.notify()
@@ -93,8 +95,8 @@ abstract class BlockTransferService {
9395
}
9496

9597
result match {
96-
case Left(data: ManagedBuffer) => data
97-
case Right(e: Exception) => throw e
98+
case Left(data) => data
99+
case Right(e) => throw e
98100
}
99101
}
100102

core/src/main/scala/org/apache/spark/network/cm/CMBlockTransferService.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityMan
8484

8585
val future = cm.sendMessageReliably(cmId, blockMessageArray.toBufferMessage)
8686

87-
// If succeeds in getting blocks from a remote connection manager, put the block in results.
87+
// Register the listener on success/failure future callback.
8888
future.onSuccess { case message =>
8989
val bufferMessage = message.asInstanceOf[BufferMessage]
9090
val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
@@ -101,6 +101,10 @@ final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityMan
101101
}
102102
}
103103
}(cm.futureExecContext)
104+
105+
future.onFailure { case exception =>
106+
listener.onBlockFetchFailure(exception)
107+
}(cm.futureExecContext)
104108
}
105109

106110
/**

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,18 +116,17 @@ final class ShuffleBlockFetcherIterator(
116116
logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
117117
}
118118

119-
override def onBlockFetchFailure(exception: Exception): Unit = {
120-
119+
override def onBlockFetchFailure(e: Throwable): Unit = {
120+
logError("Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
121+
// Note that there is a chance that some blocks have been fetched successfully, but we
122+
// still add them to the failed queue. This is fine because when the caller sees a
123+
// FetchFailedException, it is going to fail the entire task anyway.
124+
for ((blockId, size) <- req.blocks) {
125+
results.put(new FetchResult(blockId, -1, null))
126+
}
121127
}
122128
}
123129
)
124-
// case Failure(exception) => {
125-
// logError("Could not get block(s) from " + cmId, exception)
126-
// for ((blockId, size) <- req.blocks) {
127-
// results.put(new FetchResult(blockId, -1, null))
128-
// }
129-
// }
130-
// }
131130
}
132131

133132
private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {

core/src/test/scala/org/apache/spark/DistributedSuite.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,6 @@ object DistributedSuite {
341341
// Act like an identity function, but if the argument is true, set mark to true.
342342
def markNodeIfIdentity(item: Boolean): Boolean = {
343343
if (item) {
344-
println("marking node!!!!!!!!!!!!!!!")
345344
assert(!amMaster)
346345
mark = true
347346
}
@@ -352,7 +351,6 @@ object DistributedSuite {
352351
// crashing the entire JVM.
353352
def failOnMarkedIdentity(item: Boolean): Boolean = {
354353
if (mark) {
355-
println("failing node !!!!!!!!!!!!!!!")
356354
System.exit(42)
357355
}
358356
item

0 commit comments

Comments
 (0)