@@ -284,7 +284,7 @@ def transform(self, func):
284284 on each RDD of 'this' DStream.
285285
286286 `func` can have one argument of `rdd`, or have two arguments of
287- (`time`, `rdd`)
287+ (`time`, `rdd`)
288288 """
289289 resue = False
290290 if func .func_code .co_argcount == 1 :
@@ -328,7 +328,8 @@ def _slideDuration(self):
328328 def union (self , other ):
329329 """
330330 Return a new DStream by unifying data of another DStream with this DStream.
331- @param other Another DStream having the same interval (i.e., slideDuration)
331+
332+ @param other: Another DStream having the same interval (i.e., slideDuration)
332333 as this DStream.
333334 """
334335 if self ._slideDuration != other ._slideDuration :
@@ -348,47 +349,47 @@ def cogroup(self, other, numPartitions=None):
348349
349350 def join (self , other , numPartitions = None ):
350351 """
351- Return a new DStream by applying 'join' between RDDs of `this` DStream and
352+ Return a new DStream by applying 'join' between RDDs of `this` DStream and
352353 `other` DStream.
353354
354355 Hash partitioning is used to generate the RDDs with `numPartitions`
355- partitions.
356+ partitions.
356357 """
357358 if numPartitions is None :
358359 numPartitions = self .ctx .defaultParallelism
359360 return self .transformWith (lambda a , b : a .join (b , numPartitions ), other )
360361
361362 def leftOuterJoin (self , other , numPartitions = None ):
362363 """
363- Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
364+ Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
364365 `other` DStream.
365366
366367 Hash partitioning is used to generate the RDDs with `numPartitions`
367- partitions.
368+ partitions.
368369 """
369370 if numPartitions is None :
370371 numPartitions = self .ctx .defaultParallelism
371372 return self .transformWith (lambda a , b : a .leftOuterJoin (b , numPartitions ), other )
372373
373374 def rightOuterJoin (self , other , numPartitions = None ):
374375 """
375- Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
376+ Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
376377 `other` DStream.
377378
378379 Hash partitioning is used to generate the RDDs with `numPartitions`
379- partitions.
380+ partitions.
380381 """
381382 if numPartitions is None :
382383 numPartitions = self .ctx .defaultParallelism
383384 return self .transformWith (lambda a , b : a .rightOuterJoin (b , numPartitions ), other )
384385
385386 def fullOuterJoin (self , other , numPartitions = None ):
386387 """
387- Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
388+ Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and
388389 `other` DStream.
389390
390391 Hash partitioning is used to generate the RDDs with `numPartitions`
391- partitions.
392+ partitions.
392393 """
393394 if numPartitions is None :
394395 numPartitions = self .ctx .defaultParallelism
@@ -424,9 +425,9 @@ def window(self, windowDuration, slideDuration=None):
424425 Return a new DStream in which each RDD contains all the elements seen in a
425426 sliding window of time over this DStream.
426427
427- @param windowDuration width of the window; must be a multiple of this DStream's
428+ @param windowDuration: width of the window; must be a multiple of this DStream's
428429 batching interval
429- @param slideDuration sliding interval of the window (i.e., the interval after which
430+ @param slideDuration: sliding interval of the window (i.e., the interval after which
430431 the new DStream will generate RDDs); must be a multiple of this
431432 DStream's batching interval
432433 """
@@ -448,13 +449,13 @@ def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuratio
448449 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
449450 This is more efficient than when `invReduceFunc` is None.
450451
451- @param reduceFunc associative reduce function
452- @param invReduceFunc inverse reduce function of `reduceFunc`
453- @param windowDuration width of the window; must be a multiple of this DStream's
454- batching interval
455- @param slideDuration sliding interval of the window (i.e., the interval after which
456- the new DStream will generate RDDs); must be a multiple of this
457- DStream's batching interval
452+ @param reduceFunc: associative reduce function
453+ @param invReduceFunc: inverse reduce function of `reduceFunc`
454+ @param windowDuration: width of the window; must be a multiple of this DStream's
455+ batching interval
456+ @param slideDuration: sliding interval of the window (i.e., the interval after which
457+ the new DStream will generate RDDs); must be a multiple of this
458+ DStream's batching interval
458459 """
459460 keyed = self .map (lambda x : (1 , x ))
460461 reduced = keyed .reduceByKeyAndWindow (reduceFunc , invReduceFunc ,
@@ -478,12 +479,12 @@ def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=Non
478479 Return a new DStream in which each RDD contains the count of distinct elements in
479480 RDDs in a sliding window over this DStream.
480481
481- @param windowDuration width of the window; must be a multiple of this DStream's
482+ @param windowDuration: width of the window; must be a multiple of this DStream's
482483 batching interval
483- @param slideDuration sliding interval of the window (i.e., the interval after which
484+ @param slideDuration: sliding interval of the window (i.e., the interval after which
484485 the new DStream will generate RDDs); must be a multiple of this
485486 DStream's batching interval
486- @param numPartitions number of partitions of each RDD in the new DStream.
487+ @param numPartitions: number of partitions of each RDD in the new DStream.
487488 """
488489 keyed = self .map (lambda x : (x , 1 ))
489490 counted = keyed .reduceByKeyAndWindow (operator .add , operator .sub ,
@@ -495,12 +496,12 @@ def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None)
495496 Return a new DStream by applying `groupByKey` over a sliding window.
496497 Similar to `DStream.groupByKey()`, but applies it over a sliding window.
497498
498- @param windowDuration width of the window; must be a multiple of this DStream's
499+ @param windowDuration: width of the window; must be a multiple of this DStream's
499500 batching interval
500- @param slideDuration sliding interval of the window (i.e., the interval after which
501+ @param slideDuration: sliding interval of the window (i.e., the interval after which
501502 the new DStream will generate RDDs); must be a multiple of this
502503 DStream's batching interval
503- @param numPartitions Number of partitions of each RDD in the new DStream.
504+ @param numPartitions: Number of partitions of each RDD in the new DStream.
504505 """
505506 ls = self .mapValues (lambda x : [x ])
506507 grouped = ls .reduceByKeyAndWindow (lambda a , b : a .extend (b ) or a , lambda a , b : a [len (b ):],
@@ -519,15 +520,15 @@ def reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration=None
519520 `invFunc` can be None, then it will reduce all the RDDs in window, could be slower
520521 than having `invFunc`.
521522
522- @param reduceFunc associative reduce function
523- @param invReduceFunc inverse function of `reduceFunc`
524- @param windowDuration width of the window; must be a multiple of this DStream's
523+ @param func: associative reduce function
524+ @param invFunc: inverse function of `func`
525+ @param windowDuration: width of the window; must be a multiple of this DStream's
525526 batching interval
526- @param slideDuration sliding interval of the window (i.e., the interval after which
527+ @param slideDuration: sliding interval of the window (i.e., the interval after which
527528 the new DStream will generate RDDs); must be a multiple of this
528529 DStream's batching interval
529- @param numPartitions number of partitions of each RDD in the new DStream.
530- @param filterFunc function to filter expired key-value pairs;
530+ @param numPartitions: number of partitions of each RDD in the new DStream.
531+ @param filterFunc: function to filter expired key-value pairs;
531532 only pairs that satisfy the function are retained
532533 set this to None if you do not want to filter
533534 """
@@ -567,7 +568,7 @@ def updateStateByKey(self, updateFunc, numPartitions=None):
567568 Return a new "state" DStream where the state for each key is updated by applying
568569 the given function on the previous state of the key and the new values of the key.
569570
570- @param updateFunc State update function ([(k, vs, s)] -> [(k, s)]).
571+ @param updateFunc: State update function ([(k, vs, s)] -> [(k, s)]).
571572 If `s` is None, then `k` will be eliminated.
572573 """
573574 if numPartitions is None :
0 commit comments