4 changes: 2 additions & 2 deletions docs/quick-start.md
@@ -234,7 +234,7 @@ import org.apache.spark.SparkConf
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val conf = new SparkConf().setAppName("Simple Application")
val conf = new SparkConf().setAppName("Simple Application").setIfMissing("spark.master", "local[2]")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val numAs = logData.filter(line => line.contains("a")).count()
@@ -313,7 +313,7 @@ import org.apache.spark.api.java.function.Function;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
SparkConf conf = new SparkConf().setAppName("Simple Application");
SparkConf conf = new SparkConf().setAppName("Simple Application").setIfMissing("spark.master", "local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logData = sc.textFile(logFile).cache();

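For context on the recurring change in this diff: `SparkConf.setIfMissing` only sets a configuration key when it has not already been provided, so an explicit master URL (for example one passed via `spark-submit --master`) still takes precedence over the `local[2]` default. A minimal sketch of that behavior, assuming the Spark 1.x Java API (class name is illustrative):

```java
import org.apache.spark.SparkConf;

public class SetIfMissingSketch {
  public static void main(String[] args) {
    // Default the master only when nothing else has set it.
    SparkConf conf = new SparkConf()
        .setAppName("SetIfMissingSketch")
        .setIfMissing("spark.master", "local[2]");

    // If spark-submit (or earlier code) already set spark.master, setIfMissing is a no-op;
    // otherwise the example falls back to a local two-thread master.
    System.out.println("spark.master = " + conf.get("spark.master"));
  }
}
```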
@@ -109,7 +109,8 @@ public static void main(String[] args) {
System.exit(1);
}

SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR");
SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR")
.setIfMissing("spark.master", "local[2]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = sc.textFile(args[0]);
JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
@@ -28,7 +28,6 @@
import org.apache.spark.api.java.function.PairFunction;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -101,7 +100,8 @@ public static Stats extractStats(String line) {

public static void main(String[] args) {

SparkConf sparkConf = new SparkConf().setAppName("JavaLogQuery");
SparkConf sparkConf = new SparkConf().setAppName("JavaLogQuery")
.setIfMissing("spark.master", "local[2]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

JavaRDD<String> dataSet = (args.length == 1) ? jsc.textFile(args[0]) : jsc.parallelize(exampleApacheLogs);
19 changes: 10 additions & 9 deletions examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -34,7 +34,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Iterator;
import java.util.regex.Pattern;

/**
@@ -62,7 +61,8 @@ public static void main(String[] args) throws Exception {
System.exit(1);
}

SparkConf sparkConf = new SparkConf().setAppName("JavaPageRank");
SparkConf sparkConf = new SparkConf().setAppName("JavaPageRank")
.setIfMissing("spark.master", "local[2]");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);

// Loads in input file. It should be in format of:
@@ -73,13 +73,14 @@ public static void main(String[] args) throws Exception {
JavaRDD<String> lines = ctx.textFile(args[0], 1);

// Loads all URLs from input file and initialize their neighbors.
JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
String[] parts = SPACES.split(s);
return new Tuple2<String, String>(parts[0], parts[1]);
}
}).distinct().groupByKey().cache();
JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(
new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
String[] parts = SPACES.split(s);
return new Tuple2<String, String>(parts[0], parts[1]);
}
}).distinct().groupByKey().cache();

// Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
JavaPairRDD<String, Double> ranks = links.mapValues(new Function<Iterable<String>, Double>() {
@@ -31,10 +31,10 @@
* Usage: JavaSparkPi [slices]
*/
public final class JavaSparkPi {


public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi")
.setIfMissing("spark.master", "local[2]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
3 changes: 2 additions & 1 deletion examples/src/main/java/org/apache/spark/examples/JavaTC.java
@@ -64,7 +64,8 @@ public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, Integer>> t
}

public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR");
SparkConf sparkConf = new SparkConf().setAppName("JavaTC")
.setIfMissing("spark.master", "local[2]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
Integer slices = (args.length > 0) ? Integer.parseInt(args[0]): 2;
JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache();
29 changes: 16 additions & 13 deletions examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -40,7 +40,8 @@ public static void main(String[] args) throws Exception {
System.exit(1);
}

SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount")
.setIfMissing("spark.master", "local[2]");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile(args[0], 1);

@@ -51,19 +52,21 @@ public Iterable<String> call(String s) {
}
});

JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairRDD<String, Integer> ones = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});

JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
JavaPairRDD<String, Integer> counts = ones.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});

List<Tuple2<String, Integer>> output = counts.collect();
for (Tuple2<?,?> tuple : output) {
@@ -63,7 +63,8 @@ public static void main(String[] args) {
"Usage: JavaALS <ratings_file> <rank> <iterations> <output_dir> [<blocks>]");
System.exit(1);
}
SparkConf sparkConf = new SparkConf().setAppName("JavaALS");
SparkConf sparkConf = new SparkConf().setAppName("JavaALS")
.setIfMissing("spark.master", "local[2]");
int rank = Integer.parseInt(args[1]);
int iterations = Integer.parseInt(args[2]);
String outputDir = args[3];
@@ -62,7 +62,8 @@ public static void main(String[] args) {
if (args.length >= 4) {
runs = Integer.parseInt(args[3]);
}
SparkConf sparkConf = new SparkConf().setAppName("JavaKMeans");
SparkConf sparkConf = new SparkConf().setAppName("JavaKMeans")
.setIfMissing("spark.master", "local[2]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = sc.textFile(inputFile);

@@ -56,7 +56,8 @@ public static void main(String[] args) {
System.err.println("Usage: JavaLR <input_dir> <step_size> <niters>");
System.exit(1);
}
SparkConf sparkConf = new SparkConf().setAppName("JavaLR");
SparkConf sparkConf = new SparkConf().setAppName("JavaLR")
.setIfMissing("spark.master", "local[2]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = sc.textFile(args[0]);
JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
@@ -52,13 +52,15 @@ public void setAge(int age) {
}

public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL");
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL")
.setIfMissing("spark.master", "local[2]");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

// Load a text file and convert each line to a Java Bean.
JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").map(
new Function<String, Person>() {
@Override
public Person call(String line) throws Exception {
String[] parts = line.split(",");

@@ -80,6 +82,7 @@ public Person call(String line) throws Exception {
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
@Override
public String call(Row row) {
return "Name: " + row.getString(0);
}
@@ -88,12 +91,13 @@ public String call(Row row) {
// JavaSchemaRDDs can be saved as parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile("people.parquet");

// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a JavaSchemaRDD.
// Read in the parquet file created above. Parquet files are self-describing so the schema
// is preserved. The result of loading a parquet file is also a JavaSchemaRDD.
JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerAsTable("parquetFile");
JavaSchemaRDD teenagers2 = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
JavaSchemaRDD teenagers2 =
sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
}
}
@@ -42,8 +42,7 @@
* Custom Receiver that receives data over a socket. Received bytes is interpreted as
* text and \n delimited lines are considered as records. They are then counted and printed.
*
* Usage: JavaCustomReceiver <master> <hostname> <port>
* <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
* Usage: JavaCustomReceiver <hostname> <port>
* <hostname> and <port> of the TCP server that Spark Streaming would connect to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
@@ -64,13 +63,14 @@ public static void main(String[] args) {
StreamingExamples.setStreamingLogLevels();

// Create the context with a 1 second batch size
SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver");
SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver")
.setIfMissing("spark.master", "local[2]");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));

// Create a input stream with the custom receiver on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
JavaReceiverInputDStream<String> lines = ssc.receiverStream(
new JavaCustomReceiver(args[1], Integer.parseInt(args[2])));
new JavaCustomReceiver(args[0], Integer.parseInt(args[1])));
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
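Because `<master>` is no longer passed on the command line, the hostname and port shift to the first two positional arguments, which is why the receiver construction above now reads `args[0]` and `args[1]`. A hedged sketch of the corresponding argument handling (class and variable names are illustrative; the usage string mirrors the updated Javadoc):

```java
public class UsageCheckSketch {
  // Sketch: with <master> removed from the usage, hostname and port are args[0] and args[1];
  // the master now comes from spark-submit or the setIfMissing default.
  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: JavaCustomReceiver <hostname> <port>");
      System.exit(1);
    }
    String host = args[0];
    int port = Integer.parseInt(args[1]);
    System.out.println("Connecting to " + host + ":" + port);
  }
}
```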
@@ -19,7 +19,6 @@

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.flume.FlumeUtils;
@@ -56,9 +55,11 @@ public static void main(String[] args) {
int port = Integer.parseInt(args[1]);

Duration batchInterval = new Duration(2000);
SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount")
.setIfMissing("spark.master", "local[2]");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
FlumeUtils.createStream(ssc, host, port);

flumeStream.count();

@@ -30,7 +30,6 @@
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -65,8 +64,9 @@ public static void main(String[] args) {
}

StreamingExamples.setStreamingLogLevels();
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
// Create the context with a 1 second batch size
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount")
.setIfMissing("spark.master", "local[2]");
// Create the context with a 2 second batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

int numThreads = Integer.parseInt(args[3]);
@@ -55,7 +55,8 @@ public static void main(String[] args) {
StreamingExamples.setStreamingLogLevels();

// Create the context with a 1 second batch size
SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount")
.setIfMissing("spark.master", "local[2]");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));

// Create a JavaReceiverInputDStream on target ip:port and count the
@@ -30,7 +30,6 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -43,7 +42,8 @@ private JavaQueueStream() {
public static void main(String[] args) throws Exception {

StreamingExamples.setStreamingLogLevels();
SparkConf sparkConf = new SparkConf().setAppName("JavaQueueStream");
SparkConf sparkConf = new SparkConf().setAppName("JavaQueueStream")
.setIfMissing("spark.master", "local[2]");

// Create the context
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
@@ -31,7 +31,8 @@ object BroadcastTest {
System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName +
"BroadcastFactory")
System.setProperty("spark.broadcast.blockSize", blockSize)
val sparkConf = new SparkConf().setAppName("Broadcast Test")
val sparkConf = new SparkConf().setAppName("BroadcastTest")
.setIfMissing("spark.master", "local[2]")

val sc = new SparkContext(sparkConf)

@@ -72,7 +72,8 @@ import org.apache.spark.SparkContext._
object CassandraCQLTest {

def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("CQLTestApp")
val sparkConf = new SparkConf().setAppName("CassandraCQLTest")
.setIfMissing("spark.master", "local[2]")

val sc = new SparkContext(sparkConf)
val cHost: String = args(0)
@@ -54,7 +54,8 @@ import org.apache.spark.SparkContext._
object CassandraTest {

def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("casDemo")
val sparkConf = new SparkConf().setAppName("CassandraTest")
.setIfMissing("spark.master", "local[2]")
// Get a SparkContext
val sc = new SparkContext(sparkConf)

@@ -22,6 +22,7 @@ import org.apache.spark.{SparkConf, SparkContext}
object ExceptionHandlingTest {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("ExceptionHandlingTest")
.setIfMissing("spark.master", "local[2]")
val sc = new SparkContext(sparkConf)
sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
if (math.random > 0.75) {
@@ -27,7 +27,8 @@ import org.apache.spark.SparkContext._
*/
object GroupByTest {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("GroupBy Test")
val sparkConf = new SparkConf().setAppName("GroupByTest")
.setIfMissing("spark.master", "local[2]")
var numMappers = if (args.length > 0) args(0).toInt else 2
var numKVPairs = if (args.length > 1) args(1).toInt else 1000
var valSize = if (args.length > 2) args(2).toInt else 1000
@@ -44,7 +45,7 @@ object GroupByTest {
arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
}
arr1
}.cache
}.cache()
// Enforce that everything has been calculated and in cache
pairs1.count

@@ -22,11 +22,11 @@ import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD

object HBaseTest {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("HBaseTest")
.setIfMissing("spark.master", "local[2]")
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
// Other options for configuring scan behavior are available. More information available at