1 change: 1 addition & 0 deletions docs/requirements.txt
@@ -1,5 +1,6 @@
sphinx
sphinxcontrib-napoleon
sphinx_rtd_theme

# Install kafka-python in editable mode
# This allows the sphinx autodoc module
103 changes: 89 additions & 14 deletions docs/usage.rst
@@ -1,12 +1,12 @@
Usage
=====

High level
----------
SimpleProducer
--------------

.. code:: python

from kafka import SimpleProducer, KafkaClient, KafkaConsumer
from kafka import SimpleProducer, KafkaClient

# To send messages synchronously
kafka = KafkaClient("localhost:9092")
@@ -51,17 +51,6 @@ High level
batch_send_every_n=20,
batch_send_every_t=60)

# To consume messages
consumer = KafkaConsumer("my-topic", group_id="my_group",
metadata_broker_list=["localhost:9092"])
for message in consumer:
# message is raw byte string -- decode if necessary!
# e.g., for unicode: `message.decode('utf-8')`
print(message)

kafka.close()


Keyed messages
--------------

@@ -80,6 +69,92 @@ Keyed messages
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)



KafkaConsumer
-------------

.. code:: python

from kafka import KafkaConsumer

# To consume messages
consumer = KafkaConsumer("my-topic",
                         group_id="my_group",
                         bootstrap_servers=["localhost:9092"])
for message in consumer:
    # message value is raw byte string -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))



Messages (`m`) are namedtuples with the following attributes (see the short sketch below the list):

* `m.topic`: topic name (str)
* `m.partition`: partition number (int)
* `m.offset`: message offset on topic-partition log (int)
* `m.key`: key (bytes - can be None)
* `m.value`: message (output of deserializer_class - default is raw bytes)
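
For example, a minimal sketch (assuming the `consumer` object created above and
UTF-8 encoded keys and values -- an assumption about your data) of using these
attributes:

.. code:: python

# Sketch only: decode the raw key/value bytes before use
for m in consumer:
    key = m.key.decode('utf-8') if m.key is not None else None
    value = m.value.decode('utf-8')
    print("%s:%d:%d key=%s value=%s" % (m.topic, m.partition, m.offset, key, value))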


.. code:: python

from kafka import KafkaConsumer

# more advanced consumer -- multiple topics w/ auto commit offset
# management
consumer = KafkaConsumer('topic1', 'topic2',
                         bootstrap_servers=['localhost:9092'],
                         group_id='my_consumer_group',
                         auto_commit_enable=True,
                         auto_commit_interval_ms=30 * 1000,
                         auto_offset_reset='smallest')

# Infinite iteration
for m in consumer:
    do_some_work(m)

    # Mark this message as fully consumed
    # so it can be included in the next commit
    #
    # NOTE: messages that are not marked with task_done() currently do not commit!
    consumer.task_done(m)

# If auto_commit_enable is False, remember to commit() periodically
consumer.commit()

# Batch process interface
while True:
    for m in consumer.fetch_messages():
        process_message(m)
        consumer.task_done(m)


Configuration settings can be passed to the constructor; otherwise defaults will be used:

.. code:: python

client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
bootstrap_servers=[],
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
consumer_timeout_ms=-1

Configuration parameters are described in more detail at
http://kafka.apache.org/documentation.html#highlevelconsumerapi
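
For example, a sketch (assuming message values were produced as UTF-8 encoded
JSON, which is an assumption about your data) of overriding the default
`deserializer_class`:

.. code:: python

import json

from kafka import KafkaConsumer

# Deserialize each raw message value from JSON as it is consumed
consumer = KafkaConsumer('my-topic',
                         bootstrap_servers=['localhost:9092'],
                         deserializer_class=lambda msg: json.loads(msg.decode('utf-8')))

for m in consumer:
    # m.value is now the object returned by the deserializer, not raw bytes
    print(m.value)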

Multiprocess consumer
---------------------
