
Commit e014f53

Merge branch 'master' into sqlconf

2 parents 93dad8e + 658814c

177 files changed: +2401 −1839 lines changed


R/log4j.properties

Lines changed: 1 addition & 1 deletion

@@ -19,7 +19,7 @@
 log4j.rootCategory=INFO, file
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=true
-log4j.appender.file.file=R-unit-tests.log
+log4j.appender.file.file=R/target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

build/mvn

Lines changed: 13 additions & 22 deletions

@@ -69,11 +69,14 @@ install_app() {
 
 # Install maven under the build/ folder
 install_mvn() {
+  local MVN_VERSION="3.3.3"
+
   install_app \
-    "http://archive.apache.org/dist/maven/maven-3/3.2.5/binaries" \
-    "apache-maven-3.2.5-bin.tar.gz" \
-    "apache-maven-3.2.5/bin/mvn"
-  MVN_BIN="${_DIR}/apache-maven-3.2.5/bin/mvn"
+    "http://archive.apache.org/dist/maven/maven-3/${MVN_VERSION}/binaries" \
+    "apache-maven-${MVN_VERSION}-bin.tar.gz" \
+    "apache-maven-${MVN_VERSION}/bin/mvn"
+
+  MVN_BIN="${_DIR}/apache-maven-${MVN_VERSION}/bin/mvn"
 }
 
 # Install zinc under the build/ folder
@@ -105,28 +108,16 @@ install_scala() {
   SCALA_LIBRARY="$(cd "$(dirname ${scala_bin})/../lib" && pwd)/scala-library.jar"
 }
 
-# Determines if a given application is already installed. If not, will attempt
-# to install
-## Arg1 - application name
-## Arg2 - Alternate path to local install under build/ dir
-check_and_install_app() {
-  # create the local environment variable in uppercase
-  local app_bin="`echo $1 | awk '{print toupper(\$0)}'`_BIN"
-  # some black magic to set the generated app variable (i.e. MVN_BIN) into the
-  # environment
-  eval "${app_bin}=`which $1 2>/dev/null`"
-
-  if [ -z "`which $1 2>/dev/null`" ]; then
-    install_$1
-  fi
-}
-
 # Setup healthy defaults for the Zinc port if none were provided from
 # the environment
 ZINC_PORT=${ZINC_PORT:-"3030"}
 
-# Check and install all applications necessary to build Spark
-check_and_install_app "mvn"
+# Install Maven if necessary
+MVN_BIN="$(command -v mvn)"
+
+if [ ! "$MVN_BIN" ]; then
+  install_mvn
+fi
 
 # Install the proper version of Scala and Zinc for the build
 install_zinc

core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js

Lines changed: 3 additions & 2 deletions

@@ -140,7 +140,8 @@ function renderDagViz(forJob) {
 
   // Find cached RDDs and mark them as such
   metadataContainer().selectAll(".cached-rdd").each(function(v) {
-    var nodeId = VizConstants.nodePrefix + d3.select(this).text();
+    var rddId = d3.select(this).text().trim();
+    var nodeId = VizConstants.nodePrefix + rddId;
     svg.selectAll("g." + nodeId).classed("cached", true);
   });
 
@@ -150,7 +151,7 @@ function renderDagViz(forJob) {
 /* Render the RDD DAG visualization on the stage page. */
 function renderDagVizForStage(svgContainer) {
   var metadata = metadataContainer().select(".stage-metadata");
-  var dot = metadata.select(".dot-file").text();
+  var dot = metadata.select(".dot-file").text().trim();
   var containerId = VizConstants.graphPrefix + metadata.attr("stage-id");
   var container = svgContainer.append("g").attr("id", containerId);
   renderDot(dot, container, false);

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 14 additions & 3 deletions

@@ -192,7 +192,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
   // key used to store the spark secret in the Hadoop UGI
   private val sparkSecretLookupKey = "sparkCookie"
 
-  private val authOn = sparkConf.getBoolean("spark.authenticate", false)
+  private val authOn = sparkConf.getBoolean(SecurityManager.SPARK_AUTH_CONF, false)
   // keep spark.ui.acls.enable for backwards compatibility with 1.0
   private var aclsOn =
     sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false))
@@ -365,10 +365,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
       cookie
     } else {
       // user must have set spark.authenticate.secret config
-      sparkConf.getOption("spark.authenticate.secret") match {
+      // For Master/Worker, auth secret is in conf; for Executors, it is in env variable
+      sys.env.get(SecurityManager.ENV_AUTH_SECRET)
+        .orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match {
         case Some(value) => value
         case None => throw new Exception("Error: a secret key must be specified via the " +
-          "spark.authenticate.secret config")
+          SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
       }
     }
     sCookie
@@ -449,3 +451,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
   override def getSaslUser(appId: String): String = getSaslUser()
   override def getSecretKey(appId: String): String = getSecretKey()
 }
+
+private[spark] object SecurityManager {
+
+  val SPARK_AUTH_CONF: String = "spark.authenticate"
+  val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret"
+  // This is used to set auth secret to an executor's env variable. It should have the same
+  // value as SPARK_AUTH_SECRET_CONF set in SparkConf
+  val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"
+}
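
With this change the secret is resolved from the executor's environment first and only falls back to the SparkConf entry that Master and Worker still use. A minimal self-contained Scala sketch of that lookup order (the helper name resolveSecret and the plain Map parameters are illustrative, not Spark API):

    // Sketch only: mirrors the env-first, conf-second lookup added above.
    object SecretLookupSketch {
      val SPARK_AUTH_SECRET_CONF = "spark.authenticate.secret"
      val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"

      // conf stands in for SparkConf, env for sys.env; both are just string maps here.
      def resolveSecret(conf: Map[String, String], env: Map[String, String]): String =
        env.get(ENV_AUTH_SECRET)
          .orElse(conf.get(SPARK_AUTH_SECRET_CONF))
          .getOrElse(throw new IllegalArgumentException(
            s"Error: a secret key must be specified via the $SPARK_AUTH_SECRET_CONF config"))
    }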

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 1 addition & 1 deletion

@@ -557,7 +557,7 @@ private[spark] object SparkConf extends Logging {
   def isExecutorStartupConf(name: String): Boolean = {
     isAkkaConf(name) ||
     name.startsWith("spark.akka") ||
-    name.startsWith("spark.auth") ||
+    (name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
     name.startsWith("spark.ssl") ||
     isSparkPortConf(name)
   }
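
The effect of this one-line change is that spark.authenticate.secret no longer counts as an executor startup conf, so it is not forwarded on the executor command line. A condensed sketch of the predicate, with the isAkkaConf and isSparkPortConf helpers omitted for brevity:

    // Condensed sketch of isExecutorStartupConf after the change; only the
    // string checks from the hunk above are shown.
    object StartupConfSketch {
      val SPARK_AUTH_SECRET_CONF = "spark.authenticate.secret"

      def isExecutorStartupConf(name: String): Boolean =
        name.startsWith("spark.akka") ||
        (name.startsWith("spark.auth") && name != SPARK_AUTH_SECRET_CONF) ||
        name.startsWith("spark.ssl")

      def main(args: Array[String]): Unit = {
        println(isExecutorStartupConf("spark.authenticate"))        // true
        println(isExecutorStartupConf("spark.authenticate.secret")) // false
      }
    }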

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 21 additions & 12 deletions

@@ -832,11 +832,7 @@ private[spark] object SparkSubmitUtils {
       ivyConfName: String,
       md: DefaultModuleDescriptor): Unit = {
     // Add scala exclusion rule
-    val scalaArtifacts = new ArtifactId(new ModuleId("*", "scala-library"), "*", "*", "*")
-    val scalaDependencyExcludeRule =
-      new DefaultExcludeRule(scalaArtifacts, ivySettings.getMatcher("glob"), null)
-    scalaDependencyExcludeRule.addConfiguration(ivyConfName)
-    md.addExcludeRule(scalaDependencyExcludeRule)
+    md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, ivyConfName))
 
     // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka and
     // other spark-streaming utility components. Underscore is there to differentiate between
@@ -845,13 +841,8 @@ private[spark] object SparkSubmitUtils {
       "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", "network-yarn_")
 
     components.foreach { comp =>
-      val sparkArtifacts =
-        new ArtifactId(new ModuleId("org.apache.spark", s"spark-$comp*"), "*", "*", "*")
-      val sparkDependencyExcludeRule =
-        new DefaultExcludeRule(sparkArtifacts, ivySettings.getMatcher("glob"), null)
-      sparkDependencyExcludeRule.addConfiguration(ivyConfName)
-
-      md.addExcludeRule(sparkDependencyExcludeRule)
+      md.addExcludeRule(createExclusion(s"org.apache.spark:spark-$comp*:*", ivySettings,
+        ivyConfName))
     }
   }
 
@@ -864,13 +855,15 @@ private[spark] object SparkSubmitUtils {
    * @param coordinates Comma-delimited string of maven coordinates
    * @param remoteRepos Comma-delimited string of remote repositories other than maven central
    * @param ivyPath The path to the local ivy repository
+   * @param exclusions Exclusions to apply when resolving transitive dependencies
    * @return The comma-delimited path to the jars of the given maven artifacts including their
    *         transitive dependencies
    */
   def resolveMavenCoordinates(
       coordinates: String,
       remoteRepos: Option[String],
       ivyPath: Option[String],
+      exclusions: Seq[String] = Nil,
       isTest: Boolean = false): String = {
     if (coordinates == null || coordinates.trim.isEmpty) {
       ""
@@ -928,6 +921,10 @@ private[spark] object SparkSubmitUtils {
         // add all supplied maven artifacts as dependencies
         addDependenciesToIvy(md, artifacts, ivyConfName)
 
+        exclusions.foreach { e =>
+          md.addExcludeRule(createExclusion(e + ":*", ivySettings, ivyConfName))
+        }
+
         // resolve dependencies
         val rr: ResolveReport = ivy.resolve(md, resolveOptions)
         if (rr.hasError) {
@@ -944,6 +941,18 @@ private[spark] object SparkSubmitUtils {
       }
     }
   }
+
+  private def createExclusion(
+      coords: String,
+      ivySettings: IvySettings,
+      ivyConfName: String): ExcludeRule = {
+    val c = extractMavenCoordinates(coords)(0)
+    val id = new ArtifactId(new ModuleId(c.groupId, c.artifactId), "*", "*", "*")
+    val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null)
+    rule.addConfiguration(ivyConfName)
+    rule
+  }
+
 }
 
 /**
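
The new createExclusion helper collapses the repeated ArtifactId/DefaultExcludeRule boilerplate into one place and backs the new exclusions parameter of resolveMavenCoordinates. A standalone paraphrase, assuming Ivy is on the classpath and substituting a simple split(":") for Spark's extractMavenCoordinates:

    import org.apache.ivy.core.module.descriptor.DefaultExcludeRule
    import org.apache.ivy.core.module.id.{ArtifactId, ModuleId}
    import org.apache.ivy.core.settings.IvySettings

    object ExclusionSketch {
      // coords is "groupId:artifactId:version", e.g. "org.apache.spark:spark-core_*:*";
      // glob matching lets "*" stand for any group, artifact, or version.
      def createExclusion(
          coords: String,
          ivySettings: IvySettings,
          ivyConfName: String): DefaultExcludeRule = {
        val parts = coords.split(":")
        val (groupId, artifactId) = (parts(0), parts(1))
        val id = new ArtifactId(new ModuleId(groupId, artifactId), "*", "*", "*")
        val rule = new DefaultExcludeRule(id, ivySettings.getMatcher("glob"), null)
        rule.addConfiguration(ivyConfName)
        rule
      }
    }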

core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala

Lines changed: 13 additions & 3 deletions

@@ -24,6 +24,7 @@ import scala.collection.JavaConversions._
 import scala.collection.Map
 
 import org.apache.spark.Logging
+import org.apache.spark.SecurityManager
 import org.apache.spark.deploy.Command
 import org.apache.spark.launcher.WorkerCommandBuilder
 import org.apache.spark.util.Utils
@@ -40,12 +41,14 @@ object CommandUtils extends Logging {
    */
   def buildProcessBuilder(
       command: Command,
+      securityMgr: SecurityManager,
       memory: Int,
       sparkHome: String,
       substituteArguments: String => String,
       classPaths: Seq[String] = Seq[String](),
       env: Map[String, String] = sys.env): ProcessBuilder = {
-    val localCommand = buildLocalCommand(command, substituteArguments, classPaths, env)
+    val localCommand = buildLocalCommand(
+      command, securityMgr, substituteArguments, classPaths, env)
     val commandSeq = buildCommandSeq(localCommand, memory, sparkHome)
     val builder = new ProcessBuilder(commandSeq: _*)
     val environment = builder.environment()
@@ -69,27 +72,34 @@ object CommandUtils extends Logging {
    */
   private def buildLocalCommand(
       command: Command,
+      securityMgr: SecurityManager,
       substituteArguments: String => String,
       classPath: Seq[String] = Seq[String](),
       env: Map[String, String]): Command = {
     val libraryPathName = Utils.libraryPathEnvName
     val libraryPathEntries = command.libraryPathEntries
     val cmdLibraryPath = command.environment.get(libraryPathName)
 
-    val newEnvironment = if (libraryPathEntries.nonEmpty && libraryPathName.nonEmpty) {
+    var newEnvironment = if (libraryPathEntries.nonEmpty && libraryPathName.nonEmpty) {
       val libraryPaths = libraryPathEntries ++ cmdLibraryPath ++ env.get(libraryPathName)
       command.environment + ((libraryPathName, libraryPaths.mkString(File.pathSeparator)))
     } else {
       command.environment
     }
 
+    // set auth secret to env variable if needed
+    if (securityMgr.isAuthenticationEnabled) {
+      newEnvironment += (SecurityManager.ENV_AUTH_SECRET -> securityMgr.getSecretKey)
+    }
+
     Command(
       command.mainClass,
       command.arguments.map(substituteArguments),
       newEnvironment,
       command.classPathEntries ++ classPath,
       Seq[String](), // library path already captured in environment variable
-      command.javaOpts)
+      // filter out auth secret from java options
+      command.javaOpts.filterNot(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF)))
   }
 
   /** Spawn a thread that will redirect a given stream to a file */
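
Net effect of the CommandUtils change: when authentication is enabled, the secret reaches the child process through its environment, and any -Dspark.authenticate.secret=... java option is stripped out. An illustrative sketch, with a plain case class standing in for Spark's Command type:

    // Sketch only: Cmd stands in for org.apache.spark.deploy.Command.
    case class Cmd(environment: Map[String, String], javaOpts: Seq[String])

    object AuthSecretSketch {
      val SPARK_AUTH_SECRET_CONF = "spark.authenticate.secret"
      val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"

      def withSecretInEnv(cmd: Cmd, authEnabled: Boolean, secret: => String): Cmd = {
        // Put the secret into the child environment when auth is on...
        val env =
          if (authEnabled) cmd.environment + (ENV_AUTH_SECRET -> secret) else cmd.environment
        // ...and keep it out of the java options, as the filterNot above does.
        val opts = cmd.javaOpts.filterNot(_.startsWith("-D" + SPARK_AUTH_SECRET_CONF))
        cmd.copy(environment = env, javaOpts = opts)
      }
    }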

core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala

Lines changed: 2 additions & 2 deletions

@@ -85,8 +85,8 @@ private[deploy] class DriverRunner(
       }
 
       // TODO: If we add ability to submit multiple jars they should also be added here
-      val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
-        sparkHome.getAbsolutePath, substituteVariables)
+      val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
+        driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
       launchDriver(builder, driverDir, driverDesc.supervise)
     }
     catch {

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 3 additions & 3 deletions

@@ -25,7 +25,7 @@ import akka.actor.ActorRef
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
 
-import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.{SecurityManager, SparkConf, Logging}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
 import org.apache.spark.util.Utils
@@ -125,8 +125,8 @@ private[deploy] class ExecutorRunner(
   private def fetchAndRunExecutor() {
     try {
       // Launch the process
-      val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
-        sparkHome.getAbsolutePath, substituteVariables)
+      val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
+        memory, sparkHome.getAbsolutePath, substituteVariables)
       val command = builder.command()
       logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))

core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala

Lines changed: 0 additions & 3 deletions

@@ -56,9 +56,6 @@ private[spark] object UnsafeShuffleManager extends Logging {
     } else if (dependency.aggregator.isDefined) {
       log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because an aggregator is defined")
       false
-    } else if (dependency.keyOrdering.isDefined) {
-      log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because a key ordering is defined")
-      false
     } else if (dependency.partitioner.numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS) {
       log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because it has more than " +
         s"$MAX_SHUFFLE_OUTPUT_PARTITIONS partitions")
