From 624acbcd2b6eeaac5c6ae4def76b115b8cec32cf Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Wed, 27 Dec 2017 15:09:41 -0800 Subject: [PATCH 1/2] [SPARK-22914][DEPLOY] Register history.ui.port --- .../spark/deploy/history/HistoryServer.scala | 3 +- .../apache/spark/deploy/history/config.scala | 5 +++ .../spark/deploy/yarn/ApplicationMaster.scala | 21 ++++++--- .../yarn/ApplicationMasterUtilSuite.scala | 44 +++++++++++++++++++ 4 files changed, 67 insertions(+), 6 deletions(-) create mode 100644 resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ApplicationMasterUtilSuite.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 75484f5c9f30f..0ec4afad0308c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -28,6 +28,7 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.config.HISTORY_SERVER_UI_PORT import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, UIRoot} @@ -276,7 +277,7 @@ object HistoryServer extends Logging { .newInstance(conf) .asInstanceOf[ApplicationHistoryProvider] - val port = conf.getInt("spark.history.ui.port", 18080) + val port = conf.get(HISTORY_SERVER_UI_PORT) val server = new HistoryServer(conf, provider, securityManager, port) server.bind() diff --git a/core/src/main/scala/org/apache/spark/deploy/history/config.scala b/core/src/main/scala/org/apache/spark/deploy/history/config.scala index 22b6d49d8e2a4..efdbf672bb52f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/config.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/config.scala @@ -44,4 +44,9 @@ private[spark] object config { .bytesConf(ByteUnit.BYTE) .createWithDefaultString("10g") + val HISTORY_SERVER_UI_PORT = ConfigBuilder("spark.history.ui.port") + .doc("Web UI port to bind Spark History Server") + .intConf + .createWithDefault(18080) + } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index b2576b0d72633..90ea33c5712ba 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -427,11 +427,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends uiAddress: Option[String]) = { val appId = client.getAttemptId().getApplicationId().toString() val attemptId = client.getAttemptId().getAttemptId().toString() - val historyAddress = - _sparkConf.get(HISTORY_SERVER_ADDRESS) - .map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) } - .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" } - .getOrElse("") + val historyAddress = ApplicationMasterUtil + .getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId) val driverUrl = RpcEndpointAddress( _sparkConf.get("spark.driver.host"), @@ -836,6 +833,20 @@ object ApplicationMaster extends Logging { } +object ApplicationMasterUtil { + def getHistoryServerAddress( + sparkConf: SparkConf, + yarnConf: YarnConfiguration, + appId: String, + attemptId: String): String = { + + sparkConf.get(HISTORY_SERVER_ADDRESS) + .map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) } + .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" } + .getOrElse("") + } +} + /** * This object does not provide any special functionality. It exists so that it's easy to tell * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps. diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ApplicationMasterUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ApplicationMasterUtilSuite.scala new file mode 100644 index 0000000000000..3f2f915fb067d --- /dev/null +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ApplicationMasterUtilSuite.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.Logging +import org.apache.spark.util.ResetSystemProperties +import org.scalatest.Matchers + +class ApplicationMasterUtilSuite extends SparkFunSuite with Matchers with Logging + with ResetSystemProperties { + + test("history url with hadoop and spark substitutions") { + val sparkConf = new SparkConf() + sparkConf.set("spark.yarn.historyServer.address", + "http://${hadoopconf-yarn.resourcemanager.hostname}:${spark.history.ui.port}") + + val yarnConf = new YarnConfiguration() + yarnConf.set("yarn.resourcemanager.hostname", "rm.host.com") + val appId = "application_123_1" + val attemptId = appId + "_1" + + val shsAddr = ApplicationMasterUtil + .getHistoryServerAddress(sparkConf, yarnConf, appId, attemptId) + + shsAddr shouldEqual "http://rm.host.com:18080/history/application_123_1/application_123_1_1" + } +} From 03e0e271c58e1a4fe7381b60b06e87e5fe2c3a77 Mon Sep 17 00:00:00 2001 From: Gera Shegalov Date: Thu, 4 Jan 2018 15:35:10 -0800 Subject: [PATCH 2/2] addressing comments --- .../spark/deploy/yarn/ApplicationMaster.scala | 16 ++++++---------- ...Suite.scala => ApplicationMasterSuite.scala} | 17 ++++++++--------- 2 files changed, 14 insertions(+), 19 deletions(-) rename resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/{ApplicationMasterUtilSuite.scala => ApplicationMasterSuite.scala} (75%) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 90ea33c5712ba..4d5e3bb043671 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -427,7 +427,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends uiAddress: Option[String]) = { val appId = client.getAttemptId().getApplicationId().toString() val attemptId = client.getAttemptId().getAttemptId().toString() - val historyAddress = ApplicationMasterUtil + val historyAddress = ApplicationMaster .getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId) val driverUrl = RpcEndpointAddress( @@ -831,15 +831,11 @@ object ApplicationMaster extends Logging { master.getAttemptId } -} - -object ApplicationMasterUtil { - def getHistoryServerAddress( - sparkConf: SparkConf, - yarnConf: YarnConfiguration, - appId: String, - attemptId: String): String = { - + private[spark] def getHistoryServerAddress( + sparkConf: SparkConf, + yarnConf: YarnConfiguration, + appId: String, + attemptId: String): String = { sparkConf.get(HISTORY_SERVER_ADDRESS) .map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) } .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ApplicationMasterUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ApplicationMasterSuite.scala similarity index 75% rename from resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ApplicationMasterUtilSuite.scala rename to resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ApplicationMasterSuite.scala index 3f2f915fb067d..695a82f3583e6 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ApplicationMasterUtilSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ApplicationMasterSuite.scala @@ -18,27 +18,26 @@ package org.apache.spark.deploy.yarn import org.apache.hadoop.yarn.conf.YarnConfiguration + import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.internal.Logging -import org.apache.spark.util.ResetSystemProperties -import org.scalatest.Matchers -class ApplicationMasterUtilSuite extends SparkFunSuite with Matchers with Logging - with ResetSystemProperties { +class ApplicationMasterSuite extends SparkFunSuite { test("history url with hadoop and spark substitutions") { + val host = "rm.host.com" + val port = 18080 val sparkConf = new SparkConf() + sparkConf.set("spark.yarn.historyServer.address", "http://${hadoopconf-yarn.resourcemanager.hostname}:${spark.history.ui.port}") - val yarnConf = new YarnConfiguration() - yarnConf.set("yarn.resourcemanager.hostname", "rm.host.com") + yarnConf.set("yarn.resourcemanager.hostname", host) val appId = "application_123_1" val attemptId = appId + "_1" - val shsAddr = ApplicationMasterUtil + val shsAddr = ApplicationMaster .getHistoryServerAddress(sparkConf, yarnConf, appId, attemptId) - shsAddr shouldEqual "http://rm.host.com:18080/history/application_123_1/application_123_1_1" + assert(shsAddr === s"http://${host}:${port}/history/${appId}/${attemptId}") } }