Skip to content

Commit a4f9ed2

Browse files
committed
Merge branch 'master' of https://github.com/apache/spark into jdbc-schema-rdd
2 parents dda49bc + 09f7e45 commit a4f9ed2

File tree

81 files changed

+2032
-682
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

81 files changed

+2032
-682
lines changed

core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@
150150
<dependency>
151151
<groupId>org.json4s</groupId>
152152
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
153-
<version>3.2.6</version>
153+
<version>3.2.10</version>
154154
</dependency>
155155
<dependency>
156156
<groupId>colt</groupId>

core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,21 @@ import org.apache.spark.serializer.JavaSerializer
3636
*
3737
* @param initialValue initial value of accumulator
3838
* @param param helper object defining how to add elements of type `R` and `T`
39+
* @param name human-readable name for use in Spark's web UI
3940
* @tparam R the full accumulated data (result type)
4041
* @tparam T partial data that can be added in
4142
*/
4243
class Accumulable[R, T] (
4344
@transient initialValue: R,
44-
param: AccumulableParam[R, T])
45+
param: AccumulableParam[R, T],
46+
val name: Option[String])
4547
extends Serializable {
4648

47-
val id = Accumulators.newId
49+
def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
50+
this(initialValue, param, None)
51+
52+
val id: Long = Accumulators.newId
53+
4854
@transient private var value_ = initialValue // Current value on master
4955
val zero = param.zero(initialValue) // Zero value to be passed to workers
5056
private var deserialized = false
@@ -219,8 +225,10 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
219225
* @param param helper object defining how to add elements of type `T`
220226
* @tparam T result type
221227
*/
222-
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T])
223-
extends Accumulable[T,T](initialValue, param)
228+
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
229+
extends Accumulable[T,T](initialValue, param, name) {
230+
def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
231+
}
224232

