Skip to content

Commit 4a5f4a1

Browse files
garyrussellartembilan
authored andcommitted
GH-2249: Batch Listener LISTENER_INFO Headers
Resolves #2249 * Single String parameter for batch listeners. * Polish docs for an empty batch. **cherry-pick to 2.9.x, 2.8.x**
1 parent ab9d645 commit 4a5f4a1

File tree

4 files changed

+47
-5
lines changed

4 files changed

+47
-5
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5541,6 +5541,30 @@ When used in a `RecordInterceptor` or `RecordFilterStrategy` implementation, the
55415541

55425542
The header mappers also convert to `String` when creating `MessageHeaders` from the consumer record and never map this header on an outbound record.
55435543

5544+
For POJO batch listeners, starting with version 2.8.6, the header is copied into each member of the batch and is also available as a single `String` parameter after conversion.
5545+
5546+
====
5547+
[source, java]
5548+
----
5549+
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
5550+
info = "info for batch")
5551+
public void listen(List<Thing> list,
5552+
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
5553+
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
5554+
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
5555+
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
5556+
@Header(KafkaHeaders.LISTENER_INFO) String info) {
5557+
...
5558+
}
5559+
----
5560+
====
5561+
5562+
NOTE: If the batch listener has a filter and the filter results in an empty batch, you will need to add `required = false` to the `@Header` parameter because the info is not available for an empty batch.
5563+
5564+
If you receive `List<Message<Thing>>` the info is in the `KafkaHeaders.LISTENER_INFO` header of each `Message<?>`.
5565+
5566+
See <<batch-listeners>> for more information about consuming batches.
5567+
55445568
[[dead-letters]]
55455569
===== Publishing Dead-letter Records
55465570

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2269,6 +2269,9 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
22692269

22702270
ConsumerRecords<K, V> records = recordsArg;
22712271
List<ConsumerRecord<K, V>> recordList = recordListArg;
2272+
if (this.listenerinfo != null) {
2273+
records.iterator().forEachRemaining(rec -> listenerInfo(rec));
2274+
}
22722275
if (this.batchInterceptor != null) {
22732276
records = this.batchInterceptor.intercept(recordsArg, this.consumer);
22742277
if (records == null) {
@@ -2516,10 +2519,14 @@ private void internalHeaders(final ConsumerRecord<K, V> record) {
25162519
record.headers().add(new RecordHeader(KafkaHeaders.DELIVERY_ATTEMPT, buff));
25172520
}
25182521
if (this.listenerinfo != null) {
2519-
record.headers().add(this.infoHeader);
2522+
listenerInfo(record);
25202523
}
25212524
}
25222525

2526+
private void listenerInfo(final ConsumerRecord<K, V> record) {
2527+
record.headers().add(this.infoHeader);
2528+
}
2529+
25232530
private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecord<K, V> record) {
25242531
if (!this.autoCommit && !this.isRecordAck) {
25252532
processCommits();

spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -169,8 +169,8 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
169169
commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes,
170170
timestamps);
171171
rawHeaders.put(KafkaHeaders.CONVERSION_FAILURES, conversionFailures);
172-
173172
boolean logged = false;
173+
String info = null;
174174
for (ConsumerRecord<?, ?> record : records) {
175175
payloads.add(obtainPayload(type, record, conversionFailures));
176176
keys.add(record.key());
@@ -185,6 +185,10 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
185185
Map<String, Object> converted = new HashMap<>();
186186
this.headerMapper.toHeaders(record.headers(), converted);
187187
convertedHeaders.add(converted);
188+
Object object = converted.get(KafkaHeaders.LISTENER_INFO);
189+
if (object instanceof String) {
190+
info = (String) object;
191+
}
188192
}
189193
else {
190194
if (!logged) {
@@ -200,6 +204,9 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
200204
raws.add(record);
201205
}
202206
}
207+
if (info != null) {
208+
rawHeaders.put(KafkaHeaders.LISTENER_INFO, info);
209+
}
203210
return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
204211
}
205212

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,7 @@ public void testBatchWitHeaders() throws Exception {
591591
list = this.listener.offsets;
592592
assertThat(list.size()).isGreaterThan(0);
593593
assertThat(list.get(0)).isInstanceOf(Long.class);
594+
assertThat(this.listener.listenerInfo).isEqualTo("info for batch");
594595
}
595596

596597
@Test
@@ -1982,17 +1983,20 @@ public void listen10(List<String> list, @Header(KafkaHeaders.GROUP_ID) String gr
19821983
this.latch10.countDown();
19831984
}
19841985

1985-
@KafkaListener(id = "list2", topics = "annotated15", containerFactory = "batchFactory")
1986+
@KafkaListener(id = "list2", topics = "annotated15", containerFactory = "batchFactory",
1987+
info = "info for batch")
19861988
public void listen11(List<String> list,
19871989
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
19881990
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
19891991
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
1990-
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
1992+
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
1993+
@Header(KafkaHeaders.LISTENER_INFO) String info) {
19911994
this.payload = list;
19921995
this.keys = keys;
19931996
this.partitions = partitions;
19941997
this.topics = topics;
19951998
this.offsets = offsets;
1999+
this.listenerInfo = info;
19962000
this.latch11.countDown();
19972001
}
19982002

0 commit comments

Comments
 (0)