Skip to content

Re-subscribe to a topic after disconnection/reconnection #59

Closed
@patrickviet

Description

@patrickviet

Here is my test scenario. It's limited to the bare basics to isolate my problem

I have single kafka broker. Single topic, with a single partition.

Producer creates random messages to topic patricktest
Consumer reads the messages from topic patricktest

Producer

from confluent_kafka import Producer
from time import gmtime

p = Producer({'bootstrap.servers':'localhost'})
p.produce('patricktest','Kwak kwak I got data on %s' % gmtime())
p.flush()

Consumer

from confluent_kafka import Consumer, KafkaError
import sys
import time

c = Consumer({
  'bootstrap.servers':'localhost',
  'group.id':'patricklaptop',
  'default.topic.config': {'auto.offset.reset':'smallest'}
})

c.subscribe(['patricktest'])


while True:

  msg = c.poll(timeout=1.0)

  if msg is None:
    print "msg is None"
    #time.sleep(1)
    continue


  if msg.error():
    if msg.error().code() == KafkaError._PARTITION_EOF:
      # Not an error really
      sys.stderr.write('%% %s [%d] reached end at offset %d\n' % (msg.topic(), msg.partition(), msg.offset()))
    else:
      print "raising exception"
      raise KafkaException(msg.error())
  else:
    # Good stuff here.
    print('Received message: %s' % msg.value().decode('utf-8'))

  print "end of while cycle"

c.close()

So as you can see, it's pretty basic. In my test scenario, I run the consumer, and it will read the messages, then print "msg is None" quite a bit, until I produce more messages by running the producer code.

Now here is the problem: if I stop kafka while the consumer is running, then restart kafka, the consumer will reconnect BUT it won't re-subscribe to the topic.
I have not found ANY WAY to know that I was disconnected. All I know is that poll returns None. That's it.

For now the workaround that I have found is to add this at the beginning of the loop.

last_resubscribe = 0
while True:
  # resub every 10sec just in case I got disconnected
  if time.time() - last_resubscribe > 10:
    print "resubscribing - just in case"
    c.subscribe(['patricktest'])
    last_resubscribe = time.time()

[...]

But sadly I also get this kind of thing every 10 seconds on kafka server:

kafka/server.out

[2016-11-04 22:26:27,851] INFO [GroupCoordinator 1]: Preparing to restabilize group patricklaptop with old generation 7 (kafka.coordinator.GroupCoordinator)
[2016-11-04 22:26:27,852] INFO [GroupCoordinator 1]: Stabilized group patricklaptop generation 8 (kafka.coordinator.GroupCoordinator)
[2016-11-04 22:26:28,114] INFO [GroupCoordinator 1]: Assignment received from leader for group patricklaptop for generation 8 (kafka.coordinator.GroupCoordinator)

Is there any way to catch the reconnect so that I can re-subscribe to a topic?
Or maybe have a way to query the consumer object to know my current status?
Or maybe have the client automatically resubscribe to the topics itself?

While my workaround is functional, it's pretty awful AND produces extra load/logs on the broker.
I'm suspecting that it would even trigger a full rebalance and other things if I had several consumers / partitions rather than this minimal one consumer / partition / topic minimal scenario...

Thanks!
--Patrick

Metadata

Metadata

Assignees

Labels

component:librdkafkaFor issues tied to the librdkfka elements

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions