
Commit bcf36cb

Merge branch 'master' of https://github.com/apache/spark into SPARK-1609
2 parents: 1185605 + 80429f3

64 files changed: +651 −192 lines


core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 5 additions & 2 deletions
@@ -104,8 +104,11 @@ class Accumulable[R, T] (
    * Set the accumulator's value; only allowed on master.
    */
   def value_= (newValue: R) {
-    if (!deserialized) value_ = newValue
-    else throw new UnsupportedOperationException("Can't assign accumulator value in task")
+    if (!deserialized) {
+      value_ = newValue
+    } else {
+      throw new UnsupportedOperationException("Can't assign accumulator value in task")
+    }
   }
 
   /**
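
The contract this refactor preserves: an accumulator's value may only be assigned on the driver, while a deserialized copy inside a task rejects assignment. A minimal hedged sketch of that behavior (the local master, app name, and accumulator values are illustrative):

import org.apache.spark.SparkContext

// Hypothetical sketch; tasks may only add to an accumulator with `+=`.
val sc = new SparkContext("local", "accumulator-demo")
val acc = sc.accumulator(0)

sc.parallelize(1 to 10).foreach { x =>
  acc += x         // allowed inside a task
  // acc.value = 5 // a deserialized copy would throw UnsupportedOperationException
}

acc.value = 100    // assignment is allowed on the driver (deserialized == false)
println(acc.value)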

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 20 additions & 6 deletions
@@ -19,8 +19,6 @@ package org.apache.spark
 
 import java.net.{Authenticator, PasswordAuthentication}
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.hadoop.io.Text
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -139,13 +137,13 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
   private val sparkSecretLookupKey = "sparkCookie"
 
   private val authOn = sparkConf.getBoolean("spark.authenticate", false)
-  private val uiAclsOn = sparkConf.getBoolean("spark.ui.acls.enable", false)
+  private var uiAclsOn = sparkConf.getBoolean("spark.ui.acls.enable", false)
 
+  private var viewAcls: Set[String] = _
   // always add the current user and SPARK_USER to the viewAcls
-  private val aclUsers = ArrayBuffer[String](System.getProperty("user.name", ""),
+  private val defaultAclUsers = Seq[String](System.getProperty("user.name", ""),
     Option(System.getenv("SPARK_USER")).getOrElse(""))
-  aclUsers ++= sparkConf.get("spark.ui.view.acls", "").split(',')
-  private val viewAcls = aclUsers.map(_.trim()).filter(!_.isEmpty).toSet
+  setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
 
   private val secretKey = generateSecretKey()
   logInfo("SecurityManager, is authentication enabled: " + authOn +
@@ -170,6 +168,20 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
     )
   }
 
+  private[spark] def setViewAcls(defaultUsers: Seq[String], allowedUsers: String) {
+    viewAcls = (defaultUsers ++ allowedUsers.split(',')).map(_.trim()).filter(!_.isEmpty).toSet
+    logInfo("Changing view acls to: " + viewAcls.mkString(","))
+  }
+
+  private[spark] def setViewAcls(defaultUser: String, allowedUsers: String) {
+    setViewAcls(Seq[String](defaultUser), allowedUsers)
+  }
+
+  private[spark] def setUIAcls(aclSetting: Boolean) {
+    uiAclsOn = aclSetting
+    logInfo("Changing acls enabled to: " + uiAclsOn)
+  }
+
   /**
    * Generates or looks up the secret key.
    *
@@ -222,6 +234,8 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
    * @return true is the user has permission, otherwise false
    */
   def checkUIViewPermissions(user: String): Boolean = {
+    logDebug("user=" + user + " uiAclsEnabled=" + uiAclsEnabled() + " viewAcls=" +
+      viewAcls.mkString(","))
     if (uiAclsEnabled() && (user != null) && (!viewAcls.contains(user))) false else true
   }
 
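
The net effect is that `uiAclsOn` and `viewAcls` become mutable, so a long-running server can adjust ACLs per application. A hedged sketch of the intended call pattern (the setters are `private[spark]`, so this only compiles from Spark-internal code; the user names are illustrative):

import org.apache.spark.{SecurityManager, SparkConf}

val secMgr = new SecurityManager(new SparkConf())

secMgr.setUIAcls(true)                       // enable UI ACL checking at runtime
secMgr.setViewAcls("appOwner", "alice,bob")  // default user plus extra viewers

// With ACLs on, only listed users pass the check (sketch; assumes
// uiAclsEnabled() reflects the uiAclsOn flag set above).
assert(secMgr.checkUIViewPermissions("alice"))
assert(!secMgr.checkUIViewPermissions("mallory"))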

core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala

Lines changed: 8 additions & 1 deletion
@@ -30,6 +30,7 @@ import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.StatCounter
+import org.apache.spark.util.Utils
 
 class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, JavaDoubleRDD] {
 
@@ -133,7 +134,13 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, Ja
   /**
    * Return a sampled subset of this RDD.
    */
-  def sample(withReplacement: Boolean, fraction: JDouble, seed: Int): JavaDoubleRDD =
+  def sample(withReplacement: Boolean, fraction: JDouble): JavaDoubleRDD =
+    sample(withReplacement, fraction, Utils.random.nextLong)
+
+  /**
+   * Return a sampled subset of this RDD.
+   */
+  def sample(withReplacement: Boolean, fraction: JDouble, seed: Long): JavaDoubleRDD =
     fromRDD(srdd.sample(withReplacement, fraction, seed))
 
   /**
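
Two API changes here, repeated for JavaPairRDD, JavaRDD, and JavaRDDLike in the sections below: the seed type widens from Int to Long, and a new seedless overload draws its seed from Utils.random. A hedged sketch of how a caller sees this (assuming an existing JavaSparkContext named `jsc`):

import java.util.Arrays

// Hypothetical sketch; values are illustrative.
val doubles = jsc.parallelizeDoubles(Arrays.asList[java.lang.Double](1.0, 2.0, 3.0, 4.0))

val s1 = doubles.sample(false, 0.5)      // new overload: seed drawn from Utils.random
val s2 = doubles.sample(false, 0.5, 42L) // seeded variant now takes a Long, not an Int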

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 8 additions & 1 deletion
@@ -39,6 +39,7 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   (implicit val kClassTag: ClassTag[K], implicit val vClassTag: ClassTag[V])
@@ -119,7 +120,13 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   /**
    * Return a sampled subset of this RDD.
    */
-  def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaPairRDD[K, V] =
+  def sample(withReplacement: Boolean, fraction: Double): JavaPairRDD[K, V] =
+    sample(withReplacement, fraction, Utils.random.nextLong)
+
+  /**
+   * Return a sampled subset of this RDD.
+   */
+  def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaPairRDD[K, V] =
     new JavaPairRDD[K, V](rdd.sample(withReplacement, fraction, seed))
 
   /**

core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala

Lines changed: 8 additions & 1 deletion
@@ -24,6 +24,7 @@ import org.apache.spark._
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
   extends JavaRDDLike[T, JavaRDD[T]] {
@@ -98,7 +99,13 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
   /**
    * Return a sampled subset of this RDD.
    */
-  def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] =
+  def sample(withReplacement: Boolean, fraction: Double): JavaRDD[T] =
+    sample(withReplacement, fraction, Utils.random.nextLong)
+
+  /**
+   * Return a sampled subset of this RDD.
+   */
+  def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T] =
     wrapRDD(rdd.sample(withReplacement, fraction, seed))
 
   /**

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 5 additions & 1 deletion
@@ -34,6 +34,7 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def wrapRDD(rdd: RDD[T]): This
@@ -394,7 +395,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     new java.util.ArrayList(arr)
   }
 
-  def takeSample(withReplacement: Boolean, num: Int, seed: Int): JList[T] = {
+  def takeSample(withReplacement: Boolean, num: Int): JList[T] =
+    takeSample(withReplacement, num, Utils.random.nextLong)
+
+  def takeSample(withReplacement: Boolean, num: Int, seed: Long): JList[T] = {
     import scala.collection.JavaConversions._
     val arr: java.util.Collection[T] = rdd.takeSample(withReplacement, num, seed).toSeq
     new java.util.ArrayList(arr)
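
takeSample gets the same treatment as sample: a Long seed plus a seedless overload delegating to a random seed. A sketch under the same assumptions (`jsc` is an existing JavaSparkContext; data is illustrative):

import java.util.Arrays

val rdd = jsc.parallelize(Arrays.asList[Integer](1, 2, 3, 4, 5))

val randomPick: java.util.List[Integer] = rdd.takeSample(true, 3)     // fresh random seed
val fixedPick: java.util.List[Integer]  = rdd.takeSample(true, 3, 7L) // reproducible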

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 4 additions & 0 deletions
@@ -75,6 +75,10 @@ class SparkHadoopUtil {
 
   def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
 
+  def loginUserFromKeytab(principalName: String, keytabFilename: String) {
+    UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
+  }
+
 }
 
 object SparkHadoopUtil {

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 1 addition & 2 deletions
@@ -66,8 +66,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
       if (k.startsWith("spark")) {
         defaultProperties(k) = v
         if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
-      }
-      else {
+      } else {
         SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
       }
     }

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 27 additions & 3 deletions
@@ -22,6 +22,7 @@ import scala.collection.mutable
 import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.{WebUI, SparkUI}
 import org.apache.spark.ui.JettyUtils._
@@ -167,17 +168,21 @@ class HistoryServer(
    * directory. If this file exists, the associated application is regarded to be completed, in
    * which case the server proceeds to render the SparkUI. Otherwise, the server does nothing.
    */
-  private def renderSparkUI(logDir: FileStatus, logInfo: EventLoggingInfo) {
+  private def renderSparkUI(logDir: FileStatus, elogInfo: EventLoggingInfo) {
     val path = logDir.getPath
     val appId = path.getName
-    val replayBus = new ReplayListenerBus(logInfo.logPaths, fileSystem, logInfo.compressionCodec)
+    val replayBus = new ReplayListenerBus(elogInfo.logPaths, fileSystem, elogInfo.compressionCodec)
     val appListener = new ApplicationEventListener
     replayBus.addListener(appListener)
-    val ui = new SparkUI(conf, replayBus, appId, "/history/" + appId)
+    val appConf = conf.clone()
+    val appSecManager = new SecurityManager(appConf)
+    val ui = new SparkUI(conf, appSecManager, replayBus, appId, "/history/" + appId)
 
     // Do not call ui.bind() to avoid creating a new server for each application
     replayBus.replay()
     if (appListener.applicationStarted) {
+      appSecManager.setUIAcls(HISTORY_UI_ACLS_ENABLED)
+      appSecManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
       attachSparkUI(ui)
       val appName = appListener.appName
       val sparkUser = appListener.sparkUser
@@ -201,6 +206,7 @@ class HistoryServer(
   private def attachSparkUI(ui: SparkUI) {
     assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs")
     ui.getHandlers.foreach(attachHandler)
+    addFilters(ui.getHandlers, conf)
   }
 
   /** Detach a reconstructed UI from this server. Only valid after bind(). */
@@ -254,9 +260,13 @@ object HistoryServer {
   // The port to which the web UI is bound
   val WEB_UI_PORT = conf.getInt("spark.history.ui.port", 18080)
 
+  // set whether to enable or disable view acls for all applications
+  val HISTORY_UI_ACLS_ENABLED = conf.getBoolean("spark.history.ui.acls.enable", false)
+
   val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
 
   def main(argStrings: Array[String]) {
+    initSecurity()
     val args = new HistoryServerArguments(argStrings)
     val securityManager = new SecurityManager(conf)
     val server = new HistoryServer(args.logDir, securityManager, conf)
@@ -266,6 +276,20 @@ object HistoryServer {
     while(true) { Thread.sleep(Int.MaxValue) }
     server.stop()
   }
+
+  def initSecurity() {
+    // If we are accessing HDFS and it has security enabled (Kerberos), we have to login
+    // from a keytab file so that we can access HDFS beyond the kerberos ticket expiration.
+    // As long as it is using Hadoop rpc (hdfs://), a relogin will automatically
+    // occur from the keytab.
+    if (conf.getBoolean("spark.history.kerberos.enabled", false)) {
+      // if you have enabled kerberos the following 2 params must be set
+      val principalName = conf.get("spark.history.kerberos.principal")
+      val keytabFilename = conf.get("spark.history.kerberos.keytab")
+      SparkHadoopUtil.get.loginUserFromKeytab(principalName, keytabFilename)
+    }
+  }
+
 }
 
 
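Taken together, these HistoryServer changes wire a per-application SecurityManager, UI servlet filters, and an optional Kerberos login at startup. A hedged configuration sketch of the new knobs (the principal and keytab path are illustrative, not defaults):

import org.apache.spark.SparkConf

// Hypothetical sketch; the HistoryServer reads these as system properties,
// shown here via SparkConf for clarity.
val conf = new SparkConf()
  .set("spark.history.kerberos.enabled", "true")
  .set("spark.history.kerberos.principal", "[email protected]")     // illustrative
  .set("spark.history.kerberos.keytab", "/etc/security/spark.keytab") // illustrative path
  .set("spark.history.ui.acls.enable", "true") // enforce view ACLs for replayed apps

With `spark.history.ui.acls.enable` set, each replayed application's UI is gated by the ACLs recorded in its event log plus the user who ran it, per the renderSparkUI hunk above.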
core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 2 deletions
@@ -237,8 +237,7 @@ private[spark] class Master(
       if (waitingDrivers.contains(d)) {
         waitingDrivers -= d
         self ! DriverStateChanged(driverId, DriverState.KILLED, None)
-      }
-      else {
+      } else {
         // We just notify the worker to kill the driver here. The final bookkeeping occurs
         // on the return path when the worker submits a state change back to the master
         // to notify it that the driver was successfully killed.
