From 2372c33cb5116e7db3d6f2937e4e0e7d9d2ee3ae Mon Sep 17 00:00:00 2001 From: "nathan.xu" Date: Wed, 1 Nov 2023 21:53:52 -0400 Subject: [PATCH] cosmetic doc improvements part 3 --- .../kafka/annotation-error-handling.adoc | 61 +++++++++---------- .../ROOT/pages/kafka/container-factory.adoc | 2 +- .../ROOT/pages/kafka/container-props.adoc | 2 +- .../ROOT/pages/kafka/exactly-once.adoc | 4 +- .../modules/ROOT/pages/kafka/headers.adoc | 12 ++-- .../ROOT/pages/kafka/interceptors.adoc | 2 +- .../modules/ROOT/pages/kafka/micrometer.adoc | 14 ++--- .../pages/kafka/pause-resume-partitions.adoc | 2 +- ...roducer-interceptor-managed-in-spring.adoc | 3 +- .../ROOT/pages/kafka/sending-messages.adoc | 4 +- .../modules/ROOT/pages/kafka/serdes.adoc | 29 ++++----- .../modules/ROOT/pages/kafka/tombstones.adoc | 2 +- .../ROOT/pages/kafka/transactions.adoc | 14 ++--- .../antora/modules/ROOT/pages/retrytopic.adoc | 2 +- .../retrytopic/how-the-pattern-works.adoc | 6 +- .../retrytopic/programmatic-construction.adoc | 2 +- .../ROOT/pages/retrytopic/retry-config.adoc | 18 +++--- .../kafka/jdocs/started/noboot/Config.java | 2 +- 18 files changed, 88 insertions(+), 93 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc index a0bc28fdee..f880848c93 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc @@ -32,7 +32,7 @@ It might be used in a request/reply scenario where you wish to send a failure re [source, java] ---- @Bean -KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) { +public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) { return (msg, ex) -> { if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) { recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex); @@ -50,7 +50,7 @@ It has a sub-interface (`ConsumerAwareListenerErrorHandler`) that has access to Object handleError(Message message, ListenerExecutionFailedException exception, Consumer consumer); ---- -Another sub-interface (`ManualAckListenerErrorHandler`) provides access to the `Acknowledgment` object when using manual `AckMode` s. +Another sub-interface (`ManualAckListenerErrorHandler`) provides access to the `Acknowledgment` object when using manual `AckMode`+++s+++. [source, java] ---- @@ -157,7 +157,7 @@ For example, with the `@KafkaListener` container factory, you can add `DefaultEr ---- @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setAckMode(AckMode.RECORD); factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L))); @@ -168,7 +168,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerCont For a record listener, this will retry a delivery up to 2 times (3 delivery attempts) with a back off of 1 second, instead of the default configuration (`FixedBackOff(0L, 9)`). Failures are simply logged after retries are exhausted. 
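To make the exhaustion behavior concrete, the following is a minimal sketch of replacing the default log-only recovery with a custom `ConsumerRecordRecoverer`; the logging style and back off values are illustrative assumptions, the two-argument `DefaultErrorHandler` constructor is the one discussed above.

[source, java]
----
@Bean
public DefaultErrorHandler errorHandler() {
    // Hypothetical recoverer, invoked once the retries configured below are exhausted,
    // replacing the default log-only behavior.
    ConsumerRecordRecoverer recoverer = (rec, ex) ->
            System.err.println("Giving up on " + rec.topic() + "-" + rec.partition()
                    + "@" + rec.offset() + ": " + ex.getMessage());
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
}
----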
-As an example; if the `poll` returns six records (two from each partition 0, 1, 2) and the listener throws an exception on the fourth record, the container acknowledges the first three messages by committing their offsets. +As an example, if the `poll` returns six records (two from each partition 0, 1, 2) and the listener throws an exception on the fourth record, the container acknowledges the first three messages by committing their offsets. The `DefaultErrorHandler` seeks to offset 1 for partition 1 and offset 0 for partition 2. The next `poll()` returns the three unprocessed records. @@ -181,7 +181,7 @@ The sequence of events is: * Commit the offsets of the records before the index. * If retries are not exhausted, perform seeks so that all the remaining records (including the failed record) will be redelivered. * If retries are exhausted, attempt recovery of the failed record (default log only) and perform seeks so that the remaining records (excluding the failed record) will be redelivered. -The recovered record's offset is committed +The recovered record's offset is committed. * If retries are exhausted and recovery fails, seeks are performed as if retries are not exhausted. IMPORTANT: Starting with version 2.9, the `DefaultErrorHandler` can be configured to provide the same semantics as seeking the unprocessed record offsets as discussed above, but without actually seeking. @@ -243,7 +243,7 @@ public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) { } ---- -The error handler can be configured with one or more `RetryListener` s, receiving notifications of retry and recovery progress. +The error handler can be configured with one or more `RetryListener`+++s+++, receiving notifications of retry and recovery progress. Starting with version 2.8.10, methods for batch listeners were added. [source, java] @@ -259,11 +259,11 @@ public interface RetryListener { default void recoveryFailed(ConsumerRecord record, Exception original, Exception failure) { } - default void failedDelivery(ConsumerRecords records, Exception ex, int deliveryAttempt) { - } + default void failedDelivery(ConsumerRecords records, Exception ex, int deliveryAttempt) { + } - default void recovered(ConsumerRecords records, Exception ex) { - } + default void recovered(ConsumerRecords records, Exception ex) { + } default void recoveryFailed(ConsumerRecords records, Exception original, Exception failure) { } @@ -271,7 +271,7 @@ public interface RetryListener { } ---- -See the javadocs for more information. +See the JavaDocs for more information. IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks. If the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again. @@ -298,7 +298,7 @@ Also see xref:kafka/annotation-error-handling.adoc#delivery-header[Delivery Atte Starting with version 2.8, batch listeners can now properly handle conversion errors, when using a `MessageConverter` with a `ByteArrayDeserializer`, a `BytesDeserializer` or a `StringDeserializer`, as well as a `DefaultErrorHandler`. When a conversion error occurs, the payload is set to null and a deserialization exception is added to the record headers, similar to the `ErrorHandlingDeserializer`. -A list of `ConversionException` s is available in the listener so the listener can throw a `BatchListenerFailedException` indicating the first index at which a conversion exception occurred. 
+A list of `ConversionException`+++s+++ is available in the listener so the listener can throw a `BatchListenerFailedException` indicating the first index at which a conversion exception occurred. Example: @@ -323,7 +323,7 @@ This is now the fallback behavior of the `DefaultErrorHandler` for a batch liste There is no guarantee that, when a batch is redelivered, the batch has the same number of records and/or the redelivered records are in the same order. It is impossible, therefore, to easily maintain retry state for a batch. -The `FallbackBatchErrorHandler` takes a the following approach. +The `FallbackBatchErrorHandler` takes the following approach. If a batch listener throws an exception that is not a `BatchListenerFailedException`, the retries are performed from the in-memory batch of records. In order to avoid a rebalance during an extended retry sequence, the error handler pauses the consumer, polls it before sleeping for the back off, for each retry, and calls the listener again. If/when retries are exhausted, the `ConsumerRecordRecoverer` is called for each record in the batch. @@ -340,7 +340,7 @@ While waiting for a `BackOff` interval, the error handler will loop with a short The `CommonContainerStoppingErrorHandler` stops the container if the listener throws an exception. For record listeners, when the `AckMode` is `RECORD`, offsets for already processed records are committed. For record listeners, when the `AckMode` is any manual value, offsets for already acknowledged records are committed. -For record listeners, wWhen the `AckMode` is `BATCH`, or for batch listeners, the entire batch is replayed when the container is restarted. +For record listeners, when the `AckMode` is `BATCH`, or for batch listeners, the entire batch is replayed when the container is restarted. After the container stops, an exception that wraps the `ListenerExecutionFailedException` is thrown. This is to cause the transaction to roll back (if transactions are enabled). @@ -407,16 +407,16 @@ If you wish to use a different error handling strategy for record and batch list |`DefaultErrorHandler` |`RetryingBatchErrorHandler` -|No replacements - use `DefaultErrorHandler` and throw an exception other than `BatchListenerFailedException`. +|No replacements, use `DefaultErrorHandler` and throw an exception other than `BatchListenerFailedException`. |=== [[migrating-legacy-eh]] === Migrating Custom Legacy Error Handler Implementations to `CommonErrorHandler` -Refer to the javadocs in `CommonErrorHandler`. +Refer to the JavaDocs in `CommonErrorHandler`. To replace an `ErrorHandler` or `ConsumerAwareErrorHandler` implementation, you should implement `handleOne()` and leave `seeksAfterHandle()` to return `false` (default). -You should also implement `handleOtherException()` - to handle exceptions that occur outside the scope of record processing (e.g. consumer errors). +You should also implement `handleOtherException()` to handle exceptions that occur outside the scope of record processing (e.g. consumer errors). To replace a `RemainingRecordsErrorHandler` implementation, you should implement `handleRemaining()` and override `seeksAfterHandle()` to return `true` (the error handler must perform the necessary seeks). You should also implement `handleOtherException()` - to handle exceptions that occur outside the scope of record processing (e.g. consumer errors). 
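Relating to the guidance above about using different error handling strategies for record and batch listeners, here is a rough sketch using the `CommonMixedErrorHandler`; the constructor order (record handler first, batch handler second) and the back off values are assumptions for illustration.

[source, java]
----
@Bean
public CommonErrorHandler mixedErrorHandler() {
    DefaultErrorHandler recordHandler = new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
    DefaultErrorHandler batchHandler = new DefaultErrorHandler(new FixedBackOff(2000L, 1L));
    // Assumed constructor order: record error handler first, batch error handler second.
    return new CommonMixedErrorHandler(recordHandler, batchHandler);
}
----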
@@ -515,7 +515,7 @@ Since the event also has a reference to the container, you can restart the conta Starting with version 2.7, while waiting for a `BackOff` interval, the error handler will loop with a short sleep until the desired delay is reached, while checking to see if the container has been stopped, allowing the sleep to exit soon after the `stop()` rather than causing a delay. -Starting with version 2.7, the processor can be configured with one or more `RetryListener` s, receiving notifications of retry and recovery progress. +Starting with version 2.7, the processor can be configured with one or more `RetryListener`+++s+++, receiving notifications of retry and recovery progress. [source, java] ---- @@ -533,7 +533,7 @@ public interface RetryListener { } ---- -See the javadocs for more information. +See the JavaDocs for more information. [[delivery-header]] == Delivery Attempts Header @@ -548,7 +548,7 @@ When receiving a raw `ConsumerRecord` the integer is in a `byte[4]`. ---- int delivery = ByteBuffer.wrap(record.headers() .lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value()) - .getInt() + .getInt(); ---- When using `@KafkaListener` with the `DefaultKafkaHeaderMapper` or `SimpleKafkaHeaderMapper`, it can be obtained by adding `@Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery` as a parameter to the listener method. @@ -568,15 +568,15 @@ Then, the container will add this in the `KafkaListener.LISTENER_INFO` header to [source, java] ---- -@KafkaListener(id = "something", topic = "topic", filter = "someFilter", +@KafkaListener(id = "something", topics = "topic", filter = "someFilter", info = "this is the something listener") -public void listen2(@Payload Thing thing, +public void listen(@Payload Thing thing, @Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) { -... + ... } ---- -When used in a `RecordInterceptor` or `RecordFilterStrategy` implementation, the header is in the consumer record as a byte array, converted using the `KafkaListenerAnnotationBeanPostProcessor` 's `charSet` property. +When used in a `RecordInterceptor` or `RecordFilterStrategy` implementation, the header is in the consumer record as a byte array, converted using the `KafkaListenerAnnotationBeanPostProcessor`+++'+++s `charSet` property. The header mappers also convert to `String` when creating `MessageHeaders` from the consumer record and never map this header on an outbound record. @@ -600,7 +600,7 @@ NOTE: If the batch listener has a filter and the filter results in an empty batc If you receive `List>` the info is in the `KafkaHeaders.LISTENER_INFO` header of each `Message`. -See <> for more information about consuming batches. +See xref:kafka/receiving-messages/listener-annotation.adoc#batch-listeners[Batch Listeners] for more information about consuming batches. [[dead-letters]] == Publishing Dead-letter Records @@ -649,7 +649,7 @@ The record sent to the dead-letter topic is enhanced with the following headers: * `KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE`: The original timestamp type. * `KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP`: The original consumer group that failed to process the record (since version 2.8). -Key exceptions are only caused by `DeserializationException` s so there is no `DLT_KEY_EXCEPTION_CAUSE_FQCN`. +Key exceptions are only caused by `DeserializationException`+++s+++ so there is no `DLT_KEY_EXCEPTION_CAUSE_FQCN`. There are two mechanisms to add more headers. 
@@ -661,15 +661,14 @@ The second is simpler to implement but the first has more information available, Starting with version 2.3, when used in conjunction with an `ErrorHandlingDeserializer`, the publisher will restore the record `value()`, in the dead-letter producer record, to the original value that failed to be deserialized. Previously, the `value()` was null and user code had to decode the `DeserializationException` from the message headers. -In addition, you can provide multiple `KafkaTemplate` s to the publisher; this might be needed, for example, if you want to publish the `byte[]` from a `DeserializationException`, as well as values using a different serializer from records that were deserialized successfully. -Here is an example of configuring the publisher with `KafkaTemplate` s that use a `String` and `byte[]` serializer: +In addition, you can provide multiple `KafkaTemplate`+++s+++ to the publisher; this might be needed, for example, if you want to publish the `byte[]` from a `DeserializationException`, as well as values using a different serializer from records that were deserialized successfully. +Here is an example of configuring the publisher with `KafkaTemplate`+++s+++ that use a `String` and `byte[]` serializer: [source, java] ---- @Bean public DeadLetterPublishingRecoverer publisher(KafkaTemplate stringTemplate, KafkaTemplate bytesTemplate) { - Map, KafkaTemplate> templates = new LinkedHashMap<>(); templates.put(String.class, stringTemplate); templates.put(byte[].class, bytesTemplate); @@ -680,7 +679,7 @@ public DeadLetterPublishingRecoverer publisher(KafkaTemplate stringTemplat The publisher uses the map keys to locate a template that is suitable for the `value()` about to be published. A `LinkedHashMap` is recommended so that the keys are examined in order. -When publishing `null` values, when there are multiple templates, the recoverer will look for a template for the `Void` class; if none is present, the first template from the `values().iterator()` will be used. +When publishing `null` values, and there are multiple templates, the recoverer will look for a template for the `Void` class; if none is present, the first template from the `values().iterator()` will be used. Since 2.7 you can use the `setFailIfSendResultIsError` method so that an exception is thrown when message publishing fails. You can also set a timeout for the verification of the sender success with `setWaitForSendResultTimeout`. @@ -695,7 +694,7 @@ By default, the exception type is not considered. Starting with version 2.3, the recoverer can also be used with Kafka Streams - see xref:streams.adoc#streams-deser-recovery[Recovery from Deserialization Exceptions] for more information. -The `ErrorHandlingDeserializer` adds the deserialization exception(s) in headers `ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER` and `ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER` (using java serialization). +The `ErrorHandlingDeserializer` adds the deserialization exception(s) in headers `ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER` and `ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER` (using Java serialization). By default, these headers are not retained in the message published to the dead letter topic. Starting with version 2.7, if both the key and value fail deserialization, the original values of both are populated in the record sent to the DLT. 
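As a sketch of the destination resolution described above, the following configures a `DeadLetterPublishingRecoverer` with a custom destination resolver that routes deserialization failures to a separate topic; the topic names here are made up for illustration.

[source, java]
----
@Bean
public DeadLetterPublishingRecoverer recoverer(KafkaTemplate<Object, Object> template) {
    return new DeadLetterPublishingRecoverer(template, (rec, ex) -> {
        // Route records that failed deserialization to a dedicated topic (names are hypothetical).
        if (ex instanceof DeserializationException || ex.getCause() instanceof DeserializationException) {
            return new TopicPartition("bad.payloads", rec.partition());
        }
        return new TopicPartition(rec.topic() + ".DLT", rec.partition());
    });
}
----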
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/container-factory.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/container-factory.adoc index 9200e362be..c89a9ea998 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/container-factory.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/container-factory.adoc @@ -49,6 +49,6 @@ public ContainerPostProcessor> |LATEST_ONLY _NO_TX |Whether or not to commit the initial position on assignment; by default, the initial offset will only be committed if the `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` is `latest` and it won't run in a transaction even if there is a transaction manager present. -See the javadocs for `ContainerProperties.AssignmentCommitOption` for more information about the available options. +See the JavaDocs for `ContainerProperties.AssignmentCommitOption` for more information about the available options. |[[asyncAcks]]<> |false diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/exactly-once.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/exactly-once.adoc index afc82b96ae..d507a91b28 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/exactly-once.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/exactly-once.adoc @@ -10,8 +10,8 @@ See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Proc Using transactions enables Exactly Once Semantics (EOS). -This means that, for a `read->process-write` sequence, it is guaranteed that the **sequence** is completed exactly once. -(The read and process are have at least once semantics). +This means that, for a `read+++->+++process+++->+++write` sequence, it is guaranteed that the **sequence** is completed exactly once. +(The read and process have at least once semantics). Spring for Apache Kafka version 3.0 and later only supports `EOSMode.V2`: diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/headers.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/headers.adoc index 8fb5fc2e20..2ca550a0e9 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/headers.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/headers.adoc @@ -37,7 +37,7 @@ public interface KafkaHeaderMapper { The `SimpleKafkaHeaderMapper` maps raw headers as `byte[]`, with configuration options for conversion to `String` values. The `DefaultKafkaHeaderMapper` maps the key to the `MessageHeaders` header name and, in order to support rich header types for outbound messages, JSON conversion is performed. -A "`special`" header (with a key of `spring_json_header_types`) contains a JSON map of `:`. +A +++"+++`special`+++"+++ header (with a key of `spring_json_header_types`) contains a JSON map of `:`. This header is used on the inbound side to provide appropriate conversion of each header value to the original type. On the inbound side, all Kafka `Header` instances are mapped to `MessageHeaders`. @@ -70,7 +70,7 @@ public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { <3> Uses a default Jackson `ObjectMapper` and maps headers according to the provided patterns. <4> Uses the provided Jackson `ObjectMapper` and maps headers according to the provided patterns. -Patterns are rather simple and can contain a leading wildcard (`*`), a trailing wildcard, or both (for example, `*.cat.*`). 
+Patterns are rather simple and can contain a leading wildcard (`+++*+++`), a trailing wildcard, or both (for example, `+++*+++.cat.+++*+++`). You can negate patterns with a leading `!`. The first pattern that matches a header name (whether positive or negative) wins. @@ -79,7 +79,7 @@ When you provide your own patterns, we recommend including `!id` and `!timestamp IMPORTANT: By default, the mapper deserializes only classes in `java.lang` and `java.util`. You can trust other (or all) packages by adding trusted packages with the `addTrustedPackages` method. If you receive messages from untrusted sources, you may wish to add only those packages you trust. -To trust all packages, you can use `mapper.addTrustedPackages("*")`. +To trust all packages, you can use `mapper.addTrustedPackages("+++*+++")`. NOTE: Mapping `String` header values in a raw form is useful when communicating with systems that are not aware of the mapper's JSON format. @@ -150,7 +150,7 @@ By default, the `DefaultKafkaHeaderMapper` is used in the `MessagingMessageConve With the batch converter, the converted headers are available in the `KafkaHeaders.BATCH_CONVERTED_HEADERS` as a `List>` where the map in a position of the list corresponds to the data position in the payload. If there is no converter (either because Jackson is not present or it is explicitly set to `null`), the headers from the consumer record are provided unconverted in the `KafkaHeaders.NATIVE_HEADERS` header. -This header is a `Headers` object (or a `List` in the case of the batch converter), where the position in the list corresponds to the data position in the payload). +This header is a `Headers` object (or a `List` in the case of the batch converter), where the position in the list corresponds to the data position in the payload. IMPORTANT: Certain types are not suitable for JSON serialization, and a simple `toString()` serialization might be preferred for these types. The `DefaultKafkaHeaderMapper` has a method called `addToStringClasses()` that lets you supply the names of classes that should be treated this way for outbound mapping. @@ -158,9 +158,9 @@ During inbound mapping, they are mapped as `String`. By default, only `org.springframework.util.MimeType` and `org.springframework.http.MediaType` are mapped this way. NOTE: Starting with version 2.3, handling of String-valued headers is simplified. -Such headers are no longer JSON encoded, by default (i.e. they do not have enclosing `"..."` added). +Such headers are no longer JSON encoded, by default (i.e. they do not have enclosing `"+++...+++"` added). The type is still added to the JSON_TYPES header so the receiving system can convert back to a String (from `byte[]`). -The mapper can handle (decode) headers produced by older versions (it checks for a leading `"`); in this way an application using 2.3 can consume records from older versions. +The mapper can handle (decode) headers produced by older versions (it checks for a leading `+++"+++`); in this way an application using 2.3 can consume records from older versions. IMPORTANT: To be compatible with earlier versions, set `encodeStrings` to `true`, if records produced by a version using 2.3 might be consumed by applications using earlier versions. When all applications are using 2.3 or higher, you can leave the property at its default value of `false`. 
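A minimal sketch of the pattern and trusted-package configuration discussed above, wiring a customized `DefaultKafkaHeaderMapper` into a `MessagingMessageConverter`; the package and class names are assumptions.

[source, java]
----
@Bean
public MessagingMessageConverter converter() {
    // Keep the recommended !id and !timestamp exclusions when supplying custom patterns.
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper("!id", "!timestamp", "*");
    mapper.addTrustedPackages("com.example.headers");            // hypothetical package with rich header types
    mapper.addToStringClasses("com.example.headers.AuditKey");   // hypothetical class mapped via toString()
    MessagingMessageConverter converter = new MessagingMessageConverter();
    converter.setHeaderMapper(mapper);
    return converter;
}
----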
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/interceptors.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/interceptors.adoc index 2883f8c754..2d96b135db 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/interceptors.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/interceptors.adoc @@ -4,7 +4,7 @@ Apache Kafka provides a mechanism to add interceptors to producers and consumers. These objects are managed by Kafka, not Spring, and so normal Spring dependency injection won't work for wiring in dependent Spring Beans. However, you can manually wire in those dependencies using the interceptor `config()` method. -The following Spring Boot application shows how to do this by overriding boot's default factories to add some dependent bean into the configuration properties. +The following Spring Boot application shows how to do this by overriding Spring Boot's default factories to add some dependent bean into the configuration properties. [source, java] ---- diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc index 8cad5e8a3b..5e1af5d7b0 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/micrometer.adoc @@ -4,8 +4,8 @@ [[monitoring-listener-performance]] == Monitoring Listener Performance -Starting with version 2.3, the listener container will automatically create and update Micrometer `Timer` s for the listener, if `Micrometer` is detected on the class path, and a single `MeterRegistry` is present in the application context. -The timers can be disabled by setting the `ContainerProperty` `micrometerEnabled` to `false`. +Starting with version 2.3, the listener container will automatically create and update Micrometer `Timer`+++s+++ for the listener, if `Micrometer` is detected on the class path, and a single `MeterRegistry` is present in the application context. +The timers can be disabled by setting the `ContainerProperty`+++'+++s `micrometerEnabled` to `false`. Two timers are maintained - one for successful calls to the listener and one for failures. @@ -15,16 +15,16 @@ The timers are named `spring.kafka.listener` and have the following tags: * `result` : `success` or `failure` * `exception` : `none` or `ListenerExecutionFailedException` -You can add additional tags using the `ContainerProperties` `micrometerTags` property. +You can add additional tags using the `ContainerProperties`+++'+++s `micrometerTags` property. -Starting with versions 2.9.8, 3.0.6, you can provide a function in `ContainerProperties` `micrometerTagsProvider`; the function receives the `ConsumerRecord` and returns tags which can be based on that record, and merged with any static tags in `micrometerTags`. +Starting with versions 2.9.8, 3.0.6, you can provide a function in `ContainerProperties`+++'+++s `micrometerTagsProvider`; the function receives the `ConsumerRecord` and returns tags which can be based on that record, and merged with any static tags in `micrometerTags`. NOTE: With the concurrent container, timers are created for each thread and the `name` tag is suffixed with `-n` where n is `0` to `concurrency-1`. 
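The static and per-record tag options described above can be combined on the container factory; a minimal sketch, assuming illustrative tag names and a 2.9.8/3.0.6+ version for the tags provider.

[source, java]
----
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Static tags added to every listener timer.
    factory.getContainerProperties().setMicrometerTags(Map.of("application", "orders"));
    // Per-record tags (2.9.8/3.0.6 and later), merged with the static tags above.
    factory.getContainerProperties().setMicrometerTagsProvider(rec -> Map.of("topic", rec.topic()));
    return factory;
}
----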
[[monitoring-kafkatemplate-performance]] == Monitoring KafkaTemplate Performance -Starting with version 2.5, the template will automatically create and update Micrometer `Timer` s for send operations, if `Micrometer` is detected on the class path, and a single `MeterRegistry` is present in the application context. +Starting with version 2.5, the template will automatically create and update Micrometer `Timer`+++s for send operations, if `Micrometer` is detected on the class path, and a single `MeterRegistry` is present in the application context. The timers can be disabled by setting the template's `micrometerEnabled` property to `false`. Two timers are maintained - one for successful calls to the listener and one for failures. @@ -83,7 +83,7 @@ double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total") .tag("customTag", "customTagValue") .tag("spring.id", "myProducerFactory.myClientId-1") .functionCounter() - .count() + .count(); ---- A similar listener is provided for the `StreamsBuilderFactoryBean` - see xref:streams.adoc#streams-micrometer[KafkaStreams Micrometer Support]. @@ -103,7 +103,7 @@ The default implementations add the `bean.name` tag for template observations an You can either subclass `DefaultKafkaTemplateObservationConvention` or `DefaultKafkaListenerObservationConvention` or provide completely new implementations. -See xref:appendix.adoc#observation-gen[Micrometer Observation Documentation] for details of the default observations that are recorded. +See xref:appendix/micrometer.adoc#observation-gen[Micrometer Observation Documentation] for details of the default observations that are recorded. Starting with version 3.0.6, you can add dynamic tags to the timers and traces, based on information in the consumer or producer records. To do so, add a custom `KafkaListenerObservationConvention` and/or `KafkaTemplateObservationConvention` to the listener container properties or `KafkaTemplate` respectively. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/pause-resume-partitions.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/pause-resume-partitions.adoc index c6f08ed7cf..31bd68be5c 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/pause-resume-partitions.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/pause-resume-partitions.adoc @@ -3,7 +3,7 @@ :page-section-summary-toc: 1 Since version 2.7 you can pause and resume the consumption of specific partitions assigned to that consumer by using the `pausePartition(TopicPartition topicPartition)` and `resumePartition(TopicPartition topicPartition)` methods in the listener containers. -The pausing and resuming takes place respectively before and after the `poll()` similar to the `pause()` and `resume()` methods. +The pausing and resuming take place respectively before and after the `poll()` similar to the `pause()` and `resume()` methods. The `isPartitionPauseRequested()` method returns true if pause for that partition has been requested. The `isPartitionPaused()` method returns true if that partition has effectively been paused. 
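To illustrate the partition pause and resume methods just described, here is a sketch that obtains a container from the endpoint registry; the listener id, topic and partition are hypothetical.

[source, java]
----
@Autowired
private KafkaListenerEndpointRegistry registry;

public void pauseAndResume() {
    // "myListener" is a hypothetical @KafkaListener id.
    MessageListenerContainer container = this.registry.getListenerContainer("myListener");
    TopicPartition partition = new TopicPartition("my-topic", 0);
    container.pausePartition(partition);    // takes effect just before the next poll()
    // ... later, once the downstream system has caught up
    container.resumePartition(partition);   // takes effect just after the next poll()
}
----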
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/producer-interceptor-managed-in-spring.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/producer-interceptor-managed-in-spring.adoc index a5a5785b57..3696b0b0d9 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/producer-interceptor-managed-in-spring.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/producer-interceptor-managed-in-spring.adoc @@ -17,7 +17,6 @@ public class MyProducerInterceptor implements ProducerInterceptor configs) { - } @Override @@ -47,7 +46,7 @@ public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) { @Bean public KafkaTemplate kafkaTemplate(ProducerFactory pf, MyProducerInterceptor myProducerInterceptor) { - KafkaTemplate kafkaTemplate = new KafkaTemplate(pf); + KafkaTemplate kafkaTemplate = new KafkaTemplate<>(pf); kafkaTemplate.setProducerInterceptor(myProducerInterceptor); } ---- diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/sending-messages.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/sending-messages.adoc index 911fc01347..d5cd0f6773 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/sending-messages.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/sending-messages.adoc @@ -275,7 +275,7 @@ For another technique to achieve similar results, but with the additional capabi As seen in xref:kafka/sending-messages.adoc#kafka-template[Using `KafkaTemplate`], a `ProducerFactory` is used to create the producer. -When not using xref:kafka/transactions.adoc[Transactions], by default, the `DefaultKafkaProducerFactory` creates a singleton producer used by all clients, as recommended in the `KafkaProducer` javadocs. +When not using xref:kafka/transactions.adoc[Transactions], by default, the `DefaultKafkaProducerFactory` creates a singleton producer used by all clients, as recommended in the `KafkaProducer` JavaDocs. However, if you call `flush()` on the template, this can cause delays for other threads using the same producer. Starting with version 2.3, the `DefaultKafkaProducerFactory` has a new property `producerPerThread`. When set to `true`, the factory will create (and cache) a separate producer for each thread, to avoid this issue. @@ -708,6 +708,6 @@ After a rebalance, it is possible for duplicate reply deliveries; these will be NOTE: If you use an xref:kafka/serdes.adoc#error-handling-deserializer[`ErrorHandlingDeserializer`] with this aggregating template, the framework will not automatically detect `DeserializationException`+++s+++. Instead, the record (with a `null` value) will be returned intact, with the deserialization exception(s) in headers. It is recommended that applications call the utility method `ReplyingKafkaTemplate.checkDeserialization()` method to determine if a deserialization exception occurred. -See its javadocs for more information. +See its JavaDocs for more information. The `replyErrorChecker` is also not called for this aggregating template; you should perform the checks on each element of the reply. 
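Relating to the `producerPerThread` option mentioned in the sending-messages changes above, a minimal sketch follows; the bootstrap server and serializers are placeholder values. Per the reference documentation, when this option is enabled user code should call `closeThreadBoundProducer()` on the factory once a thread's producer is no longer needed.

[source, java]
----
@Bean
public DefaultKafkaProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
    pf.setProducerPerThread(true); // each calling thread gets its own cached producer
    return pf;
}
----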
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc index 7262f2d3de..7a53567f0c 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc @@ -23,7 +23,7 @@ For more complex or particular cases, the `KafkaConsumer` (and, therefore, `Kafk constructors to accept `Serializer` and `Deserializer` instances for `keys` and `values`, respectively. When you use this API, the `DefaultKafkaProducerFactory` and `DefaultKafkaConsumerFactory` also provide properties (through constructors or setter methods) to inject custom `Serializer` and `Deserializer` instances into the target `Producer` or `Consumer`. -Also, you can pass in `Supplier` or `Supplier` instances through constructors - these `Supplier` s are called on creation of each `Producer` or `Consumer`. +Also, you can pass in `Supplier` or `Supplier` instances through constructors - these `Supplier`+++s+++ are called on creation of each `Producer` or `Consumer`. [[string-serde]] == String serialization @@ -40,7 +40,7 @@ ParseStringDeserializer deserializer = new ParseStringDeserializer<>(Thin ---- By default, the `ToStringSerializer` is configured to convey type information about the serialized entity in the record `Headers`. -You can disable this by setting the `addTypeInfo` property to false. +You can disable this by setting the `addTypeInfo` property to `false`. This information can be used by `ParseStringDeserializer` on the receiving side. * `ToStringSerializer.ADD_TYPE_INFO_HEADERS` (default `true`): You can set it to `false` to disable this feature on the `ToStringSerializer` (sets the `addTypeInfo` property). @@ -111,7 +111,7 @@ They have no effect if you have provided `Serializer` and `Deserializer` instanc * `JsonDeserializer.KEY_DEFAULT_TYPE`: Fallback type for deserialization of keys if no header information is present. * `JsonDeserializer.VALUE_DEFAULT_TYPE`: Fallback type for deserialization of values if no header information is present. * `JsonDeserializer.TRUSTED_PACKAGES` (default `java.util`, `java.lang`): Comma-delimited list of package patterns allowed for deserialization. -`*` means deserialize all. +`*` means deserializing all. * `JsonDeserializer.TYPE_MAPPINGS` (default `empty`): See xref:kafka/serdes.adoc#serdes-mapping-types[Mapping Types]. * `JsonDeserializer.KEY_TYPE_METHOD` (default `empty`): See xref:kafka/serdes.adoc#serdes-type-methods[Using Methods to Determine Types]. * `JsonDeserializer.VALUE_TYPE_METHOD` (default `empty`): See xref:kafka/serdes.adoc#serdes-type-methods[Using Methods to Determine Types]. @@ -176,7 +176,6 @@ public ConsumerFactory kafkaConsumerFactory(JsonDeserializer cust @Bean public ProducerFactory kafkaProducerFactory(JsonSerializer customValueSerializer) { - return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(), new StringSerializer(), customValueSerializer); } @@ -186,7 +185,7 @@ public ProducerFactory kafkaProducerFactory(JsonSerializer custom Setters are also provided, as an alternative to using these constructors. ==== -Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean `useHeadersIfPresent` (which is `true` by default). 
+Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean `useHeadersIfPresent` parameter (which is `true` by default). The following example shows how to do so: [source, java] @@ -209,7 +208,6 @@ The method must be declared as `public static`, have one of three signatures `(S You can use arbitrary headers or inspect the data to determine the type. -.Example [source, java] ---- JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class); @@ -282,7 +280,6 @@ public ConsumerFactory cf() { To provide type mapping programmatically, similar to xref:kafka/serdes.adoc#serdes-type-methods[Using Methods to Determine Types], use the `typeFunction` property. -.Example [source, java] ---- JsonDeserializer deser = new JsonDeserializer<>() @@ -355,7 +352,7 @@ In this case, if there are amiguous matches, an ordered `Map`, such as a `Linked === By Topic Starting with version 2.8, the `DelegatingByTopicSerializer` and `DelegatingByTopicDeserializer` allow selection of a serializer/deserializer based on the topic name. -Regex `Pattern` s are used to lookup the instance to use. +Regex `Pattern`+++s+++ are used to lookup the instance to use. The map can be configured using a constructor, or via properties (a comma delimited list of `pattern:serializer`). [source, java] @@ -391,7 +388,7 @@ An additional property `DelegatingByTopicSerialization.CASE_SENSITIVE` (default [[retrying-deserialization]] == Retrying Deserializer -The `RetryingDeserializer` uses a delegate `Deserializer` and `RetryTemplate` to retry deserialization when the delegate might have transient errors, such a network issues, during deserialization. +The `RetryingDeserializer` uses a delegate `Deserializer` and `RetryTemplate` to retry deserialization when the delegate might have transient errors, such as network issues, during deserialization. [source, java] ---- @@ -460,7 +457,7 @@ Starting with version 2.7.1, message payload conversion can be delegated to a `s IMPORTANT: The `KafkaMessageConverter.fromMessage()` method is called for outbound conversion to a `ProducerRecord` with the message payload in the `ProducerRecord.value()` property. The `KafkaMessageConverter.toMessage()` method is called for inbound conversion from `ConsumerRecord` with the payload being the `ConsumerRecord.value()` property. -The `SmartMessageConverter.toMessage()` method is called to create a new outbound `Message` from the `Message` passed to`fromMessage()` (usually by `KafkaTemplate.send(Message msg)`). +The `SmartMessageConverter.toMessage()` method is called to create a new outbound `Message` from the `Message` passed to `fromMessage()` (usually by `KafkaTemplate.send(Message msg)`). Similarly, in the `KafkaMessageConverter.toMessage()` method, after the converter has created a new `Message` from the `ConsumerRecord`, the `SmartMessageConverter.fromMessage()` method is called and then the final inbound message is created with the newly converted payload. In either case, if the `SmartMessageConverter` returns `null`, the original message is used. @@ -509,7 +506,7 @@ public void projection(SomeSample in) { ---- Accessor methods will be used to lookup the property name as field in the received JSON document by default. 
-The `@JsonPath` expression allows customization of the value lookup, and even to define multiple JSON Path expressions, to lookup values from multiple places until an expression returns an actual value. +The `@JsonPath` expression allows customization of the value lookup, and even to define multiple JSON Path expressions, to look up values from multiple places until an expression returns an actual value. To enable this feature, use a `ProjectingMessageConverter` configured with an appropriate delegate converter (used for outbound conversion and converting non-projection interfaces). You must also add `spring-data:spring-data-commons` and `com.jayway.jsonpath:json-path` to the class path. @@ -590,7 +587,7 @@ consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider. ... ---- -IMPORTANT: If the consumer is configured with an `ErrorHandlingDeserializer` it is important to configure the `KafkaTemplate` and its producer with a serializer that can handle normal objects as well as raw `byte[]` values, which result from deserialization exceptions. +IMPORTANT: If the consumer is configured with an `ErrorHandlingDeserializer`, it is important to configure the `KafkaTemplate` and its producer with a serializer that can handle normal objects as well as raw `byte[]` values, which result from deserialization exceptions. The generic value type of the template should be `Object`. One technique is to use the `DelegatingByTypeSerializer`; an example follows: @@ -717,7 +714,7 @@ The following example shows how to do so: [source, java] ---- @KafkaListener(topics = "blc3", groupId = "blc3") -public void listen1(List> fooMessages) { +public void listen(List> fooMessages) { ... } ---- @@ -725,7 +722,7 @@ public void listen1(List> fooMessages) { [[conversionservice-customization]] == `ConversionService` Customization -Starting with version 2.1.1, the `org.springframework.core.convert.ConversionService` used by the default `o.s.messaging.handler.annotation.support.MessageHandlerMethodFactory` to resolve parameters for the invocation of a listener method is supplied with all beans that implement any of the following interfaces: +Starting with version 2.1.1, the `org.springframework.core.convert.ConversionService` used by the default `org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory` to resolve parameters for the invocation of a listener method is supplied with all beans that implement any of the following interfaces: * `org.springframework.core.convert.converter.Converter` * `org.springframework.core.convert.converter.GenericConverter` @@ -736,7 +733,7 @@ This lets you further customize listener deserialization without changing the de IMPORTANT: Setting a custom `MessageHandlerMethodFactory` on the `KafkaListenerEndpointRegistrar` through a `KafkaListenerConfigurer` bean disables this feature. [[custom-arg-resolve]] -== Adding custom `HandlerMethodArgumentResolver` to `@KafkaListener` +== Adding Custom `HandlerMethodArgumentResolver` to `@KafkaListener` Starting with version 2.4.2 you are able to add your own `HandlerMethodArgumentResolver` and resolve custom method parameters. All you need is to implement `KafkaListenerConfigurer` and use method `setCustomMethodArgumentResolvers()` from class `KafkaListenerEndpointRegistrar`. @@ -773,5 +770,5 @@ You can also completely replace the framework's argument resolution by adding a If you do this, and your application needs to handle tombstone records, with a `null` `value()` (e.g. 
from a compacted topic), you should add a `KafkaNullAwarePayloadArgumentResolver` to the factory; it must be the last resolver because it supports all types and can match arguments without a `@Payload` annotation. If you are using a `DefaultMessageHandlerMethodFactory`, set this resolver as the last custom resolver; the factory will ensure that this resolver will be used before the standard `PayloadMethodArgumentResolver`, which has no knowledge of `KafkaNull` payloads. -See also xref:kafka/tombstones.adoc[Null Payloads and Log Compaction of 'Tombstone' Records]. +See also xref:kafka/tombstones.adoc[Null Payloads and Log Compaction of `Tombstone` Records]. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/tombstones.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/tombstones.adoc index e11d4b0b51..e358ef8666 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/tombstones.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/tombstones.adoc @@ -13,7 +13,7 @@ For convenience, the static `KafkaNull.INSTANCE` is provided. When you use a message listener container, the received `ConsumerRecord` has a `null` `value()`. To configure the `@KafkaListener` to handle `null` payloads, you must use the `@Payload` annotation with `required = false`. -If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was "`deleted`". +If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was +++"+++`deleted`+++"+++. The following example shows such a configuration: [source, java] diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc index e1a9350110..e5100aec55 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc @@ -9,7 +9,7 @@ This section describes how Spring for Apache Kafka supports transactions. The 0.11.0.0 client library added support for transactions. Spring for Apache Kafka adds support in the following ways: -* `KafkaTransactionManager`: Used with normal Spring transaction support (`@Transactional`, `TransactionTemplate` etc). +* `KafkaTransactionManager`: Used with normal Spring transaction support (`@Transactional`, `TransactionTemplate`, etc) * Transactional `KafkaMessageListenerContainer` * Local transactions with `KafkaTemplate` * Transaction synchronization with other transaction managers @@ -25,12 +25,12 @@ Also see xref:kafka/exactly-once.adoc[Exactly Once Semantics]. Also see xref:kafka/transactions.adoc#transaction-id-prefix[`transactionIdPrefix`]. -With Spring Boot, it is only necessary to set the `spring.kafka.producer.transaction-id-prefix` property - Boot will automatically configure a `KafkaTransactionManager` bean and wire it into the listener container. +With Spring Boot, it is only necessary to set the `spring.kafka.producer.transaction-id-prefix` property - Spring Boot will automatically configure a `KafkaTransactionManager` bean and wire it into the listener container. IMPORTANT: Starting with version 2.5.8, you can now configure the `maxAge` property on the producer factory. This is useful when using transactional producers that might lay idle for the broker's `transactional.id.expiration.ms`. 
With current `kafka-clients`, this can cause a `ProducerFencedException` without a rebalance. -By setting the `maxAge` to less than `transactional.id.expiration.ms`, the factory will refresh the producer if it is past it's max age. +By setting the `maxAge` to less than `transactional.id.expiration.ms`, the factory will refresh the producer if it is past its max age. [[using-kafkatransactionmanager]] == Using `KafkaTransactionManager` @@ -68,13 +68,13 @@ If you wish the commits to be performed in the reverse order (Kafka first), use See xref:tips.adoc#ex-jdbc-sync[Examples of Kafka Transactions with Other Transaction Managers] for examples of an application that synchronizes JDBC and Kafka transactions in Kafka-first or DB-first configurations. NOTE: Starting with versions 2.5.17, 2.6.12, 2.7.9 and 2.8.0, if the commit fails on the synchronized transaction (after the primary transaction has committed), the exception will be thrown to the caller. -Previously, this was silently ignored (logged at debug). +Previously, this was silently ignored (logged at debug level). Applications should take remedial action, if necessary, to compensate for the committed primary transaction. [[container-transaction-manager]] == Using Consumer-Initiated Transactions -The `ChainedKafkaTransactionManager` is now deprecated, since version 2.7; see the javadocs for its super class `ChainedTransactionManager` for more information. +The `ChainedKafkaTransactionManager` is now deprecated, since version 2.7; see the JavaDocs for its super class `ChainedTransactionManager` for more information. Instead, use a `KafkaTransactionManager` in the container to start the Kafka transaction and annotate the listener method with `@Transactional` to start the other transaction. See xref:tips.adoc#ex-jdbc-sync[Examples of Kafka Transactions with Other Transaction Managers] for an example application that chains JDBC and Kafka transactions. @@ -114,7 +114,7 @@ Normally, when a `KafkaTemplate` is transactional (configured with a transaction The transaction can be started by a `TransactionTemplate`, a `@Transactional` method, calling `executeInTransaction`, or by a listener container, when configured with a `KafkaTransactionManager`. Any attempt to use the template outside the scope of a transaction results in the template throwing an `IllegalStateException`. Starting with version 2.4.3, you can set the template's `allowNonTransactional` property to `true`. -In that case, the template will allow the operation to run without a transaction, by calling the `ProducerFactory` 's `createNonTransactionalProducer()` method; the producer will be cached, or thread-bound, as normal for reuse. +In that case, the template will allow the operation to run without a transaction, by calling the `ProducerFactory`+++'+++s `createNonTransactionalProducer()` method; the producer will be cached, or thread-bound, as normal for reuse. See xref:kafka/sending-messages.adoc#producer-factory[Using `DefaultKafkaProducerFactory`]. [[transactions-batch]] @@ -125,7 +125,7 @@ When using the default `AfterRollbackProcessor` with a record listener, seeks ar With a batch listener, however, the whole batch will be redelivered because the framework doesn't know which record in the batch failed. See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Processor] for more information. -When using a batch listener, version 2.4.2 introduced an alternative mechanism to deal with failures while processing a batch; the `BatchToRecordAdapter`. 
+When using a batch listener, version 2.4.2 introduced an alternative mechanism to deal with failures while processing a batch: `BatchToRecordAdapter`. When a container factory with `batchListener` set to true is configured with a `BatchToRecordAdapter`, the listener is invoked with one record at a time. This enables error handling within the batch, while still making it possible to stop processing the entire batch, depending on the exception type. A default `BatchToRecordAdapter` is provided, that can be configured with a standard `ConsumerRecordRecoverer` such as the `DeadLetterPublishingRecoverer`. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic.adoc index c18b928e69..08d02f3f2f 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic.adoc @@ -7,5 +7,5 @@ Version 2.9 changed the mechanism to bootstrap infrastructure beans; see xref:re Achieving non-blocking retry / dlt functionality with Kafka usually requires setting up extra topics and creating and configuring the corresponding listeners. Since 2.7 Spring for Apache Kafka offers support for that via the `@RetryableTopic` annotation and `RetryTopicConfiguration` class to simplify that bootstrapping. -IMPORTANT: Non-blocking retries are not supported with <>. +IMPORTANT: Non-blocking retries are not supported with xref:kafka/receiving-messages/listener-annotation.adoc#batch-listeners[Batch Listeners]. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/how-the-pattern-works.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/how-the-pattern-works.adoc index 6a04804942..7db1f32f64 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/how-the-pattern-works.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/how-the-pattern-works.adoc @@ -1,19 +1,19 @@ [[how-the-pattern-works]] -= How The Pattern Works += How the Pattern Works If message processing fails, the message is forwarded to a retry topic with a back off timestamp. The retry topic consumer then checks the timestamp and if it's not due it pauses the consumption for that topic's partition. When it is due the partition consumption is resumed, and the message is consumed again. If the message processing fails again the message will be forwarded to the next retry topic, and the pattern is repeated until a successful processing occurs, or the attempts are exhausted, and the message is sent to the Dead Letter Topic (if configured). -To illustrate, if you have a "main-topic" topic, and want to setup non-blocking retry with an exponential backoff of 1000ms with a multiplier of 2 and 4 max attempts, it will create the main-topic-retry-1000, main-topic-retry-2000, main-topic-retry-4000 and main-topic-dlt topics and configure the respective consumers. +To illustrate, if you have a "main-topic" topic, and want to set up non-blocking retry with an exponential backoff of 1000ms with a multiplier of 2 and 4 max attempts, it will create the main-topic-retry-1000, main-topic-retry-2000, main-topic-retry-4000 and main-topic-dlt topics and configure the respective consumers. The framework also takes care of creating the topics and setting up and configuring the listeners. IMPORTANT: By using this strategy you lose Kafka's ordering guarantees for that topic. 
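A sketch of a listener matching the exponential backoff illustration above (1000ms, multiplier 2, 4 attempts); the topic name and `String` payload type are assumptions.

[source, java]
----
@RetryableTopic(attempts = "4", backoff = @Backoff(delay = 1000, multiplier = 2.0))
@KafkaListener(topics = "main-topic")
public void processMessage(String message) {
    // ... message processing that may throw; failures are forwarded to
    // main-topic-retry-1000, main-topic-retry-2000, main-topic-retry-4000 and finally main-topic-dlt
}

@DltHandler
public void processDltMessage(String message) {
    // ... last-resort handling once all retry topics are exhausted
}
----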
IMPORTANT: You can set the `AckMode` mode you prefer, but `RECORD` is suggested. -IMPORTANT: At this time this functionality doesn't support class level `@KafkaListener` annotations +IMPORTANT: At this time this functionality doesn't support class level `@KafkaListener` annotations. When using a manual `AckMode` with `asyncAcks` set to true, the `DefaultErrorHandler` must be configured with `seekAfterError` set to `false`. Starting with versions 2.9.10, 3.0.8, this will be set to true unconditionally for such configurations. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/programmatic-construction.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/programmatic-construction.adoc index f2aa206c8f..d6143ad8ac 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/programmatic-construction.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/programmatic-construction.adoc @@ -10,7 +10,7 @@ The following Spring Boot application provides an example of how to do so. public class Application extends RetryTopicConfigurationSupport { public static void main(String[] args) { - SpringApplication.run(2Application.class, args); + SpringApplication.run(Application.class, args); } @Bean diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/retry-config.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/retry-config.adoc index 773752e0b5..6394863aa5 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/retry-config.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/retry-config.adoc @@ -24,7 +24,7 @@ To configure the retry topic and dlt for a `@KafkaListener` annotated method, yo @RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate") @KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId") public void processMessage(MyPojo message) { - // ... message processing + // ... message processing } ---- @@ -35,7 +35,7 @@ If no DltHandler method is provided a default consumer is created which only log ---- @DltHandler public void processMessage(MyPojo message) { -// ... message processing, persistence, etc + // ... message processing, persistence, etc } ---- @@ -72,7 +72,7 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate templa } ---- -This will create retry topics and a dlt, as well as the corresponding consumers, for all topics in methods annotated with '@KafkaListener' using the default configurations. The `KafkaTemplate` instance is required for message forwarding. +This will create retry topics and a dlt, as well as the corresponding consumers, for all topics in methods annotated with `+++@+++KafkaListener` using the default configurations. The `KafkaTemplate` instance is required for message forwarding. To achieve more fine-grained control over how to handle non-blocking retrials for each topic, more than one `RetryTopicConfiguration` bean can be provided. 
@@ -104,7 +104,7 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate producerFactory() { - return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(), - new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(), - MyNormalObject.class, new JsonSerializer()))); + return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(), + new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(), + MyNormalObject.class, new JsonSerializer()))); } @Bean public KafkaTemplate kafkaTemplate() { - return new KafkaTemplate<>(producerFactory()); + return new KafkaTemplate<>(producerFactory()); } ---- @@ -168,7 +168,7 @@ IMPORTANT: When using this configuration approach, the `@EnableKafkaRetryTopic` Use the simple `@EnableKafka` annotation instead. When `autoCreateTopics` is true, the main and retry topics will be created with the specified number of partitions and replication factor. -Starting with version 3.0, the default replication factor is `-1`, meaning use the broker default. +Starting with version 3.0, the default replication factor is `-1`, meaning using the broker default. If your broker version is earlier than 2.4, you will need to set an explicit value. To override these values for a particular topic (e.g. the main topic or DLT), simply add a `NewTopic` `@Bean` with the required properties; that will override the auto creation properties. diff --git a/spring-kafka-docs/src/main/java/org/springframework/kafka/jdocs/started/noboot/Config.java b/spring-kafka-docs/src/main/java/org/springframework/kafka/jdocs/started/noboot/Config.java index 3585be826f..3f38be4f08 100644 --- a/spring-kafka-docs/src/main/java/org/springframework/kafka/jdocs/started/noboot/Config.java +++ b/spring-kafka-docs/src/main/java/org/springframework/kafka/jdocs/started/noboot/Config.java @@ -100,7 +100,7 @@ private Map senderProps() { @Bean public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) { - return new KafkaTemplate(producerFactory); + return new KafkaTemplate<>(producerFactory); } }
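Relating to the note above about overriding auto-created topic properties with a `NewTopic` `@Bean`, a minimal sketch follows; the topic name, partition count and replication factor are illustrative.

[source, java]
----
@Bean
public NewTopic mainTopicDlt() {
    // Hypothetical DLT name; overrides the auto-creation defaults for this one topic.
    return TopicBuilder.name("my-annotated-topic-dlt")
            .partitions(3)
            .replicas(2)
            .build();
}
----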