rdiomar commented Jan 16, 2014

These changes do the following:

  • Update offsets only just before returning messages to the caller, rather than right after fetching them from Kafka
  • Never re-fetch messages that are already in the internal queue
  • Clear the queue after a seek, since the queued messages no longer match the new position. The queue is cleared even if the offsets did not actually change; I'm keeping the code simple rather than handling this corner case efficiently.
  • Fix the offsets stored in MultiProcessConsumer
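
To illustrate the first and third points, here is a minimal sketch of the idea. It is not the code in this pull request: `SketchConsumer`, `log`, `fetch_offset`, and `fetch_size` are hypothetical names, and a plain Python list stands in for a single Kafka partition.

```python
from collections import deque


class SketchConsumer:
    """Toy single-partition consumer (illustration only, not kafka-python code)."""

    def __init__(self, log, start=0, fetch_size=3):
        self.log = log               # hypothetical stand-in for a Kafka partition
        self.offset = start          # next offset to return; what a commit would store
        self.fetch_offset = start    # next offset to request from the broker
        self.fetch_size = fetch_size
        self.queue = deque()         # internal queue of (offset, message) pairs

    def _fetch(self):
        # Enqueue a batch without touching self.offset.
        batch = self.log[self.fetch_offset:self.fetch_offset + self.fetch_size]
        for i, msg in enumerate(batch):
            self.queue.append((self.fetch_offset + i, msg))
        self.fetch_offset += len(batch)

    def get_message(self):
        if not self.queue:
            self._fetch()
        if not self.queue:
            return None
        offset, msg = self.queue.popleft()
        # Advance the consumer offset only now, as the message is handed out.
        self.offset = offset + 1
        return msg

    def seek(self, offset):
        self.offset = offset
        self.fetch_offset = offset
        # Queued messages belong to the old position, so drop them all,
        # even if the offset did not actually change.
        self.queue.clear()
```

With this layout, a commit issued while messages are still queued would record the offset of the next message the caller has not yet seen, rather than the end of the last fetched batch.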

* Increment the offset before returning a message rather than when
  putting it in the internal queue. This prevents committing the wrong offsets.
* In MultiProcessConsumer, store the offset of the next message.

Fetch requests can be repeated if we get a ConsumerFetchSizeTooSmall
or if _fetch() is called multiple times for some reason. We don't want
to re-fetch messages that are already in our queue, so store the offsets
of the last enqueued messages from each partition.
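
A rough sketch of why tracking per-partition fetch offsets avoids duplicates when a fetch is repeated. This is an illustration under assumptions, not the code in this pull request: `fetch_into_queue`, `log`, `fetch_offsets`, and `max_messages` are hypothetical names, and a dict of lists stands in for the broker.

```python
from collections import deque


def fetch_into_queue(log, fetch_offsets, queue, max_messages=2):
    """Enqueue messages that have not been enqueued yet.

    log:           dict of partition -> list of messages (stand-in for the broker)
    fetch_offsets: dict of partition -> next offset to request
    queue:         deque receiving (partition, offset, message) tuples
    """
    for partition, messages in log.items():
        start = fetch_offsets[partition]
        batch = messages[start:start + max_messages]
        for offset, msg in enumerate(batch, start):
            queue.append((partition, offset, msg))
            # Remember where the next fetch for this partition should begin.
            fetch_offsets[partition] = offset + 1


log = {0: ["a", "b", "c"], 1: ["x"]}
fetch_offsets = {0: 0, 1: 0}
queue = deque()

fetch_into_queue(log, fetch_offsets, queue)
fetch_into_queue(log, fetch_offsets, queue)  # repeated fetch enqueues only new messages
```

Because each call starts from the stored fetch offset rather than from the offset of the last message returned to the caller, calling the function again before the queue is drained adds only messages that are not yet in the queue.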

rdiomar commented Jan 17, 2014

Bump. Can someone please review this?

rdiomar referenced this pull request in rdiomar/kafka-python Jan 17, 2014
* Tried to be consistent with the Scala client.
* Support in MultiProcessConsumer not implemented.
* Removed the check for self.count_since_commit in commit(), since the offsets may have been changed manually (e.g. with a seek).

dpkp commented Jan 28, 2014

+1 for merge

rdiomar added a commit that referenced this pull request Jan 28, 2014
@rdiomar rdiomar merged commit bcd5539 into dpkp:master Jan 28, 2014
4 participants