From 387131ab30961d981e3be3889a1248d7c0deed1e Mon Sep 17 00:00:00 2001
From: Mark Roberts
Date: Tue, 25 Feb 2014 10:21:10 -0800
Subject: [PATCH 1/6] Introduce Kafka server version abstraction layer for
 allowing users to select Kafka version with appropriate defaults for core
 consumers and producers

---
 example.py        | 15 +++++++--------
 kafka/__init__.py | 23 +++++++++++++++++++++--
 kafka/client.py   | 21 +++++++++++++++++++--
 kafka/consumer.py |  8 ++++----
 kafka/producer.py |  4 ++--
 5 files changed, 53 insertions(+), 18 deletions(-)

diff --git a/example.py b/example.py
index 3a2dc928b..410bccefb 100644
--- a/example.py
+++ b/example.py
@@ -1,23 +1,22 @@
 import logging
 
-from kafka.client import KafkaClient, FetchRequest, ProduceRequest
-from kafka.consumer import SimpleConsumer
-from kafka.producer import SimpleProducer
+from kafka import Kafka081Client
 
 def produce_example(client):
-    producer = SimpleProducer(client, "my-topic")
-    producer.send_messages("test")
+    producer = client.simple_producer()
+    producer.send_messages('my-topic', "test")
 
 def consume_example(client):
-    consumer = SimpleConsumer(client, "test-group", "my-topic")
+    consumer = client.simple_consumer("test-group", "my-topic")
     for message in consumer:
         print(message)
 
 def main():
-    client = KafkaClient("localhost", 9092)
+    client = Kafka081Client("localhost", 9092)
    produce_example(client)
     consume_example(client)
 
+
 if __name__ == "__main__":
-    logging.basicConfig(level=logging.DEBUG)
+    logging.basicConfig(level=logging.WARN)
     main()
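For orientation, the rewritten example above boils down to the following end-to-end flow: the version-specific client is the single entry point, and producers and consumers come from its factory methods instead of being imported directly. This is a minimal sketch assuming a local broker; the topic and group names are placeholders.

    import logging

    from kafka import Kafka081Client

    logging.basicConfig(level=logging.WARN)

    # One client per broker connection; the class encodes the target broker version.
    client = Kafka081Client("localhost", 9092)

    # Factory methods apply version-appropriate defaults to producers and consumers.
    producer = client.simple_producer()
    producer.send_messages('my-topic', "test")  # topic is now passed per call

    consumer = client.simple_consumer("test-group", "my-topic")
    for message in consumer:
        print(message)
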
diff --git a/kafka/__init__.py b/kafka/__init__.py
index 73aa7603c..b81eaaeb2 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -1,5 +1,5 @@
 __title__ = 'kafka'
-__version__ = '0.2-alpha'
+__version__ = '0.9.0'
 __author__ = 'David Arthur'
 __license__ = 'Apache License 2.0'
 __copyright__ = 'Copyright 2012, David Arthur under Apache License, v2.0'
@@ -12,9 +12,28 @@
 from kafka.producer import SimpleProducer, KeyedProducer
 from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
 from kafka.consumer import SimpleConsumer, MultiProcessConsumer
+from kafka.util import set_defaults
+
+class Kafka081Client(KafkaClient):
+    server_version = "0.8.1"
+
+class Kafka080Client(KafkaClient):
+    server_version = "0.8.0"
+
+    def simple_consumer(self, group, topic, **kwargs):
+        assert not kwargs.get('auto_commit')
+        kwargs['auto_commit'] = False
+
+        return super(Kafka080Client, self).simple_consumer(group, topic, **kwargs)
+
+    def multiprocess_consumer(self, group, topic, **kwargs):
+        assert not kwargs.get('auto_commit')
+        kwargs['auto_commit'] = False
+
+        return super(Kafka080Client, self).multiprocess_consumer(group, topic, **kwargs)
 
 __all__ = [
-    'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
+    'KafkaClient', 'Kafka080Client', 'Kafka081Client', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
     'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer',
     'MultiProcessConsumer', 'create_message', 'create_gzip_message',
     'create_snappy_message'
diff --git a/kafka/client.py b/kafka/client.py
index 155f65883..1d9887576 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -15,9 +15,9 @@
 
 log = logging.getLogger("kafka")
 
-
 class KafkaClient(object):
+    server_version = "unknown"
 
     CLIENT_ID = "kafka-python"
     ID_GEN = count()
 
@@ -163,7 +163,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         return (acc[k] for k in original_keys) if acc else ()
 
     def __repr__(self):
-        return '<KafkaClient client_id=%s>' % (self.client_id)
+        return '<KafkaClient server_version=%s client_id=%s>' % (self.server_version, self.client_id)
 
     def _raise_on_response_error(self, resp):
         if resp.error == ErrorMapping.NO_ERROR:
@@ -180,6 +180,23 @@ def _raise_on_response_error(self, resp):
     #################
     #   Public API  #
     #################
+
+    def keyed_producer(self, **kwargs):
+        import kafka
+        return kafka.producer.KeyedProducer(self, **kwargs)
+
+    def simple_producer(self, **kwargs):
+        import kafka
+        return kafka.producer.SimpleProducer(self, **kwargs)
+
+    def simple_consumer(self, group, topic, **kwargs):
+        import kafka
+        return kafka.consumer.SimpleConsumer(self, group, topic, **kwargs)
+
+    def multiprocess_consumer(self, group, topic, **kwargs):
+        import kafka
+        return kafka.consumer.MultiProcessConsumer(self, group, topic, **kwargs)
+
     def reset_topic_metadata(self, *topics):
         for topic in topics:
             try:
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 28b53ec92..c31a23553 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -255,8 +255,8 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
         self.queue = Queue()
 
     def __repr__(self):
-        return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
-            (self.group, self.topic, str(self.offsets.keys()))
+        return '<SimpleConsumer server_version=%s group=%s, topic=%s, partitions=%s>' % \
+            (self.client.server_version, self.group, self.topic, str(self.offsets.keys()))
 
     def provide_partition_info(self):
         """
@@ -578,8 +578,8 @@ def __init__(self, client, group, topic, auto_commit=True,
         self.procs.append(proc)
 
     def __repr__(self):
-        return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \
-            (self.group, self.topic, len(self.procs))
+        return '<MultiProcessConsumer server_version=%s group=%s, topic=%s, consumers=%d>' % \
+            (self.client.server_version, self.group, self.topic, len(self.procs))
 
     def stop(self):
         # Set exit and start off all waiting consumers
diff --git a/kafka/producer.py b/kafka/producer.py
index 12a293401..d7109f9d4 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -205,7 +205,7 @@ def send_messages(self, topic, *msg):
         return super(SimpleProducer, self).send_messages(topic, partition, *msg)
 
     def __repr__(self):
-        return '<SimpleProducer batch=%s>' % self.async
+        return '<SimpleProducer server_version=%s batch=%s>' % (self.client.server_version, self.async)
 
 
 class KeyedProducer(Producer):
@@ -254,4 +254,4 @@ def send(self, topic, key, msg):
         return self.send_messages(topic, partition, msg)
 
     def __repr__(self):
-        return '<KeyedProducer batch=%s>' % self.async
+        return '<KeyedProducer server_version=%s batch=%s>' % (self.client.server_version, self.async)

From 992d5e27731909d25f5e95200a818e49a3dc51a6 Mon Sep 17 00:00:00 2001
From: Mark Roberts
Date: Tue, 25 Feb 2014 10:39:32 -0800
Subject: [PATCH 2/6] Fix compilation error

---
 kafka/__init__.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/kafka/__init__.py b/kafka/__init__.py
index b81eaaeb2..f3e1608b6 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -12,7 +12,6 @@
 from kafka.producer import SimpleProducer, KeyedProducer
 from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
 from kafka.consumer import SimpleConsumer, MultiProcessConsumer
-from kafka.util import set_defaults
 
 class Kafka081Client(KafkaClient):
     server_version = "0.8.1"
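Taken together, the two patches above give 0.8.0 users a client whose consumers never try to commit offsets (the offset commit/fetch API is only available on newer brokers), while 0.8.1 keeps the library defaults. A small sketch of the resulting behavioral difference, assuming a local broker and placeholder topic/group names; auto_commit is the existing attribute on the consumer classes:

    from kafka import Kafka080Client, Kafka081Client

    # Against a 0.8.0 broker the consumer is forced to auto_commit=False ...
    consumer_080 = Kafka080Client("localhost", 9092).simple_consumer("test-group", "my-topic")
    assert consumer_080.auto_commit is False

    # ... and asking for commits explicitly trips the assert in Kafka080Client.
    try:
        Kafka080Client("localhost", 9092).simple_consumer("test-group", "my-topic",
                                                          auto_commit=True)
    except AssertionError:
        print("offset commits are not supported by the 0.8.0 client")

    # The 0.8.1 client keeps the library default (auto_commit=True).
    consumer_081 = Kafka081Client("localhost", 9092).simple_consumer("test-group", "my-topic")
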
From 0d128b2b83ff0c6c52ff5bcdd78cb7c0115df05b Mon Sep 17 00:00:00 2001
From: Mark Roberts
Date: Tue, 25 Feb 2014 10:21:10 -0800
Subject: [PATCH 3/6] Introduce Kafka server version abstraction layer for
 allowing users to select Kafka version with appropriate defaults for core
 consumers and producers

---
 example.py        | 15 +++++++--------
 kafka/__init__.py | 21 ++++++++++++++++++++-
 kafka/client.py   | 21 +++++++++++++++++++--
 kafka/consumer.py |  8 ++++----
 kafka/producer.py |  4 ++--
 5 files changed, 52 insertions(+), 17 deletions(-)

diff --git a/example.py b/example.py
index 3a2dc928b..410bccefb 100644
--- a/example.py
+++ b/example.py
@@ -1,23 +1,22 @@
 import logging
 
-from kafka.client import KafkaClient, FetchRequest, ProduceRequest
-from kafka.consumer import SimpleConsumer
-from kafka.producer import SimpleProducer
+from kafka import Kafka081Client
 
 def produce_example(client):
-    producer = SimpleProducer(client, "my-topic")
-    producer.send_messages("test")
+    producer = client.simple_producer()
+    producer.send_messages('my-topic', "test")
 
 def consume_example(client):
-    consumer = SimpleConsumer(client, "test-group", "my-topic")
+    consumer = client.simple_consumer("test-group", "my-topic")
     for message in consumer:
         print(message)
 
 def main():
-    client = KafkaClient("localhost", 9092)
+    client = Kafka081Client("localhost", 9092)
     produce_example(client)
     consume_example(client)
 
+
 if __name__ == "__main__":
-    logging.basicConfig(level=logging.DEBUG)
+    logging.basicConfig(level=logging.WARN)
     main()
diff --git a/kafka/__init__.py b/kafka/__init__.py
index e446f58f1..b81eaaeb2 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -12,9 +12,28 @@
 from kafka.producer import SimpleProducer, KeyedProducer
 from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
 from kafka.consumer import SimpleConsumer, MultiProcessConsumer
+from kafka.util import set_defaults
+
+class Kafka081Client(KafkaClient):
+    server_version = "0.8.1"
+
+class Kafka080Client(KafkaClient):
+    server_version = "0.8.0"
+
+    def simple_consumer(self, group, topic, **kwargs):
+        assert not kwargs.get('auto_commit')
+        kwargs['auto_commit'] = False
+
+        return super(Kafka080Client, self).simple_consumer(group, topic, **kwargs)
+
+    def multiprocess_consumer(self, group, topic, **kwargs):
+        assert not kwargs.get('auto_commit')
+        kwargs['auto_commit'] = False
+
+        return super(Kafka080Client, self).multiprocess_consumer(group, topic, **kwargs)
 
 __all__ = [
-    'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
+    'KafkaClient', 'Kafka080Client', 'Kafka081Client', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
     'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer',
     'MultiProcessConsumer', 'create_message', 'create_gzip_message',
     'create_snappy_message'
diff --git a/kafka/client.py b/kafka/client.py
index 155f65883..1d9887576 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -15,9 +15,9 @@
 
 log = logging.getLogger("kafka")
 
-
 class KafkaClient(object):
+    server_version = "unknown"
 
     CLIENT_ID = "kafka-python"
     ID_GEN = count()
 
@@ -163,7 +163,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         return (acc[k] for k in original_keys) if acc else ()
 
     def __repr__(self):
-        return '<KafkaClient client_id=%s>' % (self.client_id)
+        return '<KafkaClient server_version=%s client_id=%s>' % (self.server_version, self.client_id)
 
     def _raise_on_response_error(self, resp):
         if resp.error == ErrorMapping.NO_ERROR:
@@ -180,6 +180,23 @@ def _raise_on_response_error(self, resp):
     #################
     #   Public API  #
     #################
+
+    def keyed_producer(self, **kwargs):
+        import kafka
+        return kafka.producer.KeyedProducer(self, **kwargs)
+
+    def simple_producer(self, **kwargs):
+        import kafka
+        return kafka.producer.SimpleProducer(self, **kwargs)
+
+    def simple_consumer(self, group, topic, **kwargs):
+        import kafka
+        return kafka.consumer.SimpleConsumer(self, group, topic, **kwargs)
+
+    def multiprocess_consumer(self, group, topic, **kwargs):
+        import kafka
+        return kafka.consumer.MultiProcessConsumer(self, group, topic, **kwargs)
+
     def reset_topic_metadata(self, *topics):
         for topic in topics:
             try:
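The factory methods added to KafkaClient above are thin wrappers around the existing constructors, and any extra keyword arguments pass straight through. A rough usage sketch; the partitioner and num_procs arguments are ordinary KeyedProducer/MultiProcessConsumer options, shown here only as examples of pass-through kwargs:

    from kafka import Kafka081Client, HashedPartitioner

    client = Kafka081Client("localhost", 9092)

    # Equivalent to KeyedProducer(client, partitioner=HashedPartitioner).
    producer = client.keyed_producer(partitioner=HashedPartitioner)
    producer.send('my-topic', 'key-1', 'value for key-1')

    # Equivalent to MultiProcessConsumer(client, "test-group", "my-topic", num_procs=2).
    consumer = client.multiprocess_consumer("test-group", "my-topic", num_procs=2)
    for message in consumer:
        print(message)
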
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 28b53ec92..c31a23553 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -255,8 +255,8 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
         self.queue = Queue()
 
     def __repr__(self):
-        return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
-            (self.group, self.topic, str(self.offsets.keys()))
+        return '<SimpleConsumer server_version=%s group=%s, topic=%s, partitions=%s>' % \
+            (self.client.server_version, self.group, self.topic, str(self.offsets.keys()))
 
     def provide_partition_info(self):
         """
@@ -578,8 +578,8 @@ def __init__(self, client, group, topic, auto_commit=True,
         self.procs.append(proc)
 
     def __repr__(self):
-        return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \
-            (self.group, self.topic, len(self.procs))
+        return '<MultiProcessConsumer server_version=%s group=%s, topic=%s, consumers=%d>' % \
+            (self.client.server_version, self.group, self.topic, len(self.procs))
 
     def stop(self):
         # Set exit and start off all waiting consumers
diff --git a/kafka/producer.py b/kafka/producer.py
index 12a293401..d7109f9d4 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -205,7 +205,7 @@ def send_messages(self, topic, *msg):
         return super(SimpleProducer, self).send_messages(topic, partition, *msg)
 
     def __repr__(self):
-        return '<SimpleProducer batch=%s>' % self.async
+        return '<SimpleProducer server_version=%s batch=%s>' % (self.client.server_version, self.async)
 
 
 class KeyedProducer(Producer):
@@ -254,4 +254,4 @@ def send(self, topic, key, msg):
         return self.send_messages(topic, partition, msg)
 
     def __repr__(self):
-        return '<KeyedProducer batch=%s>' % self.async
+        return '<KeyedProducer server_version=%s batch=%s>' % (self.client.server_version, self.async)

From 1489a5bdfb704ac049b03b2d1a7549ba33c15163 Mon Sep 17 00:00:00 2001
From: Mark Roberts
Date: Tue, 25 Feb 2014 10:39:32 -0800
Subject: [PATCH 4/6] Fix compilation error

---
 kafka/__init__.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/kafka/__init__.py b/kafka/__init__.py
index b81eaaeb2..f3e1608b6 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -12,7 +12,6 @@
 from kafka.producer import SimpleProducer, KeyedProducer
 from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
 from kafka.consumer import SimpleConsumer, MultiProcessConsumer
-from kafka.util import set_defaults
 
 class Kafka081Client(KafkaClient):
     server_version = "0.8.1"

From 0bb9ae10f9e67b70a607587b1f3d6d610c5e5d8a Mon Sep 17 00:00:00 2001
From: Mark Roberts
Date: Wed, 19 Mar 2014 02:22:05 -0700
Subject: [PATCH 5/6] Make Kafka080Client pull from topic head. Conditionally
 set auto_commit

---
 kafka/__init__.py |  5 ++++-
 kafka/consumer.py | 22 +++++++++++-----------
 2 files changed, 15 insertions(+), 12 deletions(-)

diff --git a/kafka/__init__.py b/kafka/__init__.py
index f3e1608b6..6cf7cb3d7 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -23,7 +23,10 @@ def simple_consumer(self, group, topic, **kwargs):
         assert not kwargs.get('auto_commit')
         kwargs['auto_commit'] = False
 
-        return super(Kafka080Client, self).simple_consumer(group, topic, **kwargs)
+        consumer = super(Kafka080Client, self).simple_consumer(group, topic, **kwargs)
+        consumer.seek(0, 2)
+
+        return consumer
 
     def multiprocess_consumer(self, group, topic, **kwargs):
         assert not kwargs.get('auto_commit')
diff --git a/kafka/consumer.py b/kafka/consumer.py
index c31a23553..ce99b93c9 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -10,6 +10,7 @@
 
 from kafka.common import (
     ErrorMapping, FetchRequest,
     OffsetRequest, OffsetCommitRequest,
+    OffsetFetchRequest,
     ConsumerFetchSizeTooSmall, ConsumerNoMoreData
 )
 
@@ -105,17 +106,16 @@ def get_or_init_offset_callback(resp):
                 "partition=%d failed with errorcode=%s" % (
                     resp.topic, resp.partition, resp.error))
 
-        # Uncomment for 0.8.1
-        #
-        #for partition in partitions:
-        #    req = OffsetFetchRequest(topic, partition)
-        #    (offset,) = self.client.send_offset_fetch_request(group, [req],
-        #                 callback=get_or_init_offset_callback,
-        #                 fail_on_error=False)
-        #    self.offsets[partition] = offset
-
-        for partition in partitions:
-            self.offsets[partition] = 0
+        if auto_commit:
+            for partition in partitions:
+                req = OffsetFetchRequest(topic, partition)
+                (offset,) = self.client.send_offset_fetch_request(group, [req],
+                    callback=get_or_init_offset_callback,
+                    fail_on_error=False)
+                self.offsets[partition] = offset
+        else:
+            for partition in partitions:
+                self.offsets[partition] = 0
 
     def commit(self, partitions=None):
         """
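The consumer change above makes offset bootstrapping conditional: committed offsets are fetched only when auto_commit is on, otherwise every partition starts at 0 (and Kafka080Client additionally calls consumer.seek(0, 2) to start from the latest offsets). A condensed sketch of that decision, with the callback plumbing simplified and names following the diff:

    from kafka.common import OffsetFetchRequest

    def init_offsets(consumer, group, topic, partitions, auto_commit):
        # Simplified mirror of the bootstrap logic introduced above.
        if auto_commit:
            # Broker supports offset commit/fetch: resume from the group's
            # last committed offset for each partition.
            for partition in partitions:
                req = OffsetFetchRequest(topic, partition)
                (resp,) = consumer.client.send_offset_fetch_request(
                    group, [req], fail_on_error=False)
                consumer.offsets[partition] = resp.offset if resp.error == 0 else 0
        else:
            # No offset storage (e.g. a 0.8.0 broker): start every partition
            # at offset 0; Kafka080Client then seeks past existing messages.
            for partition in partitions:
                consumer.offsets[partition] = 0
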
From 334f385a70b4d8e3a90d3544af5d5f73aac78c63 Mon Sep 17 00:00:00 2001
From: Mark Roberts
Date: Fri, 4 Apr 2014 17:43:46 -0700
Subject: [PATCH 6/6] Move Kafka clients into clients file, add Kafka 0.8.2
 client

---
 kafka/__init__.py | 26 +++-----------------------
 kafka/client.py   | 28 ++++++++++++++++++++++++++++
 2 files changed, 31 insertions(+), 23 deletions(-)

diff --git a/kafka/__init__.py b/kafka/__init__.py
index 6cf7cb3d7..d6feb674e 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -4,7 +4,7 @@
 __license__ = 'Apache License 2.0'
 __copyright__ = 'Copyright 2012, David Arthur under Apache License, v2.0'
 
-from kafka.client import KafkaClient
+from kafka.client import KafkaClient, Kafka080Client, Kafka081Client, Kafka082Client
 from kafka.conn import KafkaConnection
 from kafka.protocol import (
     create_message, create_gzip_message, create_snappy_message
@@ -13,29 +13,9 @@ from kafka.producer import SimpleProducer, KeyedProducer
 from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
 from kafka.consumer import SimpleConsumer, MultiProcessConsumer
 
-class Kafka081Client(KafkaClient):
-    server_version = "0.8.1"
-
-class Kafka080Client(KafkaClient):
-    server_version = "0.8.0"
-
-    def simple_consumer(self, group, topic, **kwargs):
-        assert not kwargs.get('auto_commit')
-        kwargs['auto_commit'] = False
-
-        consumer = super(Kafka080Client, self).simple_consumer(group, topic, **kwargs)
-        consumer.seek(0, 2)
-
-        return consumer
-
-    def multiprocess_consumer(self, group, topic, **kwargs):
-        assert not kwargs.get('auto_commit')
-        kwargs['auto_commit'] = False
-
-        return super(Kafka080Client, self).multiprocess_consumer(group, topic, **kwargs)
-
 __all__ = [
-    'KafkaClient', 'Kafka080Client', 'Kafka081Client', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
+    'KafkaClient', 'Kafka080Client', 'Kafka081Client', 'Kafka082Client',
+    'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
     'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer',
     'MultiProcessConsumer', 'create_message', 'create_gzip_message',
     'create_snappy_message'
diff --git a/kafka/client.py b/kafka/client.py
index d808d5bf7..bf87d8140 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -420,3 +420,31 @@ def send_offset_fetch_request(self, group, payloads=[],
             else:
                 out.append(resp)
         return out
+
+
+class Kafka082Client(KafkaClient):
+    server_version = "0.8.2"
+
+
+class Kafka081Client(KafkaClient):
+    server_version = "0.8.1"
+
+
+class Kafka080Client(KafkaClient):
+    server_version = "0.8.0"
+
+    def simple_consumer(self, group, topic, **kwargs):
+        assert not kwargs.get('auto_commit')
+        kwargs['auto_commit'] = False
+
+        consumer = super(Kafka080Client, self).simple_consumer(group, topic, **kwargs)
+        consumer.seek(0, 2)
+
+        return consumer
+
+    def multiprocess_consumer(self, group, topic, **kwargs):
+        assert not kwargs.get('auto_commit')
+        kwargs['auto_commit'] = False
+
+        return super(Kafka080Client, self).multiprocess_consumer(group, topic, **kwargs)
+
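With the clients consolidated in kafka/client.py and all three exported from the package, selecting one per deployment could look like the following. The version strings match the server_version attributes added above; the lookup table and helper are illustrative, not part of the patch:

    from kafka import Kafka080Client, Kafka081Client, Kafka082Client

    # Illustrative mapping from broker version to client class.
    CLIENTS_BY_BROKER_VERSION = {
        "0.8.0": Kafka080Client,
        "0.8.1": Kafka081Client,
        "0.8.2": Kafka082Client,
    }

    def client_for(broker_version, host="localhost", port=9092):
        # Pick the client whose defaults match the broker being targeted.
        try:
            client_cls = CLIENTS_BY_BROKER_VERSION[broker_version]
        except KeyError:
            raise ValueError("unsupported broker version: %r" % broker_version)
        return client_cls(host, port)

    client = client_for("0.8.2")
    print(repr(client))  # repr now includes server_version, per patch 1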