diff --git a/docs/quick-start.md b/docs/quick-start.md index 64023994771b7..978038c5d6986 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -234,7 +234,7 @@ import org.apache.spark.SparkConf object SimpleApp { def main(args: Array[String]) { val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system - val conf = new SparkConf().setAppName("Simple Application") + val conf = new SparkConf().setAppName("Simple Application").setIfMissing("spark.master", "local[2]") val sc = new SparkContext(conf) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() @@ -313,7 +313,7 @@ import org.apache.spark.api.java.function.Function; public class SimpleApp { public static void main(String[] args) { String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system - SparkConf conf = new SparkConf().setAppName("Simple Application"); + SparkConf conf = new SparkConf().setAppName("Simple Application").setIfMissing("spark.master", "local[2]"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD logData = sc.textFile(logFile).cache(); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java index 6c177de359b60..6eef6909930f5 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java @@ -109,7 +109,8 @@ public static void main(String[] args) { System.exit(1); } - SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR"); + SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR") + .setIfMissing("spark.master", "local[2]"); JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaRDD lines = sc.textFile(args[0]); JavaRDD points = lines.map(new ParsePoint()).cache(); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java index 812e9d5580cbf..4dd2a44d2c126 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java @@ -28,7 +28,6 @@ import org.apache.spark.api.java.function.PairFunction; import java.io.Serializable; -import java.util.Collections; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -101,7 +100,8 @@ public static Stats extractStats(String line) { public static void main(String[] args) { - SparkConf sparkConf = new SparkConf().setAppName("JavaLogQuery"); + SparkConf sparkConf = new SparkConf().setAppName("JavaLogQuery") + .setIfMissing("spark.master", "local[2]"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); JavaRDD dataSet = (args.length == 1) ?
jsc.textFile(args[0]) : jsc.parallelize(exampleApacheLogs); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java index 7ea6df9c17245..9f07406bdd4fb 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java @@ -34,7 +34,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Iterator; import java.util.regex.Pattern; /** @@ -62,7 +61,8 @@ public static void main(String[] args) throws Exception { System.exit(1); } - SparkConf sparkConf = new SparkConf().setAppName("JavaPageRank"); + SparkConf sparkConf = new SparkConf().setAppName("JavaPageRank") + .setIfMissing("spark.master", "local[2]"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); // Loads in input file. It should be in format of: @@ -73,13 +73,14 @@ public static void main(String[] args) throws Exception { JavaRDD lines = ctx.textFile(args[0], 1); // Loads all URLs from input file and initialize their neighbors. - JavaPairRDD> links = lines.mapToPair(new PairFunction() { - @Override - public Tuple2 call(String s) { - String[] parts = SPACES.split(s); - return new Tuple2(parts[0], parts[1]); - } - }).distinct().groupByKey().cache(); + JavaPairRDD> links = lines.mapToPair( + new PairFunction() { + @Override + public Tuple2 call(String s) { + String[] parts = SPACES.split(s); + return new Tuple2(parts[0], parts[1]); + } + }).distinct().groupByKey().cache(); // Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one. JavaPairRDD ranks = links.mapValues(new Function, Double>() { diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java index 11157d7573fae..dec8b0ffb5518 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java @@ -31,10 +31,10 @@ * Usage: JavaSparkPi [slices] */ public final class JavaSparkPi { - public static void main(String[] args) throws Exception { - SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi"); + SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi") + .setIfMissing("spark.master", "local[2]"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2; diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java index 2563fcdd234bb..07dcc80a28fa3 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java @@ -64,7 +64,8 @@ public Tuple2 call(Tuple2> t } public static void main(String[] args) { - SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR"); + SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR") + .setIfMissing("spark.master", "local[2]"); JavaSparkContext sc = new JavaSparkContext(sparkConf); Integer slices = (args.length > 0) ? 
Integer.parseInt(args[0]): 2; JavaPairRDD tc = sc.parallelizePairs(generateGraph(), slices).cache(); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java index 9a6a944f7edef..fe8fa08b0780c 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java @@ -40,7 +40,8 @@ public static void main(String[] args) throws Exception { System.exit(1); } - SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount"); + SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount") + .setIfMissing("spark.master", "local[2]"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaRDD lines = ctx.textFile(args[0], 1); @@ -51,19 +52,21 @@ public Iterable call(String s) { } }); - JavaPairRDD ones = words.mapToPair(new PairFunction() { - @Override - public Tuple2 call(String s) { - return new Tuple2(s, 1); - } - }); + JavaPairRDD ones = words.mapToPair( + new PairFunction() { + @Override + public Tuple2 call(String s) { + return new Tuple2(s, 1); + } + }); - JavaPairRDD counts = ones.reduceByKey(new Function2() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); + JavaPairRDD counts = ones.reduceByKey( + new Function2() { + @Override + public Integer call(Integer i1, Integer i2) { + return i1 + i2; + } + }); List> output = counts.collect(); for (Tuple2 tuple : output) { diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java index 8d381d4e0a943..aabe431d81444 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java @@ -63,7 +63,8 @@ public static void main(String[] args) { "Usage: JavaALS []"); System.exit(1); } - SparkConf sparkConf = new SparkConf().setAppName("JavaALS"); + SparkConf sparkConf = new SparkConf().setAppName("JavaALS") + .setIfMissing("spark.master", "local[2]"); int rank = Integer.parseInt(args[1]); int iterations = Integer.parseInt(args[2]); String outputDir = args[3]; diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java index f796123a25727..0cab23dc228f7 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java @@ -62,7 +62,8 @@ public static void main(String[] args) { if (args.length >= 4) { runs = Integer.parseInt(args[3]); } - SparkConf sparkConf = new SparkConf().setAppName("JavaKMeans"); + SparkConf sparkConf = new SparkConf().setAppName("JavaKMeans") + .setIfMissing("spark.master", "local[2]"); JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaRDD lines = sc.textFile(inputFile); diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java index eceb6927d5551..2bc07f325a0f6 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java @@ -56,7 +56,8 @@ public static void main(String[] args) { System.err.println("Usage: JavaLR "); System.exit(1); } - SparkConf sparkConf = new SparkConf().setAppName("JavaLR"); + SparkConf sparkConf = new SparkConf().setAppName("JavaLR") 
+ .setIfMissing("spark.master", "local[2]"); JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaRDD lines = sc.textFile(args[0]); JavaRDD points = lines.map(new ParsePoint()).cache(); diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java index ad5ec84b71e69..b66e3a697e727 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java @@ -52,13 +52,15 @@ public void setAge(int age) { } public static void main(String[] args) throws Exception { - SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL"); + SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL") + .setIfMissing("spark.master", "local[2]"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaSQLContext sqlCtx = new JavaSQLContext(ctx); // Load a text file and convert each line to a Java Bean. JavaRDD people = ctx.textFile("examples/src/main/resources/people.txt").map( new Function() { + @Override public Person call(String line) throws Exception { String[] parts = line.split(","); @@ -80,6 +82,7 @@ public Person call(String line) throws Exception { // The results of SQL queries are SchemaRDDs and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. List teenagerNames = teenagers.map(new Function() { + @Override public String call(Row row) { return "Name: " + row.getString(0); } @@ -88,12 +91,13 @@ public String call(Row row) { // JavaSchemaRDDs can be saved as parquet files, maintaining the schema information. schemaPeople.saveAsParquetFile("people.parquet"); - // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. - // The result of loading a parquet file is also a JavaSchemaRDD. + // Read in the parquet file created above. Parquet files are self-describing so the schema + // is preserved. The result of loading a parquet file is also a JavaSchemaRDD. JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet"); //Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerAsTable("parquetFile"); - JavaSchemaRDD teenagers2 = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); + JavaSchemaRDD teenagers2 = + sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); } } diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java index 5622df5ce03ff..861ddb7e56483 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java @@ -42,8 +42,7 @@ * Custom Receiver that receives data over a socket. Received bytes is interpreted as * text and \n delimited lines are considered as records. They are then counted and printed. * - * Usage: JavaCustomReceiver - * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. + * Usage: JavaCustomReceiver * and of the TCP server that Spark Streaming would connect to receive data. 
* * To run this on your local machine, you need to first run a Netcat server @@ -64,13 +63,14 @@ public static void main(String[] args) { StreamingExamples.setStreamingLogLevels(); // Create the context with a 1 second batch size - SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver"); + SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver") + .setIfMissing("spark.master", "local[2]"); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000)); // Create a input stream with the custom receiver on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') JavaReceiverInputDStream lines = ssc.receiverStream( - new JavaCustomReceiver(args[1], Integer.parseInt(args[2]))); + new JavaCustomReceiver(args[0], Integer.parseInt(args[1]))); JavaDStream words = lines.flatMap(new FlatMapFunction() { @Override public Iterable call(String x) { diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java index da56637fe891a..7114d1b3e67ea 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java @@ -19,7 +19,6 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.Function; -import org.apache.spark.examples.streaming.StreamingExamples; import org.apache.spark.streaming.*; import org.apache.spark.streaming.api.java.*; import org.apache.spark.streaming.flume.FlumeUtils; @@ -56,9 +55,11 @@ public static void main(String[] args) { int port = Integer.parseInt(args[1]); Duration batchInterval = new Duration(2000); - SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount"); + SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount") + .setIfMissing("spark.master", "local[2]"); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval); - JavaReceiverInputDStream flumeStream = FlumeUtils.createStream(ssc, host, port); + JavaReceiverInputDStream flumeStream = + FlumeUtils.createStream(ssc, host, port); flumeStream.count(); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java index 16ae9a3319ee2..f3b0a90cf75c1 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java @@ -30,7 +30,6 @@ import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.examples.streaming.StreamingExamples; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; @@ -65,8 +64,9 @@ public static void main(String[] args) { } StreamingExamples.setStreamingLogLevels(); - SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount"); - // Create the context with a 1 second batch size + SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount") + .setIfMissing("spark.master", "local[2]"); + // Create the context with a 2 second batch size JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); int numThreads = 
Integer.parseInt(args[3]); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java index 45bcedebb4117..d9635b46059b5 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java @@ -55,7 +55,8 @@ public static void main(String[] args) { StreamingExamples.setStreamingLogLevels(); // Create the context with a 1 second batch size - SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount"); + SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount") + .setIfMissing("spark.master", "local[2]"); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000)); // Create a JavaReceiverInputDStream on target ip:port and count the diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java index 4ce8437f82705..dee1da1153af5 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java @@ -30,7 +30,6 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.examples.streaming.StreamingExamples; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; @@ -43,7 +42,8 @@ private JavaQueueStream() { public static void main(String[] args) throws Exception { StreamingExamples.setStreamingLogLevels(); - SparkConf sparkConf = new SparkConf().setAppName("JavaQueueStream"); + SparkConf sparkConf = new SparkConf().setAppName("JavaQueueStream") + .setIfMissing("spark.master", "local[2]"); // Create the context JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000)); diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala index 973049b95a7bd..2315a2c73b3c4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala @@ -31,7 +31,8 @@ object BroadcastTest { System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." 
+ bcName + "BroadcastFactory") System.setProperty("spark.broadcast.blockSize", blockSize) - val sparkConf = new SparkConf().setAppName("Broadcast Test") + val sparkConf = new SparkConf().setAppName("BroadcastTest") + .setIfMissing("spark.master", "local[2]") val sc = new SparkContext(sparkConf) diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala index 9a00701f985f0..8139c1b5bfd94 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala @@ -72,7 +72,8 @@ import org.apache.spark.SparkContext._ object CassandraCQLTest { def main(args: Array[String]) { - val sparkConf = new SparkConf().setAppName("CQLTestApp") + val sparkConf = new SparkConf().setAppName("CassandraCQLTest") + .setIfMissing("spark.master", "local[2]") val sc = new SparkContext(sparkConf) val cHost: String = args(0) diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala index 91ba364a346a5..7753126ce818e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala @@ -54,7 +54,8 @@ import org.apache.spark.SparkContext._ object CassandraTest { def main(args: Array[String]) { - val sparkConf = new SparkConf().setAppName("casDemo") + val sparkConf = new SparkConf().setAppName("CassandraTest") + .setIfMissing("spark.master", "local[2]") // Get a SparkContext val sc = new SparkContext(sparkConf) diff --git a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala index d42f63e87052e..f3fc1013aea9c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ExceptionHandlingTest.scala @@ -22,6 +22,7 @@ import org.apache.spark.{SparkConf, SparkContext} object ExceptionHandlingTest { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("ExceptionHandlingTest") + .setIfMissing("spark.master", "local[2]") val sc = new SparkContext(sparkConf) sc.parallelize(0 until sc.defaultParallelism).foreach { i => if (math.random > 0.75) { diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala index efd91bb054981..ad0354fc30084 100644 --- a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala @@ -27,7 +27,8 @@ import org.apache.spark.SparkContext._ */ object GroupByTest { def main(args: Array[String]) { - val sparkConf = new SparkConf().setAppName("GroupBy Test") + val sparkConf = new SparkConf().setAppName("GroupByTest") + .setIfMissing("spark.master", "local[2]") var numMappers = if (args.length > 0) args(0).toInt else 2 var numKVPairs = if (args.length > 1) args(1).toInt else 1000 var valSize = if (args.length > 2) args(2).toInt else 1000 @@ -44,7 +45,7 @@ object GroupByTest { arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr) } arr1 - }.cache + }.cache() // Enforce that everything has been calculated and in cache pairs1.count diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala 
b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala index a8c338480e6e2..07c8fe32029fd 100644 --- a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala @@ -22,11 +22,11 @@ import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.spark._ -import org.apache.spark.rdd.NewHadoopRDD object HBaseTest { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("HBaseTest") + .setIfMissing("spark.master", "local[2]") val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() // Other options for configuring scan behavior are available. More information available at diff --git a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala index 331de3ad1ef53..81ee58d896b23 100644 --- a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala @@ -21,7 +21,7 @@ import org.apache.spark._ object HdfsTest { def main(args: Array[String]) { - val sparkConf = new SparkConf().setAppName("HdfsTest") + val sparkConf = new SparkConf().setAppName("HdfsTest").setIfMissing("spark.master", "local[2]") val sc = new SparkContext(sparkConf) val file = sc.textFile(args(1)) val mapped = file.map(s => s.length).cache() diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala index 4c655b84fde2e..b276425c6a089 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala @@ -43,7 +43,7 @@ object LogQuery { def main(args: Array[String]) { - val sparkConf = new SparkConf().setAppName("Log Query") + val sparkConf = new SparkConf().setAppName("LogQuery").setIfMissing("spark.master", "local[2]") val sc = new SparkContext(sparkConf) val dataSet = diff --git a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala index 2a5c0c0defe13..fa0ed39c21199 100644 --- a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala @@ -26,7 +26,8 @@ import org.apache.spark.{SparkConf, SparkContext} object MultiBroadcastTest { def main(args: Array[String]) { - val sparkConf = new SparkConf().setAppName("Multi-Broadcast Test") + val sparkConf = new SparkConf().setAppName("MultiBroadcastTest") + .setIfMissing("spark.master", "local[2]") val sc = new SparkContext(sparkConf) val slices = if (args.length > 0) args(0).toInt else 2 diff --git a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala index 5291ab81f459e..07084b4b76026 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala @@ -29,6 +29,7 @@ object SimpleSkewedGroupByTest { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("SimpleSkewedGroupByTest") + .setIfMissing("spark.master", "local[2]") var numMappers = if (args.length > 0) args(0).toInt else 2 var numKVPairs = if (args.length > 1) args(1).toInt 
else 1000 var valSize = if (args.length > 2) args(2).toInt else 1000 @@ -54,7 +55,7 @@ object SimpleSkewedGroupByTest { } } result - }.cache + }.cache() // Enforce that everything has been calculated and in cache pairs1.count diff --git a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala index 017d4e1e5ce13..184062d0df370 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala @@ -27,7 +27,8 @@ import org.apache.spark.SparkContext._ */ object SkewedGroupByTest { def main(args: Array[String]) { - val sparkConf = new SparkConf().setAppName("GroupBy Test") + val sparkConf = new SparkConf().setAppName("SkewedGroupByTest") + .setIfMissing("spark.master", "local[2]") var numMappers = if (args.length > 0) args(0).toInt else 2 var numKVPairs = if (args.length > 1) args(1).toInt else 1000 var valSize = if (args.length > 2) args(2).toInt else 1000 @@ -38,7 +39,7 @@ object SkewedGroupByTest { val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random - // map output sizes lineraly increase from the 1st to the last + // map output sizes linearly increase from the 1st to the last numKVPairs = (1.0 * (p + 1) / numMappers * numKVPairs).toInt var arr1 = new Array[(Int, Array[Byte])](numKVPairs) diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala index 5cbc966bf06ca..aa783bd8adf8f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala @@ -105,6 +105,7 @@ object SparkALS { } printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS) val sparkConf = new SparkConf().setAppName("SparkALS") + .setIfMissing("spark.master", "local[2]") val sc = new SparkContext(sparkConf) val R = generateR() diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index 4906a696e90a7..5c6d2743f0f7a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -55,6 +55,7 @@ object SparkHdfsLR { } val sparkConf = new SparkConf().setAppName("SparkHdfsLR") + .setIfMissing("spark.master", "local[2]") val inputPath = args(0) val conf = SparkHadoopUtil.get.newConfiguration() val sc = new SparkContext(sparkConf, diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala index 4d28e0aad6597..7ca79e3c88366 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala @@ -36,7 +36,6 @@ object SparkKMeans { } def closestPoint(p: Vector[Double], centers: Array[Vector[Double]]): Int = { - var index = 0 var bestIndex = 0 var closest = Double.PositiveInfinity @@ -57,6 +56,7 @@ object SparkKMeans { System.exit(1) } val sparkConf = new SparkConf().setAppName("SparkKMeans") + .setIfMissing("spark.master", "local[2]") val sc = new SparkContext(sparkConf) val lines = sc.textFile(args(0)) val data = lines.map(parseVector _).cache() diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala 
b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala index 99ceb3089e9fe..0c275e4016d1f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala @@ -49,6 +49,7 @@ object SparkLR { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("SparkLR") + .setIfMissing("spark.master", "local[2]") val sc = new SparkContext(sparkConf) val numSlices = if (args.length > 0) args(0).toInt else 2 val points = sc.parallelize(generateData, numSlices).cache() diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala index 40b36c779afd6..af9363e67450a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala @@ -31,7 +31,8 @@ import org.apache.spark.{SparkConf, SparkContext} */ object SparkPageRank { def main(args: Array[String]) { - val sparkConf = new SparkConf().setAppName("PageRank") + val sparkConf = new SparkConf().setAppName("SparkPageRank") + .setIfMissing("spark.master", "local[2]") var iters = args(1).toInt val ctx = new SparkContext(sparkConf) val lines = ctx.textFile(args(0), 1) diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala index 9fbb0a800d735..22766cca746c0 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala @@ -24,7 +24,7 @@ import org.apache.spark._ /** Computes an approximation to pi */ object SparkPi { def main(args: Array[String]) { - val conf = new SparkConf().setAppName("Spark Pi") + val conf = new SparkConf().setAppName("SparkPi").setIfMissing("spark.master", "local[2]") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = 100000 * slices diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala index f7f83086df3db..0b4473f3482a2 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala @@ -42,7 +42,7 @@ object SparkTC { } def main(args: Array[String]) { - val sparkConf = new SparkConf().setAppName("SparkTC") + val sparkConf = new SparkConf().setAppName("SparkTC").setIfMissing("spark.master", "local[2]") val spark = new SparkContext(sparkConf) val slices = if (args.length > 0) args(0).toInt else 2 var tc = spark.parallelize(generateGraph, slices).cache() diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala index 22127621867e1..af6da2520e0b5 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala @@ -31,10 +31,10 @@ import org.apache.spark.storage.StorageLevel /** * Logistic regression based classification. - * This example uses Tachyon to persist rdds during computation. + * This example uses Tachyon to persist RDDs during computation. 
*/ object SparkTachyonHdfsLR { - val D = 10 // Numer of dimensions + val D = 10 // Number of dimensions val rand = new Random(42) case class DataPoint(x: Vector[Double], y: Double) @@ -54,6 +54,7 @@ object SparkTachyonHdfsLR { val inputPath = args(0) val conf = SparkHadoopUtil.get.newConfiguration() val sparkConf = new SparkConf().setAppName("SparkTachyonHdfsLR") + .setIfMissing("spark.master", "local[2]") val sc = new SparkContext(sparkConf, InputFormatInfo.computePreferredLocations( Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath)) diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala index 7743f7968b100..1d0d8f16ff08d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala @@ -24,11 +24,12 @@ import org.apache.spark.storage.StorageLevel /** * Computes an approximation to pi - * This example uses Tachyon to persist rdds during computation. + * This example uses Tachyon to persist RDDs during computation. */ object SparkTachyonPi { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("SparkTachyonPi") + .setIfMissing("spark.master", "local[2]") val spark = new SparkContext(sparkConf) val slices = if (args.length > 0) args(0).toInt else 2 diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala index 235c3bf820244..c1a6209c851b7 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala @@ -21,7 +21,6 @@ import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.spark.bagel._ -import org.apache.spark.bagel.Bagel._ import scala.xml.{XML,NodeSeq} @@ -35,10 +34,10 @@ object WikipediaPageRank { if (args.length < 4) { System.err.println( "Usage: WikipediaPageRank ") - System.exit(-1) + System.exit(1) } val sparkConf = new SparkConf() - sparkConf.setAppName("WikipediaPageRank") + sparkConf.setAppName("WikipediaPageRank").setIfMissing("spark.master", "local[2]") sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") sparkConf.set("spark.kryo.registrator", classOf[PRKryoRegistrator].getName) @@ -47,7 +46,6 @@ object WikipediaPageRank { val numPartitions = args(2).toInt val usePartitioner = args(3).toBoolean - sparkConf.setAppName("WikipediaPageRank") val sc = new SparkContext(sparkConf) // Parse the Wikipedia page data into a graph @@ -78,9 +76,9 @@ object WikipediaPageRank { (id, new PRVertex(1.0 / numVertices, outEdges)) }) if (usePartitioner) { - vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache + vertices = vertices.partitionBy(new HashPartitioner(sc.defaultParallelism)).cache() } else { - vertices = vertices.cache + vertices = vertices.cache() } println("Done parsing input file.") diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala index 576a3e371b993..b5dbd18b36aa1 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala @@ -38,6 
+38,7 @@ object WikipediaPageRankStandalone { System.exit(-1) } val sparkConf = new SparkConf() + sparkConf.setAppName("WikipediaPageRankStandalone").setIfMissing("spark.master", "local[2]") sparkConf.set("spark.serializer", "spark.bagel.examples.WPRSerializer") val inputFile = args(0) @@ -45,7 +46,6 @@ object WikipediaPageRankStandalone { val numIterations = args(2).toInt val usePartitioner = args(3).toBoolean - sparkConf.setAppName("WikipediaPageRankStandalone") val sc = new SparkContext(sparkConf) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala index 56b02b65d8724..61f74f8a1a395 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala @@ -102,6 +102,7 @@ object BinaryClassification { def run(params: Params) { val conf = new SparkConf().setAppName(s"BinaryClassification with $params") + .setIfMissing("spark.master", "local[2]") val sc = new SparkContext(conf) Logger.getRootLogger.setLevel(Level.WARN) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala index 9832bec90d7ee..36243921c8850 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala @@ -96,6 +96,7 @@ object DecisionTreeRunner { def run(params: Params) { val conf = new SparkConf().setAppName("DecisionTreeRunner") + .setIfMissing("spark.master", "local[2]") val sc = new SparkContext(conf) // Load training data and cache it. 
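
The hunks above and below all apply the same pattern: keep the existing setAppName call and chain setIfMissing("spark.master", "local[2]"), which only takes effect when no master has already been supplied by spark-submit, spark-defaults.conf, or an explicit setMaster call. A minimal, self-contained sketch of that pattern follows; the object name and the toy job are illustrative and not part of the patch.

import org.apache.spark.{SparkConf, SparkContext}

object LocalFallbackExample {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("LocalFallbackExample")        // always set the application name
      .setIfMissing("spark.master", "local[2]")  // used only if no master was provided elsewhere
    val sc = new SparkContext(conf)
    try {
      // Trivial job so the example runs end to end when launched without arguments.
      val evens = sc.parallelize(1 to 100).filter(_ % 2 == 0).count()
      println("Even numbers: " + evens)
    } finally {
      sc.stop()
    }
  }
}

Launching the same jar through spark-submit with an explicit --master leaves that setting untouched, since setIfMissing never overrides a value that is already present.
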
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala index f96bc1bf00b92..f70bb01338dbf 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseKMeans.scala @@ -77,6 +77,7 @@ object DenseKMeans { def run(params: Params) { val conf = new SparkConf().setAppName(s"DenseKMeans with $params") + .setIfMissing("spark.master", "local[2]") val sc = new SparkContext(conf) Logger.getRootLogger.setLevel(Level.WARN) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala index 4811bb70e4b28..2b64f43755391 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LinearRegression.scala @@ -87,6 +87,7 @@ object LinearRegression extends App { def run(params: Params) { val conf = new SparkConf().setAppName(s"LinearRegression with $params") + .setIfMissing("spark.master", "local[2]") val sc = new SparkContext(conf) Logger.getRootLogger.setLevel(Level.WARN) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala index 6eb41e7ba36fb..ebe07668e02db 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala @@ -96,6 +96,7 @@ object MovieLensALS { def run(params: Params) { val conf = new SparkConf().setAppName(s"MovieLensALS with $params") + .setIfMissing("spark.master", "local[2]") if (params.kryo) { conf.set("spark.serializer", classOf[KryoSerializer].getName) .set("spark.kryo.registrator", classOf[ALSRegistrator].getName) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala index 537e68a0991aa..0aa81db7e495e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SparseNaiveBayes.scala @@ -68,6 +68,7 @@ object SparseNaiveBayes { def run(params: Params) { val conf = new SparkConf().setAppName(s"SparseNaiveBayes with $params") + .setIfMissing("spark.master", "local[2]") val sc = new SparkContext(conf) Logger.getRootLogger.setLevel(Level.WARN) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala index 3cd9cb743e309..21a90a0dbcaa1 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnyPCA.scala @@ -40,7 +40,7 @@ object TallSkinnyPCA { System.exit(1) } - val conf = new SparkConf().setAppName("TallSkinnyPCA") + val conf = new SparkConf().setAppName("TallSkinnyPCA").setIfMissing("spark.master", "local[2]") val sc = new SparkContext(conf) // Load and parse the data file. 
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala index 4d6690318615a..524b0336627d9 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/TallSkinnySVD.scala @@ -40,7 +40,7 @@ object TallSkinnySVD { System.exit(1) } - val conf = new SparkConf().setAppName("TallSkinnySVD") + val conf = new SparkConf().setAppName("TallSkinnySVD").setIfMissing("spark.master", "local[2]") val sc = new SparkContext(conf) // Load and parse the data file. diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala index 61c460c6b1de8..7e6229547d838 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala @@ -27,6 +27,7 @@ case class Record(key: Int, value: String) object RDDRelation { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("RDDRelation") + .setIfMissing("spark.master", "local[2]") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc) @@ -59,10 +60,10 @@ object RDDRelation { // Write out an RDD as a parquet file. rdd.saveAsParquetFile("pair.parquet") - // Read in parquet file. Parquet files are self-describing so the schmema is preserved. + // Read in parquet file. Parquet files are self-describing so the schema is preserved. val parquetFile = sqlContext.parquetFile("pair.parquet") - // Queries can be run using the DSL on parequet files just like the original RDD. + // Queries can be run using the DSL on parquet files just like the original RDD. parquetFile.where('key === 1).select('value as 'a).collect().foreach(println) // These files can also be registered as tables. 
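
The comment fixes in RDDRelation.scala just above (and in JavaSparkSQL.java earlier in the patch) describe the same parquet round-trip: a SchemaRDD is saved as a self-describing parquet file, read back with its schema intact, and registered as a table for SQL queries. A condensed sketch using the 1.0-era API shown in those hunks; the ParquetRoundTrip object name is illustrative, while the "pair.parquet" path is the one used in RDDRelation.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class Record(key: Int, value: String)

object ParquetRoundTrip {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("ParquetRoundTrip")
      .setIfMissing("spark.master", "local[2]")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext._

    // Write an RDD of case classes out as a parquet file; the schema travels with the data.
    val rdd = sc.parallelize(1 to 10).map(i => Record(i, "val_" + i))
    rdd.saveAsParquetFile("pair.parquet")

    // Parquet files are self-describing, so reading one back yields a SchemaRDD
    // that can be registered as a table and queried with SQL.
    val parquetFile = sqlContext.parquetFile("pair.parquet")
    parquetFile.registerAsTable("parquetFile")
    sql("SELECT key, value FROM parquetFile WHERE key >= 5").collect().foreach(println)

    sc.stop()
  }
}
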
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala index b262fabbe0e0d..e0e260572df6f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala @@ -26,6 +26,7 @@ object HiveFromSpark { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("HiveFromSpark") + .setIfMissing("spark.master", "local[2]") val sc = new SparkContext(sparkConf) // A local hive context creates an instance of the Hive Metastore in process, storing the diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index b433082dce1a2..0fcd1352db574 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -146,6 +146,7 @@ object ActorWordCount { val Seq(host, port) = args.toSeq val sparkConf = new SparkConf().setAppName("ActorWordCount") + .setIfMissing("spark.master", "local[2]") // Create the context and set the batch size val ssc = new StreamingContext(sparkConf, Seconds(2)) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala index 6bb659fbd8be8..8a51a0a2844ad 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala @@ -46,6 +46,7 @@ object CustomReceiver { // Create the context with a 1 second batch size val sparkConf = new SparkConf().setAppName("CustomReceiver") + .setIfMissing("spark.master", "local[2]") val ssc = new StreamingContext(sparkConf, Seconds(1)) // Create a input stream with the custom receiver on target ip:port and count the diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala index 20e7df7c45b1b..d662b2d0bdb1a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala @@ -54,6 +54,7 @@ object FlumeEventCount { // Create the context and set the batch size val sparkConf = new SparkConf().setAppName("FlumeEventCount") + .setIfMissing("spark.master", "local[2]") val ssc = new StreamingContext(sparkConf, batchInterval) // Create a flume stream diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala index 6c24bc3ad09e0..1fde505be3c73 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala @@ -41,6 +41,7 @@ object HdfsWordCount { StreamingExamples.setStreamingLogLevels() val sparkConf = new SparkConf().setAppName("HdfsWordCount") + .setIfMissing("spark.master", "local[2]") // Create the context val ssc = new StreamingContext(sparkConf, Seconds(2)) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala 
b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala index 566ba6f911e02..4a9ff7b2ce4e8 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala @@ -50,11 +50,12 @@ object KafkaWordCount { val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setAppName("KafkaWordCount") + .setIfMissing("spark.master", "local[2]") val ssc = new StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint("checkpoint") - val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap - val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) + val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap + val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)) .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2) @@ -77,7 +78,7 @@ object KafkaWordCountProducer { val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args - // Zookeper connection properties + // Zookeeper connection properties val props = new Properties() props.put("metadata.broker.list", brokers) props.put("serializer.class", "kafka.serializer.StringEncoder") diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala index e4283e04a1b11..3514baa46f0a7 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala @@ -70,7 +70,7 @@ object MQTTPublisher { * * To work with Mqtt, Mqtt Message broker/server required. 
* Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker - * In ubuntu mosquitto can be installed using the command `$ sudo apt-get install mosquitto` + * In Ubuntu mosquitto can be installed using the command `$ sudo apt-get install mosquitto` * Eclipse paho project provides Java library for Mqtt Client http://www.eclipse.org/paho/ * Example Java code for Mqtt Publisher and Subscriber can be found here * https://bitbucket.org/mkjinesh/mqttclient @@ -95,6 +95,7 @@ object MQTTWordCount { val Seq(brokerUrl, topic) = args.toSeq val sparkConf = new SparkConf().setAppName("MQTTWordCount") + .setIfMissing("spark.master", "local[2]") val ssc = new StreamingContext(sparkConf, Seconds(2)) val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala index ae0a08c6cdb1a..7479c4b8d56d5 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala @@ -44,6 +44,7 @@ object NetworkWordCount { // Create the context with a 1 second batch size val sparkConf = new SparkConf().setAppName("NetworkWordCount") + .setIfMissing("spark.master", "local[2]") val ssc = new StreamingContext(sparkConf, Seconds(1)) // Create a socket stream on target ip:port and count the diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala index 4caa90659111a..e87acac96fdb9 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala @@ -30,6 +30,7 @@ object QueueStream { StreamingExamples.setStreamingLogLevels() val sparkConf = new SparkConf().setAppName("QueueStream") + .setIfMissing("spark.master", "local[2]") // Create the context val ssc = new StreamingContext(sparkConf, Seconds(1)) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala index a9aaa445bccb6..fbab4d0a9d712 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala @@ -33,7 +33,7 @@ import org.apache.spark.util.IntParam * of work nodes in the cluster * is "localhost". * is the port on which RawTextSender is running in the worker nodes. - * is the Spark Streaming batch duration in milliseconds. + * is the Spark Streaming batch duration in milliseconds. 
*/ object RawNetworkGrep { def main(args: Array[String]) { @@ -46,6 +46,7 @@ object RawNetworkGrep { val Array(IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args val sparkConf = new SparkConf().setAppName("RawNetworkGrep") + .setIfMissing("spark.master", "local[2]") // Create the context val ssc = new StreamingContext(sparkConf, Duration(batchMillis)) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala index 6af3a0f33efc2..afe33921d66f3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -77,6 +77,7 @@ object RecoverableNetworkWordCount { val outputFile = new File(outputPath) if (outputFile.exists()) outputFile.delete() val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount") + .setIfMissing("spark.master", "local[2]") // Create the context with a 1 second batch size val ssc = new StreamingContext(sparkConf, Seconds(1)) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala index daa1ced63c701..7a4f6eb0a9bf8 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala @@ -44,7 +44,7 @@ object StatefulNetworkWordCount { StreamingExamples.setStreamingLogLevels() val updateFunc = (values: Seq[Int], state: Option[Int]) => { - val currentCount = values.foldLeft(0)(_ + _) + val currentCount = values.sum val previousCount = state.getOrElse(0) @@ -52,6 +52,7 @@ object StatefulNetworkWordCount { } val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount") + .setIfMissing("spark.master", "local[2]") // Create the context with a 1 second batch size val ssc = new StreamingContext(sparkConf, Seconds(1)) ssc.checkpoint(".") diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala index 683752ac96241..fda9e97dd1006 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala @@ -63,6 +63,7 @@ object TwitterAlgebirdCMS { val filters = args val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS") + .setIfMissing("spark.master", "local[2]") val ssc = new StreamingContext(sparkConf, Seconds(10)) val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala index 62db5e663b8af..d9beffb202d69 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala @@ -35,7 +35,7 @@ import org.apache.spark.SparkConf * blog post and this * * blog post - * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for + * have good overviews of HyperLogLog (HLL). 
HLL is a memory-efficient data structure for * estimating the cardinality of a data stream, i.e. the number of unique elements. *

* Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the @@ -51,6 +51,7 @@ object TwitterAlgebirdHLL { val BIT_SIZE = 12 val filters = args val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL") + .setIfMissing("spark.master", "local[2]") val ssc = new StreamingContext(sparkConf, Seconds(5)) val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala index f55d23ab3924b..fdd9794f6c244 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala @@ -45,13 +45,14 @@ object TwitterPopularTags { val filters = args.takeRight(args.length - 4) // Set the system properties so that Twitter4j library used by twitter stream - // can use them to generat OAuth credentials + // can use them to generate OAuth credentials System.setProperty("twitter4j.oauth.consumerKey", consumerKey) System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret) System.setProperty("twitter4j.oauth.accessToken", accessToken) System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret) val sparkConf = new SparkConf().setAppName("TwitterPopularTags") + .setIfMissing("spark.master", "local[2]") val ssc = new StreamingContext(sparkConf, Seconds(2)) val stream = TwitterUtils.createStream(ssc, None, filters) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala index 79905af381a12..6e93b08d48559 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala @@ -84,6 +84,7 @@ object ZeroMQWordCount { StreamingExamples.setStreamingLogLevels() val Seq(url, topic) = args.toSeq val sparkConf = new SparkConf().setAppName("ZeroMQWordCount") + .setIfMissing("spark.master", "local[2]") // Create the context and set the batch size val ssc = new StreamingContext(sparkConf, Seconds(2)) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala index 21443ebbbfb0e..a0e2ec20db59e 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -38,7 +38,7 @@ import org.apache.spark.streaming.receiver.Receiver /** * Input stream that pulls messages from a Kafka Broker. * - * @param kafkaParams Map of kafka configuration paramaters. + * @param kafkaParams Map of kafka configuration parameters. * See: http://kafka.apache.org/configuration.html * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread. 
@@ -76,13 +76,14 @@ class KafkaReceiver[ // Connection to Kafka var consumerConnector : ConsumerConnector = null - def onStop() { } + def onStop() { + if (consumerConnector != null) { + consumerConnector.shutdown() + } + } def onStart() { - // In case we are using multiple Threads to handle Kafka Messages - val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) - logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id")) // Kafka connection properties @@ -95,7 +96,7 @@ class KafkaReceiver[ consumerConnector = Consumer.create(consumerConfig) logInfo("Connected to " + kafkaParams("zookeeper.connect")) - // When autooffset.reset is defined, it is our responsibility to try and whack the + // When auto.offset.reset is defined, it is our responsibility to try and whack the // consumer group zk node. if (kafkaParams.contains("auto.offset.reset")) { tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id")) @@ -112,42 +113,51 @@ class KafkaReceiver[ val topicMessageStreams = consumerConnector.createMessageStreams( topics, keyDecoder, valueDecoder) - - // Start the messages handler for each partition - topicMessageStreams.values.foreach { streams => - streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } + val executorPool = Executors.newFixedThreadPool(topics.values.sum) + try { + // Start the messages handler for each partition + topicMessageStreams.values.foreach { streams => + streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } + } + } finally { + executorPool.shutdown() } } // Handles Kafka Messages - private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V]) + private class MessageHandler(stream: KafkaStream[K, V]) extends Runnable { def run() { logInfo("Starting MessageHandler.") - for (msgAndMetadata <- stream) { - store((msgAndMetadata.key, msgAndMetadata.message)) + try { + for (msgAndMetadata <- stream) { + store((msgAndMetadata.key, msgAndMetadata.message)) + } + } catch { + case e: Throwable => logError("Error handling message; exiting", e) } } } - // It is our responsibility to delete the consumer group when specifying autooffset.reset. This + // It is our responsibility to delete the consumer group when specifying auto.offset.reset. This // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper. // // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied - // from Kafkas' ConsoleConsumer. See code related to 'autooffset.reset' when it is set to + // from Kafka's ConsoleConsumer. 
See code related to 'auto.offset.reset' when it is set to // 'smallest'/'largest': // scalastyle:off // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala // scalastyle:on private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) { + val dir = "/consumers/" + groupId + logInfo("Cleaning up temporary Zookeeper data under " + dir + ".") + val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) try { - val dir = "/consumers/" + groupId - logInfo("Cleaning up temporary zookeeper data under " + dir + ".") - val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) zk.deleteRecursive(dir) - zk.close() } catch { - case _ : Throwable => // swallow + case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e) + } finally { + zk.close() } } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala index 069e042ed94a3..d61a3445fb0ee 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala @@ -78,7 +78,9 @@ object Analytics extends Logging { println("| PageRank |") println("======================================") - val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")")) + val sc = new SparkContext( + conf.setAppName("PageRank(" + fname + ")") + .setIfMissing("spark.master", "local[2]")) val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, minEdgePartitions = numEPart).cache() @@ -125,7 +127,9 @@ object Analytics extends Logging { println("| Connected Components |") println("======================================") - val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")")) + val sc = new SparkContext( + conf.setAppName("ConnectedComponents(" + fname + ")") + .setIfMissing("spark.master", "local[2]")) val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, minEdgePartitions = numEPart).cache() val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) @@ -147,7 +151,9 @@ object Analytics extends Logging { println("======================================") println("| Triangle Count |") println("======================================") - val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")")) + val sc = new SparkContext( + conf.setAppName("TriangleCount(" + fname + ")") + .setIfMissing("spark.master", "local[2]")) val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true, minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache() val triangles = TriangleCount.run(graph)
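
Beyond the setIfMissing changes, the KafkaInputDStream hunk above tightens resource handling: onStop() now shuts down the consumer connector, the message-handler pool is shut down once the handlers have been submitted, and the Zookeeper cleanup logs failures instead of swallowing them and always closes its client. A distilled sketch of that cleanup shape follows; the object and method names are illustrative, and the real code uses Spark's Logging trait and Kafka's ZKStringSerializer rather than println and the plain ZkClient constructor.

import org.I0Itec.zkclient.ZkClient

object ZkCleanupSketch {
  def cleanupConsumerGroup(zkUrl: String, groupId: String) {
    val dir = "/consumers/" + groupId
    println("Cleaning up temporary Zookeeper data under " + dir + ".")
    val zk = new ZkClient(zkUrl, 30 * 1000, 30 * 1000)
    try {
      zk.deleteRecursive(dir)    // best-effort delete of the consumer group node
    } catch {
      case e: Throwable =>
        // Log and continue instead of silently swallowing the failure.
        println("Error cleaning up temporary Zookeeper data: " + e)
    } finally {
      zk.close()                 // the client is released on every path
    }
  }
}
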