Skip to content

Conversation

@turtlesoupy
Copy link
Contributor

Hey,

I ran a "kill the broker" test for producers and got into a state where the topic metadata wasn't being refreshed because client._send_broker_aware_request does a conn.recv which doesn't propagate the correct exception upon socket timeout. I believe this is the correct behaviour below.

Integration tests still pass.

@dpkp
Copy link
Owner

dpkp commented Jan 8, 2014

we ran into the same problems in #88 and fixed in a couple different places (last was b4c20ac)

@turtlesoupy
Copy link
Contributor Author

Doh, okay - should have checked the other issues first. I'll take a glance at that review and then close this after it is accepted.

Joe Crobak and others added 26 commits January 13, 2014 14:19
When running on Linux with code on a case-insensitive file system,
imports of the `Queue` module fail because python resolves the
wrong file (It is trying to use a relative import of `queue.py` in
the kafka directory). This change forces absolute imports via PEP328.
Previously, if you try to consume a message with a timeout greater than 10 seconds,
but you don't receive data in those 10 seconds, a socket.timeout exception is raised.
This allows a higher socket timeout to be set, or even None for no timeout.
According to the protocol documentation, the 4 byte integer at the beginning
of a response represents the size of the payload only, not including those bytes.
See http://goo.gl/rg5uom
…y time

* Remove bufsize from client and conn, since they're not actually enforced

Notes:

This commit changes behavior a bit by raising a BufferUnderflowError when
no data is received for the message size rather than a ConnectionError.

Since bufsize in the socket is not actually enforced, but it is used by the consumer
when creating requests, moving it there until a better solution is implemented.
* Combine partition fetch requests into a single request
* Put the messages received in a queue and update offsets
* Grab as many messages from the queue as requested
* When the queue is empty, request more
* timeout param for get_messages() is the actual timeout for getting those messages
* Based on #74 -
  don't increase min_bytes if the consumer fetch buffer size is too small.

Notes:

Change MultiProcessConsumer and _mp_consume() accordingly.

Previously, when querying each partition separately, it was possible to
block waiting for messages on partition 0 even if there are new ones in partition 1.
These changes allow us to block while waiting for messages on all partitions,
and reduce total number of kafka requests.

Use Queue.Queue for single proc Queue instead of already imported
multiprocessing.Queue because the latter doesn't seem to guarantee immediate
availability of items after a put:

>>> from multiprocessing import Queue
>>> q = Queue()
>>> q.put(1); q.get_nowait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 152, in get_nowait
    return self.get(False)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 134, in get
    raise Empty
Queue.Empty
… iterator to exit when reached.

Also put constant timeout values in pre-defined constants
Will remove once any error handling issues are resolved.
This is pretty much a rewrite. The tests that involve offset requests/responses
are not implemented since that API is not supported in kafka 0.8 yet.
Only kafka.codec and kafka.protocol are currently tested, so there is more work to be done here.
We always store the offset of the next available message, so we shouldn't decrement the offset deltas
when seeking by an extra 1
…data

Also, log.exception() is unhelpfully noisy. Use log.error() with some error details in the message instead.
This differentiates between errors that occur when sending the request
and receiving the response, and adds BufferUnderflowError handling.
…tch size is too small

Note: This can cause fetching a message to exceed a given timeout, but timeouts are not guaranteed anyways, and in this case it's the client's fault for not sending a big enough buffer size rather than the kafka server. This can be bad if max_fetch_size is None (no limit) and there is some message in Kafka that is crazy huge, but that is why we should have some max_fetch_size.
…ed in integration tests

If some of the tests stop brokers then error out, the teardown method will try to close the
same brokers and fail. This change allows it to continue.
This is better since the tests stop/start brokers, and if something goes wrong
they can affect eachother.
@turtlesoupy
Copy link
Contributor Author

Closing since upstream has the fix.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants