Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka-src
Submodule kafka-src updated 340 files
18 changes: 7 additions & 11 deletions kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from kafka.common import (
ErrorMapping, FetchRequest,
OffsetRequest, OffsetCommitRequest,
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
)

Expand Down Expand Up @@ -105,17 +105,13 @@ def get_or_init_offset_callback(resp):
"partition=%d failed with errorcode=%s" % (
resp.topic, resp.partition, resp.error))

# Uncomment for 0.8.1
#
#for partition in partitions:
# req = OffsetFetchRequest(topic, partition)
# (offset,) = self.client.send_offset_fetch_request(group, [req],
# callback=get_or_init_offset_callback,
# fail_on_error=False)
# self.offsets[partition] = offset

for partition in partitions:
self.offsets[partition] = 0
req = OffsetFetchRequest(topic, partition)
(offset,) = self.client.send_offset_fetch_request(
group, [req],
callback=get_or_init_offset_callback,
fail_on_error=False)
self.offsets[partition] = offset

def commit(self, partitions=None):
"""
Expand Down