@@ -46,6 +46,12 @@ def context(self):
4646 """
4747 return self ._ssc
4848
49+ def context (self ):
50+ """
51+ Return the StreamingContext associated with this DStream
52+ """
53+ return self ._ssc
54+
4955 def count (self ):
5056 """
5157 Return a new DStream which contains the number of elements in this DStream.
@@ -56,7 +62,7 @@ def _sum(self):
5662 """
5763 Add up the elements in this DStream.
5864 """
59- return self ._mapPartitions (lambda x : [sum (x )]).reduce (operator .add )
65+ return self .mapPartitions (lambda x : [sum (x )]).reduce (operator .add )
6066
6167 def print_ (self , label = None ):
6268 """
@@ -75,7 +81,7 @@ def filter(self, f):
7581 Return a new DStream containing only the elements that satisfy predicate.
7682 """
7783 def func (iterator ): return ifilter (f , iterator )
78- return self ._mapPartitions (func )
84+ return self .mapPartitions (func )
7985
8086 def flatMap (self , f , preservesPartitioning = False ):
8187 """
@@ -86,7 +92,7 @@ def func(s, iterator):
8692 return chain .from_iterable (imap (f , iterator ))
8793 return self ._mapPartitionsWithIndex (func , preservesPartitioning )
8894
89- def map (self , f ):
95+ def map (self , f , preservesPartitioning = False ):
9096 """
9197 Return a new DStream by applying a function to each element of DStream.
9298 """
@@ -146,7 +152,7 @@ def combineLocally(iterator):
146152 else :
147153 combiners [k ] = mergeValue (combiners [k ], v )
148154 return combiners .iteritems ()
149- locally_combined = self ._mapPartitions (combineLocally )
155+ locally_combined = self .mapPartitions (combineLocally )
150156 shuffled = locally_combined .partitionBy (numPartitions )
151157
152158 def _mergeCombiners (iterator ):
@@ -474,4 +480,4 @@ def _jdstream(self):
474480 return self ._jdstream_val
475481
476482 def _is_pipelinable (self ):
477- return not ( self .is_cached )
483+ return not self .is_cached
0 commit comments