From 1fe5be48b19db048687ed4f12c54fc973eb1f471 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9my=20Subtil?= Date: Fri, 11 Apr 2014 18:18:28 +0200 Subject: [PATCH 1/2] Always use OffsetFetchRequest in consumers --- kafka/consumer.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 8ac28daf4..ce29c35d2 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -9,7 +9,7 @@ from kafka.common import ( ErrorMapping, FetchRequest, - OffsetRequest, OffsetCommitRequest, + OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, ConsumerFetchSizeTooSmall, ConsumerNoMoreData ) @@ -105,17 +105,13 @@ def get_or_init_offset_callback(resp): "partition=%d failed with errorcode=%s" % ( resp.topic, resp.partition, resp.error)) - # Uncomment for 0.8.1 - # - #for partition in partitions: - # req = OffsetFetchRequest(topic, partition) - # (offset,) = self.client.send_offset_fetch_request(group, [req], - # callback=get_or_init_offset_callback, - # fail_on_error=False) - # self.offsets[partition] = offset - for partition in partitions: - self.offsets[partition] = 0 + req = OffsetFetchRequest(topic, partition) + (offset,) = self.client.send_offset_fetch_request( + group, [req], + callback=get_or_init_offset_callback, + fail_on_error=False) + self.offsets[partition] = offset def commit(self, partitions=None): """ From 8ba017ade5f7d50a5752a92db9057bea72512686 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9my=20Subtil?= Date: Fri, 11 Apr 2014 19:18:48 +0200 Subject: [PATCH 2/2] Update kafka-src to 0.8.1 --- kafka-src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-src b/kafka-src index 15bb3961d..b5971264f 160000 --- a/kafka-src +++ b/kafka-src @@ -1 +1 @@ -Subproject commit 15bb3961d9171c1c54c4c840a554ce2c76168163 +Subproject commit b5971264f29c6646bc543b47c58786f6322f0bd0