diff --git a/samples/sample-01/pom.xml b/samples/sample-01/pom.xml index ad011217fc..dc424cf29e 100644 --- a/samples/sample-01/pom.xml +++ b/samples/sample-01/pom.xml @@ -5,7 +5,7 @@ com.example kafka-sample-01 - 2.6.5 + 3.0.2-SNAPSHOT jar kafka-sample-01 @@ -14,14 +14,14 @@ org.springframework.boot spring-boot-starter-parent - 2.4.3 + 3.0.1-SNAPSHOT UTF-8 UTF-8 - 1.8 + 17 diff --git a/samples/sample-01/src/main/java/com/example/Application.java b/samples/sample-01/src/main/java/com/example/Application.java index 6f894b4eae..ef5cf8c6fd 100644 --- a/samples/sample-01/src/main/java/com/example/Application.java +++ b/samples/sample-01/src/main/java/com/example/Application.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,8 +29,9 @@ import org.springframework.core.task.TaskExecutor; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; -import org.springframework.kafka.listener.SeekToCurrentErrorHandler; +import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.support.converter.JsonMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.util.backoff.FixedBackOff; @@ -59,8 +60,8 @@ public static void main(String[] args) { * Boot will autowire this into the container factory. */ @Bean - public SeekToCurrentErrorHandler errorHandler(KafkaOperations template) { - return new SeekToCurrentErrorHandler( + public CommonErrorHandler errorHandler(KafkaOperations template) { + return new DefaultErrorHandler( new DeadLetterPublishingRecoverer(template), new FixedBackOff(1000L, 2)); } @@ -79,8 +80,8 @@ public void listen(Foo2 foo) { } @KafkaListener(id = "dltGroup", topics = "topic1.DLT") - public void dltListen(String in) { - logger.info("Received from DLT: " + in); + public void dltListen(byte[] in) { + logger.info("Received from DLT: " + new String(in)); this.exec.execute(() -> System.out.println("Hit Enter to terminate...")); } diff --git a/samples/sample-02/pom.xml b/samples/sample-02/pom.xml index 6d5d8849f6..55cb73cbc3 100644 --- a/samples/sample-02/pom.xml +++ b/samples/sample-02/pom.xml @@ -5,7 +5,7 @@ com.example kafka-sample-02 - 2.6.5 + 3.0.2-SNAPSHOT jar kafka-sample-02 @@ -14,14 +14,14 @@ org.springframework.boot spring-boot-starter-parent - 2.4.3 + 3.0.1-SNAPSHOT UTF-8 UTF-8 - 1.8 + 17 diff --git a/samples/sample-02/src/main/java/com/example/Application.java b/samples/sample-02/src/main/java/com/example/Application.java index 6577389ae8..6030b358c7 100644 --- a/samples/sample-02/src/main/java/com/example/Application.java +++ b/samples/sample-02/src/main/java/com/example/Application.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,12 +27,13 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Profile; import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; -import org.springframework.kafka.listener.SeekToCurrentErrorHandler; -import org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper; -import org.springframework.kafka.support.converter.Jackson2JavaTypeMapper.TypePrecedence; +import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.support.converter.JsonMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper; +import org.springframework.kafka.support.mapping.Jackson2JavaTypeMapper.TypePrecedence; import org.springframework.util.backoff.FixedBackOff; import com.common.Bar2; @@ -56,8 +57,8 @@ public static void main(String[] args) { * Boot will autowire this into the container factory. */ @Bean - public SeekToCurrentErrorHandler errorHandler(KafkaOperations template) { - return new SeekToCurrentErrorHandler( + public CommonErrorHandler errorHandler(KafkaOperations template) { + return new DefaultErrorHandler( new DeadLetterPublishingRecoverer(template), new FixedBackOff(1000L, 2)); } diff --git a/samples/sample-03/pom.xml b/samples/sample-03/pom.xml index 705037f80d..0d225c5293 100644 --- a/samples/sample-03/pom.xml +++ b/samples/sample-03/pom.xml @@ -5,7 +5,7 @@ com.example kafka-sample-03 - 2.6.5 + 3.0.2-SNAPSHOT jar kafka-sample-03 @@ -14,14 +14,14 @@ org.springframework.boot spring-boot-starter-parent - 2.4.3 + 3.0.1-SNAPSHOT UTF-8 UTF-8 - 1.8 + 17 diff --git a/samples/sample-03/src/main/java/com/example/Application.java b/samples/sample-03/src/main/java/com/example/Application.java index 2fc7db8e24..8f1f37bb9a 100644 --- a/samples/sample-03/src/main/java/com/example/Application.java +++ b/samples/sample-03/src/main/java/com/example/Application.java @@ -35,6 +35,7 @@ import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.JsonMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; +import org.springframework.stereotype.Component; import com.common.Foo2; @@ -48,9 +49,7 @@ @SpringBootApplication public class Application { - private final Logger LOGGER = LoggerFactory.getLogger(Application.class); - - private final static CountDownLatch LATCH = new CountDownLatch(1); + final static CountDownLatch LATCH = new CountDownLatch(1); public static void main(String[] args) throws InterruptedException { ConfigurableApplicationContext context = SpringApplication.run(Application.class, args); @@ -69,6 +68,23 @@ public BatchMessagingMessageConverter batchConverter() { return new BatchMessagingMessageConverter(converter()); } + @Bean + public NewTopic topic2() { + return TopicBuilder.name("topic2").partitions(1).replicas(1).build(); + } + + @Bean + public NewTopic topic3() { + return TopicBuilder.name("topic3").partitions(1).replicas(1).build(); + } + +} + +@Component +class Listener { + + private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class); + @Autowired private KafkaTemplate kafkaTemplate; @@ -83,17 +99,7 @@ public void listen1(List foos) throws IOException { @KafkaListener(id = "fooGroup3", topics = "topic3") public void listen2(List in) { LOGGER.info("Received: " + in); - LATCH.countDown(); - } - - @Bean - public NewTopic topic2() { - return TopicBuilder.name("topic2").partitions(1).replicas(1).build(); - } - - @Bean - public NewTopic topic3() { - return TopicBuilder.name("topic3").partitions(1).replicas(1).build(); + Application.LATCH.countDown(); } } diff --git a/samples/sample-04/pom.xml b/samples/sample-04/pom.xml index 69e1513376..f89738d640 100644 --- a/samples/sample-04/pom.xml +++ b/samples/sample-04/pom.xml @@ -5,7 +5,7 @@ com.example kafka-sample-04 - 2.7.0 + 3.0.2-SNAPSHOT jar kafka-sample-04 @@ -14,14 +14,14 @@ org.springframework.boot spring-boot-starter-parent - 2.5.0-SNAPSHOT + 3.0.1-SNAPSHOT UTF-8 UTF-8 - 11 + 17 diff --git a/samples/sample-05/pom.xml b/samples/sample-05/pom.xml index 0310c65511..e0de8edbb4 100644 --- a/samples/sample-05/pom.xml +++ b/samples/sample-05/pom.xml @@ -5,13 +5,13 @@ org.springframework.boot spring-boot-starter-parent - 3.0.0-SNAPSHOT + 3.0.1-SNAPSHOT com.example kafka-sample-05 - 3.0.0-SNAPSHOT + 3.0.2-SNAPSHOT kafka-sample-05 Kafka Sample 5 @@ -19,7 +19,6 @@ 17 - 3.0.0-SNAPSHOT