@@ -250,7 +250,7 @@ def getCheckpointFile(self):
250250 def map (self , f , preservesPartitioning = False ):
251251 """
252252 Return a new RDD by applying a function to each element of this RDD.
253-
253+
254254 >>> rdd = sc.parallelize(["b", "a", "c"])
255255 >>> sorted(rdd.map(lambda x: (x, 1)).collect())
256256 [('a', 1), ('b', 1), ('c', 1)]
@@ -312,6 +312,15 @@ def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
312312 "use mapPartitionsWithIndex instead" , DeprecationWarning , stacklevel = 2 )
313313 return self .mapPartitionsWithIndex (f , preservesPartitioning )
314314
def getNumPartitions(self):
    """
    Return the number of partitions in this RDD.

    The count is obtained from the underlying Java RDD's split list.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> rdd.getNumPartitions()
    2
    """
    # Delegate to the wrapped JVM RDD: its splits() collection has one
    # entry per partition, so its size is the partition count.
    java_rdd = self._jrdd
    return java_rdd.splits().size()
315324 def filter (self , f ):
316325 """
317326 Return a new RDD containing only the elements that satisfy a predicate.
@@ -413,9 +422,9 @@ def union(self, other):
413422
414423 def intersection (self , other ):
415424 """
416- Return the intersection of this RDD and another one. The output will not
425+ Return the intersection of this RDD and another one. The output will not
417426 contain any duplicate elements, even if the input RDDs did.
418-
427+
419428 Note that this method performs a shuffle internally.
420429
421430 >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
@@ -571,14 +580,14 @@ def foreachPartition(self, f):
571580 """
572581 Applies a function to each partition of this RDD.
573582
574- >>> def f(iterator):
575- ... for x in iterator:
576- ... print x
583+ >>> def f(iterator):
584+ ... for x in iterator:
585+ ... print x
577586 ... yield None
578587 >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
579588 """
580589 self .mapPartitions (f ).collect () # Force evaluation
581-
590+
582591 def collect (self ):
583592 """
584593 Return a list that contains all of the elements in this RDD.
@@ -673,7 +682,7 @@ def func(iterator):
673682 yield acc
674683
675684 return self .mapPartitions (func ).fold (zeroValue , combOp )
676-
685+
677686
678687 def max (self ):
679688 """
@@ -692,7 +701,7 @@ def min(self):
692701 1.0
693702 """
694703 return self .reduce (min )
695-
704+
696705 def sum (self ):
697706 """
698707 Add up the elements in this RDD.
@@ -786,7 +795,7 @@ def mergeMaps(m1, m2):
786795 m1 [k ] += v
787796 return m1
788797 return self .mapPartitions (countPartition ).reduce (mergeMaps )
789-
798+
790799 def top (self , num ):
791800 """
792801 Get the top N elements from a RDD.
@@ -814,7 +823,7 @@ def merge(a, b):
814823 def takeOrdered (self , num , key = None ):
815824 """
816825 Get the N elements from a RDD ordered in ascending order or as specified
817- by the optional key function.
826+ by the optional key function.
818827
819828 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
820829 [1, 2, 3, 4, 5, 6]
@@ -834,7 +843,7 @@ def unKey(x, key_=None):
834843 if key_ != None :
835844 x = [i [1 ] for i in x ]
836845 return x
837-
846+
838847 def merge (a , b ):
839848 return next (topNKeyedElems (a + b ))
840849 result = self .mapPartitions (lambda i : topNKeyedElems (i , key )).reduce (merge )
@@ -1169,21 +1178,21 @@ def _mergeCombiners(iterator):
11691178 combiners [k ] = mergeCombiners (combiners [k ], v )
11701179 return combiners .iteritems ()
11711180 return shuffled .mapPartitions (_mergeCombiners )
1172-
1181+
11731182 def foldByKey (self , zeroValue , func , numPartitions = None ):
11741183 """
11751184 Merge the values for each key using an associative function "func" and a neutral "zeroValue"
1176- which may be added to the result an arbitrary number of times, and must not change
1177- the result (e.g., 0 for addition, or 1 for multiplication.).
1185+ which may be added to the result an arbitrary number of times, and must not change
1186+ the result (e.g., 0 for addition, or 1 for multiplication.).
11781187
11791188 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
11801189 >>> from operator import add
11811190 >>> rdd.foldByKey(0, add).collect()
11821191 [('a', 2), ('b', 1)]
11831192 """
11841193 return self .combineByKey (lambda v : func (zeroValue , v ), func , func , numPartitions )
1185-
1186-
1194+
1195+
11871196 # TODO: support variant with custom partitioner
11881197 def groupByKey (self , numPartitions = None ):
11891198 """
@@ -1302,7 +1311,7 @@ def keyBy(self, f):
13021311 def repartition (self , numPartitions ):
13031312 """
13041313 Return a new RDD that has exactly numPartitions partitions.
1305-
1314+
13061315 Can increase or decrease the level of parallelism in this RDD. Internally, this uses
13071316 a shuffle to redistribute data.
13081317 If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
0 commit comments