From d32072c0bd60b1256a1c95bffcd1264bc80a4594 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Thu, 15 May 2014 18:32:37 -0700
Subject: [PATCH 1/5] Remove <master> from examples + update usages

---
 examples/src/main/python/als.py                 | 18 +++++++++---------
 examples/src/main/python/kmeans.py              | 12 ++++++------
 .../src/main/python/logistic_regression.py      | 10 +++++-----
 examples/src/main/python/mllib/kmeans.py        | 10 +++++-----
 .../main/python/mllib/logistic_regression.py    | 10 +++++-----
 examples/src/main/python/pagerank.py            | 10 +++++-----
 examples/src/main/python/pi.py                  | 10 +++++-----
 examples/src/main/python/sort.py                |  8 ++++----
 examples/src/main/python/transitive_closure.py  | 10 +++++-----
 examples/src/main/python/wordcount.py           |  8 ++++----
 10 files changed, 53 insertions(+), 53 deletions(-)

diff --git a/examples/src/main/python/als.py b/examples/src/main/python/als.py
index 01552dc1d449e..1af21c15e2d0e 100755
--- a/examples/src/main/python/als.py
+++ b/examples/src/main/python/als.py
@@ -46,15 +46,15 @@ def update(i, vec, mat, ratings):
     return np.linalg.solve(XtX, Xty)
 
 if __name__ == "__main__":
-    if len(sys.argv) < 2:
-        print >> sys.stderr, "Usage: als <master> <M> <U> <F> <iters> <slices>"
-        exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonALS", pyFiles=[realpath(__file__)])
-    M = int(sys.argv[2]) if len(sys.argv) > 2 else 100
-    U = int(sys.argv[3]) if len(sys.argv) > 3 else 500
-    F = int(sys.argv[4]) if len(sys.argv) > 4 else 10
-    ITERATIONS = int(sys.argv[5]) if len(sys.argv) > 5 else 5
-    slices = int(sys.argv[6]) if len(sys.argv) > 6 else 2
+    """
+        Usage: als [M] [U] [F] [iterations] [slices]"
+    """
+    sc = SparkContext(appName="PythonALS", pyFiles=[realpath(__file__)])
+    M = int(sys.argv[1]) if len(sys.argv) > 1 else 100
+    U = int(sys.argv[2]) if len(sys.argv) > 2 else 500
+    F = int(sys.argv[3]) if len(sys.argv) > 3 else 10
+    ITERATIONS = int(sys.argv[4]) if len(sys.argv) > 4 else 5
+    slices = int(sys.argv[5]) if len(sys.argv) > 5 else 2
     print "Running ALS with M=%d, U=%d, F=%d, iters=%d, slices=%d\n" % \
           (M, U, F, ITERATIONS, slices)
 
diff --git a/examples/src/main/python/kmeans.py b/examples/src/main/python/kmeans.py
index e3596488faf9e..fc16586c28a46 100755
--- a/examples/src/main/python/kmeans.py
+++ b/examples/src/main/python/kmeans.py
@@ -45,14 +45,14 @@ def closestPoint(p, centers):
 
 
 if __name__ == "__main__":
-    if len(sys.argv) < 5:
-        print >> sys.stderr, "Usage: kmeans <master> <file> <k> <convergeDist>"
+    if len(sys.argv) != 4:
+        print >> sys.stderr, "Usage: kmeans <file> <k> <convergeDist>"
         exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonKMeans")
-    lines = sc.textFile(sys.argv[2])
+    sc = SparkContext(appName="PythonKMeans")
+    lines = sc.textFile(sys.argv[1])
     data = lines.map(parseVector).cache()
-    K = int(sys.argv[3])
-    convergeDist = float(sys.argv[4])
+    K = int(sys.argv[2])
+    convergeDist = float(sys.argv[3])
     kPoints = data.takeSample(False, K, 1)
     tempDist = 1.0
 
diff --git a/examples/src/main/python/logistic_regression.py b/examples/src/main/python/logistic_regression.py
index fe5373cf799b1..d7da1819806b2 100755
--- a/examples/src/main/python/logistic_regression.py
+++ b/examples/src/main/python/logistic_regression.py
@@ -47,12 +47,12 @@ def readPointBatch(iterator):
     return [matrix]
 
 if __name__ == "__main__":
-    if len(sys.argv) != 4:
-        print >> sys.stderr, "Usage: logistic_regression <master> <file> <iterations>"
+    if len(sys.argv) != 3:
+        print >> sys.stderr, "Usage: logistic_regression <file> <iterations>"
         exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonLR", pyFiles=[realpath(__file__)])
-    points = sc.textFile(sys.argv[2]).mapPartitions(readPointBatch).cache()
-    iterations = int(sys.argv[3])
+    sc = SparkContext(appName="PythonLR", pyFiles=[realpath(__file__)])
+    points = sc.textFile(sys.argv[1]).mapPartitions(readPointBatch).cache()
+    iterations = int(sys.argv[2])
 
     # Initialize w to a random value
     w = 2 * np.random.ranf(size=D) - 1
diff --git a/examples/src/main/python/mllib/kmeans.py b/examples/src/main/python/mllib/kmeans.py
index dec82ff34fbac..b308132c9aeeb 100755
--- a/examples/src/main/python/mllib/kmeans.py
+++ b/examples/src/main/python/mllib/kmeans.py
@@ -33,12 +33,12 @@ def parseVector(line):
 
 
 if __name__ == "__main__":
-    if len(sys.argv) < 4:
-        print >> sys.stderr, "Usage: kmeans <master> <file> <k>"
+    if len(sys.argv) != 3:
+        print >> sys.stderr, "Usage: kmeans <file> <k>"
         exit(-1)
-    sc = SparkContext(sys.argv[1], "KMeans")
-    lines = sc.textFile(sys.argv[2])
+    sc = SparkContext(appName="KMeans")
+    lines = sc.textFile(sys.argv[1])
     data = lines.map(parseVector)
-    k = int(sys.argv[3])
+    k = int(sys.argv[2])
     model = KMeans.train(data, k)
     print "Final centers: " + str(model.clusterCenters)
diff --git a/examples/src/main/python/mllib/logistic_regression.py b/examples/src/main/python/mllib/logistic_regression.py
index 8631051d00ff2..6e0f7a4ee5a81 100755
--- a/examples/src/main/python/mllib/logistic_regression.py
+++ b/examples/src/main/python/mllib/logistic_regression.py
@@ -39,12 +39,12 @@ def parsePoint(line):
 
 
 if __name__ == "__main__":
-    if len(sys.argv) != 4:
-        print >> sys.stderr, "Usage: logistic_regression <master> <file> <iterations>"
+    if len(sys.argv) != 3:
+        print >> sys.stderr, "Usage: logistic_regression <file> <iterations>"
         exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonLR")
-    points = sc.textFile(sys.argv[2]).map(parsePoint)
-    iterations = int(sys.argv[3])
+    sc = SparkContext(appName="PythonLR")
+    points = sc.textFile(sys.argv[1]).map(parsePoint)
+    iterations = int(sys.argv[2])
     model = LogisticRegressionWithSGD.train(points, iterations)
     print "Final weights: " + str(model.weights)
     print "Final intercept: " + str(model.intercept)
diff --git a/examples/src/main/python/pagerank.py b/examples/src/main/python/pagerank.py
index cd774cf3a319f..d350fa46fa49a 100755
--- a/examples/src/main/python/pagerank.py
+++ b/examples/src/main/python/pagerank.py
@@ -36,19 +36,19 @@ def parseNeighbors(urls):
 
 
 if __name__ == "__main__":
-    if len(sys.argv) < 3:
-        print >> sys.stderr, "Usage: pagerank <master> <file> <iterations>"
+    if len(sys.argv) != 3:
+        print >> sys.stderr, "Usage: pagerank <file> <iterations>"
         exit(-1)
 
     # Initialize the spark context.
-    sc = SparkContext(sys.argv[1], "PythonPageRank")
+    sc = SparkContext(appName="PythonPageRank")
 
     # Loads in input file. It should be in format of:
     #     URL         neighbor URL
     #     URL         neighbor URL
     #     URL         neighbor URL
     #     ...
-    lines = sc.textFile(sys.argv[2], 1)
+    lines = sc.textFile(sys.argv[1], 1)
 
     # Loads all URLs from input file and initialize their neighbors.
     links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
@@ -57,7 +57,7 @@ def parseNeighbors(urls):
     ranks = links.map(lambda (url, neighbors): (url, 1.0))
 
     # Calculates and updates URL ranks continuously using PageRank algorithm.
-    for iteration in xrange(int(sys.argv[3])):
+    for iteration in xrange(int(sys.argv[2])):
         # Calculates URL contributions to the rank of other URLs.
         contribs = links.join(ranks).flatMap(lambda (url, (urls, rank)): computeContribs(urls, rank))
diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py
index ab0645fc2f326..234720b55fa49 100755
--- a/examples/src/main/python/pi.py
+++ b/examples/src/main/python/pi.py
@@ -23,11 +23,11 @@
 
 
 if __name__ == "__main__":
-    if len(sys.argv) == 1:
-        print >> sys.stderr, "Usage: pi <master> [<slices>]"
-        exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonPi")
-    slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
+    """
+    Usage: pi [slices]
+    """
+    sc = SparkContext(appName="PythonPi")
+    slices = int(sys.argv[1]) if len(sys.argv) > 1 else 2
     n = 100000 * slices
     def f(_):
         x = random() * 2 - 1
diff --git a/examples/src/main/python/sort.py b/examples/src/main/python/sort.py
index 5de20a6d98f43..4913ee926aa03 100755
--- a/examples/src/main/python/sort.py
+++ b/examples/src/main/python/sort.py
@@ -21,11 +21,11 @@
 
 
 if __name__ == "__main__":
-    if len(sys.argv) < 3:
-        print >> sys.stderr, "Usage: sort <master> <file>"
+    if len(sys.argv) != 2:
+        print >> sys.stderr, "Usage: sort <file>"
         exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonSort")
-    lines = sc.textFile(sys.argv[2], 1)
+    sc = SparkContext(appName="PythonSort")
+    lines = sc.textFile(sys.argv[1], 1)
     sortedCount = lines.flatMap(lambda x: x.split(' ')) \
         .map(lambda x: (int(x), 1)) \
         .sortByKey(lambda x: x)
diff --git a/examples/src/main/python/transitive_closure.py b/examples/src/main/python/transitive_closure.py
index 744cce6651607..00b8050ab3f59 100755
--- a/examples/src/main/python/transitive_closure.py
+++ b/examples/src/main/python/transitive_closure.py
@@ -36,11 +36,11 @@ def generateGraph():
 
 
 if __name__ == "__main__":
-    if len(sys.argv) == 1:
-        print >> sys.stderr, "Usage: transitive_closure <master> [<slices>]"
-        exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonTransitiveClosure")
-    slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
+    """
+        Usage: transitive_closure [slices]
+    """
+    sc = SparkContext(appName="PythonTransitiveClosure")
+    slices = int(sys.argv[1]) if len(sys.argv) > 1 else 2
     tc = sc.parallelize(generateGraph(), slices).cache()
 
     # Linear transitive closure: each round grows paths by one edge,
diff --git a/examples/src/main/python/wordcount.py b/examples/src/main/python/wordcount.py
index b9139b9d76520..dcc095fdd0ed9 100755
--- a/examples/src/main/python/wordcount.py
+++ b/examples/src/main/python/wordcount.py
@@ -22,11 +22,11 @@
 
 
 if __name__ == "__main__":
-    if len(sys.argv) < 3:
-        print >> sys.stderr, "Usage: wordcount <master> <file>"
+    if len(sys.argv) != 2:
+        print >> sys.stderr, "Usage: wordcount <file>"
         exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonWordCount")
-    lines = sc.textFile(sys.argv[2], 1)
+    sc = SparkContext(appName="PythonWordCount")
+    lines = sc.textFile(sys.argv[1], 1)
     counts = lines.flatMap(lambda x: x.split(' ')) \
         .map(lambda x: (x, 1)) \
         .reduceByKey(add)

From 427a5f0a5c096b3211938b4f464c9f3977630399 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Thu, 15 May 2014 19:07:02 -0700
Subject: [PATCH 2/5] Update docs

Note that this reflects changes incorporated in #799.

---
 docs/index.md                    |  3 ++-
 docs/python-programming-guide.md | 32 +++++++++++++++++---------------
 2 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/docs/index.md b/docs/index.md
index 48182a27d28ae..31bf7e5ace63f 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -46,9 +46,10 @@ locally with one thread, or `local[N]` to run locally with N threads. You should
 Spark also provides a Python interface. To run an example Spark application written in Python, use
 `bin/pyspark [params]`. For example,
 
-    ./bin/pyspark examples/src/main/python/pi.py local[2] 10
+    ./bin/pyspark examples/src/main/python/pi.py 10
 
 or simply `bin/pyspark` without any arguments to run Spark interactively in a python interpreter.
+As in Spark shell, you can also pass in the `--master` option to configure your master URL.
 
 # Launching on a Cluster
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 17675acba6bb8..b686bee1aebe2 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -60,13 +60,9 @@ By default, PySpark requires `python` to be available on the system `PATH` and u
 All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.net/), are bundled with PySpark and automatically imported.
 
-Standalone PySpark applications should be run using the `bin/spark-submit` script, which automatically
-configures the Java and Python environment for running Spark.
-
-
 # Interactive Use
 
-The `bin/pyspark` script launches a Python interpreter that is configured to run PySpark applications. To use `pyspark` interactively, first build Spark, then launch it directly from the command line without any options:
+The `bin/pyspark` script launches a Python interpreter that is configured to run PySpark applications. To use `pyspark` interactively, first build Spark, then launch it directly from the command line:
 
 {% highlight bash %}
 $ sbt/sbt assembly
@@ -83,20 +79,24 @@ The Python shell can be used explore data interactively and is a simple way to l
 {% endhighlight %}
 
 By default, the `bin/pyspark` shell creates SparkContext that runs applications locally on all of
-your machine's logical cores.
-To connect to a non-local cluster, or to specify a number of cores, set the `MASTER` environment variable.
-For example, to use the `bin/pyspark` shell with a [standalone Spark cluster](spark-standalone.html):
+your machine's logical cores. To connect to a non-local cluster, or to specify a number of cores,
+set the `--master` flag. For example, to use the `bin/pyspark` shell with a
+[standalone Spark cluster](spark-standalone.html):
 
 {% highlight bash %}
-$ MASTER=spark://IP:PORT ./bin/pyspark
+$ ./bin/pyspark --master spark://1.2.3.4:7077
 {% endhighlight %}
 
 Or, to use exactly four cores on the local machine:
 
 {% highlight bash %}
-$ MASTER=local[4] ./bin/pyspark
+$ ./bin/pyspark --master local[4]
 {% endhighlight %}
 
+Under the hood `bin/pyspark` is a wrapper around the
+[Spark submit script](cluster-overview.html#launching-applications-with-spark-submit), so these
+two scripts share the same list of options. For a complete list of options, run `bin/pyspark` with
+the `--help` option.
 
 ## IPython
@@ -115,13 +115,14 @@ the [IPython Notebook](http://ipython.org/notebook.html) with PyLab graphing sup
 $ IPYTHON_OPTS="notebook --pylab inline" ./bin/pyspark
 {% endhighlight %}
 
-IPython also works on a cluster or on multiple cores if you set the `MASTER` environment variable.
+IPython also works on a cluster or on multiple cores if you set the `--master` flag.
 
 
 # Standalone Programs
 
-PySpark can also be used from standalone Python scripts by creating a SparkContext in your script and running the script using `bin/spark-submit`.
-The Quick Start guide includes a [complete example](quick-start.html#standalone-applications) of a standalone Python application.
+PySpark can also be used from standalone Python scripts by creating a SparkContext in your script
+and running the script using `bin/spark-submit`.
+The Quick Start guide includes a [complete example](quick-start.html#standalone-applications) of a standalone Python application.
 
 Code dependencies can be deployed by passing .zip or .egg files in the `--py-files` option of `spark-submit`:
@@ -138,6 +139,7 @@ You can set [configuration properties](configuration.html#spark-properties) by p
 {% highlight python %}
 from pyspark import SparkConf, SparkContext
 conf = (SparkConf()
+         .setMaster("local")
          .setAppName("My app")
          .set("spark.executor.memory", "1g"))
 sc = SparkContext(conf = conf)
@@ -164,6 +166,6 @@ some example applications.
 PySpark also includes several sample programs in the [`examples/src/main/python` folder](https://github.com/apache/spark/tree/master/examples/src/main/python).
 You can run them by passing the files to `pyspark`; e.g.:
 
-    ./bin/spark-submit examples/src/main/python/wordcount.py local[2] README.md
+    ./bin/spark-submit examples/src/main/python/wordcount.py README.md
 
-Each program prints usage help when run without arguments.
+Each program prints usage help when run without the sufficient arguments.

From c362f69afa6c04bf75633783e112dc5c6e07033e Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 16 May 2014 12:08:52 -0700
Subject: [PATCH 3/5] Update docs to use spark-submit for python applications

---
 docs/index.md | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/docs/index.md b/docs/index.md
index 31bf7e5ace63f..c9b10376cc809 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -43,13 +43,15 @@ The `--master` option specifies the master URL for a distributed cluster, or `local` to run
 locally with one thread, or `local[N]` to run locally with N threads. You should
 start by using `local` for testing. For a full list of options, run Spark shell with
 the `--help` option.
 
-Spark also provides a Python interface. To run an example Spark application written in Python, use
-`bin/pyspark [params]`. For example,
-    ./bin/pyspark examples/src/main/python/pi.py 10
-or simply `bin/pyspark` without any arguments to run Spark interactively in a python interpreter.
-As in Spark shell, you can also pass in the `--master` option to configure your master URL.
+Spark also provides a Python interface. To run Spark interactively in a Python interpreter, use
+`bin/pyspark`. As in Spark shell, you can also pass in the `--master` option to configure your
+master URL.
+
+    ./bin/pyspark --master local[2]
+
+Example applications are also provided in Python. For example,
+
+    ./bin/spark-submit examples/src/main/python/pi.py 10
 
 # Launching on a Cluster

From 50f80b1856d4bbfc7a87b10cdae62e62de8083d5 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 16 May 2014 12:12:19 -0700
Subject: [PATCH 4/5] Remove pyFiles from SparkContext construction

---
 examples/src/main/python/als.py                 | 2 +-
 examples/src/main/python/logistic_regression.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/examples/src/main/python/als.py b/examples/src/main/python/als.py
index 1af21c15e2d0e..779f47a2f6893 100755
--- a/examples/src/main/python/als.py
+++ b/examples/src/main/python/als.py
@@ -49,7 +49,7 @@ def update(i, vec, mat, ratings):
     """
         Usage: als [M] [U] [F] [iterations] [slices]"
     """
-    sc = SparkContext(appName="PythonALS", pyFiles=[realpath(__file__)])
+    sc = SparkContext(appName="PythonALS")
     M = int(sys.argv[1]) if len(sys.argv) > 1 else 100
     U = int(sys.argv[2]) if len(sys.argv) > 2 else 500
     F = int(sys.argv[3]) if len(sys.argv) > 3 else 10
diff --git a/examples/src/main/python/logistic_regression.py b/examples/src/main/python/logistic_regression.py
index d7da1819806b2..0f22d0b32319e 100755
--- a/examples/src/main/python/logistic_regression.py
+++ b/examples/src/main/python/logistic_regression.py
@@ -50,7 +50,7 @@ def readPointBatch(iterator):
     if len(sys.argv) != 3:
         print >> sys.stderr, "Usage: logistic_regression <file> <iterations>"
         exit(-1)
-    sc = SparkContext(appName="PythonLR", pyFiles=[realpath(__file__)])
+    sc = SparkContext(appName="PythonLR")
     points = sc.textFile(sys.argv[1]).mapPartitions(readPointBatch).cache()
     iterations = int(sys.argv[2])

From cf50b9fd529a5ecdf91255bc240ddb751d032ad6 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 16 May 2014 19:33:35 -0700
Subject: [PATCH 5/5] De-indent python comments (minor)

---
 examples/src/main/python/als.py                | 2 +-
 examples/src/main/python/transitive_closure.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/examples/src/main/python/als.py b/examples/src/main/python/als.py
index 779f47a2f6893..f0b46cd28b7aa 100755
--- a/examples/src/main/python/als.py
+++ b/examples/src/main/python/als.py
@@ -47,7 +47,7 @@ def update(i, vec, mat, ratings):
 
 if __name__ == "__main__":
     """
-        Usage: als [M] [U] [F] [iterations] [slices]"
+    Usage: als [M] [U] [F] [iterations] [slices]"
     """
     sc = SparkContext(appName="PythonALS")
     M = int(sys.argv[1]) if len(sys.argv) > 1 else 100
diff --git a/examples/src/main/python/transitive_closure.py b/examples/src/main/python/transitive_closure.py
index 00b8050ab3f59..8698369b13d84 100755
--- a/examples/src/main/python/transitive_closure.py
+++ b/examples/src/main/python/transitive_closure.py
@@ -37,7 +37,7 @@ def generateGraph():
 
 if __name__ == "__main__":
     """
-        Usage: transitive_closure [slices]
+    Usage: transitive_closure [slices]
     """
     sc = SparkContext(appName="PythonTransitiveClosure")
     slices = int(sys.argv[1]) if len(sys.argv) > 1 else 2
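
Note (illustrative sketch, not part of the commits above): taken together, this series establishes a convention for the Python examples — the master URL is supplied at launch time via `spark-submit --master ...` (or `bin/pyspark --master ...`), and each script only sets its application name with `SparkContext(appName=...)`. A minimal standalone script written against that convention might look like the following; the file name `line_count.py` and the app name are hypothetical, and the Python 2 print syntax simply matches the examples in the diffs.

    # line_count.py -- hypothetical sketch following the convention in this series
    import sys

    from pyspark import SparkContext

    if __name__ == "__main__":
        """
        Usage: line_count <file>
        """
        if len(sys.argv) != 2:
            print >> sys.stderr, "Usage: line_count <file>"
            exit(-1)
        # No master URL here; it is passed at launch time, e.g.
        #   ./bin/spark-submit --master local[2] line_count.py README.md
        sc = SparkContext(appName="PythonLineCount")
        lines = sc.textFile(sys.argv[1], 1)
        print "%s has %d lines" % (sys.argv[1], lines.count())
        sc.stop()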