 
 try:
     import psutil
+
     def get_used_memory():
         self = psutil.Process(os.getpid())
         return self.memory_info().rss >> 20
 
 except ImportError:
+
     def get_used_memory():
         if platform.system() == 'Linux':
             for line in open('/proc/self/status'):
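The Linux fallback is truncated by the hunk boundary above; it walks /proc/self/status looking for the process's resident-set size. A minimal sketch of that approach, assuming VmRSS is reported in kB (the commit's exact parsing may differ):

    import platform

    def get_used_memory():
        # Fallback when psutil is unavailable: parse the kernel's
        # per-process status file, e.g. "VmRSS:    123456 kB".
        if platform.system() == 'Linux':
            for line in open('/proc/self/status'):
                if line.startswith('VmRSS:'):
                    return int(line.split()[1]) >> 10  # kB -> MB
        return 0  # unknown platform: report zero rather than fail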
@@ -48,6 +50,7 @@ class Merger(object):
4850 """
4951 merge shuffled data together by combinator
5052 """
53+
5154 def merge (self , iterator ):
5255 raise NotImplementedError
5356
@@ -59,13 +62,14 @@ class MapMerger(Merger):
5962 """
6063 In memory merger based on map
6164 """
65+
6266 def __init__ (self , combiner ):
6367 self .combiner = combiner
6468 self .data = {}
6569
6670 def merge (self , iterator ):
6771 d , comb = self .data , self .combiner
68- for k ,v in iter (iterator ):
72+ for k , v in iter (iterator ):
6973 d [k ] = comb (d [k ], v ) if k in d else v
7074
7175 def iteritems (self ):
@@ -75,7 +79,7 @@ def iteritems(self):
 class ExternalHashMapMerger(Merger):
 
     """
-    External merger will dump the aggregated data to disk when memory usage
+    External merger will dump the aggregated data to disk when memory usage
     is above the limit, then merge them together.
 
     >>> combiner = lambda x, y:x+y
@@ -85,7 +89,6 @@ class ExternalHashMapMerger(Merger):
     >>> assert merger.spills > 0
     >>> sum(v for k,v in merger.iteritems())
     499950000
-    >>>
     """
 
     PARTITIONS = 64
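The doctest exercises the design the docstring describes: keys are hashed into PARTITIONS buckets, so each spill pass writes one file per partition, and partition i can later be merged independently of the others. A toy sketch of that bucketing (helper names here are illustrative, not the class's actual methods):

    # Illustrative only: combine each (key, value) into one of 64
    # per-partition dicts; a spill then serializes dict i to file i.
    PARTITIONS = 64
    pdata = [{} for _ in range(PARTITIONS)]

    def put(key, value, combiner):
        bucket = pdata[hash(key) % PARTITIONS]
        bucket[key] = combiner(bucket[key], value) if key in bucket else value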
@@ -95,7 +98,8 @@ def __init__(self, combiner, memory_limit=512, serializer=None,
                  localdirs=None, scale=1):
         self.combiner = combiner
         self.memory_limit = memory_limit
-        self.serializer = serializer or BatchedSerializer(AutoSerializer(), 1024)
+        self.serializer = serializer or \
+            BatchedSerializer(AutoSerializer(), 1024)
         self.localdirs = localdirs or self._get_dirs()
         self.scale = scale
         self.data = {}
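The default serializer is what the spill files are written and read with. A sketch of that round-trip, assuming the pyspark.serializers API of this era and Python 2 idioms to match the code (the path is hypothetical):

    from pyspark.serializers import BatchedSerializer, AutoSerializer

    ser = BatchedSerializer(AutoSerializer(), 1024)  # batches of 1024 items
    with open('/tmp/spill_part_0', 'w') as f:
        ser.dump_stream({'a': 1, 'b': 2}.iteritems(), f)
    merged = dict(ser.load_stream(open('/tmp/spill_part_0')))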
@@ -182,7 +186,7 @@ def _spill(self):
             os.makedirs(path)
         for i in range(self.PARTITIONS):
             p = os.path.join(path, str(i))
-            with open(p, 'w') as f:
+            with open(p, "w") as f:
                 self.serializer.dump_stream(self.pdata[i].iteritems(), f)
             self.pdata[i].clear()
         self.spills += 1
@@ -205,9 +209,9 @@ def _external_items(self):
             for j in range(self.spills):
                 path = self._get_spill_dir(j)
                 p = os.path.join(path, str(i))
-                self.merge(self.serializer.load_stream(open(p)), check=False)
+                self.merge(self.serializer.load_stream(open(p)), False)
 
-                if j > 0 and self.used_memory > hard_limit and j < self.spills - 1:
+                if self.used_memory > hard_limit and j < self.spills - 1:
                     self.data.clear()  # will read from disk again
                     for v in self._recursive_merged_items(i):
                         yield v
@@ -230,9 +234,10 @@ def _recursive_merged_items(self, start):
             self._spill()
 
         for i in range(start, self.PARTITIONS):
-            subdirs = [os.path.join(d, 'merge', str(i)) for d in self.localdirs]
+            subdirs = [os.path.join(d, "merge", str(i))
+                       for d in self.localdirs]
             m = ExternalHashMapMerger(self.combiner, self.memory_limit,
-                    self.serializer, subdirs, self.scale * self.PARTITIONS)
+                                      self.serializer, subdirs, self.scale * self.PARTITIONS)
             m.pdata = [{} for _ in range(self.PARTITIONS)]
             limit = self.next_limit
@@ -248,6 +253,6 @@ def _recursive_merged_items(self, start):
                     yield v
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     import doctest
     doctest.testmod()
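For reference, running the module executes the class doctest shown above. An equivalent standalone driver (Python 2 to match the code; the module name "shuffle" and the merger's construction are inferred from the doctest, whose opening lines fall outside the diff window):

    from shuffle import ExternalHashMapMerger

    combiner = lambda x, y: x + y
    merger = ExternalHashMapMerger(combiner, 10)   # spill above ~10 MB
    merger.merge(zip(xrange(10000), xrange(10000)) * 10)
    assert merger.spills > 0                       # the low cap forced spills
    print sum(v for k, v in merger.iteritems())    # 499950000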