-
Notifications
You must be signed in to change notification settings - Fork 1.7k

Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
2.8.6
Describe the bug
When handling deserialization errors jackson+kotlin is giving this very helpful Exception
com.fasterxml.jackson.module.kotlin.MissingKotlinParameterException: Instantiation of [simple type, class com.example.ModelClass] value failed for JSON property exampleProperty due to missing (therefore NULL) value for creator parameter exampleProperty which is a non-nullable type
at [Source: (byte[])"{ "test": "test" }"; line: 1, column: 18] (through reference chain: com.example.ModelClass["exampleProperty"])
which tells me exactly what JSON is tried and which property in which class did not work/is missing. Debugging has never been easier.
This is the exception that is catched in org.springframework.kafka.support.serializer.JsonDeserializer#deserialize and wrapped into a org.apache.kafka.common.errors.SerializationException
Later this exception is handled in org.springframework.kafka.support.serializer.SerializationUtils#deserializationException where it is tried to be serialized into an ObjectOutputStream
Unfortunately MissingKotlinParameterException contains an object of kotlin.reflect.jvm.internal.KParameterImpl which is not Serializable.
So in the end all I get in my error queue added as headers kafka_dlt-exception-message and kafka_dlt-exception-stacktrace is that the error handler failed to serialize that error:
failed to deserialize; nested exception is java.lang.RuntimeException: Could not deserialize type java.io.NotSerializableException with message kotlin.reflect.jvm.internal.KParameterImpl failure: kotlin.reflect.jvm.internal.KParameterImpl
which is not in any way helpful for debugging the faulty JSON.
To Reproduce
- Use kotlin and spring-kafka.
- Consume a topic.
- Try to deserialize an incomplete JSON where a non-nullable property is missing.
Expected behavior
Helpful information from the exception should not be lost.
Sample
KafkaConfguration:
@EnableKafka
@Configuration
internal class KafkaConfig {
@Autowired
lateinit var kafkaTemplate: KafkaTemplate<String, String>
@Bean
fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<Int?, String?>): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Int, String>> {
val factory = ConcurrentKafkaListenerContainerFactory<Int, String>()
factory.consumerFactory = consumerFactory
factory.setCommonErrorHandler(errorHandler())
return factory
}
private fun errorHandler(): CommonErrorHandler {
val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { r, _ -> TopicPartition(r.topic() + ".error", r.partition()) }
return DefaultErrorHandler(recoverer, FixedBackOff(Duration.of(30, ChronoUnit.SECONDS).toMillis(), 10))
}
}
Consume:
@Service
@KafkaListener(
properties = ["spring.json.value.default.type=com.example.ModelClass"],
topics = ["example-topic"]
)
class Consumer{
@KafkaHandler
fun consumeModelClass(modelClass: ModelClass) {
}
}
application.yml
spring:
kafka:
consumer:
group-id: "id"
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer