From 410567e0f3c9c633bae11d8f795735c6a0af7d92 Mon Sep 17 00:00:00 2001 From: Thomas Dimson Date: Mon, 6 Jan 2014 17:37:41 -0800 Subject: [PATCH 01/36] Check for socket status on read as well as send --- kafka/conn.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 1a3e2607c..b1988b662 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -98,8 +98,13 @@ def recv(self, request_id): Get a response from Kafka """ log.debug("Reading response %d from Kafka" % request_id) - self.data = self._consume_response() - return self.data + try: + if self._dirty: + self.reinit() + return self._consume_response() + except socket.error: + log.exception('Unable to read response from Kafka') + self._raise_connection_error() def copy(self): """ From 83b7adc7a88a1714145f1f5cc521e4823864302a Mon Sep 17 00:00:00 2001 From: Thomas Dimson Date: Mon, 6 Jan 2014 17:50:39 -0800 Subject: [PATCH 02/36] Propagate error immediately if dirty --- kafka/conn.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index b1988b662..57b3e7b13 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -98,9 +98,10 @@ def recv(self, request_id): Get a response from Kafka """ log.debug("Reading response %d from Kafka" % request_id) + if self._dirty: + self._raise_connection_error() + try: - if self._dirty: - self.reinit() return self._consume_response() except socket.error: log.exception('Unable to read response from Kafka') From 6ba62e72e2a5a9330b81cc6ec127b7f03a78c121 Mon Sep 17 00:00:00 2001 From: Joe Crobak Date: Mon, 21 Oct 2013 20:07:43 +0000 Subject: [PATCH 03/36] Enable absolute imports for modules using Queue. When running on Linux with code on a case-insensitive file system, imports of the `Queue` module fail because python resolves the wrong file (It is trying to use a relative import of `queue.py` in the kafka directory). This change forces absolute imports via PEP328. --- kafka/consumer.py | 2 ++ kafka/producer.py | 2 ++ kafka/queue.py | 2 ++ 3 files changed, 6 insertions(+) diff --git a/kafka/consumer.py b/kafka/consumer.py index 57b5b9742..226700e2a 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -1,3 +1,5 @@ +from __future__ import absolute_import + from collections import defaultdict from itertools import izip_longest, repeat import logging diff --git a/kafka/producer.py b/kafka/producer.py index 1d4733688..a82d99b5c 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -1,3 +1,5 @@ +from __future__ import absolute_import + from collections import defaultdict from itertools import cycle from multiprocessing import Queue, Process diff --git a/kafka/queue.py b/kafka/queue.py index a99636922..ada495f78 100644 --- a/kafka/queue.py +++ b/kafka/queue.py @@ -1,3 +1,5 @@ +from __future__ import absolute_import + from copy import copy import logging from multiprocessing import Process, Queue, Event From 6c1fbaf2155f592adfaf3837e8ceef222fb97ab7 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 18 Dec 2013 16:53:03 -0800 Subject: [PATCH 04/36] Allow customizing socket timeouts. Previously, if you try to consume a message with a timeout greater than 10 seconds, but you don't receive data in those 10 seconds, a socket.timeout exception is raised. This allows a higher socket timeout to be set, or even None for no timeout. --- kafka/client.py | 7 ++++--- kafka/conn.py | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 71ededaa0..96593642f 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -19,12 +19,13 @@ class KafkaClient(object): CLIENT_ID = "kafka-python" ID_GEN = count() - def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID): + def __init__(self, host, port, bufsize=4098, client_id=CLIENT_ID, timeout=10): # We need one connection to bootstrap self.bufsize = bufsize self.client_id = client_id + self.timeout = timeout self.conns = { # (host, port) -> KafkaConnection - (host, port): KafkaConnection(host, port, bufsize) + (host, port): KafkaConnection(host, port, bufsize, timeout=timeout) } self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id @@ -41,7 +42,7 @@ def _get_conn_for_broker(self, broker): """ if (broker.host, broker.port) not in self.conns: self.conns[(broker.host, broker.port)] = \ - KafkaConnection(broker.host, broker.port, self.bufsize) + KafkaConnection(broker.host, broker.port, self.bufsize, timeout=self.timeout) return self.conns[(broker.host, broker.port)] diff --git a/kafka/conn.py b/kafka/conn.py index 57b3e7b13..03a43c9f9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -19,14 +19,14 @@ class KafkaConnection(local): we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id. """ - def __init__(self, host, port, bufsize=4096): + def __init__(self, host, port, bufsize=4098, timeout=10): super(KafkaConnection, self).__init__() self.host = host self.port = port self.bufsize = bufsize self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((host, port)) - self._sock.settimeout(10) + self._sock.settimeout(timeout) self._dirty = False def __str__(self): From 29370286020495869a1b97bc0f67513e4334f3fa Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 18 Dec 2013 17:47:52 -0800 Subject: [PATCH 05/36] Read the correct number of bytes from kafka. According to the protocol documentation, the 4 byte integer at the beginning of a response represents the size of the payload only, not including those bytes. See http://goo.gl/rg5uom --- kafka/conn.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 03a43c9f9..83f9f8e8e 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -55,12 +55,11 @@ def _consume_response_iter(self): self._raise_connection_error() (size,) = struct.unpack('>i', resp) - messagesize = size - 4 - log.debug("About to read %d bytes from Kafka", messagesize) + log.debug("About to read %d bytes from Kafka", size) # Read the remainder of the response total = 0 - while total < messagesize: + while total < size: resp = self._sock.recv(self.bufsize) log.debug("Read %d bytes from Kafka", len(resp)) if resp == "": From afa54feb3135d5a29b3457aa888839d455efc7c6 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 18 Dec 2013 17:51:22 -0800 Subject: [PATCH 06/36] * Guarantee reading the expected number of bytes from the socket every time * Remove bufsize from client and conn, since they're not actually enforced Notes: This commit changes behavior a bit by raising a BufferUnderflowError when no data is received for the message size rather than a ConnectionError. Since bufsize in the socket is not actually enforced, but it is used by the consumer when creating requests, moving it there until a better solution is implemented. --- kafka/client.py | 7 +++---- kafka/conn.py | 48 ++++++++++++++++++++++------------------------- kafka/consumer.py | 7 +++++-- 3 files changed, 30 insertions(+), 32 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 96593642f..bd3a21406 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -19,13 +19,12 @@ class KafkaClient(object): CLIENT_ID = "kafka-python" ID_GEN = count() - def __init__(self, host, port, bufsize=4098, client_id=CLIENT_ID, timeout=10): + def __init__(self, host, port, client_id=CLIENT_ID, timeout=10): # We need one connection to bootstrap - self.bufsize = bufsize self.client_id = client_id self.timeout = timeout self.conns = { # (host, port) -> KafkaConnection - (host, port): KafkaConnection(host, port, bufsize, timeout=timeout) + (host, port): KafkaConnection(host, port, timeout=timeout) } self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id @@ -42,7 +41,7 @@ def _get_conn_for_broker(self, broker): """ if (broker.host, broker.port) not in self.conns: self.conns[(broker.host, broker.port)] = \ - KafkaConnection(broker.host, broker.port, self.bufsize, timeout=self.timeout) + KafkaConnection(broker.host, broker.port, timeout=self.timeout) return self.conns[(broker.host, broker.port)] diff --git a/kafka/conn.py b/kafka/conn.py index 83f9f8e8e..7c6d51035 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -19,11 +19,10 @@ class KafkaConnection(local): we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id. """ - def __init__(self, host, port, bufsize=4098, timeout=10): + def __init__(self, host, port, timeout=10): super(KafkaConnection, self).__init__() self.host = host self.port = port - self.bufsize = bufsize self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((host, port)) self._sock.settimeout(timeout) @@ -36,38 +35,35 @@ def __str__(self): # Private API # ################### - def _consume_response(self): - """ - Fully consume the response iterator - """ - return "".join(self._consume_response_iter()) + def _read_bytes(self, num_bytes): + bytes_left = num_bytes + resp = '' + log.debug("About to read %d bytes from Kafka", num_bytes) + + while bytes_left: + data = self._sock.recv(bytes_left) + if data == '': + raise BufferUnderflowError("Not enough data to read this response") + bytes_left -= len(data) + log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes) + resp += data + + return resp - def _consume_response_iter(self): + def _consume_response(self): """ This method handles the response header and error messages. It - then returns an iterator for the chunks of the response + then returns the response """ - log.debug("Handling response from Kafka") - + log.debug("Expecting response from Kafka") # Read the size off of the header - resp = self._sock.recv(4) - if resp == "": - self._raise_connection_error() - (size,) = struct.unpack('>i', resp) + resp = self._read_bytes(4) - log.debug("About to read %d bytes from Kafka", size) + (size,) = struct.unpack('>i', resp) # Read the remainder of the response - total = 0 - while total < size: - resp = self._sock.recv(self.bufsize) - log.debug("Read %d bytes from Kafka", len(resp)) - if resp == "": - raise BufferUnderflowError( - "Not enough data to read this response") - - total += len(resp) - yield resp + resp = self._read_bytes(size) + return str(resp) def _raise_connection_error(self): self._dirty = True diff --git a/kafka/consumer.py b/kafka/consumer.py index 226700e2a..22e67417c 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -24,6 +24,7 @@ FETCH_DEFAULT_BLOCK_TIMEOUT = 1 FETCH_MAX_WAIT_TIME = 100 FETCH_MIN_BYTES = 4096 +FETCH_BUFFER_SIZE_BYTES = 4096 class FetchContext(object): @@ -218,8 +219,10 @@ class SimpleConsumer(Consumer): def __init__(self, client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL, - fetch_size_bytes=FETCH_MIN_BYTES): + fetch_size_bytes=FETCH_MIN_BYTES, + buffer_size=FETCH_BUFFER_SIZE_BYTES): + self.buffer_size = buffer_size self.partition_info = False # Do not return partition info in msgs self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = fetch_size_bytes @@ -366,7 +369,7 @@ def __iter_partition__(self, partition, offset): # use MaxBytes = client's bufsize since we're only # fetching one topic + partition req = FetchRequest( - self.topic, partition, offset, self.client.bufsize) + self.topic, partition, offset, self.buffer_size) (resp,) = self.client.send_fetch_request( [req], From c1877adf075415c193f8021cec51feb33e7210a6 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 18 Dec 2013 17:54:17 -0800 Subject: [PATCH 07/36] Allow None timeout in FetchContext even if block is False --- kafka/consumer.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 22e67417c..7b150d22d 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -35,10 +35,10 @@ def __init__(self, consumer, block, timeout): self.consumer = consumer self.block = block - if block and not timeout: - timeout = FETCH_DEFAULT_BLOCK_TIMEOUT - - self.timeout = timeout * 1000 + if block: + if not timeout: + timeout = FETCH_DEFAULT_BLOCK_TIMEOUT + self.timeout = timeout * 1000 def __enter__(self): """Set fetch values based on blocking status""" From 2dd78f206ff3b7d7e2b2aa4dda92d6b401e5c16c Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 18 Dec 2013 17:56:29 -0800 Subject: [PATCH 08/36] Reset consumer fields to original values rather than defaults in FetchContext --- kafka/consumer.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 7b150d22d..060700af2 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -42,6 +42,8 @@ def __init__(self, consumer, block, timeout): def __enter__(self): """Set fetch values based on blocking status""" + self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time + self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes if self.block: self.consumer.fetch_max_wait_time = self.timeout self.consumer.fetch_min_bytes = 1 @@ -49,9 +51,9 @@ def __enter__(self): self.consumer.fetch_min_bytes = 0 def __exit__(self, type, value, traceback): - """Reset values to default""" - self.consumer.fetch_max_wait_time = FETCH_MAX_WAIT_TIME - self.consumer.fetch_min_bytes = FETCH_MIN_BYTES + """Reset values""" + self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time + self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes class Consumer(object): From 225db8e4cf90113e5c68457094273cbb22dbc014 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 18 Dec 2013 18:36:32 -0800 Subject: [PATCH 09/36] SimpleConsumer flow changes: * Combine partition fetch requests into a single request * Put the messages received in a queue and update offsets * Grab as many messages from the queue as requested * When the queue is empty, request more * timeout param for get_messages() is the actual timeout for getting those messages * Based on https://github.com/mumrah/kafka-python/pull/74 - don't increase min_bytes if the consumer fetch buffer size is too small. Notes: Change MultiProcessConsumer and _mp_consume() accordingly. Previously, when querying each partition separately, it was possible to block waiting for messages on partition 0 even if there are new ones in partition 1. These changes allow us to block while waiting for messages on all partitions, and reduce total number of kafka requests. Use Queue.Queue for single proc Queue instead of already imported multiprocessing.Queue because the latter doesn't seem to guarantee immediate availability of items after a put: >>> from multiprocessing import Queue >>> q = Queue() >>> q.put(1); q.get_nowait() Traceback (most recent call last): File "", line 1, in File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 152, in get_nowait return self.get(False) File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 134, in get raise Empty Queue.Empty --- kafka/consumer.py | 182 ++++++++++++++++++---------------------------- 1 file changed, 70 insertions(+), 112 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 060700af2..a5cbd4fb5 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -5,8 +5,8 @@ import logging import time from threading import Lock -from multiprocessing import Process, Queue, Event, Value -from Queue import Empty +from multiprocessing import Process, Queue as MPQueue, Event, Value +from Queue import Empty, Queue from kafka.common import ( ErrorMapping, FetchRequest, @@ -229,6 +229,7 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None, self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = fetch_size_bytes self.fetch_started = defaultdict(bool) # defaults to false + self.queue = Queue(buffer_size) super(SimpleConsumer, self).__init__( client, group, topic, @@ -294,122 +295,75 @@ def get_messages(self, count=1, block=True, timeout=0.1): count: Indicates the maximum number of messages to be fetched block: If True, the API will block till some messages are fetched. - timeout: If None, and block=True, the API will block infinitely. - If >0, API will block for specified time (in seconds) + timeout: If block is True, the function will block for the specified time (in seconds) + until count messages is fetched. If None, it will block forever. """ messages = [] - iterator = self.__iter__() - - # HACK: This splits the timeout between available partitions if timeout: - timeout = timeout * 1.0 / len(self.offsets) + max_time = time.time() + timeout - with FetchContext(self, block, timeout): - while count > 0: - try: - messages.append(next(iterator)) - except StopIteration: - break + while count > 0 and (timeout is None or timeout > 0): + message = self.get_message(block, timeout) + if message: + messages.append(message) count -= 1 + else: + # Ran out of messages for the last request. If we're not blocking, break. + if not block: + break + if timeout: + timeout = max_time - time.time() return messages - def __iter__(self): - """ - Create an iterate per partition. Iterate through them calling next() - until they are all exhausted. - """ - iters = {} - for partition, offset in self.offsets.items(): - iters[partition] = self.__iter_partition__(partition, offset) - - if len(iters) == 0: - return - - while True: - if len(iters) == 0: - break - - for partition, it in iters.items(): - try: - if self.partition_info: - yield (partition, it.next()) - else: - yield it.next() - except StopIteration: - log.debug("Done iterating over partition %s" % partition) - del iters[partition] - - # skip auto-commit since we didn't yield anything - continue - - # Count, check and commit messages if necessary - self.count_since_commit += 1 - self._auto_commit() - - def __iter_partition__(self, partition, offset): - """ - Iterate over the messages in a partition. Create a FetchRequest - to get back a batch of messages, yield them one at a time. - After a batch is exhausted, start a new batch unless we've reached - the end of this partition. - """ - - # The offset that is stored in the consumer is the offset that - # we have consumed. In subsequent iterations, we are supposed to - # fetch the next message (that is from the next offset) - # However, for the 0th message, the offset should be as-is. - # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is - # problematic, since 0 is offset of a message which we have not yet - # consumed. - if self.fetch_started[partition]: - offset += 1 - - fetch_size = self.fetch_min_bytes + def get_message(self, block=True, timeout=0.1): + if self.queue.empty(): + with FetchContext(self, block, timeout): + self._fetch() + try: + return self.queue.get_nowait() + except Empty: + return None + def __iter__(self): while True: - # use MaxBytes = client's bufsize since we're only - # fetching one topic + partition - req = FetchRequest( - self.topic, partition, offset, self.buffer_size) - - (resp,) = self.client.send_fetch_request( - [req], - max_wait_time=self.fetch_max_wait_time, - min_bytes=fetch_size) - - assert resp.topic == self.topic - assert resp.partition == partition + message = self.get_message(True, 100) + if message: + yield message + else: + # In case we did not receive any message, give up the CPU for + # a while before we try again + time.sleep(0.1) - next_offset = None + def _fetch(self): + requests = [] + partitions = self.offsets.keys() + for partition in partitions: + requests.append(FetchRequest(self.topic, partition, self.offsets[partition], self.buffer_size)) + responses = self.client.send_fetch_request( + requests, + max_wait_time=int(self.fetch_max_wait_time), + min_bytes=self.fetch_min_bytes) + for resp in responses: + partition = resp.partition try: for message in resp.messages: - next_offset = message.offset - - # update the offset before the message is yielded. This - # is so that the consumer state is not lost in certain - # cases. - # - # For eg: the message is yielded and consumed by the - # caller, but the caller does not come back into the - # generator again. The message will be consumed but the - # status will not be updated in the consumer - self.fetch_started[partition] = True - self.offsets[partition] = message.offset - yield message + self.offsets[partition] = message.offset + 1 + # Count, check and commit messages if necessary + self.count_since_commit += 1 + self._auto_commit() + if self.partition_info: + self.queue.put((partition, message)) + else: + self.queue.put(message) except ConsumerFetchSizeTooSmall, e: - fetch_size *= 1.5 - log.warn( - "Fetch size too small, increasing to %d (1.5x) and retry", - fetch_size) - continue + self.buffer_size *= 2 + log.warn("Fetch size too small, increasing to %d (2x) and retry", self.buffer_size) except ConsumerNoMoreData, e: log.debug("Iteration was ended by %r", e) - - if next_offset is None: - break - else: - offset = next_offset + 1 + except StopIteration: + # Stop iterating through this partition + log.debug("Done iterating over partition %s" % partition) def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): @@ -448,8 +402,9 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): # indicates a specific number of messages, follow that advice count = 0 - for partition, message in consumer: - queue.put((partition, message)) + message = consumer.get_message() + if message: + queue.put(message) count += 1 # We have reached the required size. The controller might have @@ -459,11 +414,10 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): # can reset the 'start' event if count == size.value: pause.wait() - break - # In case we did not receive any message, give up the CPU for - # a while before we try again - if count == 0: + else: + # In case we did not receive any message, give up the CPU for + # a while before we try again time.sleep(0.1) consumer.stop() @@ -509,7 +463,7 @@ def __init__(self, client, group, topic, auto_commit=True, # Variables for managing and controlling the data flow from # consumer child process to master - self.queue = Queue(1024) # Child consumers dump messages into this + self.queue = MPQueue(1024) # Child consumers dump messages into this self.start = Event() # Indicates the consumers to start fetch self.exit = Event() # Requests the consumers to shutdown self.pause = Event() # Requests the consumers to pause fetch @@ -591,8 +545,8 @@ def get_messages(self, count=1, block=True, timeout=10): count: Indicates the maximum number of messages to be fetched block: If True, the API will block till some messages are fetched. - timeout: If None, and block=True, the API will block infinitely. - If >0, API will block for specified time (in seconds) + timeout: If block is True, the function will block for the specified time (in seconds) + until count messages is fetched. If None, it will block forever. """ messages = [] @@ -603,7 +557,10 @@ def get_messages(self, count=1, block=True, timeout=10): self.size.value = count self.pause.clear() - while count > 0: + if timeout: + max_time = time.time() + timeout + + while count > 0 and (timeout is None or timeout > 0): # Trigger consumption only if the queue is empty # By doing this, we will ensure that consumers do not # go into overdrive and keep consuming thousands of @@ -623,6 +580,7 @@ def get_messages(self, count=1, block=True, timeout=10): self.count_since_commit += 1 self._auto_commit() count -= 1 + timeout = max_time - time.time() self.size.value = 0 self.start.clear() From 781d6aa361a4057b1ca3d12eedff294e0c0aa253 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Thu, 19 Dec 2013 11:36:15 -0800 Subject: [PATCH 10/36] Remove SimpleConsumer queue size limit since it can cause the iterator to block forever if it's reached. --- kafka/consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index a5cbd4fb5..0268aa650 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -229,7 +229,7 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None, self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = fetch_size_bytes self.fetch_started = defaultdict(bool) # defaults to false - self.queue = Queue(buffer_size) + self.queue = Queue() super(SimpleConsumer, self).__init__( client, group, topic, From 8570457f69c768b97964240537540fcfea4654a8 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Thu, 19 Dec 2013 13:19:40 -0800 Subject: [PATCH 11/36] Add buffer_size param description to docstring --- kafka/consumer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 0268aa650..6156d020c 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -209,8 +209,9 @@ class SimpleConsumer(Consumer): before a commit auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit - fetch_size_bytes: number of bytes to request in a FetchRequest + buffer_size: initial number of bytes to tell kafka we have + available. This will double every time it's not enough Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will From 6840af8b27389e429a0de1fde67a30558d1f6010 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Thu, 19 Dec 2013 13:24:28 -0800 Subject: [PATCH 12/36] Add iter_timeout option to SimpleConsumer. If not None, it causes the iterator to exit when reached. Also put constant timeout values in pre-defined constants --- kafka/consumer.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 6156d020c..c3421c9cb 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -26,6 +26,9 @@ FETCH_MIN_BYTES = 4096 FETCH_BUFFER_SIZE_BYTES = 4096 +ITER_TIMEOUT_SECONDS = 60 +NO_MESSAGES_WAIT_TIME_SECONDS = 0.1 + class FetchContext(object): """ @@ -212,6 +215,9 @@ class SimpleConsumer(Consumer): fetch_size_bytes: number of bytes to request in a FetchRequest buffer_size: initial number of bytes to tell kafka we have available. This will double every time it's not enough + iter_timeout: default None. How much time (in seconds) to wait for a + message in the iterator before exiting. None means no + timeout, so it will wait forever. Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will @@ -223,13 +229,15 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL, fetch_size_bytes=FETCH_MIN_BYTES, - buffer_size=FETCH_BUFFER_SIZE_BYTES): + buffer_size=FETCH_BUFFER_SIZE_BYTES, + iter_timeout=None): self.buffer_size = buffer_size self.partition_info = False # Do not return partition info in msgs self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = fetch_size_bytes self.fetch_started = defaultdict(bool) # defaults to false + self.iter_timeout = iter_timeout self.queue = Queue() super(SimpleConsumer, self).__init__( @@ -327,14 +335,22 @@ def get_message(self, block=True, timeout=0.1): return None def __iter__(self): + if self.iter_timeout is None: + timeout = ITER_TIMEOUT_SECONDS + else: + timeout = self.iter_timeout + while True: - message = self.get_message(True, 100) + message = self.get_message(True, timeout) if message: yield message + elif self.iter_timeout is None: + # We did not receive any message yet but we don't have a + # timeout, so give up the CPU for a while before trying again + time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) else: - # In case we did not receive any message, give up the CPU for - # a while before we try again - time.sleep(0.1) + # Timed out waiting for a message + break def _fetch(self): requests = [] @@ -419,7 +435,7 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): else: # In case we did not receive any message, give up the CPU for # a while before we try again - time.sleep(0.1) + time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) consumer.stop() From 3b8f445bf389cd6be7a66e01864d59325d43bba7 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Thu, 19 Dec 2013 13:35:53 -0800 Subject: [PATCH 13/36] Add comments and maintain 80 character line limit --- kafka/consumer.py | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index c3421c9cb..a01276c97 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -304,8 +304,9 @@ def get_messages(self, count=1, block=True, timeout=0.1): count: Indicates the maximum number of messages to be fetched block: If True, the API will block till some messages are fetched. - timeout: If block is True, the function will block for the specified time (in seconds) - until count messages is fetched. If None, it will block forever. + timeout: If block is True, the function will block for the specified + time (in seconds) until count messages is fetched. If None, + it will block forever. """ messages = [] if timeout: @@ -317,16 +318,20 @@ def get_messages(self, count=1, block=True, timeout=0.1): messages.append(message) count -= 1 else: - # Ran out of messages for the last request. If we're not blocking, break. + # Ran out of messages for the last request. if not block: + # If we're not blocking, break. break if timeout: + # If we're blocking and have a timeout, reduce it to the + # appropriate value timeout = max_time - time.time() return messages def get_message(self, block=True, timeout=0.1): if self.queue.empty(): + # We're out of messages, go grab some more. with FetchContext(self, block, timeout): self._fetch() try: @@ -353,29 +358,39 @@ def __iter__(self): break def _fetch(self): + # Create fetch request payloads for all the partitions requests = [] partitions = self.offsets.keys() for partition in partitions: - requests.append(FetchRequest(self.topic, partition, self.offsets[partition], self.buffer_size)) + requests.append(FetchRequest(self.topic, partition, + self.offsets[partition], + self.buffer_size)) + # Send request responses = self.client.send_fetch_request( requests, max_wait_time=int(self.fetch_max_wait_time), min_bytes=self.fetch_min_bytes) + for resp in responses: partition = resp.partition try: for message in resp.messages: + # Update partition offset self.offsets[partition] = message.offset + 1 + # Count, check and commit messages if necessary self.count_since_commit += 1 self._auto_commit() + + # Put the message in our queue if self.partition_info: self.queue.put((partition, message)) else: self.queue.put(message) except ConsumerFetchSizeTooSmall, e: self.buffer_size *= 2 - log.warn("Fetch size too small, increasing to %d (2x) and retry", self.buffer_size) + log.warn("Fetch size too small, increase to %d (2x) and retry", + self.buffer_size) except ConsumerNoMoreData, e: log.debug("Iteration was ended by %r", e) except StopIteration: @@ -562,8 +577,9 @@ def get_messages(self, count=1, block=True, timeout=10): count: Indicates the maximum number of messages to be fetched block: If True, the API will block till some messages are fetched. - timeout: If block is True, the function will block for the specified time (in seconds) - until count messages is fetched. If None, it will block forever. + timeout: If block is True, the function will block for the specified + time (in seconds) until count messages is fetched. If None, + it will block forever. """ messages = [] From a3031058d16848b0b66f4720bc4653933af351b3 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Thu, 2 Jan 2014 18:10:57 -0800 Subject: [PATCH 14/36] Add and fix comments to protocol.py --- kafka/protocol.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/kafka/protocol.py b/kafka/protocol.py index 612acf63f..74a0dce0e 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -121,7 +121,7 @@ def _decode_message_set_iter(cls, data): except BufferUnderflowError: if read_message is False: # If we get a partial read of a message, but haven't - # yielded anyhting there's a problem + # yielded anything there's a problem raise ConsumerFetchSizeTooSmall() else: raise StopIteration() @@ -171,7 +171,7 @@ def encode_produce_request(cls, client_id, correlation_id, Params ====== client_id: string - correlation_id: string + correlation_id: int payloads: list of ProduceRequest acks: How "acky" you want the request to be 0: immediate response @@ -231,7 +231,7 @@ def encode_fetch_request(cls, client_id, correlation_id, payloads=None, Params ====== client_id: string - correlation_id: string + correlation_id: int payloads: list of FetchRequest max_wait_time: int, how long to block waiting on min_bytes of data min_bytes: int, the minimum number of bytes to accumulate before @@ -338,7 +338,7 @@ def encode_metadata_request(cls, client_id, correlation_id, topics=None): Params ====== client_id: string - correlation_id: string + correlation_id: int topics: list of strings """ topics = [] if topics is None else topics @@ -376,12 +376,16 @@ def decode_metadata_response(cls, data): topic_metadata = {} for i in range(num_topics): + # NOTE: topic_error is discarded. Should probably be returned with + # the topic metadata. ((topic_error,), cur) = relative_unpack('>h', data, cur) (topic_name, cur) = read_short_string(data, cur) ((num_partitions,), cur) = relative_unpack('>i', data, cur) partition_metadata = {} for j in range(num_partitions): + # NOTE: partition_error_code is discarded. Should probably be + # returned with the partition metadata. ((partition_error_code, partition, leader, numReplicas), cur) = \ relative_unpack('>hiii', data, cur) @@ -408,7 +412,7 @@ def encode_offset_commit_request(cls, client_id, correlation_id, Params ====== client_id: string - correlation_id: string + correlation_id: int group: string, the consumer group you are committing offsets for payloads: list of OffsetCommitRequest """ @@ -459,7 +463,7 @@ def encode_offset_fetch_request(cls, client_id, correlation_id, Params ====== client_id: string - correlation_id: string + correlation_id: int group: string, the consumer group you are fetching offsets for payloads: list of OffsetFetchRequest """ From a31156e985b801ef885691e354b85fc102089ea7 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Thu, 2 Jan 2014 18:14:44 -0800 Subject: [PATCH 15/36] Add note about questionable error handling while decoding messages. Will remove once any error handling issues are resolved. --- kafka/protocol.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/kafka/protocol.py b/kafka/protocol.py index 74a0dce0e..54b8eeeeb 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -119,6 +119,14 @@ def _decode_message_set_iter(cls, data): read_message = True yield OffsetAndMessage(offset, message) except BufferUnderflowError: + # NOTE: Not sure this is correct error handling: + # Is it possible to get a BUE if the message set is somewhere + # in the middle of the fetch response? If so, we probably have + # an issue that's not fetch size too small. + # Aren't we ignoring errors if we fail to unpack data by + # raising StopIteration()? + # If _decode_message() raises a ChecksumError, couldn't that + # also be due to the fetch size being too small? if read_message is False: # If we get a partial read of a message, but haven't # yielded anything there's a problem From 39d76bb7adba98333a3b6ed9743b5ed2fcdfab81 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Thu, 2 Jan 2014 18:16:41 -0800 Subject: [PATCH 16/36] Fix unit tests. This is pretty much a rewrite. The tests that involve offset requests/responses are not implemented since that API is not supported in kafka 0.8 yet. Only kafka.codec and kafka.protocol are currently tested, so there is more work to be done here. --- test/test_unit.py | 464 +++++++++++++++++++++++++++++++--------------- 1 file changed, 314 insertions(+), 150 deletions(-) diff --git a/test/test_unit.py b/test/test_unit.py index 3f3af66ac..93d88a151 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -3,13 +3,22 @@ import struct import unittest -from kafka.client import KafkaClient -from kafka.common import ProduceRequest, FetchRequest +from kafka.common import ( + ProduceRequest, FetchRequest, Message, ChecksumError, + ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, + OffsetAndMessage, BrokerMetadata, PartitionMetadata +) from kafka.codec import ( has_gzip, has_snappy, gzip_encode, gzip_decode, snappy_encode, snappy_decode ) +from kafka.protocol import ( + create_gzip_message, + create_message, + create_snappy_message, + KafkaProtocol +) ITERATIONS = 1000 STRLEN = 100 @@ -20,16 +29,13 @@ def random_string(): class TestPackage(unittest.TestCase): - @unittest.expectedFailure + def test_top_level_namespace(self): import kafka as kafka1 self.assertEquals(kafka1.KafkaClient.__name__, "KafkaClient") - self.assertEquals(kafka1.gzip_encode.__name__, "gzip_encode") - self.assertEquals(kafka1.snappy_encode.__name__, "snappy_encode") self.assertEquals(kafka1.client.__name__, "kafka.client") self.assertEquals(kafka1.codec.__name__, "kafka.codec") - @unittest.expectedFailure def test_submodule_namespace(self): import kafka.client as client1 self.assertEquals(client1.__name__, "kafka.client") @@ -48,19 +54,8 @@ def test_submodule_namespace(self): from kafka import KafkaClient as KafkaClient2 self.assertEquals(KafkaClient2.__name__, "KafkaClient") - from kafka import gzip_encode as gzip_encode2 - self.assertEquals(gzip_encode2.__name__, "gzip_encode") - - from kafka import snappy_encode as snappy_encode2 - self.assertEquals(snappy_encode2.__name__, "snappy_encode") - - -class TestMisc(unittest.TestCase): - @unittest.expectedFailure - def test_length_prefix(self): - for i in xrange(ITERATIONS): - s1 = random_string() - self.assertEquals(struct.unpack('>i', s2[0:4])[0], len(s1)) + from kafka.codec import snappy_encode + self.assertEquals(snappy_encode.__name__, "snappy_encode") class TestCodec(unittest.TestCase): @@ -81,140 +76,309 @@ def test_snappy(self): self.assertEquals(s1, s2) -# XXX(sandello): These really should be protocol tests. -class TestMessage(unittest.TestCase): - @unittest.expectedFailure - def test_create(self): - msg = KafkaClient.create_message("testing") - self.assertEquals(msg.payload, "testing") - self.assertEquals(msg.magic, 1) - self.assertEquals(msg.attributes, 0) - self.assertEquals(msg.crc, -386704890) +class TestProtocol(unittest.TestCase): + + def test_create_message(self): + payload = "test" + key = "key" + msg = create_message(payload, key) + self.assertEqual(msg.magic, 0) + self.assertEqual(msg.attributes, 0) + self.assertEqual(msg.key, key) + self.assertEqual(msg.value, payload) - @unittest.expectedFailure def test_create_gzip(self): - msg = KafkaClient.create_gzip_message("testing") - self.assertEquals(msg.magic, 1) - self.assertEquals(msg.attributes, 1) - # Can't check the crc or payload for gzip since it's non-deterministic - (messages, _) = KafkaClient.read_message_set(gzip_decode(msg.payload)) - inner = messages[0] - self.assertEquals(inner.magic, 1) - self.assertEquals(inner.attributes, 0) - self.assertEquals(inner.payload, "testing") - self.assertEquals(inner.crc, -386704890) - - @unittest.expectedFailure + payloads = ["v1", "v2"] + msg = create_gzip_message(payloads) + self.assertEqual(msg.magic, 0) + self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK & + KafkaProtocol.CODEC_GZIP) + self.assertEqual(msg.key, None) + # Need to decode to check since gzipped payload is non-deterministic + decoded = gzip_decode(msg.value) + expect = ("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2" + "\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00" + "\x00\x00\x00\x00\x00\x00\x00\x10\xd5\x96\nx\x00\x00\xff\xff" + "\xff\xff\x00\x00\x00\x02v2") + self.assertEqual(decoded, expect) + def test_create_snappy(self): - msg = KafkaClient.create_snappy_message("testing") - self.assertEquals(msg.magic, 1) - self.assertEquals(msg.attributes, 2) - self.assertEquals(msg.crc, -62350868) - (messages, _) = KafkaClient.read_message_set(snappy_decode(msg.payload)) - inner = messages[0] - self.assertEquals(inner.magic, 1) - self.assertEquals(inner.attributes, 0) - self.assertEquals(inner.payload, "testing") - self.assertEquals(inner.crc, -386704890) - - @unittest.expectedFailure - def test_message_simple(self): - msg = KafkaClient.create_message("testing") - enc = KafkaClient.encode_message(msg) - expect = "\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing" - self.assertEquals(enc, expect) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), 1) - self.assertEquals(messages[0], msg) - - @unittest.expectedFailure - def test_message_list(self): - msgs = [ - KafkaClient.create_message("one"), - KafkaClient.create_message("two"), - KafkaClient.create_message("three") - ] - enc = KafkaClient.encode_message_set(msgs) - expect = ("\x00\x00\x00\t\x01\x00zl\x86\xf1one\x00\x00\x00\t\x01\x00\x11" - "\xca\x8aftwo\x00\x00\x00\x0b\x01\x00F\xc5\xd8\xf5three") - self.assertEquals(enc, expect) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), 3) - self.assertEquals(messages[0].payload, "one") - self.assertEquals(messages[1].payload, "two") - self.assertEquals(messages[2].payload, "three") - - @unittest.expectedFailure - def test_message_gzip(self): - msg = KafkaClient.create_gzip_message("one", "two", "three") - enc = KafkaClient.encode_message(msg) - # Can't check the bytes directly since Gzip is non-deterministic - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), 3) - self.assertEquals(messages[0].payload, "one") - self.assertEquals(messages[1].payload, "two") - self.assertEquals(messages[2].payload, "three") - - @unittest.expectedFailure - def test_message_snappy(self): - msg = KafkaClient.create_snappy_message("one", "two", "three") - enc = KafkaClient.encode_message(msg) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), 3) - self.assertEquals(messages[0].payload, "one") - self.assertEquals(messages[1].payload, "two") - self.assertEquals(messages[2].payload, "three") - - @unittest.expectedFailure - def test_message_simple_random(self): - for i in xrange(ITERATIONS): - n = random.randint(0, 10) - msgs = [KafkaClient.create_message(random_string()) for j in range(n)] - enc = KafkaClient.encode_message_set(msgs) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), n) - for j in range(n): - self.assertEquals(messages[j], msgs[j]) - - @unittest.expectedFailure - def test_message_gzip_random(self): - for i in xrange(ITERATIONS): - n = random.randint(1, 10) - strings = [random_string() for j in range(n)] - msg = KafkaClient.create_gzip_message(*strings) - enc = KafkaClient.encode_message(msg) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), n) - for j in range(n): - self.assertEquals(messages[j].payload, strings[j]) - - @unittest.expectedFailure - def test_message_snappy_random(self): - for i in xrange(ITERATIONS): - n = random.randint(1, 10) - strings = [random_string() for j in range(n)] - msg = KafkaClient.create_snappy_message(*strings) - enc = KafkaClient.encode_message(msg) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), n) - for j in range(n): - self.assertEquals(messages[j].payload, strings[j]) - - -class TestRequests(unittest.TestCase): - @unittest.expectedFailure - def test_produce_request(self): - req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")]) - enc = KafkaClient.encode_produce_request(req) - expect = "\x00\x00\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00\x11\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing" - self.assertEquals(enc, expect) - - @unittest.expectedFailure - def test_fetch_request(self): - req = FetchRequest("my-topic", 0, 0, 1024) - enc = KafkaClient.encode_fetch_request(req) - expect = "\x00\x01\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00" - self.assertEquals(enc, expect) + payloads = ["v1", "v2"] + msg = create_snappy_message(payloads) + self.assertEqual(msg.magic, 0) + self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK & + KafkaProtocol.CODEC_SNAPPY) + self.assertEqual(msg.key, None) + expect = ("8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff\xff\xff\xff" + "\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5\x96\nx\x00\x00\xff" + "\xff\xff\xff\x00\x00\x00\x02v2") + self.assertEqual(msg.value, expect) + + def test_encode_message_header(self): + expect = '\x00\n\x00\x00\x00\x00\x00\x04\x00\x07client3' + encoded = KafkaProtocol._encode_message_header("client3", 4, 10) + self.assertEqual(encoded, expect) + + def test_encode_message(self): + message = create_message("test", "key") + encoded = KafkaProtocol._encode_message(message) + expect = "\xaa\xf1\x8f[\x00\x00\x00\x00\x00\x03key\x00\x00\x00\x04test" + self.assertEqual(encoded, expect) + + def test_encode_message_failure(self): + self.assertRaises(Exception, KafkaProtocol._encode_message, + Message(1, 0, "key", "test")) + + def test_encode_message_set(self): + message_set = [create_message("v1", "k1"), create_message("v2", "k2")] + encoded = KafkaProtocol._encode_message_set(message_set) + expect = ("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x12W\xe7In\x00" + "\x00\x00\x00\x00\x02k1\x00\x00\x00\x02v1\x00\x00\x00\x00" + "\x00\x00\x00\x00\x00\x00\x00\x12\xff\x06\x02I\x00\x00\x00" + "\x00\x00\x02k2\x00\x00\x00\x02v2") + self.assertEqual(encoded, expect) + + def test_decode_message(self): + encoded = "\xaa\xf1\x8f[\x00\x00\x00\x00\x00\x03key\x00\x00\x00\x04test" + offset = 10 + (returned_offset, decoded_message) = \ + list(KafkaProtocol._decode_message(encoded, offset))[0] + self.assertEqual(returned_offset, offset) + self.assertEqual(decoded_message, create_message("test", "key")) + + def test_decode_message_set(self): + encoded = ('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2' + '\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x10\xd5\x96\nx\x00\x00\xff' + '\xff\xff\xff\x00\x00\x00\x02v2') + iter = KafkaProtocol._decode_message_set_iter(encoded) + decoded = list(iter) + self.assertEqual(len(decoded), 2) + (returned_offset1, decoded_message1) = decoded[0] + self.assertEqual(returned_offset1, 0) + self.assertEqual(decoded_message1, create_message("v1")) + (returned_offset2, decoded_message2) = decoded[1] + self.assertEqual(returned_offset2, 0) + self.assertEqual(decoded_message2, create_message("v2")) + + def test_decode_message_gzip(self): + gzip_encoded = ('\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000' + '\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01' + '\x9f\xf9\xd1\x87\x18\x18\xfe\x03\x01\x90\xc7Tf\xc8' + '\x80$wu\x1aW\x05\x92\x9c\x11\x00z\xc0h\x888\x00\x00' + '\x00') + offset = 11 + decoded = list(KafkaProtocol._decode_message(gzip_encoded, offset)) + self.assertEqual(len(decoded), 2) + (returned_offset1, decoded_message1) = decoded[0] + self.assertEqual(returned_offset1, 0) + self.assertEqual(decoded_message1, create_message("v1")) + (returned_offset2, decoded_message2) = decoded[1] + self.assertEqual(returned_offset2, 0) + self.assertEqual(decoded_message2, create_message("v2")) + + def test_decode_message_snappy(self): + snappy_encoded = ('\xec\x80\xa1\x95\x00\x02\xff\xff\xff\xff\x00\x00' + '\x00,8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff' + '\xff\xff\xff\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5' + '\x96\nx\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v2') + offset = 11 + decoded = list(KafkaProtocol._decode_message(snappy_encoded, offset)) + self.assertEqual(len(decoded), 2) + (returned_offset1, decoded_message1) = decoded[0] + self.assertEqual(returned_offset1, 0) + self.assertEqual(decoded_message1, create_message("v1")) + (returned_offset2, decoded_message2) = decoded[1] + self.assertEqual(returned_offset2, 0) + self.assertEqual(decoded_message2, create_message("v2")) + + def test_decode_message_checksum_error(self): + invalid_encoded_message = "This is not a valid encoded message" + iter = KafkaProtocol._decode_message(invalid_encoded_message, 0) + self.assertRaises(ChecksumError, list, iter) + + # NOTE: The error handling in _decode_message_set_iter() is questionable. + # If it's modified, the next two tests might need to be fixed. + def test_decode_message_set_fetch_size_too_small(self): + iter = KafkaProtocol._decode_message_set_iter('a') + self.assertRaises(ConsumerFetchSizeTooSmall, list, iter) + + def test_decode_message_set_stop_iteration(self): + encoded = ('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2' + '\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x10\xd5\x96\nx\x00\x00\xff' + '\xff\xff\xff\x00\x00\x00\x02v2') + iter = KafkaProtocol._decode_message_set_iter(encoded + "@#$%(Y!") + decoded = list(iter) + self.assertEqual(len(decoded), 2) + (returned_offset1, decoded_message1) = decoded[0] + self.assertEqual(returned_offset1, 0) + self.assertEqual(decoded_message1, create_message("v1")) + (returned_offset2, decoded_message2) = decoded[1] + self.assertEqual(returned_offset2, 0) + self.assertEqual(decoded_message2, create_message("v2")) + + def test_encode_produce_request(self): + requests = [ProduceRequest("topic1", 0, [create_message("a"), + create_message("b")]), + ProduceRequest("topic2", 1, [create_message("c")])] + expect = ('\x00\x00\x00\x94\x00\x00\x00\x00\x00\x00\x00\x02\x00\x07' + 'client1\x00\x02\x00\x00\x00d\x00\x00\x00\x02\x00\x06topic1' + '\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x0fQ\xdf:2\x00\x00\xff\xff' + '\xff\xff\x00\x00\x00\x01a\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x0f\xc8\xd6k\x88\x00\x00\xff\xff\xff\xff\x00' + '\x00\x00\x01b\x00\x06topic2\x00\x00\x00\x01\x00\x00\x00\x01' + '\x00\x00\x00\x1b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x0f\xbf\xd1[\x1e\x00\x00\xff\xff\xff\xff\x00\x00\x00' + '\x01c') + encoded = KafkaProtocol.encode_produce_request("client1", 2, requests, + 2, 100) + self.assertEqual(encoded, expect) + + def test_decode_produce_response(self): + t1 = "topic1" + t2 = "topic2" + encoded = struct.pack('>iih%dsiihqihqh%dsiihq' % (len(t1), len(t2)), + 2, 2, len(t1), t1, 2, 0, 0, 10L, 1, 1, 20L, + len(t2), t2, 1, 0, 0, 30L) + responses = list(KafkaProtocol.decode_produce_response(encoded)) + self.assertEqual(responses, + [ProduceResponse(t1, 0, 0, 10L), + ProduceResponse(t1, 1, 1, 20L), + ProduceResponse(t2, 0, 0, 30L)]) + + def test_encode_fetch_request(self): + requests = [FetchRequest("topic1", 0, 10, 1024), + FetchRequest("topic2", 1, 20, 100)] + expect = ('\x00\x00\x00Y\x00\x01\x00\x00\x00\x00\x00\x03\x00\x07' + 'client1\xff\xff\xff\xff\x00\x00\x00\x02\x00\x00\x00d\x00' + '\x00\x00\x02\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x04\x00\x00\x06' + 'topic2\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00' + '\x00\x00\x14\x00\x00\x00d') + encoded = KafkaProtocol.encode_fetch_request("client1", 3, requests, 2, + 100) + self.assertEqual(encoded, expect) + + def test_decode_fetch_response(self): + t1 = "topic1" + t2 = "topic2" + msgs = map(create_message, ["message1", "hi", "boo", "foo", "so fun!"]) + ms1 = KafkaProtocol._encode_message_set([msgs[0], msgs[1]]) + ms2 = KafkaProtocol._encode_message_set([msgs[2]]) + ms3 = KafkaProtocol._encode_message_set([msgs[3], msgs[4]]) + + encoded = struct.pack('>iih%dsiihqi%dsihqi%dsh%dsiihqi%ds' % + (len(t1), len(ms1), len(ms2), len(t2), len(ms3)), + 4, 2, len(t1), t1, 2, 0, 0, 10, len(ms1), ms1, 1, + 1, 20, len(ms2), ms2, len(t2), t2, 1, 0, 0, 30, + len(ms3), ms3) + + responses = list(KafkaProtocol.decode_fetch_response(encoded)) + def expand_messages(response): + return FetchResponse(response.topic, response.partition, + response.error, response.highwaterMark, + list(response.messages)) + + expanded_responses = map(expand_messages, responses) + expect = [FetchResponse(t1, 0, 0, 10, [OffsetAndMessage(0, msgs[0]), + OffsetAndMessage(0, msgs[1])]), + FetchResponse(t1, 1, 1, 20, [OffsetAndMessage(0, msgs[2])]), + FetchResponse(t2, 0, 0, 30, [OffsetAndMessage(0, msgs[3]), + OffsetAndMessage(0, msgs[4])])] + self.assertEqual(expanded_responses, expect) + + def test_encode_metadata_request_no_topics(self): + encoded = KafkaProtocol.encode_metadata_request("cid", 4) + self.assertEqual(encoded, '\x00\x00\x00\x11\x00\x03\x00\x00\x00\x00' + '\x00\x04\x00\x03cid\x00\x00\x00\x00') + + def test_encode_metadata_request_with_topics(self): + encoded = KafkaProtocol.encode_metadata_request("cid", 4, ["t1", "t2"]) + self.assertEqual(encoded, '\x00\x00\x00\x19\x00\x03\x00\x00\x00\x00' + '\x00\x04\x00\x03cid\x00\x00\x00\x02\x00\x02' + 't1\x00\x02t2') + + def _create_encoded_metadata_response(self, broker_data, topic_data, + topic_errors, partition_errors): + encoded = struct.pack('>ii', 3, len(broker_data)) + for node_id, broker in broker_data.iteritems(): + encoded += struct.pack('>ih%dsi' % len(broker.host), node_id, + len(broker.host), broker.host, broker.port) + + encoded += struct.pack('>i', len(topic_data)) + for topic, partitions in topic_data.iteritems(): + encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic], + len(topic), topic, len(partitions)) + for partition, metadata in partitions.iteritems(): + encoded += struct.pack('>hiii', + partition_errors[(topic, partition)], + partition, metadata.leader, + len(metadata.replicas)) + if len(metadata.replicas) > 0: + encoded += struct.pack('>%di' % len(metadata.replicas), + *metadata.replicas) + + encoded += struct.pack('>i', len(metadata.isr)) + if len(metadata.isr) > 0: + encoded += struct.pack('>%di' % len(metadata.isr), + *metadata.isr) + + return encoded + + def test_decode_metadata_response(self): + node_brokers = { + 0: BrokerMetadata(0, "brokers1.kafka.rdio.com", 1000), + 1: BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001), + 3: BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000) + } + topic_partitions = { + "topic1": { + 0: PartitionMetadata("topic1", 0, 1, (0, 2), (2,)), + 1: PartitionMetadata("topic1", 1, 3, (0, 1), (0, 1)) + }, + "topic2": { + 0: PartitionMetadata("topic2", 0, 0, (), ()) + } + } + topic_errors = {"topic1": 0, "topic2": 1} + partition_errors = { + ("topic1", 0): 0, + ("topic1", 1): 1, + ("topic2", 0): 0 + } + encoded = self._create_encoded_metadata_response(node_brokers, + topic_partitions, + topic_errors, + partition_errors) + decoded = KafkaProtocol.decode_metadata_response(encoded) + self.assertEqual(decoded, (node_brokers, topic_partitions)) + + @unittest.skip("Not Implemented") + def test_encode_offset_request(self): + pass + + @unittest.skip("Not Implemented") + def test_decode_offset_response(self): + pass + + + @unittest.skip("Not Implemented") + def test_encode_offset_commit_request(self): + pass + + @unittest.skip("Not Implemented") + def test_decode_offset_commit_response(self): + pass + + @unittest.skip("Not Implemented") + def test_encode_offset_fetch_request(self): + pass + + @unittest.skip("Not Implemented") + def test_decode_offset_fetch_response(self): + pass if __name__ == '__main__': From 8009bb04b5509e890f420621e4576f88dbcbeb33 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Fri, 3 Jan 2014 10:57:29 -0800 Subject: [PATCH 17/36] Style fix for imports --- test/test_unit.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/test/test_unit.py b/test/test_unit.py index 93d88a151..08fef9c2a 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -9,15 +9,11 @@ OffsetAndMessage, BrokerMetadata, PartitionMetadata ) from kafka.codec import ( - has_gzip, has_snappy, - gzip_encode, gzip_decode, + has_gzip, has_snappy, gzip_encode, gzip_decode, snappy_encode, snappy_decode ) from kafka.protocol import ( - create_gzip_message, - create_message, - create_snappy_message, - KafkaProtocol + create_gzip_message, create_message, create_snappy_message, KafkaProtocol ) ITERATIONS = 1000 From b7ee169c1ce1fb9d017b18ab8279e166a25a4398 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Fri, 3 Jan 2014 15:18:32 -0800 Subject: [PATCH 18/36] Fix seek offset deltas We always store the offset of the next available message, so we shouldn't decrement the offset deltas when seeking by an extra 1 --- kafka/consumer.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index a01276c97..e2fcdcfcf 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -282,12 +282,6 @@ def seek(self, offset, whence): reqs.append(OffsetRequest(self.topic, partition, -2, 1)) elif whence == 2: reqs.append(OffsetRequest(self.topic, partition, -1, 1)) - - # The API returns back the next available offset - # For eg: if the current offset is 18, the API will return - # back 19. So, if we have to seek 5 points before, we will - # end up going back to 14, instead of 13. Adjust this - deltas[partition] -= 1 else: pass From 27240439dffaea65a48128cb7ee132f8c6535603 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Fri, 3 Jan 2014 15:26:00 -0800 Subject: [PATCH 19/36] Raise a ConnectionError when a socket.error is raised when receiving data Also, log.exception() is unhelpfully noisy. Use log.error() with some error details in the message instead. --- kafka/conn.py | 18 +++++++++++------- kafka/producer.py | 6 +++--- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 7c6d51035..3661c00b8 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -35,13 +35,21 @@ def __str__(self): # Private API # ################### + def _raise_connection_error(self): + self._dirty = True + raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port)) + def _read_bytes(self, num_bytes): bytes_left = num_bytes resp = '' log.debug("About to read %d bytes from Kafka", num_bytes) while bytes_left: - data = self._sock.recv(bytes_left) + try: + data = self._sock.recv(bytes_left) + except socket.error, e: + log.error('Unable to receive data from Kafka: %s', e) + self._raise_connection_error() if data == '': raise BufferUnderflowError("Not enough data to read this response") bytes_left -= len(data) @@ -65,10 +73,6 @@ def _consume_response(self): resp = self._read_bytes(size) return str(resp) - def _raise_connection_error(self): - self._dirty = True - raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port)) - ################## # Public API # ################## @@ -84,8 +88,8 @@ def send(self, request_id, payload): sent = self._sock.sendall(payload) if sent is not None: self._raise_connection_error() - except socket.error: - log.exception('Unable to send payload to Kafka') + except socket.error, e: + log.error('Unable to send payload to Kafka: %s', e) self._raise_connection_error() def recv(self, request_id): diff --git a/kafka/producer.py b/kafka/producer.py index a82d99b5c..4ff13b4aa 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -69,8 +69,8 @@ def _send_upstream(topic, queue, client, batch_time, batch_size, client.send_produce_request(reqs, acks=req_acks, timeout=ack_timeout) - except Exception as exp: - log.exception("Unable to send message") + except Exception as e: + log.error("Unable to send message: %s", e) class Producer(object): @@ -147,7 +147,7 @@ def send_messages(self, partition, *msg): resp = self.client.send_produce_request([req], acks=self.req_acks, timeout=self.ack_timeout) except Exception as e: - log.exception("Unable to send messages") + log.error("Unable to send messages: %s", e) raise e return resp From 34da78b15fde08fc82c098bbf4ec0098d25c881d Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Fri, 3 Jan 2014 15:29:04 -0800 Subject: [PATCH 20/36] Fix client error handling This differentiates between errors that occur when sending the request and receiving the response, and adds BufferUnderflowError handling. --- kafka/client.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index bd3a21406..821904c87 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -6,8 +6,10 @@ import socket import time -from kafka.common import ErrorMapping, TopicAndPartition -from kafka.common import ConnectionError, FailedPayloadsException +from kafka.common import ( + ErrorMapping, TopicAndPartition, BufferUnderflowError, ConnectionError, + FailedPayloadsException +) from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol @@ -165,14 +167,24 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): request = encoder_fn(client_id=self.client_id, correlation_id=requestId, payloads=payloads) + failed = False # Send the request, recv the response try: conn.send(requestId, request) if decoder_fn is None: continue - response = conn.recv(requestId) - except ConnectionError, e: # ignore BufferUnderflow for now - log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e)) + try: + response = conn.recv(requestId) + except (ConnectionError, BufferUnderflowError), e: + log.warning("Could not receive response to request [%s] " + "from server %s: %s", request, conn, e) + failed = True + except ConnectionError, e: + log.warning("Could not send request [%s] to server %s: %s", + request, conn, e) + failed = True + + if failed: failed_payloads += payloads self.topics_to_brokers = {} # reset metadata continue From 3dfed952d1167a227132a3ee858a375c970171a6 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Fri, 3 Jan 2014 15:49:55 -0800 Subject: [PATCH 21/36] Add a limit to fetch buffer size, and actually retry requests when fetch size is too small Note: This can cause fetching a message to exceed a given timeout, but timeouts are not guaranteed anyways, and in this case it's the client's fault for not sending a big enough buffer size rather than the kafka server. This can be bad if max_fetch_size is None (no limit) and there is some message in Kafka that is crazy huge, but that is why we should have some max_fetch_size. --- kafka/consumer.py | 95 +++++++++++++++++++++++++++++------------------ 1 file changed, 58 insertions(+), 37 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index e2fcdcfcf..bb2b7614d 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -25,6 +25,7 @@ FETCH_MAX_WAIT_TIME = 100 FETCH_MIN_BYTES = 4096 FETCH_BUFFER_SIZE_BYTES = 4096 +MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8 ITER_TIMEOUT_SECONDS = 60 NO_MESSAGES_WAIT_TIME_SECONDS = 0.1 @@ -213,8 +214,10 @@ class SimpleConsumer(Consumer): auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit fetch_size_bytes: number of bytes to request in a FetchRequest - buffer_size: initial number of bytes to tell kafka we have - available. This will double every time it's not enough + buffer_size: default 4K. Initial number of bytes to tell kafka we + have available. This will double as needed. + max_buffer_size: default 16K. Max number of bytes to tell kafka we have + available. None means no limit. iter_timeout: default None. How much time (in seconds) to wait for a message in the iterator before exiting. None means no timeout, so it will wait forever. @@ -230,9 +233,15 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None, auto_commit_every_t=AUTO_COMMIT_INTERVAL, fetch_size_bytes=FETCH_MIN_BYTES, buffer_size=FETCH_BUFFER_SIZE_BYTES, + max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, iter_timeout=None): + if max_buffer_size is not None and buffer_size > max_buffer_size: + raise ValueError("buffer_size (%d) is greater than " + "max_buffer_size (%d)" % + (buffer_size, max_buffer_size)) self.buffer_size = buffer_size + self.max_buffer_size = max_buffer_size self.partition_info = False # Do not return partition info in msgs self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = fetch_size_bytes @@ -355,42 +364,54 @@ def _fetch(self): # Create fetch request payloads for all the partitions requests = [] partitions = self.offsets.keys() - for partition in partitions: - requests.append(FetchRequest(self.topic, partition, - self.offsets[partition], - self.buffer_size)) - # Send request - responses = self.client.send_fetch_request( - requests, - max_wait_time=int(self.fetch_max_wait_time), - min_bytes=self.fetch_min_bytes) - - for resp in responses: - partition = resp.partition - try: - for message in resp.messages: - # Update partition offset - self.offsets[partition] = message.offset + 1 - - # Count, check and commit messages if necessary - self.count_since_commit += 1 - self._auto_commit() - - # Put the message in our queue - if self.partition_info: - self.queue.put((partition, message)) + while partitions: + for partition in partitions: + requests.append(FetchRequest(self.topic, partition, + self.offsets[partition], + self.buffer_size)) + # Send request + responses = self.client.send_fetch_request( + requests, + max_wait_time=int(self.fetch_max_wait_time), + min_bytes=self.fetch_min_bytes) + + retry_partitions = set() + for resp in responses: + partition = resp.partition + try: + for message in resp.messages: + # Update partition offset + self.offsets[partition] = message.offset + 1 + + # Count, check and commit messages if necessary + self.count_since_commit += 1 + self._auto_commit() + + # Put the message in our queue + if self.partition_info: + self.queue.put((partition, message)) + else: + self.queue.put(message) + except ConsumerFetchSizeTooSmall, e: + if (self.max_buffer_size is not None and + self.buffer_size == self.max_buffer_size): + log.error("Max fetch size %d too small", + self.max_buffer_size) + raise e + if self.max_buffer_size is None: + self.buffer_size *= 2 else: - self.queue.put(message) - except ConsumerFetchSizeTooSmall, e: - self.buffer_size *= 2 - log.warn("Fetch size too small, increase to %d (2x) and retry", - self.buffer_size) - except ConsumerNoMoreData, e: - log.debug("Iteration was ended by %r", e) - except StopIteration: - # Stop iterating through this partition - log.debug("Done iterating over partition %s" % partition) - + self.buffer_size = max([self.buffer_size * 2, + self.max_buffer_size]) + log.warn("Fetch size too small, increase to %d (2x) " + "and retry", self.buffer_size) + retry_partitions.add(partition) + except ConsumerNoMoreData, e: + log.debug("Iteration was ended by %r", e) + except StopIteration: + # Stop iterating through this partition + log.debug("Done iterating over partition %s" % partition) + partitions = retry_partitions def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): """ From 98023936be8015dcfae5090d58ad789d67772fe7 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Fri, 3 Jan 2014 15:52:37 -0800 Subject: [PATCH 22/36] Handle starting/stopping Kafka brokers that are already started/stopped in integration tests If some of the tests stop brokers then error out, the teardown method will try to close the same brokers and fail. This change allows it to continue. --- test/fixtures.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/test/fixtures.py b/test/fixtures.py index c771a5859..17e6672f6 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -272,8 +272,13 @@ def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas= self.tmp_dir = None self.child = None + self.running = False def open(self): + if self.running: + print("*** Kafka instance already running") + return + self.tmp_dir = tempfile.mkdtemp() print("*** Running local Kafka instance") print(" host = %s" % self.host) @@ -318,10 +323,16 @@ def open(self): self.child.start() self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id) print("*** Done!") + self.running = True def close(self): + if not self.running: + print("*** Kafka instance already stopped") + return + print("*** Stopping Kafka...") self.child.stop() self.child = None print("*** Done!") shutil.rmtree(self.tmp_dir) + self.running = False From 1e3f24b0d3abedf57186e1ce632d56a1f3dd0b1f Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Fri, 3 Jan 2014 17:40:07 -0800 Subject: [PATCH 23/36] Remove unnecessary brackets --- kafka/consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index bb2b7614d..eba291247 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -401,8 +401,8 @@ def _fetch(self): if self.max_buffer_size is None: self.buffer_size *= 2 else: - self.buffer_size = max([self.buffer_size * 2, - self.max_buffer_size]) + self.buffer_size = max(self.buffer_size * 2, + self.max_buffer_size) log.warn("Fetch size too small, increase to %d (2x) " "and retry", self.buffer_size) retry_partitions.add(partition) From 339650fba984b1216d3272b5ee32507c212886e3 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Fri, 3 Jan 2014 17:47:39 -0800 Subject: [PATCH 24/36] Fix client and consumer params in integration tests --- test/test_integration.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/test/test_integration.py b/test/test_integration.py index a10dae243..d141c3682 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -554,7 +554,7 @@ def setUpClass(cls): cls.zk = ZookeeperFixture.instance() cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port) - cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192) + cls.client = KafkaClient(cls.server2.host, cls.server2.port) @classmethod def tearDownClass(cls): # noqa @@ -583,7 +583,9 @@ def test_simple_consumer(self): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", + "test_simple_consumer", auto_commit=False, + iter_timeout=0) all_messages = [] for message in consumer: all_messages.append(message) @@ -609,7 +611,9 @@ def test_simple_consumer(self): consumer.stop() def test_simple_consumer_blocking(self): - consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_blocking", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", + "test_simple_consumer_blocking", + auto_commit=False, iter_timeout=0) # Blocking API start = datetime.now() @@ -657,7 +661,8 @@ def test_simple_consumer_pending(self): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", + auto_commit=False, iter_timeout=0) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) @@ -764,7 +769,8 @@ def test_large_messages(self): self.assertEquals(resp.offset, 10) # Consumer should still get all of them - consumer = SimpleConsumer(self.client, "group1", "test_large_messages", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", "test_large_messages", + auto_commit=False, iter_timeout=0) all_messages = messages1 + messages2 for i, message in enumerate(consumer): self.assertEquals(all_messages[i], message.message) @@ -869,7 +875,7 @@ def _kill_leader(self, topic, partition): def _count_messages(self, group, topic): client = KafkaClient(self.brokers[0].host, self.brokers[0].port) - consumer = SimpleConsumer(client, group, topic, auto_commit=False) + consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0) all_messages = [] for message in consumer: all_messages.append(message) From fcf23c45fa9d3a2f92a91a4072d01a36d12bfd61 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Fri, 3 Jan 2014 17:49:23 -0800 Subject: [PATCH 25/36] Add tests for limited and unlimited consumer max_buffer_size --- test/test_integration.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/test/test_integration.py b/test/test_integration.py index d141c3682..f638956b6 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -8,6 +8,7 @@ from kafka import * # noqa from kafka.common import * # noqa from kafka.codec import has_gzip, has_snappy +from kafka.consumer import FETCH_BUFFER_SIZE_BYTES, MAX_FETCH_BUFFER_SIZE_BYTES from .fixtures import ZookeeperFixture, KafkaFixture @@ -760,7 +761,7 @@ def test_large_messages(self): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - # Produce 10 messages that are too large (bigger than default fetch size) + # Produce 10 messages that are large (bigger than default fetch size) messages2 = [create_message(random_string(5000)) for i in range(10)] produce2 = ProduceRequest("test_large_messages", 0, messages2) @@ -776,6 +777,29 @@ def test_large_messages(self): self.assertEquals(all_messages[i], message.message) self.assertEquals(i, 19) + # Produce 1 message that is too large (bigger than max fetch size) + big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10 + big_message = create_message(random_string(big_message_size)) + produce3 = ProduceRequest("test_large_messages", 0, [big_message]) + for resp in self.client.send_produce_request([produce3]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 20) + + self.assertRaises(ConsumerFetchSizeTooSmall, consumer.get_message, False, 0.1) + + # Create a consumer with no fetch size limit + big_consumer = SimpleConsumer(self.client, "group1", "test_large_messages", + max_buffer_size=None, partitions=[0], + auto_commit=False, iter_timeout=0) + + # Seek to the last message + big_consumer.seek(-1, 2) + + # Consume giant message successfully + message = big_consumer.get_message(block=False, timeout=10) + self.assertIsNotNone(message) + self.assertEquals(message.message.value, big_message.value) + class TestFailover(unittest.TestCase): @classmethod From ac411bf5cff47452fc1f5e8b150ad52c5adb7ee4 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Mon, 6 Jan 2014 14:09:16 -0800 Subject: [PATCH 26/36] Make kafka brokers per-test in failover integration tests This is better since the tests stop/start brokers, and if something goes wrong they can affect eachother. --- test/test_integration.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/test/test_integration.py b/test/test_integration.py index f638956b6..ccf9990fe 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -802,25 +802,23 @@ def test_large_messages(self): class TestFailover(unittest.TestCase): - @classmethod - def setUpClass(cls): + def setUp(self): - zk_chroot = random_string(10) + zk_chroot = random_string(10) replicas = 2 partitions = 2 # mini zookeeper, 2 kafka brokers - cls.zk = ZookeeperFixture.instance() - kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions] - cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] - cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port) - - @classmethod - def tearDownClass(cls): - cls.client.close() - for broker in cls.brokers: + self.zk = ZookeeperFixture.instance() + kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] + self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] + self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + + def tearDown(self): + self.client.close() + for broker in self.brokers: broker.close() - cls.zk.close() + self.zk.close() def test_switch_leader(self): From 7a999a1b2f50c73b8ed07c413fd33b90d77d9dd3 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Mon, 6 Jan 2014 14:16:11 -0800 Subject: [PATCH 27/36] Add object type and ID to message prefix in fixtures output for easier debugging --- test/fixtures.py | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index 17e6672f6..28d6519a9 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -208,9 +208,12 @@ def __init__(self, host, port): self.tmp_dir = None self.child = None + def out(self, message): + print("*** Zookeeper[%s]: %s" % (id(self), message)) + def open(self): self.tmp_dir = tempfile.mkdtemp() - print("*** Running local Zookeeper instance...") + print("*** [%s] Running local Zookeeper instance..." % id(self)) print(" host = %s" % self.host) print(" port = %s" % self.port) print(" tmp_dir = %s" % self.tmp_dir) @@ -229,16 +232,16 @@ def open(self): self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt")) # Party! - print("*** Starting Zookeeper...") + self.out("Starting...") self.child.start() self.child.wait_for(r"Snapshotting") - print("*** Done!") + self.out("Done!") def close(self): - print("*** Stopping Zookeeper...") + self.out("Stopping...") self.child.stop() self.child = None - print("*** Done!") + self.out("Done!") shutil.rmtree(self.tmp_dir) @@ -274,13 +277,16 @@ def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas= self.child = None self.running = False + def out(self, message): + print("*** Kafka[%s]: %s" % (id(self), message)) + def open(self): if self.running: - print("*** Kafka instance already running") + self.out("Instance already running") return self.tmp_dir = tempfile.mkdtemp() - print("*** Running local Kafka instance") + self.out("Running local instance") print(" host = %s" % self.host) print(" port = %s" % self.port) print(" broker_id = %s" % self.broker_id) @@ -308,31 +314,31 @@ def open(self): self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt")) # Party! - print("*** Creating Zookeeper chroot node...") + self.out("Creating Zookeeper chroot node...") proc = subprocess.Popen(kafka_run_class_args( "org.apache.zookeeper.ZooKeeperMain", "-server", "%s:%d" % (self.zk_host, self.zk_port), "create", "/%s" % self.zk_chroot, "kafka-python" )) if proc.wait() != 0: - print("*** Failed to create Zookeeper chroot node") + self.out("Failed to create Zookeeper chroot node") raise RuntimeError("Failed to create Zookeeper chroot node") - print("*** Done!") + self.out("Done!") - print("*** Starting Kafka...") + self.out("Starting...") self.child.start() self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id) - print("*** Done!") + self.out("Done!") self.running = True def close(self): if not self.running: - print("*** Kafka instance already stopped") + self.out("Instance already stopped") return - print("*** Stopping Kafka...") + self.out("Stopping...") self.child.stop() self.child = None - print("*** Done!") + self.out("Done!") shutil.rmtree(self.tmp_dir) self.running = False From a61990b582aa26e50f3e78530fa491204eefd9ae Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Tue, 7 Jan 2014 17:23:30 -0800 Subject: [PATCH 28/36] Use the same timeout when reinitializing a connection --- kafka/conn.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 3661c00b8..4d0027b1d 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -25,7 +25,8 @@ def __init__(self, host, port, timeout=10): self.port = port self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((host, port)) - self._sock.settimeout(timeout) + self.timeout = timeout + self._sock.settimeout(self.timeout) self._dirty = False def __str__(self): @@ -129,5 +130,5 @@ def reinit(self): self.close() self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((self.host, self.port)) - self._sock.settimeout(10) + self._sock.settimeout(self.timeout) self._dirty = False From c33c0d5aa73e8d5e548a6d1363f6e48ea71d6ee0 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Tue, 7 Jan 2014 17:26:09 -0800 Subject: [PATCH 29/36] Handle dirty flag in conn.recv() * If the connection is dirty, reinit * If we get a BufferUnderflowError, the server could have gone away, so mark it dirty --- kafka/conn.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index 4d0027b1d..80f157fb9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -44,7 +44,8 @@ def _read_bytes(self, num_bytes): bytes_left = num_bytes resp = '' log.debug("About to read %d bytes from Kafka", num_bytes) - + if self._dirty: + self.reinit() while bytes_left: try: data = self._sock.recv(bytes_left) @@ -52,6 +53,7 @@ def _read_bytes(self, num_bytes): log.error('Unable to receive data from Kafka: %s', e) self._raise_connection_error() if data == '': + self._dirty = True raise BufferUnderflowError("Not enough data to read this response") bytes_left -= len(data) log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes) From 86770b825df7ed3a29d18dc08a43f0ab53bcc9b7 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Tue, 7 Jan 2014 17:27:21 -0800 Subject: [PATCH 30/36] Remove unnecessary method --- kafka/conn.py | 31 ++++++++----------------------- 1 file changed, 8 insertions(+), 23 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 80f157fb9..833f9c5fc 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -61,21 +61,6 @@ def _read_bytes(self, num_bytes): return resp - def _consume_response(self): - """ - This method handles the response header and error messages. It - then returns the response - """ - log.debug("Expecting response from Kafka") - # Read the size off of the header - resp = self._read_bytes(4) - - (size,) = struct.unpack('>i', resp) - - # Read the remainder of the response - resp = self._read_bytes(size) - return str(resp) - ################## # Public API # ################## @@ -91,7 +76,7 @@ def send(self, request_id, payload): sent = self._sock.sendall(payload) if sent is not None: self._raise_connection_error() - except socket.error, e: + except socket.error: log.error('Unable to send payload to Kafka: %s', e) self._raise_connection_error() @@ -100,14 +85,14 @@ def recv(self, request_id): Get a response from Kafka """ log.debug("Reading response %d from Kafka" % request_id) - if self._dirty: - self._raise_connection_error() + # Read the size off of the header + resp = self._read_bytes(4) - try: - return self._consume_response() - except socket.error: - log.exception('Unable to read response from Kafka') - self._raise_connection_error() + (size,) = struct.unpack('>i', resp) + + # Read the remainder of the response + resp = self._read_bytes(size) + return str(resp) def copy(self): """ From 4afe2dd6e7cbe040aa6bf748e950cc8d4b80e28d Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Tue, 7 Jan 2014 17:46:01 -0800 Subject: [PATCH 31/36] Skip snappy/gzip tests if they're not available --- test/test_unit.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/test/test_unit.py b/test/test_unit.py index 08fef9c2a..e3fd4bb5b 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -55,17 +55,16 @@ def test_submodule_namespace(self): class TestCodec(unittest.TestCase): + + @unittest.skipUnless(has_gzip(), "Gzip not available") def test_gzip(self): - if not has_gzip(): - return for i in xrange(ITERATIONS): s1 = random_string() s2 = gzip_decode(gzip_encode(s1)) self.assertEquals(s1, s2) + @unittest.skipUnless(has_snappy(), "Snappy not available") def test_snappy(self): - if not has_snappy(): - return for i in xrange(ITERATIONS): s1 = random_string() s2 = snappy_decode(snappy_encode(s1)) @@ -83,6 +82,7 @@ def test_create_message(self): self.assertEqual(msg.key, key) self.assertEqual(msg.value, payload) + @unittest.skipUnless(has_gzip(), "Snappy not available") def test_create_gzip(self): payloads = ["v1", "v2"] msg = create_gzip_message(payloads) @@ -98,6 +98,7 @@ def test_create_gzip(self): "\xff\xff\x00\x00\x00\x02v2") self.assertEqual(decoded, expect) + @unittest.skipUnless(has_snappy(), "Snappy not available") def test_create_snappy(self): payloads = ["v1", "v2"] msg = create_snappy_message(payloads) @@ -157,6 +158,7 @@ def test_decode_message_set(self): self.assertEqual(returned_offset2, 0) self.assertEqual(decoded_message2, create_message("v2")) + @unittest.skipUnless(has_gzip(), "Gzip not available") def test_decode_message_gzip(self): gzip_encoded = ('\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000' '\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01' @@ -173,6 +175,7 @@ def test_decode_message_gzip(self): self.assertEqual(returned_offset2, 0) self.assertEqual(decoded_message2, create_message("v2")) + @unittest.skipUnless(has_snappy(), "Snappy not available") def test_decode_message_snappy(self): snappy_encoded = ('\xec\x80\xa1\x95\x00\x02\xff\xff\xff\xff\x00\x00' '\x00,8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff' From d4d7981a31bff18de5606b3e212f1d3ee70f8323 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Tue, 7 Jan 2014 17:54:05 -0800 Subject: [PATCH 32/36] Some cleanup and easier to read test fixture output --- test/fixtures.py | 8 ++++---- test/test_integration.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index 28d6519a9..9e283d3c5 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -209,11 +209,11 @@ def __init__(self, host, port): self.child = None def out(self, message): - print("*** Zookeeper[%s]: %s" % (id(self), message)) + print("*** Zookeeper [%s:%d]: %s" % (self.host, self.port, message)) def open(self): self.tmp_dir = tempfile.mkdtemp() - print("*** [%s] Running local Zookeeper instance..." % id(self)) + self.out("Running local instance...") print(" host = %s" % self.host) print(" port = %s" % self.port) print(" tmp_dir = %s" % self.tmp_dir) @@ -278,7 +278,7 @@ def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas= self.running = False def out(self, message): - print("*** Kafka[%s]: %s" % (id(self), message)) + print("*** Kafka [%s:%d]: %s" % (self.host, self.port, message)) def open(self): if self.running: @@ -286,7 +286,7 @@ def open(self): return self.tmp_dir = tempfile.mkdtemp() - self.out("Running local instance") + self.out("Running local instance...") print(" host = %s" % self.host) print(" port = %s" % self.port) print(" broker_id = %s" % self.broker_id) diff --git a/test/test_integration.py b/test/test_integration.py index ccf9990fe..eaf432d64 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -8,7 +8,7 @@ from kafka import * # noqa from kafka.common import * # noqa from kafka.codec import has_gzip, has_snappy -from kafka.consumer import FETCH_BUFFER_SIZE_BYTES, MAX_FETCH_BUFFER_SIZE_BYTES +from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES from .fixtures import ZookeeperFixture, KafkaFixture From a2cfc70f2de4644b910a1fd91f0ebb786507f475 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 8 Jan 2014 11:30:15 -0800 Subject: [PATCH 33/36] Change BufferUnderflowError to ConnectionError in conn._read_bytes() Both errors are handled the same way when raised and caught, so this makes sense. --- kafka/client.py | 5 ++--- kafka/conn.py | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 821904c87..33c6d778f 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -3,11 +3,10 @@ from functools import partial from itertools import count import logging -import socket import time from kafka.common import ( - ErrorMapping, TopicAndPartition, BufferUnderflowError, ConnectionError, + ErrorMapping, TopicAndPartition, ConnectionError, FailedPayloadsException ) from kafka.conn import KafkaConnection @@ -175,7 +174,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): continue try: response = conn.recv(requestId) - except (ConnectionError, BufferUnderflowError), e: + except ConnectionError, e: log.warning("Could not receive response to request [%s] " "from server %s: %s", request, conn, e) failed = True diff --git a/kafka/conn.py b/kafka/conn.py index 833f9c5fc..0af9e72ed 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -4,7 +4,6 @@ import struct from threading import local -from kafka.common import BufferUnderflowError from kafka.common import ConnectionError log = logging.getLogger("kafka") @@ -53,8 +52,8 @@ def _read_bytes(self, num_bytes): log.error('Unable to receive data from Kafka: %s', e) self._raise_connection_error() if data == '': - self._dirty = True - raise BufferUnderflowError("Not enough data to read this response") + log.error("Not enough data to read this response") + self._raise_connection_error() bytes_left -= len(data) log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes) resp += data From 2b4451c8e5ff9c9269d1e44f6371f84a6b55e348 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 8 Jan 2014 11:46:10 -0800 Subject: [PATCH 34/36] Change log.error() back to log.exception() --- kafka/conn.py | 6 +++--- kafka/producer.py | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 0af9e72ed..ec42fda4b 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -48,8 +48,8 @@ def _read_bytes(self, num_bytes): while bytes_left: try: data = self._sock.recv(bytes_left) - except socket.error, e: - log.error('Unable to receive data from Kafka: %s', e) + except socket.error: + log.exception('Unable to receive data from Kafka') self._raise_connection_error() if data == '': log.error("Not enough data to read this response") @@ -76,7 +76,7 @@ def send(self, request_id, payload): if sent is not None: self._raise_connection_error() except socket.error: - log.error('Unable to send payload to Kafka: %s', e) + log.exception('Unable to send payload to Kafka') self._raise_connection_error() def recv(self, request_id): diff --git a/kafka/producer.py b/kafka/producer.py index 4ff13b4aa..5aead439a 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -69,8 +69,8 @@ def _send_upstream(topic, queue, client, batch_time, batch_size, client.send_produce_request(reqs, acks=req_acks, timeout=ack_timeout) - except Exception as e: - log.error("Unable to send message: %s", e) + except Exception: + log.exception("Unable to send message") class Producer(object): @@ -146,9 +146,9 @@ def send_messages(self, partition, *msg): try: resp = self.client.send_produce_request([req], acks=self.req_acks, timeout=self.ack_timeout) - except Exception as e: - log.error("Unable to send messages: %s", e) - raise e + except Exception: + log.exception("Unable to send messages") + raise return resp def stop(self, timeout=1): From e303b3714f5953353d78c1d90cbfd7ba38d66504 Mon Sep 17 00:00:00 2001 From: Thomas Dimson Date: Mon, 6 Jan 2014 17:37:41 -0800 Subject: [PATCH 35/36] Check for socket status on read as well as send --- kafka/conn.py | 1 - 1 file changed, 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index ec42fda4b..8d3e725f8 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -83,7 +83,6 @@ def recv(self, request_id): """ Get a response from Kafka """ - log.debug("Reading response %d from Kafka" % request_id) # Read the size off of the header resp = self._read_bytes(4) From a6c25fd833931121f1367e5e7bf8209471c85b29 Mon Sep 17 00:00:00 2001 From: Thomas Dimson Date: Mon, 6 Jan 2014 17:50:39 -0800 Subject: [PATCH 36/36] Propagate error immediately if dirty --- kafka/conn.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/conn.py b/kafka/conn.py index 8d3e725f8..ec42fda4b 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -83,6 +83,7 @@ def recv(self, request_id): """ Get a response from Kafka """ + log.debug("Reading response %d from Kafka" % request_id) # Read the size off of the header resp = self._read_bytes(4)