Consumer keeps resetting to -1001; what is the difference between topic offset and consumer offset? #291

Closed · 6 of 7 tasks
rohitgcs opened this issue Dec 21, 2017 · 32 comments

@rohitgcs

Description

I'm currently troubleshooting my consumer and trying different things with partitions. I have 3 partitions and a consumer for each partition.
I initially had an issue where the committed offset would be -1001, but I figured that was only because of the timeout. So I put in code to reset it to 0 if it was < 0, but now every time I rerun the consumer it returns -1001 as my offset.
Is there a way to find the latest committed offset of a particular topic partition?
Also, what is the difference between a topic offset and a consumer offset?

Thanks in advance

How to reproduce

consumer = Consumer({'bootstrap.servers': KAFKA_BROKER,
... 'group.id': 'maxwellConsumer','debug':'broker,fetch',
... 'enable.auto.commit': True,
... 'default.topic.config': {'auto.offset.reset': 'latest'}})
%7|1513895424.653|BRKMAIN|rdkafka#consumer-2| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1513895424.653|STATE|rdkafka#consumer-2| [thrd::0/internal]: :0/internal: Broker changed state INIT -> UP
%7|1513895424.654|BROKER|rdkafka#consumer-2| [thrd:app]: 172.31.230.234:9092/bootstrap: Added new broker with NodeId -1
%7|1513895424.654|BRKMAIN|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: Enter main broker thread
%7|1513895424.654|CONNECT|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: broker in state INIT connecting
%7|1513895424.654|CONNECT|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: Connecting to ipv4#172.31.230.234:9092 (plaintext) with socket 7
%7|1513895424.654|STATE|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: Broker changed state INIT -> CONNECT
%7|1513895424.654|CONNECT|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: Connected to ipv4#172.31.230.234:9092
%7|1513895424.654|CONNECTED|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: Connected (#1)
%7|1513895424.654|FEATURE|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1513895424.654|STATE|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1513895424.655|FEATURE|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2
%7|1513895424.655|STATE|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: Broker changed state APIVERSION_QUERY -> UP
%7|1513895424.655|BROKER|rdkafka#consumer-2| [thrd:main]: 192.23.213.130:9092/2: Added new broker with NodeId 2
%7|1513895424.655|BRKMAIN|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Enter main broker thread
%7|1513895424.655|CONNECT|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: broker in state INIT connecting
%7|1513895424.655|BROKER|rdkafka#consumer-2| [thrd:main]: 172.31.230.155:9092/1: Added new broker with NodeId 1
%7|1513895424.655|CONNECT|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Connecting to ipv4#192.23.213.130:9092 (plaintext) with socket 10
%7|1513895424.655|CLUSTERID|rdkafka#consumer-2| [thrd:main]: 172.31.230.234:9092/bootstrap: ClusterId update "" -> "LlhmfovJSe-sOmvvhbrI7w"
%7|1513895424.655|UPDATE|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/bootstrap: NodeId changed from -1 to 0
%7|1513895424.655|UPDATE|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Name changed from 172.31.230.234:9092/bootstrap to 172.31.230.234:9092/0
%7|1513895424.655|LEADER|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Mapped 0 partition(s) to broker
%7|1513895424.655|STATE|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Broker changed state UP -> UPDATE
%7|1513895424.655|STATE|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Broker changed state INIT -> CONNECT
%7|1513895424.655|BRKMAIN|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Enter main broker thread
%7|1513895424.655|CONNECT|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: broker in state INIT connecting
%7|1513895424.655|CONNECT|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Connecting to ipv4#172.31.230.155:9092 (plaintext) with socket 13
%7|1513895424.656|STATE|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Broker changed state INIT -> CONNECT
%7|1513895424.656|STATE|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Broker changed state UPDATE -> UP
%7|1513895424.656|CONNECT|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Connected to ipv4#192.23.213.130:9092
%7|1513895424.656|CONNECTED|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Connected (#1)
%7|1513895424.656|FEATURE|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1513895424.656|STATE|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1513895424.656|CONNECT|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Connected to ipv4#172.31.230.155:9092
%7|1513895424.656|CONNECTED|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Connected (#1)
%7|1513895424.656|FEATURE|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1513895424.656|STATE|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1513895424.656|FEATURE|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2
%7|1513895424.656|STATE|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Broker changed state APIVERSION_QUERY -> UP
%7|1513895424.656|FEATURE|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2
%7|1513895424.656|STATE|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Broker changed state APIVERSION_QUERY -> UP

topic = TopicPartition('elastic',50)
print "Topic offset",topic.offset
Topic offset -1001
consumer.assign([topic])
%7|1513895439.655|TOPBRK|rdkafka#consumer-2| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Topic elastic [2]: joining broker (rktp 0x7f9ffc004e70)
%7|1513895439.655|TOPBRK|rdkafka#consumer-2| [thrd:172.31.230.155:9092/1]: 172.31.230.155:9092/1: Topic elastic [0]: joining broker (rktp 0x7f9ffc0045f0)
%7|1513895439.655|TOPBRK|rdkafka#consumer-2| [thrd:192.23.213.130:9092/2]: 192.23.213.130:9092/2: Topic elastic [1]: joining broker (rktp 0x7f9ffc0049d0)

confluent_kafka.version() - ('0.11.0', 720896)
confluent_kafka.libversion() - ('0.11.0', 721151)
Broker version 0.9.0.1
OS: ubuntu

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • Apache Kafka broker version:
  • Client configuration: {...}
  • Operating system:
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

Offset -1001 is a special value meaning Invalid or Default, depending on context.
If you pass a TopicPartition list to assign() with the offset set to -1001, it will attempt to fetch committed offsets from the broker and, if that fails, revert to auto.offset.reset.

If you want to manually specify the starting offset, you can set the offset of the TopicPartition to an offset of your liking.

You can use position() to get the current consume position for the given partitions, and committed() to get the committed offsets. Use assignment() to get the currently assigned partition set.
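
For reference, a minimal illustrative sketch of those three calls (not from the original comment; the broker address and partition are placeholders, and it assumes a Consumer configured as in the issue):

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({'bootstrap.servers': 'localhost:9092',   # placeholder broker
                     'group.id': 'maxwellConsumer'})
tp = TopicPartition('elastic', 0)
consumer.assign([tp])

print('assignment: %s' % consumer.assignment())                              # currently assigned partitions
print('committed:  %d' % consumer.committed([tp], timeout=10.0)[0].offset)   # offset stored on the broker (blocking call)
print('position:   %d' % consumer.position([tp])[0].offset)                  # local fetch position; -1001 before fetching starts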

@rohitgcs
Author

@edenhill every time I stop the consumer the offset resets to -1001.
So currently I have it reset to 0, but that's not helping me. I basically need it to reset to the last commit it had (but that prints out -1001 most of the time).

topic = TopicPartition(FROM_TOPIC,i)
print "Topic offset",topic.offset
consumer.assign([topic])
print "Getting Position"
position = consumer.position([topic])
if position[0].offset<0:
    position[0].offset=0 #Reset Offset

@treziac

treziac commented Dec 29, 2017

Can you provide your config and some code?
In order to resume from the previous offset, you have to use the same consumer group id when restarting a consumer, and commit offsets regularly (the client does it every 5 s by default, but you should adapt it to your use case).

When you say you have a consumer for each partition, are you manually assigning the partitions, or do you have three consumers in the same consumer group which share the offsets? In your case, you should use the same group id for each consumer, as you seem to want them to read all data from the topic. It's your call whether you use subscribe (automatic rebalancing) or assign (you decide which partition each consumer reads).

If you assign the consumer to the partition, you should leave the offset at the invalid value for the assignment; this will use the stored offset for the consumer group id.

With some code it will be easier to help you :)
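
To make the subscribe-vs-assign distinction concrete, here is an illustrative sketch (not part of the original comment; the broker address and topic name are placeholders) showing both approaches with a shared group id:

from confluent_kafka import Consumer, TopicPartition

conf = {'bootstrap.servers': 'localhost:9092',
        'group.id': 'maxwellConsumer',            # same group id on every consumer
        'enable.auto.commit': True,               # offsets are committed every 5 s by default
        'default.topic.config': {'auto.offset.reset': 'latest'}}

# Option 1: subscribe() and let the group protocol spread partitions automatically.
c1 = Consumer(conf)
c1.subscribe(['elastic'])

# Option 2: assign() a specific partition yourself; leave the offset at its default
# (invalid) value so the committed offset stored for the group is used.
c2 = Consumer(conf)
c2.assign([TopicPartition('elastic', 0)])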

@rohitgcs
Author

rohitgcs commented Jan 2, 2018

@treziac @edenhill this is how I set up my consumers. I do use the same group id and I assign them manually.

def launchConsumer(i):
    consumer = Consumer({'bootstrap.servers': KAFKA_BROKER, \
    'group.id': 'testConsumer', \
    'enable.auto.commit': True,\
    'default.topic.config': {'auto.offset.reset': 'latest'}})
    topic = TopicPartition(FROM_TOPIC,i)
    print "Topic offset",topic.offset
    consumer.assign([topic])
    print "Getting Position"
    position = consumer.position([topic])
    if position[0].offset<0:
        position[0].offset=0 #Reset Offset
    print "Partition", i, ", Position: ", position
    hasData = True
    try:
        while hasData:
            print "Running..."
            message = consumer.poll()
            if message:
                logger.debug("Consumer: Data Retrieved")
                print "================================================"
                print dumps(loads(message.value()), indent=4)
                msg = loads(message.value())
                if msg['type']=='insert':
                    pass # Do Something
                print "================================================"
    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')
        logger.debug('%% Aborted by user\n')
    finally:
        consumer.close()

if __name__ == "__main__":
    threads = list()
    topic = FROM_TOPIC
    threadCount = int(sys.argv[1])
    for i in range(threadCount):
        t = threading.Thread(target=launchConsumer, args=(i,)) # pass in the callable
        threads.append(t)
        print('Starting Thread {}'.format(i))
        t.start()
    for i, t in enumerate(threads):
        t.join()
        print('Thread {} Stopped'.format(i))

@edenhill
Contributor

edenhill commented Jan 2, 2018

   consumer.assign([topic])
    print "Getting Position"
    position = consumer.position([topic])

Since the client is asynchronous internally, and it takes some time to set up connections, query leaders, and start fetching, it is very likely that calling position() right after assign() will not return an actual offset.

What I think you are after is actually the committed offset, for which you would use committed() (and you can do so right after assign(), since it will block while waiting for the broker to respond).
position() is instantaneous and returns the current fetch position; calling it before fetching has started will give you -1001.

Unrelated: check the message for None (if message is not None:), since a zero-length message (e.g., a delete) will evaluate as false otherwise.
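
A minimal sketch of that suggestion, reusing FROM_TOPIC, i and consumer from the snippet under discussion (all assumed to be defined): fetch the committed offset right after assign() instead of reading the not-yet-valid position.

from confluent_kafka import TopicPartition

tp = TopicPartition(FROM_TOPIC, i)
consumer.assign([tp])

# committed() blocks until the broker responds (or the timeout expires),
# so it is safe to call right after assign().
committed = consumer.committed([tp], timeout=10.0)
if committed[0].offset < 0:
    # No commit stored yet for this group/topic/partition;
    # auto.offset.reset decides where consumption starts.
    print('partition %d: no committed offset yet' % i)
else:
    print('partition %d: committed offset %d' % (i, committed[0].offset))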

@rohitgcs
Author

rohitgcs commented Jan 2, 2018

@edenhill so, to your point, I need to wait before fetching the offset? So a time.sleep() should help?
Something like:

def launchConsumer(i):
    consumer = Consumer({'bootstrap.servers': KAFKA_BROKER, \
    'group.id': 'testConsumer', \
    'enable.auto.commit': True,\
    'default.topic.config': {'auto.offset.reset': 'latest'}})
    topic = TopicPartition(FROM_TOPIC,i)
    print "Topic offset",topic.offset
    consumer.assign([topic])
    print "Getting Position"
    time.sleep(5)
    position = consumer.position([topic])
    if position[0].offset<0:
        position[0].offset=0 #Reset Offset
    print "Partition", i, ", Position: ", position

Is that right?

@edenhill
Contributor

edenhill commented Jan 2, 2018

What is it that you are trying to achieve by getting the fetch position?

@rohitgcs
Author

rohitgcs commented Jan 2, 2018

@edenhill I'm trying what you mentioned: if for some reason the consumer crashes and dies, I would like to start this script again. That way, when the consumers launch and I start assigning partitions to each consumer, I want to get the last offset position and continue. I check for position < 0 because of the -1001, so if there's a better way, could you reconstruct my code?

@edenhill
Contributor

edenhill commented Jan 2, 2018

The client already has built-in support for this by two means:

  • resuming from committed offsets
  • if no committed offsets are available or they are out of date: revert to the auto.offset.reset policy.

This means you only need to decide whether you want to start reading from the oldest message (auto.offset.reset=beginning) or the latest (auto.offset.reset=end) when no committed offsets are available, and you do this by configuration alone.
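
A configuration-only sketch of that choice (broker address is a placeholder; 'earliest'/'beginning' and 'latest'/'end' are equivalent aliases in librdkafka):

conf = {'bootstrap.servers': 'localhost:9092',
        'group.id': 'maxwellConsumer',
        'enable.auto.commit': True,
        'default.topic.config': {
            'auto.offset.reset': 'earliest'   # or 'latest' to start at the end of the partition when no commits exist
        }}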

@edenhill edenhill closed this as completed Jan 2, 2018
@edenhill edenhill reopened this Jan 2, 2018
@rohitgcs
Author

rohitgcs commented Jan 2, 2018

@edenhill okay, will take a look.
How is this different from using get_watermark_offsets() for a partition? Where would I need to use the watermark offset as opposed to a general offset?

@edenhill
Contributor

edenhill commented Jan 2, 2018

It is not different; it is just handled automatically in the client itself, so there is typically no reason to implement the same logic in the application.
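
If you do want to inspect the lag yourself anyway, a hedged sketch using get_watermark_offsets() (tp is an assumed, already-assigned TopicPartition; error handling omitted for brevity):

low, high = consumer.get_watermark_offsets(tp, timeout=10.0)      # oldest and next-to-be-produced offsets
committed = consumer.committed([tp], timeout=10.0)[0].offset      # last committed offset for the group
if committed >= 0:
    print('lag on partition %d: %d' % (tp.partition, high - committed))
else:
    print('no committed offset yet; log spans offsets %d..%d' % (low, high))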

@rohitgcs
Author

rohitgcs commented Jan 2, 2018

cool, let me try your changes and get back.

@rohitgcs
Author

rohitgcs commented Jan 3, 2018

@edenhill I removed setting the offset to 0 and I noticed from the fetch logs that the consumer gets stuck on the latest offset and doesn't cover the lag.

%7|1514946451.579|FETCH|rdkafka#consumer-28| [thrd:172.32.231.127:9092/2]: 172.32.231.127:9092/2: Fetch topic qa_gcdb_shipment_queue [26] at offset 2413 (v2)

@edenhill
Contributor

edenhill commented Jan 3, 2018

What do you mean "gets stuck on"?

If there are no previously committed offsets it will revert to auto.offset.reset which you've set to latest, i.e., the end of the partition (the next new message to arrive).

@rohitgcs
Author

rohitgcs commented Jan 3, 2018

@edenhill so basically this is the state of my topic partitions
[screenshot, 2018-01-03: per-partition current offset, log-end offset and lag for the consumer group]

So, for example, when I launch my consumer I can see the consumer for partition 2 (with debug=fetch) starting at 17108 rather than at 16911, so it never processes the remaining 197 entries.
I tried changing the offset to 'latest' but the same thing occurs. What other options do I have?

@edenhill
Contributor

edenhill commented Jan 3, 2018

Can you show me your latest code?

@rohitgcs
Author

rohitgcs commented Jan 3, 2018

consumer = Consumer({'bootstrap.servers': KAFKA_BROKER, \
    'group.id': 'maxwellConsumer', \
    'enable.auto.commit': True,'debug':'broker,fetch',\
    'default.topic.config': {'auto.offset.reset': 'latest'}})
topic = TopicPartition(FROM_TOPIC,i)
print "Topic offset",topic.offset
consumer.assign([topic])
print "Getting Position"
position = consumer.position([topic])
#if position[0].offset<0:
#    position[0].offset=0 #Reset Offset
print "Partition", i, ", Position: ", position
hasData = True
try:
    while hasData:
        print "Running..."
        message = consumer.poll()
        if message:
            logger.debug("Consumer: Data Retrieved")
            print "================================================"
            print dumps(loads(message.value()), indent=4)

@edenhill
Contributor

edenhill commented Jan 3, 2018

If you add a printout of message.offset() down where the message is consumed, can you provide the full output from your application?

Nit: you should use if message is not None:, since an empty-value message will otherwise evaluate as false and be ignored.
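
A small sketch of that poll loop (the error() check is not part of this nit but is recommended elsewhere in the thread, since error events are also delivered through message objects):

while True:
    message = consumer.poll(1.0)
    if message is None:                 # nothing arrived within the timeout
        continue
    if message.error():                 # broker/consumer error reported via the message object
        print('consumer error: %s' % message.error())
        continue
    print('partition %d offset %d: %s' % (message.partition(), message.offset(), message.value()))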

@edenhill
Contributor

edenhill commented Jan 3, 2018

Oh sorry, set debug to cgrp,topic,fetch, thanks

@edenhill
Contributor

edenhill commented Jan 3, 2018

Thank you!

So the consumer starts up, gets your assignment and then asks the broker for committed offsets for topic qa_gcdb_shipment_queue partition 2 in group maxwellConsumer, but the broker responds with offset -1 which means there are no committed offsets for that combination.

%7|1515001264.714|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: 172.31.230.155:9092/1: OffsetFetchResponse: qa_gcdb_shipment_queue [2] offset -1

There are generally two reasons for this:

  • there were no prior commits for the given topic,partition,group.id
  • there were prior commits but they happened longer ago than the offset topic's retention time, thus they have been removed.

Either way the client needs to know what to do when there are no committed offsets to use, so it looks at auto.offset.reset and looks up that offset. In your case that is the end of the partition and thus no messages are seen until new messages arrive.

Do note that this consumer will only commit offsets for messages it has actually seen; in this case it has not seen any actual message, only the end of the partition, so nothing is committed.
As soon as a new message arrives it will commit the offset for that message though.

@rohitgcs
Author

rohitgcs commented Jan 3, 2018

@edenhill I can't seem to find the offset.rentention parameter in consumer.properties or server.properties. Where would it be? Or do I have to add it independently?

@edenhill
Contributor

edenhill commented Jan 3, 2018

That is a broker-level configuration, look for offsets.retention.minutes (et.al) here:
http://kafka.apache.org/documentation.html#brokerconfigs

@rohitgcs
Author

rohitgcs commented Jan 3, 2018

@edenhill the link has no mention of offset.retention
I looked at server.properties, consumer.properties and zookeeper.properties on the broker. Am I looking in the wrong place?

@edenhill
Contributor

edenhill commented Jan 3, 2018

You misspelled it; see my last comment.

@rohitgcs
Author

rohitgcs commented Jan 4, 2018

@edenhill got it. Sounds like that fixes the problem.
But sporadically my consumers get a Receive failed: Disconnected message,
and the consumer just stops altogether; there is no computation even on the message it already received.
For example:

Producer produces: 1,2,3,4,5,6
Consumer received: 1,2,3
And while it's processing 4, the Disconnected message comes and whatever processing 4 goes through after being consumed hangs (the log output stops midway).

Why would that happen?

@edenhill
Contributor

edenhill commented Jan 4, 2018

When you restart the client, does it progress from message 4 and see messages 5 and 6 as well?
If not:

  • the messages were probably not properly produced.

If it does see the remaining messages:

  • can you reproduce it with debug set to broker,cgrp,fetch,topic and provide the logs? Please point out where it stops consuming.

@rohitgcs
Author

rohitgcs commented Jan 4, 2018

I'm not able to reproduce it right away, but this is what the logs showed:

%3|1515100056.487|ERROR|rdkafka#consumer-69| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Receive failed: Disconnected
%3|1515100056.487|FAIL|rdkafka#consumer-71| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Receive failed: Disconnected
%3|1515100056.487|ERROR|rdkafka#consumer-71| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Receive failed: Disconnected
%3|1515100056.507|FAIL|rdkafka#consumer-72| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Receive failed: Disconnected
%3|1515100056.507|ERROR|rdkafka#consumer-72| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Receive failed: Disconnected
%7|1515100355.512|METADATA|test-producer#producer-1| [thrd:main]: Topic stg_xml_queue partition 0 Leader 0
%7|1515100355.512|METADATA|test-producer#producer-1| [thrd:main]: 172.31.230.155:9092/1: 1/1 requested topic(s) seen in metadata
%7|1515100355.526|BROKERFAIL|test-producer#producer-1| [thrd:172.31.230.127:9092/2]: 172.31.230.127:9092/2: failed: err: Local: Broker transport failure: (errno: Resource temporarily unavailable)
%3|1515100355.526|FAIL|test-producer#producer-1| [thrd:172.31.230.127:9092/2]: 172.31.230.127:9092/2: Receive failed: Disconnected
%3|1515100355.526|ERROR|test-producer#producer-1| [thrd:172.31.230.127:9092/2]: 172.31.230.127:9092/2: Receive failed: Disconnected
%7|1515100355.526|STATE|test-producer#producer-1| [thrd:172.31.230.127:9092/2]: 172.31.230.127:9092/2: Broker changed state UP -> DOWN
%7|1515100355.527|METADATA|test-producer#producer-1| [thrd:main]: Topic stg_xml_queue partition 0 Leader 0
%7|1515100355.527|METADATA|test-producer#producer-1| [thrd:main]: 172.31.230.155:9092/1: 1/1 requested topic(s) seen in metadata
%3|1515100355.594|FAIL|rdkafka#consumer-16| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Receive failed: Disconnected
%3|1515100355.594|ERROR|rdkafka#consumer-16| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Receive failed: Disconnected
%3|1515100355.595|FAIL|rdkafka#consumer-19| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Receive failed: Disconnected
%3|1515100355.596|ERROR|rdkafka#consumer-19| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Receive failed: Disconnected
%3|1515100355.601|FAIL|rdkafka#consumer-12| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Receive failed: Disconnected
%3|1515100355.601|ERROR|rdkafka#consumer-12| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Receive failed: Disconnected
%7|1515100355.626|CONNECT|test-producer#producer-1| [thrd:172.31.230.127:9092/2]: 172.31.230.127:9092/2: broker in state DOWN connecting
%7|1515100355.627|CONNECT|test-producer#producer-1| [thrd:172.31.230.127:9092/2]: 172.31.230.127:9092/2: Connecting to ipv4#172.31.230.127:9092 (plaintext) with socket 58
%7|1515100355.627|STATE|test-producer#producer-1| [thrd:172.31.230.127:9092/2]: 172.31.230.127:9092/2: Broker changed state DOWN -> CONNECT
%7|1515100355.627|CONNECT|test-producer#producer-1| [thrd:172.31.230.127:9092/2]: 172.31.230.127:9092/2: Connected to ipv4#172.31.230.127:9092
%7|1515100355.627|CONNECTED|test-producer#producer-1| [thrd:172.31.230.127:9092/2]: 172.31.230.127:9092/2: Connected (#7)
%7|1515100355.627|STATE|test-producer#producer-1| [thrd:172.31.230.127:9092/2]: 172.31.230.127:9092/2: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1515100355.628|STATE|test-producer#producer-1| [thrd:172.31.230.127:9092/2]: 172.31.230.127:9092/2: Broker changed state APIVERSION_QUERY -> UP
%3|1515100355.634|FAIL|rdkafka#consumer-32| [thrd:172.31.230.127:9092/2]: 172.31.230.127:9092/2: Receive failed: Disconnected
%3|1515100355.634|ERROR|rdkafka#consumer-32| [thrd:172.31.230.127:9092/2]: 172.31.230.127:9092/2: Receive failed: Disconnected
%3|1515100355.648|FAIL|rdkafka#consumer-4| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Receive failed: Disconnected
%3|1515100355.648|ERROR|rdkafka#consumer-4| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Receive failed: Disconnected
%3|1515100355.718|FAIL|rdkafka#consumer-33| [thrd:172.31.230.127:9092/2]: 172.31.230.127:9092/2: Receive failed: Disconnected
%3|1515100355.718|ERROR|rdkafka#consumer-33| [thrd:172.31.230.127:9092/2]: 172.31.230.127:9092/2: Receive failed: Disconnected
%3|1515100355.749|FAIL|rdkafka#consumer-45| [thrd:172.31.230.234:9092/bootstrap]: 172.31.230.234:9092/0: Receive failed: Disconnected

@edenhill
Contributor

edenhill commented Jan 4, 2018

Is this from when it fails to consume?
There is no fetch or topic debugging in this; it is as if it is not actually set up to consume anything.

Also, the timestamps indicate that it is the broker's idle connection reaper closing idle connections. See https://github.com/edenhill/librdkafka/wiki/FAQ#why-am-i-seeing-receive-failed-disconnected
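
If the disconnects are indeed just the idle-connection reaper, they are harmless; a configuration sketch (broker address is a placeholder) that simply silences the log noise:

conf = {'bootstrap.servers': 'localhost:9092',
        'group.id': 'maxwellConsumer',
        'log.connection.close': False}   # suppress the periodic idle-disconnect log lines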

@rohitgcs
Author

Changed my code quite a bit since then and not able to reproduce. Will reopen this with the logs if and when it occurs again.

@smsubrahmannian

smsubrahmannian commented May 22, 2019

@edenhill I have the same issue when I tried to implement manual commits after setting enable.auto.commit=False. I have also tried to use the store_offsets() method you mentioned in another ticket, but I end up getting negative lag.

def _fetch_process_msgs(self):
        """Get messages from consumer and process massages
        Returns:
            None
        """
        while True:
            logger.info("Processing new messages. Consumer starts consuming...")
            # read messages from Kafka topic
            with ignored(Exception, 'self.consumer.consume: ERROR = {0}'):
                msglist = self.consumer.consume(num_messages=100, timeout=1)
            topicpartitions = []
            # Collecting latest offset in each partition in a dictionary with partition id as key and
            # offset value as the value
            msg_partition_offset_dict = {}
            for message in msglist:
                topicpartitions.append(TopicPartition(message.topic(), message.partition()))
                if message.partition() not in msg_partition_offset_dict:
                    msg_partition_offset_dict[message.partition()] = message.offset()
                else:
                    if message.offset() > msg_partition_offset_dict[message.partition()]:
                        msg_partition_offset_dict[message.partition()] = message.offset()

            # process one by one
            for msg in msglist:
                try:
                    self._process_single_msg(msg)
                except Exception as e:
                    logger.error("Message processing: ERROR = {0}, MSG_VALUE = {1}".format(e, msg.value()))

            logger.info("Committed offset in each partition before commit: {0}".format(self.consumer.committed(topicpartitions)))

            offset_to_commit = []
            topic = self.get_consumer_topic()[0]
            for k, v in msg_partition_offset_dict.items():
                offset_to_commit.append(TopicPartition(topic, k, v + 1))

My consumer configuration is as given below

      self.consumer_configs = {"bootstrap.servers": ",".join(config["kafka_instances"]),
                                "group.id": self.consumer_group_id,
                                "log.connection.close": False,
                                "enable.auto.commit": False,
                                'default.topic.config': {'auto.offset.reset': 'latest'}
                               }

Every time I start the consumer, the lag goes to -1001 and then rises and falls back again. Let me know if I am doing anything wrong.

@edenhill
Contributor

I don't see anywhere in your code where you are calculating the lag.

"Every time I start the consumer, the lag goes to -1001 .."

You are not checking the message for errors (message objects are reused for error signalling).
You should not track offsets for a message that has a msg.error().

Nit: you are adding the same topic,partition combo multiple times to the topicpartitions list; better to use a dict.
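
A hedged sketch of both points (not the original code; it assumes a configured consumer and a client version that accepts the asynchronous keyword on commit(); older clients used async instead): skip messages carrying an error and deduplicate with a dict keyed by (topic, partition).

from confluent_kafka import TopicPartition

highest = {}                                          # (topic, partition) -> highest offset seen
msglist = consumer.consume(num_messages=100, timeout=1)
for msg in msglist:
    if msg.error():                                   # error events arrive as message objects too
        continue
    key = (msg.topic(), msg.partition())
    if msg.offset() > highest.get(key, -1):
        highest[key] = msg.offset()

# Commit the offset *after* the last processed message in each partition.
offsets = [TopicPartition(t, p, o + 1) for (t, p), o in highest.items()]
if offsets:
    consumer.commit(offsets=offsets, asynchronous=False)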

@smsubrahmannian

I am sorry that I didn't add the code for tracking lag. I will stop tracking the offset when a message results in an error. Also, while reading the documentation more thoroughly, I realized that if I consume a batch and process those messages before I consume the next batch, I don't need manual commits, because the offsets of the first batch are committed when I consume the next batch. So even if the consumer breaks down, I still won't lose any messages, because my consumption and processing happen synchronously. I did some tests and they seem to validate my hypothesis, but the scale of those tests isn't that reliable. Correct me if I am wrong. Thanks for the help :)
