Commit 7d0e6dc

Update BlockManagerSlaveActor to use RpcEndpoint
1 parent f5d6543

10 files changed: +87, -103 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -446,7 +446,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     if (executorId == SparkContext.DRIVER_IDENTIFIER) {
       Some(Utils.getThreadDump())
     } else {
-      val (host, port) = env.blockManager.master.getActorSystemHostPortForExecutor(executorId).get
+      val (host, port) = env.blockManager.master.getRpcHostPortForExecutor(executorId).get
       val endpointRef = env.rpcEnv.setupEndpointRef(
         SparkEnv.executorActorSystemName, RpcAddress(host, port), "ExecutorEndpoint")
       Some(endpointRef.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump))
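
The hunk above swaps the Akka-specific address lookup for the transport-agnostic getRpcHostPortForExecutor and then resolves the executor's ExecutorEndpoint through the RpcEnv. A minimal sketch of that lookup-and-ask flow, assuming the private[spark] rpc API shown in this commit (the helper name is invented, and the code would have to sit under the org.apache.spark package to see these members):

```scala
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}

// Hypothetical helper, not part of this commit.
def executorEndpointRef(env: SparkEnv, executorId: String): Option[RpcEndpointRef] = {
  // The BlockManagerMaster now reports the executor's RpcEnv host/port
  // rather than an Akka actor-system address.
  env.blockManager.master.getRpcHostPortForExecutor(executorId).map { case (host, port) =>
    env.rpcEnv.setupEndpointRef(
      SparkEnv.executorActorSystemName, RpcAddress(host, port), "ExecutorEndpoint")
  }
}
```

The returned reference can then be queried synchronously, as the hunk does with askWithReply[Array[ThreadStackTrace]](TriggerThreadDump).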

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 2 additions & 2 deletions
@@ -341,11 +341,11 @@ object SparkEnv extends Logging {
     }

     val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
-      BlockManagerMaster.DRIVER_AKKA_ACTOR_NAME,
+      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
       new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus), true), conf, isDriver)

     // NB: blockManager is not valid until initialize() is called later.
-    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
+    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
       serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
       numUsableCores)
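
On the driver this call creates the BlockManagerMasterEndpoint and registers it under DRIVER_ENDPOINT_NAME; on executors it only resolves a reference to the driver-side endpoint by name. A sketch of that register-or-look-up idea under the same private[spark] API (the helper signature below is illustrative, not the actual registerOrLookupEndpoint):

```scala
import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}

// Illustrative only; relies on private[spark] members, so it would have to
// live under the org.apache.spark package.
def registerOrLookup(
    rpcEnv: RpcEnv,
    isDriver: Boolean,
    driverAddress: RpcAddress,
    name: String)(makeEndpoint: => RpcEndpoint): RpcEndpointRef = {
  if (isDriver) {
    // The driver hosts the endpoint under a well-known name.
    rpcEnv.setupEndpoint(name, makeEndpoint)
  } else {
    // Executors only need a reference to the driver-side endpoint.
    rpcEnv.setupEndpointRef(SparkEnv.driverActorSystemName, driverAddress, name)
  }
}
```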

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 8 additions & 8 deletions
@@ -26,7 +26,6 @@ import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
 import scala.util.Random

-import akka.actor.{ActorSystem, Props}
 import sun.nio.ch.DirectBuffer

 import org.apache.spark._
@@ -37,6 +36,7 @@ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.ExternalShuffleClient
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
+import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.shuffle.hash.HashShuffleManager
@@ -64,7 +64,7 @@ private[spark] class BlockResult(
  */
 private[spark] class BlockManager(
     executorId: String,
-    actorSystem: ActorSystem,
+    rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     defaultSerializer: Serializer,
     maxMemory: Long,
@@ -136,9 +136,9 @@ private[spark] class BlockManager(
   // Whether to compress shuffle output temporarily spilled to disk
   private val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)

-  private val slaveActor = actorSystem.actorOf(
-    Props(new BlockManagerSlaveActor(this, mapOutputTracker)),
-    name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next)
+  private val slaveActor = rpcEnv.setupEndpoint(
+    "BlockManagerActor" + BlockManager.ID_GENERATOR.next,
+    new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))

   // Pending re-registration action being executed asynchronously or null if none is pending.
   // Accesses should synchronize on asyncReregisterLock.
@@ -167,7 +167,7 @@ private[spark] class BlockManager(
    */
   def this(
       execId: String,
-      actorSystem: ActorSystem,
+      rpcEnv: RpcEnv,
       master: BlockManagerMaster,
       serializer: Serializer,
       conf: SparkConf,
@@ -176,7 +176,7 @@ private[spark] class BlockManager(
       blockTransferService: BlockTransferService,
       securityManager: SecurityManager,
       numUsableCores: Int) = {
-    this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
+    this(execId, rpcEnv, master, serializer, BlockManager.getMaxMemory(conf),
       conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores)
   }

@@ -1219,7 +1219,7 @@ private[spark] class BlockManager(
       shuffleClient.close()
     }
     diskBlockManager.stop()
-    actorSystem.stop(slaveActor)
+    rpcEnv.stop(slaveActor)
     blockInfo.clear()
     memoryStore.clear()
     diskStore.clear()
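
After this change the slave-side handler is just another RpcEndpoint registered on the shared RpcEnv and torn down through that same RpcEnv, instead of an actor created with actorOf and stopped via the ActorSystem. A minimal sketch of that lifecycle, assuming the private[spark] rpc API from this commit (the endpoint, message, and owner class below are invented; the real handler is BlockManagerSlaveEndpoint):

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}

case class DropBlock(blockId: String)

class DemoSlaveEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case DropBlock(id) =>
      // Do the potentially slow work off the rpc thread and reply when done,
      // mirroring how the slave endpoint answers RemoveBlock-style requests.
      Future { /* drop the block here */ true }.foreach(context.reply)
  }
}

class DemoOwner(rpcEnv: RpcEnv) {
  // Registered on the shared RpcEnv instead of actorSystem.actorOf(...).
  private val slaveRef: RpcEndpointRef =
    rpcEnv.setupEndpoint("DemoSlaveEndpoint", new DemoSlaveEndpoint(rpcEnv))

  // Torn down through the same RpcEnv instead of actorSystem.stop(...).
  def stop(): Unit = rpcEnv.stop(slaveRef)
}
```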

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 14 additions & 14 deletions
@@ -20,8 +20,6 @@ package org.apache.spark.storage
 import scala.concurrent.{Await, Future}
 import scala.concurrent.ExecutionContext.Implicits.global

-import akka.actor._
-
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.storage.BlockManagerMessages._
@@ -36,16 +34,17 @@ class BlockManagerMaster(

   val timeout = AkkaUtils.askTimeout(conf)

-  /** Remove a dead executor from the driver actor. This is only called on the driver side. */
+  /** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
   def removeExecutor(execId: String) {
     tell(RemoveExecutor(execId))
     logInfo("Removed " + execId + " successfully in removeExecutor")
   }

   /** Register the BlockManager's id with the driver. */
-  def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+  def registerBlockManager(
+      blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {
     logInfo("Trying to register BlockManager")
-    tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
+    tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
     logInfo("Registered BlockManager")
   }

@@ -85,8 +84,8 @@ class BlockManagerMaster(
     driverEndpoint.askWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
   }

-  def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = {
-    driverEndpoint.askWithReply[Option[(String, Int)]](GetActorSystemHostPortForExecutor(executorId))
+  def getRpcHostPortForExecutor(executorId: String): Option[(String, Int)] = {
+    driverEndpoint.askWithReply[Option[(String, Int)]](GetRpcHostPortForExecutor(executorId))
   }

   /**
@@ -162,11 +161,12 @@ class BlockManagerMaster(
       askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {
     val msg = GetBlockStatus(blockId, askSlaves)
     /*
-     * To avoid potential deadlocks, the use of Futures is necessary, because the master actor
+     * To avoid potential deadlocks, the use of Futures is necessary, because the master endpoint
      * should not block on waiting for a block manager, which can in turn be waiting for the
-     * master actor for a response to a prior message.
+     * master endpoint for a response to a prior message.
      */
-    val response = driverEndpoint.askWithReply[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
+    val response = driverEndpoint.
+      askWithReply[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
     val (blockManagerIds, futures) = response.unzip
     val result = Await.result(Future.sequence(futures), timeout)
     if (result == null) {
@@ -194,7 +194,7 @@ class BlockManagerMaster(
     Await.result(future, timeout)
   }

-  /** Stop the driver actor, called only on the Spark driver node */
+  /** Stop the driver endpoint, called only on the Spark driver node */
   def stop() {
     if (driverEndpoint != null && isDriver) {
       tell(StopBlockManagerMaster)
@@ -203,15 +203,15 @@ class BlockManagerMaster(
     }
   }

-  /** Send a one-way message to the master actor, to which we expect it to reply with true. */
+  /** Send a one-way message to the master endpoint, to which we expect it to reply with true. */
   private def tell(message: Any) {
     if (!driverEndpoint.askWithReply[Boolean](message)) {
-      throw new SparkException("BlockManagerMasterActor returned false, expected true.")
+      throw new SparkException("BlockManagerMasterEndpoint returned false, expected true.")
     }
   }

 }

 private[spark] object BlockManagerMaster {
-  val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
+  val DRIVER_ENDPOINT_NAME = "BlockManagerMaster"
 }
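
Every driver-side call above follows the same convention: a blocking askWithReply, with the master endpoint acknowledging one-way requests by replying true. A sketch of both halves of that handshake against the private[spark] rpc API used in this commit (Ping and AckEndpoint are invented):

```scala
import org.apache.spark.SparkException
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}

case object Ping

class AckEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Ping => context.reply(true)   // mirrors context.reply(true) in the master endpoint
  }
}

// Mirrors BlockManagerMaster.tell: blocking ask, expect `true` back.
def tell(ref: RpcEndpointRef, message: Any): Unit = {
  if (!ref.askWithReply[Boolean](message)) {
    throw new SparkException("endpoint returned false, expected true")
  }
}
```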

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 24 additions & 31 deletions
@@ -23,18 +23,15 @@ import scala.collection.mutable
 import scala.collection.JavaConversions._
 import scala.concurrent.{ExecutionContext, Future}

-import akka.actor.ActorRef
-import akka.pattern.ask
-
-import org.apache.spark.rpc.{RpcEnv, RpcCallContext, RpcEndpoint}
+import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, RpcCallContext, RpcEndpoint}
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.Utils

 /**
- * BlockManagerMasterActor is an actor on the master node to track statuses of
+ * BlockManagerMasterEndpoint is an [[RpcEndpoint]] on the master node to track statuses of
  * all slaves' block managers.
  */
 private[spark]
@@ -54,14 +51,12 @@ class BlockManagerMasterEndpoint(
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

-  private val akkaTimeout = AkkaUtils.askTimeout(conf)
-
   private val askThreadPool = Utils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
   private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)

   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
-      register(blockManagerId, maxMemSize, slaveActor)
+    case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
+      register(blockManagerId, maxMemSize, slaveEndpoint)
       context.reply(true)

     case UpdateBlockInfo(
@@ -78,8 +73,8 @@ class BlockManagerMasterEndpoint(
     case GetPeers(blockManagerId) =>
       context.reply(getPeers(blockManagerId))

-    case GetActorSystemHostPortForExecutor(executorId) =>
-      context.reply(getActorSystemHostPortForExecutor(executorId))
+    case GetRpcHostPortForExecutor(executorId) =>
+      context.reply(getRpcHostPortForExecutor(executorId))

     case GetMemoryStatus =>
       context.reply(memoryStatus)
@@ -137,17 +132,17 @@ class BlockManagerMasterEndpoint(
     val removeMsg = RemoveRdd(rddId)
     Future.sequence(
       blockManagerInfo.values.map { bm =>
-        bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
+        bm.slaveEndpoint.sendWithReply[Int](removeMsg)
       }.toSeq
     )
   }

   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
-    // Nothing to do in the BlockManagerMasterActor data structures
+    // Nothing to do in the BlockManagerMasterEndpoint data structures
     val removeMsg = RemoveShuffle(shuffleId)
     Future.sequence(
       blockManagerInfo.values.map { bm =>
-        bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Boolean]
+        bm.slaveEndpoint.sendWithReply[Boolean](removeMsg)
       }.toSeq
     )
   }
@@ -164,7 +159,7 @@ class BlockManagerMasterEndpoint(
     }
     Future.sequence(
       requiredBlockManagers.map { bm =>
-        bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
+        bm.slaveEndpoint.sendWithReply[Int](removeMsg)
       }.toSeq
     )
   }
@@ -219,7 +214,7 @@ class BlockManagerMasterEndpoint(
       // Remove the block from the slave's BlockManager.
       // Doesn't actually wait for a confirmation and the message might get lost.
      // If message loss becomes frequent, we should add retry logic here.
-      blockManager.get.slaveActor.ask(RemoveBlock(blockId))(akkaTimeout)
+      blockManager.get.slaveEndpoint.sendWithReply[Boolean](RemoveBlock(blockId))
     }
   }
 }
@@ -251,14 +246,14 @@ class BlockManagerMasterEndpoint(
       askSlaves: Boolean): Map[BlockManagerId, Future[Option[BlockStatus]]] = {
     val getBlockStatus = GetBlockStatus(blockId)
     /*
-     * Rather than blocking on the block status query, master actor should simply return
+     * Rather than blocking on the block status query, master endpoint should simply return
      * Futures to avoid potential deadlocks. This can arise if there exists a block manager
-     * that is also waiting for this master actor's response to a previous message.
+     * that is also waiting for this master endpoint's response to a previous message.
      */
     blockManagerInfo.values.map { info =>
       val blockStatusFuture =
         if (askSlaves) {
-          info.slaveActor.ask(getBlockStatus)(akkaTimeout).mapTo[Option[BlockStatus]]
+          info.slaveEndpoint.sendWithReply[Option[BlockStatus]](getBlockStatus)
         } else {
           Future { info.getStatus(blockId) }
         }
@@ -282,7 +277,7 @@ class BlockManagerMasterEndpoint(
     blockManagerInfo.values.map { info =>
       val future =
         if (askSlaves) {
-          info.slaveActor.ask(getMatchingBlockIds)(akkaTimeout).mapTo[Seq[BlockId]]
+          info.slaveEndpoint.sendWithReply[Seq[BlockId]](getMatchingBlockIds)
         } else {
           Future { info.blocks.keys.filter(filter).toSeq }
         }
@@ -291,7 +286,7 @@ class BlockManagerMasterEndpoint(
     ).map(_.flatten.toSeq)
   }

-  private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+  private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
     val time = System.currentTimeMillis()
     if (!blockManagerInfo.contains(id)) {
       blockManagerIdByExecutor.get(id.executorId) match {
@@ -308,7 +303,7 @@ class BlockManagerMasterEndpoint(
         blockManagerIdByExecutor(id.executorId) = id

         blockManagerInfo(id) = new BlockManagerInfo(
-          id, System.currentTimeMillis(), maxMemSize, slaveActor)
+          id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
       }
       listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
     }
@@ -379,17 +374,15 @@ class BlockManagerMasterEndpoint(
   }

   /**
-   * Returns the hostname and port of an executor's actor system, based on the Akka address of its
-   * BlockManagerSlaveActor.
+   * Returns the hostname and port of an executor, based on the [[RpcEnv]] address of its
+   * [[BlockManagerSlaveEndpoint]].
    */
-  private def getActorSystemHostPortForExecutor(executorId: String): Option[(String, Int)] = {
+  private def getRpcHostPortForExecutor(executorId: String): Option[(String, Int)] = {
     for (
       blockManagerId <- blockManagerIdByExecutor.get(executorId);
-      info <- blockManagerInfo.get(blockManagerId);
-      host <- info.slaveActor.path.address.host;
-      port <- info.slaveActor.path.address.port
+      info <- blockManagerInfo.get(blockManagerId)
     ) yield {
-      (host, port)
+      (info.slaveEndpoint.address.host, info.slaveEndpoint.address.port)
     }
   }
 }
@@ -412,7 +405,7 @@ private[spark] class BlockManagerInfo(
     val blockManagerId: BlockManagerId,
     timeMs: Long,
     val maxMem: Long,
-    val slaveActor: ActorRef)
+    val slaveEndpoint: RpcEndpointRef)
   extends Logging {

   private var _lastSeenMs: Long = timeMs
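
The slave calls above all change the same way: sendWithReply[T] already returns a typed Future[T], so the Akka-style ask(...)(akkaTimeout).mapTo[T] chain (and the akkaTimeout field) disappears. A sketch of that fan-out pattern, assuming the private[spark] API from this commit (the helper name is invented; RemoveRdd comes from BlockManagerMessages, and the implicit execution context plays the role of askExecutionContext):

```scala
import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.storage.BlockManagerMessages.RemoveRdd

// Hypothetical helper; one asynchronous request per registered block manager,
// collected into a single Future, like removeRdd in BlockManagerMasterEndpoint.
def removeRddEverywhere(slaves: Iterable[RpcEndpointRef], rddId: Int)
                       (implicit ec: ExecutionContext): Future[Seq[Int]] = {
  val removeMsg = RemoveRdd(rddId)
  Future.sequence(slaves.map(_.sendWithReply[Int](removeMsg)).toSeq)
}
```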

core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala

Lines changed: 3 additions & 4 deletions
@@ -19,8 +19,7 @@ package org.apache.spark.storage

 import java.io.{Externalizable, ObjectInput, ObjectOutput}

-import akka.actor.ActorRef
-
+import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.Utils

 private[spark] object BlockManagerMessages {
@@ -52,7 +51,7 @@ private[spark] object BlockManagerMessages {
   case class RegisterBlockManager(
       blockManagerId: BlockManagerId,
       maxMemSize: Long,
-      sender: ActorRef)
+      sender: RpcEndpointRef)
     extends ToBlockManagerMaster

   case class UpdateBlockInfo(
@@ -92,7 +91,7 @@ private[spark] object BlockManagerMessages {

   case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

-  case class GetActorSystemHostPortForExecutor(executorId: String) extends ToBlockManagerMaster
+  case class GetRpcHostPortForExecutor(executorId: String) extends ToBlockManagerMaster

   case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
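
With the ActorRef field replaced by an RpcEndpointRef, the registration message now carries a reference whose RpcAddress can stand in for the old ActorRef path when answering GetRpcHostPortForExecutor. A sketch of a handler using these messages that way, assuming the private[spark] API from this commit (the registry class is invented; the real logic lives in BlockManagerMasterEndpoint):

```scala
import scala.collection.mutable

import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.storage.BlockManagerMessages._

class DemoRegistry(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  // executorId -> the slave endpoint that registered for it
  private val slaves = mutable.HashMap[String, RpcEndpointRef]()

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RegisterBlockManager(id, _, slaveEndpoint) =>
      slaves(id.executorId) = slaveEndpoint   // keep the ref for later asks
      context.reply(true)

    case GetRpcHostPortForExecutor(executorId) =>
      // slaveEndpoint.address.{host,port} replaces slaveActor.path.address.{host,port}
      context.reply(slaves.get(executorId).map(r => (r.address.host, r.address.port)))
  }
}
```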
