Commit cd5dc5c

Merge remote-tracking branch 'upstream/master' into Kmeans-8018

2 parents: 16f1b53 + 3eaed87

167 files changed, +3399 and -1157 lines


LICENSE

Lines changed: 1 addition & 0 deletions
@@ -950,3 +950,4 @@ The following components are provided under the MIT License. See project link fo
 (MIT License) scopt (com.github.scopt:scopt_2.10:3.2.0 - https://github.com/scopt/scopt)
 (The MIT License) Mockito (org.mockito:mockito-all:1.8.5 - http://www.mockito.org)
 (MIT License) jquery (https://jquery.org/license/)
+(MIT License) AnchorJS (https://github.com/bryanbraun/anchorjs)

R/log4j.properties

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@
 log4j.rootCategory=INFO, file
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=true
-log4j.appender.file.file=R-unit-tests.log
+log4j.appender.file.file=R/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 14 additions & 3 deletions
@@ -192,7 +192,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
   // key used to store the spark secret in the Hadoop UGI
   private val sparkSecretLookupKey = "sparkCookie"

-  private val authOn = sparkConf.getBoolean("spark.authenticate", false)
+  private val authOn = sparkConf.getBoolean(SecurityManager.SPARK_AUTH_CONF, false)
   // keep spark.ui.acls.enable for backwards compatibility with 1.0
   private var aclsOn =
     sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false))

@@ -365,10 +365,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
       cookie
     } else {
       // user must have set spark.authenticate.secret config
-      sparkConf.getOption("spark.authenticate.secret") match {
+      // For Master/Worker, auth secret is in conf; for Executors, it is in env variable
+      sys.env.get(SecurityManager.ENV_AUTH_SECRET)
+        .orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match {
         case Some(value) => value
         case None => throw new Exception("Error: a secret key must be specified via the " +
-          "spark.authenticate.secret config")
+          SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
       }
     }
     sCookie

@@ -449,3 +451,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
   override def getSaslUser(appId: String): String = getSaslUser()
   override def getSecretKey(appId: String): String = getSecretKey()
 }
+
+private[spark] object SecurityManager {
+
+  val SPARK_AUTH_CONF: String = "spark.authenticate"
+  val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret"
+  // This is used to set auth secret to an executor's env variable. It should have the same
+  // value as SPARK_AUTH_SECERET_CONF set in SparkConf
+  val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"
+}
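
The SecurityManager change above makes the secret lookup try the executor's environment first and only then fall back to SparkConf. A minimal, standalone Scala sketch of that fallback pattern follows; it uses a plain Map in place of SparkConf, so the object and method names here are illustrative only:

object SecretLookupSketch {
  // Illustrative copies of the constants the commit adds to the SecurityManager companion object.
  val EnvAuthSecret = "_SPARK_AUTH_SECRET"
  val ConfAuthSecret = "spark.authenticate.secret"

  // The environment variable wins over the configuration value; having neither is an error.
  def resolveSecret(env: Map[String, String], conf: Map[String, String]): String =
    env.get(EnvAuthSecret)
      .orElse(conf.get(ConfAuthSecret))
      .getOrElse(throw new IllegalArgumentException(
        s"Error: a secret key must be specified via the $ConfAuthSecret config"))

  def main(args: Array[String]): Unit = {
    val conf = Map(ConfAuthSecret -> "conf-secret")
    println(resolveSecret(Map(EnvAuthSecret -> "env-secret"), conf)) // env-secret: env takes precedence
    println(resolveSecret(Map.empty, conf))                          // conf-secret: falls back to the conf
  }
}

This matches the diff comment ("For Master/Worker, auth secret is in conf; for Executors, it is in env variable") and is why the SparkConf change further down stops forwarding spark.authenticate.secret at executor startup.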

core/src/main/scala/org/apache/spark/SerializableWritable.scala

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa
   private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     val ow = new ObjectWritable()
-    ow.setConf(new Configuration())
+    ow.setConf(new Configuration(false))
     ow.readFields(in)
     t = ow.get().asInstanceOf[T]
  }
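
The only change here is the Configuration constructor argument: new Configuration(false) builds a Hadoop Configuration without loading the default resources (core-default.xml, core-site.xml), so deserializing the wrapper no longer re-parses those files. A small sketch of the difference; the object name is illustrative and it assumes hadoop-common is on the classpath:

import org.apache.hadoop.conf.Configuration

object ConfigurationDefaultsSketch {
  def main(args: Array[String]): Unit = {
    val withDefaults = new Configuration()         // loads core-default.xml and core-site.xml
    val withoutDefaults = new Configuration(false) // skips the default resources entirely
    println(withDefaults.size())    // typically several hundred properties from the defaults
    println(withoutDefaults.size()) // 0 until resources or properties are added explicitly
  }
}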

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 1 addition & 1 deletion
@@ -557,7 +557,7 @@ private[spark] object SparkConf extends Logging {
   def isExecutorStartupConf(name: String): Boolean = {
     isAkkaConf(name) ||
     name.startsWith("spark.akka") ||
-    name.startsWith("spark.auth") ||
+    (name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
     name.startsWith("spark.ssl") ||
     isSparkPortConf(name)
   }
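
To see the effect of this predicate change in isolation, here is a trimmed-down sketch that omits the isAkkaConf and isSparkPortConf helpers of the real method: spark.auth* settings are still treated as executor-startup configs, except the shared secret itself.

object ExecutorStartupConfSketch {
  // Same string constant the commit adds to the SecurityManager companion object.
  val SparkAuthSecretConf = "spark.authenticate.secret"

  // Trimmed version of SparkConf.isExecutorStartupConf: akka/port helper checks omitted for brevity.
  def isExecutorStartupConf(name: String): Boolean =
    name.startsWith("spark.akka") ||
      (name.startsWith("spark.auth") && name != SparkAuthSecretConf) ||
      name.startsWith("spark.ssl")

  def main(args: Array[String]): Unit = {
    println(isExecutorStartupConf("spark.authenticate"))        // true: still forwarded at executor startup
    println(isExecutorStartupConf("spark.authenticate.secret")) // false: the secret is now excluded
    println(isExecutorStartupConf("spark.ssl.enabled"))         // true
  }
}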

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -974,7 +974,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
     assertNotStopped()
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
-    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
+    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
     val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
     new HadoopRDD(
       this,
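
Several files in this commit swap SerializableWritable for SerializableConfiguration (and SerializableJobConf). Those new classes are not shown in this view, so the sketch below is only an illustration of the underlying pattern: wrap Hadoop's non-serializable Configuration and hand-roll Java serialization through its Writable write/readFields methods. The class name SerializableConfSketch is hypothetical.

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration

// Sketch of the wrapper pattern: Configuration is not java.io.Serializable, so mark the
// field @transient and serialize it manually using Configuration's own Writable methods.
class SerializableConfSketch(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out) // ObjectOutputStream implements DataOutput
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false) // no default resources, as in SerializableWritable above
    value.readFields(in) // ObjectInputStream implements DataInput
  }
}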

core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 2 additions & 1 deletion
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path

 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.HadoopRDD
+import org.apache.spark.util.SerializableJobConf

 /**
  * Internal helper class that saves an RDD using a Hadoop OutputFormat.

@@ -42,7 +43,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
   with Serializable {

   private val now = new Date()
-  private val conf = new SerializableWritable(jobConf)
+  private val conf = new SerializableJobConf(jobConf)

   private var jobID = 0
   private var splitID = 0

core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala

Lines changed: 3 additions & 3 deletions
@@ -19,8 +19,8 @@ package org.apache.spark.api.python

 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.Utils
-import org.apache.spark.{Logging, SerializableWritable, SparkException}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.{Logging, SparkException}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io._
 import scala.util.{Failure, Success, Try}

@@ -61,7 +61,7 @@ private[python] object Converter extends Logging {
  * Other objects are passed through without conversion.
  */
 private[python] class WritableToJavaConverter(
-    conf: Broadcast[SerializableWritable[Configuration]]) extends Converter[Any, Any] {
+    conf: Broadcast[SerializableConfiguration]) extends Converter[Any, Any] {

   /**
    * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 6 additions & 6 deletions
@@ -36,7 +36,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.input.PortableDataStream
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}

 import scala.util.control.NonFatal

@@ -445,7 +445,7 @@ private[spark] object PythonRDD extends Logging {
     val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
     val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
+    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration()))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))

@@ -471,7 +471,7 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
         Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(mergedConf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))

@@ -497,7 +497,7 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClass, keyClass, valueClass, conf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(conf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))

@@ -540,7 +540,7 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
         Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(mergedConf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))

@@ -566,7 +566,7 @@ private[spark] object PythonRDD extends Logging {
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClass, keyClass, valueClass, conf)
-    val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+    val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(conf))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new WritableToJavaConverter(confBroadcasted))
     JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
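
The PythonRDD changes all follow the same shape: wrap the Hadoop configuration, broadcast the wrapper once (a Hadoop configuration can be about 10 KB, per the SparkContext comment above), and let each task unwrap it. Below is a hypothetical end-to-end usage sketch reusing the SerializableConfSketch class sketched earlier; it assumes spark-core on the classpath and a local master, and none of the names beyond the Spark API calls are from this commit:

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastConfSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("broadcast-conf-sketch"))
    // Broadcast the wrapped Hadoop Configuration once instead of shipping it with every task closure.
    val confBroadcast = sc.broadcast(new SerializableConfSketch(sc.hadoopConfiguration))
    val result = sc.parallelize(Seq("a", "b"), 2).map { name =>
      // Each task reads the deserialized Configuration out of the broadcast variable.
      val fsDefault = confBroadcast.value.value.get("fs.defaultFS", "file:///")
      s"$name -> $fsDefault"
    }
    result.collect().foreach(println)
    sc.stop()
  }
}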

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 15 additions & 7 deletions
@@ -35,7 +35,8 @@ import org.apache.ivy.core.resolve.ResolveOptions
 import org.apache.ivy.core.retrieve.RetrieveOptions
 import org.apache.ivy.core.settings.IvySettings
 import org.apache.ivy.plugins.matcher.GlobPatternMatcher
-import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
+import org.apache.ivy.plugins.repository.file.FileRepository
+import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.deploy.rest._
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

@@ -735,8 +736,14 @@ private[spark] object SparkSubmitUtils {
   }

   /** Path of the local Maven cache. */
-  private[spark] def m2Path: File = new File(System.getProperty("user.home"),
-    ".m2" + File.separator + "repository" + File.separator)
+  private[spark] def m2Path: File = {
+    if (Utils.isTesting) {
+      // test builds delete the maven cache, and this can cause flakiness
+      new File("dummy", ".m2" + File.separator + "repository")
+    } else {
+      new File(System.getProperty("user.home"), ".m2" + File.separator + "repository")
+    }
+  }

   /**
    * Extracts maven coordinates from a comma-delimited string

@@ -756,12 +763,13 @@ private[spark] object SparkSubmitUtils {
     localM2.setName("local-m2-cache")
     cr.add(localM2)

-    val localIvy = new IBiblioResolver
-    localIvy.setRoot(new File(ivySettings.getDefaultIvyUserDir,
-      "local" + File.separator).toURI.toString)
+    val localIvy = new FileSystemResolver
+    val localIvyRoot = new File(ivySettings.getDefaultIvyUserDir, "local")
+    localIvy.setLocal(true)
+    localIvy.setRepository(new FileRepository(localIvyRoot))
     val ivyPattern = Seq("[organisation]", "[module]", "[revision]", "[type]s",
       "[artifact](-[classifier]).[ext]").mkString(File.separator)
-    localIvy.setPattern(ivyPattern)
+    localIvy.addIvyPattern(localIvyRoot.getAbsolutePath + File.separator + ivyPattern)
     localIvy.setName("local-ivy-cache")
     cr.add(localIvy)
