Commit 08ce188
[SPARK-3019] Pluggable block transfer interface (BlockTransferService)
This pull request creates a new BlockTransferService interface for block fetch/upload and refactors the existing ConnectionManager to implement BlockTransferService (NioBlockTransferService). Most of the changes are simply moving code around. The main class to inspect is ShuffleBlockFetcherIterator.

Review guide:
- Most of the ConnectionManager code is now in the network.nio package
- ManagedBuffer is a new buffer abstraction backed by several different implementations (file segment, nio ByteBuffer, Netty ByteBuf)
- BlockTransferService is the main internal interface introduced in this PR
- NioBlockTransferService implements BlockTransferService and replaces the old BlockManagerWorker
- ShuffleBlockFetcherIterator replaces the old BlockFetcherIterator to use the new interface

TODOs that should be separate PRs:
- Implement NettyBlockTransferService
- Finalize the API/semantics for ManagedBuffer.release()

Author: Reynold Xin <[email protected]>

Closes apache#2240 from rxin/blockTransferService and squashes the following commits:

64cd9d7 [Reynold Xin] Merge branch 'master' into blockTransferService
1dfd3d7 [Reynold Xin] Limit the length of the FileInputStream.
1332156 [Reynold Xin] Fixed style violation from refactoring.
2960c93 [Reynold Xin] Added ShuffleBlockFetcherIteratorSuite.
e29c721 [Reynold Xin] Updated comment for ShuffleBlockFetcherIterator.
8a1046e [Reynold Xin] Code review feedback:
2c6b1e1 [Reynold Xin] Removed println in test cases.
2a907e4 [Reynold Xin] Merge branch 'master' into blockTransferService-merge
07ccf0d [Reynold Xin] Added init check to CMBlockTransferService.
98c668a [Reynold Xin] Added failure handling and fixed unit tests.
ae05fcd [Reynold Xin] Updated tests, although DistributedSuite is hanging.
d8d595c [Reynold Xin] Merge branch 'master' of github.com:apache/spark into blockTransferService
9ef279c [Reynold Xin] Initial refactoring to move ConnectionManager to use the BlockTransferService.
1 parent 939a322 commit 08ce188

38 files changed: +1129 -1273 lines changed
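
To ground the review guide before the per-file diffs, here is a sketch of how a caller can drive the asynchronous fetchBlocks API this PR introduces. It is not code from the commit: the helper fetchToQueue and its queue-based plumbing are illustrative assumptions, loosely modeled on the results queue that ShuffleBlockFetcherIterator maintains.

// Illustrative sketch, not part of this commit. Assumes the BlockTransferService,
// BlockFetchingListener, and ManagedBuffer types introduced below are on the classpath.
import java.util.concurrent.LinkedBlockingQueue

import org.apache.spark.network.{BlockFetchingListener, BlockTransferService, ManagedBuffer}

def fetchToQueue(
    transfer: BlockTransferService,
    host: String,
    port: Int,
    blockIds: Seq[String]): LinkedBlockingQueue[Either[ManagedBuffer, Throwable]] = {
  val results = new LinkedBlockingQueue[Either[ManagedBuffer, Throwable]]()
  transfer.fetchBlocks(host, port, blockIds, new BlockFetchingListener {
    // Called once per block, as soon as that block's data arrives, so a consumer
    // can start processing before the whole batch completes.
    override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
      results.put(Left(data))
    }
    // Called at most once per failure, not once per block.
    override def onBlockFetchFailure(exception: Throwable): Unit = {
      results.put(Right(exception))
    }
  })
  results
}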

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 9 additions & 6 deletions
@@ -31,7 +31,8 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.python.PythonWorkerFactory
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.network.ConnectionManager
+import org.apache.spark.network.BlockTransferService
+import org.apache.spark.network.nio.NioBlockTransferService
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
@@ -59,8 +60,8 @@ class SparkEnv (
     val mapOutputTracker: MapOutputTracker,
     val shuffleManager: ShuffleManager,
     val broadcastManager: BroadcastManager,
+    val blockTransferService: BlockTransferService,
     val blockManager: BlockManager,
-    val connectionManager: ConnectionManager,
     val securityManager: SecurityManager,
     val httpFileServer: HttpFileServer,
     val sparkFilesDir: String,
@@ -88,6 +89,8 @@ class SparkEnv (
     // down, but let's call it anyway in case it gets fixed in a later release
     // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
     // actorSystem.awaitTermination()
+
+    // Note that blockTransferService is stopped by BlockManager since it is started by it.
   }
 
 private[spark]
@@ -223,14 +226,14 @@ object SparkEnv extends Logging {
 
     val shuffleMemoryManager = new ShuffleMemoryManager(conf)
 
+    val blockTransferService = new NioBlockTransferService(conf, securityManager)
+
     val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
       "BlockManagerMaster",
       new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)
 
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
-      serializer, conf, securityManager, mapOutputTracker, shuffleManager)
-
-    val connectionManager = blockManager.connectionManager
+      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)
 
     val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
 
@@ -278,8 +281,8 @@ object SparkEnv extends Logging {
       mapOutputTracker,
       shuffleManager,
       broadcastManager,
+      blockTransferService,
       blockManager,
-      connectionManager,
       securityManager,
       httpFileServer,
       sparkFilesDir,
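
The comment added to SparkEnv.stop() above encodes an ownership rule: SparkEnv constructs the transfer service, but BlockManager both starts it (via init) and stops it. A minimal loan-pattern sketch of that contract follows; runWithTransferService is a hypothetical helper, since BlockManager's side of the change is not shown in this diff.

// Hypothetical sketch of the init/stop ownership contract, not code from this commit.
import org.apache.spark.network.{BlockDataManager, BlockTransferService}

def runWithTransferService(
    service: BlockTransferService,
    dataManager: BlockDataManager)(body: BlockTransferService => Unit): Unit = {
  service.init(dataManager)  // started by its owner (BlockManager in the real wiring)
  try {
    body(service)            // fetch/upload blocks while the application runs
  } finally {
    service.stop()           // stopped by the same owner, not by SparkEnv.stop()
  }
}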

core/src/main/scala/org/apache/spark/network/ReceiverTest.scala renamed to core/src/main/scala/org/apache/spark/network/BlockDataManager.scala

Lines changed: 14 additions & 15 deletions
@@ -17,21 +17,20 @@
 
 package org.apache.spark.network
 
-import java.nio.ByteBuffer
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.storage.StorageLevel
 
-private[spark] object ReceiverTest {
-  def main(args: Array[String]) {
-    val conf = new SparkConf
-    val manager = new ConnectionManager(9999, conf, new SecurityManager(conf))
-    println("Started connection manager with id = " + manager.id)
 
-    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
-      /* println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis) */
-      val buffer = ByteBuffer.wrap("response".getBytes("utf-8"))
-      Some(Message.createBufferMessage(buffer, msg.id))
-    })
-    Thread.currentThread.join()
-  }
-}
+trait BlockDataManager {
+
+  /**
+   * Interface to get local block data.
+   *
+   * @return Some(buffer) if the block exists locally, and None if it doesn't.
+   */
+  def getBlockData(blockId: String): Option[ManagedBuffer]
 
+  /**
+   * Put the block locally, using the given storage level.
+   */
+  def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit
+}
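
For illustration, a toy implementation of this trait backed by an in-memory map could look like the sketch below. InMemoryBlockDataManager is hypothetical; in the real wiring the trait is handed to the transfer service via init, backed by Spark's actual block store.

// Toy sketch only: a thread-safe, in-memory BlockDataManager for illustration.
import scala.collection.mutable

import org.apache.spark.network.{BlockDataManager, ManagedBuffer}
import org.apache.spark.storage.StorageLevel

class InMemoryBlockDataManager extends BlockDataManager {
  private val blocks = mutable.Map.empty[String, ManagedBuffer]

  // Some(buffer) if the block is stored locally, None otherwise.
  override def getBlockData(blockId: String): Option[ManagedBuffer] =
    blocks.synchronized { blocks.get(blockId) }

  // The toy store ignores the requested storage level and keeps everything on heap.
  override def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit =
    blocks.synchronized { blocks(blockId) = data }
}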
core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala

Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network
+
+import java.util.EventListener
+
+
+/**
+ * Listener callback interface for [[BlockTransferService.fetchBlocks]].
+ */
+trait BlockFetchingListener extends EventListener {
+
+  /**
+   * Called once per successfully fetched block.
+   */
+  def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit
+
+  /**
+   * Called upon failures. For each failure, this is called only once (i.e. not once per block).
+   */
+  def onBlockFetchFailure(exception: Throwable): Unit
+}
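
The contract worth noting here is that onBlockFetchFailure fires at most once for a whole batch, not once per block. A hedged sketch of a listener that honors that contract while waiting for a known number of blocks follows; BatchListener is hypothetical and not part of this commit.

// Hypothetical sketch: waits for all expected blocks, or unblocks on the single
// batch-wide failure callback.
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

import org.apache.spark.network.{BlockFetchingListener, ManagedBuffer}

class BatchListener(numBlocks: Int) extends BlockFetchingListener {
  private val remaining = new CountDownLatch(numBlocks)
  private val error = new AtomicReference[Throwable]()

  override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
    remaining.countDown()  // one count per successfully fetched block
  }

  override def onBlockFetchFailure(exception: Throwable): Unit = {
    error.set(exception)
    // A single failure aborts the whole batch, so release all waiters.
    while (remaining.getCount > 0) remaining.countDown()
  }

  /** Block until every expected block has arrived, rethrowing any batch failure. */
  def await(): Unit = {
    remaining.await()
    val e = error.get()
    if (e != null) throw e
  }
}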
core/src/main/scala/org/apache/spark/network/BlockTransferService.scala

Lines changed: 131 additions & 0 deletions
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.storage.StorageLevel
+
+
+abstract class BlockTransferService {
+
+  /**
+   * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
+   * local blocks or put local blocks.
+   */
+  def init(blockDataManager: BlockDataManager)
+
+  /**
+   * Tear down the transfer service.
+   */
+  def stop(): Unit
+
+  /**
+   * Port number the service is listening on, available only after [[init]] is invoked.
+   */
+  def port: Int
+
+  /**
+   * Host name the service is listening on, available only after [[init]] is invoked.
+   */
+  def hostName: String
+
+  /**
+   * Fetch a sequence of blocks from a remote node asynchronously,
+   * available only after [[init]] is invoked.
+   *
+   * Note that [[BlockFetchingListener.onBlockFetchSuccess]] is called once per block,
+   * while [[BlockFetchingListener.onBlockFetchFailure]] is called once per failure (not per block).
+   *
+   * Note that this API takes a sequence so the implementation can batch requests, and does not
+   * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
+   * the data of a block is fetched, rather than waiting for all blocks to be fetched.
+   */
+  def fetchBlocks(
+      hostName: String,
+      port: Int,
+      blockIds: Seq[String],
+      listener: BlockFetchingListener): Unit
+
+  /**
+   * Upload a single block to a remote node, available only after [[init]] is invoked.
+   */
+  def uploadBlock(
+      hostname: String,
+      port: Int,
+      blockId: String,
+      blockData: ManagedBuffer,
+      level: StorageLevel): Future[Unit]
+
+  /**
+   * A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
+   *
+   * It is also only available after [[init]] is invoked.
+   */
+  def fetchBlockSync(hostName: String, port: Int, blockId: String): ManagedBuffer = {
+    // A monitor for the thread to wait on.
+    val lock = new Object
+    @volatile var result: Either[ManagedBuffer, Throwable] = null
+    fetchBlocks(hostName, port, Seq(blockId), new BlockFetchingListener {
+      override def onBlockFetchFailure(exception: Throwable): Unit = {
+        lock.synchronized {
+          result = Right(exception)
+          lock.notify()
+        }
+      }
+      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
+        lock.synchronized {
+          result = Left(data)
+          lock.notify()
+        }
+      }
+    })
+
+    // Sleep until result is no longer null
+    lock.synchronized {
+      while (result == null) {
+        try {
+          lock.wait()
+        } catch {
+          case e: InterruptedException =>
+        }
+      }
+    }
+
+    result match {
+      case Left(data) => data
+      case Right(e) => throw e
+    }
+  }
+
+  /**
+   * Upload a single block to a remote node, available only after [[init]] is invoked.
+   *
+   * This method is similar to [[uploadBlock]], except this one blocks the thread
+   * until the upload finishes.
+   */
+  def uploadBlockSync(
+      hostname: String,
+      port: Int,
+      blockId: String,
+      blockData: ManagedBuffer,
+      level: StorageLevel): Unit = {
+    Await.result(uploadBlock(hostname, port, blockId, blockData, level), Duration.Inf)
+  }
+}
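
Note how the blocking helpers are layered on the async primitives: fetchBlockSync wraps fetchBlocks with a wait/notify monitor, and uploadBlockSync simply awaits the Future returned by uploadBlock. A hypothetical end-to-end use of the two follows; mirrorBlock and the choice of MEMORY_ONLY are illustrative, and the service is assumed to be already initialized.

// Hypothetical usage sketch, not part of this commit.
import org.apache.spark.network.{BlockTransferService, ManagedBuffer}
import org.apache.spark.storage.StorageLevel

def mirrorBlock(
    service: BlockTransferService,
    blockId: String,
    srcHost: String, srcPort: Int,
    dstHost: String, dstPort: Int): Unit = {
  // Blocks until the single block is fetched (or rethrows the fetch failure).
  val data: ManagedBuffer = service.fetchBlockSync(srcHost, srcPort, blockId)
  // Blocks until the remote node has stored the block at the given level.
  service.uploadBlockSync(dstHost, dstPort, blockId, data, StorageLevel.MEMORY_ONLY)
}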

core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala

Lines changed: 0 additions & 103 deletions
This file was deleted.
