
Commit 95d2499

Merge remote-tracking branch 'upstream/master' into appClient-receiveAndReply-SPARK-10827

2 parents: ab3e929 + 71d1c90

File tree: 86 files changed, +1822 / -702 lines


README.md

Lines changed: 1 addition & 4 deletions
@@ -87,10 +87,7 @@ Hadoop, you must build Spark against the same version that your cluster runs.
 Please refer to the build documentation at
 ["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version)
 for detailed guidance on building for a particular distribution of Hadoop, including
-building for particular Hive and Hive Thriftserver distributions. See also
-["Third Party Hadoop Distributions"](http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html)
-for guidance on building a Spark application that works with a particular
-distribution.
+building for particular Hive and Hive Thriftserver distributions.
 
 ## Configuration

core/pom.xml

Lines changed: 0 additions & 4 deletions
@@ -173,10 +173,6 @@
       <groupId>net.jpountz.lz4</groupId>
       <artifactId>lz4</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.roaringbitmap</groupId>
-      <artifactId>RoaringBitmap</artifactId>
-    </dependency>
     <dependency>
       <groupId>commons-net</groupId>
       <artifactId>commons-net</artifactId>

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 41 additions & 31 deletions
@@ -17,11 +17,13 @@
 
 package org.apache.spark
 
+import java.lang.{Byte => JByte}
 import java.net.{Authenticator, PasswordAuthentication}
-import java.security.KeyStore
+import java.security.{KeyStore, SecureRandom}
 import java.security.cert.X509Certificate
 import javax.net.ssl._
 
+import com.google.common.hash.HashCodes
 import com.google.common.io.Files
 import org.apache.hadoop.io.Text
 
@@ -130,15 +132,16 @@ import org.apache.spark.util.Utils
  *
  * The exact mechanisms used to generate/distribute the shared secret are deployment-specific.
  *
- * For Yarn deployments, the secret is automatically generated using the Akka remote
- * Crypt.generateSecureCookie() API. The secret is placed in the Hadoop UGI which gets passed
- * around via the Hadoop RPC mechanism. Hadoop RPC can be configured to support different levels
- * of protection. See the Hadoop documentation for more details. Each Spark application on Yarn
- * gets a different shared secret. On Yarn, the Spark UI gets configured to use the Hadoop Yarn
- * AmIpFilter which requires the user to go through the ResourceManager Proxy. That Proxy is there
- * to reduce the possibility of web based attacks through YARN. Hadoop can be configured to use
- * filters to do authentication. That authentication then happens via the ResourceManager Proxy
- * and Spark will use that to do authorization against the view acls.
+ * For YARN deployments, the secret is automatically generated. The secret is placed in the Hadoop
+ * UGI which gets passed around via the Hadoop RPC mechanism. Hadoop RPC can be configured to
+ * support different levels of protection. See the Hadoop documentation for more details. Each
+ * Spark application on YARN gets a different shared secret.
+ *
+ * On YARN, the Spark UI gets configured to use the Hadoop YARN AmIpFilter which requires the user
+ * to go through the ResourceManager Proxy. That proxy is there to reduce the possibility of web
+ * based attacks through YARN. Hadoop can be configured to use filters to do authentication. That
+ * authentication then happens via the ResourceManager Proxy and Spark will use that to do
+ * authorization against the view acls.
  *
  * For other Spark deployments, the shared secret must be specified via the
  * spark.authenticate.secret config.
@@ -189,8 +192,7 @@ import org.apache.spark.util.Utils
 private[spark] class SecurityManager(sparkConf: SparkConf)
   extends Logging with SecretKeyHolder {
 
-  // key used to store the spark secret in the Hadoop UGI
-  private val sparkSecretLookupKey = "sparkCookie"
+  import SecurityManager._
 
   private val authOn = sparkConf.getBoolean(SecurityManager.SPARK_AUTH_CONF, false)
   // keep spark.ui.acls.enable for backwards compatibility with 1.0
@@ -365,33 +367,38 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
    * we throw an exception.
    */
   private def generateSecretKey(): String = {
-    if (!isAuthenticationEnabled) return null
-    // first check to see if the secret is already set, else generate a new one if on yarn
-    val sCookie = if (SparkHadoopUtil.get.isYarnMode) {
-      val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(sparkSecretLookupKey)
-      if (secretKey != null) {
-        logDebug("in yarn mode, getting secret from credentials")
-        return new Text(secretKey).toString
+    if (!isAuthenticationEnabled) {
+      null
+    } else if (SparkHadoopUtil.get.isYarnMode) {
+      // In YARN mode, the secure cookie will be created by the driver and stashed in the
+      // user's credentials, where executors can get it. The check for an array of size 0
+      // is because of the test code in YarnSparkHadoopUtilSuite.
+      val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(SECRET_LOOKUP_KEY)
+      if (secretKey == null || secretKey.length == 0) {
+        logDebug("generateSecretKey: yarn mode, secret key from credentials is null")
+        val rnd = new SecureRandom()
+        val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE
+        val secret = new Array[Byte](length)
+        rnd.nextBytes(secret)
+
+        val cookie = HashCodes.fromBytes(secret).toString()
+        SparkHadoopUtil.get.addSecretKeyToUserCredentials(SECRET_LOOKUP_KEY, cookie)
+        cookie
       } else {
-        logDebug("getSecretKey: yarn mode, secret key from credentials is null")
+        new Text(secretKey).toString
       }
-      val cookie = akka.util.Crypt.generateSecureCookie
-      // if we generated the secret then we must be the first so lets set it so t
-      // gets used by everyone else
-      SparkHadoopUtil.get.addSecretKeyToUserCredentials(sparkSecretLookupKey, cookie)
-      logInfo("adding secret to credentials in yarn mode")
-      cookie
     } else {
       // user must have set spark.authenticate.secret config
       // For Master/Worker, auth secret is in conf; for Executors, it is in env variable
-      sys.env.get(SecurityManager.ENV_AUTH_SECRET)
        .orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match {
+      Option(sparkConf.getenv(SecurityManager.ENV_AUTH_SECRET))
        case Some(value) => value
-        case None => throw new Exception("Error: a secret key must be specified via the " +
-          SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
+        case None =>
+          throw new IllegalArgumentException(
+            "Error: a secret key must be specified via the " +
+              SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
      }
    }
-    sCookie
  }
 
  /**
@@ -475,6 +482,9 @@ private[spark] object SecurityManager {
   val SPARK_AUTH_CONF: String = "spark.authenticate"
   val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret"
   // This is used to set auth secret to an executor's env variable. It should have the same
-  // value as SPARK_AUTH_SECERET_CONF set in SparkConf
+  // value as SPARK_AUTH_SECRET_CONF set in SparkConf
   val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"
+
+  // key used to store the spark secret in the Hadoop UGI
+  val SECRET_LOOKUP_KEY = "sparkCookie"
 }
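The core of this change is replacing akka.util.Crypt.generateSecureCookie with a plain SecureRandom secret sized by spark.authenticate.secretBitLength. Below is a minimal standalone sketch of that generation path; the object and method names are illustrative, and it assumes a Guava version that still provides HashCodes.fromBytes (the same call the patch itself uses).

```scala
import java.lang.{Byte => JByte}
import java.security.SecureRandom

import com.google.common.hash.HashCodes

// Illustrative sketch only -- not the actual SecurityManager code path.
object SecretKeySketch {
  def generateSecret(bitLength: Int = 256): String = {
    val rnd = new SecureRandom()
    val secret = new Array[Byte](bitLength / JByte.SIZE) // e.g. 256 bits -> 32 bytes
    rnd.nextBytes(secret)
    // Encode the raw bytes as a hex string, as the patch does via Guava's HashCodes.
    HashCodes.fromBytes(secret).toString()
  }

  def main(args: Array[String]): Unit = {
    println(generateSecret()) // 64 hex characters for a 256-bit secret
  }
}
```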

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 5 additions & 2 deletions
@@ -252,7 +252,8 @@ object SparkEnv extends Logging {
 
     // Create the ActorSystem for Akka and get the port it binds to.
     val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
-    val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager)
+    val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
+      clientMode = !isDriver)
     val actorSystem: ActorSystem =
       if (rpcEnv.isInstanceOf[AkkaRpcEnv]) {
         rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
@@ -262,9 +263,11 @@
       }
 
     // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
+    // In the non-driver case, the RPC env's address may be null since it may not be listening
+    // for incoming connections.
     if (isDriver) {
       conf.set("spark.driver.port", rpcEnv.address.port.toString)
-    } else {
+    } else if (rpcEnv.address != null) {
       conf.set("spark.executor.port", rpcEnv.address.port.toString)
     }
 
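To make the intent of the new null check easier to see in isolation, here is a small self-contained sketch of the same guard, using a hypothetical RpcAddress case class and a setConf callback in place of Spark's real types:

```scala
// Sketch of the guard around the bound RPC address (hypothetical types, not Spark's).
final case class RpcAddress(host: String, port: Int)

object PortRecordingSketch {
  def recordPort(isDriver: Boolean, address: RpcAddress, setConf: (String, String) => Unit): Unit = {
    if (isDriver) {
      setConf("spark.driver.port", address.port.toString)
    } else if (address != null) {
      // A client-mode RPC env may not be listening for incoming connections,
      // in which case its address is null and no executor port is recorded.
      setConf("spark.executor.port", address.port.toString)
    }
  }
}
```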

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 101 additions & 3 deletions
@@ -27,6 +27,7 @@ import scala.collection.mutable
 import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.security.AccessControlException
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
@@ -52,6 +53,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   private val NOT_STARTED = "<Not Started>"
 
+  // Interval between safemode checks.
+  private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
+    "spark.history.fs.safemodeCheck.interval", "5s")
+
   // Interval between each check for event log updates
   private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
 
@@ -107,9 +112,57 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  initialize()
+  // Conf option used for testing the initialization code.
+  val initThread = if (!conf.getBoolean("spark.history.testing.skipInitialize", false)) {
+    initialize(None)
+  } else {
+    null
+  }
+
+  private[history] def initialize(errorHandler: Option[Thread.UncaughtExceptionHandler]): Thread = {
+    if (!isFsInSafeMode()) {
+      startPolling()
+      return null
+    }
+
+    // Cannot probe anything while the FS is in safe mode, so spawn a new thread that will wait
+    // for the FS to leave safe mode before enabling polling. This allows the main history server
+    // UI to be shown (so that the user can see the HDFS status).
+    //
+    // The synchronization in the run() method is needed because of the tests; mockito can
+    // misbehave if the test is modifying the mocked methods while the thread is calling
+    // them.
+    val initThread = new Thread(new Runnable() {
+      override def run(): Unit = {
+        try {
+          clock.synchronized {
+            while (isFsInSafeMode()) {
+              logInfo("HDFS is still in safe mode. Waiting...")
+              val deadline = clock.getTimeMillis() +
                TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S)
+              clock.waitTillTime(deadline)
+            }
+          }
+          startPolling()
+        } catch {
+          case _: InterruptedException =>
+        }
+      }
+    })
+    initThread.setDaemon(true)
+    initThread.setName(s"${getClass().getSimpleName()}-init")
+    initThread.setUncaughtExceptionHandler(errorHandler.getOrElse(
+      new Thread.UncaughtExceptionHandler() {
+        override def uncaughtException(t: Thread, e: Throwable): Unit = {
+          logError("Error initializing FsHistoryProvider.", e)
+          System.exit(1)
+        }
+      }))
+    initThread.start()
+    initThread
+  }
 
-  private def initialize(): Unit = {
+  private def startPolling(): Unit = {
     // Validate the log directory.
     val path = new Path(logDir)
     if (!fs.exists(path)) {
@@ -170,7 +223,21 @@
     }
   }
 
-  override def getConfig(): Map[String, String] = Map("Event log directory" -> logDir.toString)
+  override def getConfig(): Map[String, String] = {
+    val safeMode = if (isFsInSafeMode()) {
+      Map("HDFS State" -> "In safe mode, application logs not available.")
+    } else {
+      Map()
+    }
+    Map("Event log directory" -> logDir.toString) ++ safeMode
+  }
+
+  override def stop(): Unit = {
+    if (initThread != null && initThread.isAlive()) {
+      initThread.interrupt()
+      initThread.join()
+    }
+  }
 
   /**
    * Builds the application list based on the current contents of the log directory.
@@ -585,6 +652,37 @@
     }
   }
 
+  /**
+   * Checks whether HDFS is in safe mode. The API is slightly different between hadoop 1 and 2,
+   * so we have to resort to ugly reflection (as usual...).
+   *
+   * Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical reasons
+   * makes it more public than not.
+   */
+  private[history] def isFsInSafeMode(): Boolean = fs match {
+    case dfs: DistributedFileSystem =>
+      isFsInSafeMode(dfs)
+    case _ =>
+      false
+  }
+
+  // For testing.
+  private[history] def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
+    val hadoop1Class = "org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction"
+    val hadoop2Class = "org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction"
+    val actionClass: Class[_] =
+      try {
+        getClass().getClassLoader().loadClass(hadoop2Class)
+      } catch {
+        case _: ClassNotFoundException =>
+          getClass().getClassLoader().loadClass(hadoop1Class)
+      }
+
+    val action = actionClass.getField("SAFEMODE_GET").get(null)
+    val method = dfs.getClass().getMethod("setSafeMode", action.getClass())
+    method.invoke(dfs, action).asInstanceOf[Boolean]
+  }
+
 }
 
 private[history] object FsHistoryProvider {
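The initialization change is easier to follow in isolation: if the filesystem is ready, start polling immediately; otherwise hand the wait off to a daemon thread so the history server UI can come up right away. A generic sketch of that pattern follows; the names (DeferredInitializer, isReady, startPolling) are illustrative stand-ins, not Spark's.

```scala
import java.util.concurrent.TimeUnit

// Generic sketch of the "start now, or wait on a daemon thread" pattern (illustrative names).
class DeferredInitializer(
    isReady: () => Boolean,
    startPolling: () => Unit,
    checkIntervalSeconds: Long = 5L) {

  def initialize(): Thread = {
    if (isReady()) {
      startPolling()
      return null
    }
    val initThread = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          while (!isReady()) {
            // Sleep between checks instead of busy-waiting.
            Thread.sleep(TimeUnit.SECONDS.toMillis(checkIntervalSeconds))
          }
          startPolling()
        } catch {
          case _: InterruptedException => // interrupted on shutdown; just exit
        }
      }
    })
    initThread.setDaemon(true)
    initThread.setName("deferred-init")
    initThread.start()
    initThread
  }
}
```

Shutting down then only requires interrupting and joining the returned thread, which is what the patch's new stop() override does.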

core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala

Lines changed: 4 additions & 5 deletions
@@ -161,7 +161,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
      info: ApplicationHistoryInfo,
      attempt: ApplicationAttemptInfo,
      isFirst: Boolean): Seq[Node] = {
-    val uiAddress = HistoryServer.getAttemptURI(info.id, attempt.attemptId)
+    val uiAddress = UIUtils.prependBaseUri(HistoryServer.getAttemptURI(info.id, attempt.attemptId))
    val startTime = UIUtils.formatDate(attempt.startTime)
    val endTime = if (attempt.endTime > 0) UIUtils.formatDate(attempt.endTime) else "-"
    val duration =
@@ -190,8 +190,7 @@
      {
        if (renderAttemptIdColumn) {
          if (info.attempts.size > 1 && attempt.attemptId.isDefined) {
-            <td><a href={HistoryServer.getAttemptURI(info.id, attempt.attemptId)}>
-              {attempt.attemptId.get}</a></td>
+            <td><a href={uiAddress}>{attempt.attemptId.get}</a></td>
          } else {
            <td>&nbsp;</td>
          }
@@ -218,9 +217,9 @@
  }
 
  private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
-    "/?" + Array(
+    UIUtils.prependBaseUri("/?" + Array(
      "page=" + linkPage,
      "showIncomplete=" + showIncomplete
-    ).mkString("&")
+    ).mkString("&"))
  }
}
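The HistoryPage fix is purely about link construction: when the history server sits behind a proxy prefix, relative links such as "/?page=2" must be rebased onto that prefix or they 404. A toy stand-in for the helper (not Spark's actual UIUtils.prependBaseUri implementation) illustrates the idea:

```scala
// Toy stand-in for a base-URI helper; UIUtils.prependBaseUri does the real work in Spark.
object BaseUriSketch {
  def prependBaseUri(resource: String, basePath: String): String =
    basePath.stripSuffix("/") + resource

  def main(args: Array[String]): Unit = {
    // Without the base path, this link would resolve against the proxy root and break.
    println(prependBaseUri("/?page=2&showIncomplete=false", "/proxy/history"))
    // prints: /proxy/history/?page=2&showIncomplete=false
  }
}
```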

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 9 additions & 11 deletions
@@ -45,8 +45,6 @@ private[spark] class CoarseGrainedExecutorBackend(
    env: SparkEnv)
  extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {
 
-  Utils.checkHostPort(hostPort, "Expected hostport")
-
  var executor: Executor = null
  @volatile var driver: Option[RpcEndpointRef] = None
 
@@ -80,9 +78,8 @@
  }
 
  override def receive: PartialFunction[Any, Unit] = {
-    case RegisteredExecutor =>
+    case RegisteredExecutor(hostname) =>
      logInfo("Successfully registered with driver")
-      val (hostname, _) = Utils.parseHostPort(hostPort)
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
 
    case RegisterExecutorFailed(message) =>
@@ -163,7 +160,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
        hostname,
        port,
        executorConf,
-        new SecurityManager(executorConf))
+        new SecurityManager(executorConf),
+        clientMode = true)
      val driver = fetcher.setupEndpointRefByURI(driverUrl)
      val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++
        Seq[(String, String)](("spark.app.id", appId))
@@ -188,12 +186,12 @@
      val env = SparkEnv.createExecutorEnv(
        driverConf, executorId, hostname, port, cores, isLocal = false)
 
-      // SparkEnv sets spark.driver.port so it shouldn't be 0 anymore.
-      val boundPort = env.conf.getInt("spark.executor.port", 0)
-      assert(boundPort != 0)
-
-      // Start the CoarseGrainedExecutorBackend endpoint.
-      val sparkHostPort = hostname + ":" + boundPort
+      // SparkEnv will set spark.executor.port if the rpc env is listening for incoming
+      // connections (e.g., if it's using akka). Otherwise, the executor is running in
+      // client mode only, and does not accept incoming connections.
+      val sparkHostPort = env.conf.getOption("spark.executor.port").map { port =>
+        hostname + ":" + port
+      }.orNull
      env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
        env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
      workerUrl.foreach { url =>
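This executor-side change follows from the SparkEnv change above: spark.executor.port may now be unset, so the host:port string becomes optional. A plain-Scala sketch of that handling, with a Map standing in for SparkConf, shows the Option/map/orNull pattern in isolation:

```scala
// Sketch of the optional host:port handling; a Map stands in for SparkConf here.
object HostPortSketch {
  def sparkHostPort(conf: Map[String, String], hostname: String): String =
    conf.get("spark.executor.port").map(port => hostname + ":" + port).orNull

  def main(args: Array[String]): Unit = {
    println(sparkHostPort(Map("spark.executor.port" -> "43211"), "worker-1")) // worker-1:43211
    println(sparkHostPort(Map.empty, "worker-1"))                             // null
  }
}
```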

0 commit comments

Comments
 (0)