1
1
/*
2
- * Copyright 2018-2021 the original author or authors.
2
+ * Copyright 2018-2022 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
29
29
import org .springframework .core .task .TaskExecutor ;
30
30
import org .springframework .kafka .annotation .KafkaListener ;
31
31
import org .springframework .kafka .core .KafkaOperations ;
32
+ import org .springframework .kafka .listener .CommonErrorHandler ;
32
33
import org .springframework .kafka .listener .DeadLetterPublishingRecoverer ;
33
- import org .springframework .kafka .listener .SeekToCurrentErrorHandler ;
34
+ import org .springframework .kafka .listener .DefaultErrorHandler ;
34
35
import org .springframework .kafka .support .converter .JsonMessageConverter ;
35
36
import org .springframework .kafka .support .converter .RecordMessageConverter ;
36
37
import org .springframework .util .backoff .FixedBackOff ;
@@ -59,8 +60,8 @@ public static void main(String[] args) {
59
60
* Boot will autowire this into the container factory.
60
61
*/
61
62
@ Bean
62
- public SeekToCurrentErrorHandler errorHandler (KafkaOperations <Object , Object > template ) {
63
- return new SeekToCurrentErrorHandler (
63
+ public CommonErrorHandler errorHandler (KafkaOperations <Object , Object > template ) {
64
+ return new DefaultErrorHandler (
64
65
new DeadLetterPublishingRecoverer (template ), new FixedBackOff (1000L , 2 ));
65
66
}
66
67
@@ -79,8 +80,8 @@ public void listen(Foo2 foo) {
79
80
}
80
81
81
82
@ KafkaListener (id = "dltGroup" , topics = "topic1.DLT" )
82
- public void dltListen (String in ) {
83
- logger .info ("Received from DLT: " + in );
83
+ public void dltListen (byte [] in ) {
84
+ logger .info ("Received from DLT: " + new String ( in ) );
84
85
this .exec .execute (() -> System .out .println ("Hit Enter to terminate..." ));
85
86
}
86
87
0 commit comments