Description
Quote from documentation of 2.2.2.RELEASE:
If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer2 returns a null value and a DeserializationException in a header, containing the cause and raw bytes. When using a record-level MessageListener, if either the key or value contains a DeserializationException header, the container’s ErrorHandler is called with the failed ConsumerRecord; the record is not passed to the listener.
This all works perfectly if I configure my ConsumerFactory as described in the docs:
public ConsumerFactory<?, ?> consumerFactoryFromDocs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}
However, I have configured my application slightly differently. This should arguably lead to the same behavior, but it does not.
public ConsumerFactory<?, ?> consumerFactoryMyWay() {
    return new DefaultKafkaConsumerFactory<>(new HashMap<>(), new StringDeserializer(),
            new ErrorHandlingDeserializer2<>(new JsonDeserializer<>(Foo.class)));
}
Using the latter setup for the ConsumerFactory, the ListenerConsumer fails to find the value.deserializer entry in the properties map and ends up setting the checkNullValueForExceptions field to false. The check for "if either the key or value contains a DeserializationException header" occurs only if checkNullValueForExceptions is set to true.
if (record.value() == null && this.checkNullValueForExceptions) {
    checkDeser(record, ErrorHandlingDeserializer2.VALUE_DESERIALIZER_EXCEPTION_HEADER);
}
if (record.key() == null && this.checkNullKeyForExceptions) {
    checkDeser(record, ErrorHandlingDeserializer2.KEY_DESERIALIZER_EXCEPTION_HEADER);
}
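To make the suspected mechanism concrete, here is a minimal, self-contained model of a flag that is derived from the properties map alone. It is only a sketch of my understanding, not the actual spring-kafka source: DeserializerLookupSketch, FakeErrorHandlingDeserializer, and shouldCheckNullValue are hypothetical names, and the real lookup may differ in detail.

```java
import java.util.HashMap;
import java.util.Map;

public class DeserializerLookupSketch {

    // Hypothetical stand-in for ErrorHandlingDeserializer2 (not the real class).
    static class FakeErrorHandlingDeserializer { }

    // Same key that ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG resolves to.
    static final String VALUE_DESERIALIZER_KEY = "value.deserializer";

    // Assumed behavior: decide from the property map alone whether null
    // values should be inspected for a deserialization-exception header.
    static boolean shouldCheckNullValue(Map<String, Object> props) {
        Object configured = props.get(VALUE_DESERIALIZER_KEY);
        if (configured instanceof Class) {
            return FakeErrorHandlingDeserializer.class.isAssignableFrom((Class<?>) configured);
        }
        if (configured instanceof String) {
            return FakeErrorHandlingDeserializer.class.getName().equals(configured);
        }
        // No entry in the map: a deserializer instance handed to the factory
        // constructor is invisible to this lookup, so the check is disabled.
        return false;
    }

    public static void main(String[] args) {
        // "From the docs" style: the deserializer class is in the property map.
        Map<String, Object> fromDocs = new HashMap<>();
        fromDocs.put(VALUE_DESERIALIZER_KEY, FakeErrorHandlingDeserializer.class);

        // "My way" style: the map is empty, instances are supplied separately.
        Map<String, Object> myWay = new HashMap<>();

        System.out.println(shouldCheckNullValue(fromDocs)); // prints true
        System.out.println(shouldCheckNullValue(myWay));    // prints false
    }
}
```

In this model the second configuration never enables the null-value check, which matches what I observe: the header check is skipped even though the deserializer instance is an ErrorHandlingDeserializer2.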
Can you explain the reason behind this?