Broker connection timeout #532


Closed
DrPyser opened this issue Feb 6, 2019 · 11 comments

Comments


DrPyser commented Feb 6, 2019

Description

I'm wondering what's the appropriate way to set a timeout on the broker/cluster connection, that is, on a call to e.g. producer.flush(). I know the method itself takes a timeout parameter, but that API doesn't differentiate between different causes of a timeout (e.g. the broker has only processed part of the message queue vs. the client can't connect to the broker).

I've tried using configuration options like request.timeout.ms, but that doesn't seem to work, i.e. producer.poll() will still block indefinitely if the broker is unavailable.

Thanks!

How to reproduce

  1. Create a producer configured with an unavailable/unreachable broker address, e.g.
    producer = confluent_kafka.Producer({"bootstrap.servers": "unreachable.example.com", "request.timeout.ms": 1000})
  2. Call producer.poll() and watch it block.
  3. Call producer.poll(5) and wait 5 seconds. It returns with no exception and no information about the cause of the timeout (see the sketch below).
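
A minimal sketch of those steps (the broker address is a deliberately unreachable placeholder):

```python
import confluent_kafka

# Deliberately unreachable broker; request.timeout.ms alone does not bound poll().
producer = confluent_kafka.Producer({
    "bootstrap.servers": "unreachable.example.com",
    "request.timeout.ms": 1000,
})

producer.poll()   # step 2: blocks indefinitely while the broker is unreachable
producer.poll(5)  # step 3: returns after ~5 s with no exception and no cause information
```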

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): ('1.0.0', 722432), ('1.0.0-pre2', 16777218)
  • Apache Kafka broker version: (docker image) confluentinc/cp-kafka:4.1.0
  • Client configuration: {"bootstrap.servers": "test.abc", "request.timeout.ms": 1000}
  • Operating system: Linux server-04 4.15.0-45-generic #48-Ubuntu SMP Tue Jan 29 16:28:13 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue

rnpridgeon commented Feb 6, 2019

Hello @DrPyser, your checklist mentions librdkafka 1.0.0-pre2, which I do not believe actually exists yet. Perhaps you mean RC6? If so, this was actually just fixed!

confluentinc/librdkafka#2218 (comment)


edenhill commented Feb 6, 2019

I believe 1.0.0-pre2 was the version of librdkafka up to confluentinc/librdkafka@4a7ed8a (Dec 18), so the reported issue is not the one Ryan links to.

The messages will remain in the producer queues waiting to be transmitted/retried until the message.timeout.ms expires, which defaults to 5 minutes.
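
For reference, message.timeout.ms is a producer configuration property; a small sketch lowering it from the 5-minute default (broker address and value are illustrative):

```python
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "broker1:9092",  # placeholder
    # Give up on delivery (and report an error via the delivery callback)
    # after 10 s instead of the default 300000 (5 minutes).
    "message.timeout.ms": 10000,
})
```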


DrPyser commented Feb 6, 2019

@edenhill So what's the expected behavior if I set message.timeout.ms to, say, 1000? Will flush raise an exception after 1 second?
What I want is to avoid blocking my thread(s) indefinitely when I flush() and the broker is unreachable, but I also want to know if the broker is unreachable (giving a timeout argument to flush doesn't help in that regard).
Also, does this particular setting affect e.g. list_topics?


edenhill commented Feb 7, 2019

Message timeouts are scanned every second, so with message.timeout.ms=1000 you must allow for 1+1 s for a message to time out (worst case; best case is of course 1 s).

flush() does not raise exceptions, but simply returns the number of outstanding messages in queues/transit.
If you call flush() with a timeout shorter than the next message timeout it will thus return whatever number of messages are still queued.
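
In other words, a caller that wants a hard failure can inspect flush()'s return value itself; a minimal sketch (topic, broker address and the exception type are placeholders):

```python
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "broker1:9092",  # placeholder
    "message.timeout.ms": 10000,
})
producer.produce("my-topic", b"payload")

remaining = producer.flush(5)  # seconds; returns the number of still-undelivered messages
if remaining > 0:
    raise RuntimeError(f"{remaining} message(s) not delivered within the flush timeout")
```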

An application should typically not care whether the broker is unreachable or not, but instead set message.timeout.ms according to the data lifetime constraint, e.g. the maximum time allowed to produce this set of data before it is deemed outdated.
This abstracts all the nitty-gritty details of the Kafka cluster away from the application and lets the application focus on what is important: the data.

list_topics() will use socket.timeout.ms, or the provided timeout, for its request timeouts.


DrPyser commented Feb 7, 2019

I think my application should definitely care if no brokers are reachable, since that means no message can be sent, and my data integrity relies on my message being sent... If there's a typo in the hostname in the config, for example, I'd rather know it, and not have to guess from my messages not being sent.

In my particular case, I'm writing to Kafka inside a database transaction (I know, dual writes are bad, but that's what I have to do for now). I don't want the Kafka write to take forever, and I don't want to commit the database transaction without writing to Kafka. So I need to know if I need to fail the transaction because Kafka isn't available, and ideally know it fast. In my current setup network latency isn't an issue, as everything (Kafka, database and app) runs on the same machine.

What is the effect of socket.timeout.ms? Will a call to list_topics fail with an exception if it times out?


edenhill commented Feb 7, 2019

You can register an error_cb callback and look for the ERR__ALL_BROKERS_DOWN error code, which indicates the client could not connect to any broker.
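
A sketch of that approach (broker address is a placeholder; error_cb is served from poll()/flush()):

```python
from confluent_kafka import Producer, KafkaError

def on_error(err):
    # Global, non-message-specific client errors end up here.
    if err.code() == KafkaError._ALL_BROKERS_DOWN:
        print("all brokers are currently down:", err)

producer = Producer({
    "bootstrap.servers": "broker1:9092",  # placeholder
    "error_cb": on_error,
})
producer.poll(1)  # callbacks, including error_cb, are triggered from poll()/flush()
```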

But what if this is caused by a temporary network problem between the client and brokers? And how does the application know which broker connection is important? (It needs to know the leader for each partition it wants to produce to, and keep that info up to date.)

You will eventually have reimplemented all the things the client already does for you.

At the end of the day, it boils down to your data policy: instead of monitoring the complex state of a Kafka cluster in real time, you provide data-level constraints, namely the maximum permitted delivery time, and that abstracts all possible underlying errors, whatever they may be.

What is the effect of socket.timeout.ms? Will a call to list_topics fail with an exception if it times out?

list_topics() will use the provided timeout= argument, or default to socket.timeout.ms, and yes, it will raise an exception if it could not get a response in time.
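
A sketch of probing the cluster that way; note that the Python-level timeout argument is in seconds, unlike the millisecond config properties (broker address is a placeholder):

```python
from confluent_kafka import Producer, KafkaException

producer = Producer({"bootstrap.servers": "broker1:9092"})  # placeholder

try:
    metadata = producer.list_topics(timeout=5)  # seconds
    print("cluster reachable; topics:", list(metadata.topics))
except KafkaException as e:
    print("metadata request failed or timed out:", e)
```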


DrPyser commented Feb 7, 2019

Not sure I understand or agree with your point. It's one thing to hide/abstract away the fact that one broker was down and another one was up so the second one was used, in a multi-node cluster setup (which isn't my case for now, I have a single node). I don't care which broker is used, of course, as long as my message is sent, received and eventually replicated. But that's different from hiding the fact that a timeout occurred because of a failure to establish a connection, or because the connection has high latency, or because the broker is under heavy load, etc. I don't want to monitor the complex state of the cluster, just know if it's up or not (as a whole). The client should tell the application what it knows when failures occur, so that the application and programmer can understand what's happening and react accordingly.

If the problem is temporary, that's fine, retries are there for that, right? But I don't want my producer to retry and wait indefinitely on a connection that will never succeed (because the broker is down, or because my internet connection is down, or because I made a mistake in my config). Eventually, I want to fail fast, and let the user know to retry later or something.

Anyway, thanks, you answered my questions, I'll use the socket.timeout.ms and error_cb.


edenhill commented Feb 7, 2019

I don't want to monitor the complex state of the cluster, just know if it's up or not(as a whole).

But the entire cluster does not need to be up and functional for a client to perform its operations, depending on what it needs to do. A degraded cluster may work flawlessly for one set of clients, and not at all for another set, depending on what topics & partitions they are consuming from or producing to, whether they are using a high- or low-level consumer, whether they're performing admin API requests, etc.

Apart from the ERR__ALL_BROKERS_DOWN error, you can also enable the stats_cb and set statistics.interval.ms to get metrics and state of the client's world view, including broker and partition states. All the information you need to decide if the cluster is functional or not is in there, but I still don't recommend you go down that route. As I said before, you will just duplicate the logic that is already in the client.
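
A sketch of that (the JSON layout follows the STATISTICS.md document linked below; broker address and interval are placeholders):

```python
import json
from confluent_kafka import Producer

def on_stats(stats_json_str):
    # stats_cb receives the client's statistics as a JSON string.
    stats = json.loads(stats_json_str)
    for name, broker in stats.get("brokers", {}).items():
        print(name, broker.get("state"))  # e.g. "UP", "DOWN", ...

producer = Producer({
    "bootstrap.servers": "broker1:9092",   # placeholder
    "statistics.interval.ms": 5000,
    "stats_cb": on_stats,
})
producer.poll(1)  # stats_cb, like other callbacks, is served from poll()/flush()
```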

But I don't want my producer to retry and wait indefinitely on a connection that will never succeed (because the broker is down, or because my internet connection is down, or because I made a mistake in my config).

But there is no way for the client, or the application, through Kafka alone, to know whether a broker being down or a partition not having a leader is a temporary or permanent error. It could get resolved within 2 s or 2 months (most often the former). So what conclusion can you really make from just observing the system?

Eventually, I want to fail fast, and let the user know to retry later or something.

message.timeout.ms abstracts all this error checking logic down to a single configurable property.
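
Concretely, when message.timeout.ms expires the per-message delivery callback fires with an error (typically _MSG_TIMED_OUT), whatever the underlying cause; a sketch (topic and broker address are placeholders):

```python
from confluent_kafka import Producer, KafkaError

def on_delivery(err, msg):
    if err is not None:
        # A timed-out message typically reports KafkaError._MSG_TIMED_OUT here,
        # whether the cause was a down broker, a missing leader or a bad config.
        print("delivery failed:", err)
    else:
        print("delivered to", msg.topic(), "partition", msg.partition())

producer = Producer({
    "bootstrap.servers": "broker1:9092",   # placeholder
    "message.timeout.ms": 10000,
})
producer.produce("my-topic", b"payload", callback=on_delivery)
producer.flush(15)  # serve delivery callbacks; allow more than message.timeout.ms
```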

The stats are described here: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md


DrPyser commented Feb 8, 2019

I don't need to make a permanent conclusion, just know what I need to know to react in the moment. Maybe you're right, I don't need to check explicitly for the cause of the timeouts in the application code. I can simply raise a TimeoutException to abort the transaction, not caring about the cause. But then, I need to be able to investigate the cause of the timeout (to know if there's a solvable problem, or if it's just transient network issues), so I guess I should at least log errors using the error_cb.

Guess I'll try and use message.timeout.ms.


edenhill commented Feb 8, 2019

Sounds good.

@jyotendra

@edenhill referring to this, I will be using the timeout parameter in my list_topics() call to check whether the cluster is up and running. I hope this is the right way of doing it, assuming my network is working fine.
One observation, though: the timeout seems to take seconds instead of milliseconds, as opposed to the other Kafka timeout settings.
