
Commit 30cfa8d

ScrapCodes authored and pwendell committed
SPARK-1565, update examples to be used with spark-submit script.
Commit for initial feedback. Basically, I am curious whether we should prompt the user to provide args, especially when they are mandatory, and whether we can skip them when they are not. Also, a few other things did not work, like `bin/spark-submit examples/target/scala-2.10/spark-examples-1.0.0-SNAPSHOT-hadoop1.0.4.jar --class org.apache.spark.examples.SparkALS --arg 100 500 10 5 2`: not all the args get passed properly. Maybe I have messed something up; will try to sort it out.

Author: Prashant Sharma <[email protected]>

Closes #552 from ScrapCodes/SPARK-1565/update-examples and squashes the following commits:

669dd23 [Prashant Sharma] Review comments
2727e70 [Prashant Sharma] SPARK-1565, update examples to be used with spark-submit script.

(cherry picked from commit 44dd57f)
Signed-off-by: Patrick Wendell <[email protected]>
1 parent 8f3b925 commit 30cfa8d
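The change repeated across the example files below is the same in every case: drop the leading <master> argument and the four-argument JavaSparkContext constructor, and build the context from a SparkConf that only sets the application name, so that the master URL, jars, and Spark home come from the spark-submit invocation (typically via its --master option) instead of from code. A minimal sketch of the before and after; the class name and printed output here are illustrative, not taken from the commit:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical example class, shown only to illustrate the pattern applied in this commit.
public final class ExampleApp {
  public static void main(String[] args) {
    // Before: master URL, SPARK_HOME, and the example jar were wired in by hand,
    // and the input path had to come after the master argument:
    //   JavaSparkContext sc = new JavaSparkContext(args[0], "ExampleApp",
    //     System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(ExampleApp.class));
    //   JavaRDD<String> lines = sc.textFile(args[1]);

    // After: only the app name is set in code; spark-submit supplies master, jars, etc.
    SparkConf sparkConf = new SparkConf().setAppName("ExampleApp");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    JavaRDD<String> lines = sc.textFile(args[0]);  // the input path shifts down to args[0]

    System.out.println("Line count: " + lines.count());
    sc.stop();
  }
}

Because the master is no longer a positional argument, every usage string below loses its <master> entry and each remaining positional argument index shifts down by one.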

56 files changed: +405 −480 lines

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -49,6 +49,7 @@ unit-tests.log
 /lib/
 rat-results.txt
 scalastyle.txt
+conf/*.conf
 
 # For Hive
 metastore_db/

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 4 deletions
@@ -74,10 +74,10 @@ class SparkContext(config: SparkConf) extends Logging {
    * be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
    * from a list of input files or InputFormats for the application.
    */
-    @DeveloperApi
-    def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
-      this(config)
-      this.preferredNodeLocationData = preferredNodeLocationData
+  @DeveloperApi
+  def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
+    this(config)
+    this.preferredNodeLocationData = preferredNodeLocationData
   }
 
   /**

examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java

Lines changed: 7 additions & 6 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.examples;
 
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
@@ -103,16 +104,16 @@ public static void printWeights(double[] a) {
 
   public static void main(String[] args) {
 
-    if (args.length < 3) {
-      System.err.println("Usage: JavaHdfsLR <master> <file> <iters>");
+    if (args.length < 2) {
+      System.err.println("Usage: JavaHdfsLR <file> <iters>");
       System.exit(1);
     }
 
-    JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR",
-      System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaHdfsLR.class));
-    JavaRDD<String> lines = sc.textFile(args[1]);
+    SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR");
+    JavaSparkContext sc = new JavaSparkContext(sparkConf);
+    JavaRDD<String> lines = sc.textFile(args[0]);
     JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
-    int ITERATIONS = Integer.parseInt(args[2]);
+    int ITERATIONS = Integer.parseInt(args[1]);
 
     // Initialize w to a random value
     double[] w = new double[D];

examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java

Lines changed: 6 additions & 7 deletions
@@ -20,6 +20,7 @@
 import com.google.common.collect.Lists;
 import scala.Tuple2;
 import scala.Tuple3;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -34,6 +35,8 @@
 
 /**
  * Executes a roll up-style query against Apache logs.
+ *
+ * Usage: JavaLogQuery [logFile]
  */
 public final class JavaLogQuery {
 
@@ -97,15 +100,11 @@ public static Stats extractStats(String line) {
   }
 
   public static void main(String[] args) {
-    if (args.length == 0) {
-      System.err.println("Usage: JavaLogQuery <master> [logFile]");
-      System.exit(1);
-    }
 
-    JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
-      System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLogQuery.class));
+    SparkConf sparkConf = new SparkConf().setAppName("JavaLogQuery");
+    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
 
-    JavaRDD<String> dataSet = (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(exampleApacheLogs);
+    JavaRDD<String> dataSet = (args.length == 1) ? jsc.textFile(args[0]) : jsc.parallelize(exampleApacheLogs);
 
     JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
       @Override

examples/src/main/java/org/apache/spark/examples/JavaPageRank.java

Lines changed: 9 additions & 6 deletions
@@ -18,9 +18,12 @@
 package org.apache.spark.examples;
 
 
+
 import scala.Tuple2;
 
 import com.google.common.collect.Iterables;
+
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -54,20 +57,20 @@ public Double call(Double a, Double b) {
   }
 
   public static void main(String[] args) throws Exception {
-    if (args.length < 3) {
-      System.err.println("Usage: JavaPageRank <master> <file> <number_of_iterations>");
+    if (args.length < 2) {
+      System.err.println("Usage: JavaPageRank <file> <number_of_iterations>");
       System.exit(1);
     }
 
-    JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank",
-      System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaPageRank.class));
+    SparkConf sparkConf = new SparkConf().setAppName("JavaPageRank");
+    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
 
     // Loads in input file. It should be in format of:
     // URL neighbor URL
     // URL neighbor URL
     // URL neighbor URL
     // ...
-    JavaRDD<String> lines = ctx.textFile(args[1], 1);
+    JavaRDD<String> lines = ctx.textFile(args[0], 1);
 
     // Loads all URLs from input file and initialize their neighbors.
     JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(new PairFunction<String, String, String>() {
@@ -87,7 +90,7 @@ public Double call(Iterable<String> rs) {
     });
 
     // Calculates and updates URL ranks continuously using PageRank algorithm.
-    for (int current = 0; current < Integer.parseInt(args[2]); current++) {
+    for (int current = 0; current < Integer.parseInt(args[1]); current++) {
       // Calculates URL contributions to the rank of other URLs.
       JavaPairRDD<String, Double> contribs = links.join(ranks).values()
         .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {

examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java

Lines changed: 9 additions & 9 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.examples;
 
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
@@ -25,19 +26,18 @@
 import java.util.ArrayList;
 import java.util.List;
 
-/** Computes an approximation to pi */
+/**
+ * Computes an approximation to pi
+ * Usage: JavaSparkPi [slices]
+ */
 public final class JavaSparkPi {
+
 
   public static void main(String[] args) throws Exception {
-    if (args.length == 0) {
-      System.err.println("Usage: JavaSparkPi <master> [slices]");
-      System.exit(1);
-    }
-
-    JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaSparkPi",
-      System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaSparkPi.class));
+    SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
+    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
 
-    int slices = (args.length == 2) ? Integer.parseInt(args[1]) : 2;
+    int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
     int n = 100000 * slices;
     List<Integer> l = new ArrayList<Integer>(n);
     for (int i = 0; i < n; i++) {

examples/src/main/java/org/apache/spark/examples/JavaTC.java

Lines changed: 11 additions & 13 deletions
@@ -17,19 +17,22 @@
 
 package org.apache.spark.examples;
 
-import scala.Tuple2;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
 
+import scala.Tuple2;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+
 /**
  * Transitive closure on a graph, implemented in Java.
+ * Usage: JavaTC [slices]
  */
 public final class JavaTC {
 
@@ -61,14 +64,9 @@ public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, Integer>> t
   }
 
   public static void main(String[] args) {
-    if (args.length == 0) {
-      System.err.println("Usage: JavaTC <host> [<slices>]");
-      System.exit(1);
-    }
-
-    JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC",
-      System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaTC.class));
-    Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2;
+    SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR");
+    JavaSparkContext sc = new JavaSparkContext(sparkConf);
+    Integer slices = (args.length > 0) ? Integer.parseInt(args[0]): 2;
     JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache();
 
     // Linear transitive closure: each round grows paths by one edge,

examples/src/main/java/org/apache/spark/examples/JavaWordCount.java

Lines changed: 7 additions & 5 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.examples;
 
 import scala.Tuple2;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -33,14 +34,15 @@ public final class JavaWordCount {
   private static final Pattern SPACE = Pattern.compile(" ");
 
   public static void main(String[] args) throws Exception {
-    if (args.length < 2) {
-      System.err.println("Usage: JavaWordCount <master> <file>");
+
+    if (args.length < 1) {
+      System.err.println("Usage: JavaWordCount <file>");
       System.exit(1);
     }
 
-    JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
-      System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaWordCount.class));
-    JavaRDD<String> lines = ctx.textFile(args[1], 1);
+    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
+    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
+    JavaRDD<String> lines = ctx.textFile(args[0], 1);
 
     JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
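
Applied together, the two hunks above leave JavaWordCount roughly in the following shape. This is a sketch assembled from the diff and the Spark 1.0-era Java API; the callback bodies outside the hunks approximate the surrounding example code and are not text from this commit:

package org.apache.spark.examples;

import java.util.Arrays;
import java.util.regex.Pattern;

import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

public final class JavaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      System.err.println("Usage: JavaWordCount <file>");
      System.exit(1);
    }

    // The master URL is no longer read from args[0]; spark-submit supplies it.
    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    JavaRDD<String> lines = ctx.textFile(args[0], 1);

    // Split each line into words.
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String s) {
        return Arrays.asList(SPACE.split(s));
      }
    });

    // Pair each word with a count of one.
    JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
      @Override
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
      }
    });

    // Sum the counts per word.
    JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    });

    for (Tuple2<?, ?> tuple : counts.collect()) {
      System.out.println(tuple._1() + ": " + tuple._2());
    }
    ctx.stop();
  }
}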

examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java

Lines changed: 11 additions & 11 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.examples.mllib;
 
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
@@ -57,23 +58,22 @@ public String call(Tuple2<Object, double[]> element) {
 
   public static void main(String[] args) {
 
-    if (args.length != 5 && args.length != 6) {
+    if (args.length < 4) {
       System.err.println(
-        "Usage: JavaALS <master> <ratings_file> <rank> <iterations> <output_dir> [<blocks>]");
+        "Usage: JavaALS <ratings_file> <rank> <iterations> <output_dir> [<blocks>]");
       System.exit(1);
     }
-
-    int rank = Integer.parseInt(args[2]);
-    int iterations = Integer.parseInt(args[3]);
-    String outputDir = args[4];
+    SparkConf sparkConf = new SparkConf().setAppName("JavaALS");
+    int rank = Integer.parseInt(args[1]);
+    int iterations = Integer.parseInt(args[2]);
+    String outputDir = args[3];
     int blocks = -1;
-    if (args.length == 6) {
-      blocks = Integer.parseInt(args[5]);
+    if (args.length == 5) {
+      blocks = Integer.parseInt(args[4]);
     }
 
-    JavaSparkContext sc = new JavaSparkContext(args[0], "JavaALS",
-      System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaALS.class));
-    JavaRDD<String> lines = sc.textFile(args[1]);
+    JavaSparkContext sc = new JavaSparkContext(sparkConf);
+    JavaRDD<String> lines = sc.textFile(args[0]);
 
     JavaRDD<Rating> ratings = lines.map(new ParseRating());

examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java

Lines changed: 10 additions & 12 deletions
@@ -19,6 +19,7 @@
 
 import java.util.regex.Pattern;
 
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
@@ -48,24 +49,21 @@ public Vector call(String line) {
   }
 
   public static void main(String[] args) {
-
-    if (args.length < 4) {
+    if (args.length < 3) {
       System.err.println(
-        "Usage: JavaKMeans <master> <input_file> <k> <max_iterations> [<runs>]");
+        "Usage: JavaKMeans <input_file> <k> <max_iterations> [<runs>]");
       System.exit(1);
     }
-
-    String inputFile = args[1];
-    int k = Integer.parseInt(args[2]);
-    int iterations = Integer.parseInt(args[3]);
+    String inputFile = args[0];
+    int k = Integer.parseInt(args[1]);
+    int iterations = Integer.parseInt(args[2]);
     int runs = 1;
 
-    if (args.length >= 5) {
-      runs = Integer.parseInt(args[4]);
+    if (args.length >= 4) {
+      runs = Integer.parseInt(args[3]);
     }
-
-    JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
-      System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
+    SparkConf sparkConf = new SparkConf().setAppName("JavaKMeans");
+    JavaSparkContext sc = new JavaSparkContext(sparkConf);
     JavaRDD<String> lines = sc.textFile(inputFile);
 
     JavaRDD<Vector> points = lines.map(new ParsePoint());