
BufferError [Local] Queue full for producer even after changing librdkafka config #16


Closed
mikesparr opened this issue Jul 7, 2016 · 12 comments


mikesparr commented Jul 7, 2016

Ubuntu 14.04.4 LTS / Python 2.7.x / Kafka 0.10 (Confluent Platform 3) / Python client (latest)

  • app servers: 4 core, 32GB RAM, SATA (running python scripts)
  • db servers: 8 core, 64GB RAM, SSD (5-node Kafka/Cassandra cluster + 3-node ES cluster)
  • 10Gb/s private NIC and bare metal stack

I'm seeing a large number of BufferError [Local] Queue full errors in the logs for the Producer client. I searched for the error yesterday and found a 2014 librdkafka issue that was resolved by changing a few configuration parameters. I posted in that issue and changed my config; the initial errors went away, but as the program ran overnight a flood of errors filled the logs. Out of 500,000 messages consumed from the source topics, more than 100,000 are missing from the subsequent topic.

I have a Python stream processor that instantiates both Consumer and Producer classes and consumes from 6 topics, performing a diff/upsert against the matching record in the Cassandra cluster (if it exists) and then publishing the diff'ed object to another topic (...ListingEditEvent). When it tries to publish to that subsequent topic, messages are getting lost. A transformer program picks up from the ListingEditEvent topic, converts to our schema, and publishes to a ListingEditTransformed topic, which Logstash consumes into Elasticsearch. I'm seeing differences between the records in ES and the Kafka topics and am trying to resolve them. I'd appreciate any tips on how to solve this, or better configuration values.

I edited the config for the Producer client to the following:

            conf = {
                'bootstrap.servers': ','.join(map(str, self.config.get('hosts'))),
                'queue.buffering.max.messages': 500000, # is this too small?
                'queue.buffering.max.ms': 60000, # is this too long?
                'batch.num.messages': 100, # is this too small?
                'log.connection.close': False,
                'client.id': socket.gethostname(),
                'default.topic.config': {'acks': 'all'}
            }

I'm thinking of reducing the max buffering time and increasing the max messages: perhaps 5000 ms, a batch size of 250, and a 1 million message maximum?

The errors are not constant, so the producer must be exceeding the buffer while processing, recovering, and then exceeding it again:

2016-07-07 09:58:42,952 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160002361]
2016-07-07 10:02:55,094 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009744]
2016-07-07 10:02:55,106 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160009744]
2016-07-07 10:02:55,189 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160010014]
2016-07-07 10:02:55,199 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160010014]
2016-07-07 10:02:57,466 - DEBUG - diff_processor.py - Error with lat [None], lon [None] for listing [160009744]
2016-07-07 10:02:57,475 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009744]
2016-07-07 10:08:03,292 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:121]
2016-07-07 10:08:03,311 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:9]
2016-07-07 10:08:04,807 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:1549]
2016-07-07 10:08:04,822 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:8199]
2016-07-07 10:08:08,017 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009089]
2016-07-07 10:08:09,728 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:140009614]
2016-07-07 10:13:17,459 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009935]
2016-07-07 10:13:17,468 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160009935]
2016-07-07 10:13:17,541 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009962]
2016-07-07 10:13:17,550 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160009962]
2016-07-07 10:13:17,565 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160010015]
2016-07-07 10:18:25,977 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160004679]
2016-07-07 10:18:25,985 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160004679]
2016-07-07 10:18:26,012 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160007175]
2016-07-07 10:18:26,021 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160007175]
2016-07-07 10:18:26,044 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160008663]
2016-07-07 10:18:26,053 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160008663]

My producer class doesn't call flush() like your example client, since the calling module connects and keeps publishing. I also don't call poll(0) like the example, but I'm unsure whether that matters.

mikesparr (Author) commented Jul 7, 2016

Oddly, it looks like BufferError occurs for about 1-2 seconds every 4-6 minutes (see the timestamps):

2016-07-07 11:00:48,501 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160008896]
2016-07-07 11:04:54,092 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160009983]
2016-07-07 11:04:54,117 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009708]
2016-07-07 11:04:54,125 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160009708]
2016-07-07 11:04:54,144 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009978]
2016-07-07 11:04:54,151 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160009978]
2016-07-07 11:04:54,177 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009973]
2016-07-07 11:04:54,185 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160009973]
2016-07-07 11:04:54,404 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160010004]
2016-07-07 11:10:01,297 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:286566]
2016-07-07 11:10:01,353 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:286565]
2016-07-07 11:10:01,368 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:286567]
2016-07-07 11:10:03,934 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160006956]
2016-07-07 11:10:04,005 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009978]
2016-07-07 11:10:04,042 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009991]
2016-07-07 11:15:12,232 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160003462]
2016-07-07 11:15:12,240 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160003462]
2016-07-07 11:15:12,438 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009723]
2016-07-07 11:15:13,356 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:150007762]
2016-07-07 11:20:20,780 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160008018]
2016-07-07 11:20:20,789 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160008018]
2016-07-07 11:20:20,849 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009723]
2016-07-07 11:21:20,857 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:150015701]
2016-07-07 11:25:28,763 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:286568]
2016-07-07 11:25:28,777 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:286569]
2016-07-07 11:25:30,359 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160006460]
2016-07-07 11:25:30,368 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160006460]
2016-07-07 11:25:30,473 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160007526]
2016-07-07 11:25:30,481 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160007526]
2016-07-07 11:25:30,581 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009723]
2016-07-07 11:25:30,615 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160007840]
2016-07-07 11:25:30,958 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009978]

edenhill (Contributor) commented Jul 7, 2016

You need to call poll() at regular intervals to serve the producer's delivery report callbacks.
Produced messages remain accounted for, even after a successful ack by the broker, until the application's delivery report callback has been called for each message.

poll(0) is a cheap call if nothing needs to be done so it is typically put in the producer loop.

You should typically only call flush() on termination to make sure all outstanding messages are sent.
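
A minimal sketch of that produce/poll/flush pattern with confluent-kafka-python (the broker address, topic name, and message source are placeholders):

    from confluent_kafka import Producer

    p = Producer({'bootstrap.servers': 'localhost:9092'})  # placeholder broker list

    def delivery_report(err, msg):
        # Invoked from poll()/flush() once per message; only then is the message's queue slot released
        if err is not None:
            print("Delivery failed for key {}: {}".format(msg.key(), err))

    for payload in message_source():  # hypothetical iterable of encoded messages
        p.produce('my_topic', payload, callback=delivery_report)
        p.poll(0)  # serve pending delivery report callbacks; cheap no-op if nothing is ready

    p.flush()  # on termination: block until all outstanding messages are delivered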

mikesparr (Author) commented:

If I call poll(0) after every produce() call, will it have a major performance hit? Consumption is currently at a rate of 55 msg/sec per consumer, per node, given that the program also performs a Cassandra query, DictDiff, and Cassandra upsert, then produces to the next topic in the stream. I doubt Kafka is the bottleneck, and we should be producing more msg/sec than that.

Since the diff module instantiates my Producer class, it should never terminate, but I may add a flush method to that class and call flush() on exit to be safe (see the sketch below).
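
A minimal sketch of such a method, assuming the wrapper class keeps the underlying producer in self.producer as in the snippets below, and that the installed confluent-kafka version's flush() accepts a timeout and returns the number of messages still queued:

    def flush(self, timeout=30):
        """Block until outstanding messages are delivered; call this on shutdown."""
        if self.producer is not None:
            remaining = self.producer.flush(timeout)
            if remaining > 0:
                self.__get_logger().warning(
                    "Shutdown flush timed out with [{}] messages still undelivered.".format(remaining)
                )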

FYI - I edited the producer config to the following and let it run for 3 hours; it processed 400K+ msgs with no BufferError. I do think this config is important, and with the defaults I was running into the error, so perhaps mention something about those params in the Python client docs.

            conf = {
                'bootstrap.servers': ','.join(map(str, self.config.get('hosts'))),
                'queue.buffering.max.messages': 1000000,
                'queue.buffering.max.ms': 5000,
                'batch.num.messages': 100,
                'log.connection.close': False,
                'client.id': socket.gethostname(),
                'default.topic.config': {'acks': 'all'}
            }

mikesparr (Author) commented:

Oddly, I added logging to each loop to print the current count every 100 records. I don't understand how it's even possible, but my Produced count passed my Consumed count along the way:

  • consume topic
    • diff with Cassandra record, if exists
    • upsert Cassandra
      • produce diff doc to next topic

I only increment each counter upon a successful call, yet eventually the Produced count surpasses the Consumed count and stays ahead for the rest of the program, through over 400K records.

2016-07-07 16:06:07,109 - INFO - stream_producer.pyc - Produced [361600] records since startup.
2016-07-07 16:06:07,168 - INFO - stream_consumer.pyc - Consumed [246000] records since startup.
2016-07-07 16:06:08,137 - INFO - stream_producer.pyc - Produced [361700] records since startup.
2016-07-07 16:06:09,151 - INFO - stream_producer.pyc - Produced [361800] records since startup.
2016-07-07 16:06:09,213 - INFO - stream_consumer.pyc - Consumed [246100] records since startup.
2016-07-07 16:06:10,175 - INFO - stream_producer.pyc - Produced [361900] records since startup.

It should not be possible for produce to be called more often than consume. The counts should be, and are, equal for the first 25-30K messages, and then the Produced count starts jumping ahead. BufferError is not appearing in the logs, but something really strange is going on.

edenhill (Contributor) commented Jul 7, 2016

poll() is cheap to call and will not have a performance impact, so please add it to your producer loop.

Regarding the 3-hour run: if it only produced 400k messages, that is still below the internal queue size of 1M messages, but eventually that limit will be exceeded without calls to poll().
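
For what it's worth, you can watch that backlog directly: len() on a confluent-kafka Producer reports the number of messages (and protocol requests) still awaiting delivery, which is roughly what counts against queue.buffering.max.messages. A tiny sketch (broker address is a placeholder):

    from confluent_kafka import Producer

    p = Producer({'bootstrap.servers': 'localhost:9092'})  # placeholder
    p.produce('test', b'payload')
    print(len(p))  # outstanding messages/requests not yet handed to delivery callbacks
    p.poll(0)      # serving callbacks is what eventually drains this count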

mikesparr (Author) commented:

I will add poll(0) after the produce() call. With the config above I'm no longer seeing BufferError (the subject of this issue), but in my testing I'm now seeing more produce calls than consume calls, which should be impossible. Is there a built-in retry that might trigger the counter multiple times and explain why it exceeds the consumer count?

edenhill (Contributor) commented Jul 7, 2016

How are you counting consumed and produced messages?

mikesparr (Author) commented:

I set a class member variable produced = 0 on init. Then whenever publish() is called from the calling module, the function increments the class variable and logs every 100 records:

    def publish(self, data, topic='test', key=None, partition=None, callback=None):
        """POSTs data to Kafka REST proxy"""

        logger = self.__get_logger()

        try:
            if self.producer is None:
                self._get_connection()
            # end if producer

            logger.debug( "Sending message to topic [{}] with key [{}]".format(topic, key) )

            self.producer.produce(
                topic, 
                json.dumps(data, separators=(',', ':')).encode('utf-8'), 
                key=key, 
                on_delivery=callback
            )
            self.producer.poll(0) # recommended by @edenhill

            self.produced += 1
            if (self.produced % 100) == 0:
                logger.info( "Produced [{}] records since startup.".format(self.produced) )

        except BufferError, be:
            logger.error( "BufferError publishing topic [{}]:[{}]-[{}]".format(topic, partition, key) )

        except Exception, e:
            logger.error( "Error publishing data at topic [{}]:[{}]-[{}]".format(topic, partition, key) )
            logger.debug( traceback.format_exc() )

I did the same in the Consumer class. I'm not using multithreading, and there is a single consumer group and a single program for testing.

mikesparr (Author) commented:

In the diff_processor module that instantiates both classes (consumer and producer), for every msg consumed it does the Cassandra query/upsert and then this:

                # build edits doc
                edits_obj = _get_edits(changes, obj)
                if edits_obj is not None:
                    logger.debug( "Publishing edits object to stream" )

                    # create topic string
                    topic_name = "rets.{}.ListingEditEvent".format(
                        feed_code
                    )

                    # create topic key
                    topic_key = "{}:{}".format(
                        feed_code,
                        source_id
                    )

                    try:
                        # publish to Kafka topic
                        resp = s.publish(edits_obj, topic=topic_name, key=topic_key)

                    except Exception, e:
                        logger.error( "Error publishing object to topic [{}]".format(topic_name) )
                        logger.debug( traceback.format_exc() )
                else:
                    logger.debug( "Edits object missing. Nothing to do" )

In theory the produced count should exactly match the consumed count, as I only call publish() once per consumed msg.

mikesparr (Author) commented:

Ignore the publish count... Doh! If the program detects that the photo modified date in a listing record changed, it also publishes a record to another topic for a photo puller program to process. I will try poll(0) in the next build, but the BufferError did stop after the updated config, which makes sense: unless I hit 1 million messages, I wouldn't run into it.

I'll close this issue, and if poll(0) doesn't solve it (I'll reduce the max values in the config to verify), I'll reopen. Thanks!

gemfield commented May 20, 2018

It should be noted that the queue.buffering.max.kbytes limit may also cause the "BufferError: Local: Queue full" error. The maximum value of queue.buffering.max.kbytes is 2097151, which is about 2 GB (400 MB by default). There is also a write-up for Chinese readers: https://zhuanlan.zhihu.com/p/37016944
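
A sketch of raising that size limit alongside the message-count limit (broker address and values are illustrative, not recommendations):

    conf = {
        'bootstrap.servers': 'localhost:9092',      # placeholder
        'queue.buffering.max.messages': 1000000,    # cap on queued message count
        'queue.buffering.max.kbytes': 2097151,      # cap on queued bytes; the maximum allowed value (~2 GB)
    }
    # Whichever limit is reached first causes produce() to raise BufferError: Local: Queue full.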

stripthesoul commented:

I can confirm that calling poll() after every message causes a drastic performance hit. I had to revert it in production on code that creates millions of records.
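
One common compromise, sketched here as a suggestion rather than anything from the maintainers: poll only occasionally, and block on poll() only when produce() actually raises BufferError (broker address, topic, and payload source are placeholders):

    from confluent_kafka import Producer

    p = Producer({'bootstrap.servers': 'localhost:9092'})  # placeholder

    def produce_with_backpressure(topic, value, key=None):
        """Produce; if the local queue is full, poll to drain delivery callbacks and retry."""
        while True:
            try:
                p.produce(topic, value, key=key)
                return
            except BufferError:
                p.poll(1)  # block briefly so acked messages free up queue slots

    for i, payload in enumerate(payloads):  # hypothetical iterable of encoded messages
        produce_with_backpressure('my_topic', payload, key=str(i))
        if i % 10000 == 0:
            p.poll(0)  # occasional non-blocking poll keeps callbacks from piling up

    p.flush()  # still flush on shutdown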
