Commit e826a58

garyrussell authored and artembilan committed
GH-656: Fix seek on rollback
Fixes #656, #657

Previously, after a rollback, we performed a `seek` only on the failed record; we need to seek all unprocessed records.

Also, when no error handler was provided and a batch listener was in use, the offsets were added to `acks` and incorrectly committed (#657).

Also, if a `ContainerAwareErrorHandler` "handled" the error, the offsets weren't committed.

Enhance the tests to verify the full seeks. Add a new test to verify that the batch listener doesn't commit after a rollback.

**cherry-pick to 2.1.x, 2.0.x**

I will backport to 1.3.x after review.

* Some simple polishing

* Remove `@FunctionalInterface` since it's not available for Java 7

* Refactor `DefaultAfterRollbackProcessor` to avoid an extra loop
1 parent 877fd6a commit e826a58
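For illustration, a minimal record-listener sketch of the behavior this commit fixes (the topic and listener are assumptions, not part of the commit): when the listener throws inside a transaction, the container now re-seeks the failed record and every unprocessed record after it from the same poll, so the next poll redelivers all of them in order.

@KafkaListener(topics = "tx-topic") // hypothetical topic
public void listen(String value) {
	if ("poison".equals(value)) {
		// rolls back the transaction; previously only this record was re-sought,
		// now all later records from the same poll are re-sought as well
		throw new RuntimeException("trigger rollback");
	}
	// ... normal processing
}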

9 files changed, +295 -60 lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 17 additions & 0 deletions
@@ -24,6 +24,7 @@
 import org.springframework.context.ApplicationEventPublisherAware;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.listener.AbstractMessageListenerContainer;
+import org.springframework.kafka.listener.AfterRollbackProcessor;
 import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
 import org.springframework.kafka.listener.config.ContainerProperties;
 import org.springframework.kafka.support.converter.MessageConverter;
@@ -68,6 +69,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe

 	private ApplicationEventPublisher applicationEventPublisher;

+	private AfterRollbackProcessor<K, V> afterRollbackProcessor;
+
 	/**
 	 * Specify a {@link ConsumerFactory} to use.
 	 * @param consumerFactory The consumer factory.
@@ -162,6 +165,17 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
 		this.applicationEventPublisher = applicationEventPublisher;
 	}

+	/**
+	 * Set a processor to invoke after a transaction rollback; typically will
+	 * seek the unprocessed topic/partition to reprocess the records.
+	 * The default does so, including the failed record.
+	 * @param afterRollbackProcessor the processor.
+	 * @since 1.3.5
+	 */
+	public void setAfterRollbackProcessor(AfterRollbackProcessor<K, V> afterRollbackProcessor) {
+		this.afterRollbackProcessor = afterRollbackProcessor;
+	}
+
 	/**
 	 * Obtain the properties template for this factory - set properties as needed
 	 * and they will be copied to a final properties instance for the endpoint.
@@ -232,6 +246,9 @@ protected void initializeContainer(C instance) {
 		ContainerProperties properties = instance.getContainerProperties();
 		BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",
 				"messageListener", "ackCount", "ackTime");
+		if (this.afterRollbackProcessor != null) {
+			instance.setAfterRollbackProcessor(this.afterRollbackProcessor);
+		}
 		if (this.containerProperties.getAckCount() > 0) {
 			properties.setAckCount(this.containerProperties.getAckCount());
 		}
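A minimal configuration sketch (bean and method names beyond the new setter are illustrative assumptions) showing how the new factory property could be used; the processor is copied to each container the factory creates, as shown in initializeContainer() above:

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
		ConsumerFactory<String, String> consumerFactory) {

	ConcurrentKafkaListenerContainerFactory<String, String> factory =
			new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(consumerFactory);
	// copied to each container created by this factory (when non-null)
	factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<String, String>());
	return factory;
}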

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 18 additions & 0 deletions
@@ -115,6 +115,8 @@ public enum AckMode {

 	private int phase = DEFAULT_PHASE;

+	private AfterRollbackProcessor<K, V> afterRollbackProcessor = new DefaultAfterRollbackProcessor<>();
+
 	private volatile boolean running = false;

 	protected AbstractMessageListenerContainer(ContainerProperties containerProperties) {
@@ -189,6 +191,22 @@ public int getPhase() {
 		return this.phase;
 	}

+	protected AfterRollbackProcessor<K, V> getAfterRollbackProcessor() {
+		return this.afterRollbackProcessor;
+	}
+
+	/**
+	 * Set a processor to perform seeks on unprocessed records after a rollback.
+	 * Default will seek to current position all topics/partitions, including the failed
+	 * record.
+	 * @param afterRollbackProcessor the processor.
+	 * @since 1.3.5
+	 */
+	public void setAfterRollbackProcessor(AfterRollbackProcessor<K, V> afterRollbackProcessor) {
+		Assert.notNull(afterRollbackProcessor, "'afterRollbackProcessor' cannot be null");
+		this.afterRollbackProcessor = afterRollbackProcessor;
+	}
+
 	public ContainerProperties getContainerProperties() {
 		return this.containerProperties;
 	}
spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java

Lines changed: 48 additions & 0 deletions

@@ -0,0 +1,48 @@
+/*
+ * Copyright 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Invoked by a listener container with remaining, unprocessed, records
+ * (including the failed record). Implementations should seek the desired
+ * topics/partitions so that records will be re-fetched on the next
+ * poll. When used with a batch listener, the entire batch of records is
+ * provided.
+ *
+ * @param <K> the key type.
+ * @param <V> the value type.
+ *
+ * @author Gary Russell
+ *
+ * @since 1.3.5
+ *
+ */
+public interface AfterRollbackProcessor<K, V> {
+
+	/**
+	 * Process the remaining records.
+	 * @param records the records.
+	 * @param consumer the consumer.
+	 */
+	void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer);
+
+}
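The contract leaves the seek policy to implementations. As a hypothetical alternative to the default (this class is not in the commit, and it deliberately drops the failed record with no retry bookkeeping, so use with care), an implementation could seek past the failed record, which is always first in the list, while replaying everything else:

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import org.springframework.kafka.listener.AfterRollbackProcessor;

public class SkipFailedAfterRollbackProcessor<K, V> implements AfterRollbackProcessor<K, V> {

	@Override
	public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer) {
		Set<TopicPartition> seekOffsets = new HashSet<>();
		boolean failed = true; // the failed record is first in the list
		for (ConsumerRecord<K, V> record : records) {
			TopicPartition tp = new TopicPartition(record.topic(), record.partition());
			if (seekOffsets.add(tp)) {
				// skip the failed record; replay the rest from their offsets
				consumer.seek(tp, failed ? record.offset() + 1 : record.offset());
			}
			failed = false;
		}
	}

}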
spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

Lines changed: 63 additions & 0 deletions

@@ -0,0 +1,63 @@
+/*
+ * Copyright 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Default implementation of {@link AfterRollbackProcessor}. Seeks all
+ * topic/partitions so the records will be re-fetched, including the failed
+ * record.
+ *
+ * @param <K> the key type.
+ * @param <V> the value type.
+ *
+ * @author Gary Russell
+ * @author Artem Bilan
+ *
+ * @since 1.3.5
+ *
+ */
+public class DefaultAfterRollbackProcessor<K, V> implements AfterRollbackProcessor<K, V> {
+
+	private static final Log logger = LogFactory.getLog(DefaultAfterRollbackProcessor.class);
+
+	@Override
+	public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer) {
+		Set<TopicPartition> seekOffsets = new HashSet<>();
+		for (ConsumerRecord<K, V> record : records) {
+			TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
+			if (seekOffsets.add(topicPartition)) {
+				try {
+					consumer.seek(topicPartition, record.offset());
+				}
+				catch (Exception e) {
+					logger.error("Failed to seek " + topicPartition + " to " + record.offset());
+				}
+			}
+		}
+	}
+
+}
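Because records arrive in offset order within each partition, the first record seen per partition carries the lowest unprocessed offset, and Set.add() returning false for later records ensures only one seek per partition. The behavior can be exercised without a broker using kafka-clients' MockConsumer; a sketch only (the demo class, topic, and offsets are illustrative assumptions):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;

public class DefaultAfterRollbackProcessorDemo {

	public static void main(String[] args) {
		MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
		TopicPartition tp = new TopicPartition("demo", 0);
		consumer.assign(Collections.singletonList(tp));
		consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));

		// the failed record (offset 4) first, then the rest of the poll
		List<ConsumerRecord<String, String>> unprocessed = Arrays.asList(
				new ConsumerRecord<>("demo", 0, 4L, "key", "failed"),
				new ConsumerRecord<>("demo", 0, 5L, "key", "not yet processed"));

		new DefaultAfterRollbackProcessor<String, String>().process(unprocessed, consumer);

		System.out.println(consumer.position(tp)); // 4 - both records re-fetched next poll
	}

}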

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 47 additions & 51 deletions
@@ -804,18 +804,7 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
 			}
 			catch (RuntimeException e) {
 				this.logger.error("Transaction rolled back", e);
-				Map<TopicPartition, Long> seekOffsets = new HashMap<>();
-				Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
-				while (iterator.hasNext()) {
-					ConsumerRecord<K, V> record = iterator.next();
-					TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
-					if (!seekOffsets.containsKey(topicPartition)) {
-						seekOffsets.put(topicPartition, record.offset());
-					}
-				}
-				for (Entry<TopicPartition, Long> entry : seekOffsets.entrySet()) {
-					this.consumer.seek(entry.getKey(), entry.getValue());
-				}
+				getAfterRollbackProcessor().process(recordList, this.consumer);
 			}
 		}

@@ -850,7 +839,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
 			}
 		}
 		catch (RuntimeException e) {
-			if (this.containerProperties.isAckOnError() && !this.autoCommit) {
+			if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {
 				for (ConsumerRecord<K, V> record : getHighestOffsetRecords(recordList)) {
 					this.acks.add(record);
 				}
@@ -860,7 +849,11 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
 			}
 			try {
 				this.batchErrorHandler.handle(e, records);
+				// if the handler handled the error (no exception), go ahead and commit
 				if (producer != null) {
+					for (ConsumerRecord<K, V> record : getHighestOffsetRecords(recordList)) {
+						this.acks.add(record);
+					}
 					sendOffsetsToTransaction(producer);
 				}
 			}
@@ -920,8 +913,12 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
 			}
 			catch (RuntimeException e) {
 				this.logger.error("Transaction rolled back", e);
-				this.consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
-				break;
+				List<ConsumerRecord<K, V>> unprocessed = new ArrayList<>();
+				unprocessed.add(record);
+				while (iterator.hasNext()) {
+					unprocessed.add(iterator.next());
+				}
+				getAfterRollbackProcessor().process(unprocessed, this.consumer);
 			}
 		}
 	}
@@ -957,45 +954,11 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
 			else {
 				this.listener.onMessage(record);
 			}
-			if (this.isRecordAck) {
-				Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
-						Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
-								new OffsetAndMetadata(record.offset() + 1));
-				if (producer == null) {
-					if (this.containerProperties.isSyncCommits()) {
-						this.consumer.commitSync(offsetsToCommit);
-					}
-					else {
-						this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
-					}
-				}
-				else {
-					this.acks.add(record);
-				}
-			}
-			else if (!this.isAnyManualAck && !this.autoCommit) {
-				this.acks.add(record);
-			}
-			if (producer != null) {
-				sendOffsetsToTransaction(producer);
-			}
+			ackCurrent(record, producer);
 		}
 		catch (RuntimeException e) {
 			if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {
-				if (this.isRecordAck) {
-					Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
-							Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
-									new OffsetAndMetadata(record.offset() + 1));
-					if (this.containerProperties.isSyncCommits()) {
-						this.consumer.commitSync(offsetsToCommit);
-					}
-					else {
-						this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
-					}
-				}
-				else if (!this.isAnyManualAck) {
-					this.acks.add(record);
-				}
+				ackCurrent(record, producer);
 			}
 			if (this.errorHandler == null) {
 				throw e;
@@ -1023,6 +986,39 @@ else if (!this.isAnyManualAck) {
 		return null;
 	}

+	public void ackCurrent(final ConsumerRecord<K, V> record, @SuppressWarnings("rawtypes") Producer producer) {
+		if (this.isRecordAck) {
+			Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+					Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
+							new OffsetAndMetadata(record.offset() + 1));
+			if (producer == null) {
+				if (this.logger.isDebugEnabled()) {
+					this.logger.debug("Committing: " + offsetsToCommit);
+				}
+				if (this.containerProperties.isSyncCommits()) {
+					this.consumer.commitSync(offsetsToCommit);
+				}
+				else {
+					this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
+				}
+			}
+			else {
+				this.acks.add(record);
+			}
+		}
+		else if (!this.isAnyManualAck && !this.autoCommit) {
+			this.acks.add(record);
+		}
+		if (producer != null) {
+			try {
+				sendOffsetsToTransaction(producer);
+			}
+			catch (Exception e) {
+				this.logger.error("Send offsets to transaction failed", e);
+			}
+		}
+	}
+
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	private void sendOffsetsToTransaction(Producer producer) {
 		handleAcks();
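A batch-listener sketch of the second fix (the listener, topic, and container factory names are assumptions): with a transactional producer and no error handler, a thrown exception no longer adds the batch's highest offsets to acks, so nothing is committed and the whole batch is redelivered after the rollback.

@KafkaListener(topics = "tx-batch-topic", containerFactory = "batchFactory")
public void listen(List<String> batch) {
	for (String value : batch) {
		if (value.startsWith("fail")) {
			// rolls back; the batch's offsets are no longer added to 'acks'
			throw new RuntimeException("reject the whole batch");
		}
	}
	// ... process the batch
}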

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 1 addition & 1 deletion
@@ -513,7 +513,7 @@ public void testAckOnErrorRecord() throws Exception {
 			}
 		}
 		assertThat(consumer.position(new TopicPartition(topic9, 0))).isEqualTo(1);
-		// this consumer is positioned at 1, the next offset after the successfully
+		// this consumer is positioned at 2, the next offset after the successfully
 		// processed 'qux'
 		// it has been updated even 'baz' failed
 		for (int i = 0; i < 100; i++) {
