-
Notifications
You must be signed in to change notification settings - Fork 1.6k
GH-2359: Initial AOT Support for Native #2364
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Resolves spring-projects#2359 This is sufficient for the basic Kafka smoke test. I will issue another PR if changes are needed for the Streams and Avro versions. I have a `KafkaAvroBeanRegistrationAotProcessor` but I have omitted it from this commit because it hasn't been tested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if Avro stuff has to be somewhere else: doesn't look like it is just Kafka-specific...
hints.proxies().registerJdkProxy(Consumer.class, SpringProxy.class, Advised.class, DecoratingProxy.class); | ||
hints.proxies().registerJdkProxy(Producer.class, SpringProxy.class, Advised.class, DecoratingProxy.class); | ||
|
||
if (ClassUtils.isPresent("org.apache.kafka.streams.StreamsBuilder", null)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it is preferable to use a dedicated for this purposes API on the hints: registerTypeIfPresent()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem with that is I have to use FQ class names instead of the classes themselves.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is correct: there is a variant for fqcn and classLoader
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you going to fix this via onReachableType(TypeReference.of("org.apache.kafka.streams.StreamsBuilder")
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is correct: there is a variant for fqcn and classLoader
Right converting all those classes to FQCN is a pain.
Where is onReachableType
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ConsumerProperties.class, | ||
ContainerProperties.class, | ||
ProducerListener.class, | ||
KafkaListener.class, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe annotations must be registers with their special API like you did in Spring AMQP.
ContainerProperties.class, | ||
ProducerListener.class, | ||
KafkaListener.class, | ||
EnableKafka.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this annotation to expose...
MemberCategory.INTROSPECT_PUBLIC_METHODS))); | ||
|
||
Stream.of( | ||
Message.class, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like you have duplicated this type.
I also wonder if we really use it via reflection at all...
In SI I registered it only for serialization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, I just copied what was in spring-native, for now.
spring-kafka/src/main/java/org/springframework/kafka/aot/KafkaRuntimeHintsRegistrar.java
Show resolved
Hide resolved
The avro processor is specific to spring-kafka because it discovers listeners that use Avro types and registers them for reflection. I left it out of this PR to avoid any confusion (and because I haven't tested it yet). |
RuntimeHintsUtils.registerAnnotation(hints, EnableKafka.class); | ||
RuntimeHintsUtils.registerAnnotation(hints, PartitionOffset.class); | ||
RuntimeHintsUtils.registerAnnotation(hints, TopicPartition.class); | ||
RuntimeHintsUtils.registerAnnotation(hints, MessageMapping.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This annotation is registered by Spring Framework by itself. See MessagingAnnotationsRuntimeHints
@@ -115,24 +116,23 @@ public class KafkaRuntimeHintsRegistrar implements RuntimeHintsRegistrar { | |||
|
|||
@Override | |||
public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader) { | |||
RuntimeHintsUtils.registerAnnotation(hints, KafkaListener.class); | |||
RuntimeHintsUtils.registerAnnotation(hints, KafkaListeners.class); | |||
RuntimeHintsUtils.registerAnnotation(hints, EnableKafka.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still don't think we need this. The @Import
processing is initiated by AOT. So, that our KafkaBootstrapConfiguration
is already registered at runtime.
Therefore I don't see a reason in this registration.
.forEach(type -> reflectionHints.registerType(type, builder -> | ||
builder.withMembers(MemberCategory.INVOKE_PUBLIC_CONSTRUCTORS))); | ||
|
||
reflectionHints.registerType(TypeReference.of("java.util.zip.CRC32C"), builder -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why FQCN? Look like this class is a part of Java by itself
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again; copied from spring-native.
hints.proxies().registerJdkProxy(Consumer.class, SpringProxy.class, Advised.class, DecoratingProxy.class); | ||
hints.proxies().registerJdkProxy(Producer.class, SpringProxy.class, Advised.class, DecoratingProxy.class); | ||
|
||
if (ClassUtils.isPresent("org.apache.kafka.streams.StreamsBuilder", null)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you going to fix this via onReachableType(TypeReference.of("org.apache.kafka.streams.StreamsBuilder")
?
builder.withMembers(MemberCategory.INVOKE_PUBLIC_CONSTRUCTORS)); | ||
|
||
hints.proxies().registerJdkProxy(Consumer.class, SpringProxy.class, Advised.class, DecoratingProxy.class); | ||
hints.proxies().registerJdkProxy(Producer.class, SpringProxy.class, Advised.class, DecoratingProxy.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AopProxyUtils.completeJdkProxyInterfaces()
?
|
||
private static boolean isListener(Class<?> beanType) { | ||
return GenericMessageListener.class.isAssignableFrom(beanType) | ||
|| MergedAnnotations.from(beanType).get(KAFKA_LISTENER_CLASS_NAME).isPresent(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe just MergedAnnotations.isPresent(String annotationType)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, it turns out we don't need to process the annotations, only GenericMessageListener
s that use Avro types - the ones that are method parameters are already discovered.
…stener` only `GenericMessageListener`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Let me know if anything else you'd like to add to this PR or it is good to go for merging!
Thanks; all smoke tests are green, but I will add a couple more before merging this. |
Ok to merge after this final commit; thanks! |
Resolves #2359
This is sufficient for the basic Kafka smoke test.
I will issue another PR if changes are needed for the Streams and Avro versions.
I have a
KafkaAvroBeanRegistrationAotProcessor
but I have omitted it fromthis commit because it hasn't been tested.