225233
/**
226234
* A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
@@ -281,4 +289,7 @@ private object Accumulators {
281289
}
282290
}
283291
}
292+
293+
def stringifyPartialValue(partialValue: Any) = "%s".format(partialValue)
294+
def stringifyValue(value: Any) = "%s".format(value)
284295
}

core/src/main/scala/org/apache/spark/HttpFileServer.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ import com.google.common.io.Files
2323

2424
import org.apache.spark.util.Utils
2525

26-
private[spark] class HttpFileServer(securityManager: SecurityManager) extends Logging {
26+
private[spark] class HttpFileServer(
27+
securityManager: SecurityManager,
28+
requestedPort: Int = 0)
29+
extends Logging {
2730

2831
var baseDir : File = null
2932
var fileDir : File = null
@@ -38,7 +41,7 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo
3841
fileDir.mkdir()
3942
jarDir.mkdir()
4043
logInfo("HTTP File server directory is " + baseDir)
41-
httpServer = new HttpServer(baseDir, securityManager)
44+
httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server")
4245
httpServer.start()
4346
serverUri = httpServer.uri
4447
logDebug("HTTP file server started at: " + serverUri)

core/src/main/scala/org/apache/spark/HttpServer.scala

Lines changed: 54 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.io.File
2121

2222
import org.eclipse.jetty.util.security.{Constraint, Password}
2323
import org.eclipse.jetty.security.authentication.DigestAuthenticator
24-
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler}
24+
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
2525

2626
import org.eclipse.jetty.server.Server
2727
import org.eclipse.jetty.server.bio.SocketConnector
@@ -41,48 +41,68 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
4141
* as well as classes created by the interpreter when the user types in code. This is just a wrapper
4242
* around a Jetty server.
4343
*/
44-
private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager)
45-
extends Logging {
44+
private[spark] class HttpServer(
45+
resourceBase: File,
46+
securityManager: SecurityManager,
47+
requestedPort: Int = 0,
48+
serverName: String = "HTTP server")
49+
extends Logging {
50+
4651
private var server: Server = null
47-
private var port: Int = -1
52+
private var port: Int = requestedPort
4853

4954
def start() {
5055
if (server != null) {
5156
throw new ServerStateException("Server is already started")
5257
} else {
5358
logInfo("Starting HTTP Server")
54-
server = new Server()
55-
val connector = new SocketConnector
56-
connector.setMaxIdleTime(60*1000)
57-
connector.setSoLingerTime(-1)
58-
connector.setPort(0)
59-
server.addConnector(connector)
60-
61-
val threadPool = new QueuedThreadPool
62-
threadPool.setDaemon(true)
63-
server.setThreadPool(threadPool)
64-
val resHandler = new ResourceHandler
65-
resHandler.setResourceBase(resourceBase.getAbsolutePath)
66-
67-
val handlerList = new HandlerList
68-
handlerList.setHandlers(Array(resHandler, new DefaultHandler))
69-
70-
if (securityManager.isAuthenticationEnabled()) {
71-
logDebug("HttpServer is using security")
72-
val sh = setupSecurityHandler(securityManager)
73-
// make sure we go through security handler to get resources
74-
sh.setHandler(handlerList)
75-
server.setHandler(sh)
76-
} else {
77-
logDebug("HttpServer is not using security")
78-
server.setHandler(handlerList)
79-
}
80-
81-
server.start()
82-
port = server.getConnectors()(0).getLocalPort()
59+
val (actualServer, actualPort) =
60+
Utils.startServiceOnPort[Server](requestedPort, doStart, serverName)
61+
server = actualServer
62+
port = actualPort
8363
}
8464
}
8565

66+
/**
67+
* Actually start the HTTP server on the given port.
68+
*
69+
* Note that this is only best effort in the sense that we may end up binding to a nearby port
70+
* in the event of port collision. Return the bound server and the actual port used.
71+
*/
72+
private def doStart(startPort: Int): (Server, Int) = {
73+
val server = new Server()
74+
val connector = new SocketConnector
75+
connector.setMaxIdleTime(60 * 1000)
76+
connector.setSoLingerTime(-1)
77+
connector.setPort(startPort)
78+
server.addConnector(connector)
79+
80+
val threadPool = new QueuedThreadPool
81+
threadPool.setDaemon(true)
82+
server.setThreadPool(threadPool)
83+
val resHandler = new ResourceHandler
84+
resHandler.setResourceBase(resourceBase.getAbsolutePath)
85+
86+
val handlerList = new HandlerList
87+
handlerList.setHandlers(Array(resHandler, new DefaultHandler))
88+
89+
if (securityManager.isAuthenticationEnabled()) {
90+
logDebug("HttpServer is using security")
91+
val sh = setupSecurityHandler(securityManager)
92+
// make sure we go through security handler to get resources
93+
sh.setHandler(handlerList)
94+
server.setHandler(sh)
95+
} else {
96+
logDebug("HttpServer is not using security")
97+
server.setHandler(handlerList)
98+
}
99+
100+
server.start()
101+
val actualPort = server.getConnectors()(0).getLocalPort
102+
103+
(server, actualPort)
104+
}
105+
86106
/**
87107
* Setup Jetty to the HashLoginService using a single user with our
88108
* shared secret. Configure it to use DIGEST-MD5 authentication so that the password
@@ -134,7 +154,7 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan
134154
if (server == null) {
135155
throw new ServerStateException("Server is not started")
136156
} else {
137-
return "http://" + Utils.localIpAddress + ":" + port
157+
"http://" + Utils.localIpAddress + ":" + port
138158
}
139159
}
140160
}

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 90 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,19 @@ import org.apache.spark.deploy.SparkHadoopUtil
4141
* secure the UI if it has data that other users should not be allowed to see. The javax
4242
* servlet filter specified by the user can authenticate the user and then once the user
4343
* is logged in, Spark can compare that user versus the view acls to make sure they are
44-
* authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls'
44+
* authorized to view the UI. The configs 'spark.acls.enable' and 'spark.ui.view.acls'
4545
* control the behavior of the acls. Note that the person who started the application
4646
* always has view access to the UI.
4747
*
48+
* Spark has a set of modify acls (`spark.modify.acls`) that controls which users have permission
49+
* to modify a single application. This would include things like killing the application. By
50+
* default the person who started the application has modify access. For modify access through
51+
* the UI, you must have a filter that does authentication in place for the modify acls to work
52+
* properly.
53+
*
54+
* Spark also has a set of admin acls (`spark.admin.acls`) which is a set of users/administrators
55+
* who always have permission to view or modify the Spark application.
56+
*
4857
* Spark does not currently support encryption after authentication.
4958
*
5059
* At this point spark has multiple communication protocols that need to be secured and
@@ -137,18 +146,32 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
137146
private val sparkSecretLookupKey = "sparkCookie"
138147

139148
private val authOn = sparkConf.getBoolean("spark.authenticate", false)
140-
private var uiAclsOn = sparkConf.getBoolean("spark.ui.acls.enable", false)
149+
// keep spark.ui.acls.enable for backwards compatibility with 1.0
150+
private var aclsOn = sparkConf.getOption("spark.acls.enable").getOrElse(
151+
sparkConf.get("spark.ui.acls.enable", "false")).toBoolean
152+
153+
// admin acls should be set before view or modify acls
154+
private var adminAcls: Set[String] =
155+
stringToSet(sparkConf.get("spark.admin.acls", ""))
141156

142157
private var viewAcls: Set[String] = _
158+
159+
// list of users who have permission to modify the application. This should
160+
// apply to both UI and CLI for things like killing the application.
161+
private var modifyAcls: Set[String] = _
162+
143163
// always add the current user and SPARK_USER to the viewAcls
144-
private val defaultAclUsers = Seq[String](System.getProperty("user.name", ""),
164+
private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
145165
Option(System.getenv("SPARK_USER")).getOrElse(""))
166+
146167
setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
168+
setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))
147169

148170
private val secretKey = generateSecretKey()
149171
logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
150-
"; ui acls " + (if (uiAclsOn) "enabled" else "disabled") +
151-
"; users with view permissions: " + viewAcls.toString())
172+
"; ui acls " + (if (aclsOn) "enabled" else "disabled") +
173+
"; users with view permissions: " + viewAcls.toString() +
174+
"; users with modify permissions: " + modifyAcls.toString())
152175

153176
// Set our own authenticator to properly negotiate user/password for HTTP connections.
154177
// This is needed by the HTTP client fetching from the HttpServer. Put here so its
@@ -169,18 +192,51 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
169192
)
170193
}
171194

172-
private[spark] def setViewAcls(defaultUsers: Seq[String], allowedUsers: String) {
173-
viewAcls = (defaultUsers ++ allowedUsers.split(',')).map(_.trim()).filter(!_.isEmpty).toSet
195+
/**
196+
* Split a comma separated String, filter out any empty items, and return a Set of strings
197+
*/
198+
private def stringToSet(list: String): Set[String] = {
199+
list.split(',').map(_.trim).filter(!_.isEmpty).toSet
200+
}
201+
202+
/**
203+
* Admin acls should be set before the view or modify acls. If you modify the admin
204+
* acls you should also set the view and modify acls again to pick up the changes.
205+
*/
206+
def setViewAcls(defaultUsers: Set[String], allowedUsers: String) {
207+
viewAcls = (adminAcls ++ defaultUsers ++ stringToSet(allowedUsers))
174208
logInfo("Changing view acls to: " + viewAcls.mkString(","))
175209
}
176210

177-
private[spark] def setViewAcls(defaultUser: String, allowedUsers: String) {
178-
setViewAcls(Seq[String](defaultUser), allowedUsers)
211+
def setViewAcls(defaultUser: String, allowedUsers: String) {
212+
setViewAcls(Set[String](defaultUser), allowedUsers)
213+
}
214+
215+
def getViewAcls: String = viewAcls.mkString(",")
216+
217+
/**
218+
* Admin acls should be set before the view or modify acls. If you modify the admin
219+
* acls you should also set the view and modify acls again to pick up the changes.
220+
*/
221+
def setModifyAcls(defaultUsers: Set[String], allowedUsers: String) {
222+
modifyAcls = (adminAcls ++ defaultUsers ++ stringToSet(allowedUsers))
223+
logInfo("Changing modify acls to: " + modifyAcls.mkString(","))
224+
}
225+
226+
def getModifyAcls: String = modifyAcls.mkString(",")
227+
228+
/**
229+
* Admin acls should be set before the view or modify acls. If you modify the admin
230+
* acls you should also set the view and modify acls again to pick up the changes.
231+
*/
232+
def setAdminAcls(adminUsers: String) {
233+
adminAcls = stringToSet(adminUsers)
234+
logInfo("Changing admin acls to: " + adminAcls.mkString(","))
179235
}
180236

181-
private[spark] def setUIAcls(aclSetting: Boolean) {
182-
uiAclsOn = aclSetting
183-
logInfo("Changing acls enabled to: " + uiAclsOn)
237+
def setAcls(aclSetting: Boolean) {
238+
aclsOn = aclSetting
239+
logInfo("Changing acls enabled to: " + aclsOn)
184240
}
185241

186242
/**
@@ -224,22 +280,39 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
224280
* Check to see if Acls for the UI are enabled
225281
* @return true if UI authentication is enabled, otherwise false
226282
*/
227-
def uiAclsEnabled(): Boolean = uiAclsOn
283+
def aclsEnabled(): Boolean = aclsOn
228284

229285
/**
230286
* Checks the given user against the view acl list to see if they have
231-
* authorization to view the UI. If the UI acls must are disabled
232-
* via spark.ui.acls.enable, all users have view access.
287+
* authorization to view the UI. If the UI acls are disabled
288+
* via spark.acls.enable, all users have view access. If the user is null
289+
* it is assumed authentication is off and all users have access.
233290
*
234291
* @param user to see if is authorized
235292
* @return true is the user has permission, otherwise false
236293
*/
237294
def checkUIViewPermissions(user: String): Boolean = {
238-
logDebug("user=" + user + " uiAclsEnabled=" + uiAclsEnabled() + " viewAcls=" +
295+
logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " viewAcls=" +
239296
viewAcls.mkString(","))
240-
if (uiAclsEnabled() && (user != null) && (!viewAcls.contains(user))) false else true
297+
if (aclsEnabled() && (user != null) && (!viewAcls.contains(user))) false else true
241298
}
242299

300+
/**
301+
* Checks the given user against the modify acl list to see if they have
302+
* authorization to modify the application. If the UI acls are disabled
303+
* via spark.acls.enable, all users have modify access. If the user is null
304+
* it is assumed authentication isn't turned on and all users have access.
305+
*
306+
* @param user to see if is authorized
307+
* @return true is the user has permission, otherwise false
308+
*/
309+
def checkModifyPermissions(user: String): Boolean = {
310+
logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " modifyAcls=" +
311+
modifyAcls.mkString(","))
312+
if (aclsEnabled() && (user != null) && (!modifyAcls.contains(user))) false else true
313+
}
314+
315+
243316
/**
244317
* Check to see if authentication for the Spark communication protocols is enabled
245318
* @return true if authentication is enabled, otherwise false

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,14 @@ private[spark] object SparkConf {
323323
* the scheduler, while the rest of the spark configs can be inherited from the driver later.
324324
*/
325325
def isExecutorStartupConf(name: String): Boolean = {
326-
isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth")
326+
isAkkaConf(name) ||
327+
name.startsWith("spark.akka") ||
328+
name.startsWith("spark.auth") ||
329+
isSparkPortConf(name)
327330
}
331+
332+
/**
333+
* Return whether the given config is a Spark port config.
334+
*/
335+
def isSparkPortConf(name: String): Boolean = name.startsWith("spark.") && name.endsWith(".port")
328336
}

0 commit comments

Comments
 (0)