
Message loss when local queue is full? #341


Closed · 4 of 7 tasks
erms77 opened this issue Mar 24, 2018 · 5 comments
Labels: usage (Incorrect usage)

Comments

@erms77

erms77 commented Mar 24, 2018

Description

When we try to write 1,000,000 messages to a Kafka cluster, we do not end up with the expected count: only 999,989 messages arrive.
Do you have any idea why?

How to reproduce

import confluent_kafka
import time
import sys

producer_timings = {}
consumer_timings = {}


def error_cb(kafka_error):
    print("ERR", kafka_error.name(), kafka_error.str(), file=sys.stderr)


def delivery_cb(err, msg):
    if err:
        print('message failed:', err, file=sys.stderr)


def calculate_throughput(timing, n_messages=1000000, msg_size=100):
    print("Processed {0} messages in {1:.2f} seconds".format(n_messages, timing))
    print("{0:.2f} MB/s".format((msg_size * n_messages) / timing / (1024*1024)))
    print("{0:.2f} Msgs/s".format(n_messages / timing))


def confluent_kafka_producer_performance():
    msg_count = 1000000
    msg_size = 100
    msg_payload = ('kafkatest' * 20).encode()[:msg_size]
    topic = "confluent-kafka-topic"

    prod_config = {
        "error_cb": error_cb,
        "on_delivery": delivery_cb,
        "bootstrap.servers": "192.168.254.33:9092,192.168.254.84:9092,192.168.254.69:9092",
        "group.id": "python_inector",
        "retry.backoff.ms": 3000,
        "retries": 5,
        "default.topic.config": {"request.required.acks": "all"},
        "max.in.flight.requests.per.connection": 1,
        "queue.buffering.max.messages": 100000,
        "batch.num.messages": 100,
        "message.max.bytes": 2000000
    }

    producer = confluent_kafka.Producer(**prod_config)

    producer_start = time.time()

    for i in range(msg_count):
        try:
            producer.produce(topic, value=msg_payload)
            producer.poll(0)
        except BufferError as e:
            print(e, file=sys.stderr)
            time.sleep(1)
            producer.poll(0)

    producer.flush()

    return time.time() - producer_start


producer_timings["confluent_kafka_producer"] = confluent_kafka_producer_performance()
calculate_throughput(producer_timings["confluent_kafka_producer"])

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version
    confluent_kafka.version() : ('0.11.0', 720896)
    confluent_kafka.libversion() : ('0.11.4-RC2', 722175)

  • Apache Kafka broker version:
    0.11.0

  • Client configuration: { 'error_cb': error_cb, 'on_delivery': delivery_cb, 'bootstrap.servers': '192.168.254.33:9092,192.168.254.84:9092,192.168.254.69:9092', 'group.id': 'python_inector', 'retry.backoff.ms': 3000, 'retries': 5, 'default.topic.config': {'request.required.acks': 'all'}, 'max.in.flight.requests.per.connection': 1, 'queue.buffering.max.messages': 100000, 'batch.num.messages': 100, 'message.max.bytes': 2000000 }

  • Operating system: Debian GNU/Linux 9.4 (stretch)

  • Provide client logs (with 'debug': '..' as necessary)

  • Provide broker log excerpts

  • Critical issue

@edenhill
Contributor

The problem is with your producer loop:

for i in range(msg_count):
    try:
        producer.produce(topic, value=msg_payload)
        producer.poll(0)
    except BufferError as e:
        print(e, file=sys.stderr)
        time.sleep(1)
        producer.poll(0)

When you get a BufferError you back off and wait, but you never retry the failed message.
Also, I suggest you remove the time.sleep() and call producer.poll(1) instead; that is quicker, and poll() serves delivery callbacks while it waits for queue space.

So, something like this:

for i in range(msg_count):
    while True:
        try:
            producer.produce(topic, value=msg_payload)
            producer.poll(0)
            break
        except BufferError as e:
            print(e, file=sys.stderr)
            producer.poll(1)
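
As a follow-up, one way to verify that the full count actually made it through is to count acknowledgements in the delivery callback and compare against msg_count after flush(). A minimal sketch along those lines (the broker address is a placeholder, and delivered/failed are illustrative counters, not part of the client API):

import sys
import confluent_kafka

delivered = 0
failed = 0


def delivery_cb(err, msg):
    # Tally acknowledged vs. failed deliveries so the totals can be
    # checked against the number of produce() calls.
    global delivered, failed
    if err:
        failed += 1
        print('message failed:', err, file=sys.stderr)
    else:
        delivered += 1


producer = confluent_kafka.Producer({
    'bootstrap.servers': 'localhost:9092',  # placeholder address
    'on_delivery': delivery_cb,
})

msg_count = 1000000
for i in range(msg_count):
    while True:
        try:
            producer.produce('confluent-kafka-topic', value=b'x' * 100)
            producer.poll(0)
            break
        except BufferError:
            producer.poll(1)  # wait for queue space while serving callbacks

producer.flush()
print('delivered:', delivered, 'failed:', failed, 'expected:', msg_count)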

@erms77
Author

erms77 commented Mar 28, 2018

It works better :)
Thank you

@datta90

datta90 commented Apr 30, 2018

I solved it by increasing the file descriptor limits for the kafka user on Ubuntu.
I am running Kafka under a dedicated kafka user that I created.
These properties follow the Kafka documentation's requirements for a production environment.
I entered this in /etc/security/limits.conf:
kafka soft nofile 128000
kafka hard nofile 128000
kafka soft nproc 65536
kafka hard nproc 65536
kafka soft memlock unlimited
kafka hard memlock unlimited
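
If you go this route, a quick sanity check from inside the client process is to read the effective limits with the standard library resource module (a minimal sketch, Linux only):

import resource

# Effective soft/hard limits on open file descriptors for this process.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print('open files: soft={}, hard={}'.format(soft, hard))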

@saurabh1920

saurabh1920 commented Oct 13, 2021

Hi @edenhill,
In the producer config, the default value for message.timeout.ms is 300000. After that time the producer queue should be cleared automatically, I believe. Please correct me if I am wrong.
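
For what it's worth: when message.timeout.ms expires, librdkafka does remove the message from the queue, but it is reported as permanently failed through the delivery callback rather than retried, so it still counts as lost unless the application re-produces it. A minimal sketch of detecting that case (the broker address is a placeholder):

import sys
import confluent_kafka
from confluent_kafka import KafkaError


def delivery_cb(err, msg):
    # A message that sat in the queue longer than message.timeout.ms is
    # purged and reported here as permanently failed, not retried.
    if err is not None and err.code() == KafkaError._MSG_TIMED_OUT:
        print('timed out, lost unless re-produced:', msg.topic(),
              file=sys.stderr)


producer = confluent_kafka.Producer({
    'bootstrap.servers': 'localhost:9092',  # placeholder address
    'on_delivery': delivery_cb,
    'default.topic.config': {'message.timeout.ms': 300000},  # the default
})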

@PratikshaPanpaliya

@edenhill, what is your producer configuration?
