From 8f56ad6725f725158b532e752c40bae39e1f9046 Mon Sep 17 00:00:00 2001 From: HyukjinKwon Date: Fri, 5 Jul 2019 20:17:28 +0900 Subject: [PATCH] Remove the legacy Epydoc in PySpark API documentation --- python/docs/conf.py | 1 - python/docs/epytext.py | 30 ------ python/pyspark/accumulators.py | 14 +-- python/pyspark/broadcast.py | 6 +- python/pyspark/conf.py | 8 +- python/pyspark/context.py | 56 +++++------ python/pyspark/files.py | 7 +- python/pyspark/ml/feature.py | 2 +- python/pyspark/ml/linalg/__init__.py | 8 +- python/pyspark/mllib/classification.py | 4 +- python/pyspark/mllib/clustering.py | 6 +- python/pyspark/mllib/linalg/__init__.py | 8 +- python/pyspark/mllib/random.py | 6 +- python/pyspark/mllib/stat/_statistics.py | 4 +- python/pyspark/mllib/util.py | 4 +- python/pyspark/rdd.py | 114 +++++++++++------------ python/pyspark/serializers.py | 12 +-- python/pyspark/sql/dataframe.py | 10 +- python/pyspark/sql/types.py | 2 +- python/pyspark/streaming/context.py | 42 ++++----- python/pyspark/streaming/dstream.py | 50 +++++----- python/pyspark/taskcontext.py | 2 +- python/pyspark/testing/streamingutils.py | 6 +- 23 files changed, 185 insertions(+), 217 deletions(-) delete mode 100644 python/docs/epytext.py diff --git a/python/docs/conf.py b/python/docs/conf.py index f507ee33bc65..9e7afb7c0729 100644 --- a/python/docs/conf.py +++ b/python/docs/conf.py @@ -31,7 +31,6 @@ extensions = [ 'sphinx.ext.autodoc', 'sphinx.ext.viewcode', - 'epytext', 'sphinx.ext.mathjax', ] diff --git a/python/docs/epytext.py b/python/docs/epytext.py deleted file mode 100644 index 4bbbf650a13e..000000000000 --- a/python/docs/epytext.py +++ /dev/null @@ -1,30 +0,0 @@ -import re - -RULES = ( - (r"<(!BLANKLINE)[\w.]+>", r""), - (r"L{([\w.()]+)}", r":class:`\1`"), - (r"[LC]{(\w+\.\w+)\(\)}", r":func:`\1`"), - (r"C{([\w.()]+)}", r":class:`\1`"), - (r"[IBCM]{([^}]+)}", r"`\1`"), - ('pyspark.rdd.RDD', 'RDD'), -) - - -def _convert_epytext(line): - """ - >>> _convert_epytext("L{A}") - :class:`A` - """ - line = line.replace('@', ':') - for p, sub in RULES: - line = re.sub(p, sub, line) - return line - - -def _process_docstring(app, what, name, obj, options, lines): - for i in range(len(lines)): - lines[i] = _convert_epytext(lines[i]) - - -def setup(app): - app.connect("autodoc-process-docstring", _process_docstring) diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index 00ec094e7e3b..a5d513262b26 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -123,13 +123,13 @@ class Accumulator(object): """ A shared variable that can be accumulated, i.e., has a commutative and associative "add" - operation. Worker tasks on a Spark cluster can add values to an Accumulator with the C{+=} - operator, but only the driver program is allowed to access its value, using C{value}. + operation. Worker tasks on a Spark cluster can add values to an Accumulator with the `+=` + operator, but only the driver program is allowed to access its value, using `value`. Updates from the workers get propagated automatically to the driver program. - While C{SparkContext} supports accumulators for primitive data types like C{int} and - C{float}, users can also define accumulators for custom types by providing a custom - L{AccumulatorParam} object. Refer to the doctest of this module for an example. 
+ While :class:`SparkContext` supports accumulators for primitive data types like :class:`int` and + :class:`float`, users can also define accumulators for custom types by providing a custom + :class:`AccumulatorParam` object. Refer to the doctest of this module for an example. """ def __init__(self, aid, value, accum_param): @@ -185,14 +185,14 @@ class AccumulatorParam(object): def zero(self, value): """ Provide a "zero value" for the type, compatible in dimensions with the - provided C{value} (e.g., a zero vector) + provided `value` (e.g., a zero vector) """ raise NotImplementedError def addInPlace(self, value1, value2): """ Add two values of the accumulator's data type, returning a new value; - for efficiency, can also update C{value1} in place and return it. + for efficiency, can also update `value1` in place and return it. """ raise NotImplementedError diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index cca64b5de3a1..a97d409e7328 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -49,8 +49,8 @@ def _from_id(bid): class Broadcast(object): """ - A broadcast variable created with L{SparkContext.broadcast()}. - Access its value through C{.value}. + A broadcast variable created with :meth:`SparkContext.broadcast`. + Access its value through :attr:`value`. Examples: @@ -69,7 +69,7 @@ class Broadcast(object): def __init__(self, sc=None, value=None, pickle_registry=None, path=None, sock_file=None): """ - Should not be called directly by users -- use L{SparkContext.broadcast()} + Should not be called directly by users -- use :meth:`SparkContext.broadcast` instead. """ if sc is not None: diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py index ab429d9ab10d..202426086819 100644 --- a/python/pyspark/conf.py +++ b/python/pyspark/conf.py @@ -79,16 +79,16 @@ class SparkConf(object): parameters as key-value pairs. Most of the time, you would create a SparkConf object with - C{SparkConf()}, which will load values from C{spark.*} Java system + ``SparkConf()``, which will load values from `spark.*` Java system properties as well. In this case, any parameters you set directly on - the C{SparkConf} object take priority over system properties. + the :class:`SparkConf` object take priority over system properties. - For unit tests, you can also call C{SparkConf(false)} to skip + For unit tests, you can also call ``SparkConf(false)`` to skip loading external settings and get the same configuration no matter what the system properties are. All setter methods in this class support chaining. For example, - you can write C{conf.setMaster("local").setAppName("My app")}. + you can write ``conf.setMaster("local").setAppName("My app")``. .. note:: Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified by the user. diff --git a/python/pyspark/context.py b/python/pyspark/context.py index a835298e29c3..69020e6585ff 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -61,7 +61,7 @@ class SparkContext(object): """ Main entry point for Spark functionality. A SparkContext represents the - connection to a Spark cluster, and can be used to create L{RDD} and + connection to a Spark cluster, and can be used to create :class:`RDD` and broadcast variables on that cluster. .. note:: Only one :class:`SparkContext` should be active per JVM. 
You must `stop()` @@ -86,7 +86,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, gateway=None, jsc=None, profiler_cls=BasicProfiler): """ Create a new SparkContext. At least the master and app name should be set, - either through the named parameters here or through C{conf}. + either through the named parameters here or through `conf`. :param master: Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). @@ -102,7 +102,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, the batch size based on object sizes, or -1 to use an unlimited batch size :param serializer: The serializer for RDDs. - :param conf: A L{SparkConf} object setting Spark properties. + :param conf: A :class:`SparkConf` object setting Spark properties. :param gateway: Use an existing gateway and JVM, otherwise a new JVM will be instantiated. :param jsc: The JavaSparkContext instance (optional). @@ -576,7 +576,7 @@ def _serialize_to_jvm(self, data, serializer, reader_func, createRDDServer): def pickleFile(self, name, minPartitions=None): """ - Load an RDD previously saved using L{RDD.saveAsPickleFile} method. + Load an RDD previously saved using :meth:`RDD.saveAsPickleFile` method. >>> tmpFile = NamedTemporaryFile(delete=True) >>> tmpFile.close() @@ -624,20 +624,24 @@ def wholeTextFiles(self, path, minPartitions=None, use_unicode=True): as `utf-8`), which is faster and smaller than unicode. (Added in Spark 1.2) - For example, if you have the following files:: + For example, if you have the following files: - hdfs://a-hdfs-path/part-00000 - hdfs://a-hdfs-path/part-00001 - ... - hdfs://a-hdfs-path/part-nnnnn + .. code-block:: text - Do C{rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")}, - then C{rdd} contains:: + hdfs://a-hdfs-path/part-00000 + hdfs://a-hdfs-path/part-00001 + ... + hdfs://a-hdfs-path/part-nnnnn + + Do ``rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")``, + then ``rdd`` contains: - (a-hdfs-path/part-00000, its content) - (a-hdfs-path/part-00001, its content) - ... - (a-hdfs-path/part-nnnnn, its content) + .. code-block:: text + + (a-hdfs-path/part-00000, its content) + (a-hdfs-path/part-00001, its content) + ... + (a-hdfs-path/part-nnnnn, its content) .. note:: Small files are preferred, as each file will be loaded fully in memory. @@ -705,7 +709,7 @@ def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None, and value Writable classes 2. Serialization is attempted via Pyrolite pickling 3. If this fails, the fallback is to call 'toString' on each key and value - 4. C{PickleSerializer} is used to deserialize pickled objects on the Python side + 4. :class:`PickleSerializer` is used to deserialize pickled objects on the Python side :param path: path to sequncefile :param keyClass: fully qualified classname of key Writable class @@ -872,8 +876,7 @@ def union(self, rdds): def broadcast(self, value): """ - Broadcast a read-only variable to the cluster, returning a - L{Broadcast} + Broadcast a read-only variable to the cluster, returning a :class:`Broadcast` object for reading it in distributed functions. The variable will be sent to each cluster only once. 
""" @@ -881,8 +884,8 @@ def broadcast(self, value): def accumulator(self, value, accum_param=None): """ - Create an L{Accumulator} with the given initial value, using a given - L{AccumulatorParam} helper object to define how to add values of the + Create an :class:`Accumulator` with the given initial value, using a given + :class:`AccumulatorParam` helper object to define how to add values of the data type if provided. Default AccumulatorParams are used for integers and floating-point numbers if you do not provide one. For other types, a custom AccumulatorParam can be used. @@ -902,12 +905,11 @@ def accumulator(self, value, accum_param=None): def addFile(self, path, recursive=False): """ Add a file to be downloaded with this Spark job on every node. - The C{path} passed can be either a local file, a file in HDFS + The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI. - To access the file in Spark jobs, use - L{SparkFiles.get(fileName)} with the + To access the file in Spark jobs, use :meth:`SparkFiles.get` with the filename to find its download location. A directory can be given if the recursive option is set to True. @@ -932,7 +934,7 @@ def addFile(self, path, recursive=False): def addPyFile(self, path): """ Add a .py or .zip dependency for all tasks to be executed on this - SparkContext in the future. The C{path} passed can be either a local + SparkContext in the future. The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI. @@ -978,7 +980,7 @@ def setJobGroup(self, groupId, description, interruptOnCancel=False): Application programmers can use this method to group all those jobs together and give a group description. Once set, the Spark web UI will associate such jobs with this group. - The application can use L{SparkContext.cancelJobGroup} to cancel all + The application can use :meth:`SparkContext.cancelJobGroup` to cancel all running jobs in this group. >>> import threading @@ -1023,7 +1025,7 @@ def setLocalProperty(self, key, value): def getLocalProperty(self, key): """ Get a local property set in this thread, or null if it is missing. See - L{setLocalProperty} + :meth:`setLocalProperty`. """ return self._jsc.getLocalProperty(key) @@ -1041,7 +1043,7 @@ def sparkUser(self): def cancelJobGroup(self, groupId): """ - Cancel active jobs for the specified group. See L{SparkContext.setJobGroup} + Cancel active jobs for the specified group. See :meth:`SparkContext.setJobGroup`. for more information. """ self._jsc.sc().cancelJobGroup(groupId) diff --git a/python/pyspark/files.py b/python/pyspark/files.py index 797573f49dac..c08db4105e2c 100644 --- a/python/pyspark/files.py +++ b/python/pyspark/files.py @@ -24,8 +24,7 @@ class SparkFiles(object): """ - Resolves paths to files added through - L{SparkContext.addFile()}. + Resolves paths to files added through :meth:`SparkContext.addFile`. SparkFiles contains only classmethods; users should not create SparkFiles instances. @@ -41,7 +40,7 @@ def __init__(self): @classmethod def get(cls, filename): """ - Get the absolute path of a file added through C{SparkContext.addFile()}. + Get the absolute path of a file added through :meth:`SparkContext.addFile`. 
""" path = os.path.join(SparkFiles.getRootDirectory(), filename) return os.path.abspath(path) @@ -50,7 +49,7 @@ def get(cls, filename): def getRootDirectory(cls): """ Get the root directory that contains files added through - C{SparkContext.addFile()}. + :meth:`SparkContext.addFile`. """ if cls._is_running_on_worker: return cls._root_directory diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 9827a2af5a31..78d02690c4d4 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -2560,7 +2560,7 @@ class IndexToString(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, corresponding string values. The index-string mapping is either from the ML attributes of the input column, or from user-supplied labels (which take precedence over ML attributes). - See L{StringIndexer} for converting strings into indices. + See :class:`StringIndexer` for converting strings into indices. .. versionadded:: 1.6.0 """ diff --git a/python/pyspark/ml/linalg/__init__.py b/python/pyspark/ml/linalg/__init__.py index f6ddc0977d23..a79d5e5dcbb1 100644 --- a/python/pyspark/ml/linalg/__init__.py +++ b/python/pyspark/ml/linalg/__init__.py @@ -17,9 +17,9 @@ """ MLlib utilities for linear algebra. For dense vectors, MLlib -uses the NumPy C{array} type, so you can simply pass NumPy arrays -around. For sparse vectors, users can construct a L{SparseVector} -object from MLlib or pass SciPy C{scipy.sparse} column vectors if +uses the NumPy `array` type, so you can simply pass NumPy arrays +around. For sparse vectors, users can construct a :class:`SparseVector` +object from MLlib or pass SciPy `scipy.sparse` column vectors if SciPy is available in their environment. """ @@ -758,7 +758,7 @@ class Vectors(object): .. note:: Dense vectors are simply represented as NumPy array objects, so there is no need to covert them for use in MLlib. For sparse vectors, the factory methods in this class create an MLlib-compatible type, or users - can pass in SciPy's C{scipy.sparse} column vectors. + can pass in SciPy's `scipy.sparse` column vectors. """ @staticmethod diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index d2037be2c64f..c52da2ad633b 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -659,11 +659,11 @@ def train(cls, data, lambda_=1.0): Train a Naive Bayes model given an RDD of (label, features) vectors. - This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which + This is the `Multinomial NB `_ which can handle all kinds of discrete data. For example, by converting documents into TF-IDF vectors, it can be used for document classification. By making every vector a 0-1 vector, - it can also be used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}). + it can also be used as `Bernoulli NB `_. The input feature values must be nonnegative. :param data: diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 58da434fc38a..3524fcfeb795 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -130,9 +130,9 @@ class BisectingKMeans(object): clusters, larger clusters get higher priority. Based on - U{http://glaros.dtc.umn.edu/gkhome/fetch/papers/docclusterKDDTMW00.pdf} - Steinbach, Karypis, and Kumar, A comparison of document clustering - techniques, KDD Workshop on Text Mining, 2000. + `Steinbach, Karypis, and Kumar, A comparison of document clustering + techniques, KDD Workshop on Text Mining, 2000 + `_. .. 
versionadded:: 2.0.0 """ diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py index df411d79903c..cd09621b13b5 100644 --- a/python/pyspark/mllib/linalg/__init__.py +++ b/python/pyspark/mllib/linalg/__init__.py @@ -17,9 +17,9 @@ """ MLlib utilities for linear algebra. For dense vectors, MLlib -uses the NumPy C{array} type, so you can simply pass NumPy arrays -around. For sparse vectors, users can construct a L{SparseVector} -object from MLlib or pass SciPy C{scipy.sparse} column vectors if +uses the NumPy `array` type, so you can simply pass NumPy arrays +around. For sparse vectors, users can construct a :class:`SparseVector` +object from MLlib or pass SciPy `scipy.sparse` column vectors if SciPy is available in their environment. """ @@ -847,7 +847,7 @@ class Vectors(object): .. note:: Dense vectors are simply represented as NumPy array objects, so there is no need to covert them for use in MLlib. For sparse vectors, the factory methods in this class create an MLlib-compatible type, or users - can pass in SciPy's C{scipy.sparse} column vectors. + can pass in SciPy's `scipy.sparse` column vectors. """ @staticmethod diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py index a8833cb44692..6106c5858488 100644 --- a/python/pyspark/mllib/random.py +++ b/python/pyspark/mllib/random.py @@ -54,8 +54,7 @@ def uniformRDD(sc, size, numPartitions=None, seed=None): To transform the distribution in the generated RDD from U(0.0, 1.0) to U(a, b), use - C{RandomRDDs.uniformRDD(sc, n, p, seed)\ - .map(lambda v: a + (b - a) * v)} + ``RandomRDDs.uniformRDD(sc, n, p, seed).map(lambda v: a + (b - a) * v)`` :param sc: SparkContext used to create the RDD. :param size: Size of the RDD. @@ -85,8 +84,7 @@ def normalRDD(sc, size, numPartitions=None, seed=None): To transform the distribution in the generated RDD from standard normal to some other normal N(mean, sigma^2), use - C{RandomRDDs.normal(sc, n, p, seed)\ - .map(lambda v: mean + sigma * v)} + ``RandomRDDs.normal(sc, n, p, seed).map(lambda v: mean + sigma * v)`` :param sc: SparkContext used to create the RDD. :param size: Size of the RDD. diff --git a/python/pyspark/mllib/stat/_statistics.py b/python/pyspark/mllib/stat/_statistics.py index 6e89bfd691d1..d49f741a2f44 100644 --- a/python/pyspark/mllib/stat/_statistics.py +++ b/python/pyspark/mllib/stat/_statistics.py @@ -98,10 +98,10 @@ def corr(x, y=None, method=None): """ Compute the correlation (matrix) for the input RDD(s) using the specified method. - Methods currently supported: I{pearson (default), spearman}. + Methods currently supported: `pearson (default), spearman`. If a single RDD of Vectors is passed in, a correlation matrix - comparing the columns in the input RDD is returned. Use C{method=} + comparing the columns in the input RDD is returned. Use `method` to specify the method to be used for single RDD inout. If two RDDs of floats are passed in, a single float is returned. diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index 0190bf3cc0e3..1a0ce42dc4e4 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -95,7 +95,7 @@ def loadLibSVMFile(sc, path, numFeatures=-1, minPartitions=None): which leads to inconsistent feature dimensions. 
:param minPartitions: min number of partitions - @return: labeled data stored as an RDD of LabeledPoint + :return: labeled data stored as an RDD of LabeledPoint >>> from tempfile import NamedTemporaryFile >>> from pyspark.mllib.util import MLUtils @@ -156,7 +156,7 @@ def loadLabeledPoints(sc, path, minPartitions=None): :param path: file or directory path in any Hadoop-supported file system URI :param minPartitions: min number of partitions - @return: labeled data stored as an RDD of LabeledPoint + :return: labeled data stored as an RDD of LabeledPoint >>> from tempfile import NamedTemporaryFile >>> from pyspark.mllib.util import MLUtils diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 8edb7f3f2839..8bcc67ab1c3e 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -286,13 +286,13 @@ def __getnewargs__(self): @property def context(self): """ - The L{SparkContext} that this RDD was created on. + The :class:`SparkContext` that this RDD was created on. """ return self.ctx def cache(self): """ - Persist this RDD with the default storage level (C{MEMORY_ONLY}). + Persist this RDD with the default storage level (`MEMORY_ONLY`). """ self.is_cached = True self.persist(StorageLevel.MEMORY_ONLY) @@ -303,7 +303,7 @@ def persist(self, storageLevel=StorageLevel.MEMORY_ONLY): Set this RDD's storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. - If no storage level is specified defaults to (C{MEMORY_ONLY}). + If no storage level is specified defaults to (`MEMORY_ONLY`). >>> rdd = sc.parallelize(["b", "a", "c"]) >>> rdd.persist().is_cached @@ -330,7 +330,7 @@ def unpersist(self, blocking=False): def checkpoint(self): """ Mark this RDD for checkpointing. It will be saved to a file inside the - checkpoint directory set with L{SparkContext.setCheckpointDir()} and + checkpoint directory set with :meth:`SparkContext.setCheckpointDir` and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory, otherwise saving it @@ -360,9 +360,9 @@ def localCheckpoint(self): This is NOT safe to use with dynamic allocation, which removes executors along with their cached blocks. If you must use both features, you are advised to set - L{spark.dynamicAllocation.cachedExecutorIdleTimeout} to a high value. + `spark.dynamicAllocation.cachedExecutorIdleTimeout` to a high value. - The checkpoint directory set through L{SparkContext.setCheckpointDir()} is not used. + The checkpoint directory set through :meth:`SparkContext.setCheckpointDir` is not used. """ self._jrdd.rdd().localCheckpoint() @@ -786,8 +786,8 @@ def func(iterator): def cartesian(self, other): """ Return the Cartesian product of this RDD and another one, that is, the - RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and - C{b} is in C{other}. + RDD of all pairs of elements ``(a, b)`` where ``a`` is in `self` and + ``b`` is in `other`. >>> rdd = sc.parallelize([1, 2]) >>> sorted(rdd.cartesian(rdd).collect()) @@ -960,9 +960,9 @@ def fold(self, zeroValue, op): Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral "zero value." 
- The function C{op(t1, t2)} is allowed to modify C{t1} and return it + The function ``op(t1, t2)`` is allowed to modify ``t1`` and return it as its result value to avoid object allocation; however, it should not - modify C{t2}. + modify ``t2``. This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. @@ -995,9 +995,9 @@ def aggregate(self, zeroValue, seqOp, combOp): the partitions, using a given combine functions and a neutral "zero value." - The functions C{op(t1, t2)} is allowed to modify C{t1} and return it + The functions ``op(t1, t2)`` is allowed to modify ``t1`` and return it as its result value to avoid object allocation; however, it should not - modify C{t2}. + modify ``t2``. The first function (seqOp) can return a different result type, U, than the type of this RDD. Thus, we need one operation for merging a T into @@ -1128,7 +1128,7 @@ def count(self): def stats(self): """ - Return a L{StatCounter} object that captures the mean, variance + Return a :class:`StatCounter` object that captures the mean, variance and count of the RDD's elements in one operation. """ def redFunc(left_counter, right_counter): @@ -1467,10 +1467,10 @@ def isEmpty(self): def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None): """ - Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file + Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are converted for output using either user specified converters or, by default, - L{org.apache.spark.api.python.JavaToWritableConverter}. + "org.apache.spark.api.python.JavaToWritableConverter". :param conf: Hadoop job configuration, passed in as a dict :param keyConverter: (None by default) @@ -1484,11 +1484,11 @@ def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None): """ - Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file + Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types will be inferred if not specified. Keys and values are converted for output using either - user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The - C{conf} is applied on top of the base Hadoop conf associated with the SparkContext + user specified converters or "org.apache.spark.api.python.JavaToWritableConverter". The + `conf` is applied on top of the base Hadoop conf associated with the SparkContext of this RDD to create a merged Hadoop MapReduce job configuration for saving the data. :param path: path to Hadoop file @@ -1511,10 +1511,10 @@ def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueCl def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None): """ - Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file + Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file system, using the old Hadoop OutputFormat API (mapred package). Keys/values are converted for output using either user specified converters or, by default, - L{org.apache.spark.api.python.JavaToWritableConverter}. 
+ "org.apache.spark.api.python.JavaToWritableConverter". :param conf: Hadoop job configuration, passed in as a dict :param keyConverter: (None by default) @@ -1529,11 +1529,11 @@ def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=No keyConverter=None, valueConverter=None, conf=None, compressionCodecClass=None): """ - Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file + Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file system, using the old Hadoop OutputFormat API (mapred package). Key and value types will be inferred if not specified. Keys and values are converted for output using either - user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The - C{conf} is applied on top of the base Hadoop conf associated with the SparkContext + user specified converters or "org.apache.spark.api.python.JavaToWritableConverter". The + `conf` is applied on top of the base Hadoop conf associated with the SparkContext of this RDD to create a merged Hadoop MapReduce job configuration for saving the data. :param path: path to Hadoop file @@ -1558,8 +1558,8 @@ def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=No def saveAsSequenceFile(self, path, compressionCodecClass=None): """ - Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file - system, using the L{org.apache.hadoop.io.Writable} types that we convert from the + Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file + system, using the "org.apache.hadoop.io.Writable" types that we convert from the RDD's key and value types. The mechanism is as follows: 1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects. @@ -1575,7 +1575,7 @@ def saveAsSequenceFile(self, path, compressionCodecClass=None): def saveAsPickleFile(self, path, batchSize=10): """ Save this RDD as a SequenceFile of serialized objects. The serializer - used is L{pyspark.serializers.PickleSerializer}, default batch size + used is :class:`pyspark.serializers.PickleSerializer`, default batch size is 10. >>> tmpFile = NamedTemporaryFile(delete=True) @@ -1595,8 +1595,8 @@ def saveAsTextFile(self, path, compressionCodecClass=None): """ Save this RDD as a text file, using string representations of elements. - @param path: path to text file - @param compressionCodecClass: (None by default) string i.e. + :param path: path to text file + :param compressionCodecClass: (None by default) string i.e. "org.apache.hadoop.io.compress.GzipCodec" >>> tempFile = NamedTemporaryFile(delete=True) @@ -1685,8 +1685,8 @@ def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash): This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce. - Output will be partitioned with C{numPartitions} partitions, or - the default parallelism level if C{numPartitions} is not specified. + Output will be partitioned with `numPartitions` partitions, or + the default parallelism level if `numPartitions` is not specified. Default partitioner is hash-partition. >>> from operator import add @@ -1737,10 +1737,10 @@ def countByKey(self): def join(self, other, numPartitions=None): """ Return an RDD containing all pairs of elements with matching keys in - C{self} and C{other}. + `self` and `other`. Each pair of elements will be returned as a (k, (v1, v2)) tuple, where - (k, v1) is in C{self} and (k, v2) is in C{other}. 
+ (k, v1) is in `self` and (k, v2) is in `other`. Performs a hash join across the cluster. @@ -1753,11 +1753,11 @@ def join(self, other, numPartitions=None): def leftOuterJoin(self, other, numPartitions=None): """ - Perform a left outer join of C{self} and C{other}. + Perform a left outer join of `self` and `other`. - For each element (k, v) in C{self}, the resulting RDD will either - contain all pairs (k, (v, w)) for w in C{other}, or the pair - (k, (v, None)) if no elements in C{other} have key k. + For each element (k, v) in `self`, the resulting RDD will either + contain all pairs (k, (v, w)) for w in `other`, or the pair + (k, (v, None)) if no elements in `other` have key k. Hash-partitions the resulting RDD into the given number of partitions. @@ -1770,11 +1770,11 @@ def leftOuterJoin(self, other, numPartitions=None): def rightOuterJoin(self, other, numPartitions=None): """ - Perform a right outer join of C{self} and C{other}. + Perform a right outer join of `self` and `other`. - For each element (k, w) in C{other}, the resulting RDD will either + For each element (k, w) in `other`, the resulting RDD will either contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w)) - if no elements in C{self} have key k. + if no elements in `self` have key k. Hash-partitions the resulting RDD into the given number of partitions. @@ -1787,15 +1787,15 @@ def rightOuterJoin(self, other, numPartitions=None): def fullOuterJoin(self, other, numPartitions=None): """ - Perform a right outer join of C{self} and C{other}. + Perform a right outer join of `self` and `other`. - For each element (k, v) in C{self}, the resulting RDD will either - contain all pairs (k, (v, w)) for w in C{other}, or the pair - (k, (v, None)) if no elements in C{other} have key k. + For each element (k, v) in `self`, the resulting RDD will either + contain all pairs (k, (v, w)) for w in `other`, or the pair + (k, (v, None)) if no elements in `other` have key k. - Similarly, for each element (k, w) in C{other}, the resulting RDD will - either contain all pairs (k, (v, w)) for v in C{self}, or the pair - (k, (None, w)) if no elements in C{self} have key k. + Similarly, for each element (k, w) in `other`, the resulting RDD will + either contain all pairs (k, (v, w)) for v in `self`, or the pair + (k, (None, w)) if no elements in `self` have key k. Hash-partitions the resulting RDD into the given number of partitions. @@ -1891,11 +1891,11 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners, Users provide three functions: - - C{createCombiner}, which turns a V into a C (e.g., creates + - `createCombiner`, which turns a V into a C (e.g., creates a one-element list) - - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of + - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list) - - C{mergeCombiners}, to combine two C's into a single one (e.g., merges + - `mergeCombiners`, to combine two C's into a single one (e.g., merges the lists) To avoid memory allocation, both mergeValue and mergeCombiners are allowed to @@ -2072,9 +2072,9 @@ def groupWith(self, other, *others): # TODO: add variant with custom parittioner def cogroup(self, other, numPartitions=None): """ - For each key k in C{self} or C{other}, return a resulting RDD that - contains a tuple with the list of values for that key in C{self} as - well as C{other}. + For each key k in `self` or `other`, return a resulting RDD that + contains a tuple with the list of values for that key in `self` as + well as `other`. 
>>> x = sc.parallelize([("a", 1), ("b", 4)]) >>> y = sc.parallelize([("a", 2)]) @@ -2106,8 +2106,8 @@ def sampleByKey(self, withReplacement, fractions, seed=None): def subtractByKey(self, other, numPartitions=None): """ - Return each (key, value) pair in C{self} that has no pair with matching - key in C{other}. + Return each (key, value) pair in `self` that has no pair with matching + key in `other`. >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)]) >>> y = sc.parallelize([("a", 3), ("c", None)]) @@ -2121,7 +2121,7 @@ def filter_func(pair): def subtract(self, other, numPartitions=None): """ - Return each value in C{self} that is not contained in C{other}. + Return each value in `self` that is not contained in `other`. >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)]) >>> y = sc.parallelize([("a", 3), ("c", None)]) @@ -2134,7 +2134,7 @@ def subtract(self, other, numPartitions=None): def keyBy(self, f): """ - Creates tuples of the elements in this RDD by applying C{f}. + Creates tuples of the elements in this RDD by applying `f`. >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x) >>> y = sc.parallelize(zip(range(0,5), range(0,5))) @@ -2260,7 +2260,7 @@ def zipWithUniqueId(self): Items in the kth partition will get ids k, n+k, 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method won't trigger a spark job, which is different from - L{zipWithIndex} + :meth:`zipWithIndex`. >>> sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect() [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)] diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index ddca2a71828a..00f6081a3b14 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -19,12 +19,12 @@ PySpark supports custom serializers for transferring data; this can improve performance. -By default, PySpark uses L{PickleSerializer} to serialize objects using Python's -C{cPickle} serializer, which can serialize nearly any Python object. -Other serializers, like L{MarshalSerializer}, support fewer datatypes but can be +By default, PySpark uses :class:`PickleSerializer` to serialize objects using Python's +`cPickle` serializer, which can serialize nearly any Python object. +Other serializers, like :class:`MarshalSerializer`, support fewer datatypes but can be faster. -The serializer is chosen when creating L{SparkContext}: +The serializer is chosen when creating :class:`SparkContext`: >>> from pyspark.context import SparkContext >>> from pyspark.serializers import MarshalSerializer @@ -34,7 +34,7 @@ >>> sc.stop() PySpark serializes objects in batches; by default, the batch size is chosen based -on the size of objects and is also configurable by SparkContext's C{batchSize} +on the size of objects and is also configurable by SparkContext's `batchSize` parameter: >>> sc = SparkContext('local', 'test', batchSize=2) @@ -129,7 +129,7 @@ class FramedSerializer(Serializer): """ Serializer that writes objects as a stream of (length, data) pairs, - where C{length} is a 32-bit integer and data is C{length} bytes. + where `length` is a 32-bit integer and data is `length` bytes. """ def __init__(self): diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index e66697376c8c..87d4b81bb4dc 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -408,7 +408,7 @@ def checkpoint(self, eager=True): """Returns a checkpointed version of this Dataset. 
Checkpointing can be used to truncate the logical plan of this DataFrame, which is especially useful in iterative algorithms where the plan may grow exponentially. It will be saved to files inside the checkpoint - directory set with L{SparkContext.setCheckpointDir()}. + directory set with :meth:`SparkContext.setCheckpointDir`. :param eager: Whether to checkpoint this DataFrame immediately @@ -581,9 +581,9 @@ def foreachPartition(self, f): @since(1.3) def cache(self): - """Persists the :class:`DataFrame` with the default storage level (C{MEMORY_AND_DISK}). + """Persists the :class:`DataFrame` with the default storage level (`MEMORY_AND_DISK`). - .. note:: The default storage level has changed to C{MEMORY_AND_DISK} to match Scala in 2.0. + .. note:: The default storage level has changed to `MEMORY_AND_DISK` to match Scala in 2.0. """ self.is_cached = True self._jdf.cache() @@ -594,9 +594,9 @@ def persist(self, storageLevel=StorageLevel.MEMORY_AND_DISK): """Sets the storage level to persist the contents of the :class:`DataFrame` across operations after the first time it is computed. This can only be used to assign a new storage level if the :class:`DataFrame` does not have a storage level set yet. - If no storage level is specified defaults to (C{MEMORY_AND_DISK}). + If no storage level is specified defaults to (`MEMORY_AND_DISK`). - .. note:: The default storage level has changed to C{MEMORY_AND_DISK} to match Scala in 2.0. + .. note:: The default storage level has changed to `MEMORY_AND_DISK` to match Scala in 2.0. """ self.is_cached = True javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index f9b12f15117d..da84fc1e0066 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1405,7 +1405,7 @@ def _create_row(fields, values): class Row(tuple): """ - A row in L{DataFrame}. + A row in :class:`DataFrame`. The fields in it can be accessed: * like attributes (``row.key``) diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 6fbe26b64949..769121c19ff4 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -33,7 +33,7 @@ class StreamingContext(object): """ Main entry point for Spark Streaming functionality. A StreamingContext represents the connection to a Spark cluster, and can be used to create - L{DStream} various input sources. It can be from an existing L{SparkContext}. + :class:`DStream` various input sources. It can be from an existing :class:`SparkContext`. After creating and transforming DStreams, the streaming computation can be started and stopped using `context.start()` and `context.stop()`, respectively. `context.awaitTermination()` allows the current thread @@ -48,8 +48,8 @@ def __init__(self, sparkContext, batchDuration=None, jssc=None): """ Create a new StreamingContext. - @param sparkContext: L{SparkContext} object. - @param batchDuration: the time interval (in seconds) at which streaming + :param sparkContext: :class:`SparkContext` object. + :param batchDuration: the time interval (in seconds) at which streaming data will be divided into batches """ @@ -92,8 +92,8 @@ def getOrCreate(cls, checkpointPath, setupFunc): recreated from the checkpoint data. If the data does not exist, then the provided setupFunc will be used to create a new context. 
- @param checkpointPath: Checkpoint directory used in an earlier streaming program - @param setupFunc: Function to create a new context and setup DStreams + :param checkpointPath: Checkpoint directory used in an earlier streaming program + :param setupFunc: Function to create a new context and setup DStreams """ cls._ensure_initialized() gw = SparkContext._gateway @@ -149,10 +149,10 @@ def getActiveOrCreate(cls, checkpointPath, setupFunc): valid checkpoint data, then setupFunc will be called to create a new context and setup DStreams. - @param checkpointPath: Checkpoint directory used in an earlier streaming program. Can be + :param checkpointPath: Checkpoint directory used in an earlier streaming program. Can be None if the intention is to always create a new context when there is no active context. - @param setupFunc: Function to create a new JavaStreamingContext and setup DStreams + :param setupFunc: Function to create a new JavaStreamingContext and setup DStreams """ if setupFunc is None: @@ -183,7 +183,7 @@ def awaitTermination(self, timeout=None): """ Wait for the execution to stop. - @param timeout: time to wait in seconds + :param timeout: time to wait in seconds """ if timeout is None: self._jssc.awaitTermination() @@ -196,7 +196,7 @@ def awaitTerminationOrTimeout(self, timeout): throw the reported error during the execution; or `false` if the waiting time elapsed before returning from the method. - @param timeout: time to wait in seconds + :param timeout: time to wait in seconds """ return self._jssc.awaitTerminationOrTimeout(int(timeout * 1000)) @@ -205,8 +205,8 @@ def stop(self, stopSparkContext=True, stopGraceFully=False): Stop the execution of the streams, with option of ensuring all received data has been processed. - @param stopSparkContext: Stop the associated SparkContext or not - @param stopGracefully: Stop gracefully by waiting for the processing + :param stopSparkContext: Stop the associated SparkContext or not + :param stopGracefully: Stop gracefully by waiting for the processing of all received data to be completed """ self._jssc.stop(stopSparkContext, stopGraceFully) @@ -223,7 +223,7 @@ def remember(self, duration): the RDDs (if the developer wishes to query old data outside the DStream computation). - @param duration: Minimum duration (in seconds) that each DStream + :param duration: Minimum duration (in seconds) that each DStream should remember its RDDs """ self._jssc.remember(self._jduration(duration)) @@ -233,7 +233,7 @@ def checkpoint(self, directory): Sets the context to periodically checkpoint the DStream operations for master fault-tolerance. The graph will be checkpointed every batch interval. - @param directory: HDFS-compatible directory where the checkpoint data + :param directory: HDFS-compatible directory where the checkpoint data will be reliably stored """ self._jssc.checkpoint(directory) @@ -244,9 +244,9 @@ def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_ a TCP socket and receive byte is interpreted as UTF8 encoded ``\\n`` delimited lines. 
- @param hostname: Hostname to connect to for receiving data - @param port: Port to connect to for receiving data - @param storageLevel: Storage level to use for storing the received objects + :param hostname: Hostname to connect to for receiving data + :param port: Port to connect to for receiving data + :param storageLevel: Storage level to use for storing the received objects """ jlevel = self._sc._getJavaStorageLevel(storageLevel) return DStream(self._jssc.socketTextStream(hostname, port, jlevel), self, @@ -270,8 +270,8 @@ def binaryRecordsStream(self, directory, recordLength): them from another location within the same file system. File names starting with . are ignored. - @param directory: Directory to load data from - @param recordLength: Length of each record in bytes + :param directory: Directory to load data from + :param recordLength: Length of each record in bytes """ return DStream(self._jssc.binaryRecordsStream(directory, recordLength), self, NoOpSerializer()) @@ -290,9 +290,9 @@ def queueStream(self, rdds, oneAtATime=True, default=None): .. note:: Changes to the queue after the stream is created will not be recognized. - @param rdds: Queue of RDDs - @param oneAtATime: pick one rdd each time or pick all of them once. - @param default: The default rdd if no more in rdds + :param rdds: Queue of RDDs + :param oneAtATime: pick one rdd each time or pick all of them once. + :param default: The default rdd if no more in rdds """ if default and not isinstance(default, RDD): default = self._sc.parallelize(default) diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index c253e5ce0e72..60562a6c92af 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -41,11 +41,11 @@ class DStream(object): """ A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence of RDDs (of the same type) representing a - continuous stream of data (see L{RDD} in the Spark core documentation + continuous stream of data (see :class:`RDD` in the Spark core documentation for more details on RDDs). DStreams can either be created from live data (such as, data from TCP - sockets, etc.) using a L{StreamingContext} or it can be + sockets, etc.) using a :class:`StreamingContext` or it can be generated by transforming existing DStreams using operations such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream periodically generates a RDD, either @@ -167,7 +167,7 @@ def pprint(self, num=10): """ Print the first num elements of each RDD generated in this DStream. - @param num: the number of elements from the first will be printed. + :param num: the number of elements from the first will be printed. """ def takeAndPrint(time, rdd): taken = rdd.take(num + 1) @@ -210,7 +210,7 @@ def func(iterator): def cache(self): """ Persist the RDDs of this DStream with the default storage level - (C{MEMORY_ONLY}). + (`MEMORY_ONLY`). """ self.is_cached = True self.persist(StorageLevel.MEMORY_ONLY) @@ -229,7 +229,7 @@ def checkpoint(self, interval): """ Enable periodic checkpointing of RDDs of this DStream - @param interval: time in seconds, after each period of that, generated + :param interval: time in seconds, after each period of that, generated RDD will be checkpointed """ self.is_checkpointed = True @@ -333,7 +333,7 @@ def union(self, other): """ Return a new DStream by unifying data of another DStream with this DStream. 
- @param other: Another DStream having the same interval (i.e., slideDuration) + :param other: Another DStream having the same interval (i.e., slideDuration) as this DStream. """ if self._slideDuration != other._slideDuration: @@ -429,9 +429,9 @@ def window(self, windowDuration, slideDuration=None): Return a new DStream in which each RDD contains all the elements in seen in a sliding window of time over this DStream. - @param windowDuration: width of the window; must be a multiple of this DStream's + :param windowDuration: width of the window; must be a multiple of this DStream's batching interval - @param slideDuration: sliding interval of the window (i.e., the interval after which + :param slideDuration: sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream's batching interval """ @@ -455,13 +455,13 @@ def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuratio 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) This is more efficient than `invReduceFunc` is None. - @param reduceFunc: associative and commutative reduce function - @param invReduceFunc: inverse reduce function of `reduceFunc`; such that for all y, + :param reduceFunc: associative and commutative reduce function + :param invReduceFunc: inverse reduce function of `reduceFunc`; such that for all y, and invertible x: `invReduceFunc(reduceFunc(x, y), x) = y` - @param windowDuration: width of the window; must be a multiple of this DStream's + :param windowDuration: width of the window; must be a multiple of this DStream's batching interval - @param slideDuration: sliding interval of the window (i.e., the interval after which + :param slideDuration: sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream's batching interval """ @@ -487,12 +487,12 @@ def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=Non Return a new DStream in which each RDD contains the count of distinct elements in RDDs in a sliding window over this DStream. - @param windowDuration: width of the window; must be a multiple of this DStream's + :param windowDuration: width of the window; must be a multiple of this DStream's batching interval - @param slideDuration: sliding interval of the window (i.e., the interval after which + :param slideDuration: sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream's batching interval - @param numPartitions: number of partitions of each RDD in the new DStream. + :param numPartitions: number of partitions of each RDD in the new DStream. """ keyed = self.map(lambda x: (x, 1)) counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub, @@ -504,12 +504,12 @@ def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None) Return a new DStream by applying `groupByKey` over a sliding window. Similar to `DStream.groupByKey()`, but applies it over a sliding window. 
- @param windowDuration: width of the window; must be a multiple of this DStream's + :param windowDuration: width of the window; must be a multiple of this DStream's batching interval - @param slideDuration: sliding interval of the window (i.e., the interval after which + :param slideDuration: sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream's batching interval - @param numPartitions: Number of partitions of each RDD in the new DStream. + :param numPartitions: Number of partitions of each RDD in the new DStream. """ ls = self.mapValues(lambda x: [x]) grouped = ls.reduceByKeyAndWindow(lambda a, b: a.extend(b) or a, lambda a, b: a[len(b):], @@ -528,15 +528,15 @@ def reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration=None `invFunc` can be None, then it will reduce all the RDDs in window, could be slower than having `invFunc`. - @param func: associative and commutative reduce function - @param invFunc: inverse function of `reduceFunc` - @param windowDuration: width of the window; must be a multiple of this DStream's + :param func: associative and commutative reduce function + :param invFunc: inverse function of `reduceFunc` + :param windowDuration: width of the window; must be a multiple of this DStream's batching interval - @param slideDuration: sliding interval of the window (i.e., the interval after which + :param slideDuration: sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream's batching interval - @param numPartitions: number of partitions of each RDD in the new DStream. - @param filterFunc: function to filter expired key-value pairs; + :param numPartitions: number of partitions of each RDD in the new DStream. + :param filterFunc: function to filter expired key-value pairs; only pairs that satisfy the function are retained set this to null if you do not want to filter """ @@ -578,7 +578,7 @@ def updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None): Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values of the key. - @param updateFunc: State update function. If this function returns None, then + :param updateFunc: State update function. If this function returns None, then corresponding state key-value pair will be eliminated. """ if numPartitions is None: diff --git a/python/pyspark/taskcontext.py b/python/pyspark/taskcontext.py index dff5e183bdc7..6d28491e4aff 100644 --- a/python/pyspark/taskcontext.py +++ b/python/pyspark/taskcontext.py @@ -28,7 +28,7 @@ class TaskContext(object): Contextual information about a task which can be read or mutated during execution. To access the TaskContext for a running task, use: - L{TaskContext.get()}. + :meth:`TaskContext.get`. """ _taskContext = None diff --git a/python/pyspark/testing/streamingutils.py b/python/pyspark/testing/streamingutils.py index 4c27f8aad538..a6abc2ef673b 100644 --- a/python/pyspark/testing/streamingutils.py +++ b/python/pyspark/testing/streamingutils.py @@ -137,9 +137,9 @@ def get_output(_, rdd): def _test_func(self, input, func, expected, sort=False, input2=None): """ - @param input: dataset for the test. This should be list of lists. - @param func: wrapped function. This function should return PythonDStream object. - @param expected: expected output for this testcase. + :param input: dataset for the test. 
This should be a list of lists.
+        :param func: wrapped function. This function should return a PythonDStream object.
+        :param expected: expected output for this test case.
         """
         if not isinstance(input[0], RDD):
             input = [self.sc.parallelize(d, 1) for d in input]
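
For reference, the deleted python/docs/epytext.py above rewrote Epydoc markup into reST while Sphinx built the docs; with this patch the docstrings carry reST directly, so that hook is no longer needed. The sketch below is a minimal, standalone reconstruction of that conversion, based only on the rules visible in the removed file (the Sphinx setup()/autodoc wiring is omitted, and the helper is renamed here); it is illustrative rather than an exact copy.

    import re

    # Conversion rules as they appeared in the removed python/docs/epytext.py.
    RULES = (
        (r"<(!BLANKLINE)[\w.]+>", r""),
        (r"L{([\w.()]+)}", r":class:`\1`"),
        (r"[LC]{(\w+\.\w+)\(\)}", r":func:`\1`"),
        (r"C{([\w.()]+)}", r":class:`\1`"),
        (r"[IBCM]{([^}]+)}", r"`\1`"),
        ('pyspark.rdd.RDD', 'RDD'),
    )

    def convert_epytext(line):
        """Rewrite one Epydoc-style docstring line into reST."""
        line = line.replace('@', ':')  # e.g. '@param x:' becomes ':param x:'
        for pattern, repl in RULES:
            line = re.sub(pattern, repl, line)
        return line

    print(convert_epytext("L{SparkContext}"))        # :class:`SparkContext`
    print(convert_epytext("@param path: a C{str}"))  # :param path: a :class:`str`

The patch therefore replaces markup such as C{...}, L{...} and @param in the docstrings themselves with the reST that this hook would have produced at build time, which is why conf.py no longer loads the 'epytext' extension.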