From 1b7cc327d148389c2c73d73140a24496b2f183ac Mon Sep 17 00:00:00 2001 From: motherhubbard Date: Mon, 2 Dec 2013 09:34:43 +0000 Subject: [PATCH] Resize buffer (rather than minbytes) Fix for issue 73. Also: Change buffer growth factor to be 2x (stops decimals when growing the buffer and gives a nicer multiple). Stop buffer growth from starting at 1 byte (when we already know the previous buffer (default of 4k) was too small). --- kafka/consumer.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index f2898ad43..9aac55d35 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -65,6 +65,7 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True, auto_commit_every_t=AUTO_COMMIT_INTERVAL): self.client = client + self.currentbuffersize = client.bufsize self.topic = topic self.group = group self.client._load_metadata_for_topics(topic) @@ -362,8 +363,9 @@ def __iter_partition__(self, partition, offset): while True: # use MaxBytes = client's bufsize since we're only # fetching one topic + partition + req = FetchRequest( - self.topic, partition, offset, self.client.bufsize) + self.topic, partition, offset, self.currentbuffersize) (resp,) = self.client.send_fetch_request( [req], @@ -376,6 +378,7 @@ def __iter_partition__(self, partition, offset): next_offset = None try: for message in resp.messages: + self.currentbuffersize = self.client.bufsize next_offset = message.offset # update the offset before the message is yielded. This @@ -390,10 +393,10 @@ def __iter_partition__(self, partition, offset): self.offsets[partition] = message.offset yield message except ConsumerFetchSizeTooSmall, e: - fetch_size *= 1.5 + self.currentbuffersize *= 2 log.warn( - "Fetch size too small, increasing to %d (1.5x) and retry", - fetch_size) + "Fetch size too small, increasing to %d (2x) and retry", + self.currentbuffersize) continue except ConsumerNoMoreData, e: log.debug("Iteration was ended by %r", e)