Commit 56fe42e

Marcelo Vanzin committed:
Fix cluster mode history address, plus a cleanup.
1 parent 44112a8 commit 56fe42e

File tree

2 files changed, +13 -10 lines changed


yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 13 additions & 3 deletions
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
 import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
@@ -70,16 +71,21 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   private val sparkContextRef = new AtomicReference[SparkContext](null)
 
   final def run(): Int = {
+    val appAttemptId = client.getAttemptId()
+
     if (isDriver) {
       // Set the web ui port to be ephemeral for yarn so we don't conflict with
       // other spark processes running on the same box
       System.setProperty("spark.ui.port", "0")
 
       // Set the master property to match the requested mode.
       System.setProperty("spark.master", "yarn-cluster")
+
+      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
+      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
     }
 
-    logInfo("ApplicationAttemptId: " + client.getAttemptId())
+    logInfo("ApplicationAttemptId: " + appAttemptId)
 
     val cleanupHook = new Runnable {
       override def run() {
@@ -153,8 +159,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
 
   private def registerAM(uiAddress: String) = {
     val sc = sparkContextRef.get()
-    val historyAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkConf,
-      client.getAttemptId().getApplicationId().toString())
+
+    val appId = client.getAttemptId().getApplicationId().toString()
+    val historyAddress =
+      sparkConf.getOption("spark.yarn.historyServer.address")
+        .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" }
+        .getOrElse("")
 
     allocator = client.register(yarnConf,
       if (sc != null) sc.getConf else sparkConf,
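The registerAM() hunk above inlines the history address construction. A minimal standalone sketch of what it produces, with hypothetical host, port, and application ID, and assuming HistoryServer.UI_PATH_PREFIX expands to "/history":

import org.apache.spark.SparkConf

// Hypothetical values, for illustration only.
val sparkConf = new SparkConf(false)  // don't load defaults from system properties
  .set("spark.yarn.historyServer.address", "historyhost:18080")
val appId = "application_1408906506742_0001"

// Same shape as the inlined logic: empty string when no history server is
// configured; "/history" stands in for HistoryServer.UI_PATH_PREFIX.
val historyAddress =
  sparkConf.getOption("spark.yarn.historyServer.address")
    .map { address => s"${address}/history/${appId}" }
    .getOrElse("")
// historyAddress == "historyhost:18080/history/application_1408906506742_0001"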

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 0 additions & 7 deletions
@@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.util.RackResolver
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
@@ -155,12 +154,6 @@ object YarnSparkHadoopUtil {
     }
   }
 
-  def getUIHistoryAddress(conf: SparkConf, appId: String): String = {
-    conf.getOption("spark.yarn.historyServer.address")
-      .map { address => s"$address${HistoryServer.UI_PATH_PREFIX}/${appId}" }
-      .getOrElse("")
-  }
-
   /**
    * Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands
    * using `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work. The
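A side note on the ApplicationMaster hunk that sets spark.yarn.app.id: a SparkConf created later in the same JVM with default loading enabled copies "spark.*" system properties, which is how downstream code such as YarnClusterSchedulerBackend can read the propagated ID. A minimal sketch, using a hypothetical application ID:

import org.apache.spark.SparkConf

// Hypothetical application ID, for illustration only.
System.setProperty("spark.yarn.app.id", "application_1408906506742_0001")

// A SparkConf built with loadDefaults = true (the default constructor) picks up
// all "spark.*" system properties set earlier in the same JVM.
val conf = new SparkConf()
assert(conf.get("spark.yarn.app.id") == "application_1408906506742_0001")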
