Is producer.flush() a must? #137

Closed
smj19 opened this issue Feb 23, 2017 · 26 comments

@smj19

smj19 commented Feb 23, 2017

My multi-threaded producer doesn't seem to send any messages if flush() is NOT called at the end. This is my script:

conf = {'bootstrap.servers': 'localhost:9092',
        'queue.buffering.max.messages': 1000000,
        'queue.buffering.max.ms': 500,
        'batch.num.messages': 50,
        'default.topic.config': {'acks': 'all'}}

producer = confluent_kafka.Producer(**conf)

try:
    fh = open(os.path.join("/home/samples/samples", queue.get()), "r")
    while True:
        data = fh.read(10240)
        if data == '':
            fh.close()
            break
        try:
            producer.produce(topic, value=data, callback=self.delivery_callback)
            producer.poll(0)
        except BufferError as e:
            print "Buffer full"
            producer.produce(topic, value=data, callback=self.delivery_callback)
            producer.poll(0)
    #print "Waiting for %d deliveries\n" % len(producer)
    #producer.flush()
except IOError as e:
    print "IO error"
except ValueError:
    print "Conversion error"
except:
    print "unexpected error"
    raise
queue.task_done()

Adding flush() increases the run time drastically. Is it a must? Is there any other way I can make sure all the messages have reached the topics?

@smj19
Author

smj19 commented Feb 24, 2017

Can anyone please help me with this?

@edenhill
Contributor

edenhill commented Feb 24, 2017

produce() is asynchronous; all it does is enqueue the message on an internal queue, which is later (>= queue.buffering.max.ms) served by internal threads and sent to the broker (if a leader is available, else it waits some more).

What this means in practice is that if you do:

produce(1)
produce(2)
exit()

Then most likely neither message 1 nor message 2 will have actually been sent to the broker, much less acknowledged by it.

Adding flush() before exiting will make the client wait for any outstanding messages to be delivered to the broker (and this will be around queue.buffering.max.ms, plus latency).
If you add flush() after each produce() call you are effectively implementing a sync producer (which you shouldn't, see here: https://github.com/edenhill/librdkafka/wiki/FAQ#why-is-there-no-sync-produce-interface).
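
For illustration, a minimal sketch of the exit problem and its fix (topic name is a placeholder):

p = confluent_kafka.Producer({'bootstrap.servers': 'localhost:9092'})
p.produce('mytopic', value='1')
p.produce('mytopic', value='2')
# exiting here would most likely drop both messages, so instead:
p.flush()  # wait for outstanding deliveries before exiting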

Let me reconstruct your loop to be more performant, look for FIXME comments:

conf = {'bootstrap.servers': 'localhost:9092',
        'queue.buffering.max.messages': 1000000,
        'queue.buffering.max.ms': 500,  ### <==== FIXME: this is fine for large files where the total send time will be above 500ms, but for small files it will add some delay - there is no harm in decreasing this value to something like 10 or 100 ms. On the other hand that may be offset by a lower batch.num.messages, but that is typically not the way to go.
        'batch.num.messages': 50,   ### <=== FIXME: Why this low value? You typically don't need to alter the batch size.
        'default.topic.config': {'acks': 'all'}}

producer = confluent_kafka.Producer(**conf)

try:
    fh = open(os.path.join("/home/samples/samples", queue.get()), "r")
    while True:
        data = fh.read(10240)
        if data == '':
            fh.close()
            break
        try:
            producer.produce(topic, value=data, callback=self.delivery_callback)
            producer.poll(0)   # <=== not-FIXME: This is good, serve delivery report callbacks, but do note that it will not serve the delivery report of the message just produce()d since it hasn't made it farther than the local queue at this point. It will serve previous messages' delivery reports.
        except BufferError as e:
            print "Buffer full, waiting for free space on the queue"  # <==== not-FIXME: clarified this
            producer.poll(10)  # <==== FIXME: putting the poll() first to block until there is queue space available. This blocks for 10 seconds, but you should block for as long as makes sense for your application (preferably longer) since message delivery can take some time if there are temporary errors on the broker (e.g., leader failover).
            producer.produce(topic, value=data, callback=self.delivery_callback)  # <=== FIXME: and now try again when there is hopefully some free space on the queue.
    #print "Waiting for %d deliveries\n" % len(producer)
    #producer.flush()   # <==== FIXME: Bad, this implements a sync producer, very slow. remove it.
except IOError as e:
    print "IO error"
except ValueError:
    print "Conversion error"
except:
    print "unexpected error"
    raise
producer.flush()  # <==== FIXME: Put the flush() here instead to wait for any remaining delivery reports.
queue.task_done()

@smj19
Author

smj19 commented Feb 27, 2017

Thanks a ton for your solution. I have a few questions:

  1. I incremented a counter in my delivery_callback function for each delivered message (let's call this number total_messages). If we don't add producer.flush() at all, total_messages ends up far lower than the number of messages I produced. That means that although all my messages were produced, they are probably still in the producer queue and haven't been delivered to Kafka yet, right? I'm asking because if I add producer.flush() as you described, the run takes ~3 minutes, and if I remove that line altogether, the run takes ~15 seconds. FYI, I have 1749 files of 614400 bytes each, and I'm reading 10240 bytes at a time from these files and producing.

@edenhill
Contributor

1749 * 614400 bytes is about 1 GB. Producing that amount in 3 minutes gives a rate of about 6 MB/s, or almost 50 Mbit/s.
Are you dissatisfied with that throughput?

@cah-jeffgraham

cah-jeffgraham commented May 11, 2017

I'm having a similar issue with flush() not completing. I'm consuming 464k messages from topic A and producing them into topic B. All messages appear to be in topic B. I tried following your recommendations to snehamj with no luck.

Output

Copying topic from t-A into t-B
Topic tsi-A 100,000 messages consumed.
Topic tsi-A 200,000 messages consumed.
Topic tsi-A 300,000 messages consumed.
Topic tsi-A 400,000 messages consumed.
Topic tsi-A 464,865 messages consumed.
No more messages to process
Messages copied: 464,865

Code

def delivery_callback(err, msg):
    if err:
        sys.stderr.write('%% Message failed delivery: %s\n' % err)
    # else:
    #     sys.stderr.write('%% Message delivered to %s [%d]\n' % (msg.topic(), msg.partition()))


def cpTopic():
    conf = {'group.id': uuid.uuid1(),
            'bootstrap.servers': 'host:9093',
            'security.protocol': 'ssl',
            'ssl.ca.location': 'CARoot.pem',
            'ssl.certificate.location': 'certificate.pem',
            'ssl.key.location': 'key.pem',
            'session.timeout.ms': 6000,
            'compression.codec': 'snappy',
            'default.topic.config': {
                'auto.offset.reset': 'earliest'
            }
            }

    c = Consumer(**conf)
    p = Producer(**conf)
    # Subscribing to a topic will start a background thread
    c.subscribe([topic])

    last = int(time.time())
    timeout = 10
    msg_cnt = 0

    try:
        while True:
            # print 'timeout: ' + str(int(time.time()) - last)
            if (int(time.time()) - last) > timeout:
                print 'Topic %s %s messages consumed.' % (topic, format(msg_cnt, ",d"))
                print 'No more messages to process'
                break

            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                # Error or event
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    continue
                    # sys.stderr.write('Topic %s partition %d reached end at offset %d\n' %
                    #                  (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    # Error
                    raise KafkaException(msg.error())
            else:
                # Proper message
                msg_cnt += 1
                last = int(time.time())

                if not msg_cnt % 100000:
                    print 'Topic %s %s messages consumed.' % (msg.topic(), format(msg_cnt, ",d"))

                try:
                    p.produce(target_topic, msg.value(), callback=delivery_callback)

                except BufferError as e:
                    sys.stderr.write('%% Local producer queue is full ' \
                                     '(%d messages awaiting delivery): try again\n' %
                                     len(p))
                p.poll(0)


    except KeyboardInterrupt:
        print('Aborted by user')

    print 'Messages copied: %s' % format(msg_cnt, ",d")

    # Close down consumer to commit final offsets.
    c.close()
    # Flush messages produced
    p.flush()


if __name__ == "__main__":
    start = time.time()
    topic = sys.argv[1] 
    target_topic = sys.argv[2]  

    print 'Copying topic from %s into %s' % (topic, target_topic)
    cpTopic()
    end = time.time() - start
    print 'Time to complete: {0:.{1}f} seconds'.format(end, 2)

@isaacdd

isaacdd commented Dec 27, 2017

@edenhill what is the time unit in poll()/flush()? The code for flush at least multiplies the value by 1000, and here you are passing it in milliseconds. The docs don't indicate a unit. Thanks!

@edenhill
Contributor

edenhill commented Jan 2, 2018

@isaacdd The timeout unit is in seconds, but I mistakenly wrote it as milliseconds above.
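
For example (timeouts are floats, in seconds):

p.poll(0)     # non-blocking
p.poll(0.5)   # blocks for up to 0.5 s (500 ms)
p.flush(30)   # blocks for up to 30 s waiting for outstanding deliveries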

@shaikshakeel

shaikshakeel commented Sep 26, 2018

@edenhill
See the following snippets:

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback)
    p.poll(0)

When I use the snippet above, I see the delivery_callback for the last message; that is fine.

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback)
    p.poll(10)

When I use the snippet above, I see the delivery_callback for every message (i.e., as soon as I produce to Kafka). Why is that? I don't clearly understand.

confluent-kafka version: 0.11.0

@edenhill
Contributor

produce() is asynchronous; it simply enqueues your message on an internal queue for later transmission to the broker.

When you call poll(0) directly after produce() it is highly unlikely that the message you just produced has been sent to the broker, processed, and an ack response received from the broker in that short time. So you will most likely only see delivery reports from previous messages at that point.
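
A minimal sketch of the difference (producer p, callback cb, and the payload are placeholders):

p.produce('mytopic', value=payload, callback=cb)
p.poll(0)    # returns immediately; the new message is still in the local queue,
             # so cb fires here only for previously acked messages
p.poll(10)   # blocks for up to 10 s, usually long enough for the broker to ack
             # the new message, so cb fires for it too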

@shaikshakeel

@edenhill Thanks for the info.

poll(0) is fine, but then what about poll(10)? What will that do?

@shaikshakeel

shaikshakeel commented Sep 26, 2018

@edenhill Let me explain clearly what I am doing.

We have a Kafka consumer which reads messages, does some processing, and publishes to a Kafka topic again using the script below.

producer config :

{
  "bootstrap.servers": "localhost:9092"
}

I haven't set any other configuration such as queue.buffering.max.messages, queue.buffering.max.ms, or batch.num.messages.

I am assuming these will all use their default values:
queue.buffering.max.messages : 100000
queue.buffering.max.ms : 0
batch.num.messages : 10000
from Configuration

My understanding: when the internal queue reaches either queue.buffering.max.ms or batch.num.messages, messages get published to Kafka on a separate thread. In my configuration queue.buffering.max.ms is 0, so every message should be published as soon as I call produce(). Correct me if I am wrong.

My producer snippet:

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback)
    p.flush()

From this post I understand that by using flush() after every message, the producer effectively becomes a sync producer.

If I change above snippet to

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback)
    p.poll(0)

Will there be any performance improvement? Can you clarify my understanding?

Thanks

@edenhill
Contributor

edenhill commented Oct 5, 2018

In my configuration queue.buffering.max.ms is 0, so every message should be published as soon as I call produce(). Correct me if I am wrong.

produce() is still asynchronous; all it does is put the message on an internal queue.
That internal queue is served by an internal thread that tries to achieve a latency that is <= queue.buffering.max.ms. Depending on produce rate, load, outstanding requests, etc, it is possible that multiple messages will be batched together before being sent to the broker.

By calling flush() after each produce() you effectively turn it into a sync produce, which is slow, see here: https://github.com/edenhill/librdkafka/wiki/FAQ#why-is-there-no-sync-produce-interface

Calling poll(0) after produce() will serve delivery reports of already sent and acked messages, which is typically never the message you just produced.
This is the proper way to produce, keeping all things asynchronous which gives you the best performance since you can produce a stream of messages without ever stopping to wait for a single delivery.
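
In other words, a minimal sketch of the recommended pattern (topic name and message source are placeholders):

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_callback(err, msg):
    if err is not None:
        print('Delivery failed: %s' % err)

for message in message_source:   # placeholder iterable
    p.produce('mytopic', value=message, callback=delivery_callback)
    p.poll(0)                    # serve delivery reports for earlier messages

p.flush()                        # wait once, at the end, for anything outstanding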

@shaikshakeel

Thanks @edenhill.

I have changed my code from:

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback)
    p.flush()

to

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback)
    p.poll(0)

With the first snippet, send() took ~45 ms because it waits for the internal queue to drain.
With the second snippet, send() takes ~0.2 ms, which is a huge performance improvement.

Thanks

@ASalihov

ASalihov commented Jan 18, 2019

(quotes @edenhill's earlier explanation and reconstructed loop in full)

What if our app fails before calling flush()? Will we lose our data?

@edenhill
Contributor

That depends on whether the produce()d messages have made it to the broker yet, or are still in the producer queue or in socket buffers.

@semicolon1709

semicolon1709 commented May 27, 2019

@edenhill
Calling poll(0) after produce() won't send the message I just produce()d to the broker; it just serves delivery reports.
Am I right?
Then what if I don't call poll(0) after produce()?
Is it right that there is no harm to produce performance, just lost delivery report information?

@edenhill
Contributor

Then what if I don't call poll(0) after produce()?

A message is considered in-queue until you've polled for its delivery report, which means that if you don't call poll the internal producer queue (queue.buffering.max.messages) will eventually fill up and reject further messages.

If you know that your producer is short-lived and will not produce more messages than queue.buffering.max.messages you can skip poll() and just go for the final flush(), but for long-running producers you should call poll() frequently.
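
A minimal sketch of the two cases (topic name, loop condition, and message source are placeholders):

# Short-lived producer: fewer messages than queue.buffering.max.messages,
# so a single final flush() is enough.
p = Producer({'bootstrap.servers': 'localhost:9092'})
for i in range(1000):
    p.produce('mytopic', value=str(i))
p.flush()

# Long-running producer: poll() frequently so delivery reports are served
# and the internal queue does not fill up.
while running:   # placeholder loop condition
    p.produce('mytopic', value=next_message(), callback=delivery_callback)
    p.poll(0)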

@edenhill
Contributor

Also note that poll() will trigger error callbacks (error_cb), stats, etc.
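
For example, a sketch with an error callback configured (the callback name is a placeholder):

def my_error_cb(err):
    print('Client-level error: %s' % err)

p = Producer({'bootstrap.servers': 'localhost:9092',
              'error_cb': my_error_cb})
p.poll(0)   # error_cb (and stats_cb, if configured) are served from inside poll()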

@semicolon1709

If I call poll() right after every produce(), does that mean I send data to the broker record by record, not in batch mode?
If the above is right, then configuration like queue.buffering.max.messages, queue.buffering.max.ms and batch.num.messages won't take effect in this situation, right?

@edenhill
Contributor

No, if you call poll() with a timeout of 0 it will serve any delivery reports from previous produce-calls, or none, without blocking.

@ArmanAjdani

I used poll(0) instead of flush(), but no messages were produced to the topics. Am I missing something?

@shaikshakeel

Hi @ArmanAjdani, can you share your code snippet?

@ArmanAjdani

This is my produce function:

def produce(self, topic: KafkaTopicType, data):
    self.producer.produce(topic.value, data.encode('utf-8'), callback=self.delivery_report)

    self.producer.poll(0)   # does not work
    self.producer.flush()   # works

@shaikshakeel

The producer won't send the message to the broker immediately; it buffers it until the producer queue fills up (queue.buffering.max.messages), the queue size limit is reached (queue.buffering.max.kbytes), or the buffering time (queue.buffering.max.ms) elapses.

Calling flush() immediately after produce() will publish all messages to the broker irrespective of those config values.

Keep poll() and try to publish more messages; you will receive the callback for the previous messages. A minimal sketch of that pattern follows.
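
A sketch, assuming your class gains a close() hook that the application calls on shutdown:

def produce(self, topic: KafkaTopicType, data):
    self.producer.produce(topic.value, data.encode('utf-8'), callback=self.delivery_report)
    self.producer.poll(0)   # serve earlier delivery reports, non-blocking

def close(self):            # hypothetical shutdown hook
    self.producer.flush()   # wait for all outstanding messages before exiting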

@pandasagar

I have producer code using a callback, but the callback is never invoked and no error is thrown even when the broker is down; it always shows success. If Kafka is up, the message is produced.
But I need an error to be raised even if the 1st message's delivery fails. I am able to catch the 2nd message's failure but not the 1st. Can someone please help me understand this behaviour, or how to handle the 1st message's failure?
Below is my snippet:

import json
import datetime
from confluent_kafka import avro, KafkaError
from confluent_kafka.avro import AvroProducer
from confluent_kafka import Producer

logger = get_logger()

class SimpleProducer(config):
    ....
    def produce_with_callback(self, key=None, value=None, poll_timeout=305000):
        def delivery_callback(error, msg):
            if error:
                logger.error(f"Message failed delivery for {msg.value()} with {error}")
                raise KafkaError
            else:
                logger.debug(f"Message delivered to {msg.topic()}, {msg.partition()}, {msg.offset()}")

        produce(topic=self._topic_name, key=key, value=value,
                callback=lambda error, message: delivery_callback(error, message))

        # Poll time set to higher than message.timeout.ms
        poll(poll_timeout)

configuration = {'linger.ms': 30, 'acks': 'all', 'request.timeout.ms': 10000, 'message.timeout.ms': 90000}

count = 1
jproducer = SimpleProducer(producer_id="producer_id", topic_name="test", broker_host="localhost",
                           broker_port='9092', configurations=configuration)
print(f" producing {count} -- {datetime.datetime.now()}")
try:
    jproducer.produce_with_callback(key='1', value=json.dumps(count), poll_timeout=130)
except BufferError as e:
    print(f"Buffer error: {datetime.datetime.now()} \n {e}")
except Exception as e:
    print(f"produce error: {datetime.datetime.now()} \n {e}")
else:
    print(f"produced successfully {count}: {datetime.datetime.now()}")
    count += 1

@pavan-r-kas

pavan-r-kas commented Dec 12, 2022

1749 * 614400 bytes is about 1 GB. Producing that amount in 3 minutes gives a rate of about 6 MB/s, or almost 50 Mbit/s. Are you dissatisfied with that throughput?

for df_dim_data in SNOWFLAKE_CONNECTOR.pull_into_dataframe(query):
    if len(df_dim_data) > 10000:
        """If dimension data is big then split and publish"""
        df_dim_chunks = split_dataframe(df_dim_data)
        for chunk in range(len(df_dim_chunks)):
            obj_tuple = create_objects_from_df[dimension](df_dim_chunks[chunk])
            for key_obj, value_obj in obj_tuple:
                logging.info("Producing Avro record: {}\t{}".format(key_obj.to_dict(), value_obj.to_dict()))
                producer.produce(topic=topic, key=key_obj, value=value_obj, on_delivery=acked)
                producer.poll(0)
            producer.flush()
            logging.info("{} messages were produced to topic {}!".format(delivered_records, topic))

Similar code with around 4.5 million records (about 70 MB) takes around 4 hours to produce completely.
Is this an issue with the code, or something else?
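
Going by the earlier advice in this thread, one likely contributor is the flush() inside the chunk loop, which makes each chunk effectively synchronous. A sketch of the restructure, using the same names as above:

for df_dim_data in SNOWFLAKE_CONNECTOR.pull_into_dataframe(query):
    if len(df_dim_data) > 10000:
        df_dim_chunks = split_dataframe(df_dim_data)
        for chunk in range(len(df_dim_chunks)):
            obj_tuple = create_objects_from_df[dimension](df_dim_chunks[chunk])
            for key_obj, value_obj in obj_tuple:
                producer.produce(topic=topic, key=key_obj, value=value_obj, on_delivery=acked)
                producer.poll(0)
producer.flush()   # once, after all the chunks, instead of per chunk

Dropping the per-record logging.info call may also help at this volume.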
