Skip to content

Commit 5246daa

Browse files
frosiere authored and garyrussell committed
GH-2170: Support Custom OffsetAndMetadata
Resolves #2170. Adds a way to create a custom OffsetAndMetadata. Also removes useless initializations.
1 parent 5650491 commit 5246daa

File tree

5 files changed

+196
-9
lines changed

5 files changed

+196
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.regex.Pattern;
2323

2424
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
25+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2526
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
2627

2728
import org.springframework.kafka.support.KafkaUtils;
@@ -90,6 +91,12 @@ public class ConsumerProperties {
9091
*/
9192
private OffsetCommitCallback commitCallback;
9293

94+
/**
95+
* A provider for {@link OffsetAndMetadata}; by default, the provider creates an offset and metadata with
96+
* empty metadata. The provider gives a way to customize the metadata.
97+
*/
98+
private OffsetAndMetadataProvider offsetAndMetadataProvider;
99+
93100
/**
94101
* Whether or not to call consumer.commitSync() or commitAsync() when the
95102
* container is responsible for commits. Default true.
@@ -278,6 +285,16 @@ public void setCommitCallback(OffsetCommitCallback commitCallback) {
278285
this.commitCallback = commitCallback;
279286
}
280287

288+
/**
289+
* Set the offset and metadata provider associated to a commit callback.
290+
* @param offsetAndMetadataProvider an offset and metadata provider.
291+
* @since 2.8.5
292+
* @see #setCommitCallback(OffsetCommitCallback)
293+
*/
294+
public void setOffsetAndMetadataProvider(OffsetAndMetadataProvider offsetAndMetadataProvider) {
295+
this.offsetAndMetadataProvider = offsetAndMetadataProvider;
296+
}
297+
281298
/**
282299
* Return the commit callback.
283300
* @return the callback.
@@ -287,6 +304,15 @@ public OffsetCommitCallback getCommitCallback() {
287304
return this.commitCallback;
288305
}
289306

307+
/**
308+
* Return the offset and metadata provider.
309+
* @return the offset and metadata provider.
310+
*/
311+
@Nullable
312+
public OffsetAndMetadataProvider getOffsetAndMetadataProvider() {
313+
return this.offsetAndMetadataProvider;
314+
}
315+
290316
/**
291317
* Set whether or not to call consumer.commitSync() or commitAsync() when the
292318
* container is responsible for commits. Default true.
@@ -541,6 +567,7 @@ protected final String renderProperties() {
541567
? "\n consumerRebalanceListener=" + this.consumerRebalanceListener
542568
: "")
543569
+ (this.commitCallback != null ? "\n commitCallback=" + this.commitCallback : "")
570+
+ (this.offsetAndMetadataProvider != null ? "\n offsetAndMetadataProvider=" + this.offsetAndMetadataProvider : "")
544571
+ "\n syncCommits=" + this.syncCommits
545572
+ (this.syncCommitTimeout != null ? "\n syncCommitTimeout=" + this.syncCommitTimeout : "")
546573
+ (this.kafkaConsumerProperties.size() > 0 ? "\n properties=" + this.kafkaConsumerProperties : "")

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,12 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
561561
? this.containerProperties.getCommitCallback()
562562
: new LoggingCommitCallback();
563563

564+
private final OffsetAndMetadataProvider offsetAndMetadataProvider = this.containerProperties.getOffsetAndMetadataProvider() == null
565+
? (listenerMetadata, offset) -> new OffsetAndMetadata(offset)
566+
: this.containerProperties.getOffsetAndMetadataProvider();
567+
568+
private final ConsumerAwareListenerMetadata consumerAwareListenerMetadata = new ConsumerAwareListenerMetadata();
569+
564570
private final Consumer<K, V> consumer;
565571

566572
private final Map<String, Map<Integer, Long>> offsets = new HashMap<>();
@@ -1453,7 +1459,7 @@ private void fixTxOffsetsIfNeeded() {
14531459
return;
14541460
}
14551461
if (position > oamd.offset()) {
1456-
toFix.put(tp, new OffsetAndMetadata(position));
1462+
toFix.put(tp, createOffsetAndMetadata(position));
14571463
}
14581464
});
14591465
if (toFix.size() > 0) {
@@ -1930,7 +1936,7 @@ else if (record.offset() < offs.get(0)) {
19301936
private void ackImmediate(ConsumerRecord<K, V> record) {
19311937
Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(
19321938
new TopicPartition(record.topic(), record.partition()),
1933-
new OffsetAndMetadata(record.offset() + 1));
1939+
createOffsetAndMetadata(record.offset() + 1));
19341940
this.commitLogger.log(() -> COMMITTING + commits);
19351941
if (this.producer != null) {
19361942
doSendOffsets(this.producer, commits);
@@ -1946,9 +1952,8 @@ else if (this.syncCommits) {
19461952
private void ackImmediate(ConsumerRecords<K, V> records) {
19471953
Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();
19481954
for (TopicPartition part : records.partitions()) {
1949-
commits.put(part,
1950-
new OffsetAndMetadata(records.records(part)
1951-
.get(records.records(part).size() - 1).offset() + 1));
1955+
commits.put(part, createOffsetAndMetadata(records.records(part)
1956+
.get(records.records(part).size() - 1).offset() + 1));
19521957
}
19531958
this.commitLogger.log(() -> COMMITTING + commits);
19541959
if (this.producer != null) {
@@ -2736,7 +2741,7 @@ public void ackCurrent(final ConsumerRecord<K, V> record) {
27362741
if (this.isRecordAck) {
27372742
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
27382743
Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
2739-
new OffsetAndMetadata(record.offset() + 1));
2744+
createOffsetAndMetadata(record.offset() + 1));
27402745
if (this.producer == null) {
27412746
this.commitLogger.log(() -> COMMITTING + offsetsToCommit);
27422747
if (this.syncCommits) {
@@ -3043,7 +3048,7 @@ private Map<TopicPartition, OffsetAndMetadata> buildCommits() {
30433048
for (Entry<String, Map<Integer, Long>> entry : this.offsets.entrySet()) {
30443049
for (Entry<Integer, Long> offset : entry.getValue().entrySet()) {
30453050
commits.put(new TopicPartition(entry.getKey(), offset.getKey()),
3046-
new OffsetAndMetadata(offset.getValue() + 1));
3051+
createOffsetAndMetadata(offset.getValue() + 1));
30473052
}
30483053
}
30493054
this.offsets.clear();
@@ -3145,6 +3150,29 @@ private String zombieFenceTxIdSuffix(String topic, int partition) {
31453150
return this.consumerGroupId + "." + topic + "." + partition;
31463151
}
31473152

3153+
private OffsetAndMetadata createOffsetAndMetadata(long offset) {
3154+
return this.offsetAndMetadataProvider.provide(this.consumerAwareListenerMetadata, offset);
3155+
}
3156+
3157+
private final class ConsumerAwareListenerMetadata implements ListenerMetadata {
3158+
3159+
@Override
3160+
public String getListenerId() {
3161+
return getBeanName();
3162+
}
3163+
3164+
@Override
3165+
public String getGroupId() {
3166+
return ListenerConsumer.this.consumerGroupId;
3167+
}
3168+
3169+
@Override
3170+
public byte[] getListenerInfo() {
3171+
return ListenerConsumer.this.listenerinfo;
3172+
}
3173+
3174+
}
3175+
31483176
private final class ConsumerAcknowledgment implements Acknowledgment {
31493177

31503178
private final ConsumerRecord<K, V> record;
@@ -3346,8 +3374,7 @@ private boolean collectAndCommitIfNecessary(Collection<TopicPartition> partition
33463374
for (TopicPartition partition : partitions) {
33473375
try {
33483376
if (committed.get(partition) == null) { // no existing commit for this group
3349-
offsetsToCommit.put(partition,
3350-
new OffsetAndMetadata(ListenerConsumer.this.consumer.position(partition)));
3377+
offsetsToCommit.put(partition, createOffsetAndMetadata(ListenerConsumer.this.consumer.position(partition)));
33513378
}
33523379
}
33533380
catch (NoOffsetForPartitionException e) {
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2016-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
/**
20+
* Metadata associated to a {@link org.springframework.kafka.annotation.KafkaListener}.
21+
*
22+
* @author Francois Rosiere
23+
* @since 2.8.5
24+
* @see org.springframework.kafka.annotation.KafkaListener
25+
*/
26+
public interface ListenerMetadata {
27+
28+
/**
29+
* Return the listener id.
30+
* @return the listener id.
31+
*/
32+
String getListenerId();
33+
34+
/**
35+
* Return the group id.
36+
* @return the group id.
37+
*/
38+
String getGroupId();
39+
40+
/**
41+
* Return the listener info.
42+
* @return the listener info.
43+
*/
44+
byte[] getListenerInfo();
45+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2016-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
20+
21+
/**
22+
* Provider for {@link OffsetAndMetadata}. In case of async commits of the offsets,
23+
* the provider can be used in combination with an {@link org.apache.kafka.clients.consumer.OffsetCommitCallback} to
24+
* have more granularity in the way to create an {@link OffsetAndMetadata}.
25+
*
26+
* @author Francois Rosiere
27+
* @since 2.8.5
28+
* @see org.apache.kafka.clients.consumer.OffsetCommitCallback
29+
*/
30+
public interface OffsetAndMetadataProvider {
31+
32+
/**
33+
* Provide an offset and metadata object for the given listener metadata and offset.
34+
*
35+
* @param listenerMetadata metadata associated to a listener.
36+
* @param offset an offset.
37+
* @return an offset and metadata.
38+
*/
39+
OffsetAndMetadata provide(ListenerMetadata listenerMetadata, long offset);
40+
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3819,6 +3819,54 @@ public void clearThreadState(Consumer<?, ?> consumer) {
38193819
container.stop();
38203820
}
38213821

3822+
@Test
3823+
public void testOffsetAndMetadataWithoutProvider() throws InterruptedException {
3824+
testOffsetAndMetadata(null, new OffsetAndMetadata(1));
3825+
}
3826+
3827+
@Test
3828+
public void testOffsetAndMetadataWithProvider() throws InterruptedException {
3829+
testOffsetAndMetadata((listenerMetadata, offset) ->
3830+
new OffsetAndMetadata(offset, listenerMetadata.getGroupId()),
3831+
new OffsetAndMetadata(1, "grp"));
3832+
}
3833+
3834+
@SuppressWarnings("unchecked")
3835+
private void testOffsetAndMetadata(OffsetAndMetadataProvider provider, OffsetAndMetadata expectedOffsetAndMetadata) throws InterruptedException {
3836+
final ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
3837+
final Consumer<Integer, String> consumer = mock(Consumer.class);
3838+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
3839+
given(consumer.poll(any(Duration.class))).willAnswer(i -> new ConsumerRecords<>(
3840+
Map.of(
3841+
new TopicPartition("foo", 0),
3842+
Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, 1, "foo"))
3843+
)
3844+
));
3845+
final ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> offsetsCaptor = ArgumentCaptor.forClass(Map.class);
3846+
final CountDownLatch latch = new CountDownLatch(1);
3847+
willAnswer(invocation -> {
3848+
latch.countDown();
3849+
return null;
3850+
}).given(consumer).commitAsync(offsetsCaptor.capture(), any());
3851+
final ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset("foo", 0));
3852+
containerProps.setGroupId("grp");
3853+
containerProps.setClientId("clientId");
3854+
containerProps.setSyncCommits(false);
3855+
containerProps.setMessageListener((MessageListener<Integer, String>) data -> {
3856+
});
3857+
containerProps.setCommitCallback((offsets, exception) -> {
3858+
});
3859+
containerProps.setOffsetAndMetadataProvider(provider);
3860+
final KafkaMessageListenerContainer<Integer, String> container =
3861+
new KafkaMessageListenerContainer<>(cf, containerProps);
3862+
container.start();
3863+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
3864+
assertThat(offsetsCaptor.getValue())
3865+
.hasSize(1)
3866+
.containsValue(expectedOffsetAndMetadata);
3867+
container.stop();
3868+
}
3869+
38223870
private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
38233871
Consumer<?, ?> consumer =
38243872
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class);

0 commit comments

Comments
 (0)