Skip to content

Commit a56469b

Browse files
garyrussellartembilan
authored andcommitted
GH-716: Null Payload Doc Improvements
Fixes #716 Improve documentation - `null` is not necessarily just for tombstone records. (cherry picked from commit 9ac47f6)
1 parent 2cd4d9d commit a56469b

File tree

2 files changed

+18
-6
lines changed

2 files changed

+18
-6
lines changed

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,12 +256,16 @@ public void testSimple() throws Exception {
256256
.isEqualTo("clientIdViaAnnotation-0");
257257

258258
template.send("annotated11", 0, "foo");
259-
template.flush();
260259
assertThat(this.listener.latch7.await(60, TimeUnit.SECONDS)).isTrue();
261260
assertThat(this.consumerRef.get()).isNotNull();
261+
assertThat(this.listener.latch7String).isEqualTo("foo");
262262

263263
assertThat(this.recordFilter.called).isTrue();
264264

265+
template.send("annotated11", 0, null);
266+
assertThat(this.listener.latch7a.await(60, TimeUnit.SECONDS)).isTrue();
267+
assertThat(this.listener.latch7String).isNull();
268+
265269
MessageListenerContainer rebalanceConcurrentContainer = registry.getListenerContainer("rebalanceListener");
266270
assertThat(rebalanceConcurrentContainer).isNotNull();
267271
MessageListenerContainer rebalanceContainer = (MessageListenerContainer) KafkaTestUtils
@@ -1103,6 +1107,10 @@ static class Listener implements ConsumerSeekAware {
11031107

11041108
private final CountDownLatch latch7 = new CountDownLatch(1);
11051109

1110+
private final CountDownLatch latch7a = new CountDownLatch(2);
1111+
1112+
private volatile String latch7String;
1113+
11061114
private final CountDownLatch latch8 = new CountDownLatch(1);
11071115

11081116
private final CountDownLatch latch9 = new CountDownLatch(1);
@@ -1247,8 +1255,10 @@ public void jsonHeaders(Bar foo) { // should be mapped to Foo via Headers
12471255

12481256
@KafkaListener(id = "rebalanceListener", topics = "annotated11", idIsGroup = false,
12491257
containerFactory = "kafkaRebalanceListenerContainerFactory")
1250-
public void listen7(String foo) {
1258+
public void listen7(@Payload(required = false) String foo) {
1259+
this.latch7String = foo;
12511260
this.latch7.countDown();
1261+
this.latch7a.countDown();
12521262
}
12531263

12541264
@KafkaListener(id = "quux", topics = "annotated12")

src/reference/asciidoc/kafka.adoc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1631,11 +1631,11 @@ If the converter has no converter (either because Jackson is not present, or it
16311631
IMPORTANT: The Jackson `ObjectMapper` (even if provided) will be enhanced to support deserializing `org.springframework.util.MimeType` objects, often used in the `spring-messaging` `contentType` header.
16321632
If you don't wish your mapper to be enhanced in this way, for some reason, you should subclass the `DefaultKafkaHeaderMapper` and override `getObjectMapper()` to return your mapper.
16331633

1634-
==== Log Compaction
1634+
==== Null Payloads and Log Compaction 'Tombstone' Records
16351635

16361636
When using https://kafka.apache.org/documentation/#compaction[Log Compaction], it is possible to send and receive messages with `null` payloads which identifies the deletion of a key.
16371637

1638-
Starting with _version 1.0.3_, this is now fully supported.
1638+
It is also possible to receive `null` values for other reasons - such as a `Deserializer` that might return `null` when it can't deserialize a value.
16391639

16401640
To send a `null` payload using the `KafkaTemplate` simply pass null into the value argument of the `send()` methods.
16411641
One exception to this is the `send(Message<?> message)` variant.
@@ -1644,7 +1644,7 @@ For convenience, the static `KafkaNull.INSTANCE` is provided.
16441644

16451645
When using a message listener container, the received `ConsumerRecord` will have a `null` `value()`.
16461646

1647-
To configure the `@KafkaListener` to handle `null` payloads, you must use the `@Payload` annotation with `required = false`; you will usually also need the key so your application knows which key was "deleted":
1647+
To configure the `@KafkaListener` to handle `null` payloads, you must use the `@Payload` annotation with `required = false`; if it's a tombstone message for a compacted log, you will usually also need the key so your application can determine which key was "deleted":
16481648

16491649
[source, java]
16501650
----
@@ -1654,7 +1654,7 @@ public void listen(@Payload(required = false) String value, @Header(KafkaHeaders
16541654
}
16551655
----
16561656

1657-
When using a class-level `@KafkaListener`, some additional configuration is needed - a `@KafkaHandler` method with a `KafkaNull` payload:
1657+
When using a class-level `@KafkaListener` with multiple `@KafkaHandler` methods, some additional configuration is needed - a `@KafkaHandler` method with a `KafkaNull` payload:
16581658

16591659
[source, java]
16601660
----
@@ -1679,6 +1679,8 @@ static class MultiListenerBean {
16791679
}
16801680
----
16811681

1682+
Note that the argument will be `null` not a `KafkaNull`.
1683+
16821684
[[annotation-error-handling]]
16831685
==== Handling Exceptions
16841686

0 commit comments

Comments
 (0)