Description
A simple consumer->consume(0) call results in valgrind reporting a memory leak.
The same leak is reported with either of the following:
- RdKafka::Message *message = consumer->consume(0);
- ConsumerCb::consume_cb() method
valgrind trace
==21852== 73 (40 direct, 33 indirect) bytes in 1 blocks are definitely lost in loss record 656 of 743
==21852== at 0x4C2A203: operator new(unsigned long) (vg_replace_malloc.c:334)
==21852== by 0x5750EFC: RdKafka::TopicPartition::create(std::string const&, int, long) (TopicPartitionImpl.cpp:46)
==21852== by 0x40CE42: sondmi::DistributedMessaging::rebalance_cb(RdKafka::KafkaConsumer*, RdKafka::ErrorCode, std::vector<RdKafka::TopicPartition*, std::allocator<RdKafka::TopicPartition*> >&) (sondmi.cpp:40)
==21852== by 0x574D488: RdKafka::rebalance_cb_trampoline(rd_kafka_s*, rd_kafka_resp_err_t, rd_kafka_topic_partition_list_s*, void*) (HandleImpl.cpp:176)
==21852== by 0x597136F: rd_kafka_poll_cb (rdkafka.c:2530)
==21852== by 0x5971945: rd_kafka_consume0 (rdkafka.c:1956)
==21852== by 0x574F8C9: RdKafka::KafkaConsumerImpl::consume(int) (KafkaConsumerImpl.cpp:112)
==== The frames above are reached from: std::unique_ptr<RdKafka::Message> message(consumer->consume(0));
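Reading the trace above, the block reported as lost is allocated by RdKafka::TopicPartition::create() called from rebalance_cb (sondmi.cpp:40), so the object that needs an owner may be the TopicPartition rather than the Message returned by consume(). The code at sondmi.cpp:40 is not shown here, so the following is only a sketch of the ownership pattern, assuming the created object is never deleted afterwards. `FakeTopicPartition` and `rebalance_owned` are hypothetical stand-ins so the example compiles without librdkafka; the only behavior borrowed from the real API is that create() returns a raw pointer that the caller has to delete.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>

// Hypothetical stand-in for RdKafka::TopicPartition (not the real class).
class FakeTopicPartition {
 public:
  // Like the real factory, returns a raw pointer owned by the caller.
  static FakeTopicPartition *create(const std::string &topic, int partition,
                                    std::int64_t offset) {
    return new FakeTopicPartition(topic, partition, offset);
  }
  ~FakeTopicPartition() { --live_count; }

  // Copies are disabled so live_count stays accurate.
  FakeTopicPartition(const FakeTopicPartition &) = delete;
  FakeTopicPartition &operator=(const FakeTopicPartition &) = delete;

  const std::string &topic() const { return topic_; }
  int partition() const { return partition_; }
  std::int64_t offset() const { return offset_; }

  // Number of objects currently alive; used only to observe leaks here.
  static int live_count;

 private:
  FakeTopicPartition(const std::string &topic, int partition,
                     std::int64_t offset)
      : topic_(topic), partition_(partition), offset_(offset) {
    ++live_count;
  }
  std::string topic_;
  int partition_;
  std::int64_t offset_;
};

int FakeTopicPartition::live_count = 0;

// Hypothetical callback body: wrap the created object in std::unique_ptr so
// it is deleted on every exit path, including early returns.
int rebalance_owned(const std::string &topic) {
  std::unique_ptr<FakeTopicPartition> tp(
      FakeTopicPartition::create(topic, 0, 0));
  assert(tp != nullptr);
  // In real code, tp.get() would be passed to calls that take a raw pointer.
  return tp->partition();
}  // tp goes out of scope here and the object is deleted
```

With the real library the same wrapper should apply to the pointer returned by RdKafka::TopicPartition::create(). If the pointers are collected in a std::vector instead, RdKafka::TopicPartition::destroy() is, as far as I know, the helper intended to delete them in one call; whether it is available in 0.11.0 should be checked against rdkafkacpp.h.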
How to reproduce
- Set up one producer and one KafkaConsumer with any topic name.
- Run the examples under valgrind.
- Produce one message.
- Consume the message using either of the two options above.
Checklist
Please provide the following information:
- [0.11.0 ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
- [ 2.11-1.0.0] Apache Kafka broker version:
- Client configuration:
    conf->set("metadata.broker.list", brokers, errstr);
    conf->set("bootstrap.servers", brokers, errstr);
    conf->set("default_topic_conf", tconf.get(), errstr);
    conf->set("log_level", "3", errstr);
    conf->set("group.id", std::to_string(group_id), errstr);
    conf->set("client.id", m_kafka_client_id, errstr);
    conf->set("auto.offset.reset", "earliest", errstr);
    conf->set("rebalance_cb", rebalance_cb, errstr);
    conf->set("event_cb", event_cb, errstr);
    if (socket_cb) {
        conf->set("socket_cb", socket_cb, errstr);
    }
- [Linux localhost.localdomain 3.10.0-693.5.2.el7.x86_64 #1 SMP Fri Oct 20 20:32:50 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux ] Operating system:
- Provide client logs (with 'debug': '..' as necessary)
- Provide broker log excerpts
- Critical issue