2727from subprocess import Popen , PIPE
2828from tempfile import NamedTemporaryFile
2929from threading import Thread
30+ import warnings
3031
3132from pyspark .serializers import NoOpSerializer , CartesianDeserializer , \
3233 BatchedSerializer , CloudPickleSerializer , pack_long
@@ -179,7 +180,7 @@ def flatMap(self, f, preservesPartitioning=False):
179180 [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
180181 """
181182 def func (s , iterator ): return chain .from_iterable (imap (f , iterator ))
182- return self .mapPartitionsWithSplit (func , preservesPartitioning )
183+ return self .mapPartitionsWithIndex (func , preservesPartitioning )
183184
def mapPartitions(self, f, preservesPartitioning=False):
    """
    Return a new RDD by applying a function to each partition of this RDD.

    :param f: function taking an iterator over one partition's elements and
              returning an iterator of results.
    :param preservesPartitioning: whether the transformation keeps the
              partitioner of this RDD intact.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
    >>> def f(iterator): yield sum(iterator)
    >>> rdd.mapPartitions(f).collect()
    [3, 7]
    """
    # Adapter: mapPartitionsWithIndex hands us (partitionIndex, iterator);
    # plain mapPartitions exposes only the iterator to the caller's f.
    def func(s, iterator):
        return f(iterator)
    # Fix: forward preservesPartitioning instead of silently dropping it,
    # so partitioner-preserving transformations keep their partitioner.
    return self.mapPartitionsWithIndex(func, preservesPartitioning)
196+
def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
    """
    Return a new RDD by applying a function to each partition of this RDD,
    while tracking the index of the original partition.

    :param f: function invoked as ``f(splitIndex, iterator)`` once per
              partition, yielding the partition's output elements.
    :param preservesPartitioning: whether the transformation keeps the
              partitioner of this RDD intact.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
    >>> def f(splitIndex, iterator): yield splitIndex
    >>> rdd.mapPartitionsWithIndex(f).sum()
    6
    """
    # Chain f onto this RDD as a pipelined stage; PipelinedRDD calls
    # f(partitionIndex, iterator) for every partition at compute time.
    pipelined = PipelinedRDD(self, f, preservesPartitioning)
    return pipelined
195208
def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
    """
    Deprecated: use mapPartitionsWithIndex instead.

    Return a new RDD by applying a function to each partition of this RDD,
    while tracking the index of the original partition.

    >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
    >>> def f(splitIndex, iterator): yield splitIndex
    >>> rdd.mapPartitionsWithSplit(f).sum()
    6
    """
    # Warn at the caller's frame (stacklevel=2), then delegate to the
    # renamed equivalent so behavior is identical to the new API.
    message = ("mapPartitionsWithSplit is deprecated; "
               "use mapPartitionsWithIndex instead")
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return self.mapPartitionsWithIndex(f, preservesPartitioning)
207224
208225 def filter (self , f ):
209226 """
@@ -235,7 +252,7 @@ def sample(self, withReplacement, fraction, seed):
235252 >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
236253 [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
237254 """
238- return self .mapPartitionsWithSplit (RDDSampler (withReplacement , fraction , seed ).func , True )
255+ return self .mapPartitionsWithIndex (RDDSampler (withReplacement , fraction , seed ).func , True )
239256
240257 # this is ported from scala/spark/RDD.scala
241258 def takeSample (self , withReplacement , num , seed ):
0 commit comments