Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
* @author Artem Bilan
* @author Loic Talhouarne
* @author Vladimir Tsanev
* @author Chen Binbin
* @author Yang Qiju
* @author Tom van den Berge
*/
Expand Down Expand Up @@ -719,11 +720,23 @@ public void run() {
break;
}
catch (Exception e) {
if (this.containerProperties.getGenericErrorHandler() != null) {
this.containerProperties.getGenericErrorHandler().handle(e, null);
try {
GenericErrorHandler<?> containerErrorHandler = this.containerProperties.getGenericErrorHandler();
if (containerErrorHandler != null) {
if (containerErrorHandler instanceof ConsumerAwareErrorHandler
|| containerErrorHandler instanceof ConsumerAwareBatchErrorHandler) {
containerErrorHandler.handle(e, null, this.consumer);
Copy link
Contributor

@garyrussell garyrussell Mar 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a problem here; I noticed this while running tests...

java.lang.UnsupportedOperationException: Container should never call this
	at org.springframework.kafka.listener.RemainingRecordsErrorHandler.handle(RemainingRecordsErrorHandler.java:39)
	at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:1)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:752)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:748)

testExceptionWhenCommitAfterRebalance

}
else {
containerErrorHandler.handle(e, null);
}
}
else {
this.logger.error("Container exception", e);
}
}
else {
this.logger.error("Container exception", e);
catch (Exception ex) {
this.logger.error("Container exception", ex);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,13 @@ public class KafkaMessageListenerContainerTests {

private static String topic18 = "testTopic18";

private static String topic19 = "testTopic19";


@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic1, topic2, topic3, topic4, topic5,
topic6, topic7, topic8, topic9, topic10, topic11, topic12, topic13, topic14, topic15, topic16, topic17,
topic18);
topic18, topic19);

@Rule
public TestName testName = new TestName();
Expand Down Expand Up @@ -1718,6 +1720,69 @@ public void testInitialSeek() throws Exception {
container.stop();
}

@Test
public void testExceptionWhenCommitAfterRebalance() throws Exception {
final CountDownLatch rebalanceLatch = new CountDownLatch(2);
final CountDownLatch consumeLatch = new CountDownLatch(7);

Map<String, Object> props = KafkaTestUtils.consumerProps("test19", "false", embeddedKafka);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 15000);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic19);
containerProps.setMessageListener((MessageListener<Integer, String>) messages -> {
logger.info("listener: " + messages);
consumeLatch.countDown();
try {
Thread.sleep(3000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
});
containerProps.setSyncCommits(true);
containerProps.setAckMode(AckMode.BATCH);
containerProps.setPollTimeout(100);
containerProps.setAckOnError(false);
containerProps.setErrorHandler(new SeekToCurrentErrorHandler());

Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic19);

containerProps.setConsumerRebalanceListener(new ConsumerRebalanceListener() {

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
logger.info("rebalance occurred.");
rebalanceLatch.countDown();
}
});

KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("testContainerException");
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
container.pause();

for (int i = 0; i < 6; i++) {
template.sendDefault(0, 0, "a");
}
template.flush();

container.resume();
// should be rebalanced and consume again
assertThat(rebalanceLatch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(consumeLatch.await(60, TimeUnit.SECONDS)).isTrue();
container.stop();
}

private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
Consumer<?, ?> consumer = spy(
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class));
Expand Down