-
Notifications
You must be signed in to change notification settings - Fork 920
Description
Proposed Enhancement:
For both the Consumer and Producer, it seems necessary to expose the error_cb in the Python API so that Consumer and Producer code can deal with errors, especially w.r.t. connections with Kafka.
For example, consider the Consumer pseudo code:
while True:
msg = consumer.poll(1.0)
if not msg:
logger.info('No messages found in polling window')
continue
if msg.error():
logger.exception('Received error', error=msg.error())
raise confluent_kafka.KafkaException(msg.error())
This code will run forever, even if the consumer loses connection to the brokers, which isn't always what you want. In my case, I'd like to fail if the error has persisted for the past 5 minutes.
It seems like it wouldn't be too much work to do something similar to what is done for the on_revoke
and on_assign
callbacks, but not sure about the intricacies of the librdkafka's error_cb.
This is blocking us from adopting confluent-kafka-python more broadly (or at all).
Thanks for your time.