File tree Expand file tree Collapse file tree 6 files changed +30
-14
lines changed
core/src/main/scala/org/apache/spark Expand file tree Collapse file tree 6 files changed +30
-14
lines changed Original file line number Diff line number Diff line change @@ -77,10 +77,10 @@ private[deploy] object DeployMessages {
7777 object DecommissionWorker extends DeployMessage
7878
7979 /**
80- * A message that sent by the Worker to itself when it receives PWR signal,
80+ * A message sent by the Worker to itself when it receives a signal,
8181 * indicating the Worker starts to decommission.
8282 */
83- object WorkerSigPWRReceived extends DeployMessage
83+ object WorkerDecommissionSigReceived extends DeployMessage
8484
8585 /**
8686 * A message sent from Worker to Master to tell Master that the Worker has started
Original file line number Diff line number Diff line change @@ -66,16 +66,17 @@ private[deploy] class Worker(
6666 Utils .checkHost(host)
6767 assert (port > 0 )
6868
69- // If worker decommissioning is enabled register a handler on PWR to shutdown.
69+ // If worker decommissioning is enabled, register a handler on the configured signal to shut down.
7070 if (conf.get(config.DECOMMISSION_ENABLED )) {
71- logInfo(" Registering SIGPWR handler to trigger decommissioning." )
72- SignalUtils .register(" PWR" , " Failed to register SIGPWR handler - " +
71+ val signal = conf.get(config.Worker .WORKER_DECOMMISSION_SIGNAL )
72+ logInfo(s " Registering SIG $signal handler to trigger decommissioning. " )
73+ SignalUtils .register(signal, s " Failed to register SIG $signal handler - " +
7374 " disabling worker decommission feature." ) {
74- self.send(WorkerSigPWRReceived )
75+ self.send(WorkerDecommissionSigReceived )
7576 true
7677 }
7778 } else {
78- logInfo(" Worker decommissioning not enabled, SIGPWR will result in exiting ." )
79+ logInfo(" Worker decommissioning not enabled." )
7980 }
8081
8182 // A scheduled executor used to send messages at the specified time.
@@ -682,7 +683,7 @@ private[deploy] class Worker(
682683 case DecommissionWorker =>
683684 decommissionSelf()
684685
685- case WorkerSigPWRReceived =>
686+ case WorkerDecommissionSigReceived =>
686687 decommissionSelf()
687688 // Tell the Master that we are starting decommissioning
688689 // so it stops trying to launch executor/driver on us
Original file line number Diff line number Diff line change @@ -82,9 +82,10 @@ private[spark] class CoarseGrainedExecutorBackend(
8282
8383 override def onStart (): Unit = {
8484 if (env.conf.get(DECOMMISSION_ENABLED )) {
85- logInfo(" Registering PWR handler to trigger decommissioning." )
86- SignalUtils .register(" PWR" , " Failed to register SIGPWR handler - " +
87- " disabling executor decommission feature." ) (self.askSync[Boolean ](ExecutorSigPWRReceived ))
85+ val signal = env.conf.get(EXECUTOR_DECOMMISSION_SIGNAL )
86+ logInfo(s " Registering SIG $signal handler to trigger decommissioning. " )
87+ SignalUtils .register(signal, s " Failed to register SIG $signal handler - disabling " +
88+ s " executor decommission feature. " ) (self.askSync[Boolean ](ExecutorDecommissionSigReceived ))
8889 }
8990
9091 logInfo(" Connecting to driver: " + driverUrl)
@@ -208,7 +209,7 @@ private[spark] class CoarseGrainedExecutorBackend(
208209 }
209210
210211 override def receiveAndReply (context : RpcCallContext ): PartialFunction [Any , Unit ] = {
211- case ExecutorSigPWRReceived =>
212+ case ExecutorDecommissionSigReceived =>
212213 var driverNotified = false
213214 try {
214215 driver.foreach { driverRef =>
Original file line number Diff line number Diff line change @@ -82,4 +82,11 @@ private[spark] object Worker {
8282 .version(" 2.0.2" )
8383 .intConf
8484 .createWithDefault(100 )
85+
86+ val WORKER_DECOMMISSION_SIGNAL =
87+ ConfigBuilder (" spark.worker.decommission.signal" )
88+ .doc(" The signal that used to trigger the worker to start decommission." )
89+ .version(" 3.2.0" )
90+ .stringConf
91+ .createWithDefaultString(" PWR" )
8592}
Original file line number Diff line number Diff line change @@ -1927,6 +1927,13 @@ package object config {
19271927 .timeConf(TimeUnit .SECONDS )
19281928 .createOptional
19291929
1930+ private [spark] val EXECUTOR_DECOMMISSION_SIGNAL =
1931+ ConfigBuilder (" spark.executor.decommission.signal" )
1932+ .doc(" The signal that used to trigger the executor to start decommission." )
1933+ .version(" 3.2.0" )
1934+ .stringConf
1935+ .createWithDefaultString(" PWR" )
1936+
19301937 private [spark] val STAGING_DIR = ConfigBuilder (" spark.yarn.stagingDir" )
19311938 .doc(" Staging directory used while submitting applications." )
19321939 .version(" 2.0.0" )
Original file line number Diff line number Diff line change @@ -102,9 +102,9 @@ private[spark] object CoarseGrainedClusterMessages {
102102 // It's used for Standalone's cases, where decommission is triggered at MasterWebUI or Worker.
103103 object DecommissionExecutor extends CoarseGrainedClusterMessage
104104
105- // A message that sent to the executor itself when it receives PWR signal,
105+ // A message sent to the executor itself when it receives a signal,
106106 // indicating the executor starts to decommission.
107- object ExecutorSigPWRReceived extends CoarseGrainedClusterMessage
107+ object ExecutorDecommissionSigReceived extends CoarseGrainedClusterMessage
108108
109109 case class RemoveWorker (workerId : String , host : String , message : String )
110110 extends CoarseGrainedClusterMessage
You can’t perform that action at this time.
0 commit comments