Skip to content

Conversation

@moonyoungCHAE
Copy link
Contributor

  • As-is
    • When a @KafkaListener is created with both @Topic and @TopicPartition, the listener consumes messages based on @TopicPartition.
    • When @RetryableTopic is added, retry topics are managed based on @Topic.
  • To-be
    • Creating a @KafkaListener with both @Topic and @TopicPartition is not allowed.

fixes #4170

@sobychacko
Copy link
Contributor

@moonyoungCHAE There are build failures on the PR - Are you able to see this? https://github.com/spring-projects/spring-kafka/actions/runs/19523082207/job/55890231401?pr=4172.

Can you also trace, what happens to the retryable topic when we only provide TopicPartition on the KafkaListener? Since there are no topics given any more on KafkaListener (based on this PR), will it still create destinations for retry?

Thanks!

@moonyoungCHAE
Copy link
Contributor Author

@sobychacko
Hi, I updated the PR to allow using one of @Topic, @TopicPartition, or @TopicPattern. Previously, only @Topic and @TopicPartition were allowed.

I confirmed that when only @TopicPartition is provided, it still creates a destination for retry. The retry topic is determined here, which has not changed.

I traced the behavior, and the results are below.

    @RetryableTopic(attempts = "3", backoff = @Backoff(delay = 1000, multiplier = 2), topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
    @KafkaListener(id = "foo",
            clientIdPrefix = "myClientId",
            topicPartitions =
                    {
                            @TopicPartition(topic = "topic1-2", partitions = {"0"}),
                            @TopicPartition(topic = "topic2-2", partitions = {"0"})
                    }
                    )
    public void listen(String data) {

    }

2025-12-02T14:54:57.229+09:00  INFO 4341 --- [o-retry-1-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=myClientId-retry-1-0, groupId=foo-retry-1] Resetting offset for partition topic1-2-retry-1-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.
2025-12-02T14:54:57.229+09:00  INFO 4341 --- [o-retry-1-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=myClientId-retry-1-0, groupId=foo-retry-1] Resetting offset for partition topic2-2-retry-1-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.
2025-12-02T14:54:57.231+09:00  INFO 4341 --- [o-retry-0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=myClientId-retry-0-0, groupId=foo-retry-0] Resetting offset for partition topic2-2-retry-0-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.
2025-12-02T14:54:57.232+09:00  INFO 4341 --- [o-retry-0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=myClientId-retry-0-0, groupId=foo-retry-0] Resetting offset for partition topic1-2-retry-0-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}


2025-12-02T14:54:57.232+09:00  INFO 4341 --- [  foo-dlt-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=myClientId-dlt-0, groupId=foo-dlt] Resetting offset for partition topic2-2-dlt-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.
2025-12-02T14:54:57.232+09:00  INFO 4341 --- [  foo-dlt-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=myClientId-dlt-0, groupId=foo-dlt] Resetting offset for partition topic1-2-dlt-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.

Thanks!

@sobychacko sobychacko merged commit a0b5b70 into spring-projects:main Dec 3, 2025
3 checks passed
spring-builds pushed a commit that referenced this pull request Dec 3, 2025
…on) (#4172)

Fixes #4170

* add topic validation of kafka listener

* refactor: allow topic pattern for validtion

Signed-off-by: moonyoungCHAE <[email protected]>
(cherry picked from commit a0b5b70)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

@KafkaListener with Both topics and topicPartitions Breaks @RetryableTopic

2 participants