BufferError: Local: Queue Full #781

Open · luanmorenomaciel opened this issue Feb 18, 2020 · 2 comments


luanmorenomaciel commented Feb 18, 2020

Description

Error when trying to send data to Confluent Platform 5.4 using the Python library. All the services are up and running, but even after trying different producer config options I keep getting a constant buffer-full exception.

I've been through the other links and recommendations but am still getting the error. Is there any way this could be failing due to the advertised listener?
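One quick way to test the advertised-listener theory is a plain metadata request, since that fails on connectivity before any produce() is attempted. A minimal sketch, assuming the same config module as the reproduction below:

from confluent_kafka import Producer

# minimal connectivity check: list_topics() issues a metadata request,
# which fails or times out if the broker's advertised listener is
# unreachable from this client
p = Producer({"bootstrap.servers": config.bootstrap_servers})
try:
    metadata = p.list_topics(timeout=10)
    print("connected, topics:", list(metadata.topics))
except Exception as e:
    print("metadata request failed:", e)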

How to reproduce

# imports used by this snippet
from random import randint

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# config, read_files, and the Kafka record class are project-local modules

def avro_producer(self):

    # avro schema [key] & [value]
    key_schema_str = config.key_schema_str
    value_schema_str = config.value_schema_str

    # load avro definition
    key_schema = avro.loads(key_schema_str)
    value_schema = avro.loads(value_schema_str)

    # get data to insert
    get_data = read_files.CSV().csv_reader()

    # init producer using key & value schema
    producer = AvroProducer(
        {
            # kafka broker server
            "bootstrap.servers": config.bootstrap_servers,
            # schema registry url
            "schema.registry.url": config.schema_registry_url,
            # max number of messages batched in one message set
            "batch.num.messages": 100,
            # delay in ms to wait for messages in queue (alias of linger.ms)
            "queue.buffering.max.ms": 100,
            # max number of messages allowed on the local producer queue
            "queue.buffering.max.messages": 1000,
            # time to wait for messages to batch before sending (alias of queue.buffering.max.ms)
            "linger.ms": 200
        },
        default_key_schema=key_schema,
        default_value_schema=value_schema)

    # loop to insert data
    inserts = 0
    while inserts < len(get_data):

        # instantiate new records, execute callbacks
        record = Kafka()

        try:

            # map columns and access using dict values
            record.genre = get_data[inserts]['genre']
            record.artist_name = get_data[inserts]['artist_name']
            record.track_name = get_data[inserts]['track_name']
            record.track_id = get_data[inserts]['track_id']
            record.popularity = get_data[inserts]['popularity']
            record.duration_ms = get_data[inserts]['duration_ms']

            # serve on_delivery callbacks from previous asynchronous produce() calls
            producer.poll(0)

            # message passed to the delivery callback will already be serialized.
            # to aid in debugging we provide the original object to the delivery callback.
            producer.produce(
                topic=config.topic,
                key={'user_id': randint(0, 100000)},
                value=record.to_dict(),
                callback=lambda err, msg, obj=record: self.on_delivery(err, msg, obj)
            )
        except BufferError:
            print("buffer full")
            raise

        except ValueError:
            print("invalid input")
            raise

        except KeyboardInterrupt:
            raise

        # increment values
        inserts += 1

    print("flushing records...")

    # wait for any outstanding messages to be delivered
    producer.flush()

Checklist

Please provide the following information:

  • confluent-kafka[avro]==1.3.0
  • confluent-kafka==1.3.0
  • Client configuration:

        "bootstrap.servers": config.bootstrap_servers,
        "schema.registry.url": config.schema_registry_url,
        "batch.num.messages": 100,
        "queue.buffering.max.ms": 100,
        "queue.buffering.max.messages": 1000,
        "linger.ms": 200
TomGoBravo commented

I'm not an expert, but I found this issue while trying to solve similar BufferError: Local: Queue full problems in my own code that calls confluent-kafka-python. You are already calling poll() per #16. My guess is that "queue.buffering.max.messages" is constraining the buffer: the default is much larger (100,000), so try increasing it. Also, you set queue.buffering.max.ms and linger.ms to different values, but they are aliases for the same setting.
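A minimal sketch of that suggestion, reusing the config and schema variables from the reproduction above; 100000 happens to be librdkafka's default for queue.buffering.max.messages, and the right value for you depends on message size and available memory:

producer = AvroProducer(
    {
        "bootstrap.servers": config.bootstrap_servers,
        "schema.registry.url": config.schema_registry_url,
        "batch.num.messages": 100,
        # raise the local queue limit well above the 1000 in the report
        "queue.buffering.max.messages": 100000,
        # set the linger time once; queue.buffering.max.ms and linger.ms
        # are aliases, so setting both just overwrites one with the other
        "linger.ms": 200
    },
    default_key_schema=key_schema,
    default_value_schema=value_schema)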

TomGoBravo commented

I'm handling BufferError exceptions like this, which seems to work well: #341 (comment)
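The linked comment isn't quoted here, but a common pattern for handling BufferError is to poll() to serve delivery reports (which frees space on the local queue) and then retry. A minimal sketch using the names from the reproduction above; the retry loop is an illustration, not a quote of #341:

# retry produce() after draining the queue with poll()
while True:
    try:
        producer.produce(
            topic=config.topic,
            key={'user_id': randint(0, 100000)},
            value=record.to_dict(),
            callback=lambda err, msg, obj=record: self.on_delivery(err, msg, obj)
        )
        break
    except BufferError:
        # local queue is full: block up to 1s serving delivery
        # callbacks, which frees queue space, then try again
        producer.poll(1)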
