Description
Describe the bug
We are developing a Spring Cloud Stream application that uses function composition to handle messages received from Kafka in protobuf format. Our functions look like the following, and they are composed as function1|sink.
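@Bean
public Function<Person, Person> function1() {
    // Pass-through: returns the payload unchanged.
    return payload -> payload;
}

@Bean
public Consumer<Person> sink() {
    // Terminal consumer: logs the received payload.
    return payload -> logger.info(payload.toString());
}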
We get a java.lang.ClassCastException when the message passes from function1 to sink. I dug into the framework source code a little and found that SimpleFunctionRegistry tries to convert the message before passing it to the next function, even though the next function expects the same class type and no conversion is necessary.
In SimpleFunctionRegistry:
private Object convertInputIfNecessary(Object input, Type type) {
    if (!this.isInputConversionNecessary(input, type)) {
        return input;
    }
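To make the expectation concrete, here is a minimal sketch of the kind of check that would skip conversion when the input already has the target type. It is not the framework's implementation: `ConversionGuard` and `alreadyAssignable` are hypothetical names introduced only for illustration, and the sketch ignores `Message` wrappers and generic type arguments.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class ConversionGuard {

    // Hypothetical helper (not part of Spring Cloud Function): returns true
    // when the input is already an instance of the target type's raw class,
    // in which case no message conversion should be needed.
    static boolean alreadyAssignable(Object input, Type targetType) {
        if (input == null) {
            return false;
        }
        // Reduce a parameterized type such as List<String> to its raw class.
        Type raw = targetType instanceof ParameterizedType
                ? ((ParameterizedType) targetType).getRawType()
                : targetType;
        return raw instanceof Class && ((Class<?>) raw).isInstance(input);
    }

    public static void main(String[] args) {
        System.out.println(alreadyAssignable("hello", String.class));     // true
        System.out.println(alreadyAssignable(new byte[0], String.class)); // false
    }
}
```

With a check like this, a Person produced by function1 would be handed to sink unchanged, because it is already an instance of the type that sink declares.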
Because our message type is protobuf, ProtobufMessageConverter.convertFromInternal is invoked for the conversion. It then tries to cast the payload to a byte array, which causes the java.lang.ClassCastException:
@Override
protected Object convertFromInternal(org.springframework.messaging.Message<?> message,
        Class<?> targetClass, @Nullable Object conversionHint) {
    MimeType contentType = getMimeType(message.getHeaders());
    final Object payload = message.getPayload();
    if (contentType == null) {
        contentType = PROTOBUF;
    }
    Charset charset = contentType.getCharset();
    if (charset == null) {
        charset = DEFAULT_CHARSET;
    }
    Message.Builder builder = getMessageBuilder(targetClass);
    try {
        if (PROTOBUF.isCompatibleWith(contentType)) {
            builder.mergeFrom((byte[]) payload, this.extensionRegistry); // Exception occurs
        }
        else if (this.protobufFormatSupport != null) {
            this.protobufFormatSupport.merge(message, charset, contentType, this.extensionRegistry, builder);
        }
    }
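The failure mode on the marked line can be reproduced without any Spring or protobuf dependency. The sketch below assumes a plain `Person` class as a stand-in for the generated protobuf type (hypothetical, for illustration only) and shows that an unconditional `(byte[])` cast succeeds for raw bytes but throws as soon as the payload is already a deserialized object.

```java
public class PayloadCastDemo {

    // Stand-in for the generated protobuf Person class (assumption).
    static final class Person {
        final String name;

        Person(String name) {
            this.name = name;
        }
    }

    // Mirrors the unconditional (byte[]) cast from the excerpt above and
    // reports the outcome as a string instead of propagating the exception.
    static String castToBytes(Object payload) {
        try {
            byte[] bytes = (byte[]) payload;
            return "ok:" + bytes.length;
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // Payload as it arrives from Kafka: raw bytes, so the cast works.
        System.out.println(castToBytes(new byte[] {1, 2, 3})); // ok:3
        // Payload as returned by function1: already a Person, so the cast fails.
        System.out.println(castToBytes(new Person("Ada")));    // ClassCastException
    }
}
```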
Sample
I have made a sample project that reproduces the problem:
https://github.com/cecilwei/spring-cloud-stream-bug
Run the test to see the exception.