diff --git a/assembly/pom.xml b/assembly/pom.xml
index cc5a4875af..ca20ccadba 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -37,56 +37,31 @@
-
-
- hadoop1
-
- hadoop1
-
-
-
- hadoop2
-
- hadoop2
-
-
-
- hadoop2-yarn
-
- hadoop2-yarn
-
-
-
org.spark-project
spark-core
- ${classifier.name}
${project.version}
org.spark-project
spark-bagel
- ${classifier.name}
${project.version}
org.spark-project
spark-mllib
- ${classifier.name}
${project.version}
org.spark-project
spark-repl
- ${classifier.name}
${project.version}
org.spark-project
spark-streaming
- ${classifier.name}
${project.version}
-
\ No newline at end of file
+
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 60bbc49e6c..cbcf8d1239 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -32,11 +32,15 @@
http://spark-project.org/
+    <dependency>
+      <groupId>org.spark-project</groupId>
+      <artifactId>spark-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
org.eclipse.jetty
jetty-server
-
org.scalatest
scalatest_${scala.version}
@@ -58,103 +62,4 @@
-
-
-
- hadoop1
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop1
-
-
- org.apache.hadoop
- hadoop-core
- provided
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop1
-
-
-
-
-
-
- hadoop2
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop2
-
-
- org.apache.hadoop
- hadoop-core
- provided
-
-
- org.apache.hadoop
- hadoop-client
- provided
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop2
-
-
-
-
-
-
- hadoop2-yarn
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop2-yarn
-
-
- org.apache.hadoop
- hadoop-client
- provided
-
-
- org.apache.hadoop
- hadoop-yarn-api
- provided
-
-
- org.apache.hadoop
- hadoop-yarn-common
- provided
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop2-yarn
-
-
-
-
-
-
diff --git a/bin/compute-classpath.cmd b/bin/compute-classpath.cmd
index eb836b0ffd..9178b852e6 100644
--- a/bin/compute-classpath.cmd
+++ b/bin/compute-classpath.cmd
@@ -34,6 +34,7 @@ set EXAMPLES_DIR=%FWDIR%examples
set BAGEL_DIR=%FWDIR%bagel
set MLLIB_DIR=%FWDIR%mllib
set TOOLS_DIR=%FWDIR%tools
+set YARN_DIR=%FWDIR%yarn
set STREAMING_DIR=%FWDIR%streaming
set PYSPARK_DIR=%FWDIR%python
@@ -50,6 +51,7 @@ set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\*
set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes
set CLASSPATH=%CLASSPATH%;%MLLIB_DIR%\target\scala-%SCALA_VERSION%\classes
set CLASSPATH=%CLASSPATH%;%TOOLS_DIR%\target\scala-%SCALA_VERSION%\classes
+set CLASSPATH=%CLASSPATH%;%YARN_DIR%\target\scala-%SCALA_VERSION%\classes
rem Add hadoop conf dir - else FileSystem.*, etc fail
rem Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts
diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
index e4ce1ca848..7a21b3c4a1 100755
--- a/bin/compute-classpath.sh
+++ b/bin/compute-classpath.sh
@@ -37,6 +37,7 @@ EXAMPLES_DIR="$FWDIR/examples"
BAGEL_DIR="$FWDIR/bagel"
MLLIB_DIR="$FWDIR/mllib"
TOOLS_DIR="$FWDIR/tools"
+YARN_DIR="$FWDIR/yarn"
STREAMING_DIR="$FWDIR/streaming"
PYSPARK_DIR="$FWDIR/python"
@@ -62,16 +63,18 @@ function dev_classpath {
CLASSPATH="$CLASSPATH:$REPL_DIR/lib/*"
# Add the shaded JAR for Maven builds
if [ -e $REPL_BIN_DIR/target ]; then
- for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
+ for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded.jar'`; do
CLASSPATH="$CLASSPATH:$jar"
done
# The shaded JAR doesn't contain examples, so include those separately
- EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar`
- CLASSPATH+=":$EXAMPLES_JAR"
+ for jar in `find "$EXAMPLES_DIR/target" -name 'spark-examples*[0-9T].jar'`; do
+ CLASSPATH="$CLASSPATH:$jar"
+ done
fi
CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$MLLIB_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$TOOLS_DIR/target/scala-$SCALA_VERSION/classes"
+ CLASSPATH="$CLASSPATH:$YARN_DIR/target/scala-$SCALA_VERSION/classes"
for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
CLASSPATH="$CLASSPATH:$jar"
done
diff --git a/core/pom.xml b/core/pom.xml
index dfadd22d42..2870906092 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -32,6 +32,18 @@
http://spark-project.org/
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-ipc</artifactId>
+    </dependency>
org.eclipse.jetty
jetty-server
@@ -130,7 +142,6 @@
com.codahale.metrics
metrics-json
-
org.apache.derby
derby
@@ -208,183 +219,4 @@
-
-
-
- hadoop1
-
-
- org.apache.hadoop
- hadoop-core
- provided
-
-
-
-
-
- org.codehaus.mojo
- build-helper-maven-plugin
-
-
- add-source
- generate-sources
-
- add-source
-
-
-
- src/main/scala
- src/hadoop1/scala
-
-
-
-
- add-scala-test-sources
- generate-test-sources
-
- add-test-source
-
-
-
- src/test/scala
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop1
-
-
-
-
-
-
- hadoop2
-
-
- org.apache.hadoop
- hadoop-core
- provided
-
-
- org.apache.hadoop
- hadoop-client
- provided
-
-
-
-
-
- org.codehaus.mojo
- build-helper-maven-plugin
-
-
- add-source
- generate-sources
-
- add-source
-
-
-
- src/main/scala
- src/hadoop2/scala
-
-
-
-
- add-scala-test-sources
- generate-test-sources
-
- add-test-source
-
-
-
- src/test/scala
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop2
-
-
-
-
-
-
- hadoop2-yarn
-
-
- org.apache.hadoop
- hadoop-client
- provided
-
-
- org.apache.hadoop
- hadoop-yarn-api
- provided
-
-
- org.apache.hadoop
- hadoop-yarn-common
- provided
-
-
- org.apache.hadoop
- hadoop-yarn-client
- provided
-
-
-
-
-
- org.codehaus.mojo
- build-helper-maven-plugin
-
-
- add-source
- generate-sources
-
- add-source
-
-
-
- src/main/scala
- src/hadoop2-yarn/scala
-
-
-
-
- add-scala-test-sources
- generate-test-sources
-
- add-test-source
-
-
-
- src/test/scala
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop2-yarn
-
-
-
-
-
-
diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
deleted file mode 100644
index 25386b2796..0000000000
--- a/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred
-
-trait HadoopMapRedUtil {
- def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContext(conf, jobId)
-
- def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
-
- def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
- jobId, isMap, taskId, attemptId)
-}
diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
deleted file mode 100644
index b1002e0cac..0000000000
--- a/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapreduce
-
-import org.apache.hadoop.conf.Configuration
-
-trait HadoopMapReduceUtil {
- def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContext(conf, jobId)
-
- def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
-
- def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
- jobId, isMap, taskId, attemptId)
-}
diff --git a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
deleted file mode 100644
index 617954cb98..0000000000
--- a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-
-
-/**
- * Contains util methods to interact with Hadoop from spark.
- */
-object SparkHadoopUtil {
-
- def getUserNameFromEnvironment(): String = {
- // defaulting to -D ...
- System.getProperty("user.name")
- }
-
- def runAsUser(func: (Product) => Unit, args: Product) {
-
- // Add support, if exists - for now, simply run func !
- func(args)
- }
-
- // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
- def newConfiguration(): Configuration = new Configuration()
-
- // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
- def addCredentials(conf: JobConf) {}
-
- def isYarnMode(): Boolean = { false }
-
-}
diff --git a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
deleted file mode 100644
index 0f972b7a0b..0000000000
--- a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred
-
-import org.apache.hadoop.mapreduce.TaskType
-
-trait HadoopMapRedUtil {
- def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
-
- def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-
- def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) =
- new TaskAttemptID(jtIdentifier, jobId, if (isMap) TaskType.MAP else TaskType.REDUCE, taskId, attemptId)
-}
diff --git a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
deleted file mode 100644
index 1a7cdf4788..0000000000
--- a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapreduce
-
-import org.apache.hadoop.conf.Configuration
-import task.{TaskAttemptContextImpl, JobContextImpl}
-
-trait HadoopMapReduceUtil {
- def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
-
- def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-
- def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) =
- new TaskAttemptID(jtIdentifier, jobId, if (isMap) TaskType.MAP else TaskType.REDUCE, taskId, attemptId)
-}
diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
deleted file mode 100644
index 4b3d84670c..0000000000
--- a/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred
-
-trait HadoopMapRedUtil {
- def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
-
- def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-
- def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
- jobId, isMap, taskId, attemptId)
-}
diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
deleted file mode 100644
index aa3b1ed3a5..0000000000
--- a/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapreduce
-
-import org.apache.hadoop.conf.Configuration
-import task.{TaskAttemptContextImpl, JobContextImpl}
-
-trait HadoopMapReduceUtil {
- def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
-
- def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-
- def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
- jobId, isMap, taskId, attemptId)
-}
diff --git a/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
new file mode 100644
index 0000000000..f87460039b
--- /dev/null
+++ b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred
+
+trait SparkHadoopMapRedUtil {
+ def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
+ val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl", "org.apache.hadoop.mapred.JobContext");
+ val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[org.apache.hadoop.mapreduce.JobID])
+ ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
+ }
+
+ def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
+ val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl", "org.apache.hadoop.mapred.TaskAttemptContext")
+ val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
+ ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
+ }
+
+ def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
+ new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
+ }
+
+ private def firstAvailableClass(first: String, second: String): Class[_] = {
+ try {
+ Class.forName(first)
+ } catch {
+ case e: ClassNotFoundException =>
+ Class.forName(second)
+ }
+ }
+}
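The trait above collapses the three per-profile copies of HadoopMapRedUtil into one class that picks the concrete JobContext/TaskAttemptContext implementation at runtime, so a single spark-core artifact runs against either Hadoop line. A minimal, self-contained sketch of the same lookup idiom; it assumes some Hadoop client jar is on the classpath (otherwise the fallback lookup also throws ClassNotFoundException):

    object ReflectionShimSketch {
      // Probe for the Hadoop 2 / YARN implementation class first and fall back
      // to the Hadoop 1 class when it is not on the classpath.
      def firstAvailableClass(first: String, second: String): Class[_] =
        try {
          Class.forName(first)
        } catch {
          case _: ClassNotFoundException => Class.forName(second)
        }

      def main(args: Array[String]) {
        val klass = firstAvailableClass(
          "org.apache.hadoop.mapred.JobContextImpl",  // hadoop2, hadoop2-yarn
          "org.apache.hadoop.mapred.JobContext")      // hadoop1
        println("Resolved JobContext implementation: " + klass.getName)
      }
    }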
diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
new file mode 100644
index 0000000000..93180307fa
--- /dev/null
+++ b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce
+
+import org.apache.hadoop.conf.Configuration
+import java.lang.{Integer => JInteger, Boolean => JBoolean}
+
+trait SparkHadoopMapReduceUtil {
+ def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
+ val klass = firstAvailableClass(
+ "org.apache.hadoop.mapreduce.task.JobContextImpl", // hadoop2, hadoop2-yarn
+ "org.apache.hadoop.mapreduce.JobContext") // hadoop1
+ val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[JobID])
+ ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
+ }
+
+ def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {
+ val klass = firstAvailableClass(
+ "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl", // hadoop2, hadoop2-yarn
+ "org.apache.hadoop.mapreduce.TaskAttemptContext") // hadoop1
+ val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])
+ ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
+ }
+
+ def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
+ val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID");
+ try {
+ // first, attempt to use the old-style constructor that takes a boolean isMap (not available in YARN)
+ val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean],
+ classOf[Int], classOf[Int])
+ ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId), new
+ JInteger(attemptId)).asInstanceOf[TaskAttemptID]
+ } catch {
+ case exc: NoSuchMethodException => {
+ // failed, look for the new ctor that takes a TaskType (not available in 1.x)
+ val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType").asInstanceOf[Class[Enum[_]]]
+ val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(taskTypeClass, if(isMap) "MAP" else "REDUCE")
+ val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass,
+ classOf[Int], classOf[Int])
+ ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId), new
+ JInteger(attemptId)).asInstanceOf[TaskAttemptID]
+ }
+ }
+ }
+
+ private def firstAvailableClass(first: String, second: String): Class[_] = {
+ try {
+ Class.forName(first)
+ } catch {
+ case e: ClassNotFoundException =>
+ Class.forName(second)
+ }
+ }
+}
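Downstream code keeps compiling against the abstract new-API Hadoop types and simply mixes the trait in, as PairRDDFunctions and NewHadoopRDD do later in this diff. A hedged usage sketch; the class name and the job/task identifiers are made up for illustration:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.{SparkHadoopMapReduceUtil, TaskAttemptContext}

    // Illustrative only: any class needing new-API contexts mixes the trait in.
    class ExampleAttemptSetup extends SparkHadoopMapReduceUtil {
      def firstAttemptContext(conf: Configuration): TaskAttemptContext = {
        // Compiles once; the concrete TaskAttemptContext class (hadoop1) or
        // TaskAttemptContextImpl (hadoop2/yarn) is resolved reflectively.
        val attemptId = newTaskAttemptID("201308011200", 0, true, 0, 0)
        newTaskAttemptContext(conf, attemptId)
      }
    }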
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 6b0cc2fbf1..6701f24ff9 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -32,12 +32,13 @@ import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.mapred.FileOutputCommitter
import org.apache.hadoop.mapred.FileOutputFormat
-import org.apache.hadoop.mapred.HadoopWriter
+import org.apache.hadoop.mapred.SparkHadoopWriter
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat,
+ RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, SparkHadoopMapReduceUtil}
import org.apache.hadoop.security.UserGroupInformation
import spark.partial.BoundedDouble
@@ -53,7 +54,7 @@ import spark.Partitioner._
class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
self: RDD[(K, V)])
extends Logging
- with HadoopMapReduceUtil
+ with SparkHadoopMapReduceUtil
with Serializable {
/**
@@ -652,7 +653,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
}
conf.setOutputCommitter(classOf[FileOutputCommitter])
- FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf))
+ FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
saveAsHadoopDataset(conf)
}
@@ -678,7 +679,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
- val writer = new HadoopWriter(conf)
+ val writer = new SparkHadoopWriter(conf)
writer.preSetup()
def writeToFile(context: TaskContext, iter: Iterator[(K,V)]) {
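For user code nothing changes here: saveAsHadoopFile and saveAsHadoopDataset keep their signatures and simply delegate to the renamed SparkHadoopWriter (the rename itself appears later in this diff). A hedged driver-program sketch, local mode with a hypothetical output path, assuming a Spark build from this branch on the classpath:

    import org.apache.hadoop.mapred.TextOutputFormat
    import spark.SparkContext
    import spark.SparkContext._

    // Hypothetical driver: the writer rename is not visible at this level.
    object SaveAsHadoopFileSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "SaveAsHadoopFileSketch")
        val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
        pairs.saveAsHadoopFile[TextOutputFormat[String, Int]]("/tmp/save-sketch-output")
        sc.stop()
      }
    }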
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 80c65dfebd..f020b2554b 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -58,7 +58,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.mesos.MesosNativeLibrary
-import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import spark.deploy.LocalSparkCluster
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
@@ -241,7 +241,8 @@ class SparkContext(
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
- val conf = SparkHadoopUtil.newConfiguration()
+ val env = SparkEnv.get
+ val conf = env.hadoop.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
@@ -629,10 +630,11 @@ class SparkContext(
logWarning("null specified as parameter to addJar",
new SparkException("null specified as parameter to addJar"))
} else {
+ val env = SparkEnv.get
val uri = new URI(path)
val key = uri.getScheme match {
case null | "file" =>
- if (SparkHadoopUtil.isYarnMode()) {
+ if (env.hadoop.isYarnMode()) {
logWarning("local jar specified as parameter to addJar under Yarn mode")
return
}
@@ -815,8 +817,9 @@ class SparkContext(
* prevent accidental overriding of checkpoint files in the existing directory.
*/
def setCheckpointDir(dir: String, useExisting: Boolean = false) {
+ val env = SparkEnv.get
val path = new Path(dir)
- val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration())
+ val fs = path.getFileSystem(env.hadoop.newConfiguration())
if (!useExisting) {
if (fs.exists(path)) {
throw new Exception("Checkpoint directory '" + path + "' already exists.")
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 0adbf1d96e..5f71df33b6 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -25,6 +25,7 @@ import akka.remote.RemoteActorRefProvider
import spark.broadcast.BroadcastManager
import spark.metrics.MetricsSystem
+import spark.deploy.SparkHadoopUtil
import spark.storage.BlockManager
import spark.storage.BlockManagerMaster
import spark.network.ConnectionManager
@@ -62,6 +63,19 @@ class SparkEnv (
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
+ val hadoop = {
+ val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+ if(yarnMode) {
+ try {
+ Class.forName("spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
+ } catch {
+ case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+ }
+ } else {
+ new SparkHadoopUtil
+ }
+ }
+
def stop() {
pythonWorkers.foreach { case(key, worker) => worker.stop() }
httpFileServer.stop()
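Call sites that previously went through the spark.deploy.SparkHadoopUtil singleton now fetch the helper from SparkEnv, which loads spark.deploy.yarn.YarnSparkHadoopUtil reflectively only when SPARK_YARN_MODE is set. A hedged sketch of the new pattern, mirroring the call sites updated below (SparkContext, Utils, CheckpointRDD, HadoopRDD, InputFormatInfo); it assumes it runs on a driver or executor thread where SparkEnv.get is already populated:

    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapred.JobConf
    import spark.SparkEnv

    object HadoopEnvSketch {
      def fileSystemFor(dir: String): FileSystem = {
        val env = SparkEnv.get
        val conf = env.hadoop.newConfiguration()
        new Path(dir).getFileSystem(conf)
      }

      def prepareJobConf(jobConf: JobConf) {
        // No-op in the default SparkHadoopUtil; the reflectively loaded YARN
        // subclass can attach credentials needed on a secure cluster.
        SparkEnv.get.hadoop.addCredentials(jobConf)
      }
    }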
diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/SparkHadoopWriter.scala
similarity index 96%
rename from core/src/main/scala/spark/HadoopWriter.scala
rename to core/src/main/scala/spark/SparkHadoopWriter.scala
index b1fe0075a3..6b330ef572 100644
--- a/core/src/main/scala/spark/HadoopWriter.scala
+++ b/core/src/main/scala/spark/SparkHadoopWriter.scala
@@ -36,7 +36,7 @@ import spark.SerializableWritable
* Saves the RDD using a JobConf, which should contain an output key class, an output value class,
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
-class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRedUtil with Serializable {
+class SparkHadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoopMapRedUtil with Serializable {
private val now = new Date()
private val conf = new SerializableWritable(jobConf)
@@ -165,7 +165,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
splitID = splitid
attemptID = attemptid
- jID = new SerializableWritable[JobID](HadoopWriter.createJobID(now, jobid))
+ jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid))
taID = new SerializableWritable[TaskAttemptID](
new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
}
@@ -179,7 +179,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
}
}
-object HadoopWriter {
+object SparkHadoopWriter {
def createJobID(time: Date, id: Int): JobID = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 673f9a810d..7ea9b0c28a 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -266,8 +266,9 @@ private object Utils extends Logging {
}
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
+ val env = SparkEnv.get
val uri = new URI(url)
- val conf = SparkHadoopUtil.newConfiguration()
+ val conf = env.hadoop.newConfiguration()
val fs = FileSystem.get(uri, conf)
val in = fs.open(new Path(uri))
val out = new FileOutputStream(tempFile)
@@ -433,10 +434,6 @@ private object Utils extends Logging {
try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
}
- def getUserNameFromEnvironment(): String = {
- SparkHadoopUtil.getUserNameFromEnvironment
- }
-
// Typically, this will be of order of number of nodes in cluster
// If not, we should change it to LRUCache or something.
private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()
diff --git a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala
similarity index 82%
rename from core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
rename to core/src/main/scala/spark/deploy/SparkHadoopUtil.scala
index 617954cb98..882161e669 100644
--- a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala
@@ -23,18 +23,7 @@ import org.apache.hadoop.mapred.JobConf
/**
* Contains util methods to interact with Hadoop from spark.
*/
-object SparkHadoopUtil {
-
- def getUserNameFromEnvironment(): String = {
- // defaulting to -D ...
- System.getProperty("user.name")
- }
-
- def runAsUser(func: (Product) => Unit, args: Product) {
-
- // Add support, if exists - for now, simply run func !
- func(args)
- }
+class SparkHadoopUtil {
// Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
def newConfiguration(): Configuration = new Configuration()
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index e47fe50021..b5fb6dbe29 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -22,9 +22,8 @@ import java.nio.ByteBuffer
import akka.actor.{ActorRef, Actor, Props, Terminated}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
-import spark.{Logging, Utils}
+import spark.{Logging, Utils, SparkEnv}
import spark.TaskState.TaskState
-import spark.deploy.SparkHadoopUtil
import spark.scheduler.cluster.StandaloneClusterMessages._
import spark.util.AkkaUtils
@@ -82,19 +81,6 @@ private[spark] class StandaloneExecutorBackend(
private[spark] object StandaloneExecutorBackend {
def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
- SparkHadoopUtil.runAsUser(run0, Tuple4[Any, Any, Any, Any] (driverUrl, executorId, hostname, cores))
- }
-
- // This will be run 'as' the user
- def run0(args: Product) {
- assert(4 == args.productArity)
- runImpl(args.productElement(0).asInstanceOf[String],
- args.productElement(1).asInstanceOf[String],
- args.productElement(2).asInstanceOf[String],
- args.productElement(3).asInstanceOf[Int])
- }
-
- private def runImpl(driverUrl: String, executorId: String, hostname: String, cores: Int) {
// Debug code
Utils.checkHost(hostname)
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index 6794e0e201..1ad5fe6539 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -25,7 +25,6 @@ import org.apache.hadoop.util.ReflectionUtils
import org.apache.hadoop.fs.Path
import java.io.{File, IOException, EOFException}
import java.text.NumberFormat
-import spark.deploy.SparkHadoopUtil
private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
@@ -82,8 +81,9 @@ private[spark] object CheckpointRDD extends Logging {
}
def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
+ val env = SparkEnv.get
val outputDir = new Path(path)
- val fs = outputDir.getFileSystem(SparkHadoopUtil.newConfiguration())
+ val fs = outputDir.getFileSystem(env.hadoop.newConfiguration())
val finalOutputName = splitIdToFile(ctx.splitId)
val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -101,7 +101,7 @@ private[spark] object CheckpointRDD extends Logging {
// This is mainly for testing purpose
fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
}
- val serializer = SparkEnv.get.serializer.newInstance()
+ val serializer = env.serializer.newInstance()
val serializeStream = serializer.serializeStream(fileOutputStream)
serializeStream.writeAll(iterator)
serializeStream.close()
@@ -121,10 +121,11 @@ private[spark] object CheckpointRDD extends Logging {
}
def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
- val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration())
+ val env = SparkEnv.get
+ val fs = path.getFileSystem(env.hadoop.newConfiguration())
val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
val fileInputStream = fs.open(path, bufferSize)
- val serializer = SparkEnv.get.serializer.newInstance()
+ val serializer = env.serializer.newInstance()
val deserializeStream = serializer.deserializeStream(fileInputStream)
// Register an on-task-completion callback to close the input stream.
@@ -140,10 +141,11 @@ private[spark] object CheckpointRDD extends Logging {
import spark._
val Array(cluster, hdfsPath) = args
+ val env = SparkEnv.get
val sc = new SparkContext(cluster, "CheckpointRDD Test")
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
val path = new Path(hdfsPath, "temp")
- val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration())
+ val fs = path.getFileSystem(env.hadoop.newConfiguration())
sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index fd00d59c77..6c41b97780 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -32,8 +32,7 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
-import spark.deploy.SparkHadoopUtil
-import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
+import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, SparkEnv, TaskContext}
import spark.util.NextIterator
import org.apache.hadoop.conf.Configurable
@@ -68,7 +67,8 @@ class HadoopRDD[K, V](
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
override def getPartitions: Array[Partition] = {
- SparkHadoopUtil.addCredentials(conf);
+ val env = SparkEnv.get
+ env.hadoop.addCredentials(conf)
val inputFormat = createInputFormat(conf)
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(conf)
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index 0b71608169..184685528e 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -43,7 +43,7 @@ class NewHadoopRDD[K, V](
valueClass: Class[V],
@transient conf: Configuration)
extends RDD[(K, V)](sc, Nil)
- with HadoopMapReduceUtil
+ with SparkHadoopMapReduceUtil
with Logging {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
diff --git a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
index 65f8c3200e..8f1b9b29b5 100644
--- a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
@@ -17,7 +17,7 @@
package spark.scheduler
-import spark.Logging
+import spark.{Logging, SparkEnv}
import scala.collection.immutable.Set
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.hadoop.security.UserGroupInformation
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.conf.Configuration
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConversions._
-import spark.deploy.SparkHadoopUtil
/**
@@ -88,8 +87,9 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
+ val env = SparkEnv.get
val conf = new JobConf(configuration)
- SparkHadoopUtil.addCredentials(conf);
+ env.hadoop.addCredentials(conf)
FileInputFormat.setInputPaths(conf, path)
val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
@@ -108,8 +108,9 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
+ val env = SparkEnv.get
val jobConf = new JobConf(configuration)
- SparkHadoopUtil.addCredentials(jobConf);
+ env.hadoop.addCredentials(jobConf)
FileInputFormat.setInputPaths(jobConf, path)
val instance: org.apache.hadoop.mapred.InputFormat[_, _] =
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 66fb8d73e8..9c2cedfd88 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -55,7 +55,7 @@ This would be used to connect to the cluster, write to the dfs and submit jobs t
The command to launch the YARN Client is as follows:
- SPARK_JAR= ./run spark.deploy.yarn.Client \
+ SPARK_JAR= ./run spark.deploy.yarn.Client \
--jar \
--class \
--args \
@@ -68,7 +68,7 @@ The command to launch the YARN Client is as follows:
For example:
- SPARK_JAR=./core/target/spark-core-assembly-{{site.SPARK_VERSION}}.jar ./run spark.deploy.yarn.Client \
+ SPARK_JAR=./yarn/target/spark-yarn-assembly-{{site.SPARK_VERSION}}.jar ./run spark.deploy.yarn.Client \
--jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar \
--class spark.examples.SparkPi \
--args yarn-standalone \
diff --git a/examples/pom.xml b/examples/pom.xml
index a051da8a77..0db52b8691 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -32,6 +32,36 @@
http://spark-project.org/
+    <dependency>
+      <groupId>org.spark-project</groupId>
+      <artifactId>spark-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.spark-project</groupId>
+      <artifactId>spark-streaming</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.spark-project</groupId>
+      <artifactId>spark-mllib</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase</artifactId>
+      <version>0.94.6</version>
+      <exclusions>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
org.scala-lang
scala-library
@@ -55,41 +85,41 @@
scalacheck_${scala.version}
test
-
- org.apache.cassandra
- cassandra-all
- 1.2.5
-
-
- com.google.guava
- guava
-
-
- com.googlecode.concurrentlinkedhashmap
- concurrentlinkedhashmap-lru
-
-
- com.ning
- compress-lzf
-
-
- io.netty
- netty
-
-
- jline
- jline
-
-
- log4j
- log4j
-
-
- org.apache.cassandra.deps
- avro
-
-
-
+
+ org.apache.cassandra
+ cassandra-all
+ 1.2.5
+
+
+ com.google.guava
+ guava
+
+
+ com.googlecode.concurrentlinkedhashmap
+ concurrentlinkedhashmap-lru
+
+
+ com.ning
+ compress-lzf
+
+
+ io.netty
+ netty
+
+
+ jline
+ jline
+
+
+ log4j
+ log4j
+
+
+ org.apache.cassandra.deps
+ avro
+
+
+
target/scala-${scala.version}/classes
@@ -101,154 +131,4 @@
-
-
-
- hadoop1
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop1
-
-
- org.spark-project
- spark-streaming
- ${project.version}
- hadoop1
-
-
- org.spark-project
- spark-mllib
- ${project.version}
- hadoop1
-
-
- org.apache.hadoop
- hadoop-core
- provided
-
-
- org.apache.hbase
- hbase
- 0.94.6
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop1
-
-
-
-
-
-
- hadoop2
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop2
-
-
- org.spark-project
- spark-streaming
- ${project.version}
- hadoop2
-
-
- org.spark-project
- spark-mllib
- ${project.version}
- hadoop2
-
-
- org.apache.hadoop
- hadoop-core
- provided
-
-
- org.apache.hadoop
- hadoop-client
- provided
-
-
- org.apache.hbase
- hbase
- 0.94.6
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop2
-
-
-
-
-
-
- hadoop2-yarn
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop2-yarn
-
-
- org.spark-project
- spark-streaming
- ${project.version}
- hadoop2-yarn
-
-
- org.spark-project
- spark-mllib
- ${project.version}
- hadoop2-yarn
-
-
- org.apache.hadoop
- hadoop-client
- provided
-
-
- org.apache.hadoop
- hadoop-yarn-api
- provided
-
-
- org.apache.hadoop
- hadoop-yarn-common
- provided
-
-
- org.apache.hbase
- hbase
- 0.94.6
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop2-yarn
-
-
-
-
-
-
diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
index ef6e09a8e8..43c9115664 100644
--- a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
@@ -21,7 +21,6 @@ import java.util.Random
import scala.math.exp
import spark.util.Vector
import spark._
-import spark.deploy.SparkHadoopUtil
import spark.scheduler.InputFormatInfo
/**
@@ -52,7 +51,7 @@ object SparkHdfsLR {
System.exit(1)
}
val inputPath = args(1)
- val conf = SparkHadoopUtil.newConfiguration()
+ val conf = SparkEnv.get.hadoop.newConfiguration()
val sc = new SparkContext(args(0), "SparkHdfsLR",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(),
InputFormatInfo.computePreferredLocations(
diff --git a/make-distribution.sh b/make-distribution.sh
index 0a8941c1f8..55dc22b992 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -24,9 +24,10 @@
# so it is completely self contained.
# It does not contain source or *.class files.
#
-# Arguments
-# (none): Creates dist/ directory
-# tgz: Additionally creates spark-$VERSION-bin.tar.gz
+# Optional Arguments
+# --tgz: Additionally creates spark-$VERSION-bin.tar.gz
+# --hadoop VERSION: Builds against specified version of Hadoop.
+# --with-yarn: Enables support for Hadoop YARN.
#
# Recommended deploy/testing procedure (standalone mode):
# 1) Rsync / deploy the dist/ dir to one host
@@ -44,20 +45,50 @@ DISTDIR="$FWDIR/dist"
export TERM=dumb # Prevents color codes in SBT output
VERSION=$($FWDIR/sbt/sbt "show version" | tail -1 | cut -f 2 | sed 's/^\([a-zA-Z0-9.-]*\).*/\1/')
-if [ "$1" == "tgz" ]; then
- echo "Making spark-$VERSION-bin.tar.gz"
+# Initialize defaults
+SPARK_HADOOP_VERSION=1.2.1
+SPARK_WITH_YARN=false
+MAKE_TGZ=false
+
+# Parse arguments
+while (( "$#" )); do
+ case $1 in
+ --hadoop)
+ SPARK_HADOOP_VERSION="$2"
+ shift
+ ;;
+ --with-yarn)
+ SPARK_WITH_YARN=true
+ ;;
+ --tgz)
+ MAKE_TGZ=true
+ ;;
+ esac
+ shift
+done
+
+if [ "$MAKE_TGZ" == "true" ]; then
+ echo "Making spark-$VERSION-hadoop_$SPARK_HADOOP_VERSION-bin.tar.gz"
else
echo "Making distribution for Spark $VERSION in $DISTDIR..."
fi
+echo "Hadoop version set to $SPARK_HADOOP_VERSION"
+if [ "$SPARK_WITH_YARN" == "true" ]; then
+ echo "YARN enabled"
+else
+ echo "YARN disabled"
+fi
# Build fat JAR
-$FWDIR/sbt/sbt "repl/assembly"
+export SPARK_HADOOP_VERSION
+export SPARK_WITH_YARN
+"$FWDIR/sbt/sbt" "repl/assembly"
# Make directories
rm -rf "$DISTDIR"
mkdir -p "$DISTDIR/jars"
-echo "$VERSION" >$DISTDIR/RELEASE
+echo "$VERSION" > "$DISTDIR/RELEASE"
# Copy jars
cp $FWDIR/repl/target/*.jar "$DISTDIR/jars/"
@@ -69,9 +100,9 @@ cp "$FWDIR/run" "$FWDIR/spark-shell" "$DISTDIR"
cp "$FWDIR/spark-executor" "$DISTDIR"
-if [ "$1" == "tgz" ]; then
+if [ "$MAKE_TGZ" == "true" ]; then
TARDIR="$FWDIR/spark-$VERSION"
- cp -r $DISTDIR $TARDIR
- tar -zcf spark-$VERSION-bin.tar.gz -C $FWDIR spark-$VERSION
- rm -rf $TARDIR
+ cp -r "$DISTDIR" "$TARDIR"
+ tar -zcf "spark-$VERSION-hadoop_$SPARK_HADOOP_VERSION-bin.tar.gz" -C "$FWDIR" "spark-$VERSION"
+ rm -rf "$TARDIR"
fi
diff --git a/mllib/pom.xml b/mllib/pom.xml
index a07480fbe2..ab31d5734e 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -32,6 +32,11 @@
http://spark-project.org/
+    <dependency>
+      <groupId>org.spark-project</groupId>
+      <artifactId>spark-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
org.eclipse.jetty
jetty-server
@@ -41,7 +46,6 @@
jblas
1.2.3
-
org.scalatest
scalatest_${scala.version}
@@ -68,103 +72,4 @@
-
-
-
- hadoop1
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop1
-
-
- org.apache.hadoop
- hadoop-core
- provided
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop1
-
-
-
-
-
-
- hadoop2
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop2
-
-
- org.apache.hadoop
- hadoop-core
- provided
-
-
- org.apache.hadoop
- hadoop-client
- provided
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop2
-
-
-
-
-
-
- hadoop2-yarn
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop2-yarn
-
-
- org.apache.hadoop
- hadoop-client
- provided
-
-
- org.apache.hadoop
- hadoop-yarn-api
- provided
-
-
- org.apache.hadoop
- hadoop-yarn-common
- provided
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop2-yarn
-
-
-
-
-
-
diff --git a/pom.xml b/pom.xml
index 7e6d38df9f..e7445319dd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,8 +73,9 @@
0.12.1
2.0.3
1.7.2
- 4.1.2
1.2.17
+ 1.2.1
+
64m
512m
@@ -325,6 +326,54 @@
0.8
test
+
+ org.apache.hadoop
+ hadoop-client
+ ${hadoop.version}
+
+
+ asm
+ asm
+
+
+ org.jboss.netty
+ netty
+
+
+ org.codehaus.jackson
+ jackson-core-asl
+
+
+ org.codehaus.jackson
+ jackson-mapper-asl
+
+
+ org.codehaus.jackson
+ jackson-jaxrs
+
+
+ org.codehaus.jackson
+ jackson-xc
+
+
+
+
+
+ org.apache.avro
+ avro
+ 1.7.4
+
+
+ org.apache.avro
+ avro-ipc
+ 1.7.4
+
+
+ org.jboss.netty
+ netty
+
+
+
@@ -530,60 +579,6 @@
-
- hadoop1
-
- 1
-
-
-
-
- org.apache.hadoop
- hadoop-core
- 1.0.4
-
-
-
-
-
-
- hadoop2
-
- 2
-
-
-
-
- org.apache.hadoop
- hadoop-core
- 2.0.0-mr1-cdh${cdh.version}
-
-
- org.apache.hadoop
- hadoop-client
- 2.0.0-mr1-cdh${cdh.version}
-
-
-
- org.apache.avro
- avro
- 1.7.4
-
-
- org.apache.avro
- avro-ipc
- 1.7.4
-
-
- org.jboss.netty
- netty
-
-
-
-
-
-
-
hadoop2-yarn
@@ -593,6 +588,10 @@
2.0.5-alpha
+
+ yarn
+
+
maven-root
@@ -614,32 +613,125 @@
org.apache.hadoop
hadoop-client
${yarn.version}
+
+
+ asm
+ asm
+
+
+ org.jboss.netty
+ netty
+
+
+ org.codehaus.jackson
+ jackson-core-asl
+
+
+ org.codehaus.jackson
+ jackson-mapper-asl
+
+
+ org.codehaus.jackson
+ jackson-jaxrs
+
+
+ org.codehaus.jackson
+ jackson-xc
+
+
org.apache.hadoop
hadoop-yarn-api
${yarn.version}
+
+
+ asm
+ asm
+
+
+ org.jboss.netty
+ netty
+
+
+ org.codehaus.jackson
+ jackson-core-asl
+
+
+ org.codehaus.jackson
+ jackson-mapper-asl
+
+
+ org.codehaus.jackson
+ jackson-jaxrs
+
+
+ org.codehaus.jackson
+ jackson-xc
+
+
org.apache.hadoop
hadoop-yarn-common
${yarn.version}
+
+
+ asm
+ asm
+
+
+ org.jboss.netty
+ netty
+
+
+ org.codehaus.jackson
+ jackson-core-asl
+
+
+ org.codehaus.jackson
+ jackson-mapper-asl
+
+
+ org.codehaus.jackson
+ jackson-jaxrs
+
+
+ org.codehaus.jackson
+ jackson-xc
+
+
org.apache.hadoop
hadoop-yarn-client
${yarn.version}
-
-
-
- org.apache.avro
- avro
- 1.7.4
-
-
- org.apache.avro
- avro-ipc
- 1.7.4
+
+
+ asm
+ asm
+
+
+ org.jboss.netty
+ netty
+
+
+ org.codehaus.jackson
+ jackson-core-asl
+
+
+ org.codehaus.jackson
+ jackson-mapper-asl
+
+
+ org.codehaus.jackson
+ jackson-jaxrs
+
+
+ org.codehaus.jackson
+ jackson-xc
+
+
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f6519c8287..282b0cbed5 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -26,28 +26,19 @@ import AssemblyKeys._
object SparkBuild extends Build {
// Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
// "1.0.4" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop.
- val HADOOP_VERSION = "1.0.4"
- val HADOOP_MAJOR_VERSION = "1"
- val HADOOP_YARN = false
-
- // For Hadoop 2 versions such as "2.0.0-mr1-cdh4.1.1", set the HADOOP_MAJOR_VERSION to "2"
- //val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1"
- //val HADOOP_MAJOR_VERSION = "2"
- //val HADOOP_YARN = false
-
- // For Hadoop 2 YARN support
- //val HADOOP_VERSION = "2.0.2-alpha"
- //val HADOOP_MAJOR_VERSION = "2"
- //val HADOOP_YARN = true
+ // Note that these variables can be set through the environment variables
+ // SPARK_HADOOP_VERSION and SPARK_WITH_YARN.
+ val DEFAULT_HADOOP_VERSION = "1.2.1"
+ val DEFAULT_WITH_YARN = false
// HBase version; set as appropriate.
val HBASE_VERSION = "0.94.6"
- lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel, streaming, mllib, tools)
+ lazy val root = Project("root", file("."), settings = rootSettings) aggregate(allProjects:_*)
lazy val core = Project("core", file("core"), settings = coreSettings)
- lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core) dependsOn(bagel) dependsOn(mllib)
+ lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn(core) dependsOn(bagel) dependsOn(mllib) dependsOn(maybeYarn:_*)
lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) dependsOn (streaming) dependsOn(mllib)
@@ -59,10 +50,24 @@ object SparkBuild extends Build {
lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn (core)
+ lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn (core)
+
// A configuration to set an alternative publishLocalConfiguration
lazy val MavenCompile = config("m2r") extend(Compile)
lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy")
+ // Allows build configuration to be set through environment variables
+ lazy val hadoopVersion = scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", DEFAULT_HADOOP_VERSION)
+ lazy val isYarnMode = scala.util.Properties.envOrNone("SPARK_WITH_YARN") match {
+ case None => DEFAULT_WITH_YARN
+ case Some(v) => v.toBoolean
+ }
+
+ // Conditionally include the yarn sub-project
+ lazy val maybeYarn = if(isYarnMode) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
+ lazy val maybeYarnRef = if(isYarnMode) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
+ lazy val allProjects = Seq[ProjectReference](core, repl, examples, bagel, streaming, mllib, tools) ++ maybeYarnRef
+
def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.spark-project",
version := "0.8.0-SNAPSHOT",
@@ -184,37 +189,15 @@ object SparkBuild extends Build {
"org.apache.mesos" % "mesos" % "0.12.1",
"io.netty" % "netty-all" % "4.0.0.Beta2",
"org.apache.derby" % "derby" % "10.4.2.0" % "test",
+ "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
+ "org.apache.avro" % "avro" % "1.7.4",
+ "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty),
"com.codahale.metrics" % "metrics-core" % "3.0.0",
"com.codahale.metrics" % "metrics-jvm" % "3.0.0",
"com.codahale.metrics" % "metrics-json" % "3.0.0",
"com.twitter" % "chill_2.9.3" % "0.3.1",
"com.twitter" % "chill-java" % "0.3.1"
- ) ++ (
- if (HADOOP_MAJOR_VERSION == "2") {
- if (HADOOP_YARN) {
- Seq(
- // Exclude rule required for all ?
- "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
- "org.apache.hadoop" % "hadoop-yarn-api" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
- "org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
- "org.apache.hadoop" % "hadoop-yarn-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm)
- )
- } else {
- Seq(
- "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
- "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm)
- )
- }
- } else {
- Seq("org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty) )
- }),
- unmanagedSourceDirectories in Compile <+= baseDirectory{ _ /
- ( if (HADOOP_YARN && HADOOP_MAJOR_VERSION == "2") {
- "src/hadoop2-yarn/scala"
- } else {
- "src/hadoop" + HADOOP_MAJOR_VERSION + "/scala"
- } )
- }
+ )
) ++ assemblySettings ++ extraAssemblySettings
def rootSettings = sharedSettings ++ Seq(
@@ -273,6 +256,17 @@ object SparkBuild extends Build {
)
) ++ assemblySettings ++ extraAssemblySettings
+ def yarnSettings = sharedSettings ++ Seq(
+ name := "spark-yarn",
+ libraryDependencies ++= Seq(
+ // Exclude rule required for all ?
+ "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
+ "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
+ "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
+ "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm)
+ )
+ ) ++ assemblySettings ++ extraAssemblySettings
+
def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq(
mergeStrategy in assembly := {
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
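With the profile-specific dependency lists removed, the SBT build is driven by two environment variables. The standalone sketch below mirrors the lookups SparkBuild.scala now performs when the project definition loads; the version string in the comment is only an example invocation:

    object BuildEnvSketch {
      // Typical invocation when building:
      //   SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_WITH_YARN=true sbt/sbt assembly
      val DEFAULT_HADOOP_VERSION = "1.2.1"
      val DEFAULT_WITH_YARN = false

      def main(args: Array[String]) {
        val hadoopVersion =
          scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", DEFAULT_HADOOP_VERSION)
        val withYarn = scala.util.Properties.envOrNone("SPARK_WITH_YARN") match {
          case None => DEFAULT_WITH_YARN
          case Some(v) => v.toBoolean
        }
        println("building against hadoop-client " + hadoopVersion)
        println("yarn sub-project enabled: " + withYarn)
      }
    }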
diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml
index 7c4e722cc1..919e35f240 100644
--- a/repl-bin/pom.xml
+++ b/repl-bin/pom.xml
@@ -32,11 +32,31 @@
http://spark-project.org/
- spark-${classifier}
- /usr/share/spark-${classifier}
+ spark
+ /usr/share/spark
root
+  <dependencies>
+    <dependency>
+      <groupId>org.spark-project</groupId>
+      <artifactId>spark-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.spark-project</groupId>
+      <artifactId>spark-bagel</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.spark-project</groupId>
+      <artifactId>spark-repl</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+  </dependencies>
+
@@ -44,7 +64,7 @@
maven-shade-plugin
false
- ${project.build.directory}/${project.artifactId}-${project.version}-shaded-${classifier}.jar
+ ${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar
*:*
@@ -85,143 +105,13 @@
-
- hadoop1
-
- hadoop1
-
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop1
-
-
- org.spark-project
- spark-bagel
- ${project.version}
- hadoop1
- runtime
-
-
- org.spark-project
- spark-examples
- ${project.version}
- hadoop1
- runtime
-
-
- org.spark-project
- spark-repl
- ${project.version}
- hadoop1
- runtime
-
-
- org.apache.hadoop
- hadoop-core
- runtime
-
-
-
-
- hadoop2
-
- hadoop2
-
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop2
-
-
- org.spark-project
- spark-bagel
- ${project.version}
- hadoop2
- runtime
-
-
- org.spark-project
- spark-examples
- ${project.version}
- hadoop2
- runtime
-
-
- org.spark-project
- spark-repl
- ${project.version}
- hadoop2
- runtime
-
-
- org.apache.hadoop
- hadoop-core
- runtime
-
-
- org.apache.hadoop
- hadoop-client
- runtime
-
-
-
hadoop2-yarn
-
- hadoop2-yarn
-
org.spark-project
- spark-core
- ${project.version}
- hadoop2-yarn
-
-
- org.spark-project
- spark-bagel
+ spark-yarn
${project.version}
- hadoop2-yarn
- runtime
-
-
- org.spark-project
- spark-examples
- ${project.version}
- hadoop2-yarn
- runtime
-
-
- org.spark-project
- spark-repl
- ${project.version}
- hadoop2-yarn
- runtime
-
-
- org.apache.hadoop
- hadoop-client
- runtime
-
-
- org.apache.hadoop
- hadoop-yarn-api
- runtime
-
-
- org.apache.hadoop
- hadoop-yarn-common
- runtime
-
-
- org.apache.hadoop
- hadoop-yarn-client
- runtime
@@ -261,7 +151,7 @@
gzip
- ${project.build.directory}/${project.artifactId}-${project.version}-shaded-${classifier}.jar
+ ${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar
file
perm
diff --git a/repl/pom.xml b/repl/pom.xml
index 862595b9f9..5bc9a99c5c 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -37,6 +37,17 @@
+
+ org.spark-project
+ spark-core
+ ${project.version}
+
+
+ org.spark-project
+ spark-bagel
+ ${project.version}
+ runtime
+
org.eclipse.jetty
jetty-server
@@ -57,7 +68,6 @@
org.slf4j
slf4j-log4j12
-
org.scalatest
scalatest_${scala.version}
@@ -115,181 +125,16 @@
-
-
- hadoop1
-
- hadoop1
-
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop1
-
-
- org.spark-project
- spark-bagel
- ${project.version}
- hadoop1
- runtime
-
-
- org.spark-project
- spark-examples
- ${project.version}
- hadoop1
- runtime
-
-
- org.apache.hadoop
- hadoop-core
- provided
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop1
-
-
-
-
-
-
- hadoop2
-
- hadoop2
-
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop2
-
-
- org.spark-project
- spark-bagel
- ${project.version}
- hadoop2
- runtime
-
-
- org.spark-project
- spark-examples
- ${project.version}
- hadoop2
- runtime
-
-
- org.apache.hadoop
- hadoop-core
- provided
-
-
- org.apache.hadoop
- hadoop-client
- provided
-
-
- org.apache.avro
- avro
- provided
-
-
- org.apache.avro
- avro-ipc
- provided
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop2
-
-
-
-
-
hadoop2-yarn
-
- hadoop2-yarn
-
org.spark-project
- spark-core
- ${project.version}
- hadoop2-yarn
-
-
- org.spark-project
- spark-bagel
- ${project.version}
- hadoop2-yarn
- runtime
-
-
- org.spark-project
- spark-examples
- ${project.version}
- hadoop2-yarn
- runtime
-
-
- org.spark-project
- spark-streaming
+ spark-yarn
${project.version}
- hadoop2-yarn
- runtime
-
-
- org.apache.hadoop
- hadoop-client
- provided
-
-
- org.apache.hadoop
- hadoop-yarn-api
- provided
-
-
- org.apache.hadoop
- hadoop-yarn-common
- provided
-
-
- org.apache.avro
- avro
- provided
-
-
- org.apache.avro
- avro-ipc
- provided
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop2-yarn
-
-
-
-
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 7e6b06d772..5c0582d6fb 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -40,6 +40,11 @@
+
+ org.spark-project
+ spark-core
+ ${project.version}
+
org.eclipse.jetty
jetty-server
@@ -115,103 +120,4 @@
-
-
-
- hadoop1
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop1
-
-
- org.apache.hadoop
- hadoop-core
- provided
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop1
-
-
-
-
-
-
- hadoop2
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop2
-
-
- org.apache.hadoop
- hadoop-core
- provided
-
-
- org.apache.hadoop
- hadoop-client
- provided
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop2
-
-
-
-
-
-
- hadoop2-yarn
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop2-yarn
-
-
- org.apache.hadoop
- hadoop-client
- provided
-
-
- org.apache.hadoop
- hadoop-yarn-api
- provided
-
-
- org.apache.hadoop
- hadoop-yarn-common
- provided
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop2-yarn
-
-
-
-
-
-
diff --git a/tools/pom.xml b/tools/pom.xml
index 878eb82f18..95b5e80e5b 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -31,6 +31,16 @@
http://spark-project.org/
+
+ org.spark-project
+ spark-core
+ ${project.version}
+
+
+ org.spark-project
+ spark-streaming
+ ${project.version}
+
org.scalatest
scalatest_${scala.version}
@@ -56,121 +66,4 @@
-
-
-
- hadoop1
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop1
-
-
- org.spark-project
- spark-streaming
- ${project.version}
- hadoop1
-
-
- org.apache.hadoop
- hadoop-core
- provided
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop1
-
-
-
-
-
-
- hadoop2
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop2
-
-
- org.spark-project
- spark-streaming
- ${project.version}
- hadoop2
-
-
- org.apache.hadoop
- hadoop-core
- provided
-
-
- org.apache.hadoop
- hadoop-client
- provided
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop2
-
-
-
-
-
-
- hadoop2-yarn
-
-
- org.spark-project
- spark-core
- ${project.version}
- hadoop2-yarn
-
-
- org.spark-project
- spark-streaming
- ${project.version}
- hadoop2-yarn
-
-
- org.apache.hadoop
- hadoop-client
- provided
-
-
- org.apache.hadoop
- hadoop-yarn-api
- provided
-
-
- org.apache.hadoop
- hadoop-yarn-common
- provided
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
- hadoop2-yarn
-
-
-
-
-
-
diff --git a/yarn/pom.xml b/yarn/pom.xml
new file mode 100644
index 0000000000..07dd170eae
--- /dev/null
+++ b/yarn/pom.xml
@@ -0,0 +1,111 @@
+
+
+
+
+ 4.0.0
+
+ org.spark-project
+ spark-parent
+ 0.8.0-SNAPSHOT
+ ../pom.xml
+
+
+ org.spark-project
+ spark-yarn
+ jar
+ Spark Project YARN Support
+ http://spark-project.org/
+
+
+ target/scala-${scala.version}/classes
+ target/scala-${scala.version}/test-classes
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+ false
+ ${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar
+
+
+ *:*
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+ package
+
+ shade
+
+
+
+
+
+ reference.conf
+
+
+
+
+
+
+
+
+
+
+
+ hadoop2-yarn
+
+
+ org.spark-project
+ spark-core
+ ${project.version}
+
+
+ org.apache.hadoop
+ hadoop-yarn-api
+
+
+ org.apache.hadoop
+ hadoop-yarn-common
+
+
+ org.apache.hadoop
+ hadoop-yarn-client
+
+
+ org.apache.avro
+ avro
+
+
+ org.apache.avro
+ avro-ipc
+
+
+
+
+
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
similarity index 97%
rename from core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
rename to yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
index 1b06169739..15dbd1c0fb 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -124,18 +124,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private def waitForSparkMaster() {
logInfo("Waiting for spark driver to be reachable.")
var driverUp = false
- while(!driverUp) {
+ var tries = 0
+ while (!driverUp && tries < 10) {
val driverHost = System.getProperty("spark.driver.host")
val driverPort = System.getProperty("spark.driver.port")
try {
val socket = new Socket(driverHost, driverPort.toInt)
socket.close()
- logInfo("Master now available: " + driverHost + ":" + driverPort)
+ logInfo("Driver now available: " + driverHost + ":" + driverPort)
driverUp = true
} catch {
case e: Exception =>
- logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
+ logWarning("Failed to connect to driver at " + driverHost + ":" + driverPort + ", retrying")
Thread.sleep(100)
+ tries = tries + 1
}
}
}
@@ -176,7 +178,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
var sparkContext: SparkContext = null
ApplicationMaster.sparkContextRef.synchronized {
var count = 0
- while (ApplicationMaster.sparkContextRef.get() == null) {
+ while (ApplicationMaster.sparkContextRef.get() == null && count < 10) {
logInfo("Waiting for spark context initialization ... " + count)
count = count + 1
ApplicationMaster.sparkContextRef.wait(10000L)
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMasterArguments.scala
similarity index 100%
rename from core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala
rename to yarn/src/main/scala/spark/deploy/yarn/ApplicationMasterArguments.scala
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/spark/deploy/yarn/Client.scala
similarity index 98%
rename from core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
rename to yarn/src/main/scala/spark/deploy/yarn/Client.scala
index 8bcbfc2735..9d3860b863 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/Client.scala
@@ -165,7 +165,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
Client.populateHadoopClasspath(yarnConf, env)
- SparkHadoopUtil.setYarnMode(env)
+ env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_JAR_PATH") =
localResources("spark.jar").getResource().getScheme.toString() + "://" +
localResources("spark.jar").getResource().getFile().toString()
@@ -313,8 +313,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
object Client {
def main(argStrings: Array[String]) {
+ // Set an env variable indicating we are running in YARN mode.
+ // Note that anything with SPARK prefix gets propagated to all (remote) processes
+ System.setProperty("SPARK_YARN_MODE", "true")
+
val args = new ClientArguments(argStrings)
- SparkHadoopUtil.setYarnMode()
new Client(args).run
}
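With SparkHadoopUtil.setYarnMode removed, YARN mode is now signalled solely through SPARK_YARN_MODE: Client.main sets it as a system property on the submitting JVM, and the container launch environment above exports it to remote processes. A hedged sketch of how the core side could select the YARN-aware utility at runtime without a compile-time dependency on the new module (this factory is illustrative and not part of the patch):

    import spark.deploy.SparkHadoopUtil

    // Illustrative only: reflectively load the YARN subclass when the flag is set,
    // so spark-core never links against spark-yarn directly.
    object HadoopUtilSelector {
      def get(): SparkHadoopUtil = {
        val yarnMode = java.lang.Boolean.valueOf(
          System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
        if (yarnMode) {
          Class.forName("spark.deploy.yarn.YarnSparkHadoopUtil")
            .newInstance().asInstanceOf[SparkHadoopUtil]
        } else {
          new SparkHadoopUtil()  // assumes the base class is concrete; see the rename below
        }
      }
    }
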
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/spark/deploy/yarn/ClientArguments.scala
similarity index 100%
rename from core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
rename to yarn/src/main/scala/spark/deploy/yarn/ClientArguments.scala
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala
similarity index 100%
rename from core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
rename to yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala
similarity index 100%
rename from core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala
rename to yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala b/yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala
similarity index 58%
rename from core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
rename to yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 6122fdced0..77c4ee7f3f 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package spark.deploy
+package spark.deploy.yarn
+import spark.deploy.SparkHadoopUtil
import collection.mutable.HashMap
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation
@@ -28,48 +29,17 @@ import java.security.PrivilegedExceptionAction
/**
* Contains util methods to interact with Hadoop from spark.
*/
-object SparkHadoopUtil {
-
- val yarnConf = newConfiguration()
-
- def getUserNameFromEnvironment(): String = {
- // defaulting to env if -D is not present ...
- val retval = System.getProperty(Environment.USER.name, System.getenv(Environment.USER.name))
-
- // If nothing found, default to user we are running as
- if (retval == null) System.getProperty("user.name") else retval
- }
-
- def runAsUser(func: (Product) => Unit, args: Product) {
- runAsUser(func, args, getUserNameFromEnvironment())
- }
-
- def runAsUser(func: (Product) => Unit, args: Product, user: String) {
- func(args)
- }
+class YarnSparkHadoopUtil extends SparkHadoopUtil {
// Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
- def isYarnMode(): Boolean = {
- val yarnMode = System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))
- java.lang.Boolean.valueOf(yarnMode)
- }
-
- // Set an env variable indicating we are running in YARN mode.
- // Note that anything with SPARK prefix gets propagated to all (remote) processes
- def setYarnMode() {
- System.setProperty("SPARK_YARN_MODE", "true")
- }
-
- def setYarnMode(env: HashMap[String, String]) {
- env("SPARK_YARN_MODE") = "true"
- }
+ override def isYarnMode(): Boolean = { true }
// Return an appropriate (subclass) of Configuration. Creating a config can initialize some Hadoop subsystems.
// Always create a new config, don't reuse yarnConf.
- def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
+ override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
// add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
- def addCredentials(conf: JobConf) {
+ override def addCredentials(conf: JobConf) {
val jobCreds = conf.getCredentials();
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}
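Because YarnSparkHadoopUtil now extends SparkHadoopUtil instead of replacing it, the YARN-specific behaviour (yarn mode always on, fresh YarnConfiguration instances, credential merging) flows through the ordinary SparkHadoopUtil call sites. A short, hedged usage sketch of the overridden hooks (the example object and call order are illustrative, not from this patch):

    import org.apache.hadoop.mapred.JobConf
    import spark.deploy.yarn.YarnSparkHadoopUtil

    object YarnUtilExample {
      def main(args: Array[String]) {
        val util = new YarnSparkHadoopUtil()
        val conf = util.newConfiguration()   // returns a fresh YarnConfiguration
        val jobConf = new JobConf(conf)
        util.addCredentials(jobConf)         // merges the current user's Hadoop credentials
        assert(util.isYarnMode())            // the subclass hard-codes YARN mode to true
      }
    }
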
diff --git a/core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
similarity index 100%
rename from core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
rename to yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala