diff --git a/README.md b/README.md index c840a68f76b17..dc8135b9b8b51 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,12 @@ # Apache Spark -Lightning-Fast Cluster Computing - +Lightning-Fast Cluster Computing - ## Online Documentation You can find the latest Spark documentation, including a programming -guide, on the project webpage at . +guide, on the project webpage at . This README file only contains basic setup instructions. @@ -92,21 +92,10 @@ If your project is built with Maven, add this to your POM file's ` ## Configuration -Please refer to the [Configuration guide](http://spark.incubator.apache.org/docs/latest/configuration.html) +Please refer to the [Configuration guide](http://spark.apache.org/docs/latest/configuration.html) in the online documentation for an overview on how to configure Spark. -## Apache Incubator Notice - -Apache Spark is an effort undergoing incubation at The Apache Software -Foundation (ASF), sponsored by the Apache Incubator. Incubation is required of -all newly accepted projects until a further review indicates that the -infrastructure, communications, and decision making process have stabilized in -a manner consistent with other successful ASF projects. While incubation status -is not necessarily a reflection of the completeness or stability of the code, -it does indicate that the project has yet to be fully endorsed by the ASF. - - ## Contributing to Spark Contributions via GitHub pull requests are gladly accepted from their original diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala index dd3eed8affe39..70c7474a936dc 100644 --- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala @@ -27,7 +27,7 @@ object Bagel extends Logging { /** * Runs a Bagel program. - * @param sc [[org.apache.spark.SparkContext]] to use for the program. + * @param sc org.apache.spark.SparkContext to use for the program. * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the * Key will be the vertex id. * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often @@ -38,10 +38,10 @@ object Bagel extends Logging { * @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices * after each superstep and provides the result to each vertex in the next * superstep. - * @param partitioner [[org.apache.spark.Partitioner]] partitions values by key + * @param partitioner org.apache.spark.Partitioner partitions values by key * @param numPartitions number of partitions across which to split the graph. * Default is the default parallelism of the SparkContext - * @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of + * @param storageLevel org.apache.spark.storage.StorageLevel to use for caching of * intermediate RDDs in each superstep. Defaults to caching in memory. 
* @param compute function that takes a Vertex, optional set of (possibly combined) messages to * the Vertex, optional Aggregator and the current superstep, @@ -131,7 +131,7 @@ object Bagel extends Logging { /** * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default - * [[org.apache.spark.HashPartitioner]] and default storage level + * org.apache.spark.HashPartitioner and default storage level */ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest]( sc: SparkContext, @@ -146,7 +146,7 @@ object Bagel extends Logging { /** * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the - * default [[org.apache.spark.HashPartitioner]] + * default org.apache.spark.HashPartitioner */ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest]( sc: SparkContext, @@ -166,7 +166,7 @@ object Bagel extends Logging { /** * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], - * default [[org.apache.spark.HashPartitioner]], + * default org.apache.spark.HashPartitioner, * [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level */ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest]( @@ -180,7 +180,7 @@ object Bagel extends Logging { /** * Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], - * the default [[org.apache.spark.HashPartitioner]] + * the default org.apache.spark.HashPartitioner * and [[org.apache.spark.bagel.DefaultCombiner]] */ def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest]( diff --git a/core/pom.xml b/core/pom.xml index 5576b0c3b4795..d3a81d564c2e8 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -47,16 +47,8 @@ - org.apache.avro - avro - - - org.apache.avro - avro-ipc - - - org.apache.zookeeper - zookeeper + org.apache.curator + curator-recipes org.eclipse.jetty @@ -130,8 +122,9 @@ scala-library - net.liftweb - lift-json_${scala.binary.version} + org.json4s + json4s-jackson_${scala.binary.version} + 3.2.6 it.unimi.dsi @@ -224,7 +217,7 @@ true - + @@ -237,7 +230,7 @@ - + diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index a24f07e9a6e9a..da778aa851cd2 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -240,6 +240,7 @@ class SparkContext( localProperties.set(props) } + @deprecated("Properties no longer need to be explicitly initialized.", "1.0.0") def initLocalProperties() { localProperties.set(new Properties()) } @@ -308,7 +309,7 @@ class SparkContext( private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this) private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this) - def initDriverMetrics() { + private def initDriverMetrics() { SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource) SparkEnv.get.metricsSystem.registerSource(blockManagerSource) } @@ -350,7 +351,7 @@ class SparkContext( * using the older MapReduce API (`org.apache.hadoop.mapred`). * * @param conf JobConf for setting up the dataset - * @param inputFormatClass Class of the [[InputFormat]] + * @param inputFormatClass Class of the InputFormat * @param keyClass Class of the keys * @param valueClass Class of the values * @param minSplits Minimum number of Hadoop Splits to generate. 
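The SparkContext hunk above documents the parameters of `hadoopRDD` (a `JobConf`, the `InputFormat` class, the key/value classes, and `minSplits`). A minimal usage sketch; the application name and input path are illustrative, not from this patch:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
import org.apache.spark.SparkContext

val sc = new SparkContext("local", "hadoopRDD-example")   // illustrative app name
val conf = new JobConf()
FileInputFormat.setInputPaths(conf, "/tmp/input")          // illustrative path

// inputFormatClass, keyClass, valueClass and minSplits map onto the @param entries above.
val lines = sc.hadoopRDD(conf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
println(lines.count())
```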
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 0055c98844ded..d7ce8fdfc23f4 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -126,6 +126,8 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] = wrapRDD(rdd.subtract(other, p)) + def generator: String = rdd.generator + override def toString = rdd.toString /** Assign a name to this RDD */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 24a9925dbd22c..729668fb679b4 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -74,7 +74,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * of the original partition. */ def mapPartitionsWithIndex[R: ClassTag]( - f: JFunction2[Int, java.util.Iterator[T], java.util.Iterator[R]], + f: JFunction2[java.lang.Integer, java.util.Iterator[T], java.util.Iterator[R]], preservesPartitioning: Boolean = false): JavaRDD[R] = new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))), preservesPartitioning)) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index dc26b7f621fee..8e0eab56a3dcf 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -17,6 +17,7 @@ package org.apache.spark.api.java +import java.util import java.util.{Map => JMap} import scala.collection.JavaConversions @@ -92,6 +93,24 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork private[spark] val env = sc.env + def isLocal: java.lang.Boolean = sc.isLocal + + def sparkUser: String = sc.sparkUser + + def master: String = sc.master + + def appName: String = sc.appName + + def jars: util.List[String] = sc.jars + + def startTime: java.lang.Long = sc.startTime + + /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */ + def defaultParallelism: java.lang.Integer = sc.defaultParallelism + + /** Default min number of partitions for Hadoop RDDs when not given by user */ + def defaultMinSplits: java.lang.Integer = sc.defaultMinSplits + /** Distribute a local Scala collection to form an RDD. 
*/ def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = { implicit val ctag: ClassTag[T] = fakeClassTag diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index 190b331cfe7d8..d48c1892aea9c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -27,7 +27,8 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.sys.process._ -import net.liftweb.json.JsonParser +import org.json4s._ +import org.json4s.jackson.JsonMethods import org.apache.spark.{Logging, SparkContext} import org.apache.spark.deploy.master.RecoveryState @@ -311,7 +312,7 @@ private[spark] object FaultToleranceTest extends App with Logging { private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File) extends Logging { - implicit val formats = net.liftweb.json.DefaultFormats + implicit val formats = org.json4s.DefaultFormats var state: RecoveryState.Value = _ var liveWorkerIPs: List[String] = _ var numLiveApps = 0 @@ -321,7 +322,7 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val def readState() { try { val masterStream = new InputStreamReader(new URL("http://%s:8080/json".format(ip)).openStream) - val json = JsonParser.parse(masterStream, closeAutomatically = true) + val json = JsonMethods.parse(masterStream) val workers = json \ "workers" val liveWorkers = workers.children.filter(w => (w \ "state").extract[String] == "ALIVE") @@ -349,7 +350,7 @@ private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val private[spark] class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File) extends Logging { - implicit val formats = net.liftweb.json.DefaultFormats + implicit val formats = org.json4s.DefaultFormats logDebug("Created worker: " + this) diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index 318beb5db5214..cefb1ff97e83c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy -import net.liftweb.json.JsonDSL._ +import org.json4s.JsonDSL._ import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala index f25a1ad3bf92a..a730fe1f599af 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala @@ -30,6 +30,7 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]] */ private[spark] trait LeaderElectionAgent extends Actor { + //TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring. 
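The FaultToleranceTest hunk above replaces lift-json with json4s: `JsonMethods.parse` plus an implicit `DefaultFormats` for `extract`. A minimal sketch of that pattern, with a made-up JSON document standing in for the master's /json endpoint:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods

implicit val formats = DefaultFormats

// Parse a document and extract typed fields, as TestMasterInfo.readState now does.
val json = JsonMethods.parse("""{"workers": [{"state": "ALIVE"}, {"state": "DEAD"}]}""")
val liveWorkers = (json \ "workers").children.filter(w => (w \ "state").extract[String] == "ALIVE")
println(liveWorkers.size)   // 2
```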
val masterActor: ActorRef } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala index 74a9f8cd824fb..db72d8ae9bdaf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala @@ -28,10 +28,6 @@ private[master] object MasterMessages { case object RevokedLeadership - // Actor System to LeaderElectionAgent - - case object CheckLeader - // Actor System to Master case object CheckForWorkerTimeOut diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala new file mode 100644 index 0000000000000..2d35397035a03 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.master + +import org.apache.spark.{SparkConf, Logging} +import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} +import org.apache.curator.retry.ExponentialBackoffRetry +import org.apache.zookeeper.KeeperException + + +object SparkCuratorUtil extends Logging { + + val ZK_CONNECTION_TIMEOUT_MILLIS = 15000 + val ZK_SESSION_TIMEOUT_MILLIS = 60000 + val RETRY_WAIT_MILLIS = 5000 + val MAX_RECONNECT_ATTEMPTS = 3 + + def newClient(conf: SparkConf): CuratorFramework = { + val ZK_URL = conf.get("spark.deploy.zookeeper.url") + val zk = CuratorFrameworkFactory.newClient(ZK_URL, + ZK_SESSION_TIMEOUT_MILLIS, ZK_CONNECTION_TIMEOUT_MILLIS, + new ExponentialBackoffRetry(RETRY_WAIT_MILLIS, MAX_RECONNECT_ATTEMPTS)) + zk.start() + zk + } + + def mkdir(zk: CuratorFramework, path: String) { + if (zk.checkExists().forPath(path) == null) { + try { + zk.create().creatingParentsIfNeeded().forPath(path) + } catch { + case nodeExist: KeeperException.NodeExistsException => + // do nothing, ignore node existing exception. + case e: Exception => throw e + } + } + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala deleted file mode 100644 index 57758055b19c0..0000000000000 --- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.master - -import scala.collection.JavaConversions._ - -import org.apache.zookeeper._ -import org.apache.zookeeper.Watcher.Event.KeeperState -import org.apache.zookeeper.data.Stat - -import org.apache.spark.{Logging, SparkConf} - -/** - * Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry - * logic. If the ZooKeeper session expires or otherwise dies, a new ZooKeeper session will be - * created. If ZooKeeper remains down after several retries, the given - * [[org.apache.spark.deploy.master.SparkZooKeeperWatcher SparkZooKeeperWatcher]] will be - * informed via zkDown(). - * - * Additionally, all commands sent to ZooKeeper will be retried until they either fail too many - * times or a semantic exception is thrown (e.g., "node already exists"). - */ -private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher, - conf: SparkConf) extends Logging { - val ZK_URL = conf.get("spark.deploy.zookeeper.url", "") - - val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE - val ZK_TIMEOUT_MILLIS = 30000 - val RETRY_WAIT_MILLIS = 5000 - val ZK_CHECK_PERIOD_MILLIS = 10000 - val MAX_RECONNECT_ATTEMPTS = 3 - - private var zk: ZooKeeper = _ - - private val watcher = new ZooKeeperWatcher() - private var reconnectAttempts = 0 - private var closed = false - - /** Connect to ZooKeeper to start the session. Must be called before anything else. */ - def connect() { - connectToZooKeeper() - - new Thread() { - override def run() = sessionMonitorThread() - }.start() - } - - def sessionMonitorThread(): Unit = { - while (!closed) { - Thread.sleep(ZK_CHECK_PERIOD_MILLIS) - if (zk.getState != ZooKeeper.States.CONNECTED) { - reconnectAttempts += 1 - val attemptsLeft = MAX_RECONNECT_ATTEMPTS - reconnectAttempts - if (attemptsLeft <= 0) { - logError("Could not connect to ZooKeeper: system failure") - zkWatcher.zkDown() - close() - } else { - logWarning("ZooKeeper connection failed, retrying " + attemptsLeft + " more times...") - connectToZooKeeper() - } - } - } - } - - def close() { - if (!closed && zk != null) { zk.close() } - closed = true - } - - private def connectToZooKeeper() { - if (zk != null) zk.close() - zk = new ZooKeeper(ZK_URL, ZK_TIMEOUT_MILLIS, watcher) - } - - /** - * Attempts to maintain a live ZooKeeper exception despite (very) transient failures. - * Mainly useful for handling the natural ZooKeeper session expiration. 
- */ - private class ZooKeeperWatcher extends Watcher { - def process(event: WatchedEvent) { - if (closed) { return } - - event.getState match { - case KeeperState.SyncConnected => - reconnectAttempts = 0 - zkWatcher.zkSessionCreated() - case KeeperState.Expired => - connectToZooKeeper() - case KeeperState.Disconnected => - logWarning("ZooKeeper disconnected, will retry...") - case s => // Do nothing - } - } - } - - def create(path: String, bytes: Array[Byte], createMode: CreateMode): String = { - retry { - zk.create(path, bytes, ZK_ACL, createMode) - } - } - - def exists(path: String, watcher: Watcher = null): Stat = { - retry { - zk.exists(path, watcher) - } - } - - def getChildren(path: String, watcher: Watcher = null): List[String] = { - retry { - zk.getChildren(path, watcher).toList - } - } - - def getData(path: String): Array[Byte] = { - retry { - zk.getData(path, false, null) - } - } - - def delete(path: String, version: Int = -1): Unit = { - retry { - zk.delete(path, version) - } - } - - /** - * Creates the given directory (non-recursively) if it doesn't exist. - * All znodes are created in PERSISTENT mode with no data. - */ - def mkdir(path: String) { - if (exists(path) == null) { - try { - create(path, "".getBytes, CreateMode.PERSISTENT) - } catch { - case e: Exception => - // If the exception caused the directory not to be created, bubble it up, - // otherwise ignore it. - if (exists(path) == null) { throw e } - } - } - } - - /** - * Recursively creates all directories up to the given one. - * All znodes are created in PERSISTENT mode with no data. - */ - def mkdirRecursive(path: String) { - var fullDir = "" - for (dentry <- path.split("/").tail) { - fullDir += "/" + dentry - mkdir(fullDir) - } - } - - /** - * Retries the given function up to 3 times. The assumption is that failure is transient, - * UNLESS it is a semantic exception (i.e., trying to get data from a node that doesn't exist), - * in which case the exception will be thrown without retries. - * - * @param fn Block to execute, possibly multiple times. - */ - def retry[T](fn: => T, n: Int = MAX_RECONNECT_ATTEMPTS): T = { - try { - fn - } catch { - case e: KeeperException.NoNodeException => throw e - case e: KeeperException.NodeExistsException => throw e - case e: Exception if n > 0 => - logError("ZooKeeper exception, " + n + " more retries...", e) - Thread.sleep(RETRY_WAIT_MILLIS) - retry(fn, n-1) - } - } -} - -trait SparkZooKeeperWatcher { - /** - * Called whenever a ZK session is created -- - * this will occur when we create our first session as well as each time - * the session expires or errors out. - */ - def zkSessionCreated() - - /** - * Called if ZK appears to be completely down (i.e., not just a transient error). - * We will no longer attempt to reconnect to ZK, and the SparkZooKeeperSession is considered dead. 
- */ - def zkDown() -} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala index 47b8f67f8a45b..285f9b014e291 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala @@ -18,105 +18,67 @@ package org.apache.spark.deploy.master import akka.actor.ActorRef -import org.apache.zookeeper._ -import org.apache.zookeeper.Watcher.Event.EventType import org.apache.spark.{Logging, SparkConf} import org.apache.spark.deploy.master.MasterMessages._ +import org.apache.curator.framework.CuratorFramework +import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch} private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String, conf: SparkConf) - extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging { + extends LeaderElectionAgent with LeaderLatchListener with Logging { val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election" - private val watcher = new ZooKeeperWatcher() - private val zk = new SparkZooKeeperSession(this, conf) + private var zk: CuratorFramework = _ + private var leaderLatch: LeaderLatch = _ private var status = LeadershipStatus.NOT_LEADER - private var myLeaderFile: String = _ - private var leaderUrl: String = _ override def preStart() { + logInfo("Starting ZooKeeper LeaderElection agent") - zk.connect() - } + zk = SparkCuratorUtil.newClient(conf) + leaderLatch = new LeaderLatch(zk, WORKING_DIR) + leaderLatch.addListener(this) - override def zkSessionCreated() { - synchronized { - zk.mkdirRecursive(WORKING_DIR) - myLeaderFile = - zk.create(WORKING_DIR + "/master_", masterUrl.getBytes, CreateMode.EPHEMERAL_SEQUENTIAL) - self ! CheckLeader - } + leaderLatch.start() } override def preRestart(reason: scala.Throwable, message: scala.Option[scala.Any]) { - logError("LeaderElectionAgent failed, waiting " + zk.ZK_TIMEOUT_MILLIS + "...", reason) - Thread.sleep(zk.ZK_TIMEOUT_MILLIS) + logError("LeaderElectionAgent failed...", reason) super.preRestart(reason, message) } - override def zkDown() { - logError("ZooKeeper down! LeaderElectionAgent shutting down Master.") - System.exit(1) - } - override def postStop() { + leaderLatch.close() zk.close() } override def receive = { - case CheckLeader => checkLeader() + case _ => } - private class ZooKeeperWatcher extends Watcher { - def process(event: WatchedEvent) { - if (event.getType == EventType.NodeDeleted) { - logInfo("Leader file disappeared, a master is down!") - self ! CheckLeader + override def isLeader() { + synchronized { + // could have lost leadership by now. + if (!leaderLatch.hasLeadership) { + return } - } - } - /** Uses ZK leader election. Navigates several ZK potholes along the way. */ - def checkLeader() { - val masters = zk.getChildren(WORKING_DIR).toList - val leader = masters.sorted.head - val leaderFile = WORKING_DIR + "/" + leader - - // Setup a watch for the current leader. - zk.exists(leaderFile, watcher) - - try { - leaderUrl = new String(zk.getData(leaderFile)) - } catch { - // A NoNodeException may be thrown if old leader died since the start of this method call. - // This is fine -- just check again, since we're guaranteed to see the new values. 
- case e: KeeperException.NoNodeException => - logInfo("Leader disappeared while reading it -- finding next leader") - checkLeader() - return + logInfo("We have gained leadership") + updateLeadershipStatus(true) } + } - // Synchronization used to ensure no interleaving between the creation of a new session and the - // checking of a leader, which could cause us to delete our real leader file erroneously. + override def notLeader() { synchronized { - val isLeader = myLeaderFile == leaderFile - if (!isLeader && leaderUrl == masterUrl) { - // We found a different master file pointing to this process. - // This can happen in the following two cases: - // (1) The master process was restarted on the same node. - // (2) The ZK server died between creating the file and returning the name of the file. - // For this case, we will end up creating a second file, and MUST explicitly delete the - // first one, since our ZK session is still open. - // Note that this deletion will cause a NodeDeleted event to be fired so we check again for - // leader changes. - assert(leaderFile < myLeaderFile) - logWarning("Cleaning up old ZK master election file that points to this master.") - zk.delete(leaderFile) - } else { - updateLeadershipStatus(isLeader) + // could have gained leadership by now. + if (leaderLatch.hasLeadership) { + return } + + logInfo("We have lost leadership") + updateLeadershipStatus(false) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala index 48b2fc06a9d70..939006239d2b1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala @@ -17,36 +17,28 @@ package org.apache.spark.deploy.master +import scala.collection.JavaConversions._ + import akka.serialization.Serialization -import org.apache.zookeeper._ +import org.apache.zookeeper.CreateMode import org.apache.spark.{Logging, SparkConf} class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf) extends PersistenceEngine - with SparkZooKeeperWatcher with Logging { val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status" + val zk = SparkCuratorUtil.newClient(conf) - val zk = new SparkZooKeeperSession(this, conf) - - zk.connect() - - override def zkSessionCreated() { - zk.mkdirRecursive(WORKING_DIR) - } - - override def zkDown() { - logError("PersistenceEngine disconnected from ZooKeeper -- ZK looks down.") - } + SparkCuratorUtil.mkdir(zk, WORKING_DIR) override def addApplication(app: ApplicationInfo) { serializeIntoFile(WORKING_DIR + "/app_" + app.id, app) } override def removeApplication(app: ApplicationInfo) { - zk.delete(WORKING_DIR + "/app_" + app.id) + zk.delete().forPath(WORKING_DIR + "/app_" + app.id) } override def addDriver(driver: DriverInfo) { @@ -54,7 +46,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf) } override def removeDriver(driver: DriverInfo) { - zk.delete(WORKING_DIR + "/driver_" + driver.id) + zk.delete().forPath(WORKING_DIR + "/driver_" + driver.id) } override def addWorker(worker: WorkerInfo) { @@ -62,7 +54,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf) } override def removeWorker(worker: WorkerInfo) { - zk.delete(WORKING_DIR + "/worker_" + worker.id) + zk.delete().forPath(WORKING_DIR + "/worker_" + worker.id) } override def close() { @@ 
-70,7 +62,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf) } override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = { - val sortedFiles = zk.getChildren(WORKING_DIR).toList.sorted + val sortedFiles = zk.getChildren().forPath(WORKING_DIR).toList.sorted val appFiles = sortedFiles.filter(_.startsWith("app_")) val apps = appFiles.map(deserializeFromFile[ApplicationInfo]) val driverFiles = sortedFiles.filter(_.startsWith("driver_")) @@ -83,11 +75,11 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf) private def serializeIntoFile(path: String, value: AnyRef) { val serializer = serialization.findSerializerFor(value) val serialized = serializer.toBinary(value) - zk.create(path, serialized, CreateMode.PERSISTENT) + zk.create().withMode(CreateMode.PERSISTENT).forPath(path, serialized) } def deserializeFromFile[T](filename: String)(implicit m: Manifest[T]): T = { - val fileData = zk.getData(WORKING_DIR + "/" + filename) + val fileData = zk.getData().forPath(WORKING_DIR + "/" + filename) val clazz = m.runtimeClass.asInstanceOf[Class[T]] val serializer = serialization.serializerFor(clazz) serializer.fromBinary(fileData).asInstanceOf[T] diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 5cc4adbe448b7..90cad3c37fda6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -23,7 +23,8 @@ import scala.concurrent.Await import scala.xml.Node import akka.pattern.ask -import net.liftweb.json.JsonAST.JValue +import javax.servlet.http.HttpServletRequest +import org.json4s.JValue import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala index 01c8f9065e50a..3233cd97f7bd0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala @@ -23,7 +23,8 @@ import scala.concurrent.Await import scala.xml.Node import akka.pattern.ask -import net.liftweb.json.JsonAST.JValue +import javax.servlet.http.HttpServletRequest +import org.json4s.JValue import org.apache.spark.deploy.{DeployWebUI, JsonProtocol} import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} @@ -85,6 +86,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
                 <li><strong>Drivers:</strong> {state.activeDrivers.size} Running, {state.completedDrivers.size} Completed</li>
+                <li><strong>Status:</strong> {state.status}</li>
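The ZooKeeperLeaderElectionAgent and ZooKeeperPersistenceEngine hunks earlier in this patch move from the hand-rolled SparkZooKeeperSession to Curator's LeaderLatch recipe and fluent znode API. A minimal standalone sketch of both, assuming an illustrative connection string and paths:

```scala
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchListener}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.zookeeper.CreateMode

// Illustrative connection string; the patch reads it from spark.deploy.zookeeper.url.
val client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(5000, 3))
client.start()

// Leader election: Curator invokes the listener when leadership is gained or lost.
val latch = new LeaderLatch(client, "/spark/leader_election")
latch.addListener(new LeaderLatchListener {
  override def isLeader(): Unit = println("gained leadership")
  override def notLeader(): Unit = println("lost leadership")
})
latch.start()

// Persistence: the fluent create/getData/delete calls used by ZooKeeperPersistenceEngine.
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
  .forPath("/spark/master_status/app_example", Array[Byte](1, 2, 3))
val bytes = client.getData().forPath("/spark/master_status/app_example")
client.delete().forPath("/spark/master_status/app_example")
```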
  • diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala index 3089acffb8d98..85200ab0e102d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala @@ -22,7 +22,7 @@ import scala.xml.Node import akka.pattern.ask import javax.servlet.http.HttpServletRequest -import net.liftweb.json.JsonAST.JValue +import org.json4s.JValue import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse} diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 50320f40350cd..3fe56963e0008 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -543,7 +543,8 @@ abstract class RDD[T: ClassTag]( * additional parameter is produced by constructA, which is called in each * partition with the index of that partition. */ - def mapWith[A: ClassTag, U: ClassTag] + @deprecated("use mapPartitionsWithIndex", "1.0.0") + def mapWith[A, U: ClassTag] (constructA: Int => A, preservesPartitioning: Boolean = false) (f: (T, A) => U): RDD[U] = { mapPartitionsWithIndex((index, iter) => { @@ -557,7 +558,8 @@ abstract class RDD[T: ClassTag]( * additional parameter is produced by constructA, which is called in each * partition with the index of that partition. */ - def flatMapWith[A: ClassTag, U: ClassTag] + @deprecated("use mapPartitionsWithIndex and flatMap", "1.0.0") + def flatMapWith[A, U: ClassTag] (constructA: Int => A, preservesPartitioning: Boolean = false) (f: (T, A) => Seq[U]): RDD[U] = { mapPartitionsWithIndex((index, iter) => { @@ -571,7 +573,8 @@ abstract class RDD[T: ClassTag]( * This additional parameter is produced by constructA, which is called in each * partition with the index of that partition. */ - def foreachWith[A: ClassTag](constructA: Int => A)(f: (T, A) => Unit) { + @deprecated("use mapPartitionsWithIndex and foreach", "1.0.0") + def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit) { mapPartitionsWithIndex { (index, iter) => val a = constructA(index) iter.map(t => {f(t, a); t}) @@ -583,7 +586,8 @@ abstract class RDD[T: ClassTag]( * additional parameter is produced by constructA, which is called in each * partition with the index of that partition. 
*/ - def filterWith[A: ClassTag](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = { + @deprecated("use mapPartitionsWithIndex and filter", "1.0.0") + def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = { mapPartitionsWithIndex((index, iter) => { val a = constructA(index) iter.filter(t => p(t, a)) diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index 9d75d7c4ad69a..006e2a3335428 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -81,7 +81,7 @@ class JobLogger(val user: String, val logDirName: String) /** * Create a log file for one job * @param jobID ID of the job - * @exception FileNotFoundException Fail to create log file + * @throws FileNotFoundException Fail to create log file */ protected def createLogWriter(jobID: Int) { try { diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index eefc8c232b564..f1924a4573b21 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler /** * A backend interface for scheduling systems that allows plugging in different ones under - * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as + * TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as * machines become available and can launch tasks on them. */ private[spark] trait SchedulerBackend { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 1cdfed1d7005e..92616c997e20c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -20,7 +20,7 @@ package org.apache.spark.scheduler import org.apache.spark.scheduler.SchedulingMode.SchedulingMode /** - * Low-level task scheduler interface, currently implemented exclusively by the ClusterScheduler. + * Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl. * This interface allows plugging in different task schedulers. Each TaskScheduler schedulers tasks * for a single SparkContext. 
These schedulers get sets of tasks submitted to them from the * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 1a4b7e599c01e..5ea4557bbf56a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -26,13 +26,14 @@ import scala.collection.mutable.HashSet import scala.math.max import scala.math.min -import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState} +import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, FetchFailed, Logging, Resubmitted, + SparkEnv, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.TaskMetrics import org.apache.spark.util.{Clock, SystemClock} /** - * Schedules the tasks within a single TaskSet in the ClusterScheduler. This class keeps track of + * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of * each task, retries tasks if they fail (up to a limited number of times), and * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node, @@ -41,7 +42,7 @@ import org.apache.spark.util.{Clock, SystemClock} * THREADING: This class is designed to only be called from code with a lock on the * TaskScheduler (e.g. its event handlers). It should not be called from other threads. * - * @param sched the ClusterScheduler associated with the TaskSetManager + * @param sched the TaskSchedulerImpl associated with the TaskSetManager * @param taskSet the TaskSet to manage scheduling for * @param maxTaskFailures if any particular task fails more than this number of times, the entire * task set will be aborted diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index c576beb0c0d38..bcf0ce19a54cd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -203,7 +203,7 @@ private[spark] class MesosSchedulerBackend( getResource(offer.getResourcesList, "cpus").toInt) } - // Call into the ClusterScheduler + // Call into the TaskSchedulerImpl val taskLists = scheduler.resourceOffers(offerableWorkers) // Build a list of Mesos tasks for each slave diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 50f7e79e97dd8..16e2f5cf3076d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -35,7 +35,7 @@ private case class KillTask(taskId: Long) /** * Calls to LocalBackend are all serialized through LocalActor. Using an actor makes the calls on * LocalBackend asynchronous, which is necessary to prevent deadlock between LocalBackend - * and the ClusterScheduler. + * and the TaskSchedulerImpl. 
*/ private[spark] class LocalActor( scheduler: TaskSchedulerImpl, @@ -76,7 +76,7 @@ private[spark] class LocalActor( /** * LocalBackend is used when running a local version of Spark where the executor, backend, and - * master all run in the same JVM. It sits behind a ClusterScheduler and handles launching tasks + * master all run in the same JVM. It sits behind a TaskSchedulerImpl and handles launching tasks * on a single Executor (created by the LocalBackend) running locally. */ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala deleted file mode 100644 index 2e0b0e6eda765..0000000000000 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.storage - -private[spark] trait BlockFetchTracker { - def totalBlocks : Int - def numLocalBlocks: Int - def numRemoteBlocks: Int - def remoteFetchTime : Long - def fetchWaitTime: Long - def remoteBytesRead : Long -} diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index 925022e7fe6fb..fb50b45bd4197 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -44,9 +44,14 @@ import org.apache.spark.util.Utils */ private[storage] -trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] - with Logging with BlockFetchTracker { +trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging { def initialize() + def totalBlocks: Int + def numLocalBlocks: Int + def numRemoteBlocks: Int + def remoteFetchTime: Long + def fetchWaitTime: Long + def remoteBytesRead: Long } @@ -233,7 +238,16 @@ object BlockFetcherIterator { logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms") } - //an iterator that will read fetched blocks off the queue as they arrive. + override def totalBlocks: Int = numLocal + numRemote + override def numLocalBlocks: Int = numLocal + override def numRemoteBlocks: Int = numRemote + override def remoteFetchTime: Long = _remoteFetchTime + override def fetchWaitTime: Long = _fetchWaitTime + override def remoteBytesRead: Long = _remoteBytesRead + + + // Implementing the Iterator methods with an iterator that reads fetched blocks off the queue + // as they arrive. 
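The RDD.scala hunk earlier deprecates `mapWith`, `flatMapWith`, `foreachWith` and `filterWith` in favour of `mapPartitionsWithIndex`. A rough migration sketch, assuming a local SparkContext for illustration:

```scala
import scala.util.Random
import org.apache.spark.SparkContext

val sc = new SparkContext("local", "mapWith-migration")   // illustrative
val rdd = sc.parallelize(1 to 100, 4)

// Before (now deprecated):
//   rdd.mapWith(index => new Random(index))((x, rng) => x * rng.nextDouble())
// After: construct the per-partition value inside mapPartitionsWithIndex.
val scaled = rdd.mapPartitionsWithIndex { (index, iter) =>
  val rng = new Random(index)
  iter.map(x => x * rng.nextDouble())
}
println(scaled.count())
```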
@volatile protected var resultsGotten = 0 override def hasNext: Boolean = resultsGotten < _numBlocksToFetch @@ -251,14 +265,6 @@ object BlockFetcherIterator { } (result.blockId, if (result.failed) None else Some(result.deserialize())) } - - // Implementing BlockFetchTracker trait. - override def totalBlocks: Int = numLocal + numRemote - override def numLocalBlocks: Int = numLocal - override def numRemoteBlocks: Int = numRemote - override def remoteFetchTime: Long = _remoteFetchTime - override def fetchWaitTime: Long = _fetchWaitTime - override def remoteBytesRead: Long = _remoteBytesRead } // End of BasicBlockFetcherIterator diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 1f048a84cdfb6..1b78c52ff6077 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -24,7 +24,8 @@ import scala.annotation.tailrec import scala.util.{Failure, Success, Try} import scala.xml.Node -import net.liftweb.json.{JValue, pretty, render} +import org.json4s.JValue +import org.json4s.jackson.JsonMethods.{pretty, render} import org.eclipse.jetty.server.{Handler, Request, Server} import org.eclipse.jetty.server.handler.{AbstractHandler, ContextHandler, HandlerList, ResourceHandler} import org.eclipse.jetty.util.thread.QueuedThreadPool diff --git a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala index bf71882ef770a..c539d2f708f95 100644 --- a/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala +++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala @@ -23,9 +23,9 @@ import scala.util.control.{ControlThrowable, NonFatal} import com.typesafe.config.Config /** - * An [[akka.actor.ActorSystem]] which refuses to shut down in the event of a fatal exception. + * An akka.actor.ActorSystem which refuses to shut down in the event of a fatal exception * This is necessary as Spark Executors are allowed to recover from fatal exceptions - * (see [[org.apache.spark.executor.Executor]]). + * (see org.apache.spark.executor.Executor) */ object IndestructibleActorSystem { def apply(name: String, config: Config): ActorSystem = diff --git a/core/src/main/scala/org/apache/spark/util/StatCounter.scala b/core/src/main/scala/org/apache/spark/util/StatCounter.scala index 5b0d2c36510b8..f837dc7ccc860 100644 --- a/core/src/main/scala/org/apache/spark/util/StatCounter.scala +++ b/core/src/main/scala/org/apache/spark/util/StatCounter.scala @@ -19,9 +19,9 @@ package org.apache.spark.util /** * A class for tracking the statistics of a set of numbers (count, mean and variance) in a - * numerically robust way. Includes support for merging two StatCounters. Based on - * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance - * Welford and Chan's algorithms for running variance]]. + * numerically robust way. Includes support for merging two StatCounters. Based on Welford + * and Chan's [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance algorithms]] + * for running variance. * * @constructor Initialize the StatCounter with the given values. 
*/ diff --git a/core/src/main/scala/org/apache/spark/util/Vector.scala b/core/src/main/scala/org/apache/spark/util/Vector.scala index d437c055f33d4..dc4b8f253f259 100644 --- a/core/src/main/scala/org/apache/spark/util/Vector.scala +++ b/core/src/main/scala/org/apache/spark/util/Vector.scala @@ -136,7 +136,7 @@ object Vector { /** * Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers - * between 0.0 and 1.0. Optional [[scala.util.Random]] number generator can be provided. + * between 0.0 and 1.0. Optional scala.util.Random number generator can be provided. */ def random(length: Int, random: Random = new XORShiftRandom()) = Vector(length, _ => random.nextDouble()) diff --git a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala index ca611b67ed91d..8a4cdea2fa7b1 100644 --- a/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala +++ b/core/src/main/scala/org/apache/spark/util/random/XORShiftRandom.scala @@ -17,8 +17,11 @@ package org.apache.spark.util.random +import java.nio.ByteBuffer import java.util.{Random => JavaRandom} +import scala.util.hashing.MurmurHash3 + import org.apache.spark.util.Utils.timeIt /** @@ -36,8 +39,8 @@ private[spark] class XORShiftRandom(init: Long) extends JavaRandom(init) { def this() = this(System.nanoTime) - private var seed = init - + private var seed = XORShiftRandom.hashSeed(init) + // we need to just override next - this will be called by nextInt, nextDouble, // nextGaussian, nextLong, etc. override protected def next(bits: Int): Int = { @@ -49,13 +52,19 @@ private[spark] class XORShiftRandom(init: Long) extends JavaRandom(init) { } override def setSeed(s: Long) { - seed = s + seed = XORShiftRandom.hashSeed(s) } } /** Contains benchmark method and main method to run benchmark of the RNG */ private[spark] object XORShiftRandom { + /** Hash seeds to have 0/1 bits throughout. 
*/ + private def hashSeed(seed: Long): Long = { + val bytes = ByteBuffer.allocate(java.lang.Long.SIZE).putLong(seed).array() + MurmurHash3.bytesHash(bytes) + } + /** * Main method for running benchmark * @param args takes one argument - the number of random numbers to generate diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 20232e9fbb8d0..aa5079c159830 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -75,8 +75,9 @@ public int compare(Integer a, Integer b) { else if (a < b) return 1; else return 0; } - }; + } + @SuppressWarnings("unchecked") @Test public void sparkContextUnion() { // Union of non-specialized JavaRDDs @@ -148,6 +149,7 @@ public void call(String s) { Assert.assertEquals(2, foreachCalls); } + @SuppressWarnings("unchecked") @Test public void lookup() { JavaPairRDD categories = sc.parallelizePairs(Arrays.asList( @@ -179,6 +181,7 @@ public Boolean call(Integer x) { Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds } + @SuppressWarnings("unchecked") @Test public void cogroup() { JavaPairRDD categories = sc.parallelizePairs(Arrays.asList( @@ -197,6 +200,7 @@ public void cogroup() { cogrouped.collect(); } + @SuppressWarnings("unchecked") @Test public void leftOuterJoin() { JavaPairRDD rdd1 = sc.parallelizePairs(Arrays.asList( @@ -243,6 +247,7 @@ public Integer call(Integer a, Integer b) { Assert.assertEquals(33, sum); } + @SuppressWarnings("unchecked") @Test public void foldByKey() { List> pairs = Arrays.asList( @@ -265,6 +270,7 @@ public Integer call(Integer a, Integer b) { Assert.assertEquals(3, sums.lookup(3).get(0).intValue()); } + @SuppressWarnings("unchecked") @Test public void reduceByKey() { List> pairs = Arrays.asList( @@ -320,8 +326,8 @@ public void approximateResults() { public void take() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); Assert.assertEquals(1, rdd.first().intValue()); - List firstTwo = rdd.take(2); - List sample = rdd.takeSample(false, 2, 42); + rdd.take(2); + rdd.takeSample(false, 2, 42); } @Test @@ -359,8 +365,8 @@ public Boolean call(Double x) { Assert.assertEquals(2.49444, rdd.stdev(), 0.01); Assert.assertEquals(2.73252, rdd.sampleStdev(), 0.01); - Double first = rdd.first(); - List take = rdd.take(5); + rdd.first(); + rdd.take(5); } @Test @@ -438,11 +444,11 @@ public Iterable call(String s) { return lengths; } }); - Double x = doubles.first(); - Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01); + Assert.assertEquals(5.0, doubles.first(), 0.01); Assert.assertEquals(11, pairs.count()); } + @SuppressWarnings("unchecked") @Test public void mapsFromPairsToPairs() { List> pairs = Arrays.asList( @@ -509,6 +515,7 @@ public void repartition() { } } + @SuppressWarnings("unchecked") @Test public void persist() { JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0)); @@ -573,6 +580,7 @@ public void textFilesCompressed() throws IOException { Assert.assertEquals(expected, readRDD.collect()); } + @SuppressWarnings("unchecked") @Test public void sequenceFile() { File tempDir = Files.createTempDir(); @@ -602,6 +610,7 @@ public Tuple2 call(Tuple2 pair) { Assert.assertEquals(pairs, readRDD.collect()); } + @SuppressWarnings("unchecked") @Test public void writeWithNewAPIHadoopFile() { File tempDir = Files.createTempDir(); @@ -632,6 +641,7 @@ public String call(Tuple2 x) { }).collect().toString()); } + 
@SuppressWarnings("unchecked") @Test public void readWithNewAPIHadoopFile() throws IOException { File tempDir = Files.createTempDir(); @@ -674,6 +684,7 @@ public void objectFilesOfInts() { Assert.assertEquals(expected, readRDD.collect()); } + @SuppressWarnings("unchecked") @Test public void objectFilesOfComplexTypes() { File tempDir = Files.createTempDir(); @@ -690,6 +701,7 @@ public void objectFilesOfComplexTypes() { Assert.assertEquals(pairs, readRDD.collect()); } + @SuppressWarnings("unchecked") @Test public void hadoopFile() { File tempDir = Files.createTempDir(); @@ -719,6 +731,7 @@ public String call(Tuple2 x) { }).collect().toString()); } + @SuppressWarnings("unchecked") @Test public void hadoopFileCompressed() { File tempDir = Files.createTempDir(); @@ -824,7 +837,7 @@ public Float zero(Float initialValue) { } }; - final Accumulator floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam); + final Accumulator floatAccum = sc.accumulator(10.0f, floatAccumulatorParam); rdd.foreach(new VoidFunction() { public void call(Integer x) { floatAccum.add((float) x); @@ -876,6 +889,7 @@ public void checkpointAndRestore() { Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect()); } + @SuppressWarnings("unchecked") @Test public void mapOnPairRDD() { JavaRDD rdd1 = sc.parallelize(Arrays.asList(1,2,3,4)); @@ -900,6 +914,7 @@ public Tuple2 call(Tuple2 in) throws Excepti } + @SuppressWarnings("unchecked") @Test public void collectPartitions() { JavaRDD rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3); @@ -968,7 +983,7 @@ public void countApproxDistinctByKey() { @Test public void collectAsMapWithIntArrayValues() { // Regression test for SPARK-1040 - JavaRDD rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 })); + JavaRDD rdd = sc.parallelize(Arrays.asList(1)); JavaPairRDD pairRDD = rdd.map(new PairFunction() { @Override public Tuple2 call(Integer x) throws Exception { @@ -976,6 +991,6 @@ public Tuple2 call(Integer x) throws Exception { } }); pairRDD.collect(); // Works fine - Map map = pairRDD.collectAsMap(); // Used to crash with ClassCastException + pairRDD.collectAsMap(); // Used to crash with ClassCastException } } diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index de866ed7ffed8..bae3b37e267d5 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -20,9 +20,12 @@ package org.apache.spark.deploy import java.io.File import java.util.Date -import net.liftweb.json.Diff -import net.liftweb.json.{JsonAST, JsonParser} -import net.liftweb.json.JsonAST.{JNothing, JValue} +import org.json4s._ + +import org.json4s.JValue +import org.json4s.jackson.JsonMethods +import com.fasterxml.jackson.core.JsonParseException + import org.scalatest.FunSuite import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} @@ -34,31 +37,31 @@ class JsonProtocolSuite extends FunSuite { test("writeApplicationInfo") { val output = JsonProtocol.writeApplicationInfo(createAppInfo()) assertValidJson(output) - assertValidDataInJson(output, JsonParser.parse(JsonConstants.appInfoJsonStr)) + assertValidDataInJson(output, JsonMethods.parse(JsonConstants.appInfoJsonStr)) } test("writeWorkerInfo") { val output = JsonProtocol.writeWorkerInfo(createWorkerInfo()) assertValidJson(output) - assertValidDataInJson(output, JsonParser.parse(JsonConstants.workerInfoJsonStr)) + 
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.workerInfoJsonStr)) } test("writeApplicationDescription") { val output = JsonProtocol.writeApplicationDescription(createAppDesc()) assertValidJson(output) - assertValidDataInJson(output, JsonParser.parse(JsonConstants.appDescJsonStr)) + assertValidDataInJson(output, JsonMethods.parse(JsonConstants.appDescJsonStr)) } test("writeExecutorRunner") { val output = JsonProtocol.writeExecutorRunner(createExecutorRunner()) assertValidJson(output) - assertValidDataInJson(output, JsonParser.parse(JsonConstants.executorRunnerJsonStr)) + assertValidDataInJson(output, JsonMethods.parse(JsonConstants.executorRunnerJsonStr)) } test("writeDriverInfo") { val output = JsonProtocol.writeDriverInfo(createDriverInfo()) assertValidJson(output) - assertValidDataInJson(output, JsonParser.parse(JsonConstants.driverInfoJsonStr)) + assertValidDataInJson(output, JsonMethods.parse(JsonConstants.driverInfoJsonStr)) } test("writeMasterState") { @@ -71,7 +74,7 @@ class JsonProtocolSuite extends FunSuite { activeDrivers, completedDrivers, RecoveryState.ALIVE) val output = JsonProtocol.writeMasterState(stateResponse) assertValidJson(output) - assertValidDataInJson(output, JsonParser.parse(JsonConstants.masterStateJsonStr)) + assertValidDataInJson(output, JsonMethods.parse(JsonConstants.masterStateJsonStr)) } test("writeWorkerState") { @@ -83,7 +86,7 @@ class JsonProtocolSuite extends FunSuite { finishedExecutors, drivers, finishedDrivers, "masterUrl", 4, 1234, 4, 1234, "masterWebUiUrl") val output = JsonProtocol.writeWorkerState(stateResponse) assertValidJson(output) - assertValidDataInJson(output, JsonParser.parse(JsonConstants.workerStateJsonStr)) + assertValidDataInJson(output, JsonMethods.parse(JsonConstants.workerStateJsonStr)) } def createAppDesc(): ApplicationDescription = { @@ -125,9 +128,9 @@ class JsonProtocolSuite extends FunSuite { def assertValidJson(json: JValue) { try { - JsonParser.parse(JsonAST.compactRender(json)) + JsonMethods.parse(JsonMethods.compact(json)) } catch { - case e: JsonParser.ParseException => fail("Invalid Json detected", e) + case e: JsonParseException => fail("Invalid Json detected", e) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index ac07f60e284bb..c4e7a4bb7d385 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -93,10 +93,10 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA // If this test hangs, it's probably because no resource offers were made after the task // failed. 
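The JettyUtils and JsonProtocolSuite hunks above switch JSON rendering to json4s (`pretty`/`compact` over `render`). A small sketch of building a `JValue` with the JsonDSL, as `deploy.JsonProtocol` does, and rendering it both ways; the field names are illustrative:

```scala
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, parse, pretty, render}

// Build a JValue with the DSL, then render it.
val workerJson = ("id" -> "worker-1") ~ ("state" -> "ALIVE") ~ ("cores" -> 4)
println(pretty(render(workerJson)))                     // what JettyUtils serves as JSON
val roundTripped = parse(compact(render(workerJson)))   // what the suite does to assert validity
println(roundTripped \ "state")
```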
val scheduler: TaskSchedulerImpl = sc.taskScheduler match { - case clusterScheduler: TaskSchedulerImpl => - clusterScheduler + case taskScheduler: TaskSchedulerImpl => + taskScheduler case _ => - assert(false, "Expect local cluster to use ClusterScheduler") + assert(false, "Expect local cluster to use TaskSchedulerImpl") throw new ClassCastException } scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler) diff --git a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala similarity index 79% rename from core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala rename to core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 85e929925e3b5..f4e62c64daf12 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -29,9 +29,9 @@ class FakeTaskSetManager( initPriority: Int, initStageId: Int, initNumTasks: Int, - clusterScheduler: TaskSchedulerImpl, + taskScheduler: TaskSchedulerImpl, taskSet: TaskSet) - extends TaskSetManager(clusterScheduler, taskSet, 0) { + extends TaskSetManager(taskScheduler, taskSet, 0) { parent = null weight = 1 @@ -105,7 +105,7 @@ class FakeTaskSetManager( } } -class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging { +class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Logging { def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskSchedulerImpl, taskSet: TaskSet): FakeTaskSetManager = { new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet) @@ -133,8 +133,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging } test("FIFO Scheduler Test") { - sc = new SparkContext("local", "ClusterSchedulerSuite") - val clusterScheduler = new TaskSchedulerImpl(sc) + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskScheduler = new TaskSchedulerImpl(sc) var tasks = ArrayBuffer[Task[_]]() val task = new FakeTask(0) tasks += task @@ -144,9 +144,9 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) schedulableBuilder.buildPools() - val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, clusterScheduler, taskSet) - val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, clusterScheduler, taskSet) - val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, clusterScheduler, taskSet) + val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, taskScheduler, taskSet) + val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, taskScheduler, taskSet) + val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, taskScheduler, taskSet) schedulableBuilder.addTaskSetManager(taskSetManager0, null) schedulableBuilder.addTaskSetManager(taskSetManager1, null) schedulableBuilder.addTaskSetManager(taskSetManager2, null) @@ -160,8 +160,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging } test("Fair Scheduler Test") { - sc = new SparkContext("local", "ClusterSchedulerSuite") - val clusterScheduler = new TaskSchedulerImpl(sc) + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskScheduler = new TaskSchedulerImpl(sc) var tasks = ArrayBuffer[Task[_]]() val task = new FakeTask(0) tasks += task @@ -189,15 +189,15 @@ class ClusterSchedulerSuite extends FunSuite with 
LocalSparkContext with Logging val properties2 = new Properties() properties2.setProperty("spark.scheduler.pool","2") - val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet) - val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet) - val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, clusterScheduler, taskSet) + val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, taskScheduler, taskSet) + val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, taskScheduler, taskSet) + val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, taskScheduler, taskSet) schedulableBuilder.addTaskSetManager(taskSetManager10, properties1) schedulableBuilder.addTaskSetManager(taskSetManager11, properties1) schedulableBuilder.addTaskSetManager(taskSetManager12, properties1) - val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, clusterScheduler, taskSet) - val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, clusterScheduler, taskSet) + val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, taskScheduler, taskSet) + val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, taskScheduler, taskSet) schedulableBuilder.addTaskSetManager(taskSetManager23, properties2) schedulableBuilder.addTaskSetManager(taskSetManager24, properties2) @@ -217,8 +217,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging } test("Nested Pool Test") { - sc = new SparkContext("local", "ClusterSchedulerSuite") - val clusterScheduler = new TaskSchedulerImpl(sc) + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskScheduler = new TaskSchedulerImpl(sc) var tasks = ArrayBuffer[Task[_]]() val task = new FakeTask(0) tasks += task @@ -240,23 +240,23 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging pool1.addSchedulable(pool10) pool1.addSchedulable(pool11) - val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, clusterScheduler, taskSet) - val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, clusterScheduler, taskSet) + val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, taskScheduler, taskSet) + val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, taskScheduler, taskSet) pool00.addSchedulable(taskSetManager000) pool00.addSchedulable(taskSetManager001) - val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, clusterScheduler, taskSet) - val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, clusterScheduler, taskSet) + val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, taskScheduler, taskSet) + val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, taskScheduler, taskSet) pool01.addSchedulable(taskSetManager010) pool01.addSchedulable(taskSetManager011) - val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, clusterScheduler, taskSet) - val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, clusterScheduler, taskSet) + val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, taskScheduler, taskSet) + val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, taskScheduler, taskSet) pool10.addSchedulable(taskSetManager100) pool10.addSchedulable(taskSetManager101) - val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, clusterScheduler, taskSet) - val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, clusterScheduler, taskSet) + val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, taskScheduler, taskSet) + val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, taskScheduler, taskSet) 
pool11.addSchedulable(taskSetManager110) pool11.addSchedulable(taskSetManager111) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 34a7d8cefeea2..20f6e503872ac 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.util.FakeClock -class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler(taskScheduler) { +class FakeDAGScheduler(taskScheduler: FakeTaskScheduler) extends DAGScheduler(taskScheduler) { override def taskStarted(task: Task[_], taskInfo: TaskInfo) { taskScheduler.startedTasks += taskInfo.index } @@ -51,12 +51,12 @@ class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler } /** - * A mock ClusterScheduler implementation that just remembers information about tasks started and + * A mock TaskSchedulerImpl implementation that just remembers information about tasks started and * feedback received from the TaskSetManagers. Note that it's important to initialize this with * a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost * to work, and these are required for locality in TaskSetManager. */ -class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */) +class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */) extends TaskSchedulerImpl(sc) { val startedTasks = new ArrayBuffer[Long] @@ -87,7 +87,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { test("TaskSet with no preferences") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = createTaskSet(1) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) @@ -113,7 +113,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { test("multiple offers with no preferences") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = createTaskSet(3) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) @@ -144,7 +144,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { test("basic delay scheduling") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) val taskSet = createTaskSet(4, Seq(TaskLocation("host1", "exec1")), Seq(TaskLocation("host2", "exec2")), @@ -188,7 +188,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { test("delay scheduling with fallback") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, + val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3")) val taskSet = createTaskSet(5, Seq(TaskLocation("host1")), @@ -228,7 +228,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { test("delay scheduling with failed hosts") { sc = new SparkContext("local", "test") - val sched = new 
FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) val taskSet = createTaskSet(3, Seq(TaskLocation("host1")), Seq(TaskLocation("host2")), @@ -260,7 +260,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { test("task result lost") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = createTaskSet(1) val clock = new FakeClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) @@ -277,7 +277,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { test("repeated failures lead to task set abortion") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = createTaskSet(1) val clock = new FakeClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) diff --git a/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala b/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala index c51d12bfe0bc6..757476efdb789 100644 --- a/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/random/XORShiftRandomSuite.scala @@ -72,4 +72,8 @@ class XORShiftRandomSuite extends FunSuite with ShouldMatchers { } + test ("XORShift with zero seed") { + val random = new XORShiftRandom(0L) + assert(random.nextInt() != 0) + } } diff --git a/docs/README.md b/docs/README.md index cc09d6e88f41e..cac65d97e488b 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1,6 +1,6 @@ Welcome to the Spark documentation! -This readme will walk you through navigating and building the Spark documentation, which is included here with the Spark source code. You can also find documentation specific to release versions of Spark at http://spark.incubator.apache.org/documentation.html. +This readme will walk you through navigating and building the Spark documentation, which is included here with the Spark source code. You can also find documentation specific to release versions of Spark at http://spark.apache.org/documentation.html. Read on to learn more about viewing documentation in plain text (i.e., markdown) or building the documentation yourself. Why build it yourself? So that you have the docs that corresponds to whichever version of Spark you currently have checked out of revision control. diff --git a/docs/_config.yml b/docs/_config.yml index 9e5a95fe53af6..aa5a5adbc1743 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -3,10 +3,10 @@ markdown: kramdown # These allow the documentation to be updated with nerw releases # of Spark, Scala, and Mesos. -SPARK_VERSION: 1.0.0-incubating-SNAPSHOT +SPARK_VERSION: 1.0.0-SNAPSHOT SPARK_VERSION_SHORT: 1.0.0 SCALA_BINARY_VERSION: "2.10" SCALA_VERSION: "2.10.3" MESOS_VERSION: 0.13.0 SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net -SPARK_GITHUB_URL: https://github.com/apache/incubator-spark +SPARK_GITHUB_URL: https://github.com/apache/spark diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index 7114e1f5dd5b9..ebb58e8b9af79 100755 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html @@ -159,16 +159,6 @@

    Heading
    -->
    - Apache Spark is an effort undergoing incubation at the Apache Software Foundation.
    - diff --git a/docs/bagel-programming-guide.md b/docs/bagel-programming-guide.md index b070d8e73a38b..da6d0c9dcd97b 100644 --- a/docs/bagel-programming-guide.md +++ b/docs/bagel-programming-guide.md @@ -108,7 +108,7 @@ _Example_ ## Operations -Here are the actions and types in the Bagel API. See [Bagel.scala](https://github.com/apache/incubator-spark/blob/master/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala) for details. +Here are the actions and types in the Bagel API. See [Bagel.scala](https://github.com/apache/spark/blob/master/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala) for details. ### Actions diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md index ded12926885b9..40cac8eb4f0db 100644 --- a/docs/building-with-maven.md +++ b/docs/building-with-maven.md @@ -76,3 +76,7 @@ The maven build includes support for building a Debian package containing the as $ mvn -Pdeb -DskipTests clean package The debian package can then be found under assembly/target. We added the short commit hash to the file name so that we can distinguish individual packages built for SNAPSHOT versions. + +## A note about Hadoop version 0.23.x + +For building spark with hadoop 0.23.x and also yarn, you will have to manually add a dependency on avro (org.apache.avro, avro, 1.7.4). diff --git a/docs/index.md b/docs/index.md index aa9c8666e7d75..4eb297df39144 100644 --- a/docs/index.md +++ b/docs/index.md @@ -9,7 +9,7 @@ It also supports a rich set of higher-level tools including [Shark](http://shark # Downloading -Get Spark by visiting the [downloads page](http://spark.incubator.apache.org/downloads.html) of the Apache Spark site. This documentation is for Spark version {{site.SPARK_VERSION}}. +Get Spark by visiting the [downloads page](http://spark.apache.org/downloads.html) of the Apache Spark site. This documentation is for Spark version {{site.SPARK_VERSION}}. Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). All you need to run it is to have `java` to installed on your system `PATH`, or the `JAVA_HOME` environment variable pointing to a Java installation. @@ -96,7 +96,7 @@ For this version of Spark (0.8.1) Hadoop 2.2.x (or newer) users will have to bui * [Amazon EC2](ec2-scripts.html): scripts that let you launch a cluster on EC2 in about 5 minutes * [Standalone Deploy Mode](spark-standalone.html): launch a standalone cluster quickly without a third-party cluster manager * [Mesos](running-on-mesos.html): deploy a private cluster using - [Apache Mesos](http://incubator.apache.org/mesos) + [Apache Mesos](http://mesos.apache.org) * [YARN](running-on-yarn.html): deploy Spark on top of Hadoop NextGen (YARN) **Other documents:** @@ -110,20 +110,20 @@ For this version of Spark (0.8.1) Hadoop 2.2.x (or newer) users will have to bui **External resources:** -* [Spark Homepage](http://spark.incubator.apache.org) +* [Spark Homepage](http://spark.apache.org) * [Shark](http://shark.cs.berkeley.edu): Apache Hive over Spark -* [Mailing Lists](http://spark.incubator.apache.org/mailing-lists.html): ask questions about Spark here +* [Mailing Lists](http://spark.apache.org/mailing-lists.html): ask questions about Spark here * [AMP Camps](http://ampcamp.berkeley.edu/): a series of training camps at UC Berkeley that featured talks and exercises about Spark, Shark, Mesos, and more. [Videos](http://ampcamp.berkeley.edu/agenda-2012), [slides](http://ampcamp.berkeley.edu/agenda-2012) and [exercises](http://ampcamp.berkeley.edu/exercises-2012) are available online for free. 
-* [Code Examples](http://spark.incubator.apache.org/examples.html): more are also available in the [examples subfolder](https://github.com/apache/incubator-spark/tree/master/examples/src/main/scala/) of Spark +* [Code Examples](http://spark.apache.org/examples.html): more are also available in the [examples subfolder](https://github.com/apache/spark/tree/master/examples/src/main/scala/) of Spark * [Paper Describing Spark](http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf) * [Paper Describing Spark Streaming](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) # Community -To get help using Spark or keep up with Spark development, sign up for the [user mailing list](http://spark.incubator.apache.org/mailing-lists.html). +To get help using Spark or keep up with Spark development, sign up for the [user mailing list](http://spark.apache.org/mailing-lists.html). If you're in the San Francisco Bay Area, there's a regular [Spark meetup](http://www.meetup.com/spark-users/) every few weeks. Come by to meet the developers and other users. diff --git a/docs/java-programming-guide.md b/docs/java-programming-guide.md index 07732fa1229f3..5c73dbb25ede8 100644 --- a/docs/java-programming-guide.md +++ b/docs/java-programming-guide.md @@ -189,7 +189,7 @@ We hope to generate documentation with Java-style syntax in the future. # Where to Go from Here Spark includes several sample programs using the Java API in -[`examples/src/main/java`](https://github.com/apache/incubator-spark/tree/master/examples/src/main/java/org/apache/spark/examples). You can run them by passing the class name to the +[`examples/src/main/java`](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples). You can run them by passing the class name to the `bin/run-example` script included in Spark; for example: ./bin/run-example org.apache.spark.examples.JavaWordCount diff --git a/docs/js/main.js b/docs/js/main.js index 102699789a71a..0bd2286cced19 100755 --- a/docs/js/main.js +++ b/docs/js/main.js @@ -1,26 +1,3 @@ - -// From docs.scala-lang.org -function styleCode() { - if (typeof disableStyleCode != "undefined") { - return; - } - $(".codetabs pre code").parent().each(function() { - if (!$(this).hasClass("prettyprint")) { - var lang = $(this).parent().data("lang"); - if (lang == "python") { - lang = "py" - } - if (lang == "bash") { - lang = "bsh" - } - $(this).addClass("prettyprint lang-"+lang+" linenums"); - } - }); - console.log("runningPrettyPrint()") - prettyPrint(); -} - - function codeTabs() { var counter = 0; var langImages = { @@ -97,11 +74,7 @@ function viewSolution() { } -$(document).ready(function() { +$(function() { codeTabs(); viewSolution(); - $('#chapter-toc').toc({exclude: '', context: '.container'}); - $('#chapter-toc').prepend('

    In This Chapter
    '); - makeCollapsable($('#global-toc'), "", "global-toc", "Show Table of Contents"); - //styleCode(); }); diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md index 7c5283fb0b6fb..57ed54c9cf4c0 100644 --- a/docs/python-programming-guide.md +++ b/docs/python-programming-guide.md @@ -157,7 +157,7 @@ some example applications. # Where to Go from Here -PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/incubator-spark/tree/master/python/examples). +PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/spark/tree/master/python/examples). You can run them by passing the files to `pyspark`; e.g.: ./bin/pyspark python/examples/wordcount.py diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index cd4509ede735a..ee1d892a3b630 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -99,13 +99,12 @@ With this mode, your application is actually run on the remote machine where the ## Launch spark application with yarn-client mode. -With yarn-client mode, the application will be launched locally. Just like running application or spark-shell on Local / Mesos / Standalone mode. The launch method is also the similar with them, just make sure that when you need to specify a master url, use "yarn-client" instead. And you also need to export the env value for SPARK_JAR and SPARK_YARN_APP_JAR +With yarn-client mode, the application will be launched locally. Just like running application or spark-shell on Local / Mesos / Standalone mode. The launch method is also the similar with them, just make sure that when you need to specify a master url, use "yarn-client" instead. And you also need to export the env value for SPARK_JAR. Configuration in yarn-client mode: In order to tune worker core/number/memory etc. You need to export environment variables or add them to the spark configuration file (./conf/spark_env.sh). The following are the list of options. -* `SPARK_YARN_APP_JAR`, Path to your application's JAR file (required) * `SPARK_WORKER_INSTANCES`, Number of workers to start (Default: 2) * `SPARK_WORKER_CORES`, Number of cores for the workers (Default: 1). * `SPARK_WORKER_MEMORY`, Memory per Worker (e.g. 1000M, 2G) (Default: 1G) @@ -118,12 +117,11 @@ In order to tune worker core/number/memory etc. You need to export environment v For example: SPARK_JAR=./assembly/target/scala-{{site.SCALA_BINARY_VERSION}}/spark-assembly-{{site.SPARK_VERSION}}-hadoop2.0.5-alpha.jar \ - SPARK_YARN_APP_JAR=examples/target/scala-{{site.SCALA_BINARY_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \ ./bin/run-example org.apache.spark.examples.SparkPi yarn-client +or SPARK_JAR=./assembly/target/scala-{{site.SCALA_BINARY_VERSION}}/spark-assembly-{{site.SPARK_VERSION}}-hadoop2.0.5-alpha.jar \ - SPARK_YARN_APP_JAR=examples/target/scala-{{site.SCALA_BINARY_VERSION}}/spark-examples-assembly-{{site.SPARK_VERSION}}.jar \ MASTER=yarn-client ./bin/spark-shell diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md index 506d3faa767f3..99412733d4268 100644 --- a/docs/scala-programming-guide.md +++ b/docs/scala-programming-guide.md @@ -365,7 +365,7 @@ res2: Int = 10 # Where to Go from Here -You can see some [example Spark programs](http://spark.incubator.apache.org/examples.html) on the Spark website. +You can see some [example Spark programs](http://spark.apache.org/examples.html) on the Spark website. 
In addition, Spark includes several samples in `examples/src/main/scala`. Some of them have both Spark versions and local (non-parallel) versions, allowing you to see what had to be changed to make the program run on a cluster. You can run them using by passing the class name to the `bin/run-example` script included in Spark; for example: ./bin/run-example org.apache.spark.examples.SparkPi diff --git a/docs/spark-debugger.md b/docs/spark-debugger.md index 11c51d5cde7c9..891c2bfa8943d 100644 --- a/docs/spark-debugger.md +++ b/docs/spark-debugger.md @@ -2,7 +2,7 @@ layout: global title: The Spark Debugger --- -**Summary:** The Spark debugger provides replay debugging for deterministic (logic) errors in Spark programs. It's currently in development, but you can try it out in the [arthur branch](https://github.com/apache/incubator-spark/tree/arthur). +**Summary:** The Spark debugger provides replay debugging for deterministic (logic) errors in Spark programs. It's currently in development, but you can try it out in the [arthur branch](https://github.com/apache/spark/tree/arthur). ## Introduction @@ -19,7 +19,7 @@ For deterministic errors, debugging a Spark program is now as easy as debugging ## Approach -As your Spark program runs, the slaves report key events back to the master -- for example, RDD creations, RDD contents, and uncaught exceptions. (A full list of event types is in [EventLogging.scala](https://github.com/apache/incubator-spark/blob/arthur/core/src/main/scala/spark/EventLogging.scala).) The master logs those events, and you can load the event log into the debugger after your program is done running. +As your Spark program runs, the slaves report key events back to the master -- for example, RDD creations, RDD contents, and uncaught exceptions. (A full list of event types is in [EventLogging.scala](https://github.com/apache/spark/blob/arthur/core/src/main/scala/spark/EventLogging.scala).) The master logs those events, and you can load the event log into the debugger after your program is done running. _A note on nondeterminism:_ For fault recovery, Spark requires RDD transformations (for example, the function passed to `RDD.map`) to be deterministic. The Spark debugger also relies on this property, and it can also warn you if your transformation is nondeterministic. This works by checksumming the contents of each RDD and comparing the checksums from the original execution to the checksums after recomputing the RDD in the debugger. diff --git a/docs/tuning.md b/docs/tuning.md index 6b010aed618a3..704778681cb8f 100644 --- a/docs/tuning.md +++ b/docs/tuning.md @@ -44,7 +44,10 @@ This setting configures the serializer used for not only shuffling data between nodes but also when serializing RDDs to disk. The only reason Kryo is not the default is because of the custom registration requirement, but we recommend trying it in any network-intensive application. -Finally, to register your classes with Kryo, create a public class that extends +Spark automatically includes Kryo serializers for the many commonly-used core Scala classes covered +in the AllScalaRegistrar from the [Twitter chill](https://github.com/twitter/chill) library. 
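As background for the registration discussion this tuning.md hunk continues just below, a compact sketch of the KryoRegistrator pattern may help; the `Point` class, the registrator name, and the package in the config value are placeholders rather than anything taken from the patch.

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator

    // Hypothetical application class, used only to illustrate registration.
    case class Point(x: Double, y: Double)

    class PointRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Point])
      }
    }

    object KryoConfExample {
      // Enable Kryo and point spark.kryo.registrator at the (hypothetical)
      // fully-qualified registrator class name.
      def conf(): SparkConf = new SparkConf()
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "com.example.PointRegistrator")
    }

Without such registration Kryo still works, but it has to store the full class name with every serialized object, which is the point the reworded sentence later in this hunk makes.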
+ +To register your own custom classes with Kryo, create a public class that extends [`org.apache.spark.serializer.KryoRegistrator`](api/core/index.html#org.apache.spark.serializer.KryoRegistrator) and set the `spark.kryo.registrator` config property to point to it, as follows: @@ -72,8 +75,8 @@ If your objects are large, you may also need to increase the `spark.kryoserializ config property. The default is 2, but this value needs to be large enough to hold the *largest* object you will serialize. -Finally, if you don't register your classes, Kryo will still work, but it will have to store the -full class name with each object, which is wasteful. +Finally, if you don't register your custom classes, Kryo will still work, but it will have to store +the full class name with each object, which is wasteful. # Memory Tuning diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 0fc1e4df6813c..377d9d6bd5e72 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -18,11 +18,11 @@ package org.apache.spark.graphx import scala.reflect.ClassTag - import org.apache.spark.SparkContext._ import org.apache.spark.SparkException import org.apache.spark.graphx.lib._ import org.apache.spark.rdd.RDD +import scala.util.Random /** * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the @@ -137,6 +137,42 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali } } // end of collectNeighbor + /** + * Returns an RDD that contains for each vertex v its local edges, + * i.e., the edges that are incident on v, in the user-specified direction. + * Warning: note that singleton vertices, those with no edges in the given + * direction will not be part of the return value. + * + * @note This function could be highly inefficient on power-law + * graphs where high degree vertices may force a large amount of + * information to be collected to a single location. + * + * @param edgeDirection the direction along which to collect + * the local edges of vertices + * + * @return the local edges for each vertex + */ + def collectEdges(edgeDirection: EdgeDirection): VertexRDD[Array[Edge[ED]]] = { + edgeDirection match { + case EdgeDirection.Either => + graph.mapReduceTriplets[Array[Edge[ED]]]( + edge => Iterator((edge.srcId, Array(new Edge(edge.srcId, edge.dstId, edge.attr))), + (edge.dstId, Array(new Edge(edge.srcId, edge.dstId, edge.attr)))), + (a, b) => a ++ b) + case EdgeDirection.In => + graph.mapReduceTriplets[Array[Edge[ED]]]( + edge => Iterator((edge.dstId, Array(new Edge(edge.srcId, edge.dstId, edge.attr)))), + (a, b) => a ++ b) + case EdgeDirection.Out => + graph.mapReduceTriplets[Array[Edge[ED]]]( + edge => Iterator((edge.srcId, Array(new Edge(edge.srcId, edge.dstId, edge.attr)))), + (a, b) => a ++ b) + case EdgeDirection.Both => + throw new SparkException("collectEdges does not support EdgeDirection.Both. Use" + + "EdgeDirection.Either instead.") + } + } + /** * Join the vertices with an RDD and then apply a function from the * the vertex and RDD entry to a new vertex value. The input table @@ -209,6 +245,27 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali graph.mask(preprocess(graph).subgraph(epred, vpred)) } + /** + * Picks a random vertex from the graph and returns its ID. 
+ */ + def pickRandomVertex(): VertexId = { + val probability = 50 / graph.numVertices + var found = false + var retVal: VertexId = null.asInstanceOf[VertexId] + while (!found) { + val selectedVertices = graph.vertices.flatMap { vidVvals => + if (Random.nextDouble() < probability) { Some(vidVvals._1) } + else { None } + } + if (selectedVertices.count > 1) { + found = true + val collectedVertices = selectedVertices.collect() + retVal = collectedVertices(Random.nextInt(collectedVertices.size)) + } + } + retVal + } + /** * Execute a Pregel-like iterative vertex-parallel abstraction. The * user-defined vertex-program `vprog` is executed in parallel on diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala index bc2ad5677f806..6386306c048fc 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala @@ -42,21 +42,20 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext { test("collectNeighborIds") { withSpark { sc => - val chain = (0 until 100).map(x => (x, (x+1)%100) ) - val rawEdges = sc.parallelize(chain, 3).map { case (s,d) => (s.toLong, d.toLong) } - val graph = Graph.fromEdgeTuples(rawEdges, 1.0).cache() + val graph = getCycleGraph(sc, 100) val nbrs = graph.collectNeighborIds(EdgeDirection.Either).cache() - assert(nbrs.count === chain.size) + assert(nbrs.count === 100) assert(graph.numVertices === nbrs.count) nbrs.collect.foreach { case (vid, nbrs) => assert(nbrs.size === 2) } - nbrs.collect.foreach { case (vid, nbrs) => - val s = nbrs.toSet - assert(s.contains((vid + 1) % 100)) - assert(s.contains(if (vid > 0) vid - 1 else 99 )) + nbrs.collect.foreach { + case (vid, nbrs) => + val s = nbrs.toSet + assert(s.contains((vid + 1) % 100)) + assert(s.contains(if (vid > 0) vid - 1 else 99)) } } } - + test ("filter") { withSpark { sc => val n = 5 @@ -80,4 +79,121 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext { } } + test("collectEdgesCycleDirectionOut") { + withSpark { sc => + val graph = getCycleGraph(sc, 100) + val edges = graph.collectEdges(EdgeDirection.Out).cache() + assert(edges.count == 100) + edges.collect.foreach { case (vid, edges) => assert(edges.size == 1) } + edges.collect.foreach { + case (vid, edges) => + val s = edges.toSet + val edgeDstIds = s.map(e => e.dstId) + assert(edgeDstIds.contains((vid + 1) % 100)) + } + } + } + + test("collectEdgesCycleDirectionIn") { + withSpark { sc => + val graph = getCycleGraph(sc, 100) + val edges = graph.collectEdges(EdgeDirection.In).cache() + assert(edges.count == 100) + edges.collect.foreach { case (vid, edges) => assert(edges.size == 1) } + edges.collect.foreach { + case (vid, edges) => + val s = edges.toSet + val edgeSrcIds = s.map(e => e.srcId) + assert(edgeSrcIds.contains(if (vid > 0) vid - 1 else 99)) + } + } + } + + test("collectEdgesCycleDirectionEither") { + withSpark { sc => + val graph = getCycleGraph(sc, 100) + val edges = graph.collectEdges(EdgeDirection.Either).cache() + assert(edges.count == 100) + edges.collect.foreach { case (vid, edges) => assert(edges.size == 2) } + edges.collect.foreach { + case (vid, edges) => + val s = edges.toSet + val edgeIds = s.map(e => if (vid != e.srcId) e.srcId else e.dstId) + assert(edgeIds.contains((vid + 1) % 100)) + assert(edgeIds.contains(if (vid > 0) vid - 1 else 99)) + } + } + } + + test("collectEdgesChainDirectionOut") { + withSpark { sc => + val graph = getChainGraph(sc, 50) + val edges 
= graph.collectEdges(EdgeDirection.Out).cache() + assert(edges.count == 49) + edges.collect.foreach { case (vid, edges) => assert(edges.size == 1) } + edges.collect.foreach { + case (vid, edges) => + val s = edges.toSet + val edgeDstIds = s.map(e => e.dstId) + assert(edgeDstIds.contains(vid + 1)) + } + } + } + + test("collectEdgesChainDirectionIn") { + withSpark { sc => + val graph = getChainGraph(sc, 50) + val edges = graph.collectEdges(EdgeDirection.In).cache() + // We expect only 49 because collectEdges does not return vertices that do + // not have any edges in the specified direction. + assert(edges.count == 49) + edges.collect.foreach { case (vid, edges) => assert(edges.size == 1) } + edges.collect.foreach { + case (vid, edges) => + val s = edges.toSet + val edgeDstIds = s.map(e => e.srcId) + assert(edgeDstIds.contains((vid - 1) % 100)) + } + } + } + + test("collectEdgesChainDirectionEither") { + withSpark { sc => + val graph = getChainGraph(sc, 50) + val edges = graph.collectEdges(EdgeDirection.Either).cache() + // We expect only 49 because collectEdges does not return vertices that do + // not have any edges in the specified direction. + assert(edges.count === 50) + edges.collect.foreach { + case (vid, edges) => if (vid > 0 && vid < 49) assert(edges.size == 2) + else assert(edges.size == 1) + } + edges.collect.foreach { + case (vid, edges) => + val s = edges.toSet + val edgeIds = s.map(e => if (vid != e.srcId) e.srcId else e.dstId) + if (vid == 0) { assert(edgeIds.contains(1)) } + else if (vid == 49) { assert(edgeIds.contains(48)) } + else { + assert(edgeIds.contains(vid + 1)) + assert(edgeIds.contains(vid - 1)) + } + } + } + } + + private def getCycleGraph(sc: SparkContext, numVertices: Int): Graph[Double, Int] = { + val cycle = (0 until numVertices).map(x => (x, (x + 1) % numVertices)) + getGraphFromSeq(sc, cycle) + } + + private def getChainGraph(sc: SparkContext, numVertices: Int): Graph[Double, Int] = { + val chain = (0 until numVertices - 1).map(x => (x, (x + 1))) + getGraphFromSeq(sc, chain) + } + + private def getGraphFromSeq(sc: SparkContext, seq: IndexedSeq[(Int, Int)]): Graph[Double, Int] = { + val rawEdges = sc.parallelize(seq, 3).map { case (s, d) => (s.toLong, d.toLong) } + Graph.fromEdgeTuples(rawEdges, 1.0).cache() + } } diff --git a/pom.xml b/pom.xml index 3a530685b8e5a..21060ee69c041 100644 --- a/pom.xml +++ b/pom.xml @@ -393,9 +393,9 @@ test
    - org.apache.zookeeper - zookeeper - 3.4.5 + org.apache.curator + curator-recipes + 2.4.0 org.jboss.netty @@ -505,27 +505,6 @@ - - - org.apache.avro - avro - 1.7.4 - - - org.apache.avro - avro-ipc - 1.7.4 - - - org.jboss.netty - netty - - - io.netty - netty - - -
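These pom.xml hunks, together with the SparkBuild.scala change further down, take Avro out of the default dependency set; that is why the building-with-maven note earlier in this patch tells Hadoop 0.23.x + YARN users to add org.apache.avro:avro:1.7.4 back by hand (for Maven, an explicit dependency entry; for sbt, an extra libraryDependencies entry). A sketch of the sbt side is below, with placeholder property names rather than the switches SparkBuild.scala actually reads.

    import sbt._
    import sbt.Keys._

    object ExampleBuild extends Build {
      // Placeholder switches; SparkBuild.scala derives the real values differently.
      val hadoopVersion = sys.props.getOrElse("example.hadoop.version", "1.0.4")
      val isYarnEnabled = sys.props.get("example.yarn.enabled").exists(_ == "true")

      // Mirror of the maybeAvro idea: only Hadoop 0.23.x + YARN builds pull in Avro.
      val maybeAvro =
        if (hadoopVersion.startsWith("0.23.") && isYarnEnabled) {
          Seq("org.apache.avro" % "avro" % "1.7.4")
        } else {
          Seq.empty[ModuleID]
        }

      lazy val core = Project("example-core", file("core")).settings(
        libraryDependencies ++= maybeAvro
      )
    }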
    @@ -613,12 +592,13 @@ org.apache.maven.plugins maven-compiler-plugin - 2.5.1 + 3.1 ${java.version} ${java.version} UTF-8 1024m + true @@ -633,7 +613,7 @@ org.scalatest scalatest-maven-plugin - 1.0-M2 + 1.0-RC2 ${project.build.directory}/surefire-reports . diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index f0d2e741484f9..c402415742b5e 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -87,7 +87,7 @@ object SparkBuild extends Build { case Some(v) => v.toBoolean } lazy val hadoopClient = if (hadoopVersion.startsWith("0.20.") || hadoopVersion == "1.0.0") "hadoop-core" else "hadoop-client" - + val maybeAvro = if (hadoopVersion.startsWith("0.23.") && isYarnEnabled) Seq("org.apache.avro" % "avro" % "1.7.4") else Seq() // Conditionally include the yarn sub-project lazy val yarnAlpha = Project("yarn-alpha", file("yarn/alpha"), settings = yarnAlphaSettings) dependsOn(core) lazy val yarn = Project("yarn", file("yarn/stable"), settings = yarnSettings) dependsOn(core) @@ -130,6 +130,8 @@ object SparkBuild extends Build { javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION), unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath }, retrieveManaged := true, + // This is to add convenience of enabling sbt -Dsbt.offline=true for making the build offline. + offline := "true".equalsIgnoreCase(sys.props("sbt.offline")), retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]", transitiveClassifiers in Scope.GlobalScope := Seq("sources"), testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))), @@ -254,39 +256,38 @@ object SparkBuild extends Build { ), libraryDependencies ++= Seq( - "com.google.guava" % "guava" % "14.0.1", - "com.google.code.findbugs" % "jsr305" % "1.3.9", - "log4j" % "log4j" % "1.2.17", - "org.slf4j" % "slf4j-api" % slf4jVersion, - "org.slf4j" % "slf4j-log4j12" % slf4jVersion, - "org.slf4j" % "jul-to-slf4j" % slf4jVersion, - "org.slf4j" % "jcl-over-slf4j" % slf4jVersion, - "commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407 - "com.ning" % "compress-lzf" % "1.0.0", - "org.xerial.snappy" % "snappy-java" % "1.0.5", - "org.ow2.asm" % "asm" % "4.0", - "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), - "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), - "org.spark-project.akka" %% "akka-testkit" % "2.2.3-shaded-protobuf" % "test", - "net.liftweb" %% "lift-json" % "2.5.1" excludeAll(excludeNetty), - "it.unimi.dsi" % "fastutil" % "6.4.4", - "colt" % "colt" % "1.2.0", - "org.apache.mesos" % "mesos" % "0.13.0", - "net.java.dev.jets3t" % "jets3t" % "0.7.1" excludeAll(excludeCommonsLogging), - "org.apache.derby" % "derby" % "10.4.2.0" % "test", - "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib, excludeCommonsLogging, excludeSLF4J), - "org.apache.avro" % "avro" % "1.7.4", - "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty), - "org.apache.zookeeper" % "zookeeper" % "3.4.5" excludeAll(excludeNetty), - "com.codahale.metrics" % "metrics-core" % "3.0.0", - "com.codahale.metrics" % "metrics-jvm" % "3.0.0", - "com.codahale.metrics" % "metrics-json" % "3.0.0", - "com.codahale.metrics" % "metrics-ganglia" % "3.0.0", - "com.codahale.metrics" % "metrics-graphite" % "3.0.0", - "com.twitter" %% "chill" % "0.3.1", - "com.twitter" % "chill-java" % 
"0.3.1", - "com.clearspring.analytics" % "stream" % "2.5.1" - ) + "com.google.guava" % "guava" % "14.0.1", + "com.google.code.findbugs" % "jsr305" % "1.3.9", + "log4j" % "log4j" % "1.2.17", + "org.slf4j" % "slf4j-api" % slf4jVersion, + "org.slf4j" % "slf4j-log4j12" % slf4jVersion, + "org.slf4j" % "jul-to-slf4j" % slf4jVersion, + "org.slf4j" % "jcl-over-slf4j" % slf4jVersion, + "commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407 + "com.ning" % "compress-lzf" % "1.0.0", + "org.xerial.snappy" % "snappy-java" % "1.0.5", + "org.ow2.asm" % "asm" % "4.0", + "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), + "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), + "org.spark-project.akka" %% "akka-testkit" % "2.2.3-shaded-protobuf" % "test", + "org.json4s" %% "json4s-jackson" % "3.2.6", + "it.unimi.dsi" % "fastutil" % "6.4.4", + "colt" % "colt" % "1.2.0", + "org.apache.mesos" % "mesos" % "0.13.0", + "net.java.dev.jets3t" % "jets3t" % "0.7.1" excludeAll(excludeCommonsLogging), + "org.apache.derby" % "derby" % "10.4.2.0" % "test", + "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib, excludeCommonsLogging, excludeSLF4J), + "org.apache.curator" % "curator-recipes" % "2.4.0" excludeAll(excludeNetty), + "com.codahale.metrics" % "metrics-core" % "3.0.0", + "com.codahale.metrics" % "metrics-jvm" % "3.0.0", + "com.codahale.metrics" % "metrics-json" % "3.0.0", + "com.codahale.metrics" % "metrics-ganglia" % "3.0.0", + "com.codahale.metrics" % "metrics-graphite" % "3.0.0", + "com.twitter" %% "chill" % "0.3.1", + "com.twitter" % "chill-java" % "0.3.1", + "com.clearspring.analytics" % "stream" % "2.5.1" + ), + libraryDependencies ++= maybeAvro ) def rootSettings = sharedSettings ++ Seq( diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 158646352039f..4c214ef359685 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -45,34 +45,34 @@ def report_times(outfile, boot, init, finish): def main(infile, outfile): - boot_time = time.time() - split_index = read_int(infile) - if split_index == -1: # for unit tests - return + try: + boot_time = time.time() + split_index = read_int(infile) + if split_index == -1: # for unit tests + return - # fetch name of workdir - spark_files_dir = utf8_deserializer.loads(infile) - SparkFiles._root_directory = spark_files_dir - SparkFiles._is_running_on_worker = True + # fetch name of workdir + spark_files_dir = utf8_deserializer.loads(infile) + SparkFiles._root_directory = spark_files_dir + SparkFiles._is_running_on_worker = True - # fetch names and values of broadcast variables - num_broadcast_variables = read_int(infile) - for _ in range(num_broadcast_variables): - bid = read_long(infile) - value = pickleSer._read_with_length(infile) - _broadcastRegistry[bid] = Broadcast(bid, value) + # fetch names and values of broadcast variables + num_broadcast_variables = read_int(infile) + for _ in range(num_broadcast_variables): + bid = read_long(infile) + value = pickleSer._read_with_length(infile) + _broadcastRegistry[bid] = Broadcast(bid, value) - # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH - sys.path.append(spark_files_dir) # *.py files that were added will be copied here - num_python_includes = read_int(infile) - for _ in range(num_python_includes): - filename = utf8_deserializer.loads(infile) - sys.path.append(os.path.join(spark_files_dir, 
filename)) + # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH + sys.path.append(spark_files_dir) # *.py files that were added will be copied here + num_python_includes = read_int(infile) + for _ in range(num_python_includes): + filename = utf8_deserializer.loads(infile) + sys.path.append(os.path.join(spark_files_dir, filename)) - command = pickleSer._read_with_length(infile) - (func, deserializer, serializer) = command - init_time = time.time() - try: + command = pickleSer._read_with_length(infile) + (func, deserializer, serializer) = command + init_time = time.time() iterator = deserializer.load_stream(infile) serializer.dump_stream(func(split_index, iterator), outfile) except Exception as e: diff --git a/repl/pom.xml b/repl/pom.xml index 73597f635b9e0..4c5f9720c802a 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -98,7 +98,7 @@ true - + @@ -111,7 +111,7 @@ - + diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala index 013cea07d48fd..f52ebe4a159f1 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -182,8 +182,13 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter, /** Create a new interpreter. */ def createInterpreter() { - if (addedClasspath != "") - settings.classpath append addedClasspath + require(settings != null) + + if (addedClasspath != "") settings.classpath.append(addedClasspath) + // work around for Scala bug + val totalClassPath = SparkILoop.getAddedJars.foldLeft( + settings.classpath.value)((l, r) => ClassPath.join(l, r)) + this.settings.classpath.value = totalClassPath intp = new SparkILoopInterpreter } diff --git a/sbt/sbt b/sbt/sbt index 8472dce589bcc..da725f720e423 100755 --- a/sbt/sbt +++ b/sbt/sbt @@ -23,7 +23,7 @@ SBT_VERSION=`awk -F "=" '/sbt\\.version/ {print $2}' ./project/build.properties` URL1=http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar URL2=http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar -JAR=sbt/sbt-launch-${SBT_VERSION}.jar +JAR=sbt-launch-${SBT_VERSION}.jar # Download sbt launch jar if it hasn't been downloaded yet if [ ! -f ${JAR} ]; then diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 4dcd0e4c51ec3..2c7ff87744d7a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -127,7 +127,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying `groupByKey` on each RDD of `this` DStream. * Therefore, the values for each key in `this` DStream's RDDs are grouped into a - * single sequence to generate the RDDs of the new DStream. [[org.apache.spark.Partitioner]] + * single sequence to generate the RDDs of the new DStream. org.apache.spark.Partitioner * is used to control the partitioning of each RDD. */ def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] = @@ -151,7 +151,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the supplied reduce function. 
[[org.apache.spark.Partitioner]] is used to control + * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control * thepartitioning of each RDD. */ def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = { @@ -161,7 +161,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Combine elements of each key in DStream's RDDs using custom function. This is similar to the * combineByKey for RDDs. Please refer to combineByKey in - * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. + * org.apache.spark.rdd.PairRDDFunctions for more information. */ def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], @@ -176,7 +176,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Combine elements of each key in DStream's RDDs using custom function. This is similar to the * combineByKey for RDDs. Please refer to combineByKey in - * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. + * org.apache.spark.rdd.PairRDDFunctions for more information. */ def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], @@ -479,7 +479,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of the key. - * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * org.apache.spark.Partitioner is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. * @param partitioner Partitioner for controlling the partitioning of each RDD in the new @@ -579,7 +579,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD. */ def join[W]( other: JavaPairDStream[K, W], @@ -619,7 +619,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD. */ def leftOuterJoin[W]( other: JavaPairDStream[K, W], @@ -660,7 +660,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and - * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * `other` DStream. The supplied org.apache.spark.Partitioner is used to control * the partitioning of each RDD. 
*/ def rightOuterJoin[W]( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 2268160dccc1f..b082bb058529b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -406,7 +406,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream(). * In the transform function, convert the JavaRDD corresponding to that JavaDStream to - * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD(). + * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD(). */ def transform[T]( dstreams: JList[JavaDStream[_]], @@ -429,7 +429,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream(). * In the transform function, convert the JavaRDD corresponding to that JavaDStream to - * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD(). + * a JavaPairRDD using org.apache.spark.api.java.JavaPairRDD.fromJavaRDD(). */ def transform[K, V]( dstreams: JList[JavaDStream[_]], diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index f3c58aede092a..2473496949360 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -65,7 +65,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new DStream by applying `groupByKey` on each RDD. The supplied - * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * org.apache.spark.Partitioner is used to control the partitioning of each RDD. */ def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { val createCombiner = (v: V) => ArrayBuffer[V](v) @@ -95,7 +95,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control + * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control * the partitioning of each RDD. */ def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = { @@ -376,7 +376,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of the key. - * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * org.apache.spark.Partitioner is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. 
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new @@ -396,7 +396,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new "state" DStream where the state for each key is updated by applying * the given function on the previous state of the key and the new values of each key. - * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * org.apache.spark.Partitioner is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. Note, that * this function may generate a different a tuple with a different key @@ -453,7 +453,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. - * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs. + * The supplied org.apache.spark.Partitioner is used to partition the generated RDDs. */ def cogroup[W: ClassTag]( other: DStream[(K, W)], @@ -483,7 +483,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD. */ def join[W: ClassTag]( other: DStream[(K, W)], @@ -518,7 +518,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and - * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * `other` DStream. The supplied org.apache.spark.Partitioner is used to control * the partitioning of each RDD. */ def leftOuterJoin[W: ClassTag]( @@ -554,7 +554,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) /** * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and - * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * `other` DStream. The supplied org.apache.spark.Partitioner is used to control * the partitioning of each RDD. 
*/ def rightOuterJoin[W: ClassTag]( diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 4fbbce9b8b90e..54a0791d04ea4 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -19,7 +19,6 @@ import scala.Tuple2; -import org.junit.After; import org.junit.Assert; import org.junit.Test; import java.io.*; @@ -30,7 +29,6 @@ import com.google.common.io.Files; import com.google.common.collect.Sets; -import org.apache.spark.SparkConf; import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -38,6 +36,7 @@ import org.apache.spark.api.java.function.*; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaDStreamLike; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; @@ -45,6 +44,8 @@ // serialized, as an alternative to converting these anonymous classes to static inner classes; // see http://stackoverflow.com/questions/758570/. public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable { + + @SuppressWarnings("unchecked") @Test public void testCount() { List> inputData = Arrays.asList( @@ -64,6 +65,7 @@ public void testCount() { assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testMap() { List> inputData = Arrays.asList( @@ -87,6 +89,7 @@ public Integer call(String s) throws Exception { assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testWindow() { List> inputData = Arrays.asList( @@ -108,6 +111,7 @@ public void testWindow() { assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testWindowWithSlideDuration() { List> inputData = Arrays.asList( @@ -132,6 +136,7 @@ public void testWindowWithSlideDuration() { assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testFilter() { List> inputData = Arrays.asList( @@ -155,13 +160,16 @@ public Boolean call(String s) throws Exception { assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testRepartitionMorePartitions() { List> inputData = Arrays.asList( Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2); - JavaDStream repartitioned = stream.repartition(4); + JavaDStream stream = + JavaTestUtils.attachTestInputStream(ssc, inputData, 2); + JavaDStreamLike,JavaRDD> repartitioned = + stream.repartition(4); JavaTestUtils.attachTestOutputStream(repartitioned); List>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); Assert.assertEquals(2, result.size()); @@ -172,13 +180,16 @@ public void testRepartitionMorePartitions() { } } + @SuppressWarnings("unchecked") @Test public void testRepartitionFewerPartitions() { List> inputData = Arrays.asList( Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 4); - JavaDStream repartitioned = stream.repartition(2); + JavaDStream stream = + JavaTestUtils.attachTestInputStream(ssc, 
inputData, 4); + JavaDStreamLike,JavaRDD> repartitioned = + stream.repartition(2); JavaTestUtils.attachTestOutputStream(repartitioned); List>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2); Assert.assertEquals(2, result.size()); @@ -188,6 +199,7 @@ public void testRepartitionFewerPartitions() { } } + @SuppressWarnings("unchecked") @Test public void testGlom() { List> inputData = Arrays.asList( @@ -206,6 +218,7 @@ public void testGlom() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testMapPartitions() { List> inputData = Arrays.asList( @@ -217,16 +230,17 @@ public void testMapPartitions() { Arrays.asList("YANKEESRED SOCKS")); JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream mapped = stream.mapPartitions(new FlatMapFunction, String>() { - @Override - public Iterable call(Iterator in) { - String out = ""; - while (in.hasNext()) { - out = out + in.next().toUpperCase(); - } - return Lists.newArrayList(out); - } - }); + JavaDStream mapped = stream.mapPartitions( + new FlatMapFunction, String>() { + @Override + public Iterable call(Iterator in) { + String out = ""; + while (in.hasNext()) { + out = out + in.next().toUpperCase(); + } + return Lists.newArrayList(out); + } + }); JavaTestUtils.attachTestOutputStream(mapped); List> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -247,6 +261,7 @@ public Integer call(Integer i1, Integer i2) throws Exception { } } + @SuppressWarnings("unchecked") @Test public void testReduce() { List> inputData = Arrays.asList( @@ -267,6 +282,7 @@ public void testReduce() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testReduceByWindow() { List> inputData = Arrays.asList( @@ -289,6 +305,7 @@ public void testReduceByWindow() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testQueueStream() { List> expected = Arrays.asList( @@ -312,6 +329,7 @@ public void testQueueStream() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testTransform() { List> inputData = Arrays.asList( @@ -344,6 +362,7 @@ public Integer call(Integer i) throws Exception { assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testVariousTransform() { // tests whether all variations of transform can be called from Java @@ -423,6 +442,7 @@ public JavaRDD call(JavaRDD in) throws Exception { } + @SuppressWarnings("unchecked") @Test public void testTransformWith() { List>> stringStringKVStream1 = Arrays.asList( @@ -492,6 +512,7 @@ public JavaPairRDD> call( } + @SuppressWarnings("unchecked") @Test public void testVariousTransformWith() { // tests whether all variations of transformWith can be called from Java @@ -591,6 +612,7 @@ public JavaPairRDD call(JavaPairRDD rdd1, JavaP ); } + @SuppressWarnings("unchecked") @Test public void testStreamingContextTransform(){ List> stream1input = Arrays.asList( @@ -658,6 +680,7 @@ public Tuple2 call(Integer i) throws Exception { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testFlatMap() { List> inputData = Arrays.asList( @@ -683,6 +706,7 @@ public Iterable call(String x) { assertOrderInvariantEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairFlatMap() { List> inputData = Arrays.asList( @@ -718,22 +742,24 @@ public void testPairFlatMap() { new Tuple2(9, "s"))); JavaDStream stream = 
JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction() { - @Override - public Iterable> call(String in) throws Exception { - List> out = Lists.newArrayList(); - for (String letter: in.split("(?!^)")) { - out.add(new Tuple2(in.length(), letter)); - } - return out; - } - }); + JavaPairDStream flatMapped = stream.flatMap( + new PairFlatMapFunction() { + @Override + public Iterable> call(String in) throws Exception { + List> out = Lists.newArrayList(); + for (String letter: in.split("(?!^)")) { + out.add(new Tuple2(in.length(), letter)); + } + return out; + } + }); JavaTestUtils.attachTestOutputStream(flatMapped); List>> result = JavaTestUtils.runStreams(ssc, 3, 3); Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testUnion() { List> inputData1 = Arrays.asList( @@ -778,6 +804,7 @@ public static > void assertOrderInvariantEquals( // PairDStream Functions + @SuppressWarnings("unchecked") @Test public void testPairFilter() { List> inputData = Arrays.asList( @@ -810,7 +837,8 @@ public Boolean call(Tuple2 in) throws Exception { Assert.assertEquals(expected, result); } - List>> stringStringKVStream = Arrays.asList( + @SuppressWarnings("unchecked") + private List>> stringStringKVStream = Arrays.asList( Arrays.asList(new Tuple2("california", "dodgers"), new Tuple2("california", "giants"), new Tuple2("new york", "yankees"), @@ -820,7 +848,8 @@ public Boolean call(Tuple2 in) throws Exception { new Tuple2("new york", "rangers"), new Tuple2("new york", "islanders"))); - List>> stringIntKVStream = Arrays.asList( + @SuppressWarnings("unchecked") + private List>> stringIntKVStream = Arrays.asList( Arrays.asList( new Tuple2("california", 1), new Tuple2("california", 3), @@ -832,6 +861,7 @@ public Boolean call(Tuple2 in) throws Exception { new Tuple2("new york", 3), new Tuple2("new york", 1))); + @SuppressWarnings("unchecked") @Test public void testPairMap() { // Maps pair -> pair of different type List>> inputData = stringIntKVStream; @@ -864,6 +894,7 @@ public Tuple2 call(Tuple2 in) throws Exception Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairMapPartitions() { // Maps pair -> pair of different type List>> inputData = stringIntKVStream; @@ -901,6 +932,7 @@ public Iterable> call(Iterator> Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairMap2() { // Maps pair -> single List>> inputData = stringIntKVStream; @@ -925,6 +957,7 @@ public Integer call(Tuple2 in) throws Exception { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair List>> inputData = Arrays.asList( @@ -967,6 +1000,7 @@ public Iterable> call(Tuple2 in) throws Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairGroupByKey() { List>> inputData = stringStringKVStream; @@ -989,6 +1023,7 @@ public void testPairGroupByKey() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairReduceByKey() { List>> inputData = stringIntKVStream; @@ -1013,6 +1048,7 @@ public void testPairReduceByKey() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testCombineByKey() { List>> inputData = stringIntKVStream; @@ -1043,6 +1079,7 @@ public Integer call(Integer i) throws Exception { Assert.assertEquals(expected, 
result); } + @SuppressWarnings("unchecked") @Test public void testCountByValue() { List> inputData = Arrays.asList( @@ -1068,6 +1105,7 @@ public void testCountByValue() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testGroupByKeyAndWindow() { List>> inputData = stringIntKVStream; @@ -1113,6 +1151,7 @@ private Tuple2> convert(Tuple2> t return new Tuple2>(tuple._1(), new HashSet(tuple._2())); } + @SuppressWarnings("unchecked") @Test public void testReduceByKeyAndWindow() { List>> inputData = stringIntKVStream; @@ -1136,6 +1175,7 @@ public void testReduceByKeyAndWindow() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testUpdateStateByKey() { List>> inputData = stringIntKVStream; @@ -1171,6 +1211,7 @@ public Optional call(List values, Optional state) { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testReduceByKeyAndWindowWithInverse() { List>> inputData = stringIntKVStream; @@ -1194,6 +1235,7 @@ public void testReduceByKeyAndWindowWithInverse() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testCountByValueAndWindow() { List> inputData = Arrays.asList( @@ -1227,6 +1269,7 @@ public void testCountByValueAndWindow() { Assert.assertEquals(expected, unorderedResult); } + @SuppressWarnings("unchecked") @Test public void testPairTransform() { List>> inputData = Arrays.asList( @@ -1271,6 +1314,7 @@ public JavaPairRDD call(JavaPairRDD in) thro Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testPairToNormalRDDTransform() { List>> inputData = Arrays.asList( @@ -1312,6 +1356,8 @@ public Integer call(Tuple2 in) { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") + @Test public void testMapValues() { List>> inputData = stringStringKVStream; @@ -1342,6 +1388,7 @@ public String call(String s) throws Exception { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testFlatMapValues() { List>> inputData = stringStringKVStream; @@ -1386,6 +1433,7 @@ public Iterable call(String in) { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testCoGroup() { List>> stringStringKVStream1 = Arrays.asList( @@ -1429,6 +1477,7 @@ public void testCoGroup() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testJoin() { List>> stringStringKVStream1 = Arrays.asList( @@ -1472,6 +1521,7 @@ public void testJoin() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testLeftOuterJoin() { List>> stringStringKVStream1 = Arrays.asList( @@ -1503,6 +1553,7 @@ public void testLeftOuterJoin() { Assert.assertEquals(expected, result); } + @SuppressWarnings("unchecked") @Test public void testCheckpointMasterRecovery() throws InterruptedException { List> inputData = Arrays.asList( @@ -1541,7 +1592,8 @@ public Integer call(String s) throws Exception { } - /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD + /* TEST DISABLED: Pending a discussion about checkpoint() semantics with TD + @SuppressWarnings("unchecked") @Test public void testCheckpointofIndividualStream() throws InterruptedException { List> inputData = Arrays.asList( @@ -1581,16 +1633,14 @@ public void testSocketTextStream() { @Test public void testSocketString() { class Converter extends Function> { - public Iterable call(InputStream in) { + 
      public Iterable call(InputStream in) throws IOException {
         BufferedReader reader = new BufferedReader(new InputStreamReader(in));
         List out = new ArrayList();
-        try {
-          while (true) {
-            String line = reader.readLine();
-            if (line == null) { break; }
-            out.add(line);
-          }
-        } catch (IOException e) { }
+        while (true) {
+          String line = reader.readLine();
+          if (line == null) { break; }
+          out.add(line);
+        }
         return out;
       }
     }
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 1419f215c78e5..fe37168e5a7ba 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -108,7 +108,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
           args = tail

         case Nil =>
-          if (userJar == null || userClass == null) {
+          if (userClass == null) {
             printUsageAndExit(1)
           }

@@ -129,7 +129,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
     System.err.println(
       "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
       "Options:\n" +
-      "  --jar JAR_PATH             Path to your application's JAR file (required)\n" +
+      "  --jar JAR_PATH             Path to your application's JAR file (required in yarn-standalone mode)\n" +
       "  --class CLASS_NAME         Name of your application's main class (required)\n" +
       "  --args ARGS                Arguments to be passed to your application's main class.\n" +
       "                             Mutliple invocations are possible, each will be passed in order.\n" +
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 2db5744be1a70..24520bd21ba98 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -68,7 +68,8 @@ trait ClientBase extends Logging {
   def validateArgs() = {
     Map(
       (System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
-      (args.userJar == null) -> "Error: You must specify a user jar!",
+      ((args.userJar == null && args.amClass == classOf[ApplicationMaster].getName) ->
+        "Error: You must specify a user jar when running in standalone mode!"),
       (args.userClass == null) -> "Error: You must specify a user class!",
       (args.numWorkers <= 0) -> "Error: You must specify at least 1 worker!",
       (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be" +
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 22e55e0c60647..e7130d24072ca 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -44,10 +44,6 @@ private[spark] class YarnClientSchedulerBackend(
   override def start() {
     super.start()

-    val userJar = System.getenv("SPARK_YARN_APP_JAR")
-    if (userJar == null)
-      throw new SparkException("env SPARK_YARN_APP_JAR is not set")
-
     val driverHost = conf.get("spark.driver.host")
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
@@ -55,7 +51,7 @@ private[spark] class YarnClientSchedulerBackend(
     val argsArrayBuf = new ArrayBuffer[String]()
     argsArrayBuf += (
       "--class", "notused",
-      "--jar", userJar,
+      "--jar", null,
       "--args", hostport,
       "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
     )
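Note on the ClientBase.validateArgs hunk above: the method keys a Map by boolean failure conditions and reports the message of whichever condition turns out true, and the patch narrows the user-jar check so it only fires when the standalone ApplicationMaster is launched. The following is a minimal, self-contained sketch of that validation pattern; the Args fields, the AM class name and the messages are illustrative stand-ins rather than Spark's exact code.

```scala
// Sketch only: mirrors the condition -> message validation style of
// ClientBase.validateArgs with hypothetical names.
object ValidateArgsSketch {
  case class Args(userJar: String, userClass: String, numWorkers: Int, amClass: String)

  // Placeholder for the ApplicationMaster class name the patch compares against.
  val standaloneAmClass = "org.example.ApplicationMaster"

  def validate(args: Args): Unit = {
    val checks = Map(
      (args.userJar == null && args.amClass == standaloneAmClass) ->
        "Error: You must specify a user jar when running in standalone mode!",
      (args.userClass == null) -> "Error: You must specify a user class!",
      (args.numWorkers <= 0) -> "Error: You must specify at least 1 worker!"
    )
    // Print the first failing check's message and exit, mirroring the fail-fast
    // behaviour of the original method.
    checks.collectFirst { case (true, msg) => msg }.foreach { msg =>
      System.err.println(msg)
      sys.exit(1)
    }
  }

  def main(argv: Array[String]): Unit = {
    // Passes: a missing jar is only fatal when the standalone AM class is requested.
    validate(Args(null, "com.example.Main", numWorkers = 2, amClass = "org.example.WorkerLauncher"))
  }
}
```

Keeping the checks declarative in one Map makes it easy to relax a single condition, which is exactly what the patch does for the user jar.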
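Taken together, the ClientArguments, ClientBase and YarnClientSchedulerBackend changes mean SPARK_YARN_APP_JAR and --jar are only needed for the yarn-standalone ApplicationMaster; in yarn-client mode the driver runs locally and the application jar can travel with the SparkContext instead. A rough sketch of a yarn-client driver under that assumption; the jar path is a placeholder, and cluster setup (HADOOP_CONF_DIR, SPARK_JAR and so on) is assumed to be handled separately.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch of a yarn-client driver after this change: no SPARK_YARN_APP_JAR
// environment variable and no --jar argument. The application jar (placeholder
// path below) is shipped through the SparkContext's jar list.
object YarnClientSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("yarn-client")
      .setAppName("YarnClientSketch")
      .setJars(Seq("/path/to/app.jar")) // placeholder path

    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 100).count()) // trivial job to exercise the executors
    sc.stop()
  }
}
```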
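Earlier in the patch, the PairDStreamFunctions scaladoc was adjusted for the overloads that take an explicit org.apache.spark.Partitioner (join, cogroup, the outer joins and updateStateByKey). A minimal sketch of how those overloads are used; the hosts, ports and checkpoint directory below are placeholders.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair DStream operations

object PartitionerStreamingSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "PartitionerStreamingSketch", Seconds(1))
    ssc.checkpoint("/tmp/streaming-checkpoint") // required by updateStateByKey

    // Placeholder sources; a real job would read from sockets, Kafka, etc.
    val clicks = ssc.socketTextStream("localhost", 9999).map(line => (line, 1))
    val labels = ssc.socketTextStream("localhost", 9998).map(line => (line, "label"))

    val partitioner = new HashPartitioner(4)

    // The supplied Partitioner controls how each generated RDD is partitioned.
    val joined = clicks.join(labels, partitioner)

    // Running count per key; using the same Partitioner keeps the state co-partitioned.
    val counts = clicks.updateStateByKey(
      (values: Seq[Int], state: Option[Int]) => Some(state.getOrElse(0) + values.sum),
      partitioner)

    joined.print()
    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```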
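The testRepartition* and testMapPartitions cases in JavaAPISuite above exercise DStream.repartition and DStream.mapPartitions from Java. The same two operations in Scala, as a short sketch with a placeholder socket source:

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[4]", "RepartitionSketch", Seconds(1))

    val lines = ssc.socketTextStream("localhost", 9999) // placeholder source

    // repartition(n) redistributes each batch's RDD across n partitions,
    // which is what the repartition tests above check from the Java API.
    val widened = lines.repartition(4)

    // mapPartitions processes one partition at a time; here each partition's
    // records are upper-cased and concatenated, mirroring testMapPartitions.
    val perPartition = widened.mapPartitions { iter =>
      Iterator(iter.map(_.toUpperCase).mkString)
    }

    perPartition.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```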
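The testSocketString change above lets the test's InputStream converter propagate IOException instead of swallowing it. The matching hook in the Scala API is StreamingContext.socketStream, which takes a user-supplied InputStream => Iterator[T] converter; a small sketch, with host and port as placeholders:

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketConverterSketch {
  // Converter analogous to the Java test's Converter class: read the socket's
  // InputStream line by line; any IOException simply propagates.
  def toLines(in: InputStream): Iterator[String] = {
    val reader = new BufferedReader(new InputStreamReader(in))
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
  }

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "SocketConverterSketch", Seconds(1))
    val lines = ssc.socketStream[String]("localhost", 9999, toLines, StorageLevel.MEMORY_ONLY)
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```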
"--class", "notused", - "--jar", userJar, + "--jar", null, "--args", hostport, "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher" ) diff --git a/yarn/pom.xml b/yarn/pom.xml index e7eba36ba351b..c0e133dd603b1 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -133,7 +133,7 @@ true - + @@ -146,7 +146,7 @@ - +