@@ -1,6 +1,5 @@
 from __future__ import absolute_import
 
-from collections import defaultdict
 from itertools import izip_longest, repeat
 import logging
 import time
@@ -235,6 +234,12 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
                  buffer_size=FETCH_BUFFER_SIZE_BYTES,
                  max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
                  iter_timeout=None):
+        super(SimpleConsumer, self).__init__(
+            client, group, topic,
+            partitions=partitions,
+            auto_commit=auto_commit,
+            auto_commit_every_n=auto_commit_every_n,
+            auto_commit_every_t=auto_commit_every_t)
 
         if max_buffer_size is not None and buffer_size > max_buffer_size:
             raise ValueError("buffer_size (%d) is greater than "
@@ -245,17 +250,10 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
         self.partition_info = False     # Do not return partition info in msgs
         self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
         self.fetch_min_bytes = fetch_size_bytes
-        self.fetch_started = defaultdict(bool)  # defaults to false
+        self.fetch_offsets = self.offsets.copy()
         self.iter_timeout = iter_timeout
         self.queue = Queue()
 
-        super(SimpleConsumer, self).__init__(
-            client, group, topic,
-            partitions=partitions,
-            auto_commit=auto_commit,
-            auto_commit_every_n=auto_commit_every_n,
-            auto_commit_every_t=auto_commit_every_t)
-
     def __repr__(self):
         return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
             (self.group, self.topic, str(self.offsets.keys()))
@@ -305,6 +303,10 @@ def seek(self, offset, whence):
         else:
             raise ValueError("Unexpected value for `whence`, %d" % whence)
 
+        # Reset queue and fetch offsets since they are invalid
+        self.fetch_offsets = self.offsets.copy()
+        self.queue = Queue()
+
     def get_messages(self, count=1, block=True, timeout=0.1):
         """
         Fetch the specified number of messages
@@ -316,33 +318,69 @@ def get_messages(self, count=1, block=True, timeout=0.1):
                  it will block forever.
         """
         messages = []
-        if timeout:
+        if timeout is not None:
             max_time = time.time() + timeout
 
+        new_offsets = {}
         while count > 0 and (timeout is None or timeout > 0):
-            message = self.get_message(block, timeout)
-            if message:
-                messages.append(message)
+            result = self._get_message(block, timeout, get_partition_info=True,
+                                       update_offset=False)
+            if result:
+                partition, message = result
+                if self.partition_info:
+                    messages.append(result)
+                else:
+                    messages.append(message)
+                new_offsets[partition] = message.offset + 1
                 count -= 1
             else:
                 # Ran out of messages for the last request.
                 if not block:
                     # If we're not blocking, break.
                     break
-                if timeout:
+                if timeout is not None:
                     # If we're blocking and have a timeout, reduce it to the
                     # appropriate value
                     timeout = max_time - time.time()
 
+        # Update and commit offsets if necessary
+        self.offsets.update(new_offsets)
+        self.count_since_commit += len(messages)
+        self._auto_commit()
         return messages
 
-    def get_message(self, block=True, timeout=0.1):
+    def get_message(self, block=True, timeout=0.1, get_partition_info=None):
+        return self._get_message(block, timeout, get_partition_info)
+
+    def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
+                     update_offset=True):
+        """
+        If no messages can be fetched, returns None.
+        If get_partition_info is None, it defaults to self.partition_info
+        If get_partition_info is True, returns (partition, message)
+        If get_partition_info is False, returns message
+        """
         if self.queue.empty():
             # We're out of messages, go grab some more.
             with FetchContext(self, block, timeout):
                 self._fetch()
         try:
-            return self.queue.get_nowait()
+            partition, message = self.queue.get_nowait()
+
+            if update_offset:
+                # Update partition offset
+                self.offsets[partition] = message.offset + 1
+
+                # Count, check and commit messages if necessary
+                self.count_since_commit += 1
+                self._auto_commit()
+
+            if get_partition_info is None:
+                get_partition_info = self.partition_info
+            if get_partition_info:
+                return partition, message
+            else:
+                return message
         except Empty:
             return None
 
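A note on the reworked single-message path above: whether get_message() returns just the message or a (partition, message) pair is controlled by the consumer's partition_info flag, or per call by the get_partition_info argument. A minimal sketch of the two shapes, assuming an already-constructed SimpleConsumer named consumer and the provide_partition_info() helper on the base Consumer class:

    # Sketch only -- `consumer` is assumed to be a SimpleConsumer, and
    # provide_partition_info() (on the base Consumer class) is assumed to
    # set consumer.partition_info = True.
    # get_message() returns None when no message is currently available.
    message = consumer.get_message(block=True, timeout=0.1)    # message only

    consumer.provide_partition_info()
    partition, message = consumer.get_message()                # (partition, message)

    # The per-call argument overrides the instance flag:
    partition, message = consumer.get_message(get_partition_info=True)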
@@ -367,11 +405,11 @@ def __iter__(self):
     def _fetch(self):
         # Create fetch request payloads for all the partitions
         requests = []
-        partitions = self.offsets.keys()
+        partitions = self.fetch_offsets.keys()
         while partitions:
             for partition in partitions:
                 requests.append(FetchRequest(self.topic, partition,
-                                             self.offsets[partition],
+                                             self.fetch_offsets[partition],
                                              self.buffer_size))
             # Send request
             responses = self.client.send_fetch_request(
@@ -384,18 +422,9 @@ def _fetch(self):
                 partition = resp.partition
                 try:
                     for message in resp.messages:
-                        # Update partition offset
-                        self.offsets[partition] = message.offset + 1
-
-                        # Count, check and commit messages if necessary
-                        self.count_since_commit += 1
-                        self._auto_commit()
-
                         # Put the message in our queue
-                        if self.partition_info:
-                            self.queue.put((partition, message))
-                        else:
-                            self.queue.put(message)
+                        self.queue.put((partition, message))
+                        self.fetch_offsets[partition] = message.offset + 1
                 except ConsumerFetchSizeTooSmall, e:
                     if (self.max_buffer_size is not None and
                             self.buffer_size == self.max_buffer_size):
@@ -585,12 +614,11 @@ def __iter__(self):
                 break
 
             # Count, check and commit messages if necessary
-            self.offsets[partition] = message.offset
+            self.offsets[partition] = message.offset + 1
             self.start.clear()
-            yield message
-
             self.count_since_commit += 1
             self._auto_commit()
+            yield message
 
         self.start.clear()
 
@@ -613,9 +641,10 @@ def get_messages(self, count=1, block=True, timeout=10):
         self.size.value = count
         self.pause.clear()
 
-        if timeout:
+        if timeout is not None:
             max_time = time.time() + timeout
 
+        new_offsets = {}
         while count > 0 and (timeout is None or timeout > 0):
             # Trigger consumption only if the queue is empty
             # By doing this, we will ensure that consumers do not
@@ -630,16 +659,18 @@ def get_messages(self, count=1, block=True, timeout=10):
                 break
 
             messages.append(message)
-
-            # Count, check and commit messages if necessary
-            self.offsets[partition] = message.offset
-            self.count_since_commit += 1
-            self._auto_commit()
+            new_offsets[partition] = message.offset + 1
             count -= 1
-            timeout = max_time - time.time()
+            if timeout is not None:
+                timeout = max_time - time.time()
 
         self.size.value = 0
         self.start.clear()
         self.pause.set()
 
+        # Update and commit offsets if necessary
+        self.offsets.update(new_offsets)
+        self.count_since_commit += len(messages)
+        self._auto_commit()
+
         return messages
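Taken together, both get_messages() implementations now collect the batch first and only afterwards fold the new offsets into self.offsets and trigger a single _auto_commit(). A minimal usage sketch, assuming an already-connected KafkaClient instance named client (client construction is version-dependent and not part of this diff):

    # Sketch only -- `client` is assumed to be a connected KafkaClient;
    # handle() is a placeholder for application code.
    consumer = SimpleConsumer(client, "my-group", "my-topic",
                              auto_commit=True,
                              auto_commit_every_n=100)

    # Offsets for the whole batch are applied and (auto-)committed once,
    # after the fetch loop inside get_messages() finishes.
    for message in consumer.get_messages(count=100, block=True, timeout=5):
        handle(message)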