-
Notifications
You must be signed in to change notification settings - Fork 883
Closed
Labels
Description
Description
Only the first partition is consumed.
How to reproduce
Confluent.Kafka nuget version: 1.9.0
Apache Kafka setup:
# Single-broker Kafka + ZooKeeper stack used to reproduce the issue.
version: '3'

services:
  zookeeper:
    image: wurstmeister/zookeeper
    networks:
      - broker-kafka
    container_name: zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    networks:
      - broker-kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Format is "<topic>:<partitions>:<replicas>", i.e. test-topic with 5 partitions and 1 replica.
      KAFKA_CREATE_TOPICS: "test-topic:5:1"

networks:
  broker-kafka:
    driver: bridge
Operating system: windows
Consumer code:
/// <summary>
/// Minimal Kafka consumer that subscribes to one topic, prints every message it receives
/// and synchronously commits the offset of each printed message.
/// </summary>
public class Consumer
{
    // Artificial per-message processing time used by the consume loop.
    private static readonly TimeSpan ProcessingDelay = TimeSpan.FromSeconds(5);

    private readonly ConsumerConfig _consumerConfig;

    /// <summary>
    /// Creates a consumer for group "default" with auto-commit and auto offset store disabled.
    /// </summary>
    /// <param name="bootstrapServer">Value assigned to <c>BootstrapServers</c>.</param>
    /// <exception cref="ArgumentException">
    /// <paramref name="bootstrapServer"/> is null, empty or whitespace.
    /// </exception>
    public Consumer(string bootstrapServer)
    {
        if (string.IsNullOrWhiteSpace(bootstrapServer))
        {
            throw new ArgumentException("A bootstrap server address is required.", nameof(bootstrapServer));
        }

        _consumerConfig = new ConsumerConfig
        {
            BootstrapServers = bootstrapServer,
            EnableAutoCommit = false,
            EnableAutoOffsetStore = false,
            MaxPollIntervalMs = 300000,
            GroupId = "default",
            // For troubleshooting, uncomment: Debug = "consumer,topic",
            // Read messages from start if no commit exists.
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
    }

    /// <summary>
    /// Subscribes to <paramref name="topicName"/> and consumes until a Kafka error occurs.
    /// Equivalent to calling the cancellable overload with <see cref="CancellationToken.None"/>.
    /// </summary>
    /// <param name="topicName">Topic to subscribe to.</param>
    public void StartReceivingMessages(string topicName)
    {
        StartReceivingMessages(topicName, CancellationToken.None);
    }

    /// <summary>
    /// Subscribes to <paramref name="topicName"/> and consumes until
    /// <paramref name="cancellationToken"/> is cancelled or a Kafka error occurs.
    /// The consumer is always closed before this method returns.
    /// </summary>
    /// <param name="topicName">Topic to subscribe to.</param>
    /// <param name="cancellationToken">Signals the loop to stop and leave the group cleanly.</param>
    /// <exception cref="ArgumentException">
    /// <paramref name="topicName"/> is null, empty or whitespace.
    /// </exception>
    public void StartReceivingMessages(string topicName, CancellationToken cancellationToken)
    {
        if (string.IsNullOrWhiteSpace(topicName))
        {
            throw new ArgumentException("A topic name is required.", nameof(topicName));
        }

        using var consumer = new ConsumerBuilder<string, string>(_consumerConfig)
            .SetKeyDeserializer(Deserializers.Utf8)
            .SetValueDeserializer(Deserializers.Utf8)
            .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
            .SetStatisticsHandler((_, json) => Console.WriteLine($"Statistics: {json}"))
            .SetPartitionsAssignedHandler((c, partitions) =>
            {
                // No PartitionAssignmentStrategy is set in the config, so the client default
                // applies (NOTE(review): librdkafka documents "range,roundrobin" - confirm for
                // the version in use). "all" is the current assignment plus the partitions
                // passed to this handler.
                Console.WriteLine(
                    "Partitions assigned: [" +
                    string.Join(',', partitions.Select(p => p.Partition.Value)) +
                    "], all: [" +
                    string.Join(',', c.Assignment.Concat(partitions).Select(p => p.Partition.Value)) +
                    "]");

                // Possibly manually specify start offsets by returning a list of topic/partition/offsets
                // to assign to, e.g.:
                // return partitions.Select(tp => new TopicPartitionOffset(tp, externalOffsets[tp]));
            })
            .SetPartitionsRevokedHandler((c, partitions) =>
            {
                // "remaining" is the current assignment minus the partitions being revoked.
                var remaining = c.Assignment.Where(atp => !partitions.Any(rtp => rtp.TopicPartition == atp));
                Console.WriteLine(
                    "Partitions incrementally revoked: [" +
                    string.Join(',', partitions.Select(p => p.Partition.Value)) +
                    "], remaining: [" +
                    string.Join(',', remaining.Select(p => p.Partition.Value)) +
                    "]");
            })
            .SetPartitionsLostHandler((c, partitions) =>
            {
                // The lost partitions handler is called when the consumer detects that it has lost ownership
                // of its assignment (fallen out of the group).
                Console.WriteLine($"Partitions were lost: [{string.Join(", ", partitions)}]");
            })
            .Build();

        try
        {
            consumer.Subscribe(topicName);
            Console.WriteLine("\nConsumer loop started...\n\n");

            // NOTE(review): with a slow loop like this one, output may show one partition being
            // drained before the next. Presumably the client pre-fetches messages in
            // per-partition batches - confirm against the librdkafka documentation.
            while (!cancellationToken.IsCancellationRequested)
            {
                // Blocks until a message arrives or the token is cancelled
                // (in which case OperationCanceledException is thrown).
                var result = consumer.Consume(cancellationToken);
                var message = result?.Message?.Value;
                if (message == null)
                {
                    continue;
                }

                Console.WriteLine(
                    $"Received: {result.Message.Key}:{message} from partition: {result.Partition.Value}");

                // Synchronous commit of this result's offset. A StoreOffset call is not needed
                // here: stored offsets only feed auto-commit or the parameterless Commit(),
                // and auto-commit is disabled in the config.
                consumer.Commit(result);

                // Simulated processing time; returns true (and ends the loop) as soon as
                // cancellation is requested instead of sleeping the full delay.
                if (cancellationToken.WaitHandle.WaitOne(ProcessingDelay))
                {
                    break;
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Consumer loop cancelled.");
        }
        catch (KafkaException e)
        {
            Console.WriteLine($"Consume error: {e.Message}");
            Console.WriteLine("Exiting consumer...");
        }
        finally
        {
            // Close the consumer so it leaves the group before disposal.
            consumer.Close();
        }
    }
}
Run the application.
Check assignment results with:
docker exec -it kafka kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group default
Output:
GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
default rdkafka-2972a694-2cf5-4fff-aa81-c6090f7b49e2 /172.18.0.1 rdkafka 5 test-topic(0,1,2,3,4)
Check the app stdout:
Consumer loop started...
Partitions assigned: [0,1,2,3,4], all: [0,1,2,3,4]
Received: 6:{"resourceId":"6","text":"Resource 6 with version 17","version":17} from partition: 0
Received: 6:{"resourceId":"6","text":"Resource 6 with version 18","version":18} from partition: 0
Received: 6:{"resourceId":"6","text":"Resource 6 with version 19","version":19} from partition: 0
Received: 6:{"resourceId":"6","text":"Resource 6 with version 20","version":20} from partition: 0
Received: 6:{"resourceId":"6","text":"Resource 6 with version 21","version":21} from partition: 0
Expected results: the consumer is assigned all 5 partitions and data from all of them is consumed.
Actual results: only partition 0 is consumed at first. Once partition 0 is exhausted, the consumer continues with partition 1.
What am I doing wrong, and how can I consume from all 5 partitions?