diff --git a/kafka/consumer.py b/kafka/consumer.py index f2898ad43..9aac55d35 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -65,6 +65,7 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True, auto_commit_every_t=AUTO_COMMIT_INTERVAL): self.client = client + self.currentbuffersize = client.bufsize self.topic = topic self.group = group self.client._load_metadata_for_topics(topic) @@ -362,8 +363,9 @@ def __iter_partition__(self, partition, offset): while True: # use MaxBytes = client's bufsize since we're only # fetching one topic + partition + req = FetchRequest( - self.topic, partition, offset, self.client.bufsize) + self.topic, partition, offset, self.currentbuffersize) (resp,) = self.client.send_fetch_request( [req], @@ -376,6 +378,7 @@ def __iter_partition__(self, partition, offset): next_offset = None try: for message in resp.messages: + self.currentbuffersize = self.client.bufsize next_offset = message.offset # update the offset before the message is yielded. This @@ -390,10 +393,10 @@ def __iter_partition__(self, partition, offset): self.offsets[partition] = message.offset yield message except ConsumerFetchSizeTooSmall, e: - fetch_size *= 1.5 + self.currentbuffersize *= 2 log.warn( - "Fetch size too small, increasing to %d (1.5x) and retry", - fetch_size) + "Fetch size too small, increasing to %d (2x) and retry", + self.currentbuffersize) continue except ConsumerNoMoreData, e: log.debug("Iteration was ended by %r", e)