Description
The current Python high-level consumer & producer are not context managers. They do not implement __enter__() and __exit__() methods, so they do not support using Python's with statements. The producer doesn't even have a close() method!
Adding context manager support would allow cleaner, more Pythonic use of producers and consumers. As an example, here's a modified version of the High Level Consumer example from the README.md that uses contextlib's closing:
from contextlib import closing

from confluent_kafka import Consumer

# closing() calls c.close() when the with block exits, even on an exception
with closing(Consumer({'bootstrap.servers': 'mybroker',
                       'group.id': 'mygroup',
                       'auto.offset.reset': 'earliest'
                       })) as c:
    c.subscribe(['mytopic'])
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        print("Received message: {}".format(msg.value().decode('utf-8')))
The producer is a bit more work because it doesn't implement a close() method (which could very well be implemented to just call flush()), but you can get there using the contextmanager decorator:
import confluent_kafka
from contextlib import contextmanager


@contextmanager
def Producer(*args, **kwargs):
    # Wrap the real producer so that flush() always runs on exit
    producer = confluent_kafka.Producer(*args, **kwargs)
    try:
        yield producer
    finally:
        producer.flush()


# delivery_report is left unaltered

with Producer({'bootstrap.servers': 'mybroker1,mybroker2'}) as p:
    for data in some_data_source:
        p.poll(0)
        p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)
Rather than using contextlib at all, it probably makes more sense to just implement __enter__() and __exit__() methods in the base Consumer & Producer classes (and while you're at it, add a close() method to Producer which simply calls flush()).
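For illustration, here is a rough sketch of the behavior I have in mind. It does not import confluent_kafka: FakeProducer is a hypothetical stand-in so the snippet runs without a broker, and ContextManagerMixin is a made-up name too. The real classes live in the C extension, so an actual patch would look different; this only shows the intended semantics of `__enter__()`, `__exit__()` and a close() that calls flush().

```python
class ContextManagerMixin:
    """Hypothetical mixin: gives any class with a close() method `with` support."""

    def __enter__(self):
        # The object itself is bound to the name after `as`.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Always clean up, then return False so exceptions raised inside
        # the with block still propagate to the caller.
        self.close()
        return False


class FakeProducer(ContextManagerMixin):
    """Stand-in for confluent_kafka.Producer (assumption: no broker needed)."""

    def __init__(self, config):
        self.config = config
        self.queued = []     # messages handed to produce() but not yet sent
        self.delivered = []  # messages "sent" by flush()

    def produce(self, topic, value):
        self.queued.append((topic, value))

    def flush(self):
        # Pretend every queued message is delivered.
        self.delivered.extend(self.queued)
        self.queued.clear()

    def close(self):
        # The proposed Producer.close(): simply call flush().
        self.flush()


with FakeProducer({'bootstrap.servers': 'mybroker'}) as p:
    p.produce('mytopic', b'hello')

# After the block exits, close() has run and nothing is left queued.
print(p.queued, p.delivered)
```

Returning False from `__exit__()` matters here: cleanup still happens, but an error raised while producing or consuming is not silently swallowed.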