
KafkaConsumer Fails to Report Problem with Compression #1033

@nickwallen

Description


When no module is installed to handle Snappy decompression, KafkaConsumer silently returns no messages instead of reporting the problem. This differs from the legacy consumer API, which raises a much more useful error.

Background

I was trying to fetch data from a Kafka topic that uses Snappy compression. No data was ever returned, even though I knew data was landing in the topic (confirmed with the Kafka CLI tools). This had me very confused.

>>> consumer = kafka.KafkaConsumer("test", bootstrap_servers=["svr:9092"])
>>> consumer.poll(5000)
{}

I then tried the legacy consumer API, which pointed me to the exact problem.

>>> client = kafka.SimpleClient("svr:9092")
>>> consumer.close()
>>> consumer = kafka.SimpleConsumer(client, "group", "test")
>>> for message in consumer:
...     print(message)
...
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 353, in __iter__
    message = self.get_message(True, timeout)
  File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 305, in get_message
    return self._get_message(block, timeout, get_partition_info)
  File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 320, in _get_message
    self._fetch()
  File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 379, in _fetch
    fail_on_error=False
  File "/usr/lib/python2.7/site-packages/kafka/client.py", line 665, in send_fetch_request
    KafkaProtocol.decode_fetch_response)
  File "/usr/lib/python2.7/site-packages/kafka/client.py", line 295, in _send_broker_aware_request
    for payload_response in decoder_fn(future.value):
  File "/usr/lib/python2.7/site-packages/kafka/protocol/legacy.py", line 212, in decode_fetch_response
    for partition, error, highwater_offset, messages in partitions
  File "/usr/lib/python2.7/site-packages/kafka/protocol/legacy.py", line 219, in decode_message_set
    inner_messages = message.decompress()
  File "/usr/lib/python2.7/site-packages/kafka/protocol/message.py", line 121, in decompress
    assert has_snappy(), 'Snappy decompression unsupported'
AssertionError: Snappy decompression unsupported
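The legacy path reports the problem through a bare `assert has_snappy()`. One caveat: `assert` statements are removed when Python runs with `-O`, so that check would vanish in optimized mode. Below is a minimal sketch of an explicit check that survives `-O`. `UnsupportedCodecError`, `check_codec`, and `module_available` are hypothetical names for illustration, not kafka-python 1.3.2's API. The only fact relied on is that python-snappy installs an importable module named `snappy`.

```python
import importlib


class UnsupportedCodecError(Exception):
    """Hypothetical error type; not part of kafka-python 1.3.2."""


# Codec name -> (module to import, pip package to suggest).
# Assumption: only Snappy is covered in this sketch.
_CODEC_MODULES = {
    "snappy": ("snappy", "python-snappy"),
}


def module_available(name):
    """Return True if the module `name` can be imported."""
    try:
        importlib.import_module(name)
    except ImportError:
        return False
    return True


def check_codec(codec, available=module_available):
    """Raise UnsupportedCodecError if the module for `codec` is missing.

    Unlike `assert`, this check still runs under `python -O`.
    `available` is injectable so the check can be exercised in tests.
    """
    module, package = _CODEC_MODULES[codec]
    if not available(module):
        raise UnsupportedCodecError(
            "%s decompression unsupported; pip install %s"
            % (codec.capitalize(), package)
        )
```

Calling `check_codec("snappy")` before decoding a Snappy-compressed batch would produce an error that names both the missing codec and the package to install.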

All I needed to do was install the python-snappy module to handle the decompression.

pip install python-snappy

It would be useful if the KafkaConsumer reported the same problem as the legacy API.
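One way a consumer could report the problem is sketched below. When decoding a fetched batch fails, the consumer keeps the exception and re-raises it from the next `poll()` instead of dropping the batch and returning an empty dict. `DeferredErrorBuffer` and its methods are hypothetical. This is not how kafka-python's fetcher is actually structured; it only illustrates the behavior this issue asks for.

```python
class DeferredErrorBuffer(object):
    """Buffers decoded records per partition and re-raises decode failures.

    Hypothetical illustration only; not part of kafka-python.
    """

    def __init__(self):
        self._records = {}
        self._error = None

    def add_batch(self, partition, raw_batch, decode):
        """Decode `raw_batch` with `decode`, remembering the first failure."""
        try:
            decoded = decode(raw_batch)
        except Exception as exc:  # e.g. a missing-codec error
            if self._error is None:
                self._error = exc
            return
        self._records.setdefault(partition, []).extend(decoded)

    def poll(self):
        """Return buffered records, or raise the stored decode error first."""
        if self._error is not None:
            error, self._error = self._error, None
            raise error
        records, self._records = self._records, {}
        return records
```

With a decoder that fails the way the legacy API does, the first `poll()` raises instead of returning `{}`, so the caller sees the cause immediately. Any records decoded successfully before the failure stay buffered and come back from the following `poll()`.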

Version/Environment

# pip show kafka-python
---
Metadata-Version: 2.0
Name: kafka-python
Version: 1.3.2
Summary: Pure Python client for Apache Kafka
Home-page: https://github.com/dpkp/kafka-python
Author: Dana Powers
Author-email: [email protected]
Installer: pip
License: Apache License 2.0
Location: /usr/lib/python2.7/site-packages
Requires:
Classifiers:
  Development Status :: 5 - Production/Stable
  Intended Audience :: Developers
  License :: OSI Approved :: Apache Software License
  Programming Language :: Python
  Programming Language :: Python :: 2
  Programming Language :: Python :: 2.6
  Programming Language :: Python :: 2.7
  Programming Language :: Python :: 3
  Programming Language :: Python :: 3.3
  Programming Language :: Python :: 3.4
  Programming Language :: Python :: 3.5
  Programming Language :: Python :: Implementation :: PyPy
  Topic :: Software Development :: Libraries :: Python Modules
You are using pip version 8.1.2, however version 9.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
# uname -a
Linux y138... 3.10.0-514.10.2.el7.x86_64 #1 SMP Fri Mar 3 00:04:05 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
