Skip to content

Commit 30e3f9f

Browse files
committed
Update ExecutorActor to use RpcEndpoint
1 parent 478b443 commit 30e3f9f

File tree

3 files changed: +12 additions, −17 deletions

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -32,8 +32,6 @@ import scala.collection.generic.Growable
3232
import scala.collection.mutable.HashMap
3333
import scala.reflect.{ClassTag, classTag}
3434

35-
import akka.actor.Props
36-
3735
import org.apache.hadoop.conf.Configuration
3836
import org.apache.hadoop.fs.Path
3937
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
@@ -54,6 +52,7 @@ import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextF
5452
import org.apache.spark.io.CompressionCodec
5553
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
5654
import org.apache.spark.rdd._
55+
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
5756
import org.apache.spark.scheduler._
5857
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
5958
SparkDeploySchedulerBackend, SimrSchedulerBackend}
@@ -448,9 +447,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
448447
Some(Utils.getThreadDump())
449448
} else {
450449
val (host, port) = env.blockManager.master.getActorSystemHostPortForExecutor(executorId).get
451-
val actorRef = AkkaUtils.makeExecutorRef("ExecutorActor", conf, host, port, env.actorSystem)
452-
Some(AkkaUtils.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump, actorRef,
453-
AkkaUtils.numRetries(conf), AkkaUtils.retryWaitMs(conf), AkkaUtils.askTimeout(conf)))
450+
val endpointRef = env.rpcEnv.setupEndpointRef(
451+
SparkEnv.executorActorSystemName, RpcAddress(host, port), "ExecutorEndpoint")
452+
Some(endpointRef.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump))
454453
}
455454
} catch {
456455
case e: Exception =>

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 3 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -27,8 +27,6 @@ import scala.collection.JavaConversions._
2727
import scala.collection.mutable.{ArrayBuffer, HashMap}
2828
import scala.util.control.NonFatal
2929

30-
import akka.actor.Props
31-
3230
import org.apache.spark._
3331
import org.apache.spark.deploy.SparkHadoopUtil
3432
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task}
@@ -89,8 +87,8 @@ private[spark] class Executor(
8987
}
9088

9189
// Create an actor for receiving RPCs from the driver
92-
private val executorActor = env.actorSystem.actorOf(
93-
Props(new ExecutorActor(executorId)), "ExecutorActor")
90+
private val executorEndpoint = env.rpcEnv.setupEndpoint(
91+
"ExecutorEndpoint", new ExecutorEndpoint(env.rpcEnv, executorId))
9492

9593
// Whether to load classes in user jars before those in Spark jars
9694
private val userClassPathFirst: Boolean = {
@@ -139,7 +137,7 @@ private[spark] class Executor(
139137

140138
def stop(): Unit = {
141139
env.metricsSystem.report()
142-
env.actorSystem.stop(executorActor)
140+
env.rpcEnv.stop(executorEndpoint)
143141
isStopped = true
144142
threadPool.shutdown()
145143
if (!isLocal) {

core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala renamed to core/src/main/scala/org/apache/spark/executor/ExecutorEndpoint.scala

Lines changed: 5 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -17,10 +17,8 @@
1717

1818
package org.apache.spark.executor
1919

20-
import akka.actor.Actor
21-
import org.apache.spark.Logging
22-
23-
import org.apache.spark.util.{Utils, ActorLogReceive}
20+
import org.apache.spark.rpc.{RpcEnv, RpcCallContext, RpcEndpoint}
21+
import org.apache.spark.util.Utils
2422

2523
/**
2624
* Driver -> Executor message to trigger a thread dump.
@@ -31,11 +29,11 @@ private[spark] case object TriggerThreadDump
3129
* Actor that runs inside of executors to enable driver -> executor RPC.
3230
*/
3331
private[spark]
34-
class ExecutorActor(executorId: String) extends Actor with ActorLogReceive with Logging {
32+
class ExecutorEndpoint(override val rpcEnv: RpcEnv, executorId: String) extends RpcEndpoint {
3533

36-
override def receiveWithLogging: PartialFunction[Any, Unit] = {
34+
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
3735
case TriggerThreadDump =>
38-
sender ! Utils.getThreadDump()
36+
context.reply(Utils.getThreadDump())
3937
}
4038

4139
}

0 commit comments

Comments (0)