From 69a8af2af0437ee816c74ca0ca8c78ef47ede69f Mon Sep 17 00:00:00 2001 From: rubarb Date: Mon, 14 Apr 2014 23:07:20 -0700 Subject: [PATCH 1/6] changed title --- docs/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/index.md b/docs/index.md index 89ec5b05488a..983f25746cd3 100644 --- a/docs/index.md +++ b/docs/index.md @@ -1,6 +1,6 @@ --- layout: global -title: Spark Overview +title: Getting Started with Apache Spark --- Apache Spark is a fast and general-purpose cluster computing system. From b0043607ae6e7fe498703e6a21de98d61f6c3c07 Mon Sep 17 00:00:00 2001 From: rubarb Date: Mon, 21 Apr 2014 23:35:52 -0700 Subject: [PATCH 2/6] made some edits --- docs/index.md | 47 +++++++++++++++++++++++++---------------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/docs/index.md b/docs/index.md index 983f25746cd3..b2c628a24b40 100644 --- a/docs/index.md +++ b/docs/index.md @@ -7,19 +7,39 @@ Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in [Scala](scala-programming-guide.html), [Java](java-programming-guide.html), and [Python](python-programming-guide.html) that make parallel jobs easy to write, and an optimized engine that supports general computation graphs. It also supports a rich set of higher-level tools including [Shark](http://shark.cs.berkeley.edu) (Hive on Spark), [MLlib](mllib-guide.html) for machine learning, [GraphX](graphx-programming-guide.html) for graph processing, and [Spark Streaming](streaming-programming-guide.html). -# Downloading +# Download Get Spark by visiting the [downloads page](http://spark.apache.org/downloads.html) of the Apache Spark site. This documentation is for Spark version {{site.SPARK_VERSION}}. -Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). All you need to run it is to have `java` to installed on your system `PATH`, or the `JAVA_HOME` environment variable pointing to a Java installation. +Spark runs on both Windows and UNIX-like systems (e.g., Linux, Mac OS). All you need to run it is to have Java installed on your system `PATH` or point the `JAVA_HOME` environment variable to a Java installation. -# Building +For its Scala API, Spark {{site.SPARK_VERSION}} depends on Scala {{site.SCALA_BINARY_VERSION}}. If you write applications in +Scala, you will need to use a compatible Scala version (e.g., {{site.SCALA_BINARY_VERSION}}.X) -- newer major versions may no +t work. You can get the appropriate version of Scala from [scala-lang.org](http://www.scala-lang.org/download/). -Spark uses [Simple Build Tool](http://www.scala-sbt.org), which is bundled with it. To compile the code, go into the top-level Spark directory and run +# Build + +Spark uses the Hadoop-client library to talk to HDFS and other Hadoop-supported +storage systems. Because the HDFS protocol has changed in different versions of +Hadoop, you must build Spark against the same version that your cluster uses. + +Spark is bundled with the [Simple Build Tool](http://www.scala-sbt.org) (SBT). + +To compile the code so Spark links to Hadoop 1.0.4 (default), from the top-level Spark directory run: sbt/sbt assembly -For its Scala API, Spark {{site.SPARK_VERSION}} depends on Scala {{site.SCALA_BINARY_VERSION}}. If you write applications in Scala, you will need to use a compatible Scala version (e.g. {{site.SCALA_BINARY_VERSION}}.X) -- newer major versions may not work. You can get the right version of Scala from [scala-lang.org](http://www.scala-lang.org/download/). 
+By default, Spark links to Hadoop 1.0.4. You can change this by setting the +`SPARK_HADOOP_VERSION` variable when compiling: + + SPARK_HADOOP_VERSION=2.2.0 sbt/sbt assembly + + In addition, if you wish to run Spark on [YARN](running-on-yarn.html), set + `SPARK_YARN` to `true`: + + SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly + + Note that on Windows, you need to set the environment variables on separate lines, e.g., `set SPARK_HADOOP_VERSION=1.2.1`. # Running the Examples and Shell @@ -50,23 +70,6 @@ options for deployment: * [Apache Mesos](running-on-mesos.html) * [Hadoop YARN](running-on-yarn.html) -# A Note About Hadoop Versions - -Spark uses the Hadoop-client library to talk to HDFS and other Hadoop-supported -storage systems. Because the HDFS protocol has changed in different versions of -Hadoop, you must build Spark against the same version that your cluster uses. -By default, Spark links to Hadoop 1.0.4. You can change this by setting the -`SPARK_HADOOP_VERSION` variable when compiling: - - SPARK_HADOOP_VERSION=2.2.0 sbt/sbt assembly - -In addition, if you wish to run Spark on [YARN](running-on-yarn.html), set -`SPARK_YARN` to `true`: - - SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly - -Note that on Windows, you need to set the environment variables on separate lines, e.g., `set SPARK_HADOOP_VERSION=1.2.1`. - # Where to Go from Here **Programming guides:** From 9774867182db38078257e76f1db81cb09e9c313a Mon Sep 17 00:00:00 2001 From: rubarb Date: Tue, 22 Apr 2014 22:17:59 -0700 Subject: [PATCH 3/6] made editing pass --- docs/index.md | 96 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 70 insertions(+), 26 deletions(-) diff --git a/docs/index.md b/docs/index.md index b2c628a24b40..7f3819f7832f 100644 --- a/docs/index.md +++ b/docs/index.md @@ -7,17 +7,19 @@ Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in [Scala](scala-programming-guide.html), [Java](java-programming-guide.html), and [Python](python-programming-guide.html) that make parallel jobs easy to write, and an optimized engine that supports general computation graphs. It also supports a rich set of higher-level tools including [Shark](http://shark.cs.berkeley.edu) (Hive on Spark), [MLlib](mllib-guide.html) for machine learning, [GraphX](graphx-programming-guide.html) for graph processing, and [Spark Streaming](streaming-programming-guide.html). -# Download +# Downloading Get Spark by visiting the [downloads page](http://spark.apache.org/downloads.html) of the Apache Spark site. This documentation is for Spark version {{site.SPARK_VERSION}}. -Spark runs on both Windows and UNIX-like systems (e.g., Linux, Mac OS). All you need to run it is to have Java installed on your system `PATH` or point the `JAVA_HOME` environment variable to a Java installation. +Spark runs on both Windows and Unix-like systems (e.g., Linux, Mac OS). All you need to run it is to have Java installed on your system `PATH` or point the `JAVA_HOME` environment variable to a Java installation. + +Note: The Spark Programming Guide is written through a Scala lens, so Java and Python developers may wish to download and install Scala so they can work hands-on with the Scala examples in the Spark Programming Guide. For its Scala API, Spark {{site.SPARK_VERSION}} depends on Scala {{site.SCALA_BINARY_VERSION}}. 
If you write applications in -Scala, you will need to use a compatible Scala version (e.g., {{site.SCALA_BINARY_VERSION}}.X) -- newer major versions may no +Scala, you will need to use a compatible Scala version (*e.g.*, {{site.SCALA_BINARY_VERSION}}.X) -- newer major versions may no t work. You can get the appropriate version of Scala from [scala-lang.org](http://www.scala-lang.org/download/). -# Build +# Building Spark uses the Hadoop-client library to talk to HDFS and other Hadoop-supported storage systems. Because the HDFS protocol has changed in different versions of @@ -27,55 +29,97 @@ Spark is bundled with the [Simple Build Tool](http://www.scala-sbt.org) (SBT). To compile the code so Spark links to Hadoop 1.0.4 (default), from the top-level Spark directory run: - sbt/sbt assembly + $ sbt/sbt assembly + +You can change the Hadoop version that Spark links to by setting the +`SPARK_HADOOP_VERSION` environment variable when compiling. For example: + + $ SPARK_HADOOP_VERSION=2.2.0 sbt/sbt assembly + +If you wish to run Spark on [YARN](running-on-yarn.html), set +`SPARK_YARN` to `true`. For example: + + $ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly + +Note: If you're using the Windows Command Prompt run each command separately: + + > set SPARK_HADOOP_VERSION=2.0.5-alpha + > set SPARK_YARN=true + > sbt/sbt assembly + +# Running Spark Examples + +Spark comes with a number of sample programs. Scala and Java examples are in the `examples` directory, and Python examples are in the `python/examples` directory. -By default, Spark links to Hadoop 1.0.4. You can change this by setting the -`SPARK_HADOOP_VERSION` variable when compiling: +To run one of the Java or Scala sample programs, in the top-level Spark directory: - SPARK_HADOOP_VERSION=2.2.0 sbt/sbt assembly + $ ./bin/run-example - In addition, if you wish to run Spark on [YARN](running-on-yarn.html), set - `SPARK_YARN` to `true`: +The `bin/run-example` script sets up the appropriate paths and launches the specified program. +For example, try this Scala program: - SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly + $ ./bin/run-example org.apache.spark.examples.SparkPi local - Note that on Windows, you need to set the environment variables on separate lines, e.g., `set SPARK_HADOOP_VERSION=1.2.1`. +Or run this Java program: -# Running the Examples and Shell + $ ./bin/run-example org.apache.spark.examples.JavaSparkPi local -Spark comes with several sample programs. Scala and Java examples are in the `examples` directory, and Python examples are in `python/examples`. -To run one of the Java or Scala sample programs, use `./bin/run-example ` in the top-level Spark directory -(the `bin/run-example` script sets up the appropriate paths and launches that program). -For example, try `./bin/run-example org.apache.spark.examples.SparkPi local`. -To run a Python sample program, use `./bin/pyspark `. For example, try `./bin/pyspark ./python/examples/pi.py local`. +To run a Python sample program, in the top-level Spark directory: -Each example prints usage help when run with no parameters. + $ ./bin/pyspark + +For example, try: + + $ ./bin/pyspark ./python/examples/pi.py local + +Each example prints usage help when run without parameters: + + $ ./bin/run-example org.apache.spark.examples.JavaWordCount + Usage: JavaWordCount + + $ ./bin/run-example org.apache.spark.examples.JavaWordCount local README.md + +The README.md file is located in the top-level Spark directory. 
Note that all of the sample programs take a `` parameter specifying the cluster URL to connect to. This can be a [URL for a distributed cluster](scala-programming-guide.html#master-urls), -or `local` to run locally with one thread, or `local[N]` to run locally with N threads. You should start by using +`local` to run locally with one thread, or `local[N]` to run locally with N threads. We recommend starting by using `local` for testing. -Finally, you can run Spark interactively through modified versions of the Scala shell (`./bin/spark-shell`) or -Python interpreter (`./bin/pyspark`). These are a great way to learn the framework. +# Using the Spark Shells + +You can run Spark interactively through modified versions of the Scala shell or +the Python interpreter. These are great ways to learn the Spark framework. + +To run Spark's Scala shell: + + $ ./bin/spark-shell + ... + > + +To run Spark's Python interpreter: + + $ ./bin/pyspark + ... + >>> # Launching on a Cluster The Spark [cluster mode overview](cluster-overview.html) explains the key concepts in running on a cluster. -Spark can run both by itself, or over several existing cluster managers. It currently provides several +Spark can run by itself or over several existing cluster managers. There are currently several options for deployment: * [Amazon EC2](ec2-scripts.html): our EC2 scripts let you launch a cluster in about 5 minutes -* [Standalone Deploy Mode](spark-standalone.html): simplest way to deploy Spark on a private cluster +* [Standalone Deploy Mode](spark-standalone.html): the simplest way to deploy Spark on a private cluster * [Apache Mesos](running-on-mesos.html) * [Hadoop YARN](running-on-yarn.html) # Where to Go from Here -**Programming guides:** +**Programming Guides:** * [Quick Start](quick-start.html): a quick introduction to the Spark API; start here! -* [Spark Programming Guide](scala-programming-guide.html): an overview of Spark concepts, and details on the Scala API +* [Spark Programming Guide](scala-programming-guide.html): an overview of Spark concepts though the lens of the Scala API * [Java Programming Guide](java-programming-guide.html): using Spark from Java * [Python Programming Guide](python-programming-guide.html): using Spark from Python * [Spark Streaming](streaming-programming-guide.html): Spark's API for processing data streams From 7da975e5db2781e71b3515bafe4a04ed49c3be1d Mon Sep 17 00:00:00 2001 From: rubarb Date: Thu, 24 Apr 2014 02:01:33 -0700 Subject: [PATCH 4/6] edits --- docs/index.md | 49 +++++++++-------- docs/quick-start.md | 126 ++++++++++++++++++++++---------------------- 2 files changed, 89 insertions(+), 86 deletions(-) diff --git a/docs/index.md b/docs/index.md index 7f3819f7832f..424209010348 100644 --- a/docs/index.md +++ b/docs/index.md @@ -13,11 +13,10 @@ Get Spark by visiting the [downloads page](http://spark.apache.org/downloads.htm Spark runs on both Windows and Unix-like systems (e.g., Linux, Mac OS). All you need to run it is to have Java installed on your system `PATH` or point the `JAVA_HOME` environment variable to a Java installation. -Note: The Spark Programming Guide is written through a Scala lens, so Java and Python developers may wish to download and install Scala so they can work hands-on with the Scala examples in the Spark Programming Guide. 
+Note: Some parts of the [Spark Programming Quick Start Guide](quick-start.html) and all of the [Spark Scala Programming Guide](scala-programming-guide.html) are written through a Scala lens, so Java and Python developers may wish to download and install Scala so they can work hands-on with the Scala examples. For its Scala API, Spark {{site.SPARK_VERSION}} depends on Scala {{site.SCALA_BINARY_VERSION}}. If you write applications in -Scala, you will need to use a compatible Scala version (*e.g.*, {{site.SCALA_BINARY_VERSION}}.X) -- newer major versions may no -t work. You can get the appropriate version of Scala from [scala-lang.org](http://www.scala-lang.org/download/). +Scala, you will need to use a compatible Scala version (*e.g.*, {{site.SCALA_BINARY_VERSION}}.x) -- newer major versions may not work. You can get the appropriate version of Scala from [scala-lang.org](http://www.scala-lang.org/download/). # Building @@ -25,9 +24,7 @@ Spark uses the Hadoop-client library to talk to HDFS and other Hadoop-supported storage systems. Because the HDFS protocol has changed in different versions of Hadoop, you must build Spark against the same version that your cluster uses. -Spark is bundled with the [Simple Build Tool](http://www.scala-sbt.org) (SBT). - -To compile the code so Spark links to Hadoop 1.0.4 (default), from the top-level Spark directory run: +Spark is bundled with the [Simple Build Tool](http://www.scala-sbt.org) (SBT). To compile the code with SBT so Spark links to Hadoop 1.0.4 (default), from the top-level Spark directory run: $ sbt/sbt assembly @@ -86,18 +83,20 @@ to connect to. This can be a [URL for a distributed cluster](scala-programming-g `local` to run locally with one thread, or `local[N]` to run locally with N threads. We recommend starting by using `local` for testing. -# Using the Spark Shells +# Using the Spark Shell You can run Spark interactively through modified versions of the Scala shell or -the Python interpreter. These are great ways to learn the Spark framework. +the Python interpreter. These are great ways to learn the Spark framework. + +The Spark Scala shell is discussed in greater detail in the [Spark Programming Quick Start Guide](quick-start.html) and the [Spark Scala Programming Guide](scala-programming-guide.html). The Spark Python interpreter is discussed in greater detail in the [Spark Python Programming Guide](python-programming-guide.html#interactive-use). -To run Spark's Scala shell: +To run Spark's Scala shell, from the top-level Spark directory: $ ./bin/spark-shell ... - > + scala> -To run Spark's Python interpreter: +To run Spark's Python interpreter, from the top-level Spark directory: $ ./bin/pyspark ... @@ -105,7 +104,7 @@ To run Spark's Python interpreter: # Launching on a Cluster -The Spark [cluster mode overview](cluster-overview.html) explains the key concepts in running on a cluster. +The Spark [cluster mode overview](cluster-overview.html) explains the key concepts of running on a cluster. Spark can run by itself or over several existing cluster managers. There are currently several options for deployment: @@ -118,14 +117,18 @@ options for deployment: **Programming Guides:** -* [Quick Start](quick-start.html): a quick introduction to the Spark API; start here! 
-* [Spark Programming Guide](scala-programming-guide.html): an overview of Spark concepts though the lens of the Scala API - * [Java Programming Guide](java-programming-guide.html): using Spark from Java - * [Python Programming Guide](python-programming-guide.html): using Spark from Python +We recommend that Scala, Java and Python developers work through the [Spark Programming Quick Start Guide](quick-start.html) and then work through the [Spark Scala Programming Guide](scala-programming-guide.html). + +Even though the [Spark Programming Quick Start Guide](quick-start.html) and the [Spark Scala Programming Guide](scala-programming-guide.html) are written through a Scala lens, Java and Python developers will find that these docs introduce key concepts that are very helpful to understand before diving into the [Spark Java Programming Guide](java-programming-guide.html) or the [Spark Python Programming Guide](python-programming-guide.html). + +* [Spark Programming Quick Start Guide](quick-start.html): a quick introduction to the Spark API; start here! +* [Spark Scala Programming Guide](scala-programming-guide.html): an overview of Spark concepts though a Scala lens; then go here! + * [Spark Java Programming Guide](java-programming-guide.html): using Spark from Java + * [Spark Python Programming Guide](python-programming-guide.html): using Spark from Python * [Spark Streaming](streaming-programming-guide.html): Spark's API for processing data streams * [Spark SQL](sql-programming-guide.html): Support for running relational queries on Spark * [MLlib (Machine Learning)](mllib-guide.html): Spark's built-in machine learning library -* [Bagel (Pregel on Spark)](bagel-programming-guide.html): simple graph processing model +* [Bagel (Pregel on Spark)](bagel-programming-guide.html): simple graph processing model; will soon be superseded by [GraphX](graphx-programming-guide.html) * [GraphX (Graphs on Spark)](graphx-programming-guide.html): Spark's new API for graphs **API Docs:** @@ -138,7 +141,7 @@ options for deployment: * [GraphX (Graphs on Spark) for Scala (Scaladoc)](api/graphx/index.html) -**Deployment guides:** +**Deployment Guides:** * [Cluster Overview](cluster-overview.html): overview of concepts and components when running on a cluster * [Amazon EC2](ec2-scripts.html): scripts that let you launch a cluster on EC2 in about 5 minutes @@ -147,17 +150,17 @@ options for deployment: [Apache Mesos](http://mesos.apache.org) * [YARN](running-on-yarn.html): deploy Spark on top of Hadoop NextGen (YARN) -**Other documents:** +**Other Documents:** * [Configuration](configuration.html): customize Spark via its configuration system -* [Tuning Guide](tuning.html): best practices to optimize performance and memory use +* [Tuning Guide](tuning.html): best practices for optimizing performance and memory use * [Security](security.html): Spark security support * [Hardware Provisioning](hardware-provisioning.html): recommendations for cluster hardware * [Job Scheduling](job-scheduling.html): scheduling resources across and within Spark applications * [Building Spark with Maven](building-with-maven.html): build Spark using the Maven system -* [Contributing to Spark](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark) +* [Contributing to Spark](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark): Spark Wiki discussing how to contribute code, contribute documentation, report issues, etc. 
-**External resources:** +**External Resources:** * [Spark Homepage](http://spark.apache.org) * [Shark](http://shark.cs.berkeley.edu): Apache Hive over Spark @@ -166,7 +169,7 @@ options for deployment: exercises about Spark, Shark, Mesos, and more. [Videos](http://ampcamp.berkeley.edu/agenda-2012), [slides](http://ampcamp.berkeley.edu/agenda-2012) and [exercises](http://ampcamp.berkeley.edu/exercises-2012) are available online for free. -* [Code Examples](http://spark.apache.org/examples.html): more are also available in the [examples subfolder](https://github.com/apache/spark/tree/master/examples/src/main/scala/) of Spark +* [Code Examples](http://spark.apache.org/examples.html): more are also available in the [examples subfolder](https://github.com/apache/spark/tree/master/examples/src/main/scala/) of the Apache Spark project * [Paper Describing Spark](http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf) * [Paper Describing Spark Streaming](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) diff --git a/docs/quick-start.md b/docs/quick-start.md index 60e8b1ba0eb4..b98e33746a04 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -1,26 +1,25 @@ --- layout: global -title: Quick Start +title: Spark Programming Quick Start Guide --- * This will become a table of contents (this text will be scraped). {:toc} -This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark's interactive Scala shell (don't worry if you don't know Scala -- you will not need much for this), then show how to write standalone applications in Scala, Java, and Python. -See the [programming guide](scala-programming-guide.html) for a more complete reference. +This tutorial provides a quick introduction to using Spark. We'll introduce the API through Spark's interactive Scala shell (don't worry if you don't know Scala -- you won't need much for this), then we'll show you how to write standalone applications in Scala, Java, and Python. +See the [Spark Scala Programming Guide](scala-programming-guide.html) for a more complete reference. -To follow along with this guide, you only need to have successfully built Spark on one machine. Simply go into your Spark directory and run: - -{% highlight bash %} -$ sbt/sbt assembly -{% endhighlight %} +To follow along with this guide, you only need to have successfully built Spark on one machine. If you haven't built Spark yet, see the [Building](index.html#building) section of Getting Started with Apache Spark. # Interactive Analysis with the Spark Shell -## Basics +Spark's interactive shell provides a simple way to learn the Spark API, and it's also a powerful tool for interactively analyzing datasets. -Spark's interactive shell provides a simple way to learn the API, as well as a powerful tool to analyze datasets interactively. -Start the shell by running `./bin/spark-shell` in the Spark directory. +To start the shell, from the top-level Spark directory: + + $ ./bin/spark-shell + ... + scala> Spark's primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs. Let's make a new RDD from the text of the README file in the Spark source directory: @@ -39,14 +38,14 @@ scala> textFile.first() // First item in this RDD res1: String = # Apache Spark {% endhighlight %} -Now let's use a transformation. 
We will use the [`filter`](scala-programming-guide.html#transformations) transformation to return a new RDD with a subset of the items in the file. +Now let's use a transformation. We'll use the [`filter`](scala-programming-guide.html#transformations) transformation to return a new RDD with a subset of the items in the file: {% highlight scala %} scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09 {% endhighlight %} -We can chain together transformations and actions: +Now we'll chain together transformations and actions: {% highlight scala %} scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"? @@ -54,14 +53,14 @@ res3: Long = 15 {% endhighlight %} ## More on RDD Operations -RDD actions and transformations can be used for more complex computations. Let's say we want to find the line with the most words: +RDD actions and transformations can be used for more complex computations. Let's find the line with the most words: {% highlight scala %} scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res4: Long = 16 {% endhighlight %} -This first maps a line to an integer value, creating a new RDD. `reduce` is called on that RDD to find the largest line count. The arguments to `map` and `reduce` are Scala function literals (closures), and can use any language feature or Scala/Java library. For example, we can easily call functions declared elsewhere. We'll use `Math.max()` function to make this code easier to understand: +This maps a line to an integer value, creating a new RDD, then `reduce` is called on that RDD to find the largest line count. The arguments to `map` and `reduce` are Scala function literals (closures), and can use any language feature or Scala/Java library. For example, we can easily call functions declared elsewhere. We'll use the `Math.max()` function to make this code easier to understand: {% highlight scala %} scala> import java.lang.Math @@ -71,14 +70,14 @@ scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b res5: Int = 16 {% endhighlight %} -One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily: +One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can easily implement MapReduce flows: {% highlight scala %} scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) wordCounts: spark.RDD[(java.lang.String, Int)] = spark.ShuffledAggregatedRDD@71f027b8 {% endhighlight %} -Here, we combined the [`flatMap`](scala-programming-guide.html#transformations), [`map`](scala-programming-guide.html#transformations) and [`reduceByKey`](scala-programming-guide.html#transformations) transformations to compute the per-word counts in the file as an RDD of (String, Int) pairs. To collect the word counts in our shell, we can use the [`collect`](scala-programming-guide.html#actions) action: +We combined the [`flatMap`](scala-programming-guide.html#transformations), [`map`](scala-programming-guide.html#transformations) and [`reduceByKey`](scala-programming-guide.html#transformations) transformations to compute the per-word counts in the file as an RDD of (String, Int) pairs. 
To collect the word counts in our shell, we'll use the [`collect`](scala-programming-guide.html#actions) action: {% highlight scala %} scala> wordCounts.collect() @@ -86,7 +85,9 @@ res6: Array[(java.lang.String, Int)] = Array((need,2), ("",43), (Extra,3), (usin {% endhighlight %} ## Caching -Spark also supports pulling data sets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small "hot" dataset or when running an iterative algorithm like PageRank. As a simple example, let's mark our `linesWithSpark` dataset to be cached: +Spark also supports pulling data sets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small "hot" dataset or when running an iterative algorithm like PageRank. + +As a simple example, let's cache our `linesWithSpark` dataset: {% highlight scala %} scala> linesWithSpark.cache() @@ -94,17 +95,14 @@ res7: spark.RDD[String] = spark.FilteredRDD@17e51082 scala> linesWithSpark.count() res8: Long = 15 - -scala> linesWithSpark.count() -res9: Long = 15 {% endhighlight %} -It may seem silly to use Spark to explore and cache a 30-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting `bin/spark-shell` to a cluster, as described in the [programming guide](scala-programming-guide.html#initializing-spark). +It may seem silly to use Spark to explore and cache a 30-line text file. What's interesting is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. You can also do this interactively by connecting `bin/spark-shell` to a cluster, as described in the [Spark Scala Programming Guide](scala-programming-guide.html#initializing-spark). # A Standalone App in Scala -Now say we wanted to write a standalone application using the Spark API. We will walk through a simple application in both Scala (with SBT), Java (with Maven), and Python. If you are using other build systems, consider using the Spark assembly JAR described in the developer guide. +Now we'll write a standalone application using the Spark Scala API. We'll walk through a simple application in Scala (with SBT), Java (with Maven), and Python. If you're using other build systems, consider using the Spark assembly JAR described in the [Linking with Spark](scala-programming-guide.html#linking-with-spark) section of the Spark Scala Programming Guide. -We'll create a very simple Spark application in Scala. So simple, in fact, that it's named `SimpleApp.scala`: +We'll create a very simple Spark application in Scala, so simple it's named `SimpleApp.scala`: {% highlight scala %} /*** SimpleApp.scala ***/ @@ -124,29 +122,29 @@ object SimpleApp { } {% endhighlight %} -This program just counts the number of lines containing 'a' and the number containing 'b' in the Spark README. Note that you'll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. Unlike the earlier examples with the Spark shell, which initializes its own SparkContext, we initialize a SparkContext as part of the program. We pass the SparkContext constructor four arguments, the type of scheduler we want to use (in this case, a local scheduler), a name for the application, the directory where Spark is installed, and a name for the jar file containing the application's code. 
The final two arguments are needed in a distributed setting, where Spark is running across several nodes, so we include them for completeness. Spark will automatically ship the jar files you list to slave nodes. +This program counts the number of lines containing 'a' and the number of lines containing 'b' in the Spark README. Note that you'll need to replace $YOUR_SPARK_HOME with an environment variable pointing to your Spark installation directory. Unlike the earlier Spark shell examples, which initialize their own SparkContext, we initialize a SparkContext as part of the program. + +We pass four arguments to the SparkContext constructor: the type of scheduler we want to use (in this case, a local scheduler), a name for the application, the directory where Spark is installed, and a name for the JAR file containing the application's code. -This file depends on the Spark API, so we'll also include an sbt configuration file, `simple.sbt` which explains that Spark is a dependency. This file also adds a repository that Spark depends on: +The final two arguments are needed in a distributed setting, where Spark is running across several nodes, so we're including them for completeness. Spark will automatically ship the listed JAR files to your slave nodes. + +This file depends on the Spark API, so we'll also include the SBT configuration file `simple.sbt` that specifies that Spark is a dependency. This file also adds a repository that Spark depends on: {% highlight scala %} name := "Simple Project" - version := "1.0" - scalaVersion := "{{site.SCALA_VERSION}}" - libraryDependencies += "org.apache.spark" %% "spark-core" % "{{site.SPARK_VERSION}}" - resolvers += "Akka Repository" at "http://repo.akka.io/releases/" {% endhighlight %} -If you also wish to read data from Hadoop's HDFS, you will also need to add a dependency on `hadoop-client` for your version of HDFS: +If you also want to read data from HDFS, you'll need to add a dependency on `hadoop-client` for your version of HDFS: {% highlight scala %} libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "" {% endhighlight %} -Finally, for sbt to work correctly, we'll need to layout `SimpleApp.scala` and `simple.sbt` according to the typical directory structure. Once that is in place, we can create a JAR package containing the application's code, then use `sbt/sbt run` to execute our program. +Finally, for SBT to work correctly, we need to layout `SimpleApp.scala` and `simple.sbt` according to the typical directory structure. Once that is in place, we can create a JAR package containing the application's code, then use `sbt/sbt run` to execute our program: {% highlight bash %} $ find . @@ -164,7 +162,7 @@ Lines with a: 46, Lines with b: 23 {% endhighlight %} # A Standalone App in Java -Now say we wanted to write a standalone application using the Java API. We will walk through doing this with Maven. If you are using other build systems, consider using the Spark assembly JAR described in the developer guide. +Now we'll write a standalone application using the Spark Java API. We'll do this using Maven. If you're using other build systems, consider using the Spark assembly JAR described in the [Linking with Spark](scala-programming-guide.html#linking-with-spark) section of the Spark Scala Programming Guide. We'll create a very simple Spark application, `SimpleApp.java`: @@ -193,9 +191,11 @@ public class SimpleApp { } {% endhighlight %} -This program just counts the number of lines containing 'a' and the number containing 'b' in a text file. 
Note that you'll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. As with the Scala example, we initialize a SparkContext, though we use the special `JavaSparkContext` class to get a Java-friendly one. We also create RDDs (represented by `JavaRDD`) and run transformations on them. Finally, we pass functions to Spark by creating classes that extend `spark.api.java.function.Function`. The [Java programming guide](java-programming-guide.html) describes these differences in more detail. +This program counts the number of lines containing 'a' and the number of lines containing 'b' in a text file. Note that you'll need to replace $YOUR_SPARK_HOME with an environment variable that points to your Spark installation directory. As with the Scala example, we initialize a SparkContext, though we use the special `JavaSparkContext` class to get a Java-friendly one. -To build the program, we also write a Maven `pom.xml` file that lists Spark as a dependency. Note that Spark artifacts are tagged with a Scala version. +We also create RDDs (represented by `JavaRDD`) and run transformations on them. Finally, we pass functions to Spark by creating classes that extend `spark.api.java.function.Function`. The [Spark Java Programming Guide](java-programming-guide.html) describes these differences in more detail. + +To build the program, we also write a Maven `pom.xml` file that lists Spark as a dependency. Note that Spark artifacts are tagged with a Scala version: {% highlight xml %} @@ -221,7 +221,7 @@ To build the program, we also write a Maven `pom.xml` file that lists Spark as a {% endhighlight %} -If you also wish to read data from Hadoop's HDFS, you will also need to add a dependency on `hadoop-client` for your version of HDFS: +If you want to read data from HDFS, you'll also need to add a dependency on `hadoop-client` for your version of HDFS: {% highlight xml %} @@ -241,7 +241,7 @@ $ find . ./src/main/java/SimpleApp.java {% endhighlight %} -Now, we can execute the application using Maven: +Now we'll execute the application using Maven: {% highlight bash %} $ mvn package @@ -251,9 +251,7 @@ Lines with a: 46, Lines with b: 23 {% endhighlight %} # A Standalone App in Python -Now we will show how to write a standalone application using the Python API (PySpark). - -As an example, we'll create a simple Spark application, `SimpleApp.py`: +Now we'll write a standalone application using the Spark Python API (PySpark). We'll create a simple Spark application, `SimpleApp.py`: {% highlight python %} """SimpleApp.py""" @@ -269,18 +267,17 @@ numBs = logData.filter(lambda s: 'b' in s).count() print "Lines with a: %i, lines with b: %i" % (numAs, numBs) {% endhighlight %} +This program counts the number of lines containing 'a' and the number of lines containing 'b' in a text file. +Note that you'll need to replace $YOUR_SPARK_HOME with an environment variable pointing to your Spark installation directory. -This program just counts the number of lines containing 'a' and the number containing 'b' in a text file. -Note that you'll need to replace $YOUR_SPARK_HOME with the location where Spark is installed. As with the Scala and Java examples, we use a SparkContext to create RDDs. We can pass Python functions to Spark, which are automatically serialized along with any variables that they reference. 
-For applications that use custom classes or third-party libraries, we can add those code dependencies to SparkContext to ensure that they will be available on remote machines; this is described in more detail in the [Python programming guide](python-programming-guide.html). -`SimpleApp` is simple enough that we do not need to specify any code dependencies. -We can run this application using the `bin/pyspark` script: +`SimpleApp` is simple enough that we do not need to specify any code dependencies. However, we can add code dependencies to SparkContext for applications that do use custom classes or third-party libraries to ensure that the dependencies will be available on remote machines. This is described in more detail in the [Standalone Programs](python-programming-guide.html#standalone-programs) section of the Spark Python Programming Guide. + +We'll run this application using the `bin/pyspark` script from the top-level Spark directory: {% highlight python %} -$ cd $SPARK_HOME $ ./bin/pyspark SimpleApp.py ... Lines with a: 46, Lines with b: 23 @@ -288,31 +285,30 @@ Lines with a: 46, Lines with b: 23 # Running on a Cluster -There are a few additional considerations when running applicaitons on a +There are a few additional considerations when running applications on a [Spark](spark-standalone.html), [YARN](running-on-yarn.html), or [Mesos](running-on-mesos.html) cluster. ### Including Your Dependencies -If your code depends on other projects, you will need to ensure they are also +If your code depends on other projects, you'll need to ensure they are also present on the slave nodes. A popular approach is to create an -assembly jar (or "uber" jar) containing your code and its dependencies. Both -[sbt](https://github.com/sbt/sbt-assembly) and +assembly JAR (or "uber" JAR) containing your code and its dependencies. [SBT](https://github.com/sbt/sbt-assembly) and [Maven](http://maven.apache.org/plugins/maven-assembly-plugin/) -have assembly plugins. When creating assembly jars, list Spark -itself as a `provided` dependency; it need not be bundled since it is -already present on the slaves. Once you have an assembled jar, -add it to the SparkContext as shown here. It is also possible to add -your dependent jars one-by-one using the `addJar` method of `SparkContext`. +both have assembly plugins. + +When creating assembly JARs, list Spark +itself as a `provided` dependency. It doesn't need to be bundled because it's +already present on the slaves. -For Python, you can use the `pyFiles` argument of SparkContext -or its `addPyFile` method to add `.py`, `.zip` or `.egg` files to be distributed. +For Python, you can add the assembled JAR to the SparkContext using the `pyFiles` argument as shown in the [Standalone Programs](python-programming-guide.html#standalone-programs) section of the Spark Python Programming Guide, or you can add dependent JARs one-by-one using SparkContext's `addJar` method. Likewise you can use SparkContext's `addPyFile` method to add `.py`, `.zip` or `.egg` files for distribution. ### Setting Configuration Options Spark includes several [configuration options](configuration.html#spark-properties) that influence the behavior of your application. -These should be set by building a [SparkConf](api/core/index.html#org.apache.spark.SparkConf) +These are set by building a [SparkConf](api/core/index.html#org.apache.spark.SparkConf) object and passing it to the SparkContext constructor. 
-For example, in Java and Scala, you can do: + +For example, in Java and Scala: {% highlight scala %} import org.apache.spark.{SparkConf, SparkContext} @@ -323,7 +319,7 @@ val conf = new SparkConf() val sc = new SparkContext(conf) {% endhighlight %} -Or in Python: +In Python: {% highlight scala %} from pyspark import SparkConf, SparkContext @@ -336,7 +332,11 @@ sc = SparkContext(conf = conf) ### Accessing Hadoop Filesystems -The examples here access a local file. To read data from a distributed +The examples in this Quick Start access a local file. To read data from a distributed filesystem, such as HDFS, include -[Hadoop version information](index.html#a-note-about-hadoop-versions) -in your build file. By default, Spark builds against HDFS 1.0.4. +Hadoop version information in your build file as explained in the [Building](index.html#building) section of Getting Started with Apache Spark. +By default, Spark builds against HDFS 1.0.4. + +# Where to Go from Here + +The next step for Scala, Java and Python developers is to go to the [Spark Scala Programming Guide](scala-programming-guide.html) to learn more about Spark's key concepts. From 2e0bb4a5b7e0c36398098ba73b96fc93785b25b2 Mon Sep 17 00:00:00 2001 From: rubarb Date: Thu, 24 Apr 2014 02:40:18 -0700 Subject: [PATCH 5/6] changed links to API docs to match Matei's latest changes --- docs/index.md | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/docs/index.md b/docs/index.md index 424209010348..ecc37d2b9bfc 100644 --- a/docs/index.md +++ b/docs/index.md @@ -133,13 +133,9 @@ Even though the [Spark Programming Quick Start Guide](quick-start.html) and the **API Docs:** -* [Spark for Java/Scala (Scaladoc)](api/core/index.html) -* [Spark for Python (Epydoc)](api/pyspark/index.html) -* [Spark Streaming for Java/Scala (Scaladoc)](api/streaming/index.html) -* [MLlib (Machine Learning) for Java/Scala (Scaladoc)](api/mllib/index.html) -* [Bagel (Pregel on Spark) for Scala (Scaladoc)](api/bagel/index.html) -* [GraphX (Graphs on Spark) for Scala (Scaladoc)](api/graphx/index.html) - +[Spark Scala API (Scaladoc)](api/scala/index.html#org.apache.spark.package) +[Spark Java API (Javadoc)](api/java/index.html) +[Spark Python API (Epydoc)](api/python/index.html) **Deployment Guides:** From 1a9ebeb52f2e0adfb6822455fb71c25e4b77a81b Mon Sep 17 00:00:00 2001 From: rubarb Date: Fri, 25 Apr 2014 00:25:18 -0700 Subject: [PATCH 6/6] edits --- docs/java-programming-guide.md | 149 +++++++-------- docs/python-programming-guide.md | 76 ++++---- docs/scala-programming-guide.md | 305 +++++++++++++++++-------------- 3 files changed, 283 insertions(+), 247 deletions(-) diff --git a/docs/java-programming-guide.md b/docs/java-programming-guide.md index 6632360f6e3c..72b9c5a6685f 100644 --- a/docs/java-programming-guide.md +++ b/docs/java-programming-guide.md @@ -1,68 +1,64 @@ --- layout: global -title: Java Programming Guide +title: Spark Java Programming Guide --- -The Spark Java API exposes all the Spark features available in the Scala version to Java. -To learn the basics of Spark, we recommend reading through the -[Scala programming guide](scala-programming-guide.html) first; it should be -easy to follow even if you don't know Scala. -This guide will show how to use the Spark features described there in Java. +The Spark Java API implements all of the Spark features available in the Spark Scala API. 
+ +This Spark Java Programming Guide shows you how to use Java to implement the Spark features described in the [Spark Scala Programming Guide](scala-programming-guide.html). + +To learn the basics of Spark, we recommend that Java developers first read through the [Spark Scala Programming Guide](scala-programming-guide.html) before reading this manual. The [Spark Scala Programming Guide](scala-programming-guide.html) is easy to follow even if you don't know Scala. The Spark Java API is defined in the -[`org.apache.spark.api.java`](api/core/index.html#org.apache.spark.api.java.package) package, and includes -a [`JavaSparkContext`](api/core/index.html#org.apache.spark.api.java.JavaSparkContext) for -initializing Spark and [`JavaRDD`](api/core/index.html#org.apache.spark.api.java.JavaRDD) classes, -which support the same methods as their Scala counterparts but take Java functions and return -Java data and collection types. The main differences have to do with passing functions to RDD -operations (e.g. map) and handling RDDs of different types, as discussed next. +[`org.apache.spark.api.java`](api/java/index.html?org/apache/spark/api/java/package-summary.html) package, and includes a [`JavaSparkContext`](api/java/index.html?org/apache/spark/api/java/JavaSparkContext.html) for +initializing Spark and [`JavaRDD`](api/java/index.html?org/apache/spark/api/java/JavaRDD.html) classes. + +These classes support the same methods as their Scala counterparts but take Java functions and return Java data and collection types. The main differences have to do with passing functions to RDD operations (*e.g.*, `map`) and handling RDDs of different types, as discussed next. # Key Differences in the Java API There are a few key differences between the Java and Scala APIs: -* Java does not support anonymous or first-class functions, so functions are passed - using anonymous classes that implement the - [`org.apache.spark.api.java.function.Function`](api/core/index.html#org.apache.spark.api.java.function.Function), - [`Function2`](api/core/index.html#org.apache.spark.api.java.function.Function2), etc. - interfaces. -* To maintain type safety, the Java API defines specialized Function and RDD - classes for key-value pairs and doubles. For example, - [`JavaPairRDD`](api/core/index.html#org.apache.spark.api.java.JavaPairRDD) +* Java does not support anonymous or first-class functions, so functions are passed using anonymous classes that implement the + [`org.apache.spark.api.java.function.Function`](api/java/index.html?org/apache/spark/api/java/function/Function.html), + [`Function2`](api/java/index.html?org/apache/spark/api/java/function/Function2.html), etc. interfaces. + +* To maintain type safety, the Java API defines specialized Function and RDD classes for key-value pairs and doubles. For example, + [`JavaPairRDD`](api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html) stores key-value pairs. -* Some methods are defined on the basis of the passed anonymous function's - (a.k.a lambda expression) return type, - for example mapToPair(...) or flatMapToPair returns - [`JavaPairRDD`](api/core/index.html#org.apache.spark.api.java.JavaPairRDD), - similarly mapToDouble and flatMapToDouble returns - [`JavaDoubleRDD`](api/core/index.html#org.apache.spark.api.java.JavaDoubleRDD). -* RDD methods like `collect()` and `countByKey()` return Java collections types, - such as `java.util.List` and `java.util.Map`. + +* Some methods are defined on the basis of the passed function's return type. 
+For example `mapToPair()` returns +[`JavaPairRDD`](api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html), +and `mapToDouble()` returns +[`JavaDoubleRDD`](api/java/index.html?org/apache/spark/api/java/JavaDoubleRDD.html). + +* RDD methods like `collect()` and `countByKey()` return Java collection types, such as `java.util.List` and `java.util.Map`. + * Key-value pairs, which are simply written as `(key, value)` in Scala, are represented - by the `scala.Tuple2` class, and need to be created using `new Tuple2(key, value)`. +by the `scala.Tuple2` class and need to be created using `new Tuple2(key, value)`. ## RDD Classes -Spark defines additional operations on RDDs of key-value pairs and doubles, such -as `reduceByKey`, `join`, and `stdev`. +Spark defines additional operations on RDDs of key-value pairs and doubles, such as `reduceByKey`, `join`, and `stdev`. In the Scala API, these methods are automatically added using Scala's [implicit conversions](http://www.scala-lang.org/node/130) mechanism. In the Java API, the extra methods are defined in the -[`JavaPairRDD`](api/core/index.html#org.apache.spark.api.java.JavaPairRDD) -and [`JavaDoubleRDD`](api/core/index.html#org.apache.spark.api.java.JavaDoubleRDD) +[`JavaPairRDD`](api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html) +and [`JavaDoubleRDD`](api/java/index.html?org/apache/spark/api/java/JavaDoubleRDD.html) classes. RDD methods like `map` are overloaded by specialized `PairFunction` -and `DoubleFunction` classes, allowing them to return RDDs of the appropriate -types. Common methods like `filter` and `sample` are implemented by -each specialized RDD class, so filtering a `PairRDD` returns a new `PairRDD`, -etc (this acheives the "same-result-type" principle used by the [Scala collections -framework](http://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html)). +and `DoubleFunction` classes, allowing them to return RDDs of the appropriate types. + +Each common method like `filter` and `sample` is implemented by its own specialized RDD class, so filtering a `PairRDD` returns a new `PairRDD`, etc. This achieves the "same-result-type" principle used by the [Scala collections +framework](http://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html). ## Function Interfaces -The following table lists the function interfaces used by the Java API. Each -interface has a single abstract method, `call()`, that must be implemented. +The following table lists the function interfaces used by the Java API, located in the +[`org.apache.spark.api.java.function`](api/java/index.html?org/apache/spark/api/java/function/package-summary.html) +package. Each interface has a single abstract method, `call()`. @@ -80,42 +76,38 @@ interface has a single abstract method, `call()`, that must be implemented. ## Storage Levels -RDD [storage level](scala-programming-guide.html#rdd-persistence) constants, such as `MEMORY_AND_DISK`, are -declared in the [org.apache.spark.api.java.StorageLevels](api/core/index.html#org.apache.spark.api.java.StorageLevels) class. To -define your own storage level, you can use StorageLevels.create(...). +RDD [storage level](scala-programming-guide.html#rdd-persistence) constants like `MEMORY_AND_DISK` are +declared in the [org.apache.spark.api.java.StorageLevels](api/java/index.html?org/apache/spark/api/java/StorageLevels.html) class. To define your own storage level, use StorageLevels.create(...). 
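+
+As a quick, minimal sketch of how these constants are used (the application name and input path below are placeholders), an RDD can be kept in memory and spilled to disk when it does not fit:
+
+{% highlight java %}
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.StorageLevels;
+
+JavaSparkContext sc = new JavaSparkContext("local", "Storage Level Example");
+
+// Persist with MEMORY_AND_DISK: cached partitions that do not fit in memory
+// are written to disk instead of being recomputed.
+JavaRDD<String> lines = sc.textFile("hdfs://...");
+lines.persist(StorageLevels.MEMORY_AND_DISK);
+
+lines.count(); // The first action computes the RDD and populates the cache.
+{% endhighlight %}
+
+Calling `cache()` instead persists with the default `MEMORY_ONLY` level; a custom level can be built with `StorageLevels.create(...)` as noted above.
+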
# Other Features The Java API supports other Spark features, including [accumulators](scala-programming-guide.html#accumulators), [broadcast variables](scala-programming-guide.html#broadcast-variables), and -[caching](scala-programming-guide.html#rdd-persistence). +[caching](scala-programming-guide.html#rdd-persistence) (persistence). # Upgrading From Pre-1.0 Versions of Spark -In version 1.0 of Spark the Java API was refactored to better support Java 8 -lambda expressions. Users upgrading from older versions of Spark should note -the following changes: +In Spark version 1.0 the Java API was refactored to better support Java 8 lambda expressions. Users upgrading from older Spark versions should note the following changes: + +* All `org.apache.spark.api.java.function.*` classes have been changed from abstract classes to interfaces. This means that concrete implementations of these `Function` classes must use `implements` instead of `extends`. -* All `org.apache.spark.api.java.function.*` have been changed from abstract - classes to interfaces. This means that concrete implementations of these - `Function` classes will need to use `implements` rather than `extends`. -* Certain transformation functions now have multiple versions depending - on the return type. In Spark core, the map functions (map, flatMap, - mapPartitons) have type-specific versions, e.g. - [`mapToPair`](api/core/index.html#org.apache.spark.api.java.JavaRDD@mapToPair[K2,V2](f:org.apache.spark.api.java.function.PairFunction[T,K2,V2]):org.apache.spark.api.java.JavaPairRDD[K2,V2]) - and [`mapToDouble`](api/core/index.html#org.apache.spark.api.java.JavaRDD@mapToDouble[R](f:org.apache.spark.api.java.function.DoubleFunction[T]):org.apache.spark.api.java.JavaDoubleRDD). - Spark Streaming also uses the same approach, e.g. [`transformToPair`](api/streaming/index.html#org.apache.spark.streaming.api.java.JavaDStream@transformToPair[K2,V2](transformFunc:org.apache.spark.api.java.function.Function[R,org.apache.spark.api.java.JavaPairRDD[K2,V2]]):org.apache.spark.streaming.api.java.JavaPairDStream[K2,V2]). +* Certain transformation functions now have multiple versions depending on their return types. In Spark core, the map functions (map, flatMap, mapPartitons) have type-specific versions, *e.g.*, +on the return type. In Spark core, the map functions (`map`, `flatMap`, and +`mapPartitons`) have type-specific versions, e.g. +[`mapToPair`](api/java/org/apache/spark/api/java/JavaRDDLike.html#mapToPair(org.apache.spark.api.java.function.PairFunction)) +and [`mapToDouble`](api/java/org/apache/spark/api/java/JavaRDDLike.html#mapToDouble(org.apache.spark.api.java.function.DoubleFunction)). +Spark Streaming also uses the same approach, e.g., [`transformToPair`](api/java/org/apache/spark/streaming/api/java/JavaDStreamLike.html#transformToPair(org.apache.spark.api.java.function.Function)). # Example -As an example, we will implement word count using the Java API. +As an example, we'll implement word count using the Java API. {% highlight java %} import org.apache.spark.api.java.*; import org.apache.spark.api.java.function.*; -JavaSparkContext sc = new JavaSparkContext(...); +JavaSparkContext ctx = new JavaSparkContext(...); JavaRDD lines = ctx.textFile("hdfs://..."); JavaRDD words = lines.flatMap( new FlatMapFunction() { @@ -128,15 +120,13 @@ JavaRDD words = lines.flatMap( The word count program starts by creating a `JavaSparkContext`, which accepts the same parameters as its Scala counterpart. 
`JavaSparkContext` supports the -same data loading methods as the regular `SparkContext`; here, `textFile` -loads lines from text files stored in HDFS. +same data loading methods as Scala's `SparkContext`. Here, `textFile` loads lines +from text files stored in HDFS. -To split the lines into words, we use `flatMap` to split each line on -whitespace. `flatMap` is passed a `FlatMapFunction` that accepts a string and -returns an `java.lang.Iterable` of strings. +To split the lines into words, we use `flatMap` to split each line on whitespace. `flatMap` is passed a `FlatMapFunction` that accepts a string and +returns a `java.lang.Iterable` of strings. -Here, the `FlatMapFunction` was created inline; another option is to subclass -`FlatMapFunction` and pass an instance to `flatMap`: +Here, the `FlatMapFunction` was created inline. Another option is to subclass `FlatMapFunction` and pass an instance to `flatMap`: {% highlight java %} class Split extends FlatMapFunction { @@ -147,7 +137,7 @@ class Split extends FlatMapFunction { JavaRDD words = lines.flatMap(new Split()); {% endhighlight %} -Java 8+ users can also write the above `FlatMapFunction` in a more concise way using +Java 8+ users can also write the `FlatMapFunction` in a more concise way using a lambda expression: {% highlight java %} @@ -172,7 +162,7 @@ JavaPairRDD ones = words.mapToPair( Note that `mapToPair` was passed a `PairFunction` and returned a `JavaPairRDD`. -To finish the word count program, we will use `reduceByKey` to count the +To finish the word count program, we'll use `reduceByKey` to count the occurrences of each word: {% highlight java %} @@ -202,27 +192,26 @@ JavaPairRDD counts = lines.flatMapToPair( ); {% endhighlight %} -There is no performance difference between these approaches; the choice is +There is no performance difference between these approaches, the choice is just a matter of style. -# Javadoc - -We currently provide documentation for the Java API as Scaladoc, in the -[`org.apache.spark.api.java` package](api/core/index.html#org.apache.spark.api.java.package), because -some of the classes are implemented in Scala. It is important to note that the types and function -definitions show Scala syntax (for example, `def reduce(func: Function2[T, T]): T` instead of -`T reduce(Function2 func)`). In addition, the Scala `trait` modifier is used for Java -interface classes. We hope to generate documentation with Java-style syntax in the future to -avoid these quirks. +# API Docs +[API documentation](api/java/index.html) for Spark in Java is available in Javadoc format. # Where to Go from Here Spark includes several sample programs using the Java API in -[`examples/src/main/java`](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples). You can run them by passing the class name to the +[`examples/src/main/java`](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples), including JavaWordCount.java. You can run them by passing the class name to the `bin/run-example` script included in Spark; for example: - ./bin/run-example org.apache.spark.examples.JavaWordCount + $ ./bin/run-example org.apache.spark.examples.JavaWordCount local README.md + +README.md is a file located in the top-level Spark directory. -Each example program prints usage help when run +Note: Each example program prints usage help when run without any arguments. 
+ +For help on optimizing your programs, the Spark [Configuration](configuration.html) and +[Tuning](tuning.html) Guides provide information on best practices. They are especially important for +making sure that your data is stored in memory in an efficient format. diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md index 39de603b29f8..725eb8ee86b9 100644 --- a/docs/python-programming-guide.md +++ b/docs/python-programming-guide.md @@ -1,14 +1,15 @@ --- layout: global -title: Python Programming Guide +title: Spark Python Programming Guide --- The Spark Python API (PySpark) exposes the Spark programming model to Python. -To learn the basics of Spark, we recommend reading through the -[Scala programming guide](scala-programming-guide.html) first; it should be -easy to follow even if you don't know Scala. -This guide will show how to use the Spark features described there in Python. + +To learn the basics of Spark, we recommend that Python developers first read through the +[Spark Scala Programming Guide](scala-programming-guide.html). It's easy to follow even if you don't know Scala. + +This Spark Python Programming Guide shows you how to use Python to implement the Spark features described the [Spark Scala Programming Guide](scala-programming-guide.html). # Key Differences in the Python API @@ -16,9 +17,11 @@ This guide will show how to use the Spark features described there in Python. There are a few key differences between the Python and Scala APIs: * Python is dynamically typed, so RDDs can hold objects of multiple types. -* PySpark does not yet support a few API calls, such as `lookup` and non-text input files, though these will be added in future releases. -In PySpark, RDDs support the same methods as their Scala counterparts but take Python functions and return Python collection types. +* PySpark does not yet support a few API calls, such as `lookup` and non-text input files, although we plan to add these in future releases. + +In PySpark, RDDs support the same methods as their Scala counterparts, but take Python functions and return Python collection types. + Short functions can be passed to RDD methods using Python's [`lambda`](http://www.diveintopython.net/power_of_introspection/lambda_functions.html) syntax: {% highlight python %} @@ -26,7 +29,7 @@ logData = sc.textFile(logFile).cache() errors = logData.filter(lambda line: "ERROR" in line) {% endhighlight %} -You can also pass functions that are defined with the `def` keyword; this is useful for longer functions that can't be expressed using `lambda`: +You can also pass functions that are defined with the `def` keyword. This is useful for longer functions that can't be expressed using `lambda`: {% highlight python %} def is_error(line): @@ -45,32 +48,29 @@ errors = logData.filter(is_error) PySpark will automatically ship these functions to executors, along with any objects that they reference. Instances of classes will be serialized and shipped to executors by PySpark, but classes themselves cannot be automatically distributed to executors. -The [Standalone Use](#standalone-use) section describes how to ship code dependencies to executors. - -In addition, PySpark fully supports interactive use---simply run `./bin/pyspark` to launch an interactive shell. +The [Standalone Programs](#standalone-programs) section describes how to ship code dependencies to executors. +In addition, PySpark fully supports [interactive use](#interactive-use). 
# Installing and Configuring PySpark PySpark requires Python 2.6 or higher. -PySpark applications are executed using a standard CPython interpreter in order to support Python modules that use C extensions. -We have not tested PySpark with Python 3 or with alternative Python interpreters, such as [PyPy](http://pypy.org/) or [Jython](http://www.jython.org/). - -By default, PySpark requires `python` to be available on the system `PATH` and use it to run programs; an alternate Python executable may be specified by setting the `PYSPARK_PYTHON` environment variable in `conf/spark-env.sh` (or `.cmd` on Windows). +PySpark applications are executed using a standard CPython interpreter in order to support Python modules that use C extensions. We have not tested PySpark with Python 3 or with alternative Python interpreters, such as [PyPy](http://pypy.org/) or [Jython](http://www.jython.org/). -All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.net/), are bundled with PySpark and automatically imported. +By default, PySpark requires `python` to be available on the system `PATH` and use it to run programs. An alternate Python executable may be specified by setting the `PYSPARK_PYTHON` environment variable in `conf/spark-env.sh` (or `.cmd` on Windows). -Standalone PySpark applications should be run using the `bin/pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`. -The script automatically adds the `bin/pyspark` package to the `PYTHONPATH`. +All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.net/), are bundled with PySpark and are automatically imported. +Standalone PySpark applications should be run using the `bin/pyspark` script, which automatically configures the Python environment using the settings in `conf/spark-env.sh` or `.cmd`. This script automatically adds the `bin/pyspark` package to the `PYTHONPATH`. # Interactive Use -The `bin/pyspark` script launches a Python interpreter that is configured to run PySpark applications. To use `pyspark` interactively, first build Spark, then launch it directly from the command line without any options: +The `bin/pyspark` script launches a Python interpreter that is configured to run PySpark applications. To use `pyspark` interactively, first [build](index.html#building) Spark, then launch the Python interpreter from the top-level Spark directory without any options: {% highlight bash %} -$ sbt/sbt assembly $ ./bin/pyspark +... +>>> {% endhighlight %} The Python shell can be used explore data interactively and is a simple way to learn the API: @@ -82,9 +82,9 @@ The Python shell can be used explore data interactively and is a simple way to l >>> help(pyspark) # Show all pyspark functions {% endhighlight %} -By default, the `bin/pyspark` shell creates SparkContext that runs applications locally on all of -your machine's logical cores. +By default, the `bin/pyspark` shell creates a SparkContext that runs applications locally on all of your machine's logical cores. To connect to a non-local cluster, or to specify a number of cores, set the `MASTER` environment variable. + For example, to use the `bin/pyspark` shell with a [standalone Spark cluster](spark-standalone.html): {% highlight bash %} @@ -100,9 +100,10 @@ $ MASTER=local[4] ./bin/pyspark ## IPython -It is also possible to launch PySpark in [IPython](http://ipython.org), the -enhanced Python interpreter. PySpark works with IPython 1.0.0 and later. 
To -use IPython, set the `IPYTHON` variable to `1` when running `bin/pyspark`: +It's also possible to launch PySpark in [IPython](http://ipython.org), the +enhanced Python interpreter. PySpark works with IPython 1.0.0 and later. + +To use IPython, set the `IPYTHON` variable to `1` when running `bin/pyspark`: {% highlight bash %} $ IPYTHON=1 ./bin/pyspark @@ -121,33 +122,33 @@ IPython also works on a cluster or on multiple cores if you set the `MASTER` env # Standalone Programs PySpark can also be used from standalone Python scripts by creating a SparkContext in your script and running the script using `bin/pyspark`. -The Quick Start guide includes a [complete example](quick-start.html#a-standalone-app-in-python) of a standalone Python application. +The Spark Programming Quick Start Guide includes a [complete example](quick-start.html#a-standalone-app-in-python) of a standalone Python application. Code dependencies can be deployed by listing them in the `pyFiles` option in the SparkContext constructor: {% highlight python %} from pyspark import SparkContext -sc = SparkContext("local", "App Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg']) +sc = SparkContext("local", "My App", pyFiles=['MyFile.py', 'lib.zip', 'app.egg']) {% endhighlight %} Files listed here will be added to the `PYTHONPATH` and shipped to remote worker machines. Code dependencies can be added to an existing SparkContext using its `addPyFile()` method. You can set [configuration properties](configuration.html#spark-properties) by passing a -[SparkConf](api/pyspark/pyspark.conf.SparkConf-class.html) object to SparkContext: +[SparkConf](api/python/pyspark.conf.SparkConf-class.html) object to SparkContext: {% highlight python %} from pyspark import SparkConf, SparkContext conf = (SparkConf() .setMaster("local") - .setAppName("My app") + .setAppName("My App") .set("spark.executor.memory", "1g")) sc = SparkContext(conf = conf) {% endhighlight %} # API Docs -[API documentation](api/pyspark/index.html) for PySpark is available as Epydoc. +[API documentation](api/python/index.html) for PySpark is available as Epydoc. Many of the methods also contain [doctests](http://docs.python.org/2/library/doctest.html) that provide additional usage examples. # Libraries @@ -158,9 +159,16 @@ some example applications. # Where to Go from Here -PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/spark/tree/master/python/examples). -You can run them by passing the files to `pyspark`; e.g.: +You can find some Spark Python examples in the `python/examples` folder](https://github.com/apache/spark/tree/master/python/examples) on GitHub or if you have Spark installed in your `python/examples` directory. + +You can run them by passing the files to `pyspark` from the top-level Spark directory: + + $ ./bin/pyspark python/examples/wordcount.py local README.md + +The README.md file is located in the top-level Spark directory. - ./bin/pyspark python/examples/wordcount.py +Note: Each program prints usage help when run without arguments. -Each program prints usage help when run without arguments. +For help on optimizing your programs, the Spark [Configuration](configuration.html) and +[Tuning](tuning.html) Guides provide information on best practices. They are especially important for +making sure that your data is stored in memory in an efficient format. 
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md index 2b0a51e9dfc5..df0feb9f9c7d 100644 --- a/docs/scala-programming-guide.md +++ b/docs/scala-programming-guide.md @@ -1,6 +1,6 @@ --- layout: global -title: Spark Programming Guide +title: Spark Scala Programming Guide --- * This will become a table of contents (this text will be scraped). @@ -9,15 +9,27 @@ title: Spark Programming Guide # Overview -At a high level, every Spark application consists of a *driver program* that runs the user's `main` function and executes various *parallel operations* on a cluster. The main abstraction Spark provides is a *resilient distributed dataset* (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to *persist* an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures. +At a high level, every Spark application consists of a *driver program* that runs the user's `main` function and executes various *parallel operations* on a cluster. -A second abstraction in Spark is *shared variables* that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: *broadcast variables*, which can be used to cache a value in memory on all nodes, and *accumulators*, which are variables that are only "added" to, such as counters and sums. +The main abstraction Spark provides is a *resilient distributed dataset* (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. -This guide shows each of these features and walks through some samples. It assumes some familiarity with Scala, especially with the syntax for [closures](http://www.scala-lang.org/node/133). Note that you can also run Spark interactively using the `bin/spark-shell` script. We highly recommend doing that to follow along! +Users may also ask Spark to *persist* an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures. + +A second abstraction in Spark is *shared variables* that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. + +Spark supports two types of shared variables: *broadcast variables*, which can be used to cache a value in memory on all nodes, and *accumulators*, variables that are only "added" to, such as counters and sums. + +This guide shows each of these features and walks through some samples. It assumes some familiarity with Scala, especially with the syntax for [closures](http://www.scala-lang.org/node/133). 
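If you have not written Scala closures before, the following minimal sketch shows the anonymous-function syntax used throughout this guide (the collections and values here are purely illustrative):

{% highlight scala %}
// A named closure: a function value bound to a val.
val addOne = (x: Int) => x + 1

// The same idea written inline, as closures are usually passed to operations.
val lengths = List("spark", "scala").map(word => word.length)

// The underscore shorthand for a parameter that is used only once.
val total = List(1, 2, 3).reduce(_ + _)
{% endhighlight %}

RDD operations such as `map`, `filter`, and `reduce` accept closures written exactly like these.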
+ +You can run Spark interactively using the `bin/spark-shell` script. We highly recommend doing that to follow along! + + $ ./bin/spark-shell + ... + scala> # Linking with Spark -Spark {{site.SPARK_VERSION}} uses Scala {{site.SCALA_BINARY_VERSION}}. If you write applications in Scala, you will need to use a compatible Scala version (e.g. {{site.SCALA_BINARY_VERSION}}.X) -- newer major versions may not work. +Spark {{site.SPARK_VERSION}} uses Scala {{site.SCALA_BINARY_VERSION}}. If you write applications in Scala, you will need to use a compatible Scala version (*e.g.*, {{site.SCALA_BINARY_VERSION}}.x) -- newer major versions may not work. To write a Spark application, you need to add a dependency on Spark. If you use SBT or Maven, Spark is available through Maven Central at: @@ -25,42 +37,47 @@ To write a Spark application, you need to add a dependency on Spark. If you use artifactId = spark-core_{{site.SCALA_BINARY_VERSION}} version = {{site.SPARK_VERSION}} -In addition, if you wish to access an HDFS cluster, you need to add a dependency on `hadoop-client` for your version of HDFS: +In addition, if you wish to access an HDFS cluster, you need to add a dependency on `hadoop-client` for your version of HDFS. Some common HDFS version tags are listed on the +[third party distributions](hadoop-third-party-distributions.html) page. groupId = org.apache.hadoop artifactId = hadoop-client version = -For other build systems, you can run `sbt/sbt assembly` to pack Spark and its dependencies into one JAR (`assembly/target/scala-{{site.SCALA_BINARY_VERSION}}/spark-assembly-{{site.SPARK_VERSION}}-hadoop*.jar`), then add this to your CLASSPATH. Set the HDFS version as described [here](index.html#a-note-about-hadoop-versions). - Finally, you need to import some Spark classes and implicit conversions into your program. Add the following lines: {% highlight scala %} import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ +import org.apache.spark.SparkConf {% endhighlight %} # Initializing Spark The first thing a Spark program must do is to create a `SparkContext` object, which tells Spark how to access a cluster. -This is done through the following constructor: +To create a `SparkContext` you first need to build a `SparkConf` object +that contains information about your application. {% highlight scala %} -new SparkContext(master, appName, [sparkHome], [jars]) +val conf = new SparkConf().setAppName().setMaster() +new SparkContext(conf) {% endhighlight %} -or through `new SparkContext(conf)`, which takes a [SparkConf](api/core/index.html#org.apache.spark.SparkConf) -object for more advanced configuration. +The `` parameter is a string specifying a [Spark, Mesos or YARN cluster URL](#master-urls) to connect to, or a special "local" string to +run in local mode, as described below. `` is a name for your application, which will be shown in the cluster web UI. +It's also possible to set these variables [using a configuration file](cluster-overview.html#loading-configurations-from-a-file) which avoids hard-coding the master name in your application. -The `master` parameter is a string specifying a [Spark or Mesos cluster URL](#master-urls) to connect to, or a special "local" string to run in local mode, as described below. `appName` is a name for your application, which will be shown in the cluster web UI. Finally, the last two parameters are needed to deploy your code to a cluster if running in distributed mode, as described later. 
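For example, here is a minimal sketch of this initialization with illustrative values filled in for the two parameters (an arbitrary application name and a local master running on four cores):

{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}

// "My App" and "local[4]" are example values; substitute your own application
// name and the master URL of the cluster you want to connect to.
val conf = new SparkConf().setAppName("My App").setMaster("local[4]")
val sc = new SparkContext(conf)
{% endhighlight %}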
+In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own +SparkContext will not work. You can set which master the context connects to using the `MASTER` environment variable, and you can add JARs to the +classpath with the `ADD_JARS` variable. -In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `MASTER` environment variable, and you can add JARs to the classpath with the `ADD_JARS` variable. For example, to run `bin/spark-shell` on exactly four cores, use +For example, to run `bin/spark-shell` on exactly four cores: {% highlight bash %} $ MASTER=local[4] ./bin/spark-shell {% endhighlight %} -Or, to also add `code.jar` to its classpath, use: +To add `code.jar` to its classpath, use: {% highlight bash %} $ MASTER=local[4] ADD_JARS=code.jar ./bin/spark-shell @@ -72,39 +89,37 @@ The master URL passed to Spark can be in one of the following formats:
<tr><th>Master URL</th><th>Meaning</th></tr>
<tr><td> local </td><td> Run Spark locally with one worker thread (*i.e.*, no parallelism at all). </td></tr>
<tr><td> local[K] </td><td> Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine). </td></tr>
<tr><td> local[*] </td><td> Run Spark locally with as many worker threads as logical cores on your machine. </td></tr>
<tr><td> spark://HOST:PORT </td><td> Connect to the given Spark standalone cluster master. The port must be the one your master is configured to use, which is 7077 by default. </td></tr>
<tr><td> mesos://HOST:PORT </td><td> Connect to the given Mesos cluster. The host parameter is the hostname of the Mesos master. The port must be the one the master is configured to use, which is 5050 by default. </td></tr>
<tr><td> yarn-client </td><td> Connect to a YARN cluster in client mode. The cluster location will be inferred based on the local Hadoop configuration. </td></tr>
<tr><td> yarn-cluster </td><td> Connect to a YARN cluster in cluster mode. The cluster location will be inferred based on the local Hadoop configuration. </td></tr>
</table>

-If no master URL is specified, the spark shell defaults to "local[*]". - -For running on YARN, Spark launches an instance of the standalone deploy cluster within YARN; see [running on YARN](running-on-yarn.html) for details. - -### Deploying Code on a Cluster - -If you want to run your application on a cluster, you will need to specify the two optional parameters to `SparkContext` to let it find your code: - -* `sparkHome`: The path at which Spark is installed on your worker machines (it should be the same on all of them). -* `jars`: A list of JAR files on the local machine containing your application's code and any dependencies, which Spark will deploy to all the worker nodes. You'll need to package your application into a set of JARs using your build system. For example, if you're using SBT, the [sbt-assembly](https://github.com/sbt/sbt-assembly) plugin is a good way to make a single JAR with your code and dependencies. - -If you run `bin/spark-shell` on a cluster, you can add JARs to it by specifying the `ADD_JARS` environment variable before you launch it. This variable should contain a comma-separated list of JARs. For example, `ADD_JARS=a.jar,b.jar ./bin/spark-shell` will launch a shell with `a.jar` and `b.jar` on its classpath. In addition, any new classes you define in the shell will automatically be distributed. +If no master URL is specified, the Spark shell defaults to "local[*]". # Resilient Distributed Datasets (RDDs) -Spark revolves around the concept of a _resilient distributed dataset_ (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are currently two types of RDDs: *parallelized collections*, which take an existing Scala collection and run functions on it in parallel, and *Hadoop datasets*, which run functions on each record of a file in Hadoop distributed file system or any other storage system supported by Hadoop. Both types of RDDs can be operated on through the same methods. +Spark revolves around the concept of a _resilient distributed dataset_ (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. + +There are currently two types of RDDs: *parallelized collections*, which take an existing Scala collection and run functions on it in parallel, and *Hadoop datasets*, which run functions on each record of a file in a Hadoop distributed file system or any other storage system supported by Hadoop. Both types of RDDs can be operated on through the same methods. ## Parallelized Collections -Parallelized collections are created by calling `SparkContext`'s `parallelize` method on an existing Scala collection (a `Seq` object). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is some interpreter output showing how to create a parallel collection from an array: +Parallelized collections are created by calling the `SparkContext` `parallelize` method on an existing Scala collection (a `Seq` object). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. + +For example, here is some interpreter output showing how to create a parallel collection from an array: {% highlight scala %} scala> val data = Array(1, 2, 3, 4, 5) @@ -114,38 +129,63 @@ scala> val distData = sc.parallelize(data) distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e {% endhighlight %} -Once created, the distributed dataset (`distData` here) can be operated on in parallel. 
For example, we might call `distData.reduce(_ + _)` to add up the elements of the array. We describe operations on distributed datasets later on. +Once created, the distributed dataset (`distData` here) can be operated on in parallel. For example, we might call `distData.reduce(_ + _)` to add the elements of the array: + +{% highlight scala %} +scala> distData.reduce(_ + _) +... +res0: Int = 15 +{% endhighlight %} + +We further describe operations on distributed datasets later on. + +One important parameter for parallel collections is the number of *slices* to cut the dataset into. Spark will run one task for each slice of the cluster. Typically you want 2-4 slices for each CPU in your cluster. -One important parameter for parallel collections is the number of *slices* to cut the dataset into. Spark will run one task for each slice of the cluster. Typically you want 2-4 slices for each CPU in your cluster. Normally, Spark tries to set the number of slices automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to `parallelize` (e.g. `sc.parallelize(data, 10)`). +Normally Spark tries to set the number of slices automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to `parallelize` (*e.g.*, `sc.parallelize(data, 10)`). ## Hadoop Datasets Spark can create distributed datasets from any file stored in the Hadoop distributed file system (HDFS) or other storage systems supported by Hadoop (including your local file system, [Amazon S3](http://wiki.apache.org/hadoop/AmazonS3), Hypertable, HBase, etc). Spark supports text files, [SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), and any other Hadoop InputFormat. -Text file RDDs can be created using `SparkContext`'s `textFile` method. This method takes an URI for the file (either a local path on the machine, or a `hdfs://`, `s3n://`, `kfs://`, etc URI). Here is an example invocation: +Text file RDDs can be created using the `SparkContext` `textFile` method. This method takes a URI for the file (either a local path on the machine or a URI like `hdfs://`, `s3n://`, `kfs://`, etc.). + +Here is an example invocation: {% highlight scala %} scala> val distFile = sc.textFile("data.txt") distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08 {% endhighlight %} -Once created, `distFile` can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the `map` and `reduce` operations as follows: `distFile.map(_.size).reduce(_ + _)`. +Once created, `distFile` can be acted on by dataset operations. For example, we can add the sizes of all the lines using the `map` and `reduce` operations as follows: `distFile.map(_.size).reduce(_ + _)`. The `textFile` method also takes an optional second argument for controlling the number of slices of the file. By default, Spark creates one slice for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of slices by passing a larger value. Note that you cannot have fewer slices than blocks. -For [SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), use SparkContext's `sequenceFile[K, V]` method where `K` and `V` are the types of key and values in the file. 
These should be subclasses of Hadoop's [Writable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Writable.html) interface, like [IntWritable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/IntWritable.html) and [Text](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Text.html). In addition, Spark allows you to specify native types for a few common Writables; for example, `sequenceFile[Int, String]` will automatically read IntWritables and Texts. +For [SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), use SparkContext's `sequenceFile[K, V]` method where `K` and `V` are the types of key and values in the file. These should be subclasses of Hadoop's [Writable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Writable.html) interface, like [IntWritable](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/IntWritable.html) and [Text](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Text.html). In addition, Spark allows you to specify native types for a few common Writables. For example, `sequenceFile[Int, String]` will automatically read IntWritables and Texts. Finally, for other Hadoop InputFormats, you can use the `SparkContext.hadoopRDD` method, which takes an arbitrary `JobConf` and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. ## RDD Operations -RDDs support two types of operations: *transformations*, which create a new dataset from an existing one, and *actions*, which return a value to the driver program after running a computation on the dataset. For example, `map` is a transformation that passes each dataset element through a function and returns a new distributed dataset representing the results. On the other hand, `reduce` is an action that aggregates all the elements of the dataset using some function and returns the final result to the driver program (although there is also a parallel `reduceByKey` that returns a distributed dataset). +RDDs support two types of operations: *transformations*, which create a new dataset from an existing one, and *actions*, which return a value to the driver program after running a computation on the dataset. + +For example, `map` is a transformation that passes each dataset element through a function and returns a new + distributed dataset representing the results. On the other hand, `reduce` is an action that aggregates all the elements of the dataset using some function and returns the final result to the driver program (although there is also a parallel `reduceByKey` that returns a distributed dataset). + +All transformations in Spark are *lazy*, in that they do not compute their results right away. Instead, they + just remember the transformations applied to some base dataset (*e.g.*, a file). The transformations are only + computed when an action requires a result to be returned to the driver program. -All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. 
This design enables Spark to run more efficiently -- for example, we can realize that a dataset created through `map` will be used in a `reduce` and return only the result of the `reduce` to the driver, rather than the larger mapped dataset. +This design enables Spark to run more efficiently. For example, we can realize that a dataset created through + `map` will be used in a `reduce` and return only the result of the `reduce` to the driver, rather than the + larger mapped dataset. -By default, each transformed RDD is recomputed each time you run an action on it. However, you may also *persist* an RDD in memory using the `persist` (or `cache`) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting datasets on disk, or replicated across the cluster. The next section in this document describes these options. +By default, each transformed RDD is recomputed each time you run an action on it. However, you may also + *persist* an RDD in memory using the `persist` (or `cache`) method, in which case Spark will keep the + elements around on the cluster for much faster access the next time you query it. -The following tables list the transformations and actions currently supported (see also the [RDD API doc](api/core/index.html#org.apache.spark.rdd.RDD) for details): +There is also support for persisting datasets on disk, or replicated across the cluster. The next section in this document describes these options. + +The following tables list the transformations and actions currently supported (see also the [RDD API doc](api/scala/index.html#org.apache.spark.rdd.RDD) for details): ### Transformations @@ -161,7 +201,8 @@ The following tables list the transformations and actions currently supported (s flatMap(func) - Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). + Similar to map, but each input item can be mapped to 0 or more output items (so func should + return a Seq rather than a single item). mapPartitions(func) @@ -171,34 +212,39 @@ The following tables list the transformations and actions currently supported (s mapPartitionsWithIndex(func) Similar to mapPartitions, but also provides func with an integer value representing the index of - the partition, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T. + the partition, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of + type T. sample(withReplacement, fraction, seed) - Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. + Sample a fraction fraction of the data, with or without replacement, using a given random number + generator seed. union(otherDataset) Return a new dataset that contains the union of the elements in the source dataset and the argument. - distinct([numTasks])) + distinct([numTasks]) Return a new dataset that contains the distinct elements of the source dataset. groupByKey([numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs.
-Note: By default, if the RDD already has a partitioner, the task number is decided by the partition number of the partitioner, or else relies on the value of spark.default.parallelism if the property is set , otherwise depends on the partition number of the RDD. You can pass an optional numTasks argument to set a different number of tasks. +Note: By default, if the RDD already has a partitioner, the task number is decided by the partition + number of the partitioner, or else relies on the value of spark.default.parallelism if that + property is set, otherwise it depends on the partition number of the RDD. You can pass an optional numTasks argument to set a different number of tasks. reduceByKey(func, [numTasks]) - When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. + When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function. As with groupByKey, the number of reduce tasks is configurable through an optional second argument. sortByKey([ascending], [numTasks]) - When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument. + When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs + sorted by keys in ascending or descending order, as specified in the boolean ascending argument. join(otherDataset, [numTasks]) @@ -214,7 +260,7 @@ The following tables list the transformations and actions currently supported (s -A complete list of transformations is available in the [RDD API doc](api/core/index.html#org.apache.spark.rdd.RDD). +A complete list of transformations is available in the [RDD API doc](api/scala/index.html#org.apache.spark.rdd.RDD). ### Actions @@ -222,11 +268,13 @@ A complete list of transformations is available in the [RDD API doc](api/core/in ActionMeaning reduce(func) - Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. + Aggregate the elements of the dataset using a function func (which takes two arguments and + returns one). The function should be commutative and associative so that it can be computed correctly in parallel. collect() - Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data. + Return all the elements of the dataset to the driver program as an array. This is usually useful after +a filter or other operation that returns a sufficiently small subset of the data. count() @@ -246,15 +294,16 @@ A complete list of transformations is available in the [RDD API doc](api/core/in saveAsTextFile(path) - Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. + Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS, or any other Hadoop-supported file system. 
Spark will call toString on each element + to convert it to a line of text in the file. saveAsSequenceFile(path) - Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is only available on RDDs of key-value pairs that either implement Hadoop's Writable interface or are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). + Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is only available on RDDs of key-value pairs that either implement Hadoop's Writable interface or are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc.). countByKey() - Only available on RDDs of type (K, V). Returns a `Map` of (K, Int) pairs with the count of each key. + Only available on RDDs of type (K, V). Returns a Map of (K, Int) pairs with the count of each key. foreach(func) @@ -262,60 +311,62 @@ A complete list of transformations is available in the [RDD API doc](api/core/in -A complete list of actions is available in the [RDD API doc](api/core/index.html#org.apache.spark.rdd.RDD). +A complete list of actions is available in the [RDD API doc](api/scala/index.html#org.apache.spark.rdd.RDD). ## RDD Persistence One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory -across operations. When you persist an RDD, each node stores any slices of it that it computes in -memory and reuses them in other actions on that dataset (or datasets derived from it). This allows +across operations. When you persist an RDD, each node stores any slices of the RDD that it computes in +memory and reuses the slices in other actions on that dataset (or datasets derived from it). + +This allows future actions to be much faster (often by more than 10x). Caching is a key tool for building iterative algorithms with Spark and for interactive use from the interpreter. You can mark an RDD to be persisted using the `persist()` or `cache()` methods on it. The first time -it is computed in an action, it will be kept in memory on the nodes. The cache is fault-tolerant -- -if any partition of an RDD is lost, it will automatically be recomputed using the transformations -that originally created it. - -In addition, each RDD can be stored using a different *storage level*, allowing you, for example, to -persist the dataset on disk, or persist it in memory but as serialized Java objects (to save space), -or replicate it across nodes, or store the data in off-heap memory in [Tachyon](http://tachyon-project.org/). -These levels are chosen by passing a -[`org.apache.spark.storage.StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel) -object to `persist()`. The `cache()` method is a shorthand for using the default storage level, -which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). The complete set of -available storage levels is: +it is computed in an action it will be kept in memory on the nodes. The cache is fault-tolerant. If any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it. 
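As a rough sketch of how this looks in practice (assuming the `sc` context available in the shell and a text file named `data.txt`):

{% highlight scala %}
val lines = sc.textFile("data.txt")
lines.cache()    // mark the RDD to be kept in memory; nothing is computed yet
lines.count()    // first action: computes the RDD and caches its partitions
lines.count()    // later actions reuse the cached partitions
{% endhighlight %}

Calling `persist()` with an explicit storage level works the same way and lets you choose one of the levels described below.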
+ +In addition, each RDD can be stored using a different *storage level*, allowing you, for example, to: + +* persist the dataset on disk + +* persist it in memory but as serialized Java objects (to save space) + +* replicate it across nodes, or + +* store the data in off-heap memory in [Tachyon](http://tachyon-project.org/) + +These levels are chosen by passing an +[`org.apache.spark.storage.StorageLevel`](api/scala/index.html#org.apache.spark.storage.StorageLevel) +object to `persist()`. The `cache()` method is shorthand for using the default storage level, +which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). + +The complete set of available storage levels is: - + - + - + recomputing them on the fly each time they're needed. - @@ -331,58 +382,49 @@ available storage levels is: ### Which Storage Level to Choose? Spark's storage levels are meant to provide different trade-offs between memory usage and CPU -efficiency. It allows uses to choose memory, disk, or Tachyon for storing data. We recommend going -through the following process to select one: +efficiency. They allow uses to choose memory, disk, or Tachyon for storing data. + +We recommend going through the following process to select one: -* If your RDDs fit comfortably with the default storage level (`MEMORY_ONLY`), leave them that way. - This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible. +* If your RDDs fit comfortably with the default storage level (`MEMORY_ONLY`), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible. -* If not, try using `MEMORY_ONLY_SER` and [selecting a fast serialization library](tuning.html) to -make the objects much more space-efficient, but still reasonably fast to access. You can also use -`OFF_HEAP` mode to store the data off the heap in [Tachyon](http://tachyon-project.org/). This will -significantly reduce JVM GC overhead. +* If not, try using `MEMORY_ONLY_SER` and [selecting a fast serialization library](tuning.html) to make the objects much more space-efficient, but still reasonably fast to access. You can also use `OFF_HEAP` mode to store the data off the heap in [Tachyon](http://tachyon-project.org/). This will significantly reduce JVM GC overhead. -* Don't spill to disk unless the functions that computed your datasets are expensive, or they filter -a large amount of the data. Otherwise, recomputing a partition is about as fast as reading it from -disk. +* Don't spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition is about as fast as reading it from disk. -* Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve -requests from a web application). *All* the storage levels provide full fault tolerance by -recomputing lost data, but the replicated ones let you continue running tasks on the RDD without -waiting to recompute a lost partition. +* Use the replicated storage levels if you want fast fault recovery (*e.g.*, if using Spark to serve requests +from a web application). *All* the storage levels provide full fault tolerance by recomputing lost data, but +the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition. 
-If you want to define your own storage level (say, with replication factor of 3 instead of 2), then +If you want to define your own storage level (say, with a replication factor of 3 instead of 2), then use the function factor method `apply()` of the -[`StorageLevel`](api/core/index.html#org.apache.spark.storage.StorageLevel$) singleton object. +[`StorageLevel`](api/scala/index.html#org.apache.spark.storage.StorageLevel$) singleton object. -Spark has a block manager inside the Executors that let you chose memory, disk, or off-heap. The -latter is for storing RDDs off-heap outside the Executor JVM on top of the memory management system -[Tachyon](http://tachyon-project.org/). This mode has the following advantages: +Spark has a block manager inside the Executors that let you chose memory, disk, or off-heap. The latter is for storing RDDs off-heap outside the Executor JVM on top of the memory management system +[Tachyon](http://tachyon-project.org/). + +This mode has the following advantages: * Cached data will not be lost if individual executors crash. -* Executors can have a smaller memory footprint, allowing you to run more executors on the same -machine as the bulk of the memory will be inside Tachyon. + +* Executors can have a smaller memory footprint, allowing you to run more executors on the same machine as the bulk of the memory will be inside Tachyon. + * Reduced GC overhead since data is stored in Tachyon. # Shared Variables -Normally, when a function passed to a Spark operation (such as `map` or `reduce`) is executed on a -remote cluster node, it works on separate copies of all the variables used in the function. These -variables are copied to each machine, and no updates to the variables on the remote machine are -propagated back to the driver program. Supporting general, read-write shared variables across tasks -would be inefficient. However, Spark does provide two limited types of *shared variables* for two -common usage patterns: broadcast variables and accumulators. +Normally, when a function passed to a Spark operation (such as `map` or `reduce`) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are +propagated back to the driver program. + +Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of *shared variables* for two common usage patterns: broadcast variables and accumulators. ## Broadcast Variables -Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather -than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a -large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables -using efficient broadcast algorithms to reduce communication cost. +Broadcast variables allow you to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost. + +Broadcast variables are created from a variable `v` by calling `SparkContext.broadcast(v)`. The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `value` method. 
-Broadcast variables are created from a variable `v` by calling `SparkContext.broadcast(v)`. The -broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `value` -method. The interpreter session below shows this: +The interpreter session below shows this: {% highlight scala %} scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) @@ -392,23 +434,18 @@ scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3) {% endhighlight %} -After the broadcast variable is created, it should be used instead of the value `v` in any functions -run on the cluster so that `v` is not shipped to the nodes more than once. In addition, the object -`v` should not be modified after it is broadcast in order to ensure that all nodes get the same -value of the broadcast variable (e.g. if the variable is shipped to a new node later). +After the broadcast variable is created, it should be used instead of the value `v` in any functions run on the cluster so that `v` is not shipped to the nodes more than once. + +In addition, the object `v` should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (*e.g.*, if the variable is shipped to a new node later). ## Accumulators -Accumulators are variables that are only "added" to through an associative operation and can -therefore be efficiently supported in parallel. They can be used to implement counters (as in -MapReduce) or sums. Spark natively supports accumulators of numeric value types and standard mutable -collections, and programmers can add support for new types. +Accumulators are variables that are only "added" to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric value types and standard mutable collections, and you can add support for new types. -An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks -running on the cluster can then add to it using the `+=` operator. However, they cannot read its -value. Only the driver program can read the accumulator's value, using its `value` method. +An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks +running on the cluster can then add to it using the `+=` operator. However, they cannot read its value. Only the driver program can read the accumulator's value, using its `value` method. -The interpreter session below shows an accumulator being used to add up the elements of an array: +This interpreter session shows an accumulator being used to add the elements of an array: {% highlight scala %} scala> val accum = sc.accumulator(0) @@ -426,12 +463,14 @@ res2: Int = 10 # Where to Go from Here You can see some [example Spark programs](http://spark.apache.org/examples.html) on the Spark website. -In addition, Spark includes several samples in `examples/src/main/scala`. Some of them have both Spark versions and local (non-parallel) versions, allowing you to see what had to be changed to make the program run on a cluster. You can run them using by passing the class name to the `bin/run-example` script included in Spark; for example: +In addition, Spark includes several samples in `examples/src/main/scala`. Some of them have both Spark versions and local (non-parallel) versions, allowing you to see what had to be changed to make the program run on a cluster. 
+ +You can run them by passing the class name to the `bin/run-example` script included in Spark, for example: - ./bin/run-example org.apache.spark.examples.SparkPi + $ ./bin/run-example org.apache.spark.examples.SparkPi local -Each example program prints usage help when run without any arguments. +Note: Each example program prints usage help when run without any arguments. -For help on optimizing your program, the [configuration](configuration.html) and -[tuning](tuning.html) guides provide information on best practices. They are especially important for +For help on optimizing your programs, the Spark [Configuration](configuration.html) and +[Tuning](tuning.html) Guides provide information on best practices. They are especially important for making sure that your data is stored in memory in an efficient format.
<tr><th>Storage Level</th><th>Meaning</th></tr>
<tr><td> MEMORY_ONLY </td><td> Store the RDD partitions as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level. </td></tr>
<tr><td> MEMORY_AND_DISK </td><td> Store the RDD partitions as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from disk when they're needed. </td></tr>
<tr><td> MEMORY_ONLY_SER </td><td> Store the RDD partitions as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read. </td></tr>
<tr><td> MEMORY_AND_DISK_SER </td><td> Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed. </td></tr>
OFF_HEAP Store RDD in a serialized format in Tachyon. - This is generally more space-efficient than deserialized objects, especially when using a - fast serializer, but more CPU-intensive to read. - This also significantly reduces the overheads of GC. + Store the RDD partitions in a serialized format in Tachyon. This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read. This also significantly reduces the overheads of GC.