From b649bde45cf73cc575e48520d0e7f94c001371a1 Mon Sep 17 00:00:00 2001 From: LeonHartley Date: Tue, 24 Sep 2024 19:42:55 +0000 Subject: [PATCH 1/2] Update OffsetCommit CommittedLeaderEpoch to v6, fixes #1334 --- protocol/offsetcommit/offsetcommit.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/offsetcommit/offsetcommit.go b/protocol/offsetcommit/offsetcommit.go index 5844928a6..cbde5ae89 100644 --- a/protocol/offsetcommit/offsetcommit.go +++ b/protocol/offsetcommit/offsetcommit.go @@ -28,7 +28,7 @@ type RequestPartition struct { PartitionIndex int32 `kafka:"min=v0,max=v7"` CommittedOffset int64 `kafka:"min=v0,max=v7"` CommitTimestamp int64 `kafka:"min=v1,max=v1"` - CommittedLeaderEpoch int32 `kafka:"min=v5,max=v7"` + CommittedLeaderEpoch int32 `kafka:"min=v6,max=v7"` CommittedMetadata string `kafka:"min=v0,max=v7,nullable"` } From d4205471f74307286e58b13cdc96a2c735709950 Mon Sep 17 00:00:00 2001 From: LeonHartley Date: Thu, 15 May 2025 20:12:54 +0000 Subject: [PATCH 2/2] update OffsetCommitRequest test to use correct protocol version --- protocol/offsetcommit/offsetcommit_test.go | 46 ++++++++++++++++------ 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/protocol/offsetcommit/offsetcommit_test.go b/protocol/offsetcommit/offsetcommit_test.go index 3a9afd236..c716aa55a 100644 --- a/protocol/offsetcommit/offsetcommit_test.go +++ b/protocol/offsetcommit/offsetcommit_test.go @@ -93,12 +93,9 @@ func TestOffsetCommitRequest(t *testing.T) { }) } - // Version 5 added: - // RequestTopic.RequestPartition.CommitedLeaderEpoc // Version 5 removed: // RetentionTimeMs - // Fields are the same through version 6. - for _, version := range []int16{5, 6} { + for _, version := range []int16{5} { prototest.TestRequest(t, version, &offsetcommit.Request{ GroupID: "group-3", GenerationID: 1, @@ -106,17 +103,44 @@ func TestOffsetCommitRequest(t *testing.T) { Topics: []offsetcommit.RequestTopic{ { Name: "topic-3", + Partitions: []offsetcommit.RequestPartition{ + { + PartitionIndex: 0, + CommittedOffset: 1, + CommittedMetadata: "meta-3-0", + }, + { + PartitionIndex: 1, + CommittedOffset: 2, + CommittedMetadata: "meta-3-1", + }, + }, + }, + }, + }) + } + + // Version 6 added: + // RequestTopic.RequestPartition.CommittedLeaderEpoch + for _, version := range []int16{6} { + prototest.TestRequest(t, version, &offsetcommit.Request{ + GroupID: "group-4", + GenerationID: 1, + MemberID: "member-4", + Topics: []offsetcommit.RequestTopic{ + { + Name: "topic-4", Partitions: []offsetcommit.RequestPartition{ { PartitionIndex: 0, CommittedOffset: 1, - CommittedMetadata: "meta-3-0", + CommittedMetadata: "meta-4-0", CommittedLeaderEpoch: 10, }, { PartitionIndex: 1, CommittedOffset: 2, - CommittedMetadata: "meta-3-1", + CommittedMetadata: "meta-4-1", CommittedLeaderEpoch: 11, }, }, @@ -129,10 +153,10 @@ func TestOffsetCommitRequest(t *testing.T) { // GroupInstanceID for _, version := range []int16{7} { prototest.TestRequest(t, version, &offsetcommit.Request{ - GroupID: "group-4", + GroupID: "group-5", GenerationID: 1, - MemberID: "member-4", - GroupInstanceID: "instance-4", + MemberID: "member-5", + GroupInstanceID: "instance-5", Topics: []offsetcommit.RequestTopic{ { Name: "topic-4", @@ -140,13 +164,13 @@ func TestOffsetCommitRequest(t *testing.T) { { PartitionIndex: 0, CommittedOffset: 1, - CommittedMetadata: "meta-4-0", + CommittedMetadata: "meta-5-0", CommittedLeaderEpoch: 10, }, { PartitionIndex: 1, CommittedOffset: 2, - CommittedMetadata: "meta-4-1", + CommittedMetadata: "meta-5-1", CommittedLeaderEpoch: 11, }, },