@@ -57,7 +57,7 @@ def __init__(self, creator, combiner, mergeCombiner=None):
5757class Merger (object ):
5858
5959 """
60- merge shuffled data together by combinator
60+ merge shuffled data together by aggregator
6161 """
6262
6363 def __init__ (self , aggregator ):
@@ -77,8 +77,9 @@ def iteritems(self):
7777
7878
7979class InMemoryMerger (Merger ):
80+
8081 """
81- In memory merger based on map
82+ In memory merger based on in-memory dict.
8283 """
8384
8485 def __init__ (self , aggregator ):
@@ -107,8 +108,30 @@ def iteritems(self):
107108class ExternalMerger (Merger ):
108109
109110 """
110- External merger will dump the aggregated data into disks when memory usage
111- is above the limit, then merge them together.
111+ External merger will dump the aggregated data into disks when
112+ memory usage goes above the limit, then merge them together.
113+
114+ This class works as follows:
115+
116+ - It repeatedly combine the items and save them in one dict in
117+ memory.
118+
119+ - When the used memory goes above memory limit, it will split
120+ the combined data into partitions by hash code, dump them
121+ into disk, one file per partition.
122+
123+ - Then it goes through the rest of the iterator, combine items
124+ into different dict by hash. Until the used memory goes over
125+ memory limit, it dump all the dicts into disks, one file per
126+ dict. Repeat this again until combine all the items.
127+
128+ - Before return any items, it will load each partition and
129+ combine them seperately. Yield them before loading next
130+ partition.
131+
132+ - During loading a partition, if the memory goes over limit,
133+ it will partition the loaded data and dump them into disks
134+ and load them partition by partition again.
112135
113136 >>> agg = Aggregator(lambda x: x, lambda x, y: x + y)
114137 >>> merger = ExternalMerger(agg, 10)
0 commit comments