Consumer poll() does not always return #18

Closed
mikesparr opened this issue Jul 12, 2016 · 7 comments

@mikesparr

I'm running into an odd scenario with the Consumer class implementation where poll() sometimes returns null or empty. I would expect the return value to be either an object with a message or an object with an error (sometimes just an EOF for the partition), but never empty or null.

Here is my code. Out of 25,000 messages in the topic, there are just under 2,000 warnings in the log that poll() returned no msg object. Under what conditions would this occur, or did I implement it incorrectly below?

    while running:
        try:
            msg = c.poll()
            if msg:
                if not msg.error():
                    processor(msg)
                elif msg.error().code() == kafka.KafkaError._PARTITION_EOF:
                    # End of partition event
                    logger.debug("{} [{}] reached end at offset {}".format(
                        msg.topic(), msg.partition(), msg.offset()))
                    logger.debug("Consumed [{}] records so far (reached end of offset)".format(self.consumed))
                else:
                    logger.debug("Unknown error [{}] during polling.".format(msg.error()))
                    raise kafka.KafkaException(msg.error())

                self.consumed += 1
                if (self.consumed % 100) == 0:
                    logger.debug("Consumed [{}] records since startup.".format(self.consumed))
            else:
                logger.warning("No message object returned on poll() [{}]".format(msg))

        except Exception as e:
            logger.warning("Error polling messages [{}]".format(str(e)))
            logger.debug(traceback.format_exc())
            # we do not stop the loop because some polling errors are anticipated
            # https://github.com/confluentinc/confluent-kafka-python/issues/12
    # end while running loop

In logs:

2016-07-12 15:09:25,331 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,332 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,342 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,343 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,343 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,343 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,343 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,343 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,343 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,343 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,343 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,344 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,344 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,344 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,344 - DEBUG - stream_consumer.pyc - Consumed [24800] records since startup.
2016-07-12 15:09:25,344 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,345 - WARNING - stream_consumer.pyc - No message object returned on poll()
@edenhill
Contributor

This seems to be a problem with the underlying librdkafka library.
For now I suggest simply ignoring this case and trying poll() again.
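
For reference, a minimal sketch of that workaround: poll with a timeout, treat a None return as "nothing arrived this round", and simply poll again. This assumes the confluent_kafka package; the broker address, group id, topic name, and the process() handler below are placeholders, not code from this issue.

    from confluent_kafka import Consumer, KafkaError, KafkaException

    def process(msg):
        # placeholder handler
        print(msg.topic(), msg.partition(), msg.offset())

    c = Consumer({
        'bootstrap.servers': 'localhost:9092',  # placeholder broker
        'group.id': 'etl-diffs',                # placeholder group
        'auto.offset.reset': 'earliest',
    })
    c.subscribe(['rets.wvmls.Property'])        # placeholder topic

    try:
        while True:
            msg = c.poll(1.0)          # wait up to 1 second for a message
            if msg is None:
                # nothing returned within the timeout; just poll again
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue           # end of partition, not a real error
                raise KafkaException(msg.error())
            process(msg)
    finally:
        c.close()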

@mikesparr
Author

Is it possible that, because of this issue, the consumer is skipping messages?

I'm trying to match the message count from one topic to the next, where all that should happen is a simple transform and re-publish to the next topic in the chain. The counts don't add up. I scoured the logs and compared the number of occurrences of a log statement to the topic offsets; they don't match, and I see a lot of those "no message object" occurrences in the log.

The lag values are odd (negative) even though I deleted the topics and restarted the pull from new topics. The total logSize is over 1,600 messages for this topic, however.

ubuntu@db1:~$ kafka-consumer-offset-checker --topic rets.wvmls.Property --group etl-diffs --zookeeper queue2:2181

Group           Topic                          Pid Offset          logSize         Lag             Owner
etl-diffs       rets.wvmls.Property            0   265             179             -86             none
etl-diffs       rets.wvmls.Property            1   261             173             -88             none
etl-diffs       rets.wvmls.Property            2   227             173             -54             none
etl-diffs       rets.wvmls.Property            3   266             200             -66             none
etl-diffs       rets.wvmls.Property            4   229             170             -59             none
etl-diffs       rets.wvmls.Property            5   289             193             -96             none
etl-diffs       rets.wvmls.Property            6   233             174             -59             none
etl-diffs       rets.wvmls.Property            7   255             197             -58             none
etl-diffs       rets.wvmls.Property            8   313             195             -118            none

When I check the log file and count occurrences of the string in my message handler, it is about 600 records short (evident in my subsequent topic), and I see over 900 occurrences of the "No message object" error noted above.

ubuntu@app2:~$ sudo cat /var/log/supervisor/etlservice-diffs-wvmls.log* | grep "Topic: rets.wvmls.Property" | wc -l
1091
ubuntu@app2:~$ sudo cat /var/log/supervisor/etlservice-diffs-wvmls.log* | grep "Consumer object missing" | wc -l
0
ubuntu@app2:~$ sudo cat /var/log/supervisor/etlservice-diffs-wvmls.log* | grep "Missing topic key" | wc -l
0
ubuntu@app2:~$ sudo cat /var/log/supervisor/etlservice-diffs-wvmls.log* | grep "Error" | wc -l
0
ubuntu@app2:~$ sudo cat /var/log/supervisor/etlservice-diffs-wvmls.log* | grep "No message object" | wc -l
925

Could this issue be causing my consumer to miss messages and also make the consumer-offset-checker counts go negative?

@mikesparr
Author

mikesparr commented Jul 18, 2016

On a whim, I wiped the topics, changed them to a single partition before creating them again, and then re-ran my program. Now the counts match exactly as expected! It appears there is also an issue with message routing when there are many partitions and a single consumer?

ubuntu@app2:~$ sudo cat /var/log/supervisor/etlservice-diffs-wvmls.log* | grep "Topic: rets.wvmls.Property" | wc -l
1695
ubuntu@app2:~$ sudo cat /var/log/supervisor/etlservice-diffs-wvmls.log* | grep "No message object" | wc -l
58

The "No message object" issue still occurs, but now my consumer consumed every message and didn't lose any.

And now, after changing to a single partition, I have 0 lag, no more negative lag counts, and offset and logSize match. There must also be a bug in partition routing/distribution.

ubuntu@db1:~$ kafka-consumer-offset-checker --topic rets.wvmls.Property --group etl-diffs --zookeeper queue2:2181

Group           Topic                          Pid Offset          logSize         Lag             Owner
etl-diffs       rets.wvmls.Property            0   1708            1708            0               none
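
One thing that might help narrow down the multi-partition case is logging which partitions the single consumer is actually assigned. A rough sketch using confluent_kafka's on_assign/on_revoke callbacks; the config values and topic name are placeholders, not taken from this issue.

    from confluent_kafka import Consumer

    def log_assignment(consumer, partitions):
        # log which partitions this consumer instance received
        print('Assigned:', [(p.topic, p.partition) for p in partitions])

    def log_revocation(consumer, partitions):
        print('Revoked:', [(p.topic, p.partition) for p in partitions])

    c = Consumer({
        'bootstrap.servers': 'localhost:9092',  # placeholder broker
        'group.id': 'etl-diffs',                # placeholder group
    })
    c.subscribe(['rets.wvmls.Property'],        # placeholder topic
                on_assign=log_assignment,
                on_revoke=log_revocation)

    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        # ... handle msg as in the consumer loop above ...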

@edenhill
Contributor

edenhill commented Aug 4, 2016

Are you using log compaction on this topic?

Can you check the offset consistency of the consumed messages to see if messages are dropped or delivered out of order?
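
A simple way to run that check is to remember the last offset seen per (topic, partition) and flag any gap or backwards jump; a rough sketch (not from this issue) that could be called from the consumer loop. Note that gaps alone would not prove loss if the topic turns out to be compacted, since compaction removes older records with the same key.

    # Track the last offset seen per (topic, partition) and flag anything
    # that is out of order or non-contiguous.
    last_offsets = {}

    def check_offset(msg, logger):
        key = (msg.topic(), msg.partition())
        offset = msg.offset()
        prev = last_offsets.get(key)
        if prev is not None:
            if offset <= prev:
                logger.warning("Out of order: %s [%d] offset %d after %d",
                               key[0], key[1], offset, prev)
            elif offset != prev + 1:
                logger.warning("Offset gap: %s [%d] jumped from %d to %d",
                               key[0], key[1], prev, offset)
        last_offsets[key] = offset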

@mikesparr
Author

Yes, I have compaction enabled. I created the topic with the following while testing so I can monitor compaction of same-key records:
kafka-topics --create --config cleanup.policy=compact --config segment.ms=300000 --config min.cleanable.dirty.ratio=0.01 --replication-factor 1 --partitions 1 --zookeeper {} --topic {}

@mikesparr
Author

@edenhill I haven't checked offset consistency yet, but I could output the offsets to the log to figure out whether they are non-sequential. I'm wrapping up something else at the moment. I did have to switch to a single partition to fix the data "loss" issue reported above, tied to that librdkafka bug you confirmed where poll() sometimes returns no msg object. I've played around with the consumer config settings and queues. I have no errors in the logs, and with a single partition I get a 1:1 record count as expected; when I add partitions I get loss (with a single consumer across many partitions).

@edenhill
Contributor

Are you still seeing this?
I have not been able to reproduce it or seen any other similar issues.
