Skip to content

Commit 9ef279c

Browse files
committed
Initial refactoring to move ConnectionManager to use the BlockTransferService.
1 parent b21ae5b commit 9ef279c

33 files changed: +784 −859 lines changed

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 8 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,8 @@ import org.apache.spark.annotation.DeveloperApi
3131
import org.apache.spark.api.python.PythonWorkerFactory
3232
import org.apache.spark.broadcast.BroadcastManager
3333
import org.apache.spark.metrics.MetricsSystem
34-
import org.apache.spark.network.ConnectionManager
34+
import org.apache.spark.network.BlockTransferService
35+
import org.apache.spark.network.cm.CMBlockTransferService
3536
import org.apache.spark.scheduler.LiveListenerBus
3637
import org.apache.spark.serializer.Serializer
3738
import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
@@ -59,8 +60,8 @@ class SparkEnv (
5960
val mapOutputTracker: MapOutputTracker,
6061
val shuffleManager: ShuffleManager,
6162
val broadcastManager: BroadcastManager,
63+
val blockTransferService: BlockTransferService,
6264
val blockManager: BlockManager,
63-
val connectionManager: ConnectionManager,
6465
val securityManager: SecurityManager,
6566
val httpFileServer: HttpFileServer,
6667
val sparkFilesDir: String,
@@ -79,6 +80,7 @@ class SparkEnv (
7980
Option(httpFileServer).foreach(_.stop())
8081
mapOutputTracker.stop()
8182
shuffleManager.stop()
83+
blockTransferService.stop()
8284
broadcastManager.stop()
8385
blockManager.stop()
8486
blockManager.master.stop()
@@ -223,14 +225,14 @@ object SparkEnv extends Logging {
223225

224226
val shuffleMemoryManager = new ShuffleMemoryManager(conf)
225227

228+
val blockTransferService = new CMBlockTransferService(conf, securityManager)
229+
226230
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
227231
"BlockManagerMaster",
228232
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
229233

230234
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
231-
serializer, conf, securityManager, mapOutputTracker, shuffleManager)
232-
233-
val connectionManager = blockManager.connectionManager
235+
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)
234236

235237
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
236238

@@ -278,8 +280,8 @@ object SparkEnv extends Logging {
278280
mapOutputTracker,
279281
shuffleManager,
280282
broadcastManager,
283+
blockTransferService,
281284
blockManager,
282-
connectionManager,
283285
securityManager,
284286
httpFileServer,
285287
sparkFilesDir,

core/src/main/scala/org/apache/spark/network/ReceiverTest.scala renamed to core/src/main/scala/org/apache/spark/network/BlockDataManager.scala

Lines changed: 14 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -17,21 +17,20 @@
1717

1818
package org.apache.spark.network
1919

20-
import java.nio.ByteBuffer
21-
import org.apache.spark.{SecurityManager, SparkConf}
20+
import org.apache.spark.storage.StorageLevel
2221

23-
private[spark] object ReceiverTest {
24-
def main(args: Array[String]) {
25-
val conf = new SparkConf
26-
val manager = new ConnectionManager(9999, conf, new SecurityManager(conf))
27-
println("Started connection manager with id = " + manager.id)
2822

29-
manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
30-
/* println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis) */
31-
val buffer = ByteBuffer.wrap("response".getBytes("utf-8"))
32-
Some(Message.createBufferMessage(buffer, msg.id))
33-
})
34-
Thread.currentThread.join()
35-
}
36-
}
23+
trait BlockDataManager {
24+
25+
/**
26+
* Interface to get local block data.
27+
*
28+
* @return Some(buffer) if the block exists locally, and None if it doesn't.
29+
*/
30+
def getBlockData(blockId: String): Option[ManagedBuffer]
3731

32+
/**
33+
* Put the block locally, using the given storage level.
34+
*/
35+
def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): Unit
36+
}
Lines changed: 37 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network
19+
20+
import java.util.EventListener
21+
22+
23+
/**
24+
* Listener callback interface for [[BlockTransferService.fetchBlocks]].
25+
*/
26+
trait BlockFetchingListener extends EventListener {
27+
28+
/**
29+
* Called once per successfully fetched block.
30+
*/
31+
def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit
32+
33+
/**
34+
* Called upon failures.
35+
*/
36+
def onBlockFetchFailure(exception: Exception): Unit
37+
}
Lines changed: 81 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network
19+
20+
import org.apache.spark.storage.StorageLevel
21+
22+
23+
abstract class BlockTransferService {
24+
25+
/**
26+
* Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
27+
* local blocks or put local blocks.
28+
*/
29+
def init(blockDataManager: BlockDataManager)
30+
31+
/**
32+
* Tear down the transfer service.
33+
*/
34+
def stop(): Unit
35+
36+
/**
37+
* Port number the service is listening on, available only after [[init]] is invoked.
38+
*/
39+
def port: Int
40+
41+
/**
42+
* Host name the service is listening on, available only after [[init]] is invoked.
43+
*/
44+
def hostName: String
45+
46+
/**
47+
* Fetch a sequence of blocks from a remote node, available only after [[init]] is invoked.
48+
*
49+
* This takes a sequence so the implementation can batch requests.
50+
*/
51+
def fetchBlocks(
52+
hostName: String,
53+
port: Int,
54+
blockIds: Seq[String],
55+
listener: BlockFetchingListener): Unit
56+
57+
/**
58+
* Fetch a single block from a remote node, available only after [[init]] is invoked.
59+
*
60+
* This is functionally equivalent to
61+
* {{{
62+
* fetchBlocks(hostName, port, Seq(blockId)).iterator().next()._2
63+
* }}}
64+
*/
65+
def fetchBlock(hostName: String, port: Int, blockId: String): ManagedBuffer = {
66+
//fetchBlocks(hostName, port, Seq(blockId)).iterator().next()._2
67+
null
68+
}
69+
70+
/**
71+
* Upload a single block to a remote node, available only after [[init]] is invoked.
72+
*
73+
* This call blocks until the upload completes, or throws an exception upon failures.
74+
*/
75+
def uploadBlock(
76+
hostname: String,
77+
port: Int,
78+
blockId: String,
79+
blockData: ManagedBuffer,
80+
level: StorageLevel): Unit
81+
}

core/src/main/scala/org/apache/spark/network/ConnectionManagerTest.scala

Lines changed: 0 additions & 103 deletions
This file was deleted.

0 commit comments

Comments (0)