Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ ext {
scalaVersion = '2.13'
springBootVersion = '3.5.0' // docs module
springDataVersion = '2025.1.0-M5'
springRetryVersion = '2.0.12'
springVersion = '7.0.0-M8'
springVersion = '7.0.0-SNAPSHOT'

idPrefix = 'kafka'

Expand Down Expand Up @@ -249,9 +248,6 @@ project ('spring-kafka') {
api 'org.springframework:spring-context'
api 'org.springframework:spring-messaging'
api 'org.springframework:spring-tx'
api ("org.springframework.retry:spring-retry:$springRetryVersion") {
exclude group: 'org.springframework'
}
api "org.apache.kafka:kafka-clients:$kafkaVersion"
api 'io.micrometer:micrometer-observation'
optionalApi "org.apache.kafka:kafka-streams:$kafkaVersion"
Expand Down Expand Up @@ -322,7 +318,6 @@ project ('spring-kafka-test') {
api "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
api 'org.springframework:spring-context'
api 'org.springframework:spring-test'
api "org.springframework.retry:spring-retry:$springRetryVersion"

api "org.apache.kafka:kafka-clients:$kafkaVersion:test"
api "org.apache.kafka:kafka-server:$kafkaVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;
import org.springframework.kafka.annotation.Backoff;

/**
* Sample shows use of topic-based retry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ The exceptions that are considered fatal, by default, are:
since these exceptions are unlikely to be resolved on a retried delivery.

You can add more exception types to the not-retryable category, or completely replace the map of classified exceptions.
See the Javadocs for `DefaultErrorHandler.addNotRetryableException()` and `DefaultErrorHandler.setClassifications()` for more information, as well as those for the `spring-retry` `BinaryExceptionClassifier`.
See the Javadocs for `DefaultErrorHandler.addNotRetryableException()` and `DefaultErrorHandler.setClassifications()` for more information, as well as `ExceptionMatcher`.

Here is an example that adds `IllegalArgumentException` to the not-retryable exceptions:

Expand Down Expand Up @@ -502,7 +502,7 @@ The exceptions that are considered fatal, by default, are:
since these exceptions are unlikely to be resolved on a retried delivery.

You can add more exception types to the not-retryable category, or completely replace the map of classified exceptions.
See the Javadocs for `DefaultAfterRollbackProcessor.setClassifications()` for more information, as well as those for the `spring-retry` `BinaryExceptionClassifier`.
See the Javadocs for `DefaultAfterRollbackProcessor.setClassifications()` for more information, as well as `ExceptionMatcher`.

Here is an example that adds `IllegalArgumentException` to the not-retryable exceptions:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,9 +400,10 @@ ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
----

Starting with version `3.1.2`, a `RecoveryCallback` can be set on the `RetryingDeserializer` optionally.
A recovery callback be set on the `RetryingDeserializer`, to return a fallback object
if all retries are exhausted.

Refer to the https://github.com/spring-projects/spring-retry[spring-retry] project for configuration of the `RetryTemplate` with a retry policy, back off policy, etc.
Refer to the https://github.com/spring-projects/spring-framework[Spring Framework] project for configuration of the `RetryTemplate` with a retry policy, back off, etc.


[[messaging-message-conversion]]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* Copyright 2018-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.core.annotation.AliasFor;
import org.springframework.core.retry.RetryPolicy;
import org.springframework.format.annotation.DurationFormat;
import org.springframework.util.backoff.ExponentialBackOff;
import org.springframework.util.backoff.FixedBackOff;

/**
* Collects metadata for creating a {@link org.springframework.util.backoff.BackOff BacOff}
* instance as part of a {@link RetryPolicy}. Values can be provided as is or using a
* {@code *String} equivalent that supports more format, as well as expression evaluations.
* <p>
* The available attributes lead to the following:
* <ul>
* <li>With no explicit settings, the default is a {@link FixedBackOff} with a delay of
* {@value #DEFAULT_DELAY} ms</li>
* <li>With only {@link #delay()} set: the backoff is a fixed delay with that value</li>
* <li>In all other cases, an {@link ExponentialBackOff} is created with the values of
* {@link #delay()} (default: {@value RetryPolicy.Builder#DEFAULT_DELAY} ms),
* {@link #maxDelay()} (default: no maximum), {@link #multiplier()}
* (default: {@value RetryPolicy.Builder#DEFAULT_MULTIPLIER}) and {@link #jitter()}
* (default: no jitter).</li>
* </ul>
*
* @author Dave Syer
* @author Gary Russell
* @author Aftab Shaikh
* @author Stephane Nicoll
* @since 4.0
*/
@Target(ElementType.ANNOTATION_TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface BackOff {

/**
* Default {@link #delay()} in milliseconds.
*/
long DEFAULT_DELAY = 1000;

/**
* Alias for {@link #delay()}.
* <p>Intended to be used when no other attributes are needed, for example:
* {@code @BackOff(2000)}.
*
* @return the based delay in milliseconds (default{@value DEFAULT_DELAY})
*/
@AliasFor("delay")
long value() default DEFAULT_DELAY;

/**
* Specify the base delay after the initial invocation.
* <p>If only a {@code delay} is specified, a {@link FixedBackOff} with that value
* as the interval is configured.
* <p>If a {@linkplain #multiplier() multiplier} is specified, this serves as the
* initial delay to multiply from.
* <p>The default is {@value DEFAULT_DELAY} milliseconds.
*
* @return the based delay in milliseconds (default{@value DEFAULT_DELAY})
*/
@AliasFor("value")
long delay() default DEFAULT_DELAY;

/**
* Specify the base delay after the initial invocation using a String format. If
* this is specified, takes precedence over {@link #delay()}.
* <p>The delay String can be in several formats:
* <ul>
* <li>a plain long &mdash; which is interpreted to represent a duration in
* milliseconds</li>
* <li>any of the known {@link DurationFormat.Style}: the {@link DurationFormat.Style#ISO8601 ISO8601}
* style or the {@link DurationFormat.Style#SIMPLE SIMPLE} style &mdash; using
* milliseconds as fallback if the string doesn't contain an explicit unit</li>
* <li>Regular expressions, such as {@code ${example.property}} to use the
* {@code example.property} from the environment</li>
* </ul>
*
* @return the based delay as a String value &mdash; for example a placeholder
* @see #delay()
*/
String delayString() default "";

/**
* Specify the maximum delay for any retry attempt, limiting how far
* {@linkplain #jitter jitter} and the {@linkplain #multiplier() multiplier} can
* increase the {@linkplain #delay() delay}.
* <p>Ignored if only {@link #delay()} is set, otherwise an {@link ExponentialBackOff}
* with the given max delay or an unlimited delay if not set.
*
* @return the maximum delay
*/
long maxDelay() default 0;

/**
* Specify the maximum delay for any retry attempt using a String format. If this is
* specified, takes precedence over {@link #maxDelay()}..
* <p>The max delay String can be in several formats:
* <ul>
* <li>a plain long &mdash; which is interpreted to represent a duration in
* milliseconds</li>
* <li>any of the known {@link DurationFormat.Style}: the {@link DurationFormat.Style#ISO8601 ISO8601}
* style or the {@link DurationFormat.Style#SIMPLE SIMPLE} style &mdash; using
* milliseconds as fallback if the string doesn't contain an explicit unit</li>
* <li>Regular expressions, such as {@code ${example.property}} to use the
* {@code example.property} from the environment</li>
* </ul>
*
* @return the max delay as a String value &mdash; for example a placeholder
* @see #maxDelay()
*/
String maxDelayString() default "";

/**
* Specify a multiplier for a delay for the next retry attempt, applied to the previous
* delay, starting with the initial {@linkplain #delay() delay} as well as to the
* applicable {@linkplain #jitter() jitter} for each attempt.
* <p>Ignored if only {@link #delay()} is set, otherwise an {@link ExponentialBackOff}
* with the given multiplier or {@code 1.0} if not set.
*
* @return the value to multiply the current interval by for each attempt
*/
double multiplier() default 0;

/**
* Specify a multiplier for a delay for the next retry attempt using a String format.
* If this is specified, takes precedence over {@link #multiplier()}.
* <p>The multiplier String can be in several formats:
* <ul>
* <li>a plain double</li>
* <li>Regular expressions, such as {@code ${example.property}} to use the
* {@code example.property} from the environment</li>
* </ul>
*
* @return the value to multiply the current interval by for each attempt &mdash;
* for example a placeholder
* @see #multiplier()
*/
String multiplierString() default "";

/**
* Specify a jitter value for the base retry attempt, randomly subtracted or added to
* the calculated delay, resulting in a value between {@code delay - jitter} and
* {@code delay + jitter} but never below the {@linkplain #delay() base delay} or
* above the {@linkplain #maxDelay() max delay}.
* <p>If a {@linkplain #multiplier() multiplier} is specified, it is applied to the
* jitter value as well.
* <p>Ignored if only {@link #delay()} is set, otherwise an {@link ExponentialBackOff}
* with the given jitter or no jitter if not set.
*
* @return the jitter value in milliseconds
* @see #delay()
* @see #maxDelay()
* @see #multiplier()
*/
long jitter() default 0;

/**
* Specify a jitter value for the base retry attempt using a String format. If this is
* specified, takes precedence over {@link #jitter()}.
* <p>The jitter String can be in several formats:
* <ul>
* <li>a plain long &mdash; which is interpreted to represent a duration in
* milliseconds</li>
* <li>any of the known {@link DurationFormat.Style}: the {@link DurationFormat.Style#ISO8601 ISO8601}
* style or the {@link DurationFormat.Style#SIMPLE SIMPLE} style &mdash; using
* milliseconds as fallback if the string doesn't contain an explicit unit</li>
* <li>Regular expressions, such as {@code ${example.property}} to use the
* {@code example.property} from the environment</li>
* </ul>
*
* @return the jitter as a String value &mdash; for example a placeholder
* @see #jitter()
*/
String jitterString() default "";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright 2018-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.annotation;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.jspecify.annotations.Nullable;

import org.springframework.core.retry.RetryPolicy;
import org.springframework.format.annotation.DurationFormat;
import org.springframework.format.datetime.standard.DurationFormatterUtils;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.util.StringValueResolver;
import org.springframework.util.backoff.FixedBackOff;

/**
* Create a {@link org.springframework.util.backoff.BackOff} from the state of a
* {@link BackOff @BackOff} annotation.
*
* @author Stephane Nicoll
*/
final class BackOffFactory {

private static final long DEFAULT_DELAY = 1000;

private final @Nullable StringValueResolver embeddedValueResolver;

BackOffFactory(@Nullable StringValueResolver embeddedValueResolver) {
this.embeddedValueResolver = embeddedValueResolver;
}

/**
* Create a {@link org.springframework.util.backoff.BackOff} instance based on the
* state of the given {@link BackOff @Backff}. The returned backoff instance has
* unlimited number of attempts as these are controlled externally.
*
* @param annotation the annotation to source the parameters from
* @return a {@link org.springframework.util.backoff.BackOff}
*/
public org.springframework.util.backoff.BackOff createFromAnnotation(BackOff annotation) { // NOSONAR
Duration delay = resolveDuration("delay", () -> annotation.delay() == DEFAULT_DELAY
? annotation.value() : annotation.delay(), annotation::delayString);
Duration maxDelay = resolveDuration("maxDelay", annotation::maxDelay, annotation::maxDelayString);
double multiplier = resolveMultiplier(annotation);
Duration jitter = resolveDuration("jitter", annotation::jitter, annotation::jitterString);
if (maxDelay == Duration.ZERO && multiplier == 0 && jitter == Duration.ZERO) {
Assert.isTrue(!delay.isNegative(),
() -> "Invalid delay (%dms): must be >= 0.".formatted(delay.toMillis()));
return new FixedBackOff(delay.toMillis());
}
RetryPolicy.Builder retryPolicyBuilder = RetryPolicy.builder().maxAttempts(Long.MAX_VALUE);
retryPolicyBuilder.delay(delay);
if (maxDelay != Duration.ZERO) {
retryPolicyBuilder.maxDelay(maxDelay);
}
if (multiplier != 0) {
retryPolicyBuilder.multiplier(multiplier);
}
if (jitter != Duration.ZERO) {
retryPolicyBuilder.jitter(jitter);
}
return retryPolicyBuilder.build().getBackOff();
}

private Duration resolveDuration(String attributeName, Supplier<@Nullable Long> valueRaw,
Supplier<String> valueString) {
String resolvedValue = resolve(valueString.get());
if (StringUtils.hasLength(resolvedValue)) {
try {
return toDuration(resolvedValue, TimeUnit.MILLISECONDS);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid duration value for '%s': '%s'; %s".formatted(attributeName, resolvedValue, ex));
}
}
Long raw = valueRaw.get();
return (raw != null && raw != 0) ? Duration.ofMillis(raw) : Duration.ZERO;
}

private Double resolveMultiplier(BackOff annotation) {
String resolvedMultiplier = resolve(annotation.multiplierString());
if (StringUtils.hasLength(resolvedMultiplier)) {
try {
return Double.valueOf(resolvedMultiplier);
}
catch (NumberFormatException ex) {
throw new IllegalArgumentException(
"Invalid multiplier: '%s'; %s".formatted(resolvedMultiplier, ex));
}
}
return annotation.multiplier();
}

private @Nullable String resolve(String valueString) {
if (StringUtils.hasLength(valueString) && this.embeddedValueResolver != null) {
return this.embeddedValueResolver.resolveStringValue(valueString);
}
return valueString;
}

private static Duration toDuration(String valueToResolve, TimeUnit timeUnit) {
DurationFormat.Unit unit = DurationFormat.Unit.fromChronoUnit(timeUnit.toChronoUnit());
return DurationFormatterUtils.detectAndParse(valueToResolve, unit);
}

}
Loading