From b877e20a339872f9a29a35272e6c1f280ac901d5 Mon Sep 17 00:00:00 2001
From: Jey Kottalam
Date: Wed, 17 Jul 2013 14:53:37 -0700
Subject: [PATCH 01/31] move yarn to its own directory

---
 .../main}/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala | 0
 .../scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala | 0
 .../src/main}/scala/spark/deploy/SparkHadoopUtil.scala | 0
 .../src/main}/scala/spark/deploy/yarn/ApplicationMaster.scala | 0
 .../scala/spark/deploy/yarn/ApplicationMasterArguments.scala | 0
 .../src/main}/scala/spark/deploy/yarn/Client.scala | 0
 .../src/main}/scala/spark/deploy/yarn/ClientArguments.scala | 0
 .../src/main}/scala/spark/deploy/yarn/WorkerRunnable.scala | 0
 .../src/main}/scala/spark/deploy/yarn/YarnAllocationHandler.scala | 0
 .../scala/spark/scheduler/cluster/YarnClusterScheduler.scala | 0
 10 files changed, 0 insertions(+), 0 deletions(-)
 rename {core/src/hadoop2-yarn => yarn/src/main}/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala (100%)
 rename {core/src/hadoop2-yarn => yarn/src/main}/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala (100%)
 rename {core/src/hadoop2-yarn => yarn/src/main}/scala/spark/deploy/SparkHadoopUtil.scala (100%)
 rename {core/src/hadoop2-yarn => yarn/src/main}/scala/spark/deploy/yarn/ApplicationMaster.scala (100%)
 rename {core/src/hadoop2-yarn => yarn/src/main}/scala/spark/deploy/yarn/ApplicationMasterArguments.scala (100%)
 rename {core/src/hadoop2-yarn => yarn/src/main}/scala/spark/deploy/yarn/Client.scala (100%)
 rename {core/src/hadoop2-yarn => yarn/src/main}/scala/spark/deploy/yarn/ClientArguments.scala (100%)
 rename {core/src/hadoop2-yarn => yarn/src/main}/scala/spark/deploy/yarn/WorkerRunnable.scala (100%)
 rename {core/src/hadoop2-yarn => yarn/src/main}/scala/spark/deploy/yarn/YarnAllocationHandler.scala (100%)
 rename {core/src/hadoop2-yarn => yarn/src/main}/scala/spark/scheduler/cluster/YarnClusterScheduler.scala (100%)

diff --git a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/yarn/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
similarity index 100%
rename from core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
rename to yarn/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
diff --git a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/yarn/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
similarity index 100%
rename from core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
rename to yarn/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala b/yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala
similarity index 100%
rename from core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
rename to yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
similarity index 100%
rename from core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
rename to yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMasterArguments.scala
similarity index 100%
rename from core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala
rename to yarn/src/main/scala/spark/deploy/yarn/ApplicationMasterArguments.scala
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/spark/deploy/yarn/Client.scala
similarity index 100%
rename from core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
rename to yarn/src/main/scala/spark/deploy/yarn/Client.scala
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/spark/deploy/yarn/ClientArguments.scala
similarity index 100%
rename from core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
rename to yarn/src/main/scala/spark/deploy/yarn/ClientArguments.scala
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala
similarity index 100%
rename from core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
rename to yarn/src/main/scala/spark/deploy/yarn/WorkerRunnable.scala
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala
similarity index 100%
rename from core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala
rename to yarn/src/main/scala/spark/deploy/yarn/YarnAllocationHandler.scala
diff --git a/core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
similarity index 100%
rename from core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
rename to yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala

From f67b94ad4fc8c9e7a71dd7f65d617743947ae91c Mon Sep 17 00:00:00 2001
From: Jey Kottalam
Date: Wed, 17 Jul 2013 17:31:26 -0700
Subject: [PATCH 02/31] remove core/src/hadoop{1,2} dirs

---
 .../hadoop/mapred/HadoopMapRedUtil.scala | 27 -----------
 .../mapreduce/HadoopMapReduceUtil.scala | 30 ------------
 .../scala/spark/deploy/SparkHadoopUtil.scala | 47 -------------------
 .../hadoop/mapred/HadoopMapRedUtil.scala | 0
 .../mapreduce/HadoopMapReduceUtil.scala | 0
 .../scala/spark/deploy/SparkHadoopUtil.scala | 0
 6 files changed, 104 deletions(-)
 delete mode 100644 core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
 delete mode 100644 core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
 delete mode 100644 core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
 rename core/src/{hadoop1 => main}/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala (100%)
 rename core/src/{hadoop1 => main}/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala (100%)
 rename core/src/{hadoop1 => main}/scala/spark/deploy/SparkHadoopUtil.scala (100%)

diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
deleted file mode 100644
index 4b3d84670c..0000000000
--- a/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapred - -trait HadoopMapRedUtil { - def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId) - - def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - - def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier, - jobId, isMap, taskId, attemptId) -} diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala deleted file mode 100644 index aa3b1ed3a5..0000000000 --- a/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapreduce - -import org.apache.hadoop.conf.Configuration -import task.{TaskAttemptContextImpl, JobContextImpl} - -trait HadoopMapReduceUtil { - def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId) - - def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - - def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier, - jobId, isMap, taskId, attemptId) -} diff --git a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala deleted file mode 100644 index 617954cb98..0000000000 --- a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package spark.deploy -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.mapred.JobConf - - -/** - * Contains util methods to interact with Hadoop from spark. - */ -object SparkHadoopUtil { - - def getUserNameFromEnvironment(): String = { - // defaulting to -D ... - System.getProperty("user.name") - } - - def runAsUser(func: (Product) => Unit, args: Product) { - - // Add support, if exists - for now, simply run func ! - func(args) - } - - // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems - def newConfiguration(): Configuration = new Configuration() - - // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster - def addCredentials(conf: JobConf) {} - - def isYarnMode(): Boolean = { false } - -} diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala similarity index 100% rename from core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala rename to core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala similarity index 100% rename from core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala rename to core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala diff --git a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala similarity index 100% rename from core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala rename to core/src/main/scala/spark/deploy/SparkHadoopUtil.scala From 69c3bbf688cdd21171413d415cfc6d6cb8e77bd5 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Wed, 17 Jul 2013 17:33:38 -0700 Subject: [PATCH 03/31] dynamically detect hadoop version --- .../hadoop/mapred/HadoopMapRedUtil.scala | 26 +++++++++++--- .../mapreduce/HadoopMapReduceUtil.scala | 30 +++++++++++++--- project/SparkBuild.scala | 35 ++----------------- 3 files changed, 51 insertions(+), 40 deletions(-) diff --git a/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala index 25386b2796..6cfafd3760 100644 --- a/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala +++ b/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala @@ -18,10 +18,28 @@ package org.apache.hadoop.mapred trait HadoopMapRedUtil { - def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContext(conf, jobId) + def newJobContext(conf: JobConf, jobId: JobID): JobContext = { + val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl", "org.apache.hadoop.mapred.JobContext"); + val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[org.apache.hadoop.mapreduce.JobID]) + ctor.newInstance(conf, jobId).asInstanceOf[JobContext] + } - def newTaskAttemptContext(conf: JobConf, 
attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId) + def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = { + val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl", "org.apache.hadoop.mapred.TaskAttemptContext") + val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID]) + ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext] + } - def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier, - jobId, isMap, taskId, attemptId) + def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = { + new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId) + } + + private def firstAvailableClass(first: String, second: String): Class[_] = { + try { + Class.forName(first) + } catch { + case e: ClassNotFoundException => + Class.forName(second) + } + } } diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala index b1002e0cac..0f77828dc8 100644 --- a/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala +++ b/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala @@ -20,10 +20,32 @@ package org.apache.hadoop.mapreduce import org.apache.hadoop.conf.Configuration trait HadoopMapReduceUtil { - def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContext(conf, jobId) + def newJobContext(conf: Configuration, jobId: JobID): JobContext = { + val klass = firstAvailableClass( + "org.apache.hadoop.mapreduce.task.JobContextImpl", + "org.apache.hadoop.mapreduce.JobContext") + val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[JobID]) + ctor.newInstance(conf, jobId).asInstanceOf[JobContext] + } - def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId) + def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = { + val klass = firstAvailableClass( + "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl", + "org.apache.hadoop.mapreduce.TaskAttemptContext") + val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID]) + ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext] + } - def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier, - jobId, isMap, taskId, attemptId) + def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = { + new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId) + } + + private def firstAvailableClass(first: String, second: String): Class[_] = { + try { + Class.forName(first) + } catch { + case e: ClassNotFoundException => + Class.forName(second) + } + } } diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index f6519c8287..a06550bb97 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -27,13 +27,8 @@ object SparkBuild extends Build { // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or // "1.0.4" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop. 
val HADOOP_VERSION = "1.0.4" - val HADOOP_MAJOR_VERSION = "1" - val HADOOP_YARN = false - - // For Hadoop 2 versions such as "2.0.0-mr1-cdh4.1.1", set the HADOOP_MAJOR_VERSION to "2" //val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1" - //val HADOOP_MAJOR_VERSION = "2" - //val HADOOP_YARN = false + val HADOOP_YARN = false // For Hadoop 2 YARN support //val HADOOP_VERSION = "2.0.2-alpha" @@ -184,37 +179,13 @@ object SparkBuild extends Build { "org.apache.mesos" % "mesos" % "0.12.1", "io.netty" % "netty-all" % "4.0.0.Beta2", "org.apache.derby" % "derby" % "10.4.2.0" % "test", + "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION, "com.codahale.metrics" % "metrics-core" % "3.0.0", "com.codahale.metrics" % "metrics-jvm" % "3.0.0", "com.codahale.metrics" % "metrics-json" % "3.0.0", "com.twitter" % "chill_2.9.3" % "0.3.1", "com.twitter" % "chill-java" % "0.3.1" - ) ++ ( - if (HADOOP_MAJOR_VERSION == "2") { - if (HADOOP_YARN) { - Seq( - // Exclude rule required for all ? - "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-api" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm) - ) - } else { - Seq( - "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm) - ) - } - } else { - Seq("org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty) ) - }), - unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / - ( if (HADOOP_YARN && HADOOP_MAJOR_VERSION == "2") { - "src/hadoop2-yarn/scala" - } else { - "src/hadoop" + HADOOP_MAJOR_VERSION + "/scala" - } ) - } + ) ) ++ assemblySettings ++ extraAssemblySettings def rootSettings = sharedSettings ++ Seq( From 273b499b9ac8373f0f92ebf8e4141fe51cec4a33 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Thu, 18 Jul 2013 13:36:34 -0700 Subject: [PATCH 04/31] yarn sbt --- project/SparkBuild.scala | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index a06550bb97..99351ca935 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -24,21 +24,10 @@ import AssemblyKeys._ //import com.jsuereth.pgp.sbtplugin.PgpKeys._ object SparkBuild extends Build { - // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or - // "1.0.4" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop. - val HADOOP_VERSION = "1.0.4" - //val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1" - val HADOOP_YARN = false - - // For Hadoop 2 YARN support - //val HADOOP_VERSION = "2.0.2-alpha" - //val HADOOP_MAJOR_VERSION = "2" - //val HADOOP_YARN = true - // HBase version; set as appropriate. 
val HBASE_VERSION = "0.94.6" - lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel, streaming, mllib, tools) + lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel, streaming, mllib, tools, yarn) lazy val core = Project("core", file("core"), settings = coreSettings) @@ -54,6 +43,8 @@ object SparkBuild extends Build { lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn (core) + lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn (core) + // A configuration to set an alternative publishLocalConfiguration lazy val MavenCompile = config("m2r") extend(Compile) lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy") @@ -179,7 +170,7 @@ object SparkBuild extends Build { "org.apache.mesos" % "mesos" % "0.12.1", "io.netty" % "netty-all" % "4.0.0.Beta2", "org.apache.derby" % "derby" % "10.4.2.0" % "test", - "org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION, + "org.apache.hadoop" % "hadoop-client" % "1.0.4", "com.codahale.metrics" % "metrics-core" % "3.0.0", "com.codahale.metrics" % "metrics-jvm" % "3.0.0", "com.codahale.metrics" % "metrics-json" % "3.0.0", @@ -244,6 +235,17 @@ object SparkBuild extends Build { ) ) ++ assemblySettings ++ extraAssemblySettings + def yarnSettings = sharedSettings ++ Seq( + name := "spark-yarn", + libraryDependencies ++= Seq( + // Exclude rule required for all ? + "org.apache.hadoop" % "hadoop-client" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty), + "org.apache.hadoop" % "hadoop-yarn-api" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty), + "org.apache.hadoop" % "hadoop-yarn-common" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty), + "org.apache.hadoop" % "hadoop-yarn-client" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty) + ) + ) + def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq( mergeStrategy in assembly := { case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard From 5d0785b4e5bb681770675a1729f1742a8cd3b491 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Sat, 20 Jul 2013 11:34:13 -0700 Subject: [PATCH 05/31] remove hadoop-yarn's org/apache/... --- .../hadoop/mapred/HadoopMapRedUtil.scala | 30 ------------------- .../mapreduce/HadoopMapReduceUtil.scala | 30 ------------------- 2 files changed, 60 deletions(-) delete mode 100644 yarn/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala delete mode 100644 yarn/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala diff --git a/yarn/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/yarn/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala deleted file mode 100644 index 0f972b7a0b..0000000000 --- a/yarn/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala +++ /dev/null @@ -1,30 +0,0 @@ - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapred - -import org.apache.hadoop.mapreduce.TaskType - -trait HadoopMapRedUtil { - def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId) - - def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - - def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = - new TaskAttemptID(jtIdentifier, jobId, if (isMap) TaskType.MAP else TaskType.REDUCE, taskId, attemptId) -} diff --git a/yarn/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/yarn/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala deleted file mode 100644 index 1a7cdf4788..0000000000 --- a/yarn/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.mapreduce - -import org.apache.hadoop.conf.Configuration -import task.{TaskAttemptContextImpl, JobContextImpl} - -trait HadoopMapReduceUtil { - def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId) - - def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - - def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = - new TaskAttemptID(jtIdentifier, jobId, if (isMap) TaskType.MAP else TaskType.REDUCE, taskId, attemptId) -} From 8b1c1520fc26132a21062ebb063dea25e9b36b8b Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Sat, 20 Jul 2013 11:34:18 -0700 Subject: [PATCH 06/31] add comment --- .../org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala index 0f77828dc8..dd624d8890 100644 --- a/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala +++ b/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala @@ -22,16 +22,16 @@ import org.apache.hadoop.conf.Configuration trait HadoopMapReduceUtil { def newJobContext(conf: Configuration, jobId: JobID): JobContext = { val klass = firstAvailableClass( - "org.apache.hadoop.mapreduce.task.JobContextImpl", - "org.apache.hadoop.mapreduce.JobContext") + "org.apache.hadoop.mapreduce.task.JobContextImpl", // hadoop2, hadoop2-yarn + "org.apache.hadoop.mapreduce.JobContext") // hadoop1 val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[JobID]) ctor.newInstance(conf, jobId).asInstanceOf[JobContext] } def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = { val klass = firstAvailableClass( - "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl", - "org.apache.hadoop.mapreduce.TaskAttemptContext") + "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl", // hadoop2, hadoop2-yarn + "org.apache.hadoop.mapreduce.TaskAttemptContext") // hadoop1 val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID]) ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext] } From cb4ef19214332b5e9c2e0d0bfa0a72262122d04e Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Mon, 22 Jul 2013 14:03:31 -0700 Subject: [PATCH 07/31] yarn support --- bin/compute-classpath.cmd | 2 ++ bin/compute-classpath.sh | 2 ++ project/SparkBuild.scala | 12 ++++++------ 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/bin/compute-classpath.cmd b/bin/compute-classpath.cmd index eb836b0ffd..9178b852e6 100644 --- a/bin/compute-classpath.cmd +++ b/bin/compute-classpath.cmd @@ -34,6 +34,7 @@ set EXAMPLES_DIR=%FWDIR%examples set BAGEL_DIR=%FWDIR%bagel set MLLIB_DIR=%FWDIR%mllib set TOOLS_DIR=%FWDIR%tools +set YARN_DIR=%FWDIR%yarn set STREAMING_DIR=%FWDIR%streaming set PYSPARK_DIR=%FWDIR%python @@ -50,6 +51,7 @@ set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\* set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes set CLASSPATH=%CLASSPATH%;%MLLIB_DIR%\target\scala-%SCALA_VERSION%\classes set CLASSPATH=%CLASSPATH%;%TOOLS_DIR%\target\scala-%SCALA_VERSION%\classes +set CLASSPATH=%CLASSPATH%;%YARN_DIR%\target\scala-%SCALA_VERSION%\classes rem Add hadoop conf dir - else FileSystem.*, etc fail rem Note, this assumes that 
there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index e4ce1ca848..db6fc866ab 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -37,6 +37,7 @@ EXAMPLES_DIR="$FWDIR/examples" BAGEL_DIR="$FWDIR/bagel" MLLIB_DIR="$FWDIR/mllib" TOOLS_DIR="$FWDIR/tools" +YARN_DIR="$FWDIR/yarn" STREAMING_DIR="$FWDIR/streaming" PYSPARK_DIR="$FWDIR/python" @@ -72,6 +73,7 @@ function dev_classpath { CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$MLLIB_DIR/target/scala-$SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$TOOLS_DIR/target/scala-$SCALA_VERSION/classes" + CLASSPATH="$CLASSPATH:$YARN_DIR/target/scala-$SCALA_VERSION/classes" for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do CLASSPATH="$CLASSPATH:$jar" done diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 99351ca935..ede49ea38b 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -170,7 +170,7 @@ object SparkBuild extends Build { "org.apache.mesos" % "mesos" % "0.12.1", "io.netty" % "netty-all" % "4.0.0.Beta2", "org.apache.derby" % "derby" % "10.4.2.0" % "test", - "org.apache.hadoop" % "hadoop-client" % "1.0.4", + "org.apache.hadoop" % "hadoop-client" % "1.0.4" excludeAll(excludeJackson, excludeNetty, excludeAsm), "com.codahale.metrics" % "metrics-core" % "3.0.0", "com.codahale.metrics" % "metrics-jvm" % "3.0.0", "com.codahale.metrics" % "metrics-json" % "3.0.0", @@ -239,12 +239,12 @@ object SparkBuild extends Build { name := "spark-yarn", libraryDependencies ++= Seq( // Exclude rule required for all ? - "org.apache.hadoop" % "hadoop-client" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty), - "org.apache.hadoop" % "hadoop-yarn-api" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty), - "org.apache.hadoop" % "hadoop-yarn-common" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty), - "org.apache.hadoop" % "hadoop-yarn-client" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty) + "org.apache.hadoop" % "hadoop-client" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-yarn-api" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-yarn-common" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-yarn-client" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty, excludeAsm) ) - ) + ) ++ assemblySettings ++ extraAssemblySettings def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq( mergeStrategy in assembly := { From 43ebcb84840dc9db61e5912d9a37707c065edc5a Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Tue, 23 Jul 2013 17:37:10 -0700 Subject: [PATCH 08/31] rename HadoopMapRedUtil => SparkHadoopMapRedUtil, HadoopMapReduceUtil => SparkHadoopMapReduceUtil --- .../{HadoopMapRedUtil.scala => SparkHadoopMapRedUtil.scala} | 2 +- ...oopMapReduceUtil.scala => SparkHadoopMapReduceUtil.scala} | 2 +- core/src/main/scala/spark/HadoopWriter.scala | 2 +- core/src/main/scala/spark/PairRDDFunctions.scala | 5 +++-- core/src/main/scala/spark/rdd/NewHadoopRDD.scala | 2 +- 5 files changed, 7 insertions(+), 6 deletions(-) rename core/src/main/scala/org/apache/hadoop/mapred/{HadoopMapRedUtil.scala => SparkHadoopMapRedUtil.scala} (98%) rename core/src/main/scala/org/apache/hadoop/mapreduce/{HadoopMapReduceUtil.scala => SparkHadoopMapReduceUtil.scala} (98%) diff --git 
a/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala similarity index 98% rename from core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala rename to core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala index 6cfafd3760..f87460039b 100644 --- a/core/src/main/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala +++ b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala @@ -17,7 +17,7 @@ package org.apache.hadoop.mapred -trait HadoopMapRedUtil { +trait SparkHadoopMapRedUtil { def newJobContext(conf: JobConf, jobId: JobID): JobContext = { val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl", "org.apache.hadoop.mapred.JobContext"); val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[org.apache.hadoop.mapreduce.JobID]) diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala similarity index 98% rename from core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala rename to core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala index dd624d8890..bea6253677 100644 --- a/core/src/main/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala +++ b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala @@ -19,7 +19,7 @@ package org.apache.hadoop.mapreduce import org.apache.hadoop.conf.Configuration -trait HadoopMapReduceUtil { +trait SparkHadoopMapReduceUtil { def newJobContext(conf: Configuration, jobId: JobID): JobContext = { val klass = firstAvailableClass( "org.apache.hadoop.mapreduce.task.JobContextImpl", // hadoop2, hadoop2-yarn diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala index b1fe0075a3..60840ce77e 100644 --- a/core/src/main/scala/spark/HadoopWriter.scala +++ b/core/src/main/scala/spark/HadoopWriter.scala @@ -36,7 +36,7 @@ import spark.SerializableWritable * Saves the RDD using a JobConf, which should contain an output key class, an output value class, * a filename to write to, etc, exactly like in a Hadoop MapReduce job. 
*/ -class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRedUtil with Serializable { +class HadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoopMapRedUtil with Serializable { private val now = new Date() private val conf = new SerializableWritable(jobConf) diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 6b0cc2fbf1..aeeac65cca 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -37,7 +37,8 @@ import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.OutputFormat import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} -import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil} +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, + RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, SparkHadoopMapReduceUtil} import org.apache.hadoop.security.UserGroupInformation import spark.partial.BoundedDouble @@ -53,7 +54,7 @@ import spark.Partitioner._ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( self: RDD[(K, V)]) extends Logging - with HadoopMapReduceUtil + with SparkHadoopMapReduceUtil with Serializable { /** diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala index 0b71608169..184685528e 100644 --- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala @@ -43,7 +43,7 @@ class NewHadoopRDD[K, V]( valueClass: Class[V], @transient conf: Configuration) extends RDD[(K, V)](sc, Nil) - with HadoopMapReduceUtil + with SparkHadoopMapReduceUtil with Logging { // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it From 4f43fd791ab0e84693e2337358c6b880a1593e54 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Wed, 24 Jul 2013 12:41:40 -0700 Subject: [PATCH 09/31] make SparkHadoopUtil a member of SparkEnv --- core/src/main/scala/spark/SparkContext.scala | 11 +++++++---- core/src/main/scala/spark/SparkEnv.scala | 2 ++ core/src/main/scala/spark/Utils.scala | 7 ++----- .../main/scala/spark/deploy/SparkHadoopUtil.scala | 2 +- .../spark/executor/StandaloneExecutorBackend.scala | 6 +++--- core/src/main/scala/spark/rdd/CheckpointRDD.scala | 14 ++++++++------ core/src/main/scala/spark/rdd/HadoopRDD.scala | 6 +++--- .../scala/spark/scheduler/InputFormatInfo.scala | 9 +++++---- .../main/scala/spark/examples/SparkHdfsLR.scala | 3 +-- 9 files changed, 32 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 80c65dfebd..f020b2554b 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -58,7 +58,7 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.mesos.MesosNativeLibrary -import spark.deploy.{LocalSparkCluster, SparkHadoopUtil} +import spark.deploy.LocalSparkCluster import spark.partial.{ApproximateEvaluator, PartialResult} import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD} import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener, @@ -241,7 +241,8 @@ class SparkContext( /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. 
*/ val hadoopConfiguration = { - val conf = SparkHadoopUtil.newConfiguration() + val env = SparkEnv.get + val conf = env.hadoop.newConfiguration() // Explicitly check for S3 environment variables if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) { conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) @@ -629,10 +630,11 @@ class SparkContext( logWarning("null specified as parameter to addJar", new SparkException("null specified as parameter to addJar")) } else { + val env = SparkEnv.get val uri = new URI(path) val key = uri.getScheme match { case null | "file" => - if (SparkHadoopUtil.isYarnMode()) { + if (env.hadoop.isYarnMode()) { logWarning("local jar specified as parameter to addJar under Yarn mode") return } @@ -815,8 +817,9 @@ class SparkContext( * prevent accidental overriding of checkpoint files in the existing directory. */ def setCheckpointDir(dir: String, useExisting: Boolean = false) { + val env = SparkEnv.get val path = new Path(dir) - val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration()) + val fs = path.getFileSystem(env.hadoop.newConfiguration()) if (!useExisting) { if (fs.exists(path)) { throw new Exception("Checkpoint directory '" + path + "' already exists.") diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index 0adbf1d96e..73990f0423 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -25,6 +25,7 @@ import akka.remote.RemoteActorRefProvider import spark.broadcast.BroadcastManager import spark.metrics.MetricsSystem +import spark.deploy.SparkHadoopUtil import spark.storage.BlockManager import spark.storage.BlockManagerMaster import spark.network.ConnectionManager @@ -60,6 +61,7 @@ class SparkEnv ( // If executorId is NOT found, return defaultHostPort var executorIdToHostPort: Option[(String, String) => String]) { + val hadoop = new SparkHadoopUtil private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() def stop() { diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 673f9a810d..7ea9b0c28a 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -266,8 +266,9 @@ private object Utils extends Logging { } case _ => // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others + val env = SparkEnv.get val uri = new URI(url) - val conf = SparkHadoopUtil.newConfiguration() + val conf = env.hadoop.newConfiguration() val fs = FileSystem.get(uri, conf) val in = fs.open(new Path(uri)) val out = new FileOutputStream(tempFile) @@ -433,10 +434,6 @@ private object Utils extends Logging { try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } } } - def getUserNameFromEnvironment(): String = { - SparkHadoopUtil.getUserNameFromEnvironment - } - // Typically, this will be of order of number of nodes in cluster // If not, we should change it to LRUCache or something. private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]() diff --git a/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala index 617954cb98..c4ed0bb17e 100644 --- a/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.mapred.JobConf /** * Contains util methods to interact with Hadoop from spark. 
*/ -object SparkHadoopUtil { +class SparkHadoopUtil { def getUserNameFromEnvironment(): String = { // defaulting to -D ... diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala index e47fe50021..a9e06f8d54 100644 --- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -22,9 +22,8 @@ import java.nio.ByteBuffer import akka.actor.{ActorRef, Actor, Props, Terminated} import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} -import spark.{Logging, Utils} +import spark.{Logging, Utils, SparkEnv} import spark.TaskState.TaskState -import spark.deploy.SparkHadoopUtil import spark.scheduler.cluster.StandaloneClusterMessages._ import spark.util.AkkaUtils @@ -82,7 +81,8 @@ private[spark] class StandaloneExecutorBackend( private[spark] object StandaloneExecutorBackend { def run(driverUrl: String, executorId: String, hostname: String, cores: Int) { - SparkHadoopUtil.runAsUser(run0, Tuple4[Any, Any, Any, Any] (driverUrl, executorId, hostname, cores)) + val env = SparkEnv.get + env.hadoop.runAsUser(run0, Tuple4[Any, Any, Any, Any] (driverUrl, executorId, hostname, cores)) } // This will be run 'as' the user diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala index 6794e0e201..1ad5fe6539 100644 --- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala @@ -25,7 +25,6 @@ import org.apache.hadoop.util.ReflectionUtils import org.apache.hadoop.fs.Path import java.io.{File, IOException, EOFException} import java.text.NumberFormat -import spark.deploy.SparkHadoopUtil private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {} @@ -82,8 +81,9 @@ private[spark] object CheckpointRDD extends Logging { } def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) { + val env = SparkEnv.get val outputDir = new Path(path) - val fs = outputDir.getFileSystem(SparkHadoopUtil.newConfiguration()) + val fs = outputDir.getFileSystem(env.hadoop.newConfiguration()) val finalOutputName = splitIdToFile(ctx.splitId) val finalOutputPath = new Path(outputDir, finalOutputName) @@ -101,7 +101,7 @@ private[spark] object CheckpointRDD extends Logging { // This is mainly for testing purpose fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize) } - val serializer = SparkEnv.get.serializer.newInstance() + val serializer = env.serializer.newInstance() val serializeStream = serializer.serializeStream(fileOutputStream) serializeStream.writeAll(iterator) serializeStream.close() @@ -121,10 +121,11 @@ private[spark] object CheckpointRDD extends Logging { } def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = { - val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration()) + val env = SparkEnv.get + val fs = path.getFileSystem(env.hadoop.newConfiguration()) val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt val fileInputStream = fs.open(path, bufferSize) - val serializer = SparkEnv.get.serializer.newInstance() + val serializer = env.serializer.newInstance() val deserializeStream = serializer.deserializeStream(fileInputStream) // Register an on-task-completion callback to close the input stream. 
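// --- Illustrative sketch (editorial addition, not part of the original patch series) ---
// Minimal, hypothetical caller showing the access pattern this patch introduces:
// Hadoop interaction now goes through the SparkHadoopUtil instance held by SparkEnv
// instead of the old SparkHadoopUtil singleton. Object and method names below
// (HadoopAccessSketch, newJobConf, runningOnYarn) are made up for illustration;
// it assumes a SparkEnv has already been initialized and uses only the methods
// shown in these diffs (newConfiguration, addCredentials, isYarnMode).
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import spark.SparkEnv

object HadoopAccessSketch {
  def newJobConf(): JobConf = {
    val env = SparkEnv.get                                   // environment of the running job
    val conf: Configuration = env.hadoop.newConfiguration()  // YarnConfiguration when the YARN util is loaded
    val jobConf = new JobConf(conf)
    env.hadoop.addCredentials(jobConf)                       // merges user credentials on secure clusters; no-op otherwise
    jobConf
  }

  def runningOnYarn(): Boolean = SparkEnv.get.hadoop.isYarnMode()
}
// --- end sketch ---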
@@ -140,10 +141,11 @@ private[spark] object CheckpointRDD extends Logging { import spark._ val Array(cluster, hdfsPath) = args + val env = SparkEnv.get val sc = new SparkContext(cluster, "CheckpointRDD Test") val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000) val path = new Path(hdfsPath, "temp") - val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration()) + val fs = path.getFileSystem(env.hadoop.newConfiguration()) sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _) val cpRDD = new CheckpointRDD[Int](sc, path.toString) assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same") diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala index fd00d59c77..6c41b97780 100644 --- a/core/src/main/scala/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala @@ -32,8 +32,7 @@ import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.util.ReflectionUtils -import spark.deploy.SparkHadoopUtil -import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext} +import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, SparkEnv, TaskContext} import spark.util.NextIterator import org.apache.hadoop.conf.Configurable @@ -68,7 +67,8 @@ class HadoopRDD[K, V]( private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) override def getPartitions: Array[Partition] = { - SparkHadoopUtil.addCredentials(conf); + val env = SparkEnv.get + env.hadoop.addCredentials(conf) val inputFormat = createInputFormat(conf) if (inputFormat.isInstanceOf[Configurable]) { inputFormat.asInstanceOf[Configurable].setConf(conf) diff --git a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala index 65f8c3200e..8f1b9b29b5 100644 --- a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala +++ b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala @@ -17,7 +17,7 @@ package spark.scheduler -import spark.Logging +import spark.{Logging, SparkEnv} import scala.collection.immutable.Set import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.hadoop.security.UserGroupInformation @@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.conf.Configuration import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.collection.JavaConversions._ -import spark.deploy.SparkHadoopUtil /** @@ -88,8 +87,9 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl // This method does not expect failures, since validate has already passed ... private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = { + val env = SparkEnv.get val conf = new JobConf(configuration) - SparkHadoopUtil.addCredentials(conf); + env.hadoop.addCredentials(conf) FileInputFormat.setInputPaths(conf, path) val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] = @@ -108,8 +108,9 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl // This method does not expect failures, since validate has already passed ... 
private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = { + val env = SparkEnv.get val jobConf = new JobConf(configuration) - SparkHadoopUtil.addCredentials(jobConf); + env.hadoop.addCredentials(jobConf) FileInputFormat.setInputPaths(jobConf, path) val instance: org.apache.hadoop.mapred.InputFormat[_, _] = diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala index ef6e09a8e8..43c9115664 100644 --- a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala @@ -21,7 +21,6 @@ import java.util.Random import scala.math.exp import spark.util.Vector import spark._ -import spark.deploy.SparkHadoopUtil import spark.scheduler.InputFormatInfo /** @@ -52,7 +51,7 @@ object SparkHdfsLR { System.exit(1) } val inputPath = args(1) - val conf = SparkHadoopUtil.newConfiguration() + val conf = SparkEnv.get.hadoop.newConfiguration() val sc = new SparkContext(args(0), "SparkHdfsLR", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(), InputFormatInfo.computePreferredLocations( From bd0bab47c9602462628b1d3c90d5eb5d889f4596 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Wed, 24 Jul 2013 13:07:27 -0700 Subject: [PATCH 10/31] SparkEnv isn't available this early, and not needed anyway --- .../scala/spark/deploy/SparkHadoopUtil.scala | 11 ----------- .../executor/StandaloneExecutorBackend.scala | 14 -------------- .../scala/spark/deploy/SparkHadoopUtil.scala | 16 ---------------- 3 files changed, 41 deletions(-) diff --git a/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala index c4ed0bb17e..882161e669 100644 --- a/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala @@ -25,17 +25,6 @@ import org.apache.hadoop.mapred.JobConf */ class SparkHadoopUtil { - def getUserNameFromEnvironment(): String = { - // defaulting to -D ... - System.getProperty("user.name") - } - - def runAsUser(func: (Product) => Unit, args: Product) { - - // Add support, if exists - for now, simply run func ! - func(args) - } - // Return an appropriate (subclass) of Configuration. 
Creating config can initializes some hadoop subsystems def newConfiguration(): Configuration = new Configuration() diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala index a9e06f8d54..b5fb6dbe29 100644 --- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -81,20 +81,6 @@ private[spark] class StandaloneExecutorBackend( private[spark] object StandaloneExecutorBackend { def run(driverUrl: String, executorId: String, hostname: String, cores: Int) { - val env = SparkEnv.get - env.hadoop.runAsUser(run0, Tuple4[Any, Any, Any, Any] (driverUrl, executorId, hostname, cores)) - } - - // This will be run 'as' the user - def run0(args: Product) { - assert(4 == args.productArity) - runImpl(args.productElement(0).asInstanceOf[String], - args.productElement(1).asInstanceOf[String], - args.productElement(2).asInstanceOf[String], - args.productElement(3).asInstanceOf[Int]) - } - - private def runImpl(driverUrl: String, executorId: String, hostname: String, cores: Int) { // Debug code Utils.checkHost(hostname) diff --git a/yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala b/yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala index 6122fdced0..a812bcf867 100644 --- a/yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala +++ b/yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala @@ -32,22 +32,6 @@ object SparkHadoopUtil { val yarnConf = newConfiguration() - def getUserNameFromEnvironment(): String = { - // defaulting to env if -D is not present ... - val retval = System.getProperty(Environment.USER.name, System.getenv(Environment.USER.name)) - - // If nothing found, default to user we are running as - if (retval == null) System.getProperty("user.name") else retval - } - - def runAsUser(func: (Product) => Unit, args: Product) { - runAsUser(func, args, getUserNameFromEnvironment()) - } - - def runAsUser(func: (Product) => Unit, args: Product, user: String) { - func(args) - } - // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true. 
def isYarnMode(): Boolean = { val yarnMode = System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")) From e2d7656ca3d561f0a6fc8dd81ef46e4aa6ba608e Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Wed, 24 Jul 2013 14:01:48 -0700 Subject: [PATCH 11/31] re-enable YARN support --- core/src/main/scala/spark/SparkEnv.scala | 14 +++++++++- .../spark/deploy/yarn/ApplicationMaster.scala | 4 +-- .../main/scala/spark/deploy/yarn/Client.scala | 7 +++-- .../YarnSparkHadoopUtil.scala} | 26 +++++-------------- 4 files changed, 26 insertions(+), 25 deletions(-) rename yarn/src/main/scala/spark/deploy/{SparkHadoopUtil.scala => yarn/YarnSparkHadoopUtil.scala} (71%) diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index 73990f0423..5f71df33b6 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -61,9 +61,21 @@ class SparkEnv ( // If executorId is NOT found, return defaultHostPort var executorIdToHostPort: Option[(String, String) => String]) { - val hadoop = new SparkHadoopUtil private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() + val hadoop = { + val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) + if(yarnMode) { + try { + Class.forName("spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil] + } catch { + case th: Throwable => throw new SparkException("Unable to load YARN support", th) + } + } else { + new SparkHadoopUtil + } + } + def stop() { pythonWorkers.foreach { case(key, worker) => worker.stop() } httpFileServer.stop() diff --git a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala index 1b06169739..d69a969d42 100644 --- a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala @@ -130,11 +130,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e try { val socket = new Socket(driverHost, driverPort.toInt) socket.close() - logInfo("Master now available: " + driverHost + ":" + driverPort) + logInfo("Driver now available: " + driverHost + ":" + driverPort) driverUp = true } catch { case e: Exception => - logError("Failed to connect to driver at " + driverHost + ":" + driverPort) + logWarning("Failed to connect to driver at " + driverHost + ":" + driverPort + ", retrying") Thread.sleep(100) } } diff --git a/yarn/src/main/scala/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/spark/deploy/yarn/Client.scala index 8bcbfc2735..9d3860b863 100644 --- a/yarn/src/main/scala/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/spark/deploy/yarn/Client.scala @@ -165,7 +165,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*") Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH") Client.populateHadoopClasspath(yarnConf, env) - SparkHadoopUtil.setYarnMode(env) + env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_JAR_PATH") = localResources("spark.jar").getResource().getScheme.toString() + "://" + localResources("spark.jar").getResource().getFile().toString() @@ -313,8 +313,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl object Client { def main(argStrings: Array[String]) { + // Set an env variable indicating we are running in YARN mode. 
+ // Note that anything with SPARK prefix gets propagated to all (remote) processes + System.setProperty("SPARK_YARN_MODE", "true") + val args = new ClientArguments(argStrings) - SparkHadoopUtil.setYarnMode() new Client(args).run } diff --git a/yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala b/yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala similarity index 71% rename from yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala rename to yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala index a812bcf867..77c4ee7f3f 100644 --- a/yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala +++ b/yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -15,8 +15,9 @@ * limitations under the License. */ -package spark.deploy +package spark.deploy.yarn +import spark.deploy.SparkHadoopUtil import collection.mutable.HashMap import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.UserGroupInformation @@ -28,32 +29,17 @@ import java.security.PrivilegedExceptionAction /** * Contains util methods to interact with Hadoop from spark. */ -object SparkHadoopUtil { - - val yarnConf = newConfiguration() +class YarnSparkHadoopUtil extends SparkHadoopUtil { // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true. - def isYarnMode(): Boolean = { - val yarnMode = System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")) - java.lang.Boolean.valueOf(yarnMode) - } - - // Set an env variable indicating we are running in YARN mode. - // Note that anything with SPARK prefix gets propagated to all (remote) processes - def setYarnMode() { - System.setProperty("SPARK_YARN_MODE", "true") - } - - def setYarnMode(env: HashMap[String, String]) { - env("SPARK_YARN_MODE") = "true" - } + override def isYarnMode(): Boolean = { true } // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems // Always create a new config, dont reuse yarnConf. 
- def newConfiguration(): Configuration = new YarnConfiguration(new Configuration()) + override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration()) // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster - def addCredentials(conf: JobConf) { + override def addCredentials(conf: JobConf) { val jobCreds = conf.getCredentials(); jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) } From 8bb0bd11cea8a16d21c56c1b80b31a1c5605a414 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Wed, 24 Jul 2013 14:16:50 -0700 Subject: [PATCH 12/31] YARN ApplicationMaster shouldn't wait forever --- .../main/scala/spark/deploy/yarn/ApplicationMaster.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala index d69a969d42..15dbd1c0fb 100644 --- a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala @@ -124,7 +124,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private def waitForSparkMaster() { logInfo("Waiting for spark driver to be reachable.") var driverUp = false - while(!driverUp) { + var tries = 0 + while(!driverUp && tries < 10) { val driverHost = System.getProperty("spark.driver.host") val driverPort = System.getProperty("spark.driver.port") try { @@ -136,6 +137,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e case e: Exception => logWarning("Failed to connect to driver at " + driverHost + ":" + driverPort + ", retrying") Thread.sleep(100) + tries = tries + 1 } } } @@ -176,7 +178,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e var sparkContext: SparkContext = null ApplicationMaster.sparkContextRef.synchronized { var count = 0 - while (ApplicationMaster.sparkContextRef.get() == null) { + while (ApplicationMaster.sparkContextRef.get() == null && count < 10) { logInfo("Waiting for spark context initialization ... 
" + count) count = count + 1 ApplicationMaster.sparkContextRef.wait(10000L) From 14b6bcdf93642624c42fa04aeaff9fff97f6e07f Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Mon, 29 Jul 2013 16:04:48 -0700 Subject: [PATCH 13/31] update YARN docs --- docs/running-on-yarn.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 66fb8d73e8..9c2cedfd88 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -55,7 +55,7 @@ This would be used to connect to the cluster, write to the dfs and submit jobs t The command to launch the YARN Client is as follows: - SPARK_JAR= ./run spark.deploy.yarn.Client \ + SPARK_JAR= ./run spark.deploy.yarn.Client \ --jar \ --class \ --args \ @@ -68,7 +68,7 @@ The command to launch the YARN Client is as follows: For example: - SPARK_JAR=./core/target/spark-core-assembly-{{site.SPARK_VERSION}}.jar ./run spark.deploy.yarn.Client \ + SPARK_JAR=./yarn/target/spark-yarn-assembly-{{site.SPARK_VERSION}}.jar ./run spark.deploy.yarn.Client \ --jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar \ --class spark.examples.SparkPi \ --args yarn-standalone \ From 8f979edef5b80967b81323e13dcafd5aac92feb1 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Tue, 6 Aug 2013 15:47:49 -0700 Subject: [PATCH 14/31] Fix newTaskAttemptID to work under YARN --- .../mapreduce/SparkHadoopMapReduceUtil.scala | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala index bea6253677..93180307fa 100644 --- a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala +++ b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce import org.apache.hadoop.conf.Configuration +import java.lang.{Integer => JInteger, Boolean => JBoolean} trait SparkHadoopMapReduceUtil { def newJobContext(conf: Configuration, jobId: JobID): JobContext = { @@ -37,7 +38,24 @@ trait SparkHadoopMapReduceUtil { } def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = { - new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId) + val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID"); + try { + // first, attempt to use the old-style constructor that takes a boolean isMap (not available in YARN) + val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean], + classOf[Int], classOf[Int]) + ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId), new + JInteger(attemptId)).asInstanceOf[TaskAttemptID] + } catch { + case exc: NoSuchMethodException => { + // failed, look for the new ctor that takes a TaskType (not available in 1.x) + val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType").asInstanceOf[Class[Enum[_]]] + val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(taskTypeClass, if(isMap) "MAP" else "REDUCE") + val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass, + classOf[Int], classOf[Int]) + ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId), new + JInteger(attemptId)).asInstanceOf[TaskAttemptID] + } + } } private def firstAvailableClass(first: String, second: 
String): Class[_] = { From a06a9d5c5fd6584a57292a0115253e0a8a45d490 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Tue, 6 Aug 2013 16:16:07 -0700 Subject: [PATCH 15/31] Rename HadoopWriter to SparkHadoopWriter since it's outside of our package --- core/src/main/scala/spark/PairRDDFunctions.scala | 6 +++--- .../spark/{HadoopWriter.scala => SparkHadoopWriter.scala} | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) rename core/src/main/scala/spark/{HadoopWriter.scala => SparkHadoopWriter.scala} (96%) diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index aeeac65cca..6701f24ff9 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -32,7 +32,7 @@ import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.mapred.FileOutputCommitter import org.apache.hadoop.mapred.FileOutputFormat -import org.apache.hadoop.mapred.HadoopWriter +import org.apache.hadoop.mapred.SparkHadoopWriter import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.OutputFormat @@ -653,7 +653,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString) } conf.setOutputCommitter(classOf[FileOutputCommitter]) - FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf)) + FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf)) saveAsHadoopDataset(conf) } @@ -679,7 +679,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")") - val writer = new HadoopWriter(conf) + val writer = new SparkHadoopWriter(conf) writer.preSetup() def writeToFile(context: TaskContext, iter: Iterator[(K,V)]) { diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/SparkHadoopWriter.scala similarity index 96% rename from core/src/main/scala/spark/HadoopWriter.scala rename to core/src/main/scala/spark/SparkHadoopWriter.scala index 60840ce77e..6b330ef572 100644 --- a/core/src/main/scala/spark/HadoopWriter.scala +++ b/core/src/main/scala/spark/SparkHadoopWriter.scala @@ -36,7 +36,7 @@ import spark.SerializableWritable * Saves the RDD using a JobConf, which should contain an output key class, an output value class, * a filename to write to, etc, exactly like in a Hadoop MapReduce job. 
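Returning briefly to patch 14's newTaskAttemptID change above: Hadoop 1.x exposes a TaskAttemptID constructor that takes a Boolean isMap flag, while the YARN/2.x line replaces it with a TaskType enum, so the shim probes for whichever constructor exists at runtime. A condensed sketch of that probing pattern follows; the standalone method name makeTaskAttemptID is illustrative (in the patch this is the trait method shown in the hunk).

    import java.lang.{Integer => JInteger, Boolean => JBoolean}
    import org.apache.hadoop.mapreduce.TaskAttemptID

    // Sketch: try the Hadoop 1.x constructor (String, int, boolean, int, int)
    // first, then fall back to the 2.x constructor that takes a TaskType enum,
    // resolving both via reflection so one jar runs against either generation.
    def makeTaskAttemptID(jt: String, jobId: Int, isMap: Boolean,
                          taskId: Int, attemptId: Int): TaskAttemptID = {
      val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID")
      try {
        val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int],
          classOf[Boolean], classOf[Int], classOf[Int])
        ctor.newInstance(jt, new JInteger(jobId), new JBoolean(isMap),
          new JInteger(taskId), new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
      } catch {
        case _: NoSuchMethodException =>
          // TaskType does not exist on 1.x, so it is also resolved reflectively.
          val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType")
            .asInstanceOf[Class[Enum[_]]]
          val taskType = taskTypeClass.getMethod("valueOf", classOf[String])
            .invoke(taskTypeClass, if (isMap) "MAP" else "REDUCE")
          val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int],
            taskTypeClass, classOf[Int], classOf[Int])
          ctor.newInstance(jt, new JInteger(jobId), taskType,
            new JInteger(taskId), new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
      }
    }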
*/ -class HadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoopMapRedUtil with Serializable { +class SparkHadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoopMapRedUtil with Serializable { private val now = new Date() private val conf = new SerializableWritable(jobConf) @@ -165,7 +165,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoop splitID = splitid attemptID = attemptid - jID = new SerializableWritable[JobID](HadoopWriter.createJobID(now, jobid)) + jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid)) taID = new SerializableWritable[TaskAttemptID]( new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID)) } @@ -179,7 +179,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoop } } -object HadoopWriter { +object SparkHadoopWriter { def createJobID(time: Date, id: Int): JobID = { val formatter = new SimpleDateFormat("yyyyMMddHHmm") val jobtrackerID = formatter.format(new Date()) From a0f08484636395394ac2e0d22ca0ca3e1606664c Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Wed, 14 Aug 2013 15:36:12 -0700 Subject: [PATCH 16/31] Update default version of Hadoop to 1.2.1 --- pom.xml | 2 +- project/SparkBuild.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 7e6d38df9f..135b18bd26 100644 --- a/pom.xml +++ b/pom.xml @@ -540,7 +540,7 @@ org.apache.hadoop hadoop-core - 1.0.4 + 1.2.1 diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index ede49ea38b..350a36a964 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -170,7 +170,7 @@ object SparkBuild extends Build { "org.apache.mesos" % "mesos" % "0.12.1", "io.netty" % "netty-all" % "4.0.0.Beta2", "org.apache.derby" % "derby" % "10.4.2.0" % "test", - "org.apache.hadoop" % "hadoop-client" % "1.0.4" excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-client" % "1.2.1" excludeAll(excludeJackson, excludeNetty, excludeAsm), "com.codahale.metrics" % "metrics-core" % "3.0.0", "com.codahale.metrics" % "metrics-jvm" % "3.0.0", "com.codahale.metrics" % "metrics-json" % "3.0.0", From 3f98eff63a3df35f6dc56f0786c828cdbe4ffcf1 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Wed, 14 Aug 2013 17:34:34 -0700 Subject: [PATCH 17/31] Allow make-distribution.sh to specify Hadoop version used --- make-distribution.sh | 53 +++++++++++++++++++++++++++++++--------- project/SparkBuild.scala | 28 ++++++++++++++++----- 2 files changed, 64 insertions(+), 17 deletions(-) diff --git a/make-distribution.sh b/make-distribution.sh index 0a8941c1f8..a101024de5 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -24,9 +24,10 @@ # so it is completely self contained. # It does not contain source or *.class files. # -# Arguments -# (none): Creates dist/ directory -# tgz: Additionally creates spark-$VERSION-bin.tar.gz +# Optional Arguments +# --tgz: Additionally creates spark-$VERSION-bin.tar.gz +# --hadoop VERSION: Builds against specified version of Hadoop. +# --with-yarn: Enables support for Hadoop YARN. 
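With the options listed above, a YARN-capable binary distribution can be produced with, for example, ./make-distribution.sh --hadoop 2.0.5-alpha --with-yarn --tgz (the Hadoop version is only an example; any YARN-era version configured in the build should work), while a plain ./make-distribution.sh keeps the default Hadoop 1.2.1 build without YARN.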
# # Recommended deploy/testing procedure (standalone mode): # 1) Rsync / deploy the dist/ dir to one host @@ -44,20 +45,50 @@ DISTDIR="$FWDIR/dist" export TERM=dumb # Prevents color codes in SBT output VERSION=$($FWDIR/sbt/sbt "show version" | tail -1 | cut -f 2 | sed 's/^\([a-zA-Z0-9.-]*\).*/\1/') -if [ "$1" == "tgz" ]; then - echo "Making spark-$VERSION-bin.tar.gz" +# Initialize defaults +SPARK_HADOOP_VERSION=1.2.1 +SPARK_YARN_MODE=false +MAKE_TGZ=false + +# Parse arguments +while (( "$#" )); do + case $1 in + --hadoop) + SPARK_HADOOP_VERSION="$2" + shift + ;; + --with-yarn) + SPARK_YARN_MODE=true + ;; + --tgz) + MAKE_TGZ=true + ;; + esac + shift +done + +if [ "$MAKE_TGZ" == "true" ]; then + echo "Making spark-$VERSION-hadoop_$SPARK_HADOOP_VERSION-bin.tar.gz" else echo "Making distribution for Spark $VERSION in $DISTDIR..." fi +echo "Hadoop version set to $SPARK_HADOOP_VERSION" +if [ "$SPARK_YARN_MODE" == "true" ]; then + echo "YARN enabled" +else + echo "YARN disabled" +fi # Build fat JAR -$FWDIR/sbt/sbt "repl/assembly" +export SPARK_HADOOP_VERSION +export SPARK_YARN_MODE +"$FWDIR/sbt/sbt" "repl/assembly" # Make directories rm -rf "$DISTDIR" mkdir -p "$DISTDIR/jars" -echo "$VERSION" >$DISTDIR/RELEASE +echo "$VERSION" > "$DISTDIR/RELEASE" # Copy jars cp $FWDIR/repl/target/*.jar "$DISTDIR/jars/" @@ -69,9 +100,9 @@ cp "$FWDIR/run" "$FWDIR/spark-shell" "$DISTDIR" cp "$FWDIR/spark-executor" "$DISTDIR" -if [ "$1" == "tgz" ]; then +if [ "$MAKE_TGZ" == "true" ]; then TARDIR="$FWDIR/spark-$VERSION" - cp -r $DISTDIR $TARDIR - tar -zcf spark-$VERSION-bin.tar.gz -C $FWDIR spark-$VERSION - rm -rf $TARDIR + cp -r "$DISTDIR" "$TARDIR" + tar -zcf "spark-$VERSION-hadoop_$SPARK_HADOOP_VERSION-bin.tar.gz" -C "$FWDIR" "spark-$VERSION" + rm -rf "$TARDIR" fi diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 350a36a964..23c7179919 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -24,10 +24,15 @@ import AssemblyKeys._ //import com.jsuereth.pgp.sbtplugin.PgpKeys._ object SparkBuild extends Build { + // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or + // "1.0.4" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop. + val HADOOP_VERSION = "1.2.1" + val HADOOP_YARN = false + // HBase version; set as appropriate. 
val HBASE_VERSION = "0.94.6" - lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel, streaming, mllib, tools, yarn) + lazy val root = Project("root", file("."), settings = rootSettings) aggregate(allProjects:_*) lazy val core = Project("core", file("core"), settings = coreSettings) @@ -49,6 +54,17 @@ object SparkBuild extends Build { lazy val MavenCompile = config("m2r") extend(Compile) lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy") + // Allows build configuration to be set through environment variables + lazy val hadoopVersion = scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", HADOOP_VERSION) + lazy val isYarnMode = scala.util.Properties.envOrNone("SPARK_YARN_MODE") match { + case None => HADOOP_YARN + case Some(v) => v.toBoolean + } + + // Conditionally include the yarn sub-project + lazy val maybeYarn = if(isYarnMode) Seq[ProjectReference](yarn) else Seq[ProjectReference]() + lazy val allProjects = Seq[ProjectReference](core, repl, examples, bagel, streaming, mllib, tools) ++ maybeYarn + def sharedSettings = Defaults.defaultSettings ++ Seq( organization := "org.spark-project", version := "0.8.0-SNAPSHOT", @@ -170,7 +186,7 @@ object SparkBuild extends Build { "org.apache.mesos" % "mesos" % "0.12.1", "io.netty" % "netty-all" % "4.0.0.Beta2", "org.apache.derby" % "derby" % "10.4.2.0" % "test", - "org.apache.hadoop" % "hadoop-client" % "1.2.1" excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), "com.codahale.metrics" % "metrics-core" % "3.0.0", "com.codahale.metrics" % "metrics-jvm" % "3.0.0", "com.codahale.metrics" % "metrics-json" % "3.0.0", @@ -239,10 +255,10 @@ object SparkBuild extends Build { name := "spark-yarn", libraryDependencies ++= Seq( // Exclude rule required for all ? 
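To summarize the SBT side of patch 17: the build reads the Hadoop version and the YARN switch from the environment (make-distribution.sh exports them before invoking sbt/sbt repl/assembly), falls back to the defaults declared at the top of SparkBuild.scala, and only aggregates the yarn sub-project when YARN is requested. A small self-contained sketch of that configuration pattern, with names mirroring the hunks above:

    // Sketch of the environment-driven build configuration in SparkBuild.scala.
    object BuildConfig {
      // Defaults mirror the constants declared at the top of the build file.
      val DefaultHadoopVersion = "1.2.1"
      val DefaultWithYarn = false

      // Explicit environment variables win over the defaults. (Patch 26 later
      // renames the switch from SPARK_YARN_MODE to SPARK_WITH_YARN.)
      val hadoopVersion =
        scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", DefaultHadoopVersion)
      val withYarn = scala.util.Properties.envOrNone("SPARK_YARN_MODE") match {
        case None    => DefaultWithYarn
        case Some(v) => v.toBoolean
      }
    }

In the real build file, withYarn then gates the list of aggregated projects (maybeYarn), so a build that never asks for YARN never resolves the YARN artifacts.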
- "org.apache.hadoop" % "hadoop-client" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-api" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-common" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-client" % "2.0.2-alpha" excludeAll(excludeJackson, excludeNetty, excludeAsm) + "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm) ) ) ++ assemblySettings ++ extraAssemblySettings From 8add2d7a59c59e72539da86a58b9c2980843f1e0 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Wed, 14 Aug 2013 17:49:42 -0700 Subject: [PATCH 18/31] Fix repl/assembly when YARN enabled --- project/SparkBuild.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 23c7179919..fa9ec7deca 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -36,7 +36,7 @@ object SparkBuild extends Build { lazy val core = Project("core", file("core"), settings = coreSettings) - lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core) dependsOn(bagel) dependsOn(mllib) + lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn(core) dependsOn(bagel) dependsOn(mllib) dependsOn(maybeYarn:_*) lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) dependsOn (streaming) dependsOn(mllib) @@ -62,8 +62,9 @@ object SparkBuild extends Build { } // Conditionally include the yarn sub-project - lazy val maybeYarn = if(isYarnMode) Seq[ProjectReference](yarn) else Seq[ProjectReference]() - lazy val allProjects = Seq[ProjectReference](core, repl, examples, bagel, streaming, mllib, tools) ++ maybeYarn + lazy val maybeYarn = if(isYarnMode) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]() + lazy val maybeYarnRef = if(isYarnMode) Seq[ProjectReference](yarn) else Seq[ProjectReference]() + lazy val allProjects = Seq[ProjectReference](core, repl, examples, bagel, streaming, mllib, tools) ++ maybeYarnRef def sharedSettings = Defaults.defaultSettings ++ Seq( organization := "org.spark-project", From 353fab2440dbf1369df20393e0377de2b327de72 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Thu, 15 Aug 2013 12:10:31 -0700 Subject: [PATCH 19/31] Initial changes to make Maven build agnostic of hadoop version --- assembly/pom.xml | 8 +-- bagel/pom.xml | 43 ++++---------- core/pom.xml | 63 ++------------------ examples/pom.xml | 144 +++++++++++++++++++--------------------------- mllib/pom.xml | 43 ++++---------- pom.xml | 32 ++++------- repl-bin/pom.xml | 76 ++++++++++-------------- repl/pom.xml | 72 +++++++---------------- streaming/pom.xml | 42 ++++---------- tools/pom.xml | 53 +++++------------ 10 files changed, 175 insertions(+), 401 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index cc5a4875af..76ac9f5478 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -38,12 +38,6 @@ - - hadoop1 - - hadoop1 - - hadoop2 @@ -89,4 +83,4 @@ ${project.version} - \ No newline at end of 
file + diff --git a/bagel/pom.xml b/bagel/pom.xml index 60bbc49e6c..3c82af3b33 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -32,11 +32,20 @@ http://spark-project.org/ + + org.spark-project + spark-core + ${project.version} + + + org.apache.hadoop + hadoop-client + provided + org.eclipse.jetty jetty-server - org.scalatest scalatest_${scala.version} @@ -60,33 +69,6 @@ - - hadoop1 - - - org.spark-project - spark-core - ${project.version} - hadoop1 - - - org.apache.hadoop - hadoop-core - provided - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop1 - - - - - hadoop2 @@ -96,11 +78,6 @@ ${project.version} hadoop2 - - org.apache.hadoop - hadoop-core - provided - org.apache.hadoop hadoop-client diff --git a/core/pom.xml b/core/pom.xml index dfadd22d42..680ae94a11 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -32,6 +32,11 @@ http://spark-project.org/ + + org.apache.hadoop + hadoop-client + provided + org.eclipse.jetty jetty-server @@ -130,7 +135,6 @@ com.codahale.metrics metrics-json - org.apache.derby derby @@ -210,66 +214,9 @@ - - hadoop1 - - - org.apache.hadoop - hadoop-core - provided - - - - - - org.codehaus.mojo - build-helper-maven-plugin - - - add-source - generate-sources - - add-source - - - - src/main/scala - src/hadoop1/scala - - - - - add-scala-test-sources - generate-test-sources - - add-test-source - - - - src/test/scala - - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop1 - - - - - hadoop2 - - org.apache.hadoop - hadoop-core - provided - org.apache.hadoop hadoop-client diff --git a/examples/pom.xml b/examples/pom.xml index a051da8a77..6a9c19ed6f 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -32,6 +32,31 @@ http://spark-project.org/ + + org.spark-project + spark-core + ${project.version} + + + org.spark-project + spark-streaming + ${project.version} + + + org.spark-project + spark-mllib + ${project.version} + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.hbase + hbase + 0.94.6 + org.scala-lang scala-library @@ -55,41 +80,41 @@ scalacheck_${scala.version} test - - org.apache.cassandra - cassandra-all - 1.2.5 - - - com.google.guava - guava - - - com.googlecode.concurrentlinkedhashmap - concurrentlinkedhashmap-lru - - - com.ning - compress-lzf - - - io.netty - netty - - - jline - jline - - - log4j - log4j - - - org.apache.cassandra.deps - avro - - - + + org.apache.cassandra + cassandra-all + 1.2.5 + + + com.google.guava + guava + + + com.googlecode.concurrentlinkedhashmap + concurrentlinkedhashmap-lru + + + com.ning + compress-lzf + + + io.netty + netty + + + jline + jline + + + log4j + log4j + + + org.apache.cassandra.deps + avro + + + target/scala-${scala.version}/classes @@ -103,50 +128,6 @@ - - hadoop1 - - - org.spark-project - spark-core - ${project.version} - hadoop1 - - - org.spark-project - spark-streaming - ${project.version} - hadoop1 - - - org.spark-project - spark-mllib - ${project.version} - hadoop1 - - - org.apache.hadoop - hadoop-core - provided - - - org.apache.hbase - hbase - 0.94.6 - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop1 - - - - - hadoop2 @@ -168,11 +149,6 @@ ${project.version} hadoop2 - - org.apache.hadoop - hadoop-core - provided - org.apache.hadoop hadoop-client diff --git a/mllib/pom.xml b/mllib/pom.xml index a07480fbe2..36f410d3b0 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -32,6 +32,16 @@ http://spark-project.org/ + + org.spark-project + spark-core + ${project.version} + + + org.apache.hadoop + hadoop-client + provided + org.eclipse.jetty jetty-server @@ -41,7 
+51,6 @@ jblas 1.2.3 - org.scalatest scalatest_${scala.version} @@ -70,33 +79,6 @@ - - hadoop1 - - - org.spark-project - spark-core - ${project.version} - hadoop1 - - - org.apache.hadoop - hadoop-core - provided - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop1 - - - - - hadoop2 @@ -106,11 +88,6 @@ ${project.version} hadoop2 - - org.apache.hadoop - hadoop-core - provided - org.apache.hadoop hadoop-client diff --git a/pom.xml b/pom.xml index 135b18bd26..8d34fff3ee 100644 --- a/pom.xml +++ b/pom.xml @@ -325,6 +325,17 @@ 0.8 test + + org.apache.hadoop + hadoop-client + 1.2.1 + + + asm + asm + + + @@ -530,22 +541,6 @@ - - hadoop1 - - 1 - - - - - org.apache.hadoop - hadoop-core - 1.2.1 - - - - - hadoop2 @@ -553,11 +548,6 @@ - - org.apache.hadoop - hadoop-core - 2.0.0-mr1-cdh${cdh.version} - org.apache.hadoop hadoop-client diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml index 7c4e722cc1..81aba06e14 100644 --- a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -37,6 +37,37 @@ root + + + org.spark-project + spark-core + ${project.version} + + + org.spark-project + spark-bagel + ${project.version} + runtime + + + org.spark-project + spark-examples + ${project.version} + runtime + + + org.spark-project + spark-repl + ${project.version} + runtime + + + org.apache.hadoop + hadoop-client + runtime + + + @@ -85,46 +116,6 @@ - - hadoop1 - - hadoop1 - - - - org.spark-project - spark-core - ${project.version} - hadoop1 - - - org.spark-project - spark-bagel - ${project.version} - hadoop1 - runtime - - - org.spark-project - spark-examples - ${project.version} - hadoop1 - runtime - - - org.spark-project - spark-repl - ${project.version} - hadoop1 - runtime - - - org.apache.hadoop - hadoop-core - runtime - - - hadoop2 @@ -158,11 +149,6 @@ hadoop2 runtime - - org.apache.hadoop - hadoop-core - runtime - org.apache.hadoop hadoop-client diff --git a/repl/pom.xml b/repl/pom.xml index 862595b9f9..81cebe178a 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -37,6 +37,28 @@ + + org.spark-project + spark-core + ${project.version} + + + org.spark-project + spark-bagel + ${project.version} + runtime + + + org.spark-project + spark-examples + ${project.version} + runtime + + + org.apache.hadoop + hadoop-client + provided + org.eclipse.jetty jetty-server @@ -57,7 +79,6 @@ org.slf4j slf4j-log4j12 - org.scalatest scalatest_${scala.version} @@ -117,50 +138,6 @@ - - hadoop1 - - hadoop1 - - - - org.spark-project - spark-core - ${project.version} - hadoop1 - - - org.spark-project - spark-bagel - ${project.version} - hadoop1 - runtime - - - org.spark-project - spark-examples - ${project.version} - hadoop1 - runtime - - - org.apache.hadoop - hadoop-core - provided - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop1 - - - - - hadoop2 @@ -187,11 +164,6 @@ hadoop2 runtime - - org.apache.hadoop - hadoop-core - provided - org.apache.hadoop hadoop-client diff --git a/streaming/pom.xml b/streaming/pom.xml index 7e6b06d772..9b478f7a05 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -40,6 +40,16 @@ + + org.spark-project + spark-core + ${project.version} + + + org.apache.hadoop + hadoop-client + provided + org.eclipse.jetty jetty-server @@ -117,33 +127,6 @@ - - hadoop1 - - - org.spark-project - spark-core - ${project.version} - hadoop1 - - - org.apache.hadoop - hadoop-core - provided - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop1 - - - - - hadoop2 @@ -153,11 +136,6 @@ ${project.version} hadoop2 - - org.apache.hadoop - hadoop-core - provided - org.apache.hadoop hadoop-client diff 
--git a/tools/pom.xml b/tools/pom.xml index 878eb82f18..c123c2ab23 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -31,6 +31,21 @@ http://spark-project.org/ + + org.spark-project + spark-core + ${project.version} + + + org.spark-project + spark-streaming + ${project.version} + + + org.apache.hadoop + hadoop-client + provided + org.scalatest scalatest_${scala.version} @@ -58,39 +73,6 @@ - - hadoop1 - - - org.spark-project - spark-core - ${project.version} - hadoop1 - - - org.spark-project - spark-streaming - ${project.version} - hadoop1 - - - org.apache.hadoop - hadoop-core - provided - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop1 - - - - - hadoop2 @@ -106,11 +88,6 @@ ${project.version} hadoop2 - - org.apache.hadoop - hadoop-core - provided - org.apache.hadoop hadoop-client From 11b42a84db255eb659412e9d0bf4622cb2e8b20a Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Thu, 15 Aug 2013 15:31:31 -0700 Subject: [PATCH 20/31] Maven build now works with CDH hadoop-2.0.0-mr1 --- assembly/pom.xml | 14 ----------- bagel/pom.xml | 27 -------------------- core/pom.xml | 52 -------------------------------------- examples/pom.xml | 44 -------------------------------- mllib/pom.xml | 27 -------------------- pom.xml | 55 +++++++++++++++------------------------- repl-bin/pom.xml | 40 ----------------------------- repl/pom.xml | 64 ++++++++--------------------------------------- streaming/pom.xml | 27 -------------------- tools/pom.xml | 33 ------------------------ 10 files changed, 30 insertions(+), 353 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index 76ac9f5478..3d645e0379 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -37,20 +37,6 @@ - - - hadoop2 - - hadoop2 - - - - hadoop2-yarn - - hadoop2-yarn - - - org.spark-project diff --git a/bagel/pom.xml b/bagel/pom.xml index 3c82af3b33..1b555bead7 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -69,33 +69,6 @@ - - hadoop2 - - - org.spark-project - spark-core - ${project.version} - hadoop2 - - - org.apache.hadoop - hadoop-client - provided - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop2 - - - - - hadoop2-yarn diff --git a/core/pom.xml b/core/pom.xml index 680ae94a11..9310d000fd 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -214,58 +214,6 @@ - - hadoop2 - - - org.apache.hadoop - hadoop-client - provided - - - - - - org.codehaus.mojo - build-helper-maven-plugin - - - add-source - generate-sources - - add-source - - - - src/main/scala - src/hadoop2/scala - - - - - add-scala-test-sources - generate-test-sources - - add-test-source - - - - src/test/scala - - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop2 - - - - - hadoop2-yarn diff --git a/examples/pom.xml b/examples/pom.xml index 6a9c19ed6f..6e54e94cf5 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -128,50 +128,6 @@ - - hadoop2 - - - org.spark-project - spark-core - ${project.version} - hadoop2 - - - org.spark-project - spark-streaming - ${project.version} - hadoop2 - - - org.spark-project - spark-mllib - ${project.version} - hadoop2 - - - org.apache.hadoop - hadoop-client - provided - - - org.apache.hbase - hbase - 0.94.6 - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop2 - - - - - hadoop2-yarn diff --git a/mllib/pom.xml b/mllib/pom.xml index 36f410d3b0..863aef9392 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -79,33 +79,6 @@ - - hadoop2 - - - org.spark-project - spark-core - ${project.version} - hadoop2 - - - org.apache.hadoop - hadoop-client - provided - - - - - - org.apache.maven.plugins - 
maven-jar-plugin - - hadoop2 - - - - - hadoop2-yarn diff --git a/pom.xml b/pom.xml index 8d34fff3ee..054f5d170c 100644 --- a/pom.xml +++ b/pom.xml @@ -73,8 +73,9 @@ 0.12.1 2.0.3 1.7.2 - 4.1.2 1.2.17 + 1.2.1 + 64m 512m @@ -328,7 +329,7 @@ org.apache.hadoop hadoop-client - 1.2.1 + ${hadoop.version} asm @@ -336,6 +337,23 @@ + + + org.apache.avro + avro + 1.7.4 + + + org.apache.avro + avro-ipc + 1.7.4 + + + org.jboss.netty + netty + + + @@ -541,39 +559,6 @@ - - hadoop2 - - 2 - - - - - org.apache.hadoop - hadoop-client - 2.0.0-mr1-cdh${cdh.version} - - - - org.apache.avro - avro - 1.7.4 - - - org.apache.avro - avro-ipc - 1.7.4 - - - org.jboss.netty - netty - - - - - - - hadoop2-yarn diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml index 81aba06e14..eaee8ea016 100644 --- a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -116,46 +116,6 @@ - - hadoop2 - - hadoop2 - - - - org.spark-project - spark-core - ${project.version} - hadoop2 - - - org.spark-project - spark-bagel - ${project.version} - hadoop2 - runtime - - - org.spark-project - spark-examples - ${project.version} - hadoop2 - runtime - - - org.spark-project - spark-repl - ${project.version} - hadoop2 - runtime - - - org.apache.hadoop - hadoop-client - runtime - - - hadoop2-yarn diff --git a/repl/pom.xml b/repl/pom.xml index 81cebe178a..032c20e118 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -59,6 +59,16 @@ hadoop-client provided + + org.apache.avro + avro + provided + + + org.apache.avro + avro-ipc + provided + org.eclipse.jetty jetty-server @@ -138,60 +148,6 @@ - - hadoop2 - - hadoop2 - - - - org.spark-project - spark-core - ${project.version} - hadoop2 - - - org.spark-project - spark-bagel - ${project.version} - hadoop2 - runtime - - - org.spark-project - spark-examples - ${project.version} - hadoop2 - runtime - - - org.apache.hadoop - hadoop-client - provided - - - org.apache.avro - avro - provided - - - org.apache.avro - avro-ipc - provided - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop2 - - - - - hadoop2-yarn diff --git a/streaming/pom.xml b/streaming/pom.xml index 9b478f7a05..612ff0a024 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -127,33 +127,6 @@ - - hadoop2 - - - org.spark-project - spark-core - ${project.version} - hadoop2 - - - org.apache.hadoop - hadoop-client - provided - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop2 - - - - - hadoop2-yarn diff --git a/tools/pom.xml b/tools/pom.xml index c123c2ab23..5864c9f217 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -73,39 +73,6 @@ - - hadoop2 - - - org.spark-project - spark-core - ${project.version} - hadoop2 - - - org.spark-project - spark-streaming - ${project.version} - hadoop2 - - - org.apache.hadoop - hadoop-client - provided - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop2 - - - - - hadoop2-yarn From 9dd15fe700ad8f52739cce58cbdf198fab8fd5d8 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Thu, 15 Aug 2013 15:40:37 -0700 Subject: [PATCH 21/31] Don't mark hadoop-client as 'provided' --- bagel/pom.xml | 1 - core/pom.xml | 1 - examples/pom.xml | 1 - mllib/pom.xml | 1 - 4 files changed, 4 deletions(-) diff --git a/bagel/pom.xml b/bagel/pom.xml index 1b555bead7..fc5dce7ffd 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -40,7 +40,6 @@ org.apache.hadoop hadoop-client - provided org.eclipse.jetty diff --git a/core/pom.xml b/core/pom.xml index 9310d000fd..c5baecfaad 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -35,7 +35,6 @@ org.apache.hadoop hadoop-client - provided org.eclipse.jetty diff --git 
a/examples/pom.xml b/examples/pom.xml index 6e54e94cf5..4ccc6aa198 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -50,7 +50,6 @@ org.apache.hadoop hadoop-client - provided org.apache.hbase diff --git a/mllib/pom.xml b/mllib/pom.xml index 863aef9392..801aa6e719 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -40,7 +40,6 @@ org.apache.hadoop hadoop-client - provided org.eclipse.jetty From 741ecd56fe714fe42c22518aefcfa48fa3a448c7 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Thu, 15 Aug 2013 16:32:15 -0700 Subject: [PATCH 22/31] Forgot to remove a few references to ${classifier} --- assembly/pom.xml | 5 ----- repl-bin/pom.xml | 8 ++++---- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index 3d645e0379..ca20ccadba 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -41,31 +41,26 @@ org.spark-project spark-core - ${classifier.name} ${project.version} org.spark-project spark-bagel - ${classifier.name} ${project.version} org.spark-project spark-mllib - ${classifier.name} ${project.version} org.spark-project spark-repl - ${classifier.name} ${project.version} org.spark-project spark-streaming - ${classifier.name} ${project.version} diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml index eaee8ea016..270a160120 100644 --- a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -32,8 +32,8 @@ http://spark-project.org/ - spark-${classifier} - /usr/share/spark-${classifier} + spark + /usr/share/spark root @@ -75,7 +75,7 @@ maven-shade-plugin false - ${project.build.directory}/${project.artifactId}-${project.version}-shaded-${classifier}.jar + ${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar *:* @@ -207,7 +207,7 @@ gzip - ${project.build.directory}/${project.artifactId}-${project.version}-shaded-${classifier}.jar + ${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar file perm From ad580b94d506b3dbb8b4206326e4df2e1104e3b3 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Thu, 15 Aug 2013 16:49:24 -0700 Subject: [PATCH 23/31] Maven build now also works with YARN --- bagel/pom.xml | 40 ------------ bin/compute-classpath.sh | 2 +- core/pom.xml | 70 --------------------- examples/pom.xml | 57 ----------------- mllib/pom.xml | 40 ------------ pom.xml | 128 +++++++++++++++++++++++++++++++++++++++ repl-bin/pom.xml | 47 +------------- repl/pom.xml | 64 +------------------- streaming/pom.xml | 40 ------------ tools/pom.xml | 46 -------------- yarn/pom.xml | 106 ++++++++++++++++++++++++++++++++ 11 files changed, 237 insertions(+), 403 deletions(-) create mode 100644 yarn/pom.xml diff --git a/bagel/pom.xml b/bagel/pom.xml index fc5dce7ffd..ae40f38e43 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -66,44 +66,4 @@ - - - - hadoop2-yarn - - - org.spark-project - spark-core - ${project.version} - hadoop2-yarn - - - org.apache.hadoop - hadoop-client - provided - - - org.apache.hadoop - hadoop-yarn-api - provided - - - org.apache.hadoop - hadoop-yarn-common - provided - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop2-yarn - - - - - - diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index db6fc866ab..f975d3bfb9 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -63,7 +63,7 @@ function dev_classpath { CLASSPATH="$CLASSPATH:$REPL_DIR/lib/*" # Add the shaded JAR for Maven builds if [ -e $REPL_BIN_DIR/target ]; then - for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do + for jar in `find "$REPL_BIN_DIR/target" -name 
'spark-repl-*-shaded.jar'`; do CLASSPATH="$CLASSPATH:$jar" done # The shaded JAR doesn't contain examples, so include those separately diff --git a/core/pom.xml b/core/pom.xml index c5baecfaad..90d279c635 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -211,74 +211,4 @@ - - - - hadoop2-yarn - - - org.apache.hadoop - hadoop-client - provided - - - org.apache.hadoop - hadoop-yarn-api - provided - - - org.apache.hadoop - hadoop-yarn-common - provided - - - org.apache.hadoop - hadoop-yarn-client - provided - - - - - - org.codehaus.mojo - build-helper-maven-plugin - - - add-source - generate-sources - - add-source - - - - src/main/scala - src/hadoop2-yarn/scala - - - - - add-scala-test-sources - generate-test-sources - - add-test-source - - - - src/test/scala - - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop2-yarn - - - - - - diff --git a/examples/pom.xml b/examples/pom.xml index 4ccc6aa198..4eb32935f4 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -125,61 +125,4 @@ - - - - hadoop2-yarn - - - org.spark-project - spark-core - ${project.version} - hadoop2-yarn - - - org.spark-project - spark-streaming - ${project.version} - hadoop2-yarn - - - org.spark-project - spark-mllib - ${project.version} - hadoop2-yarn - - - org.apache.hadoop - hadoop-client - provided - - - org.apache.hadoop - hadoop-yarn-api - provided - - - org.apache.hadoop - hadoop-yarn-common - provided - - - org.apache.hbase - hbase - 0.94.6 - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop2-yarn - - - - - - diff --git a/mllib/pom.xml b/mllib/pom.xml index 801aa6e719..3292f6dad0 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -76,44 +76,4 @@ - - - - hadoop2-yarn - - - org.spark-project - spark-core - ${project.version} - hadoop2-yarn - - - org.apache.hadoop - hadoop-client - provided - - - org.apache.hadoop - hadoop-yarn-api - provided - - - org.apache.hadoop - hadoop-yarn-common - provided - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop2-yarn - - - - - - diff --git a/pom.xml b/pom.xml index 054f5d170c..4714576f3b 100644 --- a/pom.xml +++ b/pom.xml @@ -335,6 +335,26 @@ asm asm + + org.jboss.netty + netty + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + org.codehaus.jackson + jackson-jaxrs + + + org.codehaus.jackson + jackson-xc + @@ -568,6 +588,10 @@ 2.0.5-alpha + + yarn + + maven-root @@ -589,21 +613,125 @@ org.apache.hadoop hadoop-client ${yarn.version} + + + asm + asm + + + org.jboss.netty + netty + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + org.codehaus.jackson + jackson-jaxrs + + + org.codehaus.jackson + jackson-xc + + org.apache.hadoop hadoop-yarn-api ${yarn.version} + + + asm + asm + + + org.jboss.netty + netty + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + org.codehaus.jackson + jackson-jaxrs + + + org.codehaus.jackson + jackson-xc + + org.apache.hadoop hadoop-yarn-common ${yarn.version} + + + asm + asm + + + org.jboss.netty + netty + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + org.codehaus.jackson + jackson-jaxrs + + + org.codehaus.jackson + jackson-xc + + org.apache.hadoop hadoop-yarn-client ${yarn.version} + + + asm + asm + + + org.jboss.netty + netty + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + org.codehaus.jackson + jackson-jaxrs + + + org.codehaus.jackson + jackson-xc + + diff --git 
a/repl-bin/pom.xml b/repl-bin/pom.xml index 270a160120..f3bde60744 100644 --- a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -118,56 +118,11 @@ hadoop2-yarn - - hadoop2-yarn - org.spark-project - spark-core + spark-yarn ${project.version} - hadoop2-yarn - - - org.spark-project - spark-bagel - ${project.version} - hadoop2-yarn - runtime - - - org.spark-project - spark-examples - ${project.version} - hadoop2-yarn - runtime - - - org.spark-project - spark-repl - ${project.version} - hadoop2-yarn - runtime - - - org.apache.hadoop - hadoop-client - runtime - - - org.apache.hadoop - hadoop-yarn-api - runtime - - - org.apache.hadoop - hadoop-yarn-common - runtime - - - org.apache.hadoop - hadoop-yarn-client - runtime diff --git a/repl/pom.xml b/repl/pom.xml index 032c20e118..429de7861f 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -146,78 +146,16 @@ - hadoop2-yarn - - hadoop2-yarn - org.spark-project - spark-core - ${project.version} - hadoop2-yarn - - - org.spark-project - spark-bagel - ${project.version} - hadoop2-yarn - runtime - - - org.spark-project - spark-examples - ${project.version} - hadoop2-yarn - runtime - - - org.spark-project - spark-streaming + spark-yarn ${project.version} - hadoop2-yarn - runtime - - - org.apache.hadoop - hadoop-client - provided - - - org.apache.hadoop - hadoop-yarn-api - provided - - - org.apache.hadoop - hadoop-yarn-common - provided - - - org.apache.avro - avro - provided - - - org.apache.avro - avro-ipc - provided - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop2-yarn - - - - diff --git a/streaming/pom.xml b/streaming/pom.xml index 612ff0a024..1860990122 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -125,44 +125,4 @@ - - - - hadoop2-yarn - - - org.spark-project - spark-core - ${project.version} - hadoop2-yarn - - - org.apache.hadoop - hadoop-client - provided - - - org.apache.hadoop - hadoop-yarn-api - provided - - - org.apache.hadoop - hadoop-yarn-common - provided - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop2-yarn - - - - - - diff --git a/tools/pom.xml b/tools/pom.xml index 5864c9f217..9177d85b2f 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -71,50 +71,4 @@ - - - - hadoop2-yarn - - - org.spark-project - spark-core - ${project.version} - hadoop2-yarn - - - org.spark-project - spark-streaming - ${project.version} - hadoop2-yarn - - - org.apache.hadoop - hadoop-client - provided - - - org.apache.hadoop - hadoop-yarn-api - provided - - - org.apache.hadoop - hadoop-yarn-common - provided - - - - - - org.apache.maven.plugins - maven-jar-plugin - - hadoop2-yarn - - - - - - diff --git a/yarn/pom.xml b/yarn/pom.xml new file mode 100644 index 0000000000..6acde8e98c --- /dev/null +++ b/yarn/pom.xml @@ -0,0 +1,106 @@ + + + + + 4.0.0 + + org.spark-project + spark-parent + 0.8.0-SNAPSHOT + ../pom.xml + + + org.spark-project + spark-yarn + jar + Spark Project YARN Support + http://spark-project.org/ + + + + org.spark-project + spark-core + ${project.version} + + + org.apache.hadoop + hadoop-yarn-api + + + org.apache.hadoop + hadoop-yarn-common + + + org.apache.hadoop + hadoop-yarn-client + + + org.apache.avro + avro + + + org.apache.avro + avro-ipc + + + + + target/scala-${scala.version}/classes + target/scala-${scala.version}/test-classes + + + org.apache.maven.plugins + maven-shade-plugin + + false + ${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar + + + *:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + package + + shade + + + + + + reference.conf + + + + 
+ + + + + From c1e547bb7f21ebed198bea0aed0a122eb0d70835 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Fri, 16 Aug 2013 12:26:45 -0700 Subject: [PATCH 24/31] Updates to repl and example POMs to match SBT build --- examples/pom.xml | 10 ++++++++++ repl-bin/pom.xml | 6 ------ repl/pom.xml | 6 ------ 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/examples/pom.xml b/examples/pom.xml index 4eb32935f4..023ad8cb45 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -55,6 +55,16 @@ org.apache.hbase hbase 0.94.6 + + + asm + asm + + + org.jboss.netty + netty + + org.scala-lang diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml index f3bde60744..f132c44fb9 100644 --- a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -49,12 +49,6 @@ ${project.version} runtime - - org.spark-project - spark-examples - ${project.version} - runtime - org.spark-project spark-repl diff --git a/repl/pom.xml b/repl/pom.xml index 429de7861f..82e26defbc 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -48,12 +48,6 @@ ${project.version} runtime - - org.spark-project - spark-examples - ${project.version} - runtime - org.apache.hadoop hadoop-client From b1d99744a813a72301260612943f46853794f00f Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Fri, 16 Aug 2013 13:49:26 -0700 Subject: [PATCH 25/31] Fix SBT build under Hadoop 0.23.x --- project/SparkBuild.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index fa9ec7deca..4023626c16 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -193,6 +193,17 @@ object SparkBuild extends Build { "com.codahale.metrics" % "metrics-json" % "3.0.0", "com.twitter" % "chill_2.9.3" % "0.3.1", "com.twitter" % "chill-java" % "0.3.1" + ) ++ ( + if (isYarnMode) { + // This kludge is needed for 0.23.x + Seq( + "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm) + ) + } else { + Seq() + } ) ) ++ assemblySettings ++ extraAssemblySettings From 67b593607c7df934d5a73012fe9cce220b25f321 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Fri, 16 Aug 2013 13:53:16 -0700 Subject: [PATCH 26/31] Rename YARN build flag to SPARK_WITH_YARN --- make-distribution.sh | 8 ++++---- project/SparkBuild.scala | 12 +++++++----- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/make-distribution.sh b/make-distribution.sh index a101024de5..55dc22b992 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -47,7 +47,7 @@ VERSION=$($FWDIR/sbt/sbt "show version" | tail -1 | cut -f 2 | sed 's/^\([a-zA-Z # Initialize defaults SPARK_HADOOP_VERSION=1.2.1 -SPARK_YARN_MODE=false +SPARK_WITH_YARN=false MAKE_TGZ=false # Parse arguments @@ -58,7 +58,7 @@ while (( "$#" )); do shift ;; --with-yarn) - SPARK_YARN_MODE=true + SPARK_WITH_YARN=true ;; --tgz) MAKE_TGZ=true @@ -74,7 +74,7 @@ else fi echo "Hadoop version set to $SPARK_HADOOP_VERSION" -if [ "$SPARK_YARN_MODE" == "true" ]; then +if [ "$SPARK_WITH_YARN" == "true" ]; then echo "YARN enabled" else echo "YARN disabled" @@ -82,7 +82,7 @@ fi # Build fat JAR export SPARK_HADOOP_VERSION -export SPARK_YARN_MODE +export SPARK_WITH_YARN "$FWDIR/sbt/sbt" "repl/assembly" # Make directories diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 4023626c16..cea982b886 100644 --- 
a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -26,8 +26,10 @@ import AssemblyKeys._ object SparkBuild extends Build { // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or // "1.0.4" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop. - val HADOOP_VERSION = "1.2.1" - val HADOOP_YARN = false + // Note that these variables can be set through the environment variables + // SPARK_HADOOP_VERSION and SPARK_WITH_YARN. + val DEFAULT_HADOOP_VERSION = "1.2.1" + val DEFAULT_WITH_YARN = false // HBase version; set as appropriate. val HBASE_VERSION = "0.94.6" @@ -55,9 +57,9 @@ object SparkBuild extends Build { lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy") // Allows build configuration to be set through environment variables - lazy val hadoopVersion = scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", HADOOP_VERSION) - lazy val isYarnMode = scala.util.Properties.envOrNone("SPARK_YARN_MODE") match { - case None => HADOOP_YARN + lazy val hadoopVersion = scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", DEFAULT_HADOOP_VERSION) + lazy val isYarnMode = scala.util.Properties.envOrNone("SPARK_WITH_YARN") match { + case None => DEFAULT_WITH_YARN case Some(v) => v.toBoolean } From 44000b10ffed83ae605521701d104878f491f31c Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Sun, 18 Aug 2013 16:23:22 -0700 Subject: [PATCH 27/31] Make YARN POM file valid --- yarn/pom.xml | 61 ++++++++++++++++++++++++++++------------------------ 1 file changed, 33 insertions(+), 28 deletions(-) diff --git a/yarn/pom.xml b/yarn/pom.xml index 6acde8e98c..07dd170eae 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -30,34 +30,6 @@ Spark Project YARN Support http://spark-project.org/ - - - org.spark-project - spark-core - ${project.version} - - - org.apache.hadoop - hadoop-yarn-api - - - org.apache.hadoop - hadoop-yarn-common - - - org.apache.hadoop - hadoop-yarn-client - - - org.apache.avro - avro - - - org.apache.avro - avro-ipc - - - target/scala-${scala.version}/classes target/scala-${scala.version}/test-classes @@ -103,4 +75,37 @@ + + + + hadoop2-yarn + + + org.spark-project + spark-core + ${project.version} + + + org.apache.hadoop + hadoop-yarn-api + + + org.apache.hadoop + hadoop-yarn-common + + + org.apache.hadoop + hadoop-yarn-client + + + org.apache.avro + avro + + + org.apache.avro + avro-ipc + + + + From 47a7c4338a7e912d8677704204e98df15679322b Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Sun, 18 Aug 2013 16:58:08 -0700 Subject: [PATCH 28/31] Don't assume spark-examples JAR always exists --- bin/compute-classpath.sh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index f975d3bfb9..7a21b3c4a1 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -67,8 +67,9 @@ function dev_classpath { CLASSPATH="$CLASSPATH:$jar" done # The shaded JAR doesn't contain examples, so include those separately - EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar` - CLASSPATH+=":$EXAMPLES_JAR" + for jar in `find "$EXAMPLES_DIR/target" -name 'spark-examples*[0-9T].jar'`; do + CLASSPATH="$CLASSPATH:$jar" + done fi CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes" CLASSPATH="$CLASSPATH:$MLLIB_DIR/target/scala-$SCALA_VERSION/classes" From bdd861c6c34ca3fa158707b3a1bed91ef928c1e3 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Sun, 18 Aug 2013 18:28:10 -0700 Subject: [PATCH 29/31] Fix Maven build with Hadoop 0.23.9 --- 
core/pom.xml | 8 ++++++++ pom.xml | 11 ----------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 90d279c635..2870906092 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -36,6 +36,14 @@ org.apache.hadoop hadoop-client + + org.apache.avro + avro + + + org.apache.avro + avro-ipc + org.eclipse.jetty jetty-server diff --git a/pom.xml b/pom.xml index 4714576f3b..e7445319dd 100644 --- a/pom.xml +++ b/pom.xml @@ -733,17 +733,6 @@ - - - org.apache.avro - avro - 1.7.4 - - - org.apache.avro - avro-ipc - 1.7.4 - From 23f4622affc25685bb74ce09ed56ef3515551d17 Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Sun, 18 Aug 2013 18:53:57 -0700 Subject: [PATCH 30/31] Remove redundant dependencies from POMs --- bagel/pom.xml | 4 ---- examples/pom.xml | 4 ---- mllib/pom.xml | 4 ---- repl-bin/pom.xml | 5 ----- repl/pom.xml | 15 --------------- streaming/pom.xml | 5 ----- tools/pom.xml | 5 ----- 7 files changed, 42 deletions(-) diff --git a/bagel/pom.xml b/bagel/pom.xml index ae40f38e43..cbcf8d1239 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -37,10 +37,6 @@ spark-core ${project.version} - - org.apache.hadoop - hadoop-client - org.eclipse.jetty jetty-server diff --git a/examples/pom.xml b/examples/pom.xml index 023ad8cb45..0db52b8691 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -47,10 +47,6 @@ spark-mllib ${project.version} - - org.apache.hadoop - hadoop-client - org.apache.hbase hbase diff --git a/mllib/pom.xml b/mllib/pom.xml index 3292f6dad0..ab31d5734e 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -37,10 +37,6 @@ spark-core ${project.version} - - org.apache.hadoop - hadoop-client - org.eclipse.jetty jetty-server diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml index f132c44fb9..919e35f240 100644 --- a/repl-bin/pom.xml +++ b/repl-bin/pom.xml @@ -55,11 +55,6 @@ ${project.version} runtime - - org.apache.hadoop - hadoop-client - runtime - diff --git a/repl/pom.xml b/repl/pom.xml index 82e26defbc..5bc9a99c5c 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -48,21 +48,6 @@ ${project.version} runtime - - org.apache.hadoop - hadoop-client - provided - - - org.apache.avro - avro - provided - - - org.apache.avro - avro-ipc - provided - org.eclipse.jetty jetty-server diff --git a/streaming/pom.xml b/streaming/pom.xml index 1860990122..5c0582d6fb 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -45,11 +45,6 @@ spark-core ${project.version} - - org.apache.hadoop - hadoop-client - provided - org.eclipse.jetty jetty-server diff --git a/tools/pom.xml b/tools/pom.xml index 9177d85b2f..95b5e80e5b 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -41,11 +41,6 @@ spark-streaming ${project.version} - - org.apache.hadoop - hadoop-client - provided - org.scalatest scalatest_${scala.version} From 6f6944c8079bffdd088ddb0a84fbf83356e294ea Mon Sep 17 00:00:00 2001 From: Jey Kottalam Date: Mon, 19 Aug 2013 12:33:13 -0700 Subject: [PATCH 31/31] Update SBT build to use simpler fix for Hadoop 0.23.9 --- project/SparkBuild.scala | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index cea982b886..282b0cbed5 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -190,22 +190,13 @@ object SparkBuild extends Build { "io.netty" % "netty-all" % "4.0.0.Beta2", "org.apache.derby" % "derby" % "10.4.2.0" % "test", "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), + "org.apache.avro" % "avro" % "1.7.4", + 
"org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty), "com.codahale.metrics" % "metrics-core" % "3.0.0", "com.codahale.metrics" % "metrics-jvm" % "3.0.0", "com.codahale.metrics" % "metrics-json" % "3.0.0", "com.twitter" % "chill_2.9.3" % "0.3.1", "com.twitter" % "chill-java" % "0.3.1" - ) ++ ( - if (isYarnMode) { - // This kludge is needed for 0.23.x - Seq( - "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm), - "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm) - ) - } else { - Seq() - } ) ) ++ assemblySettings ++ extraAssemblySettings