11from __future__ import absolute_import
22
3- from collections import defaultdict
43from itertools import izip_longest , repeat
54import logging
65import time
@@ -318,10 +317,17 @@ def get_messages(self, count=1, block=True, timeout=0.1):
318317 if timeout is not None :
319318 max_time = time .time () + timeout
320319
320+ new_offsets = {}
321321 while count > 0 and (timeout is None or timeout > 0 ):
322- message = self .get_message (block , timeout )
323- if message :
324- messages .append (message )
322+ result = self ._get_message (block , timeout , get_partition_info = True ,
323+ update_offset = False )
324+ if result :
325+ partition , message = result
326+ if self .partition_info :
327+ messages .append (result )
328+ else :
329+ messages .append (message )
330+ new_offsets [partition ] = message .offset + 1
325331 count -= 1
326332 else :
327333 # Ran out of messages for the last request.
@@ -333,24 +339,35 @@ def get_messages(self, count=1, block=True, timeout=0.1):
333339 # appropriate value
334340 timeout = max_time - time .time ()
335341
342+ # Update and commit offsets if necessary
343+ self .offsets .update (new_offsets )
344+ self .count_since_commit += len (messages )
345+ self ._auto_commit ()
336346 return messages
337347
338- def get_message (self , block = True , timeout = 0.1 ):
348+ def get_message (self , block = True , timeout = 0.1 , get_partition_info = None ):
349+ return self ._get_message (block , timeout , get_partition_info )
350+
351+ def _get_message (self , block = True , timeout = 0.1 , get_partition_info = None ,
352+ update_offset = True ):
339353 if self .queue .empty ():
340354 # We're out of messages, go grab some more.
341355 with FetchContext (self , block , timeout ):
342356 self ._fetch ()
343357 try :
344358 partition , message = self .queue .get_nowait ()
345359
346- # Update partition offset
347- self .offsets [partition ] = message .offset + 1
360+ if update_offset :
361+ # Update partition offset
362+ self .offsets [partition ] = message .offset + 1
348363
349- # Count, check and commit messages if necessary
350- self .count_since_commit += 1
351- self ._auto_commit ()
364+ # Count, check and commit messages if necessary
365+ self .count_since_commit += 1
366+ self ._auto_commit ()
352367
353- if self .partition_info :
368+ if get_partition_info is None :
369+ get_partition_info = self .partition_info
370+ if get_partition_info :
354371 return partition , message
355372 else :
356373 return message
@@ -613,6 +630,7 @@ def get_messages(self, count=1, block=True, timeout=10):
613630 if timeout is not None :
614631 max_time = time .time () + timeout
615632
633+ new_offsets = {}
616634 while count > 0 and (timeout is None or timeout > 0 ):
617635 # Trigger consumption only if the queue is empty
618636 # By doing this, we will ensure that consumers do not
@@ -627,11 +645,7 @@ def get_messages(self, count=1, block=True, timeout=10):
627645 break
628646
629647 messages .append (message )
630-
631- # Count, check and commit messages if necessary
632- self .offsets [partition ] = message .offset + 1
633- self .count_since_commit += 1
634- self ._auto_commit ()
648+ new_offsets [partition ] = message .offset + 1
635649 count -= 1
636650 if timeout is not None :
637651 timeout = max_time - time .time ()
@@ -640,4 +654,9 @@ def get_messages(self, count=1, block=True, timeout=10):
640654 self .start .clear ()
641655 self .pause .set ()
642656
657+ # Update and commit offsets if necessary
658+ self .offsets .update (new_offsets )
659+ self .count_since_commit += len (messages )
660+ self ._auto_commit ()
661+
643662 return messages
0 commit comments