Skip to content

Commit 82cb03e

Browse files
authored
GH-1902: AuthenticationException Fatal By Default
Resolves #1902 Rename property and use for both exception types. * Fix checkstyle.
1 parent 15e771a commit 82cb03e

File tree

5 files changed

+105
-24
lines changed

5 files changed

+105
-24
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1128,12 +1128,12 @@ It does not apply if the container is configured to listen to a topic pattern (r
11281128
Previously, the container threads looped within the `consumer.poll()` method waiting for the topic to appear while logging many messages.
11291129
Aside from the logs, there was no indication that there was a problem.
11301130

1131-
As of version 2.3.5, a new container property called `authorizationExceptionRetryInterval` has been introduced.
1132-
This causes the container to retry fetching messages after getting any `AuthorizationException` from `KafkaConsumer`.
1133-
This can happen when, for example, the configured user is denied access to read certain topic.
1134-
Defining `authorizationExceptionRetryInterval` should help the application to recover as soon as proper permissions are granted.
1131+
As of version 2.8, a new container property `authExceptionRetryInterval` has been introduced.
1132+
This causes the container to retry fetching messages after getting any `AuthenticationException` or `AuthorizationException` from the `KafkaConsumer`.
1133+
This can happen when, for example, the configured user is denied access to read a certain topic or credentials are incorrect.
1134+
Defining `authExceptionRetryInterval` allows the container to recover when proper permissions are granted.
11351135

1136-
NOTE: By default, no interval is configured - authorization errors are considered fatal, which causes the container to stop.
1136+
NOTE: By default, no interval is configured - authentication and authorization errors are considered fatal, which causes the container to stop.
11371137

11381138
Starting with version 2.8, when creating the consumer factory, if you provide deserializers as objects (in the constructor or via the setters), the factory will invoke the `configure()` method to configure them with the configuration properties.
11391139

@@ -2433,9 +2433,9 @@ When creating the `TopicPartitionOffset` s for the request, only positive, absol
24332433
|Whether or not to commit the initial position on assignment; by default, the initial offset will only be committed if the `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG` is `latest` and it won't run in a transaction even if there is a transaction manager present.
24342434
See the javadocs for `ContainerProperties.AssignmentCommitOption` for more information about the available options.
24352435

2436-
|[[authorizationExceptionRetryInterval]]<<authorizationExceptionRetryInterval,`authorizationExceptionRetryInterval`>>
2436+
|[[authExceptionRetryInterval]]<<authExceptionRetryInterval,`authExceptionRetryInterval`>>
24372437
|`null`
2438-
|When not null, a `Duration` to sleep between polls when an `AuthorizationException` is thrown by the Kafka client.
2438+
|When not null, a `Duration` to sleep between polls when an `AuthenticationException` or `AuthorizationException` is thrown by the Kafka client.
24392439
When null, such exceptions are considered fatal and the container will stop.
24402440

24412441
|[[clientId]]<<clientId,`clientId`>>
@@ -2805,7 +2805,7 @@ In addition, the `ConsumerStoppedEvent` has the following additional property:
28052805
** `NORMAL` - the consumer stopped normally (container was stopped).
28062806
** `ERROR` - a `java.lang.Error` was thrown.
28072807
** `FENCED` - the transactional producer was fenced and the `stopContainerWhenFenced` container property is `true`.
2808-
** `AUTH` - an `AuthorizationException` was thrown and the `authorizationExceptionRetryInterval` is not configured.
2808+
** `AUTH` - an `AuthenticationException` or `AuthorizationException` was thrown and the `authExceptionRetryInterval` is not configured.
28092809
** `NO_OFFSET` - there is no offset for a partition and the `auto.offset.reset` policy is `none`.
28102810

28112811
You can use this event to restart the container after such a condition:

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ See <<error-handlers>> for more information.
6161

6262
The `interceptBeforeTx` container property is now `true` by default.
6363

64+
The `authorizationExceptionRetryInterval` property has been renamed to `authExceptionRetryInterval` and now applies to `AuthenticationException` s in addition to `AuthorizationException` s previously.
65+
Both exceptions are considered fatal and the container will stop by default, unless this property is set.
66+
67+
See <<kafka-container>> and <<container-props>> for more information.
68+
6469
[[x28-serializers]]
6570
==== Serializer/Deserializer Changes
6671

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
2525
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
26+
import org.apache.kafka.common.errors.AuthenticationException;
2627

2728
import org.springframework.kafka.support.LogIfLevelEnabled;
2829
import org.springframework.kafka.support.TopicPartitionOffset;
@@ -101,7 +102,7 @@ public class ConsumerProperties {
101102

102103
private Properties kafkaConsumerProperties = new Properties();
103104

104-
private Duration authorizationExceptionRetryInterval;
105+
private Duration authExceptionRetryInterval;
105106

106107
private int commitRetries = DEFAULT_COMMIT_RETRIES;
107108

@@ -347,22 +348,54 @@ public void setKafkaConsumerProperties(Properties kafkaConsumerProperties) {
347348
this.kafkaConsumerProperties = kafkaConsumerProperties;
348349
}
349350

351+
/**
352+
* Get the authentication/authorization retry interval.
353+
* @return the interval.
354+
* @deprecated in favor of {@link #getAuthExceptionRetryInterval()}.
355+
*/
356+
@Deprecated
357+
@Nullable
350358
public Duration getAuthorizationExceptionRetryInterval() {
351-
return this.authorizationExceptionRetryInterval;
359+
return this.authExceptionRetryInterval;
352360
}
353361

354362
/**
355-
* Set the interval between retries after {@code AuthorizationException} is thrown
356-
* by {@code KafkaConsumer}. By default the field is null and retries are disabled.
357-
* In such case the container will be stopped.
363+
* Set the interval between retries after and {@link AuthenticationException} or
364+
* {@code AuthorizationException} is thrown by {@code KafkaConsumer}. By default the
365+
* field is null and retries are disabled. In such case the container will be stopped.
358366
*
359367
* The interval must be less than {@code max.poll.interval.ms} consumer property.
360368
*
361369
* @param authorizationExceptionRetryInterval the duration between retries
362370
* @since 2.3.5
371+
* @deprecated in favor of {@link #setAuthExceptionRetryInterval(Duration)}.
363372
*/
373+
@Deprecated
364374
public void setAuthorizationExceptionRetryInterval(Duration authorizationExceptionRetryInterval) {
365-
this.authorizationExceptionRetryInterval = authorizationExceptionRetryInterval;
375+
this.authExceptionRetryInterval = authorizationExceptionRetryInterval;
376+
}
377+
378+
/**
379+
* Get the authentication/authorization retry interval.
380+
* @return the interval.
381+
*/
382+
@Nullable
383+
public Duration getAuthExceptionRetryInterval() {
384+
return this.authExceptionRetryInterval;
385+
}
386+
387+
/**
388+
* Set the interval between retries after and {@link AuthenticationException} or
389+
* {@code AuthorizationException} is thrown by {@code KafkaConsumer}. By default the
390+
* field is null and retries are disabled. In such case the container will be stopped.
391+
*
392+
* The interval must be less than {@code max.poll.interval.ms} consumer property.
393+
*
394+
* @param authExceptionRetryInterval the duration between retries
395+
* @since 2.8
396+
*/
397+
public void setAuthExceptionRetryInterval(Duration authExceptionRetryInterval) {
398+
this.authExceptionRetryInterval = authExceptionRetryInterval;
366399
}
367400

368401
/**
@@ -449,7 +482,7 @@ protected final String renderProperties() {
449482
+ "\n syncCommits=" + this.syncCommits
450483
+ (this.syncCommitTimeout != null ? "\n syncCommitTimeout=" + this.syncCommitTimeout : "")
451484
+ (this.kafkaConsumerProperties.size() > 0 ? "\n properties=" + this.kafkaConsumerProperties : "")
452-
+ "\n authorizationExceptionRetryInterval=" + this.authorizationExceptionRetryInterval
485+
+ "\n authExceptionRetryInterval=" + this.authExceptionRetryInterval
453486
+ "\n commitRetries=" + this.commitRetries
454487
+ "\n fixTxOffsets" + this.fixTxOffsets;
455488
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.kafka.common.Metric;
5959
import org.apache.kafka.common.MetricName;
6060
import org.apache.kafka.common.TopicPartition;
61+
import org.apache.kafka.common.errors.AuthenticationException;
6162
import org.apache.kafka.common.errors.AuthorizationException;
6263
import org.apache.kafka.common.errors.FencedInstanceIdException;
6364
import org.apache.kafka.common.errors.ProducerFencedException;
@@ -488,7 +489,7 @@ private void publishConsumerStoppedEvent(@Nullable Throwable throwable) {
488489
else if (throwable instanceof StopAfterFenceException || throwable instanceof FencedInstanceIdException) {
489490
reason = Reason.FENCED;
490491
}
491-
else if (throwable instanceof AuthorizationException) {
492+
else if (throwable instanceof AuthenticationException || throwable instanceof AuthorizationException) {
492493
reason = Reason.AUTH;
493494
}
494495
else if (throwable instanceof NoOffsetForPartitionException) {
@@ -669,8 +670,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
669670

670671
private final boolean subBatchPerPartition;
671672

672-
private final Duration authorizationExceptionRetryInterval =
673-
this.containerProperties.getAuthorizationExceptionRetryInterval();
673+
private final Duration authExceptionRetryInterval =
674+
this.containerProperties.getAuthExceptionRetryInterval();
674675

675676
private final AssignmentCommitOption autoCommitOption = this.containerProperties.getAssignmentCommitOption();
676677

@@ -1238,20 +1239,23 @@ public void run() {
12381239
exitThrowable = nofpe;
12391240
break;
12401241
}
1241-
catch (AuthorizationException ae) {
1242-
if (this.authorizationExceptionRetryInterval == null) {
1243-
ListenerConsumer.this.logger.error(ae, "Authorization Exception and no authorizationExceptionRetryInterval set");
1242+
catch (AuthenticationException | AuthorizationException ae) {
1243+
if (this.authExceptionRetryInterval == null) {
1244+
ListenerConsumer.this.logger.error(ae,
1245+
"Authentcation/Authorization Exception and no authExceptionRetryInterval set");
12441246
this.fatalError = true;
12451247
exitThrowable = ae;
12461248
break;
12471249
}
12481250
else {
1249-
ListenerConsumer.this.logger.error(ae, "Authorization Exception, retrying in " + this.authorizationExceptionRetryInterval.toMillis() + " ms");
1251+
ListenerConsumer.this.logger.error(ae,
1252+
"Authentcation/Authorization Exception, retrying in "
1253+
+ this.authExceptionRetryInterval.toMillis() + " ms");
12501254
// We can't pause/resume here, as KafkaConsumer doesn't take pausing
12511255
// into account when committing, hence risk of being flooded with
12521256
// GroupAuthorizationExceptions.
12531257
// see: https://github.com/spring-projects/spring-kafka/pull/1337
1254-
sleepFor(this.authorizationExceptionRetryInterval);
1258+
sleepFor(this.authExceptionRetryInterval);
12551259
}
12561260
}
12571261
catch (FencedInstanceIdException fie) {

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
7575
import org.apache.kafka.clients.producer.ProducerConfig;
7676
import org.apache.kafka.common.TopicPartition;
77+
import org.apache.kafka.common.errors.AuthenticationException;
7778
import org.apache.kafka.common.errors.AuthorizationException;
7879
import org.apache.kafka.common.errors.FencedInstanceIdException;
7980
import org.apache.kafka.common.errors.RebalanceInProgressException;
@@ -3029,6 +3030,44 @@ public void testCommitErrorHandlerCalled() throws Exception {
30293030
container.stop();
30303031
}
30313032

3033+
@SuppressWarnings({ "unchecked", "rawtypes" })
3034+
@Test
3035+
void testFatalErrorOnAuthenticationException() throws Exception {
3036+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
3037+
Consumer<Integer, String> consumer = mock(Consumer.class);
3038+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
3039+
given(cf.getConfigurationProperties()).willReturn(new HashMap<>());
3040+
3041+
willThrow(AuthenticationException.class)
3042+
.given(consumer).poll(any());
3043+
3044+
ContainerProperties containerProps = new ContainerProperties(topic1);
3045+
containerProps.setGroupId("grp");
3046+
containerProps.setClientId("clientId");
3047+
containerProps.setMessageListener((MessageListener) r -> { });
3048+
KafkaMessageListenerContainer<Integer, String> container =
3049+
new KafkaMessageListenerContainer<>(cf, containerProps);
3050+
3051+
AtomicReference<ConsumerStoppedEvent.Reason> reason = new AtomicReference<>();
3052+
CountDownLatch stopped = new CountDownLatch(1);
3053+
3054+
container.setApplicationEventPublisher(e -> {
3055+
if (e instanceof ConsumerStoppedEvent) {
3056+
reason.set(((ConsumerStoppedEvent) e).getReason());
3057+
stopped.countDown();
3058+
}
3059+
});
3060+
3061+
container.start();
3062+
try {
3063+
assertThat(stopped.await(10, TimeUnit.SECONDS)).isTrue();
3064+
assertThat(reason.get()).isEqualTo(Reason.AUTH);
3065+
}
3066+
finally {
3067+
container.stop();
3068+
}
3069+
}
3070+
30323071
@SuppressWarnings({ "unchecked", "rawtypes" })
30333072
@Test
30343073
void testFatalErrorOnAuthorizationException() throws Exception {
@@ -3080,7 +3119,7 @@ void testNotFatalErrorOnAuthorizationException() throws Exception {
30803119
containerProps.setGroupId("grp");
30813120
containerProps.setClientId("clientId");
30823121
containerProps.setMessageListener((MessageListener) r -> { });
3083-
containerProps.setAuthorizationExceptionRetryInterval(Duration.ofMillis(100));
3122+
containerProps.setAuthExceptionRetryInterval(Duration.ofMillis(100));
30843123
KafkaMessageListenerContainer<Integer, String> container =
30853124
new KafkaMessageListenerContainer<>(cf, containerProps);
30863125
container.start();

0 commit comments

Comments
 (0)