From fa351d9326c65685f4b6a2c5a471e1e38844774a Mon Sep 17 00:00:00 2001 From: Thomas Dimson Date: Tue, 7 Jan 2014 19:17:21 -0800 Subject: [PATCH 01/46] Add error hierarchy, remove client infinite loop --- kafka/client.py | 42 ++++++++++++++++---------------- kafka/common.py | 21 +++++++++++----- test/test_integration.py | 52 +++++++++++++++++++++++++++------------- 3 files changed, 70 insertions(+), 45 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 71ededaa0..b55888c6c 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -6,8 +6,10 @@ import socket import time -from kafka.common import ErrorMapping, TopicAndPartition -from kafka.common import ConnectionError, FailedPayloadsException +from kafka.common import ErrorMapping, TopicAndPartition, \ + ConnectionError, FailedPayloadsException, BrokerErrorResponse, \ + PartitionUnavailableError + from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol @@ -29,7 +31,6 @@ def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID): self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...] - self._load_metadata_for_topics() ################## # Private API # @@ -51,7 +52,7 @@ def _get_leader_for_partition(self, topic, partition): self._load_metadata_for_topics(topic) if key not in self.topics_to_brokers: - raise Exception("Partition does not exist: %s" % str(key)) + raise BrokerErrorResponse("Partition does not exist: %s" % str(key)) return self.topics_to_brokers[key] @@ -82,20 +83,12 @@ def _load_metadata_for_topics(self, *topics): self.topic_partitions.pop(topic, None) if not partitions: - log.info("Partition is unassigned, delay for 1s and retry") - time.sleep(1) - self._load_metadata_for_topics(topic) - break + raise PartitionUnavailableError("Partitions for %s are unassigned!" 
% topic) for partition, meta in partitions.items(): - if meta.leader == -1: - log.info("Partition is unassigned, delay for 1s and retry") - time.sleep(1) - self._load_metadata_for_topics(topic) - else: - topic_part = TopicAndPartition(topic, partition) - self.topics_to_brokers[topic_part] = brokers[meta.leader] - self.topic_partitions[topic].append(partition) + topic_part = TopicAndPartition(topic, partition) + self.topics_to_brokers[topic_part] = brokers[meta.leader] + self.topic_partitions[topic].append(partition) def _next_id(self): """ @@ -149,6 +142,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): for payload in payloads: leader = self._get_leader_for_partition(payload.topic, payload.partition) + if leader == -1: + raise PartitionUnavailableError("Leader is unassigned for %s-%s" % payload.topic, payload.partition) payloads_by_broker[leader].append(payload) original_keys.append((payload.topic, payload.partition)) @@ -174,7 +169,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): except ConnectionError, e: # ignore BufferUnderflow for now log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e)) failed_payloads += payloads - self.topics_to_brokers = {} # reset metadata + self.reset_metadata() continue for response in decoder_fn(response): @@ -190,6 +185,9 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): # Public API # ################# + def reset_metadata(self): + self.topics_to_brokers = {} + def close(self): for conn in self.conns.values(): conn.close() @@ -247,7 +245,7 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000, for resp in resps: # Check for errors if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception( + raise BrokerErrorResponse( "ProduceRequest for %s failed with errorcode=%d" % (TopicAndPartition(resp.topic, resp.partition), resp.error)) @@ -280,7 +278,7 @@ def send_fetch_request(self, payloads=[], fail_on_error=True, for resp in resps: # Check for errors if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception( + raise BrokerErrorResponse( "FetchRequest for %s failed with errorcode=%d" % (TopicAndPartition(resp.topic, resp.partition), resp.error)) @@ -302,7 +300,7 @@ def send_offset_request(self, payloads=[], fail_on_error=True, out = [] for resp in resps: if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("OffsetRequest failed with errorcode=%s", + raise BrokerErrorResponse("OffsetRequest failed with errorcode=%s", resp.error) if callback is not None: out.append(callback(resp)) @@ -320,7 +318,7 @@ def send_offset_commit_request(self, group, payloads=[], out = [] for resp in resps: if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("OffsetCommitRequest failed with " + raise BrokerErrorResponse("OffsetCommitRequest failed with " "errorcode=%s", resp.error) if callback is not None: @@ -340,7 +338,7 @@ def send_offset_fetch_request(self, group, payloads=[], out = [] for resp in resps: if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise Exception("OffsetCommitRequest failed with errorcode=%s", + raise BrokerErrorResponse("OffsetCommitRequest failed with errorcode=%s", resp.error) if callback is not None: out.append(callback(resp)) diff --git a/kafka/common.py b/kafka/common.py index 6f0dd322b..a7590ea3a 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -69,23 +69,32 @@ class ErrorMapping(object): # Exceptions # 
################# -class FailedPayloadsException(Exception): +class KafkaException(RuntimeError): pass -class ConnectionError(Exception): +class BrokerErrorResponse(KafkaException): pass -class BufferUnderflowError(Exception): +class PartitionUnavailableError(KafkaException): + pass + +class FailedPayloadsException(KafkaException): + pass + +class ConnectionError(KafkaException): + pass + +class BufferUnderflowError(KafkaException): pass -class ChecksumError(Exception): +class ChecksumError(KafkaException): pass -class ConsumerFetchSizeTooSmall(Exception): +class ConsumerFetchSizeTooSmall(KafkaException): pass -class ConsumerNoMoreData(Exception): +class ConsumerNoMoreData(KafkaException): pass diff --git a/test/test_integration.py b/test/test_integration.py index a10dae243..43a604112 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -10,8 +10,24 @@ from kafka.codec import has_gzip, has_snappy from .fixtures import ZookeeperFixture, KafkaFixture - -class TestKafkaClient(unittest.TestCase): +class KafkaTestCase(unittest.TestCase): + def setUp(self): + partition_name = self.id()[self.id().rindex(".")+1:] + times = 0 + while True: + times += 1 + try: + self.client._load_metadata_for_topics(partition_name) + break + except PartitionUnavailableError: + print "Waiting for %s partition to be created" % partition_name + time.sleep(1) + + if times > 30: + raise Exception("Unable to create partition %s" % partition_name) + + +class TestKafkaClient(KafkaTestCase): @classmethod def setUpClass(cls): # noqa cls.zk = ZookeeperFixture.instance() @@ -24,11 +40,13 @@ def tearDownClass(cls): # noqa cls.server.close() cls.zk.close() + ##################### # Produce Tests # ##################### def test_produce_many_simple(self): + produce = ProduceRequest("test_produce_many_simple", 0, messages=[ create_message("Test message %d" % i) for i in range(100) ]) @@ -330,15 +348,15 @@ def test_round_robin_partitioner(self): producer.stop() def test_hashed_partitioner(self): - producer = KeyedProducer(self.client, "test_hash_partitioner", + producer = KeyedProducer(self.client, "test_hashed_partitioner", partitioner=HashedPartitioner) producer.send(1, "one") producer.send(2, "two") producer.send(3, "three") producer.send(4, "four") - fetch1 = FetchRequest("test_hash_partitioner", 0, 0, 1024) - fetch2 = FetchRequest("test_hash_partitioner", 1, 0, 1024) + fetch1 = FetchRequest("test_hashed_partitioner", 0, 0, 1024) + fetch2 = FetchRequest("test_hashed_partitioner", 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -548,7 +566,7 @@ def test_batched_simple_producer(self): producer.stop() -class TestConsumer(unittest.TestCase): +class TestConsumer(KafkaTestCase): @classmethod def setUpClass(cls): cls.zk = ZookeeperFixture.instance() @@ -643,21 +661,21 @@ def test_simple_consumer_blocking(self): def test_simple_consumer_pending(self): # Produce 10 messages to partition 0 and 1 - produce1 = ProduceRequest("test_simple_pending", 0, messages=[ + produce1 = ProduceRequest("test_simple_consumer_pending", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) for resp in self.client.send_produce_request([produce1]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - produce2 = ProduceRequest("test_simple_pending", 1, messages=[ + produce2 = ProduceRequest("test_simple_consumer_pending", 1, messages=[ create_message("Test message 1 %d" % i) for i in range(10) ]) for resp in self.client.send_produce_request([produce2]): 
self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_pending", auto_commit=False) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) @@ -665,7 +683,7 @@ def test_simple_consumer_pending(self): def test_multi_process_consumer(self): # Produce 100 messages to partition 0 - produce1 = ProduceRequest("test_mpconsumer", 0, messages=[ + produce1 = ProduceRequest("test_multi_process_consumer", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(100) ]) @@ -674,7 +692,7 @@ def test_multi_process_consumer(self): self.assertEquals(resp.offset, 0) # Produce 100 messages to partition 1 - produce2 = ProduceRequest("test_mpconsumer", 1, messages=[ + produce2 = ProduceRequest("test_multi_process_consumer", 1, messages=[ create_message("Test message 1 %d" % i) for i in range(100) ]) @@ -683,7 +701,7 @@ def test_multi_process_consumer(self): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer", auto_commit=False) + consumer = MultiProcessConsumer(self.client, "grp1", "test_multi_process_consumer", auto_commit=False) all_messages = [] for message in consumer: all_messages.append(message) @@ -700,7 +718,7 @@ def test_multi_process_consumer(self): self.assertEqual(len(messages), 0) # Send 10 messages - produce = ProduceRequest("test_mpconsumer", 0, messages=[ + produce = ProduceRequest("test_multi_process_consumer", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) @@ -723,7 +741,7 @@ def test_multi_process_consumer(self): def test_multi_proc_pending(self): # Produce 10 messages to partition 0 and 1 - produce1 = ProduceRequest("test_mppending", 0, messages=[ + produce1 = ProduceRequest("test_multi_proc_pending", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) @@ -731,7 +749,7 @@ def test_multi_proc_pending(self): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - produce2 = ProduceRequest("test_mppending", 1, messages=[ + produce2 = ProduceRequest("test_multi_proc_pending", 1, messages=[ create_message("Test message 1 %d" % i) for i in range(10) ]) @@ -739,7 +757,7 @@ def test_multi_proc_pending(self): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = MultiProcessConsumer(self.client, "group1", "test_mppending", auto_commit=False) + consumer = MultiProcessConsumer(self.client, "group1", "test_multi_proc_pending", auto_commit=False) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) @@ -770,7 +788,7 @@ def test_large_messages(self): self.assertEquals(all_messages[i], message.message) self.assertEquals(i, 19) -class TestFailover(unittest.TestCase): +class TestFailover(KafkaTestCase): @classmethod def setUpClass(cls): From 7d7a5874f92c51a8e2058c507bff9530feba9c05 Mon Sep 17 00:00:00 2001 From: Thomas Dimson Date: Tue, 7 Jan 2014 19:41:39 -0800 Subject: [PATCH 02/46] Better error handling on broker exception, flake8 fixes --- kafka/client.py | 136 +++++++++++++++++++-------------------- kafka/common.py | 6 ++ kafka/consumer.py | 2 +- kafka/producer.py | 15 ++--- test/test_integration.py | 2 +- 5 files changed, 82 insertions(+), 79 deletions(-) 
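For illustration only (not part of the diff below): a minimal sketch of how calling code might use the exception hierarchy introduced in the first two patches. Host, port and topic name are placeholders, and a later patch in this series renames the classes to KafkaError, BrokerResponseError and FailedPayloadsError.

from kafka.client import KafkaClient
from kafka.common import (ProduceRequest, KafkaException,
                          BrokerErrorResponse, FailedPayloadsException)
from kafka.protocol import create_message

client = KafkaClient("localhost", 9092)
request = ProduceRequest("my-topic", 0, messages=[create_message("hello")])
try:
    client.send_produce_request([request])
except FailedPayloadsException:
    pass  # some payloads could not be sent; broker metadata was reset
except BrokerErrorResponse:
    pass  # a broker answered with a non-zero error code
except KafkaException:
    pass  # catch-all base class for kafka-python errors
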
diff --git a/kafka/client.py b/kafka/client.py index b55888c6c..31650aae9 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -1,15 +1,13 @@ import copy +import logging + from collections import defaultdict from functools import partial from itertools import count -import logging -import socket -import time from kafka.common import ErrorMapping, TopicAndPartition, \ - ConnectionError, FailedPayloadsException, BrokerErrorResponse, \ - PartitionUnavailableError - + ConnectionError, FailedPayloadsException, BrokerErrorResponse, \ + PartitionUnavailableError, KafkaException from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol @@ -49,47 +47,13 @@ def _get_conn_for_broker(self, broker): def _get_leader_for_partition(self, topic, partition): key = TopicAndPartition(topic, partition) if key not in self.topics_to_brokers: - self._load_metadata_for_topics(topic) + self.load_metadata_for_topics(topic) if key not in self.topics_to_brokers: raise BrokerErrorResponse("Partition does not exist: %s" % str(key)) return self.topics_to_brokers[key] - def _load_metadata_for_topics(self, *topics): - """ - Discover brokers and metadata for a set of topics. This method will - recurse in the event of a retry. - """ - request_id = self._next_id() - request = KafkaProtocol.encode_metadata_request(self.client_id, - request_id, topics) - - response = self._send_broker_unaware_request(request_id, request) - if response is None: - raise Exception("All servers failed to process request") - - (brokers, topics) = KafkaProtocol.decode_metadata_response(response) - - log.debug("Broker metadata: %s", brokers) - log.debug("Topic metadata: %s", topics) - - self.brokers = brokers - self.topics_to_brokers = {} - - for topic, partitions in topics.items(): - # Clear the list once before we add it. This removes stale entries - # and avoids duplicates - self.topic_partitions.pop(topic, None) - - if not partitions: - raise PartitionUnavailableError("Partitions for %s are unassigned!" 
% topic) - - for partition, meta in partitions.items(): - topic_part = TopicAndPartition(topic, partition) - self.topics_to_brokers[topic_part] = brokers[meta.leader] - self.topic_partitions[topic].append(partition) - def _next_id(self): """ Generate a new correlation id @@ -111,7 +75,7 @@ def _send_broker_unaware_request(self, requestId, request): "trying next server: %s" % (request, conn, e)) continue - return None + raise KafkaException("All servers failed to process request") def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): """ @@ -169,7 +133,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): except ConnectionError, e: # ignore BufferUnderflow for now log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e)) failed_payloads += payloads - self.reset_metadata() + self.reset_all_metadata() continue for response in decoder_fn(response): @@ -181,11 +145,26 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): # Order the accumulated responses by the original key order return (acc[k] for k in original_keys) if acc else () + def _raise_on_response_error(self, resp): + if resp.error == ErrorMapping.NO_ERROR: + return + + if resp.error in (ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON, + ErrorMapping.NOT_LEADER_FOR_PARTITION): + self.reset_topic_metadata(resp.topic) + + raise BrokerErrorResponse( + "Request for %s failed with errorcode=%d" % + (TopicAndPartition(resp.topic, resp.partition), resp.error)) + ################# # Public API # ################# + def reset_topic_metadata(self, *topics): + for topic in topics: + del self.topics_to_brokers[topic] - def reset_metadata(self): + def reset_all_metadata(self): self.topics_to_brokers = {} def close(self): @@ -206,6 +185,38 @@ def reinit(self): for conn in self.conns.values(): conn.reinit() + def load_metadata_for_topics(self, *topics): + """ + Discover brokers and metadata for a set of topics. This function is called + lazily whenever metadata is unavailable. + """ + request_id = self._next_id() + request = KafkaProtocol.encode_metadata_request(self.client_id, + request_id, topics) + + response = self._send_broker_unaware_request(request_id, request) + + (brokers, topics) = KafkaProtocol.decode_metadata_response(response) + + log.debug("Broker metadata: %s", brokers) + log.debug("Topic metadata: %s", topics) + + self.brokers = brokers + self.topics_to_brokers = {} + + for topic, partitions in topics.items(): + # Clear the list once before we add it. This removes stale entries + # and avoids duplicates + self.topic_partitions.pop(topic, None) + + if not partitions: + raise PartitionUnavailableError("Partitions for %s are unassigned!" 
% topic) + + for partition, meta in partitions.items(): + topic_part = TopicAndPartition(topic, partition) + self.topics_to_brokers[topic_part] = brokers[meta.leader] + self.topic_partitions[topic].append(partition) + def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None): """ @@ -243,14 +254,9 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000, out = [] for resp in resps: - # Check for errors - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise BrokerErrorResponse( - "ProduceRequest for %s failed with errorcode=%d" % - (TopicAndPartition(resp.topic, resp.partition), - resp.error)) - - # Run the callback + if fail_on_error is True: + self._raise_on_response_error(resp) + if callback is not None: out.append(callback(resp)) else: @@ -276,14 +282,9 @@ def send_fetch_request(self, payloads=[], fail_on_error=True, out = [] for resp in resps: - # Check for errors - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise BrokerErrorResponse( - "FetchRequest for %s failed with errorcode=%d" % - (TopicAndPartition(resp.topic, resp.partition), - resp.error)) - - # Run the callback + if fail_on_error is True: + self._raise_on_response_error(resp) + if callback is not None: out.append(callback(resp)) else: @@ -299,9 +300,8 @@ def send_offset_request(self, payloads=[], fail_on_error=True, out = [] for resp in resps: - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise BrokerErrorResponse("OffsetRequest failed with errorcode=%s", - resp.error) + if fail_on_error is True: + self._raise_on_response_error(resp) if callback is not None: out.append(callback(resp)) else: @@ -317,9 +317,8 @@ def send_offset_commit_request(self, group, payloads=[], out = [] for resp in resps: - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise BrokerErrorResponse("OffsetCommitRequest failed with " - "errorcode=%s", resp.error) + if fail_on_error is True: + self._raise_on_response_error(resp) if callback is not None: out.append(callback(resp)) @@ -337,9 +336,8 @@ def send_offset_fetch_request(self, group, payloads=[], out = [] for resp in resps: - if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR: - raise BrokerErrorResponse("OffsetCommitRequest failed with errorcode=%s", - resp.error) + if fail_on_error is True: + self._raise_on_response_error(resp) if callback is not None: out.append(callback(resp)) else: diff --git a/kafka/common.py b/kafka/common.py index a7590ea3a..01e3195e2 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -69,21 +69,27 @@ class ErrorMapping(object): # Exceptions # ################# + class KafkaException(RuntimeError): pass + class BrokerErrorResponse(KafkaException): pass + class PartitionUnavailableError(KafkaException): pass + class FailedPayloadsException(KafkaException): pass + class ConnectionError(KafkaException): pass + class BufferUnderflowError(KafkaException): pass diff --git a/kafka/consumer.py b/kafka/consumer.py index 57b5b9742..79f60bbf5 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -67,7 +67,7 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True, self.client = client self.topic = topic self.group = group - self.client._load_metadata_for_topics(topic) + self.client.load_metadata_for_topics(topic) self.offsets = {} if not partitions: diff --git a/kafka/producer.py b/kafka/producer.py index 1d4733688..a6d5bde9b 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -1,15 +1,14 @@ 
+import logging +import time + +from Queue import Empty from collections import defaultdict from itertools import cycle from multiprocessing import Queue, Process -from Queue import Empty -import logging -import sys -import time from kafka.common import ProduceRequest -from kafka.common import FailedPayloadsException -from kafka.protocol import create_message from kafka.partitioner import HashedPartitioner +from kafka.protocol import create_message log = logging.getLogger("kafka") @@ -186,7 +185,7 @@ def __init__(self, client, topic, async=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): self.topic = topic - client._load_metadata_for_topics(topic) + client.load_metadata_for_topics(topic) self.next_partition = cycle(client.topic_partitions[topic]) super(SimpleProducer, self).__init__(client, async, req_acks, @@ -223,7 +222,7 @@ def __init__(self, client, topic, partitioner=None, async=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): self.topic = topic - client._load_metadata_for_topics(topic) + client.load_metadata_for_topics(topic) if not partitioner: partitioner = HashedPartitioner diff --git a/test/test_integration.py b/test/test_integration.py index 43a604112..889f599d3 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -17,7 +17,7 @@ def setUp(self): while True: times += 1 try: - self.client._load_metadata_for_topics(partition_name) + self.client.load_metadata_for_topics(partition_name) break except PartitionUnavailableError: print "Waiting for %s partition to be created" % partition_name From d842c789884da0add133031b73d79b7ae531f65f Mon Sep 17 00:00:00 2001 From: Thomas Dimson Date: Wed, 8 Jan 2014 10:23:31 -0800 Subject: [PATCH 03/46] Add back the global load metadata on client initialization --- kafka/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/client.py b/kafka/client.py index 31650aae9..26ffc0018 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -29,6 +29,7 @@ def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID): self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...] + self.load_metadata_for_topics() # bootstrap with all metadata ################## # Private API # From 5cccd638092879251ce02b877a091ad2a335120d Mon Sep 17 00:00:00 2001 From: Thomas Dimson Date: Wed, 8 Jan 2014 12:05:09 -0800 Subject: [PATCH 04/46] No exception for empty partitions, fix topic metadata reset --- kafka/client.py | 26 ++++++++++++++++++-------- test/test_integration.py | 14 +++++++------- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 26ffc0018..53bee7a8f 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -28,7 +28,7 @@ def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID): } self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id - self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...] + self.topic_partitions = {} # topic_id -> [0, 1, 2, ...] 
self.load_metadata_for_topics() # bootstrap with all metadata ################## @@ -163,10 +163,22 @@ def _raise_on_response_error(self, resp): ################# def reset_topic_metadata(self, *topics): for topic in topics: - del self.topics_to_brokers[topic] + try: + partitions = self.topic_partitions[topic] + except KeyError: + continue + + for partition in partitions: + self.topics_to_brokers.pop(TopicAndPartition(topic, partition), None) + + del self.topic_partitions[topic] def reset_all_metadata(self): - self.topics_to_brokers = {} + self.topics_to_brokers.clear() + self.topic_partitions.clear() + + def has_metadata_for_topic(self, topic): + return topic in self.topic_partitions def close(self): for conn in self.conns.values(): @@ -203,16 +215,14 @@ def load_metadata_for_topics(self, *topics): log.debug("Topic metadata: %s", topics) self.brokers = brokers - self.topics_to_brokers = {} for topic, partitions in topics.items(): - # Clear the list once before we add it. This removes stale entries - # and avoids duplicates - self.topic_partitions.pop(topic, None) + self.reset_topic_metadata(topic) if not partitions: - raise PartitionUnavailableError("Partitions for %s are unassigned!" % topic) + continue + self.topic_partitions[topic] = [] for partition, meta in partitions.items(): topic_part = TopicAndPartition(topic, partition) self.topics_to_brokers[topic_part] = brokers[meta.leader] diff --git a/test/test_integration.py b/test/test_integration.py index 889f599d3..ff98da0a8 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -10,21 +10,21 @@ from kafka.codec import has_gzip, has_snappy from .fixtures import ZookeeperFixture, KafkaFixture + class KafkaTestCase(unittest.TestCase): def setUp(self): - partition_name = self.id()[self.id().rindex(".")+1:] + topic_name = self.id()[self.id().rindex(".")+1:] times = 0 while True: times += 1 - try: - self.client.load_metadata_for_topics(partition_name) + self.client.load_metadata_for_topics(topic_name) + if self.client.has_metadata_for_topic(topic_name): break - except PartitionUnavailableError: - print "Waiting for %s partition to be created" % partition_name - time.sleep(1) + print "Waiting for %s topic to be created" % topic_name + time.sleep(1) if times > 30: - raise Exception("Unable to create partition %s" % partition_name) + raise Exception("Unable to create topic %s" % topic_name) class TestKafkaClient(KafkaTestCase): From 0dd8c3f22b002c0d3decd5c655eac65d3d41f1a7 Mon Sep 17 00:00:00 2001 From: Thomas Dimson Date: Thu, 9 Jan 2014 17:06:34 -0800 Subject: [PATCH 05/46] Change exception names, make a new "KafkaRequestError" --- kafka/client.py | 15 ++++++++------- kafka/common.py | 22 +++++++++++++--------- test/test_integration.py | 2 +- 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 53bee7a8f..e29bdb61b 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -5,9 +5,10 @@ from functools import partial from itertools import count -from kafka.common import ErrorMapping, TopicAndPartition, \ - ConnectionError, FailedPayloadsException, BrokerErrorResponse, \ - PartitionUnavailableError, KafkaException +from kafka.common import (ErrorMapping, TopicAndPartition, + ConnectionError, FailedPayloadsError, + BrokerResponseError, PartitionUnavailableError, + KafkaRequestError) from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol @@ -51,7 +52,7 @@ def _get_leader_for_partition(self, topic, partition): self.load_metadata_for_topics(topic) if key not in 
self.topics_to_brokers: - raise BrokerErrorResponse("Partition does not exist: %s" % str(key)) + raise KafkaRequestError("Partition does not exist: %s" % str(key)) return self.topics_to_brokers[key] @@ -76,7 +77,7 @@ def _send_broker_unaware_request(self, requestId, request): "trying next server: %s" % (request, conn, e)) continue - raise KafkaException("All servers failed to process request") + raise BrokerResponseError("All servers failed to process request") def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): """ @@ -141,7 +142,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): acc[(response.topic, response.partition)] = response if failed_payloads: - raise FailedPayloadsException(failed_payloads) + raise FailedPayloadsError(failed_payloads) # Order the accumulated responses by the original key order return (acc[k] for k in original_keys) if acc else () @@ -154,7 +155,7 @@ def _raise_on_response_error(self, resp): ErrorMapping.NOT_LEADER_FOR_PARTITION): self.reset_topic_metadata(resp.topic) - raise BrokerErrorResponse( + raise BrokerResponseError( "Request for %s failed with errorcode=%d" % (TopicAndPartition(resp.topic, resp.partition), resp.error)) diff --git a/kafka/common.py b/kafka/common.py index 01e3195e2..5bd9a967d 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -70,37 +70,41 @@ class ErrorMapping(object): ################# -class KafkaException(RuntimeError): +class KafkaError(RuntimeError): pass -class BrokerErrorResponse(KafkaException): +class KafkaRequestError(KafkaError): pass -class PartitionUnavailableError(KafkaException): +class BrokerResponseError(KafkaError): pass -class FailedPayloadsException(KafkaException): +class PartitionUnavailableError(KafkaError): pass -class ConnectionError(KafkaException): +class FailedPayloadsError(KafkaError): pass -class BufferUnderflowError(KafkaException): +class ConnectionError(KafkaError): pass -class ChecksumError(KafkaException): +class BufferUnderflowError(KafkaError): pass -class ConsumerFetchSizeTooSmall(KafkaException): +class ChecksumError(KafkaError): pass -class ConsumerNoMoreData(KafkaException): +class ConsumerFetchSizeTooSmall(KafkaError): + pass + + +class ConsumerNoMoreData(KafkaError): pass diff --git a/test/test_integration.py b/test/test_integration.py index ff98da0a8..6833088a0 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -825,7 +825,7 @@ def test_switch_leader(self): broker = self._kill_leader(topic, partition) # expect failure, reload meta data - with self.assertRaises(FailedPayloadsException): + with self.assertRaises(FailedPayloadsError): producer.send_messages('part 1') producer.send_messages('part 2') time.sleep(1) From ec23d94b3f27dba365c0197365a49242f7996b7e Mon Sep 17 00:00:00 2001 From: Thomas Dimson Date: Mon, 6 Jan 2014 17:37:41 -0800 Subject: [PATCH 06/46] Check for socket status on read as well as send --- kafka/conn.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 1a3e2607c..b1988b662 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -98,8 +98,13 @@ def recv(self, request_id): Get a response from Kafka """ log.debug("Reading response %d from Kafka" % request_id) - self.data = self._consume_response() - return self.data + try: + if self._dirty: + self.reinit() + return self._consume_response() + except socket.error: + log.exception('Unable to read response from Kafka') + self._raise_connection_error() def copy(self): """ From 
ae44207a0a264254f11b53b3f08fb90549beb2e6 Mon Sep 17 00:00:00 2001 From: Thomas Dimson Date: Mon, 6 Jan 2014 17:50:39 -0800 Subject: [PATCH 07/46] Propagate error immediately if dirty --- kafka/conn.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index b1988b662..57b3e7b13 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -98,9 +98,10 @@ def recv(self, request_id): Get a response from Kafka """ log.debug("Reading response %d from Kafka" % request_id) + if self._dirty: + self._raise_connection_error() + try: - if self._dirty: - self.reinit() return self._consume_response() except socket.error: log.exception('Unable to read response from Kafka') From 6fe9dce180443c1d7655e2a67d9c010e7441aee9 Mon Sep 17 00:00:00 2001 From: Joe Crobak Date: Mon, 21 Oct 2013 20:07:43 +0000 Subject: [PATCH 08/46] Enable absolute imports for modules using Queue. When running on Linux with code on a case-insensitive file system, imports of the `Queue` module fail because python resolves the wrong file (It is trying to use a relative import of `queue.py` in the kafka directory). This change forces absolute imports via PEP328. --- kafka/consumer.py | 2 ++ kafka/producer.py | 5 ++++- kafka/queue.py | 2 ++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 79f60bbf5..140842dcb 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -1,3 +1,5 @@ +from __future__ import absolute_import + from collections import defaultdict from itertools import izip_longest, repeat import logging diff --git a/kafka/producer.py b/kafka/producer.py index a6d5bde9b..0f1fc85a1 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -1,7 +1,10 @@ +from __future__ import absolute_import + import logging import time from Queue import Empty + from collections import defaultdict from itertools import cycle from multiprocessing import Queue, Process @@ -66,7 +69,7 @@ def _send_upstream(topic, queue, client, batch_time, batch_size, client.send_produce_request(reqs, acks=req_acks, timeout=ack_timeout) - except Exception as exp: + except Exception: log.exception("Unable to send message") diff --git a/kafka/queue.py b/kafka/queue.py index a99636922..ada495f78 100644 --- a/kafka/queue.py +++ b/kafka/queue.py @@ -1,3 +1,5 @@ +from __future__ import absolute_import + from copy import copy import logging from multiprocessing import Process, Queue, Event From 40db4e144a5621b95fb3233ae10287630d6e6a47 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 18 Dec 2013 16:53:03 -0800 Subject: [PATCH 09/46] Allow customizing socket timeouts. Previously, if you try to consume a message with a timeout greater than 10 seconds, but you don't receive data in those 10 seconds, a socket.timeout exception is raised. This allows a higher socket timeout to be set, or even None for no timeout. 
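For illustration only, a minimal sketch of the new option (host and port below are placeholders). The timeout is in seconds and is passed straight through to socket.settimeout(), so None disables the socket timeout entirely:

from kafka.client import KafkaClient

# Wait up to 60s for a response instead of the 10s default
patient_client = KafkaClient("localhost", 9092, timeout=60)

# No socket timeout at all: recv() blocks until data arrives
blocking_client = KafkaClient("localhost", 9092, timeout=None)
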
--- kafka/client.py | 7 ++++--- kafka/conn.py | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index e29bdb61b..42b3db84e 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -20,12 +20,13 @@ class KafkaClient(object): CLIENT_ID = "kafka-python" ID_GEN = count() - def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID): + def __init__(self, host, port, bufsize=4098, client_id=CLIENT_ID, timeout=10): # We need one connection to bootstrap self.bufsize = bufsize self.client_id = client_id + self.timeout = timeout self.conns = { # (host, port) -> KafkaConnection - (host, port): KafkaConnection(host, port, bufsize) + (host, port): KafkaConnection(host, port, bufsize, timeout=timeout) } self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id @@ -42,7 +43,7 @@ def _get_conn_for_broker(self, broker): """ if (broker.host, broker.port) not in self.conns: self.conns[(broker.host, broker.port)] = \ - KafkaConnection(broker.host, broker.port, self.bufsize) + KafkaConnection(broker.host, broker.port, self.bufsize, timeout=self.timeout) return self.conns[(broker.host, broker.port)] diff --git a/kafka/conn.py b/kafka/conn.py index 57b3e7b13..03a43c9f9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -19,14 +19,14 @@ class KafkaConnection(local): we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id. """ - def __init__(self, host, port, bufsize=4096): + def __init__(self, host, port, bufsize=4098, timeout=10): super(KafkaConnection, self).__init__() self.host = host self.port = port self.bufsize = bufsize self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((host, port)) - self._sock.settimeout(10) + self._sock.settimeout(timeout) self._dirty = False def __str__(self): From 6d691b95dcb15676ec05bf92a07dd7b79a663fbc Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 18 Dec 2013 17:47:52 -0800 Subject: [PATCH 10/46] Read the correct number of bytes from kafka. According to the protocol documentation, the 4 byte integer at the beginning of a response represents the size of the payload only, not including those bytes. See http://goo.gl/rg5uom --- kafka/conn.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 03a43c9f9..83f9f8e8e 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -55,12 +55,11 @@ def _consume_response_iter(self): self._raise_connection_error() (size,) = struct.unpack('>i', resp) - messagesize = size - 4 - log.debug("About to read %d bytes from Kafka", messagesize) + log.debug("About to read %d bytes from Kafka", size) # Read the remainder of the response total = 0 - while total < messagesize: + while total < size: resp = self._sock.recv(self.bufsize) log.debug("Read %d bytes from Kafka", len(resp)) if resp == "": From be564d9b8886bffe9235b3294c2a49158d669295 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 18 Dec 2013 17:51:22 -0800 Subject: [PATCH 11/46] * Guarantee reading the expected number of bytes from the socket every time * Remove bufsize from client and conn, since they're not actually enforced Notes: This commit changes behavior a bit by raising a BufferUnderflowError when no data is received for the message size rather than a ConnectionError. Since bufsize in the socket is not actually enforced, but it is used by the consumer when creating requests, moving it there until a better solution is implemented. 
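For illustration only, a standalone sketch of the read-exactly-n-bytes loop that the new _read_bytes() method implements (the patched method raises BufferUnderflowError rather than socket.error and reads from the connection's own socket):

import socket

def read_exact(sock, num_bytes):
    # Keep calling recv() until exactly num_bytes have been read; an
    # empty chunk means the peer closed the connection before sending
    # everything we expected.
    data = ''
    while len(data) < num_bytes:
        chunk = sock.recv(num_bytes - len(data))
        if chunk == '':
            raise socket.error("%d bytes short of expected response"
                               % (num_bytes - len(data)))
        data += chunk
    return data
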
--- kafka/client.py | 7 +++---- kafka/conn.py | 48 ++++++++++++++++++++++------------------------- kafka/consumer.py | 7 +++++-- 3 files changed, 30 insertions(+), 32 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 42b3db84e..0cf17d33a 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -20,13 +20,12 @@ class KafkaClient(object): CLIENT_ID = "kafka-python" ID_GEN = count() - def __init__(self, host, port, bufsize=4098, client_id=CLIENT_ID, timeout=10): + def __init__(self, host, port, client_id=CLIENT_ID, timeout=10): # We need one connection to bootstrap - self.bufsize = bufsize self.client_id = client_id self.timeout = timeout self.conns = { # (host, port) -> KafkaConnection - (host, port): KafkaConnection(host, port, bufsize, timeout=timeout) + (host, port): KafkaConnection(host, port, timeout=timeout) } self.brokers = {} # broker_id -> BrokerMetadata self.topics_to_brokers = {} # topic_id -> broker_id @@ -43,7 +42,7 @@ def _get_conn_for_broker(self, broker): """ if (broker.host, broker.port) not in self.conns: self.conns[(broker.host, broker.port)] = \ - KafkaConnection(broker.host, broker.port, self.bufsize, timeout=self.timeout) + KafkaConnection(broker.host, broker.port, timeout=self.timeout) return self.conns[(broker.host, broker.port)] diff --git a/kafka/conn.py b/kafka/conn.py index 83f9f8e8e..7c6d51035 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -19,11 +19,10 @@ class KafkaConnection(local): we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id. """ - def __init__(self, host, port, bufsize=4098, timeout=10): + def __init__(self, host, port, timeout=10): super(KafkaConnection, self).__init__() self.host = host self.port = port - self.bufsize = bufsize self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((host, port)) self._sock.settimeout(timeout) @@ -36,38 +35,35 @@ def __str__(self): # Private API # ################### - def _consume_response(self): - """ - Fully consume the response iterator - """ - return "".join(self._consume_response_iter()) + def _read_bytes(self, num_bytes): + bytes_left = num_bytes + resp = '' + log.debug("About to read %d bytes from Kafka", num_bytes) + + while bytes_left: + data = self._sock.recv(bytes_left) + if data == '': + raise BufferUnderflowError("Not enough data to read this response") + bytes_left -= len(data) + log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes) + resp += data + + return resp - def _consume_response_iter(self): + def _consume_response(self): """ This method handles the response header and error messages. 
It - then returns an iterator for the chunks of the response + then returns the response """ - log.debug("Handling response from Kafka") - + log.debug("Expecting response from Kafka") # Read the size off of the header - resp = self._sock.recv(4) - if resp == "": - self._raise_connection_error() - (size,) = struct.unpack('>i', resp) + resp = self._read_bytes(4) - log.debug("About to read %d bytes from Kafka", size) + (size,) = struct.unpack('>i', resp) # Read the remainder of the response - total = 0 - while total < size: - resp = self._sock.recv(self.bufsize) - log.debug("Read %d bytes from Kafka", len(resp)) - if resp == "": - raise BufferUnderflowError( - "Not enough data to read this response") - - total += len(resp) - yield resp + resp = self._read_bytes(size) + return str(resp) def _raise_connection_error(self): self._dirty = True diff --git a/kafka/consumer.py b/kafka/consumer.py index 140842dcb..54c3fdf60 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -24,6 +24,7 @@ FETCH_DEFAULT_BLOCK_TIMEOUT = 1 FETCH_MAX_WAIT_TIME = 100 FETCH_MIN_BYTES = 4096 +FETCH_BUFFER_SIZE_BYTES = 4096 class FetchContext(object): @@ -218,8 +219,10 @@ class SimpleConsumer(Consumer): def __init__(self, client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL, - fetch_size_bytes=FETCH_MIN_BYTES): + fetch_size_bytes=FETCH_MIN_BYTES, + buffer_size=FETCH_BUFFER_SIZE_BYTES): + self.buffer_size = buffer_size self.partition_info = False # Do not return partition info in msgs self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = fetch_size_bytes @@ -366,7 +369,7 @@ def __iter_partition__(self, partition, offset): # use MaxBytes = client's bufsize since we're only # fetching one topic + partition req = FetchRequest( - self.topic, partition, offset, self.client.bufsize) + self.topic, partition, offset, self.buffer_size) (resp,) = self.client.send_fetch_request( [req], From d7387cd63aaef847871e377d72a6db258e5e2ddf Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 18 Dec 2013 17:54:17 -0800 Subject: [PATCH 12/46] Allow None timeout in FetchContext even if block is False --- kafka/consumer.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 54c3fdf60..dce8c2207 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -35,10 +35,10 @@ def __init__(self, consumer, block, timeout): self.consumer = consumer self.block = block - if block and not timeout: - timeout = FETCH_DEFAULT_BLOCK_TIMEOUT - - self.timeout = timeout * 1000 + if block: + if not timeout: + timeout = FETCH_DEFAULT_BLOCK_TIMEOUT + self.timeout = timeout * 1000 def __enter__(self): """Set fetch values based on blocking status""" From a359a1386c1b18e16e7a211342de13a9aefb456d Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 18 Dec 2013 17:56:29 -0800 Subject: [PATCH 13/46] Reset consumer fields to original values rather than defaults in FetchContext --- kafka/consumer.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index dce8c2207..191e50f6a 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -42,6 +42,8 @@ def __init__(self, consumer, block, timeout): def __enter__(self): """Set fetch values based on blocking status""" + self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time + self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes if self.block: self.consumer.fetch_max_wait_time = 
self.timeout self.consumer.fetch_min_bytes = 1 @@ -49,9 +51,9 @@ def __enter__(self): self.consumer.fetch_min_bytes = 0 def __exit__(self, type, value, traceback): - """Reset values to default""" - self.consumer.fetch_max_wait_time = FETCH_MAX_WAIT_TIME - self.consumer.fetch_min_bytes = FETCH_MIN_BYTES + """Reset values""" + self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time + self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes class Consumer(object): From 648071233c75e6fe28e526c50ba4fbbc21f0ff92 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 18 Dec 2013 18:36:32 -0800 Subject: [PATCH 14/46] SimpleConsumer flow changes: * Combine partition fetch requests into a single request * Put the messages received in a queue and update offsets * Grab as many messages from the queue as requested * When the queue is empty, request more * timeout param for get_messages() is the actual timeout for getting those messages * Based on https://github.com/mumrah/kafka-python/pull/74 - don't increase min_bytes if the consumer fetch buffer size is too small. Notes: Change MultiProcessConsumer and _mp_consume() accordingly. Previously, when querying each partition separately, it was possible to block waiting for messages on partition 0 even if there are new ones in partition 1. These changes allow us to block while waiting for messages on all partitions, and reduce total number of kafka requests. Use Queue.Queue for single proc Queue instead of already imported multiprocessing.Queue because the latter doesn't seem to guarantee immediate availability of items after a put: >>> from multiprocessing import Queue >>> q = Queue() >>> q.put(1); q.get_nowait() Traceback (most recent call last): File "", line 1, in File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 152, in get_nowait return self.get(False) File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 134, in get raise Empty Queue.Empty --- kafka/consumer.py | 182 ++++++++++++++++++---------------------------- 1 file changed, 70 insertions(+), 112 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 191e50f6a..128a86230 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -5,8 +5,8 @@ import logging import time from threading import Lock -from multiprocessing import Process, Queue, Event, Value -from Queue import Empty +from multiprocessing import Process, Queue as MPQueue, Event, Value +from Queue import Empty, Queue from kafka.common import ( ErrorMapping, FetchRequest, @@ -229,6 +229,7 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None, self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = fetch_size_bytes self.fetch_started = defaultdict(bool) # defaults to false + self.queue = Queue(buffer_size) super(SimpleConsumer, self).__init__( client, group, topic, @@ -294,122 +295,75 @@ def get_messages(self, count=1, block=True, timeout=0.1): count: Indicates the maximum number of messages to be fetched block: If True, the API will block till some messages are fetched. - timeout: If None, and block=True, the API will block infinitely. - If >0, API will block for specified time (in seconds) + timeout: If block is True, the function will block for the specified time (in seconds) + until count messages is fetched. If None, it will block forever. 
""" messages = [] - iterator = self.__iter__() - - # HACK: This splits the timeout between available partitions if timeout: - timeout = timeout * 1.0 / len(self.offsets) + max_time = time.time() + timeout - with FetchContext(self, block, timeout): - while count > 0: - try: - messages.append(next(iterator)) - except StopIteration: - break + while count > 0 and (timeout is None or timeout > 0): + message = self.get_message(block, timeout) + if message: + messages.append(message) count -= 1 + else: + # Ran out of messages for the last request. If we're not blocking, break. + if not block: + break + if timeout: + timeout = max_time - time.time() return messages - def __iter__(self): - """ - Create an iterate per partition. Iterate through them calling next() - until they are all exhausted. - """ - iters = {} - for partition, offset in self.offsets.items(): - iters[partition] = self.__iter_partition__(partition, offset) - - if len(iters) == 0: - return - - while True: - if len(iters) == 0: - break - - for partition, it in iters.items(): - try: - if self.partition_info: - yield (partition, it.next()) - else: - yield it.next() - except StopIteration: - log.debug("Done iterating over partition %s" % partition) - del iters[partition] - - # skip auto-commit since we didn't yield anything - continue - - # Count, check and commit messages if necessary - self.count_since_commit += 1 - self._auto_commit() - - def __iter_partition__(self, partition, offset): - """ - Iterate over the messages in a partition. Create a FetchRequest - to get back a batch of messages, yield them one at a time. - After a batch is exhausted, start a new batch unless we've reached - the end of this partition. - """ - - # The offset that is stored in the consumer is the offset that - # we have consumed. In subsequent iterations, we are supposed to - # fetch the next message (that is from the next offset) - # However, for the 0th message, the offset should be as-is. - # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is - # problematic, since 0 is offset of a message which we have not yet - # consumed. - if self.fetch_started[partition]: - offset += 1 - - fetch_size = self.fetch_min_bytes + def get_message(self, block=True, timeout=0.1): + if self.queue.empty(): + with FetchContext(self, block, timeout): + self._fetch() + try: + return self.queue.get_nowait() + except Empty: + return None + def __iter__(self): while True: - # use MaxBytes = client's bufsize since we're only - # fetching one topic + partition - req = FetchRequest( - self.topic, partition, offset, self.buffer_size) - - (resp,) = self.client.send_fetch_request( - [req], - max_wait_time=self.fetch_max_wait_time, - min_bytes=fetch_size) - - assert resp.topic == self.topic - assert resp.partition == partition + message = self.get_message(True, 100) + if message: + yield message + else: + # In case we did not receive any message, give up the CPU for + # a while before we try again + time.sleep(0.1) - next_offset = None + def _fetch(self): + requests = [] + partitions = self.offsets.keys() + for partition in partitions: + requests.append(FetchRequest(self.topic, partition, self.offsets[partition], self.buffer_size)) + responses = self.client.send_fetch_request( + requests, + max_wait_time=int(self.fetch_max_wait_time), + min_bytes=self.fetch_min_bytes) + for resp in responses: + partition = resp.partition try: for message in resp.messages: - next_offset = message.offset - - # update the offset before the message is yielded. 
This - # is so that the consumer state is not lost in certain - # cases. - # - # For eg: the message is yielded and consumed by the - # caller, but the caller does not come back into the - # generator again. The message will be consumed but the - # status will not be updated in the consumer - self.fetch_started[partition] = True - self.offsets[partition] = message.offset - yield message + self.offsets[partition] = message.offset + 1 + # Count, check and commit messages if necessary + self.count_since_commit += 1 + self._auto_commit() + if self.partition_info: + self.queue.put((partition, message)) + else: + self.queue.put(message) except ConsumerFetchSizeTooSmall, e: - fetch_size *= 1.5 - log.warn( - "Fetch size too small, increasing to %d (1.5x) and retry", - fetch_size) - continue + self.buffer_size *= 2 + log.warn("Fetch size too small, increasing to %d (2x) and retry", self.buffer_size) except ConsumerNoMoreData, e: log.debug("Iteration was ended by %r", e) - - if next_offset is None: - break - else: - offset = next_offset + 1 + except StopIteration: + # Stop iterating through this partition + log.debug("Done iterating over partition %s" % partition) def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): @@ -448,8 +402,9 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): # indicates a specific number of messages, follow that advice count = 0 - for partition, message in consumer: - queue.put((partition, message)) + message = consumer.get_message() + if message: + queue.put(message) count += 1 # We have reached the required size. The controller might have @@ -459,11 +414,10 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): # can reset the 'start' event if count == size.value: pause.wait() - break - # In case we did not receive any message, give up the CPU for - # a while before we try again - if count == 0: + else: + # In case we did not receive any message, give up the CPU for + # a while before we try again time.sleep(0.1) consumer.stop() @@ -509,7 +463,7 @@ def __init__(self, client, group, topic, auto_commit=True, # Variables for managing and controlling the data flow from # consumer child process to master - self.queue = Queue(1024) # Child consumers dump messages into this + self.queue = MPQueue(1024) # Child consumers dump messages into this self.start = Event() # Indicates the consumers to start fetch self.exit = Event() # Requests the consumers to shutdown self.pause = Event() # Requests the consumers to pause fetch @@ -591,8 +545,8 @@ def get_messages(self, count=1, block=True, timeout=10): count: Indicates the maximum number of messages to be fetched block: If True, the API will block till some messages are fetched. - timeout: If None, and block=True, the API will block infinitely. - If >0, API will block for specified time (in seconds) + timeout: If block is True, the function will block for the specified time (in seconds) + until count messages is fetched. If None, it will block forever. 
""" messages = [] @@ -603,7 +557,10 @@ def get_messages(self, count=1, block=True, timeout=10): self.size.value = count self.pause.clear() - while count > 0: + if timeout: + max_time = time.time() + timeout + + while count > 0 and (timeout is None or timeout > 0): # Trigger consumption only if the queue is empty # By doing this, we will ensure that consumers do not # go into overdrive and keep consuming thousands of @@ -623,6 +580,7 @@ def get_messages(self, count=1, block=True, timeout=10): self.count_since_commit += 1 self._auto_commit() count -= 1 + timeout = max_time - time.time() self.size.value = 0 self.start.clear() From 8f301af808278f31ac97053ec6e104f464f2145e Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Thu, 19 Dec 2013 11:36:15 -0800 Subject: [PATCH 15/46] Remove SimpleConsumer queue size limit since it can cause the iterator to block forever if it's reached. --- kafka/consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 128a86230..f3eaa6d98 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -229,7 +229,7 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None, self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = fetch_size_bytes self.fetch_started = defaultdict(bool) # defaults to false - self.queue = Queue(buffer_size) + self.queue = Queue() super(SimpleConsumer, self).__init__( client, group, topic, From 9d9c209d253b07d822213ca0687121aa0d64b8e5 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Thu, 19 Dec 2013 13:19:40 -0800 Subject: [PATCH 16/46] Add buffer_size param description to docstring --- kafka/consumer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index f3eaa6d98..170136094 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -209,8 +209,9 @@ class SimpleConsumer(Consumer): before a commit auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit - fetch_size_bytes: number of bytes to request in a FetchRequest + buffer_size: initial number of bytes to tell kafka we have + available. This will double every time it's not enough Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will From e5ed16b94bee4ad93f3c1b8422d1a0ab8bf74f01 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Thu, 19 Dec 2013 13:24:28 -0800 Subject: [PATCH 17/46] Add iter_timeout option to SimpleConsumer. If not None, it causes the iterator to exit when reached. Also put constant timeout values in pre-defined constants --- kafka/consumer.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 170136094..6ff5ea94f 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -26,6 +26,9 @@ FETCH_MIN_BYTES = 4096 FETCH_BUFFER_SIZE_BYTES = 4096 +ITER_TIMEOUT_SECONDS = 60 +NO_MESSAGES_WAIT_TIME_SECONDS = 0.1 + class FetchContext(object): """ @@ -212,6 +215,9 @@ class SimpleConsumer(Consumer): fetch_size_bytes: number of bytes to request in a FetchRequest buffer_size: initial number of bytes to tell kafka we have available. This will double every time it's not enough + iter_timeout: default None. How much time (in seconds) to wait for a + message in the iterator before exiting. None means no + timeout, so it will wait forever. 
Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will @@ -223,13 +229,15 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL, fetch_size_bytes=FETCH_MIN_BYTES, - buffer_size=FETCH_BUFFER_SIZE_BYTES): + buffer_size=FETCH_BUFFER_SIZE_BYTES, + iter_timeout=None): self.buffer_size = buffer_size self.partition_info = False # Do not return partition info in msgs self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = fetch_size_bytes self.fetch_started = defaultdict(bool) # defaults to false + self.iter_timeout = iter_timeout self.queue = Queue() super(SimpleConsumer, self).__init__( @@ -327,14 +335,22 @@ def get_message(self, block=True, timeout=0.1): return None def __iter__(self): + if self.iter_timeout is None: + timeout = ITER_TIMEOUT_SECONDS + else: + timeout = self.iter_timeout + while True: - message = self.get_message(True, 100) + message = self.get_message(True, timeout) if message: yield message + elif self.iter_timeout is None: + # We did not receive any message yet but we don't have a + # timeout, so give up the CPU for a while before trying again + time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) else: - # In case we did not receive any message, give up the CPU for - # a while before we try again - time.sleep(0.1) + # Timed out waiting for a message + break def _fetch(self): requests = [] @@ -419,7 +435,7 @@ def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): else: # In case we did not receive any message, give up the CPU for # a while before we try again - time.sleep(0.1) + time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) consumer.stop() From b2219abf6de852515731bf93374c1b638c094184 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Thu, 19 Dec 2013 13:35:53 -0800 Subject: [PATCH 18/46] Add comments and maintain 80 character line limit --- kafka/consumer.py | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 6ff5ea94f..a44f2f6da 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -304,8 +304,9 @@ def get_messages(self, count=1, block=True, timeout=0.1): count: Indicates the maximum number of messages to be fetched block: If True, the API will block till some messages are fetched. - timeout: If block is True, the function will block for the specified time (in seconds) - until count messages is fetched. If None, it will block forever. + timeout: If block is True, the function will block for the specified + time (in seconds) until count messages is fetched. If None, + it will block forever. """ messages = [] if timeout: @@ -317,16 +318,20 @@ def get_messages(self, count=1, block=True, timeout=0.1): messages.append(message) count -= 1 else: - # Ran out of messages for the last request. If we're not blocking, break. + # Ran out of messages for the last request. if not block: + # If we're not blocking, break. break if timeout: + # If we're blocking and have a timeout, reduce it to the + # appropriate value timeout = max_time - time.time() return messages def get_message(self, block=True, timeout=0.1): if self.queue.empty(): + # We're out of messages, go grab some more. 
with FetchContext(self, block, timeout): self._fetch() try: @@ -353,29 +358,39 @@ def __iter__(self): break def _fetch(self): + # Create fetch request payloads for all the partitions requests = [] partitions = self.offsets.keys() for partition in partitions: - requests.append(FetchRequest(self.topic, partition, self.offsets[partition], self.buffer_size)) + requests.append(FetchRequest(self.topic, partition, + self.offsets[partition], + self.buffer_size)) + # Send request responses = self.client.send_fetch_request( requests, max_wait_time=int(self.fetch_max_wait_time), min_bytes=self.fetch_min_bytes) + for resp in responses: partition = resp.partition try: for message in resp.messages: + # Update partition offset self.offsets[partition] = message.offset + 1 + # Count, check and commit messages if necessary self.count_since_commit += 1 self._auto_commit() + + # Put the message in our queue if self.partition_info: self.queue.put((partition, message)) else: self.queue.put(message) except ConsumerFetchSizeTooSmall, e: self.buffer_size *= 2 - log.warn("Fetch size too small, increasing to %d (2x) and retry", self.buffer_size) + log.warn("Fetch size too small, increase to %d (2x) and retry", + self.buffer_size) except ConsumerNoMoreData, e: log.debug("Iteration was ended by %r", e) except StopIteration: @@ -562,8 +577,9 @@ def get_messages(self, count=1, block=True, timeout=10): count: Indicates the maximum number of messages to be fetched block: If True, the API will block till some messages are fetched. - timeout: If block is True, the function will block for the specified time (in seconds) - until count messages is fetched. If None, it will block forever. + timeout: If block is True, the function will block for the specified + time (in seconds) until count messages is fetched. If None, + it will block forever. 
""" messages = [] From 21b46b165fabecb37371eb3cd495e40d1a258483 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Thu, 2 Jan 2014 18:10:57 -0800 Subject: [PATCH 19/46] Add and fix comments to protocol.py --- kafka/protocol.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/kafka/protocol.py b/kafka/protocol.py index 612acf63f..74a0dce0e 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -121,7 +121,7 @@ def _decode_message_set_iter(cls, data): except BufferUnderflowError: if read_message is False: # If we get a partial read of a message, but haven't - # yielded anyhting there's a problem + # yielded anything there's a problem raise ConsumerFetchSizeTooSmall() else: raise StopIteration() @@ -171,7 +171,7 @@ def encode_produce_request(cls, client_id, correlation_id, Params ====== client_id: string - correlation_id: string + correlation_id: int payloads: list of ProduceRequest acks: How "acky" you want the request to be 0: immediate response @@ -231,7 +231,7 @@ def encode_fetch_request(cls, client_id, correlation_id, payloads=None, Params ====== client_id: string - correlation_id: string + correlation_id: int payloads: list of FetchRequest max_wait_time: int, how long to block waiting on min_bytes of data min_bytes: int, the minimum number of bytes to accumulate before @@ -338,7 +338,7 @@ def encode_metadata_request(cls, client_id, correlation_id, topics=None): Params ====== client_id: string - correlation_id: string + correlation_id: int topics: list of strings """ topics = [] if topics is None else topics @@ -376,12 +376,16 @@ def decode_metadata_response(cls, data): topic_metadata = {} for i in range(num_topics): + # NOTE: topic_error is discarded. Should probably be returned with + # the topic metadata. ((topic_error,), cur) = relative_unpack('>h', data, cur) (topic_name, cur) = read_short_string(data, cur) ((num_partitions,), cur) = relative_unpack('>i', data, cur) partition_metadata = {} for j in range(num_partitions): + # NOTE: partition_error_code is discarded. Should probably be + # returned with the partition metadata. ((partition_error_code, partition, leader, numReplicas), cur) = \ relative_unpack('>hiii', data, cur) @@ -408,7 +412,7 @@ def encode_offset_commit_request(cls, client_id, correlation_id, Params ====== client_id: string - correlation_id: string + correlation_id: int group: string, the consumer group you are committing offsets for payloads: list of OffsetCommitRequest """ @@ -459,7 +463,7 @@ def encode_offset_fetch_request(cls, client_id, correlation_id, Params ====== client_id: string - correlation_id: string + correlation_id: int group: string, the consumer group you are fetching offsets for payloads: list of OffsetFetchRequest """ From f332985678ace18a6d5450af26e7e2e6f79f8526 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Thu, 2 Jan 2014 18:14:44 -0800 Subject: [PATCH 20/46] Add note about questionable error handling while decoding messages. Will remove once any error handling issues are resolved. --- kafka/protocol.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/kafka/protocol.py b/kafka/protocol.py index 74a0dce0e..54b8eeeeb 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -119,6 +119,14 @@ def _decode_message_set_iter(cls, data): read_message = True yield OffsetAndMessage(offset, message) except BufferUnderflowError: + # NOTE: Not sure this is correct error handling: + # Is it possible to get a BUE if the message set is somewhere + # in the middle of the fetch response? 
If so, we probably have + # an issue that's not fetch size too small. + # Aren't we ignoring errors if we fail to unpack data by + # raising StopIteration()? + # If _decode_message() raises a ChecksumError, couldn't that + # also be due to the fetch size being too small? if read_message is False: # If we get a partial read of a message, but haven't # yielded anything there's a problem From c9205fe3274a63a10ae0134e643b2f58dbc79094 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Thu, 2 Jan 2014 18:16:41 -0800 Subject: [PATCH 21/46] Fix unit tests. This is pretty much a rewrite. The tests that involve offset requests/responses are not implemented since that API is not supported in kafka 0.8 yet. Only kafka.codec and kafka.protocol are currently tested, so there is more work to be done here. --- test/test_unit.py | 464 +++++++++++++++++++++++++++++++--------------- 1 file changed, 314 insertions(+), 150 deletions(-) diff --git a/test/test_unit.py b/test/test_unit.py index 3f3af66ac..93d88a151 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -3,13 +3,22 @@ import struct import unittest -from kafka.client import KafkaClient -from kafka.common import ProduceRequest, FetchRequest +from kafka.common import ( + ProduceRequest, FetchRequest, Message, ChecksumError, + ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, + OffsetAndMessage, BrokerMetadata, PartitionMetadata +) from kafka.codec import ( has_gzip, has_snappy, gzip_encode, gzip_decode, snappy_encode, snappy_decode ) +from kafka.protocol import ( + create_gzip_message, + create_message, + create_snappy_message, + KafkaProtocol +) ITERATIONS = 1000 STRLEN = 100 @@ -20,16 +29,13 @@ def random_string(): class TestPackage(unittest.TestCase): - @unittest.expectedFailure + def test_top_level_namespace(self): import kafka as kafka1 self.assertEquals(kafka1.KafkaClient.__name__, "KafkaClient") - self.assertEquals(kafka1.gzip_encode.__name__, "gzip_encode") - self.assertEquals(kafka1.snappy_encode.__name__, "snappy_encode") self.assertEquals(kafka1.client.__name__, "kafka.client") self.assertEquals(kafka1.codec.__name__, "kafka.codec") - @unittest.expectedFailure def test_submodule_namespace(self): import kafka.client as client1 self.assertEquals(client1.__name__, "kafka.client") @@ -48,19 +54,8 @@ def test_submodule_namespace(self): from kafka import KafkaClient as KafkaClient2 self.assertEquals(KafkaClient2.__name__, "KafkaClient") - from kafka import gzip_encode as gzip_encode2 - self.assertEquals(gzip_encode2.__name__, "gzip_encode") - - from kafka import snappy_encode as snappy_encode2 - self.assertEquals(snappy_encode2.__name__, "snappy_encode") - - -class TestMisc(unittest.TestCase): - @unittest.expectedFailure - def test_length_prefix(self): - for i in xrange(ITERATIONS): - s1 = random_string() - self.assertEquals(struct.unpack('>i', s2[0:4])[0], len(s1)) + from kafka.codec import snappy_encode + self.assertEquals(snappy_encode.__name__, "snappy_encode") class TestCodec(unittest.TestCase): @@ -81,140 +76,309 @@ def test_snappy(self): self.assertEquals(s1, s2) -# XXX(sandello): These really should be protocol tests. 
-class TestMessage(unittest.TestCase): - @unittest.expectedFailure - def test_create(self): - msg = KafkaClient.create_message("testing") - self.assertEquals(msg.payload, "testing") - self.assertEquals(msg.magic, 1) - self.assertEquals(msg.attributes, 0) - self.assertEquals(msg.crc, -386704890) +class TestProtocol(unittest.TestCase): + + def test_create_message(self): + payload = "test" + key = "key" + msg = create_message(payload, key) + self.assertEqual(msg.magic, 0) + self.assertEqual(msg.attributes, 0) + self.assertEqual(msg.key, key) + self.assertEqual(msg.value, payload) - @unittest.expectedFailure def test_create_gzip(self): - msg = KafkaClient.create_gzip_message("testing") - self.assertEquals(msg.magic, 1) - self.assertEquals(msg.attributes, 1) - # Can't check the crc or payload for gzip since it's non-deterministic - (messages, _) = KafkaClient.read_message_set(gzip_decode(msg.payload)) - inner = messages[0] - self.assertEquals(inner.magic, 1) - self.assertEquals(inner.attributes, 0) - self.assertEquals(inner.payload, "testing") - self.assertEquals(inner.crc, -386704890) - - @unittest.expectedFailure + payloads = ["v1", "v2"] + msg = create_gzip_message(payloads) + self.assertEqual(msg.magic, 0) + self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK & + KafkaProtocol.CODEC_GZIP) + self.assertEqual(msg.key, None) + # Need to decode to check since gzipped payload is non-deterministic + decoded = gzip_decode(msg.value) + expect = ("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2" + "\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00" + "\x00\x00\x00\x00\x00\x00\x00\x10\xd5\x96\nx\x00\x00\xff\xff" + "\xff\xff\x00\x00\x00\x02v2") + self.assertEqual(decoded, expect) + def test_create_snappy(self): - msg = KafkaClient.create_snappy_message("testing") - self.assertEquals(msg.magic, 1) - self.assertEquals(msg.attributes, 2) - self.assertEquals(msg.crc, -62350868) - (messages, _) = KafkaClient.read_message_set(snappy_decode(msg.payload)) - inner = messages[0] - self.assertEquals(inner.magic, 1) - self.assertEquals(inner.attributes, 0) - self.assertEquals(inner.payload, "testing") - self.assertEquals(inner.crc, -386704890) - - @unittest.expectedFailure - def test_message_simple(self): - msg = KafkaClient.create_message("testing") - enc = KafkaClient.encode_message(msg) - expect = "\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing" - self.assertEquals(enc, expect) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), 1) - self.assertEquals(messages[0], msg) - - @unittest.expectedFailure - def test_message_list(self): - msgs = [ - KafkaClient.create_message("one"), - KafkaClient.create_message("two"), - KafkaClient.create_message("three") - ] - enc = KafkaClient.encode_message_set(msgs) - expect = ("\x00\x00\x00\t\x01\x00zl\x86\xf1one\x00\x00\x00\t\x01\x00\x11" - "\xca\x8aftwo\x00\x00\x00\x0b\x01\x00F\xc5\xd8\xf5three") - self.assertEquals(enc, expect) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), 3) - self.assertEquals(messages[0].payload, "one") - self.assertEquals(messages[1].payload, "two") - self.assertEquals(messages[2].payload, "three") - - @unittest.expectedFailure - def test_message_gzip(self): - msg = KafkaClient.create_gzip_message("one", "two", "three") - enc = KafkaClient.encode_message(msg) - # Can't check the bytes directly since Gzip is non-deterministic - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), 3) - 
self.assertEquals(messages[0].payload, "one") - self.assertEquals(messages[1].payload, "two") - self.assertEquals(messages[2].payload, "three") - - @unittest.expectedFailure - def test_message_snappy(self): - msg = KafkaClient.create_snappy_message("one", "two", "three") - enc = KafkaClient.encode_message(msg) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), 3) - self.assertEquals(messages[0].payload, "one") - self.assertEquals(messages[1].payload, "two") - self.assertEquals(messages[2].payload, "three") - - @unittest.expectedFailure - def test_message_simple_random(self): - for i in xrange(ITERATIONS): - n = random.randint(0, 10) - msgs = [KafkaClient.create_message(random_string()) for j in range(n)] - enc = KafkaClient.encode_message_set(msgs) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), n) - for j in range(n): - self.assertEquals(messages[j], msgs[j]) - - @unittest.expectedFailure - def test_message_gzip_random(self): - for i in xrange(ITERATIONS): - n = random.randint(1, 10) - strings = [random_string() for j in range(n)] - msg = KafkaClient.create_gzip_message(*strings) - enc = KafkaClient.encode_message(msg) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), n) - for j in range(n): - self.assertEquals(messages[j].payload, strings[j]) - - @unittest.expectedFailure - def test_message_snappy_random(self): - for i in xrange(ITERATIONS): - n = random.randint(1, 10) - strings = [random_string() for j in range(n)] - msg = KafkaClient.create_snappy_message(*strings) - enc = KafkaClient.encode_message(msg) - (messages, read) = KafkaClient.read_message_set(enc) - self.assertEquals(len(messages), n) - for j in range(n): - self.assertEquals(messages[j].payload, strings[j]) - - -class TestRequests(unittest.TestCase): - @unittest.expectedFailure - def test_produce_request(self): - req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")]) - enc = KafkaClient.encode_produce_request(req) - expect = "\x00\x00\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00\x11\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing" - self.assertEquals(enc, expect) - - @unittest.expectedFailure - def test_fetch_request(self): - req = FetchRequest("my-topic", 0, 0, 1024) - enc = KafkaClient.encode_fetch_request(req) - expect = "\x00\x01\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00" - self.assertEquals(enc, expect) + payloads = ["v1", "v2"] + msg = create_snappy_message(payloads) + self.assertEqual(msg.magic, 0) + self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK & + KafkaProtocol.CODEC_SNAPPY) + self.assertEqual(msg.key, None) + expect = ("8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff\xff\xff\xff" + "\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5\x96\nx\x00\x00\xff" + "\xff\xff\xff\x00\x00\x00\x02v2") + self.assertEqual(msg.value, expect) + + def test_encode_message_header(self): + expect = '\x00\n\x00\x00\x00\x00\x00\x04\x00\x07client3' + encoded = KafkaProtocol._encode_message_header("client3", 4, 10) + self.assertEqual(encoded, expect) + + def test_encode_message(self): + message = create_message("test", "key") + encoded = KafkaProtocol._encode_message(message) + expect = "\xaa\xf1\x8f[\x00\x00\x00\x00\x00\x03key\x00\x00\x00\x04test" + self.assertEqual(encoded, expect) + + def test_encode_message_failure(self): + self.assertRaises(Exception, KafkaProtocol._encode_message, + Message(1, 0, "key", "test")) + + def test_encode_message_set(self): + 
message_set = [create_message("v1", "k1"), create_message("v2", "k2")] + encoded = KafkaProtocol._encode_message_set(message_set) + expect = ("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x12W\xe7In\x00" + "\x00\x00\x00\x00\x02k1\x00\x00\x00\x02v1\x00\x00\x00\x00" + "\x00\x00\x00\x00\x00\x00\x00\x12\xff\x06\x02I\x00\x00\x00" + "\x00\x00\x02k2\x00\x00\x00\x02v2") + self.assertEqual(encoded, expect) + + def test_decode_message(self): + encoded = "\xaa\xf1\x8f[\x00\x00\x00\x00\x00\x03key\x00\x00\x00\x04test" + offset = 10 + (returned_offset, decoded_message) = \ + list(KafkaProtocol._decode_message(encoded, offset))[0] + self.assertEqual(returned_offset, offset) + self.assertEqual(decoded_message, create_message("test", "key")) + + def test_decode_message_set(self): + encoded = ('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2' + '\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x10\xd5\x96\nx\x00\x00\xff' + '\xff\xff\xff\x00\x00\x00\x02v2') + iter = KafkaProtocol._decode_message_set_iter(encoded) + decoded = list(iter) + self.assertEqual(len(decoded), 2) + (returned_offset1, decoded_message1) = decoded[0] + self.assertEqual(returned_offset1, 0) + self.assertEqual(decoded_message1, create_message("v1")) + (returned_offset2, decoded_message2) = decoded[1] + self.assertEqual(returned_offset2, 0) + self.assertEqual(decoded_message2, create_message("v2")) + + def test_decode_message_gzip(self): + gzip_encoded = ('\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000' + '\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01' + '\x9f\xf9\xd1\x87\x18\x18\xfe\x03\x01\x90\xc7Tf\xc8' + '\x80$wu\x1aW\x05\x92\x9c\x11\x00z\xc0h\x888\x00\x00' + '\x00') + offset = 11 + decoded = list(KafkaProtocol._decode_message(gzip_encoded, offset)) + self.assertEqual(len(decoded), 2) + (returned_offset1, decoded_message1) = decoded[0] + self.assertEqual(returned_offset1, 0) + self.assertEqual(decoded_message1, create_message("v1")) + (returned_offset2, decoded_message2) = decoded[1] + self.assertEqual(returned_offset2, 0) + self.assertEqual(decoded_message2, create_message("v2")) + + def test_decode_message_snappy(self): + snappy_encoded = ('\xec\x80\xa1\x95\x00\x02\xff\xff\xff\xff\x00\x00' + '\x00,8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff' + '\xff\xff\xff\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5' + '\x96\nx\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v2') + offset = 11 + decoded = list(KafkaProtocol._decode_message(snappy_encoded, offset)) + self.assertEqual(len(decoded), 2) + (returned_offset1, decoded_message1) = decoded[0] + self.assertEqual(returned_offset1, 0) + self.assertEqual(decoded_message1, create_message("v1")) + (returned_offset2, decoded_message2) = decoded[1] + self.assertEqual(returned_offset2, 0) + self.assertEqual(decoded_message2, create_message("v2")) + + def test_decode_message_checksum_error(self): + invalid_encoded_message = "This is not a valid encoded message" + iter = KafkaProtocol._decode_message(invalid_encoded_message, 0) + self.assertRaises(ChecksumError, list, iter) + + # NOTE: The error handling in _decode_message_set_iter() is questionable. + # If it's modified, the next two tests might need to be fixed. 
+ def test_decode_message_set_fetch_size_too_small(self): + iter = KafkaProtocol._decode_message_set_iter('a') + self.assertRaises(ConsumerFetchSizeTooSmall, list, iter) + + def test_decode_message_set_stop_iteration(self): + encoded = ('\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10L\x9f[\xc2' + '\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v1\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x10\xd5\x96\nx\x00\x00\xff' + '\xff\xff\xff\x00\x00\x00\x02v2') + iter = KafkaProtocol._decode_message_set_iter(encoded + "@#$%(Y!") + decoded = list(iter) + self.assertEqual(len(decoded), 2) + (returned_offset1, decoded_message1) = decoded[0] + self.assertEqual(returned_offset1, 0) + self.assertEqual(decoded_message1, create_message("v1")) + (returned_offset2, decoded_message2) = decoded[1] + self.assertEqual(returned_offset2, 0) + self.assertEqual(decoded_message2, create_message("v2")) + + def test_encode_produce_request(self): + requests = [ProduceRequest("topic1", 0, [create_message("a"), + create_message("b")]), + ProduceRequest("topic2", 1, [create_message("c")])] + expect = ('\x00\x00\x00\x94\x00\x00\x00\x00\x00\x00\x00\x02\x00\x07' + 'client1\x00\x02\x00\x00\x00d\x00\x00\x00\x02\x00\x06topic1' + '\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x006\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x0fQ\xdf:2\x00\x00\xff\xff' + '\xff\xff\x00\x00\x00\x01a\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x0f\xc8\xd6k\x88\x00\x00\xff\xff\xff\xff\x00' + '\x00\x00\x01b\x00\x06topic2\x00\x00\x00\x01\x00\x00\x00\x01' + '\x00\x00\x00\x1b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x0f\xbf\xd1[\x1e\x00\x00\xff\xff\xff\xff\x00\x00\x00' + '\x01c') + encoded = KafkaProtocol.encode_produce_request("client1", 2, requests, + 2, 100) + self.assertEqual(encoded, expect) + + def test_decode_produce_response(self): + t1 = "topic1" + t2 = "topic2" + encoded = struct.pack('>iih%dsiihqihqh%dsiihq' % (len(t1), len(t2)), + 2, 2, len(t1), t1, 2, 0, 0, 10L, 1, 1, 20L, + len(t2), t2, 1, 0, 0, 30L) + responses = list(KafkaProtocol.decode_produce_response(encoded)) + self.assertEqual(responses, + [ProduceResponse(t1, 0, 0, 10L), + ProduceResponse(t1, 1, 1, 20L), + ProduceResponse(t2, 0, 0, 30L)]) + + def test_encode_fetch_request(self): + requests = [FetchRequest("topic1", 0, 10, 1024), + FetchRequest("topic2", 1, 20, 100)] + expect = ('\x00\x00\x00Y\x00\x01\x00\x00\x00\x00\x00\x03\x00\x07' + 'client1\xff\xff\xff\xff\x00\x00\x00\x02\x00\x00\x00d\x00' + '\x00\x00\x02\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x04\x00\x00\x06' + 'topic2\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00' + '\x00\x00\x14\x00\x00\x00d') + encoded = KafkaProtocol.encode_fetch_request("client1", 3, requests, 2, + 100) + self.assertEqual(encoded, expect) + + def test_decode_fetch_response(self): + t1 = "topic1" + t2 = "topic2" + msgs = map(create_message, ["message1", "hi", "boo", "foo", "so fun!"]) + ms1 = KafkaProtocol._encode_message_set([msgs[0], msgs[1]]) + ms2 = KafkaProtocol._encode_message_set([msgs[2]]) + ms3 = KafkaProtocol._encode_message_set([msgs[3], msgs[4]]) + + encoded = struct.pack('>iih%dsiihqi%dsihqi%dsh%dsiihqi%ds' % + (len(t1), len(ms1), len(ms2), len(t2), len(ms3)), + 4, 2, len(t1), t1, 2, 0, 0, 10, len(ms1), ms1, 1, + 1, 20, len(ms2), ms2, len(t2), t2, 1, 0, 0, 30, + len(ms3), ms3) + + responses = list(KafkaProtocol.decode_fetch_response(encoded)) + def expand_messages(response): + return FetchResponse(response.topic, response.partition, + response.error, response.highwaterMark, + 
list(response.messages)) + + expanded_responses = map(expand_messages, responses) + expect = [FetchResponse(t1, 0, 0, 10, [OffsetAndMessage(0, msgs[0]), + OffsetAndMessage(0, msgs[1])]), + FetchResponse(t1, 1, 1, 20, [OffsetAndMessage(0, msgs[2])]), + FetchResponse(t2, 0, 0, 30, [OffsetAndMessage(0, msgs[3]), + OffsetAndMessage(0, msgs[4])])] + self.assertEqual(expanded_responses, expect) + + def test_encode_metadata_request_no_topics(self): + encoded = KafkaProtocol.encode_metadata_request("cid", 4) + self.assertEqual(encoded, '\x00\x00\x00\x11\x00\x03\x00\x00\x00\x00' + '\x00\x04\x00\x03cid\x00\x00\x00\x00') + + def test_encode_metadata_request_with_topics(self): + encoded = KafkaProtocol.encode_metadata_request("cid", 4, ["t1", "t2"]) + self.assertEqual(encoded, '\x00\x00\x00\x19\x00\x03\x00\x00\x00\x00' + '\x00\x04\x00\x03cid\x00\x00\x00\x02\x00\x02' + 't1\x00\x02t2') + + def _create_encoded_metadata_response(self, broker_data, topic_data, + topic_errors, partition_errors): + encoded = struct.pack('>ii', 3, len(broker_data)) + for node_id, broker in broker_data.iteritems(): + encoded += struct.pack('>ih%dsi' % len(broker.host), node_id, + len(broker.host), broker.host, broker.port) + + encoded += struct.pack('>i', len(topic_data)) + for topic, partitions in topic_data.iteritems(): + encoded += struct.pack('>hh%dsi' % len(topic), topic_errors[topic], + len(topic), topic, len(partitions)) + for partition, metadata in partitions.iteritems(): + encoded += struct.pack('>hiii', + partition_errors[(topic, partition)], + partition, metadata.leader, + len(metadata.replicas)) + if len(metadata.replicas) > 0: + encoded += struct.pack('>%di' % len(metadata.replicas), + *metadata.replicas) + + encoded += struct.pack('>i', len(metadata.isr)) + if len(metadata.isr) > 0: + encoded += struct.pack('>%di' % len(metadata.isr), + *metadata.isr) + + return encoded + + def test_decode_metadata_response(self): + node_brokers = { + 0: BrokerMetadata(0, "brokers1.kafka.rdio.com", 1000), + 1: BrokerMetadata(1, "brokers1.kafka.rdio.com", 1001), + 3: BrokerMetadata(3, "brokers2.kafka.rdio.com", 1000) + } + topic_partitions = { + "topic1": { + 0: PartitionMetadata("topic1", 0, 1, (0, 2), (2,)), + 1: PartitionMetadata("topic1", 1, 3, (0, 1), (0, 1)) + }, + "topic2": { + 0: PartitionMetadata("topic2", 0, 0, (), ()) + } + } + topic_errors = {"topic1": 0, "topic2": 1} + partition_errors = { + ("topic1", 0): 0, + ("topic1", 1): 1, + ("topic2", 0): 0 + } + encoded = self._create_encoded_metadata_response(node_brokers, + topic_partitions, + topic_errors, + partition_errors) + decoded = KafkaProtocol.decode_metadata_response(encoded) + self.assertEqual(decoded, (node_brokers, topic_partitions)) + + @unittest.skip("Not Implemented") + def test_encode_offset_request(self): + pass + + @unittest.skip("Not Implemented") + def test_decode_offset_response(self): + pass + + + @unittest.skip("Not Implemented") + def test_encode_offset_commit_request(self): + pass + + @unittest.skip("Not Implemented") + def test_decode_offset_commit_response(self): + pass + + @unittest.skip("Not Implemented") + def test_encode_offset_fetch_request(self): + pass + + @unittest.skip("Not Implemented") + def test_decode_offset_fetch_response(self): + pass if __name__ == '__main__': From 165eb8bca5795ba990c5f24fdb4964b95a59afe2 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Fri, 3 Jan 2014 10:57:29 -0800 Subject: [PATCH 22/46] Style fix for imports --- test/test_unit.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git 
a/test/test_unit.py b/test/test_unit.py index 93d88a151..08fef9c2a 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -9,15 +9,11 @@ OffsetAndMessage, BrokerMetadata, PartitionMetadata ) from kafka.codec import ( - has_gzip, has_snappy, - gzip_encode, gzip_decode, + has_gzip, has_snappy, gzip_encode, gzip_decode, snappy_encode, snappy_decode ) from kafka.protocol import ( - create_gzip_message, - create_message, - create_snappy_message, - KafkaProtocol + create_gzip_message, create_message, create_snappy_message, KafkaProtocol ) ITERATIONS = 1000 From ca59b2f9378f5c13ee22651f95342f1c5978367b Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Fri, 3 Jan 2014 15:18:32 -0800 Subject: [PATCH 23/46] Fix seek offset deltas We always store the offset of the next available message, so we shouldn't decrement the offset deltas when seeking by an extra 1 --- kafka/consumer.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index a44f2f6da..078886ece 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -282,12 +282,6 @@ def seek(self, offset, whence): reqs.append(OffsetRequest(self.topic, partition, -2, 1)) elif whence == 2: reqs.append(OffsetRequest(self.topic, partition, -1, 1)) - - # The API returns back the next available offset - # For eg: if the current offset is 18, the API will return - # back 19. So, if we have to seek 5 points before, we will - # end up going back to 14, instead of 13. Adjust this - deltas[partition] -= 1 else: pass From 2a55bd330ef2014f0c62872110e790ccbd0320db Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Fri, 3 Jan 2014 15:26:00 -0800 Subject: [PATCH 24/46] Raise a ConnectionError when a socket.error is raised when receiving data Also, log.exception() is unhelpfully noisy. Use log.error() with some error details in the message instead. 
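A minimal caller-side sketch of the new behaviour (not part of this patch; the
fetch_response() helper and its arguments are illustrative only): once a
socket.error during recv() is re-raised as ConnectionError, a single except
clause covers a broker that disappears during either the send or the read.

    from kafka.common import ConnectionError

    def fetch_response(conn, request_id, request):
        # conn is a KafkaConnection; request_id and request are built by the
        # caller (e.g. via KafkaProtocol), much as KafkaClient does internally.
        try:
            conn.send(request_id, request)
            return conn.recv(request_id)
        except ConnectionError:
            # The broker went away mid-send or mid-read; re-open the socket
            # so the next attempt starts from a clean connection.
            conn.reinit()
            raise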
--- kafka/conn.py | 18 +++++++++++------- kafka/producer.py | 2 +- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 7c6d51035..3661c00b8 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -35,13 +35,21 @@ def __str__(self): # Private API # ################### + def _raise_connection_error(self): + self._dirty = True + raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port)) + def _read_bytes(self, num_bytes): bytes_left = num_bytes resp = '' log.debug("About to read %d bytes from Kafka", num_bytes) while bytes_left: - data = self._sock.recv(bytes_left) + try: + data = self._sock.recv(bytes_left) + except socket.error, e: + log.error('Unable to receive data from Kafka: %s', e) + self._raise_connection_error() if data == '': raise BufferUnderflowError("Not enough data to read this response") bytes_left -= len(data) @@ -65,10 +73,6 @@ def _consume_response(self): resp = self._read_bytes(size) return str(resp) - def _raise_connection_error(self): - self._dirty = True - raise ConnectionError("Kafka @ {}:{} went away".format(self.host, self.port)) - ################## # Public API # ################## @@ -84,8 +88,8 @@ def send(self, request_id, payload): sent = self._sock.sendall(payload) if sent is not None: self._raise_connection_error() - except socket.error: - log.exception('Unable to send payload to Kafka') + except socket.error, e: + log.error('Unable to send payload to Kafka: %s', e) self._raise_connection_error() def recv(self, request_id): diff --git a/kafka/producer.py b/kafka/producer.py index 0f1fc85a1..3b36d9415 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -147,7 +147,7 @@ def send_messages(self, partition, *msg): resp = self.client.send_produce_request([req], acks=self.req_acks, timeout=self.ack_timeout) except Exception as e: - log.exception("Unable to send messages") + log.error("Unable to send messages: %s", e) raise e return resp From 4ba0943afdcfd371aae2d1a2befbe493c9f65405 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Fri, 3 Jan 2014 15:29:04 -0800 Subject: [PATCH 25/46] Fix client error handling This differentiates between errors that occur when sending the request and receiving the response, and adds BufferUnderflowError handling. 
--- kafka/client.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 0cf17d33a..f8690b8cf 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -8,7 +8,7 @@ from kafka.common import (ErrorMapping, TopicAndPartition, ConnectionError, FailedPayloadsError, BrokerResponseError, PartitionUnavailableError, - KafkaRequestError) + BufferUnderflowError, KafkaRequestError) from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol @@ -126,14 +126,24 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): request = encoder_fn(client_id=self.client_id, correlation_id=requestId, payloads=payloads) + failed = False # Send the request, recv the response try: conn.send(requestId, request) if decoder_fn is None: continue - response = conn.recv(requestId) - except ConnectionError, e: # ignore BufferUnderflow for now - log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e)) + try: + response = conn.recv(requestId) + except (ConnectionError, BufferUnderflowError), e: + log.warning("Could not receive response to request [%s] " + "from server %s: %s", request, conn, e) + failed = True + except ConnectionError, e: + log.warning("Could not send request [%s] to server %s: %s", + request, conn, e) + failed = True + + if failed: failed_payloads += payloads self.reset_all_metadata() continue From 02932af617211bc592275ac1012b060f22bbe849 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Fri, 3 Jan 2014 15:49:55 -0800 Subject: [PATCH 26/46] Add a limit to fetch buffer size, and actually retry requests when fetch size is too small Note: This can cause fetching a message to exceed a given timeout, but timeouts are not guaranteed anyways, and in this case it's the client's fault for not sending a big enough buffer size rather than the kafka server. This can be bad if max_fetch_size is None (no limit) and there is some message in Kafka that is crazy huge, but that is why we should have some max_fetch_size. --- kafka/consumer.py | 95 +++++++++++++++++++++++++++++------------------ 1 file changed, 58 insertions(+), 37 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 078886ece..73efa86de 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -25,6 +25,7 @@ FETCH_MAX_WAIT_TIME = 100 FETCH_MIN_BYTES = 4096 FETCH_BUFFER_SIZE_BYTES = 4096 +MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8 ITER_TIMEOUT_SECONDS = 60 NO_MESSAGES_WAIT_TIME_SECONDS = 0.1 @@ -213,8 +214,10 @@ class SimpleConsumer(Consumer): auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit fetch_size_bytes: number of bytes to request in a FetchRequest - buffer_size: initial number of bytes to tell kafka we have - available. This will double every time it's not enough + buffer_size: default 4K. Initial number of bytes to tell kafka we + have available. This will double as needed. + max_buffer_size: default 16K. Max number of bytes to tell kafka we have + available. None means no limit. iter_timeout: default None. How much time (in seconds) to wait for a message in the iterator before exiting. None means no timeout, so it will wait forever. 
@@ -230,9 +233,15 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None, auto_commit_every_t=AUTO_COMMIT_INTERVAL, fetch_size_bytes=FETCH_MIN_BYTES, buffer_size=FETCH_BUFFER_SIZE_BYTES, + max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, iter_timeout=None): + if max_buffer_size is not None and buffer_size > max_buffer_size: + raise ValueError("buffer_size (%d) is greater than " + "max_buffer_size (%d)" % + (buffer_size, max_buffer_size)) self.buffer_size = buffer_size + self.max_buffer_size = max_buffer_size self.partition_info = False # Do not return partition info in msgs self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = fetch_size_bytes @@ -355,42 +364,54 @@ def _fetch(self): # Create fetch request payloads for all the partitions requests = [] partitions = self.offsets.keys() - for partition in partitions: - requests.append(FetchRequest(self.topic, partition, - self.offsets[partition], - self.buffer_size)) - # Send request - responses = self.client.send_fetch_request( - requests, - max_wait_time=int(self.fetch_max_wait_time), - min_bytes=self.fetch_min_bytes) - - for resp in responses: - partition = resp.partition - try: - for message in resp.messages: - # Update partition offset - self.offsets[partition] = message.offset + 1 - - # Count, check and commit messages if necessary - self.count_since_commit += 1 - self._auto_commit() - - # Put the message in our queue - if self.partition_info: - self.queue.put((partition, message)) + while partitions: + for partition in partitions: + requests.append(FetchRequest(self.topic, partition, + self.offsets[partition], + self.buffer_size)) + # Send request + responses = self.client.send_fetch_request( + requests, + max_wait_time=int(self.fetch_max_wait_time), + min_bytes=self.fetch_min_bytes) + + retry_partitions = set() + for resp in responses: + partition = resp.partition + try: + for message in resp.messages: + # Update partition offset + self.offsets[partition] = message.offset + 1 + + # Count, check and commit messages if necessary + self.count_since_commit += 1 + self._auto_commit() + + # Put the message in our queue + if self.partition_info: + self.queue.put((partition, message)) + else: + self.queue.put(message) + except ConsumerFetchSizeTooSmall, e: + if (self.max_buffer_size is not None and + self.buffer_size == self.max_buffer_size): + log.error("Max fetch size %d too small", + self.max_buffer_size) + raise e + if self.max_buffer_size is None: + self.buffer_size *= 2 else: - self.queue.put(message) - except ConsumerFetchSizeTooSmall, e: - self.buffer_size *= 2 - log.warn("Fetch size too small, increase to %d (2x) and retry", - self.buffer_size) - except ConsumerNoMoreData, e: - log.debug("Iteration was ended by %r", e) - except StopIteration: - # Stop iterating through this partition - log.debug("Done iterating over partition %s" % partition) - + self.buffer_size = max([self.buffer_size * 2, + self.max_buffer_size]) + log.warn("Fetch size too small, increase to %d (2x) " + "and retry", self.buffer_size) + retry_partitions.add(partition) + except ConsumerNoMoreData, e: + log.debug("Iteration was ended by %r", e) + except StopIteration: + # Stop iterating through this partition + log.debug("Done iterating over partition %s" % partition) + partitions = retry_partitions def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size): """ From 7c6e51916841c66f4b452eab5c879298a5444538 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Fri, 3 Jan 2014 15:52:37 -0800 Subject: [PATCH 27/46] 
Handle starting/stopping Kafka brokers that are already started/stopped in integration tests If some of the tests stop brokers then error out, the teardown method will try to close the same brokers and fail. This change allows it to continue. --- test/fixtures.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/test/fixtures.py b/test/fixtures.py index c771a5859..17e6672f6 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -272,8 +272,13 @@ def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas= self.tmp_dir = None self.child = None + self.running = False def open(self): + if self.running: + print("*** Kafka instance already running") + return + self.tmp_dir = tempfile.mkdtemp() print("*** Running local Kafka instance") print(" host = %s" % self.host) @@ -318,10 +323,16 @@ def open(self): self.child.start() self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id) print("*** Done!") + self.running = True def close(self): + if not self.running: + print("*** Kafka instance already stopped") + return + print("*** Stopping Kafka...") self.child.stop() self.child = None print("*** Done!") shutil.rmtree(self.tmp_dir) + self.running = False From f5ffd85daa7dc039ef212da96032c708228ff7dc Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Fri, 3 Jan 2014 17:40:07 -0800 Subject: [PATCH 28/46] Remove unnecessary brackets --- kafka/consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/consumer.py b/kafka/consumer.py index 73efa86de..522d6cacd 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -401,8 +401,8 @@ def _fetch(self): if self.max_buffer_size is None: self.buffer_size *= 2 else: - self.buffer_size = max([self.buffer_size * 2, - self.max_buffer_size]) + self.buffer_size = max(self.buffer_size * 2, + self.max_buffer_size) log.warn("Fetch size too small, increase to %d (2x) " "and retry", self.buffer_size) retry_partitions.add(partition) From 33551badd7f7842c1407d6c984b08dfb1e6b1e2a Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Fri, 3 Jan 2014 17:47:39 -0800 Subject: [PATCH 29/46] Fix client and consumer params in integration tests --- test/test_integration.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/test/test_integration.py b/test/test_integration.py index 6833088a0..26cc700dc 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -572,7 +572,7 @@ def setUpClass(cls): cls.zk = ZookeeperFixture.instance() cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port) - cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192) + cls.client = KafkaClient(cls.server2.host, cls.server2.port) @classmethod def tearDownClass(cls): # noqa @@ -601,7 +601,9 @@ def test_simple_consumer(self): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", + "test_simple_consumer", auto_commit=False, + iter_timeout=0) all_messages = [] for message in consumer: all_messages.append(message) @@ -627,7 +629,9 @@ def test_simple_consumer(self): consumer.stop() def test_simple_consumer_blocking(self): - consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_blocking", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", + "test_simple_consumer_blocking", + auto_commit=False, iter_timeout=0) # Blocking API 
start = datetime.now() @@ -675,7 +679,8 @@ def test_simple_consumer_pending(self): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_pending", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_pending", + auto_commit=False, iter_timeout=0) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) @@ -782,7 +787,8 @@ def test_large_messages(self): self.assertEquals(resp.offset, 10) # Consumer should still get all of them - consumer = SimpleConsumer(self.client, "group1", "test_large_messages", auto_commit=False) + consumer = SimpleConsumer(self.client, "group1", "test_large_messages", + auto_commit=False, iter_timeout=0) all_messages = messages1 + messages2 for i, message in enumerate(consumer): self.assertEquals(all_messages[i], message.message) @@ -887,7 +893,7 @@ def _kill_leader(self, topic, partition): def _count_messages(self, group, topic): client = KafkaClient(self.brokers[0].host, self.brokers[0].port) - consumer = SimpleConsumer(client, group, topic, auto_commit=False) + consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0) all_messages = [] for message in consumer: all_messages.append(message) From 78f7caaecb12ba57f8bcd424db8a528c533ba87f Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Fri, 3 Jan 2014 17:49:23 -0800 Subject: [PATCH 30/46] Add tests for limited and unlimited consumer max_buffer_size --- test/test_integration.py | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/test/test_integration.py b/test/test_integration.py index 26cc700dc..f8e71253f 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -8,6 +8,7 @@ from kafka import * # noqa from kafka.common import * # noqa from kafka.codec import has_gzip, has_snappy +from kafka.consumer import FETCH_BUFFER_SIZE_BYTES, MAX_FETCH_BUFFER_SIZE_BYTES from .fixtures import ZookeeperFixture, KafkaFixture @@ -778,7 +779,7 @@ def test_large_messages(self): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - # Produce 10 messages that are too large (bigger than default fetch size) + # Produce 10 messages that are large (bigger than default fetch size) messages2 = [create_message(random_string(5000)) for i in range(10)] produce2 = ProduceRequest("test_large_messages", 0, messages2) @@ -794,6 +795,30 @@ def test_large_messages(self): self.assertEquals(all_messages[i], message.message) self.assertEquals(i, 19) + # Produce 1 message that is too large (bigger than max fetch size) + big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10 + big_message = create_message(random_string(big_message_size)) + produce3 = ProduceRequest("test_large_messages", 0, [big_message]) + for resp in self.client.send_produce_request([produce3]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 20) + + self.assertRaises(ConsumerFetchSizeTooSmall, consumer.get_message, False, 0.1) + + # Create a consumer with no fetch size limit + big_consumer = SimpleConsumer(self.client, "group1", "test_large_messages", + max_buffer_size=None, partitions=[0], + auto_commit=False, iter_timeout=0) + + # Seek to the last message + big_consumer.seek(-1, 2) + + # Consume giant message successfully + message = big_consumer.get_message(block=False, timeout=10) + self.assertIsNotNone(message) + self.assertEquals(message.message.value, 
big_message.value) + + class TestFailover(KafkaTestCase): @classmethod From dbc3d80a1ac8e780b29da96b33737d2d8225a45d Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Mon, 6 Jan 2014 14:09:16 -0800 Subject: [PATCH 31/46] Make kafka brokers per-test in failover integration tests This is better since the tests stop/start brokers, and if something goes wrong they can affect eachother. --- test/test_integration.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/test/test_integration.py b/test/test_integration.py index f8e71253f..7e5dc39f1 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -821,25 +821,23 @@ def test_large_messages(self): class TestFailover(KafkaTestCase): - @classmethod - def setUpClass(cls): + def setUp(self): - zk_chroot = random_string(10) + zk_chroot = random_string(10) replicas = 2 partitions = 2 # mini zookeeper, 2 kafka brokers - cls.zk = ZookeeperFixture.instance() - kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions] - cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] - cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port) - - @classmethod - def tearDownClass(cls): - cls.client.close() - for broker in cls.brokers: + self.zk = ZookeeperFixture.instance() + kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] + self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] + self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + + def tearDown(self): + self.client.close() + for broker in self.brokers: broker.close() - cls.zk.close() + self.zk.close() def test_switch_leader(self): From 36c3930c302d52aa60f6a10bcbe3b30935488789 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Mon, 6 Jan 2014 14:16:11 -0800 Subject: [PATCH 32/46] Add object type and ID to message prefix in fixtures output for easier debugging --- test/fixtures.py | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index 17e6672f6..28d6519a9 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -208,9 +208,12 @@ def __init__(self, host, port): self.tmp_dir = None self.child = None + def out(self, message): + print("*** Zookeeper[%s]: %s" % (id(self), message)) + def open(self): self.tmp_dir = tempfile.mkdtemp() - print("*** Running local Zookeeper instance...") + print("*** [%s] Running local Zookeeper instance..." % id(self)) print(" host = %s" % self.host) print(" port = %s" % self.port) print(" tmp_dir = %s" % self.tmp_dir) @@ -229,16 +232,16 @@ def open(self): self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt")) # Party! 
- print("*** Starting Zookeeper...") + self.out("Starting...") self.child.start() self.child.wait_for(r"Snapshotting") - print("*** Done!") + self.out("Done!") def close(self): - print("*** Stopping Zookeeper...") + self.out("Stopping...") self.child.stop() self.child = None - print("*** Done!") + self.out("Done!") shutil.rmtree(self.tmp_dir) @@ -274,13 +277,16 @@ def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas= self.child = None self.running = False + def out(self, message): + print("*** Kafka[%s]: %s" % (id(self), message)) + def open(self): if self.running: - print("*** Kafka instance already running") + self.out("Instance already running") return self.tmp_dir = tempfile.mkdtemp() - print("*** Running local Kafka instance") + self.out("Running local instance") print(" host = %s" % self.host) print(" port = %s" % self.port) print(" broker_id = %s" % self.broker_id) @@ -308,31 +314,31 @@ def open(self): self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt")) # Party! - print("*** Creating Zookeeper chroot node...") + self.out("Creating Zookeeper chroot node...") proc = subprocess.Popen(kafka_run_class_args( "org.apache.zookeeper.ZooKeeperMain", "-server", "%s:%d" % (self.zk_host, self.zk_port), "create", "/%s" % self.zk_chroot, "kafka-python" )) if proc.wait() != 0: - print("*** Failed to create Zookeeper chroot node") + self.out("Failed to create Zookeeper chroot node") raise RuntimeError("Failed to create Zookeeper chroot node") - print("*** Done!") + self.out("Done!") - print("*** Starting Kafka...") + self.out("Starting...") self.child.start() self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id) - print("*** Done!") + self.out("Done!") self.running = True def close(self): if not self.running: - print("*** Kafka instance already stopped") + self.out("Instance already stopped") return - print("*** Stopping Kafka...") + self.out("Stopping...") self.child.stop() self.child = None - print("*** Done!") + self.out("Done!") shutil.rmtree(self.tmp_dir) self.running = False From 3a12e5c6ec5d9cd107a2300cc568f0971f9193ee Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Tue, 7 Jan 2014 17:23:30 -0800 Subject: [PATCH 33/46] Use the same timeout when reinitializing a connection --- kafka/conn.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 3661c00b8..4d0027b1d 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -25,7 +25,8 @@ def __init__(self, host, port, timeout=10): self.port = port self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((host, port)) - self._sock.settimeout(timeout) + self.timeout = timeout + self._sock.settimeout(self.timeout) self._dirty = False def __str__(self): @@ -129,5 +130,5 @@ def reinit(self): self.close() self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((self.host, self.port)) - self._sock.settimeout(10) + self._sock.settimeout(self.timeout) self._dirty = False From 11b09c1ce7059711cdf67087024ff653e1d8d81e Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Tue, 7 Jan 2014 17:26:09 -0800 Subject: [PATCH 34/46] Handle dirty flag in conn.recv() * If the connection is dirty, reinit * If we get a BufferUnderflowError, the server could have gone away, so mark it dirty --- kafka/conn.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index 4d0027b1d..80f157fb9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -44,7 +44,8 @@ def _read_bytes(self, 
num_bytes): bytes_left = num_bytes resp = '' log.debug("About to read %d bytes from Kafka", num_bytes) - + if self._dirty: + self.reinit() while bytes_left: try: data = self._sock.recv(bytes_left) @@ -52,6 +53,7 @@ def _read_bytes(self, num_bytes): log.error('Unable to receive data from Kafka: %s', e) self._raise_connection_error() if data == '': + self._dirty = True raise BufferUnderflowError("Not enough data to read this response") bytes_left -= len(data) log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes) From adcfff0ba2900e44df99b968670352b967c71f4f Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Tue, 7 Jan 2014 17:27:21 -0800 Subject: [PATCH 35/46] Remove unnecessary method --- kafka/conn.py | 31 ++++++++----------------------- 1 file changed, 8 insertions(+), 23 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 80f157fb9..833f9c5fc 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -61,21 +61,6 @@ def _read_bytes(self, num_bytes): return resp - def _consume_response(self): - """ - This method handles the response header and error messages. It - then returns the response - """ - log.debug("Expecting response from Kafka") - # Read the size off of the header - resp = self._read_bytes(4) - - (size,) = struct.unpack('>i', resp) - - # Read the remainder of the response - resp = self._read_bytes(size) - return str(resp) - ################## # Public API # ################## @@ -91,7 +76,7 @@ def send(self, request_id, payload): sent = self._sock.sendall(payload) if sent is not None: self._raise_connection_error() - except socket.error, e: + except socket.error: log.error('Unable to send payload to Kafka: %s', e) self._raise_connection_error() @@ -100,14 +85,14 @@ def recv(self, request_id): Get a response from Kafka """ log.debug("Reading response %d from Kafka" % request_id) - if self._dirty: - self._raise_connection_error() + # Read the size off of the header + resp = self._read_bytes(4) - try: - return self._consume_response() - except socket.error: - log.exception('Unable to read response from Kafka') - self._raise_connection_error() + (size,) = struct.unpack('>i', resp) + + # Read the remainder of the response + resp = self._read_bytes(size) + return str(resp) def copy(self): """ From 283ce220edf845a6510f2072b2c0c93c0700fecf Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Tue, 7 Jan 2014 17:46:01 -0800 Subject: [PATCH 36/46] Skip snappy/gzip tests if they're not available --- test/test_unit.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/test/test_unit.py b/test/test_unit.py index 08fef9c2a..e3fd4bb5b 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -55,17 +55,16 @@ def test_submodule_namespace(self): class TestCodec(unittest.TestCase): + + @unittest.skipUnless(has_gzip(), "Gzip not available") def test_gzip(self): - if not has_gzip(): - return for i in xrange(ITERATIONS): s1 = random_string() s2 = gzip_decode(gzip_encode(s1)) self.assertEquals(s1, s2) + @unittest.skipUnless(has_snappy(), "Snappy not available") def test_snappy(self): - if not has_snappy(): - return for i in xrange(ITERATIONS): s1 = random_string() s2 = snappy_decode(snappy_encode(s1)) @@ -83,6 +82,7 @@ def test_create_message(self): self.assertEqual(msg.key, key) self.assertEqual(msg.value, payload) + @unittest.skipUnless(has_gzip(), "Snappy not available") def test_create_gzip(self): payloads = ["v1", "v2"] msg = create_gzip_message(payloads) @@ -98,6 +98,7 @@ def test_create_gzip(self): "\xff\xff\x00\x00\x00\x02v2") 
self.assertEqual(decoded, expect) + @unittest.skipUnless(has_snappy(), "Snappy not available") def test_create_snappy(self): payloads = ["v1", "v2"] msg = create_snappy_message(payloads) @@ -157,6 +158,7 @@ def test_decode_message_set(self): self.assertEqual(returned_offset2, 0) self.assertEqual(decoded_message2, create_message("v2")) + @unittest.skipUnless(has_gzip(), "Gzip not available") def test_decode_message_gzip(self): gzip_encoded = ('\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000' '\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01' @@ -173,6 +175,7 @@ def test_decode_message_gzip(self): self.assertEqual(returned_offset2, 0) self.assertEqual(decoded_message2, create_message("v2")) + @unittest.skipUnless(has_snappy(), "Snappy not available") def test_decode_message_snappy(self): snappy_encoded = ('\xec\x80\xa1\x95\x00\x02\xff\xff\xff\xff\x00\x00' '\x00,8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff' From 27ae2694ee44227ca33a6a2bfaaf6384fbdb0a05 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Tue, 7 Jan 2014 17:54:05 -0800 Subject: [PATCH 37/46] Some cleanup and easier to read test fixture output --- test/fixtures.py | 8 ++++---- test/test_integration.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index 28d6519a9..9e283d3c5 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -209,11 +209,11 @@ def __init__(self, host, port): self.child = None def out(self, message): - print("*** Zookeeper[%s]: %s" % (id(self), message)) + print("*** Zookeeper [%s:%d]: %s" % (self.host, self.port, message)) def open(self): self.tmp_dir = tempfile.mkdtemp() - print("*** [%s] Running local Zookeeper instance..." % id(self)) + self.out("Running local instance...") print(" host = %s" % self.host) print(" port = %s" % self.port) print(" tmp_dir = %s" % self.tmp_dir) @@ -278,7 +278,7 @@ def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas= self.running = False def out(self, message): - print("*** Kafka[%s]: %s" % (id(self), message)) + print("*** Kafka [%s:%d]: %s" % (self.host, self.port, message)) def open(self): if self.running: @@ -286,7 +286,7 @@ def open(self): return self.tmp_dir = tempfile.mkdtemp() - self.out("Running local instance") + self.out("Running local instance...") print(" host = %s" % self.host) print(" port = %s" % self.port) print(" broker_id = %s" % self.broker_id) diff --git a/test/test_integration.py b/test/test_integration.py index 7e5dc39f1..289e65db8 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -8,7 +8,7 @@ from kafka import * # noqa from kafka.common import * # noqa from kafka.codec import has_gzip, has_snappy -from kafka.consumer import FETCH_BUFFER_SIZE_BYTES, MAX_FETCH_BUFFER_SIZE_BYTES +from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES from .fixtures import ZookeeperFixture, KafkaFixture From cf4d2206c28d8207ebda7cd7937529b1679068bc Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 8 Jan 2014 11:30:15 -0800 Subject: [PATCH 38/46] Change BufferUnderflowError to ConnectionError in conn._read_bytes() Both errors are handled the same way when raised and caught, so this makes sense. 
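Taken together with the previous few patches, the connection changes converge on one pattern: remember the configured timeout, mark the socket dirty whenever a read comes up short, and reinitialize with that same timeout before the next request, surfacing a single ConnectionError to the caller. A minimal standalone sketch of that pattern, under simplified assumptions (FlakyConnection and read_exact are illustrative names, not part of kafka.conn):

import socket

class FlakyConnection(object):
    """Illustrative sketch only -- not the real kafka.conn.KafkaConnection."""

    def __init__(self, host, port, timeout=10):
        self.host = host
        self.port = port
        self.timeout = timeout  # kept so reinit() reuses it instead of a hard-coded value
        self._dirty = False
        self._connect()

    def _connect(self):
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.connect((self.host, self.port))
        self._sock.settimeout(self.timeout)
        self._dirty = False

    def reinit(self):
        self._sock.close()
        self._connect()

    def read_exact(self, num_bytes):
        # A socket marked dirty by an earlier failure is reconnected first.
        if self._dirty:
            self.reinit()
        chunks, remaining = [], num_bytes
        while remaining:
            data = self._sock.recv(remaining)
            if not data:
                # Peer closed the connection: flag the socket and raise one
                # error type (kafka.common.ConnectionError in the real client).
                self._dirty = True
                raise socket.error("Not enough data to read this response")
            chunks.append(data)
            remaining -= len(data)
        return ''.join(chunks)

Collapsing the short-read case and the socket error into one exception type is what lets the retry logic in KafkaClient._send_broker_aware_request get by with a single except clause, as the diff below shows.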
--- kafka/client.py | 5 +++-- kafka/conn.py | 5 ++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index f8690b8cf..7e169e884 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -8,7 +8,8 @@ from kafka.common import (ErrorMapping, TopicAndPartition, ConnectionError, FailedPayloadsError, BrokerResponseError, PartitionUnavailableError, - BufferUnderflowError, KafkaRequestError) + KafkaRequestError) + from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol @@ -134,7 +135,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): continue try: response = conn.recv(requestId) - except (ConnectionError, BufferUnderflowError), e: + except ConnectionError, e: log.warning("Could not receive response to request [%s] " "from server %s: %s", request, conn, e) failed = True diff --git a/kafka/conn.py b/kafka/conn.py index 833f9c5fc..0af9e72ed 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -4,7 +4,6 @@ import struct from threading import local -from kafka.common import BufferUnderflowError from kafka.common import ConnectionError log = logging.getLogger("kafka") @@ -53,8 +52,8 @@ def _read_bytes(self, num_bytes): log.error('Unable to receive data from Kafka: %s', e) self._raise_connection_error() if data == '': - self._dirty = True - raise BufferUnderflowError("Not enough data to read this response") + log.error("Not enough data to read this response") + self._raise_connection_error() bytes_left -= len(data) log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes) resp += data From 6899063286e7614b80484b8be73aa3f4947767f6 Mon Sep 17 00:00:00 2001 From: Omar Ghishan Date: Wed, 8 Jan 2014 11:46:10 -0800 Subject: [PATCH 39/46] Change log.error() back to log.exception() --- kafka/conn.py | 6 +++--- kafka/producer.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 0af9e72ed..ec42fda4b 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -48,8 +48,8 @@ def _read_bytes(self, num_bytes): while bytes_left: try: data = self._sock.recv(bytes_left) - except socket.error, e: - log.error('Unable to receive data from Kafka: %s', e) + except socket.error: + log.exception('Unable to receive data from Kafka') self._raise_connection_error() if data == '': log.error("Not enough data to read this response") @@ -76,7 +76,7 @@ def send(self, request_id, payload): if sent is not None: self._raise_connection_error() except socket.error: - log.error('Unable to send payload to Kafka: %s', e) + log.exception('Unable to send payload to Kafka') self._raise_connection_error() def recv(self, request_id): diff --git a/kafka/producer.py b/kafka/producer.py index 3b36d9415..07c7d3e45 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -146,9 +146,9 @@ def send_messages(self, partition, *msg): try: resp = self.client.send_produce_request([req], acks=self.req_acks, timeout=self.ack_timeout) - except Exception as e: - log.error("Unable to send messages: %s", e) - raise e + except Exception: + log.exception("Unable to send messages") + raise return resp def stop(self, timeout=1): From adb7a63a5d033885fd7cfa972ba480dffe721813 Mon Sep 17 00:00:00 2001 From: Thomas Dimson Date: Mon, 6 Jan 2014 17:37:41 -0800 Subject: [PATCH 40/46] Check for socket status on read as well as send --- kafka/conn.py | 1 - 1 file changed, 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index ec42fda4b..8d3e725f8 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -83,7 +83,6 @@ def 
recv(self, request_id): """ Get a response from Kafka """ - log.debug("Reading response %d from Kafka" % request_id) # Read the size off of the header resp = self._read_bytes(4) From 4bf32b99b2366d9017fe2e01386bd763df7b110e Mon Sep 17 00:00:00 2001 From: Thomas Dimson Date: Mon, 6 Jan 2014 17:50:39 -0800 Subject: [PATCH 41/46] Propagate error immediately if dirty --- kafka/conn.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/conn.py b/kafka/conn.py index 8d3e725f8..ec42fda4b 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -83,6 +83,7 @@ def recv(self, request_id): """ Get a response from Kafka """ + log.debug("Reading response %d from Kafka" % request_id) # Read the size off of the header resp = self._read_bytes(4) From 9b7d8633dcdda3fcbf6b26ed545f81724406b0cd Mon Sep 17 00:00:00 2001 From: Thomas Dimson Date: Tue, 7 Jan 2014 19:17:21 -0800 Subject: [PATCH 42/46] Add error hierarchy, remove client infinite loop --- kafka/client.py | 41 +++++++++++++++++++++++++++++++++++++++- test/test_integration.py | 1 - 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 7e169e884..1052d6e61 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -53,10 +53,44 @@ def _get_leader_for_partition(self, topic, partition): self.load_metadata_for_topics(topic) if key not in self.topics_to_brokers: - raise KafkaRequestError("Partition does not exist: %s" % str(key)) + raise BrokerResponseError("Partition does not exist: %s" % str(key)) return self.topics_to_brokers[key] + def _load_metadata_for_topics(self, *topics): + """ + Discover brokers and metadata for a set of topics. This method will + recurse in the event of a retry. + """ + request_id = self._next_id() + request = KafkaProtocol.encode_metadata_request(self.client_id, + request_id, topics) + + response = self._send_broker_unaware_request(request_id, request) + if response is None: + raise Exception("All servers failed to process request") + + (brokers, topics) = KafkaProtocol.decode_metadata_response(response) + + log.debug("Broker metadata: %s", brokers) + log.debug("Topic metadata: %s", topics) + + self.brokers = brokers + self.topics_to_brokers = {} + + for topic, partitions in topics.items(): + # Clear the list once before we add it. This removes stale entries + # and avoids duplicates + self.topic_partitions.pop(topic, None) + + if not partitions: + raise PartitionUnavailableError("Partitions for %s are unassigned!" 
% topic) + + for partition, meta in partitions.items(): + topic_part = TopicAndPartition(topic, partition) + self.topics_to_brokers[topic_part] = brokers[meta.leader] + self.topic_partitions[topic].append(partition) + def _next_id(self): """ Generate a new correlation id @@ -192,6 +226,9 @@ def reset_all_metadata(self): def has_metadata_for_topic(self, topic): return topic in self.topic_partitions + def reset_metadata(self): + self.topics_to_brokers = {} + def close(self): for conn in self.conns.values(): conn.close() @@ -280,6 +317,7 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000, if fail_on_error is True: self._raise_on_response_error(resp) + # Run the callback if callback is not None: out.append(callback(resp)) else: @@ -308,6 +346,7 @@ def send_fetch_request(self, payloads=[], fail_on_error=True, if fail_on_error is True: self._raise_on_response_error(resp) + # Run the callback if callback is not None: out.append(callback(resp)) else: diff --git a/test/test_integration.py b/test/test_integration.py index 289e65db8..44beecf47 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -41,7 +41,6 @@ def tearDownClass(cls): # noqa cls.server.close() cls.zk.close() - ##################### # Produce Tests # ##################### From ab79531dc7e05bd009c766320dae86251843ff0c Mon Sep 17 00:00:00 2001 From: Thomas Dimson Date: Tue, 7 Jan 2014 19:41:39 -0800 Subject: [PATCH 43/46] Better error handling on broker exception, flake8 fixes --- kafka/client.py | 46 +++------------------------------------------- kafka/producer.py | 1 - 2 files changed, 3 insertions(+), 44 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 1052d6e61..1ac17788a 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -8,7 +8,7 @@ from kafka.common import (ErrorMapping, TopicAndPartition, ConnectionError, FailedPayloadsError, BrokerResponseError, PartitionUnavailableError, - KafkaRequestError) + KafkaError) from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol @@ -57,40 +57,6 @@ def _get_leader_for_partition(self, topic, partition): return self.topics_to_brokers[key] - def _load_metadata_for_topics(self, *topics): - """ - Discover brokers and metadata for a set of topics. This method will - recurse in the event of a retry. - """ - request_id = self._next_id() - request = KafkaProtocol.encode_metadata_request(self.client_id, - request_id, topics) - - response = self._send_broker_unaware_request(request_id, request) - if response is None: - raise Exception("All servers failed to process request") - - (brokers, topics) = KafkaProtocol.decode_metadata_response(response) - - log.debug("Broker metadata: %s", brokers) - log.debug("Topic metadata: %s", topics) - - self.brokers = brokers - self.topics_to_brokers = {} - - for topic, partitions in topics.items(): - # Clear the list once before we add it. This removes stale entries - # and avoids duplicates - self.topic_partitions.pop(topic, None) - - if not partitions: - raise PartitionUnavailableError("Partitions for %s are unassigned!" 
% topic) - - for partition, meta in partitions.items(): - topic_part = TopicAndPartition(topic, partition) - self.topics_to_brokers[topic_part] = brokers[meta.leader] - self.topic_partitions[topic].append(partition) - def _next_id(self): """ Generate a new correlation id @@ -112,7 +78,7 @@ def _send_broker_unaware_request(self, requestId, request): "trying next server: %s" % (request, conn, e)) continue - raise BrokerResponseError("All servers failed to process request") + raise KafkaError("All servers failed to process request") def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): """ @@ -226,9 +192,6 @@ def reset_all_metadata(self): def has_metadata_for_topic(self, topic): return topic in self.topic_partitions - def reset_metadata(self): - self.topics_to_brokers = {} - def close(self): for conn in self.conns.values(): conn.close() @@ -269,9 +232,8 @@ def load_metadata_for_topics(self, *topics): self.reset_topic_metadata(topic) if not partitions: - continue + raise PartitionUnavailableError("Partitions for %s are unassigned!" % topic) - self.topic_partitions[topic] = [] for partition, meta in partitions.items(): topic_part = TopicAndPartition(topic, partition) self.topics_to_brokers[topic_part] = brokers[meta.leader] @@ -317,7 +279,6 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000, if fail_on_error is True: self._raise_on_response_error(resp) - # Run the callback if callback is not None: out.append(callback(resp)) else: @@ -346,7 +307,6 @@ def send_fetch_request(self, payloads=[], fail_on_error=True, if fail_on_error is True: self._raise_on_response_error(resp) - # Run the callback if callback is not None: out.append(callback(resp)) else: diff --git a/kafka/producer.py b/kafka/producer.py index 07c7d3e45..6ed22ee4a 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -4,7 +4,6 @@ import time from Queue import Empty - from collections import defaultdict from itertools import cycle from multiprocessing import Queue, Process From fbec8ffa7208f1cf629fa17692ed3879207f855b Mon Sep 17 00:00:00 2001 From: Thomas Dimson Date: Wed, 8 Jan 2014 12:05:09 -0800 Subject: [PATCH 44/46] No exception for empty partitions, fix topic metadata reset --- kafka/client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka/client.py b/kafka/client.py index 1ac17788a..3577e227f 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -232,8 +232,9 @@ def load_metadata_for_topics(self, *topics): self.reset_topic_metadata(topic) if not partitions: - raise PartitionUnavailableError("Partitions for %s are unassigned!" 
% topic) + continue + self.topic_partitions[topic] = [] for partition, meta in partitions.items(): topic_part = TopicAndPartition(topic, partition) self.topics_to_brokers[topic_part] = brokers[meta.leader] From 1118bbd6084c41cbcb371bfe6d92f24b571a5676 Mon Sep 17 00:00:00 2001 From: Thomas Dimson Date: Thu, 9 Jan 2014 17:06:34 -0800 Subject: [PATCH 45/46] Change exception names, make a new "KafkaRequestError" --- kafka/client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 3577e227f..7e169e884 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -8,7 +8,7 @@ from kafka.common import (ErrorMapping, TopicAndPartition, ConnectionError, FailedPayloadsError, BrokerResponseError, PartitionUnavailableError, - KafkaError) + KafkaRequestError) from kafka.conn import KafkaConnection from kafka.protocol import KafkaProtocol @@ -53,7 +53,7 @@ def _get_leader_for_partition(self, topic, partition): self.load_metadata_for_topics(topic) if key not in self.topics_to_brokers: - raise BrokerResponseError("Partition does not exist: %s" % str(key)) + raise KafkaRequestError("Partition does not exist: %s" % str(key)) return self.topics_to_brokers[key] @@ -78,7 +78,7 @@ def _send_broker_unaware_request(self, requestId, request): "trying next server: %s" % (request, conn, e)) continue - raise KafkaError("All servers failed to process request") + raise BrokerResponseError("All servers failed to process request") def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): """ From 88b8ff334a27ff62d81b41e1439ea4f6e5993126 Mon Sep 17 00:00:00 2001 From: Thomas Dimson Date: Mon, 13 Jan 2014 14:42:11 -0800 Subject: [PATCH 46/46] Fix test errors after rebase --- test/test_integration.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/test/test_integration.py b/test/test_integration.py index 44beecf47..56974a5eb 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -719,7 +719,7 @@ def test_multi_process_consumer(self): start = datetime.now() messages = consumer.get_messages(block=True, timeout=5) diff = (datetime.now() - start).total_seconds() - self.assertGreaterEqual(diff, 5) + self.assertGreaterEqual(diff, 4.9) self.assertEqual(len(messages), 0) # Send 10 messages @@ -821,16 +821,16 @@ def test_large_messages(self): class TestFailover(KafkaTestCase): def setUp(self): - - zk_chroot = random_string(10) - replicas = 2 + zk_chroot = random_string(10) + replicas = 2 partitions = 2 # mini zookeeper, 2 kafka brokers - self.zk = ZookeeperFixture.instance() - kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] + self.zk = ZookeeperFixture.instance() + kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] - self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + KafkaTestCase.setUp(self) def tearDown(self): self.client.close() @@ -904,17 +904,17 @@ def _send_random_messages(self, producer, n): resp = producer.send_messages(random_string(10)) if len(resp) > 0: self.assertEquals(resp[0].error, 0) - time.sleep(1) # give it some time + time.sleep(1) # give it some time def _kill_leader(self, topic, partition): leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)] broker = self.brokers[leader.nodeId] broker.close() - time.sleep(1) # give it some time + time.sleep(1) # give it 
some time return broker def _count_messages(self, group, topic): - client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + client = KafkaClient(self.brokers[0].host, self.brokers[0].port) consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0) all_messages = [] for message in consumer: