From 3277577f50fb0fc2c454fdf534bc9b6f6a490c8d Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Tue, 13 Aug 2024 19:53:21 -0500 Subject: [PATCH 1/7] Add support for Pulsar default tenant/namespace This commit allows Pulsar users to configure a default tenant and/or namespace to be used when producing or consuming messages to topic URLs that are not fully-qualified. The following changes accomplish this: - add `tenant` and `namespace` config props to `spring.pulsar.defaults` - auto-configure a `PulsarTopicBuilder` bean populated w/ above props - provide above topic builder to producer/consumer/reader factories (imperative and reactive) - add tests for all of the above --- .../pulsar/PulsarAutoConfiguration.java | 29 +++++++++++++----- .../pulsar/PulsarConfiguration.java | 9 ++++++ .../pulsar/PulsarProperties.java | 30 +++++++++++++++++++ .../PulsarReactiveAutoConfiguration.java | 20 +++++++++---- .../pulsar/PulsarAutoConfigurationTests.java | 10 +++++-- .../pulsar/PulsarConfigurationTests.java | 29 ++++++++++++++++++ .../pulsar/PulsarPropertiesTests.java | 25 +++++++++++++++- .../PulsarReactiveAutoConfigurationTests.java | 17 ++++++++++- 8 files changed, 151 insertions(+), 18 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java index 92c66e216824..f081be57bf87 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java @@ -52,6 +52,7 @@ import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarReaderFactory; import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.core.PulsarTopicBuilder; import org.springframework.pulsar.core.ReaderBuilderCustomizer; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; @@ -88,24 +89,30 @@ public class PulsarAutoConfiguration { @ConditionalOnMissingBean(PulsarProducerFactory.class) @ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "false") DefaultPulsarProducerFactory pulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver, - ObjectProvider> customizersProvider) { + ObjectProvider> customizersProvider, + PulsarTopicBuilder topicBuilder) { List> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers( customizersProvider); - return new DefaultPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(), + DefaultPulsarProducerFactory producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver); + producerFactory.setTopicBuilder(topicBuilder); + return producerFactory; } @Bean @ConditionalOnMissingBean(PulsarProducerFactory.class) @ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "true", matchIfMissing = true) CachingPulsarProducerFactory cachingPulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver, - ObjectProvider> customizersProvider) { + ObjectProvider> customizersProvider, + PulsarTopicBuilder topicBuilder) { PulsarProperties.Producer.Cache cacheProperties = this.properties.getProducer().getCache(); List> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers( customizersProvider); - return new CachingPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(), + CachingPulsarProducerFactory producerFactory = new CachingPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver, cacheProperties.getExpireAfterAccess(), cacheProperties.getMaximumSize(), cacheProperties.getInitialCapacity()); + producerFactory.setTopicBuilder(topicBuilder); + return producerFactory; } private List> lambdaSafeProducerBuilderCustomizers( @@ -138,13 +145,16 @@ PulsarTemplate pulsarTemplate(PulsarProducerFactory pulsarProducerFactory, @Bean @ConditionalOnMissingBean(PulsarConsumerFactory.class) DefaultPulsarConsumerFactory pulsarConsumerFactory(PulsarClient pulsarClient, - ObjectProvider> customizersProvider) { + ObjectProvider> customizersProvider, + PulsarTopicBuilder topicBuilder) { List> customizers = new ArrayList<>(); customizers.add(this.propertiesMapper::customizeConsumerBuilder); customizers.addAll(customizersProvider.orderedStream().toList()); List> lambdaSafeCustomizers = List .of((builder) -> applyConsumerBuilderCustomizers(customizers, builder)); - return new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers); + DefaultPulsarConsumerFactory consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers); + consumerFactory.setTopicBuilder(topicBuilder); + return consumerFactory; } @Bean @@ -181,13 +191,16 @@ ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory( @Bean @ConditionalOnMissingBean(PulsarReaderFactory.class) DefaultPulsarReaderFactory pulsarReaderFactory(PulsarClient pulsarClient, - ObjectProvider> customizersProvider) { + ObjectProvider> customizersProvider, + PulsarTopicBuilder topicBuilder) { List> customizers = new ArrayList<>(); customizers.add(this.propertiesMapper::customizeReaderBuilder); customizers.addAll(customizersProvider.orderedStream().toList()); List> lambdaSafeCustomizers = List .of((builder) -> applyReaderBuilderCustomizers(customizers, builder)); - return new DefaultPulsarReaderFactory<>(pulsarClient, lambdaSafeCustomizers); + DefaultPulsarReaderFactory readerFactory = new DefaultPulsarReaderFactory<>(pulsarClient, lambdaSafeCustomizers); + readerFactory.setTopicBuilder(topicBuilder); + return readerFactory; } @SuppressWarnings("unchecked") diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java index 0f5b860490f3..ac6c2b7e3d8c 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java @@ -23,6 +23,7 @@ import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.schema.SchemaType; import org.springframework.beans.factory.ObjectProvider; @@ -41,6 +42,7 @@ import org.springframework.pulsar.core.PulsarAdministration; import org.springframework.pulsar.core.PulsarClientBuilderCustomizer; import org.springframework.pulsar.core.PulsarClientFactory; +import org.springframework.pulsar.core.PulsarTopicBuilder; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer; import org.springframework.pulsar.core.TopicResolver; @@ -176,4 +178,11 @@ PulsarFunctionAdministration pulsarFunctionAdministration(PulsarAdministration p properties.isFailFast(), properties.isPropagateFailures(), properties.isPropagateStopFailures()); } + @Bean + @ConditionalOnMissingBean(PulsarTopicBuilder.class) + PulsarTopicBuilder pulsarTopicBuilder() { + return new PulsarTopicBuilder(TopicDomain.persistent, this.properties.getDefaults().getTenant(), + this.properties.getDefaults().getNamespace()); + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java index 458aebb814a2..afb5e39f1da6 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java @@ -250,6 +250,20 @@ public Authentication getAuthentication() { public static class Defaults { + /** + * Default tenant to use when producing or consuming messages against a + * non-fully-qualified topic URL. When not specified Pulsar uses a default tenant + * of 'public'. + */ + private String tenant; + + /** + * Default namespace to use when producing or consuming messages against a + * non-fully-qualified topic URL. When not specified Pulsar uses a default + * namespace of 'default'. + */ + private String namespace; + /** * List of mappings from message type to topic name and schema info to use as a * defaults when a topic name and/or schema is not explicitly specified when @@ -257,6 +271,22 @@ public static class Defaults { */ private List typeMappings = new ArrayList<>(); + public String getTenant() { + return this.tenant; + } + + public void setTenant(String tenant) { + this.tenant = tenant; + } + + public String getNamespace() { + return this.namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + public List getTypeMappings() { return this.typeMappings; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java index 4c2aeb172d52..c78a4d7a2275 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java @@ -41,6 +41,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames; +import org.springframework.pulsar.core.PulsarTopicBuilder; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory; @@ -112,7 +113,8 @@ private ReactiveMessageSenderCache reactivePulsarMessageSenderCache(ProducerCach @ConditionalOnMissingBean(ReactivePulsarSenderFactory.class) DefaultReactivePulsarSenderFactory reactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient, ObjectProvider reactiveMessageSenderCache, TopicResolver topicResolver, - ObjectProvider> customizersProvider) { + ObjectProvider> customizersProvider, + PulsarTopicBuilder topicBuilder) { List> customizers = new ArrayList<>(); customizers.add(this.propertiesMapper::customizeMessageSenderBuilder); customizers.addAll(customizersProvider.orderedStream().toList()); @@ -122,6 +124,7 @@ DefaultReactivePulsarSenderFactory reactivePulsarSenderFactory(ReactivePulsar .withDefaultConfigCustomizers(lambdaSafeCustomizers) .withMessageSenderCache(reactiveMessageSenderCache.getIfAvailable()) .withTopicResolver(topicResolver) + .withTopicBuilder(topicBuilder) .build(); } @@ -136,13 +139,17 @@ private void applyMessageSenderBuilderCustomizers(List reactivePulsarConsumerFactory( ReactivePulsarClient pulsarReactivePulsarClient, - ObjectProvider> customizersProvider) { + ObjectProvider> customizersProvider, + PulsarTopicBuilder topicBuilder) { List> customizers = new ArrayList<>(); customizers.add(this.propertiesMapper::customizeMessageConsumerBuilder); customizers.addAll(customizersProvider.orderedStream().toList()); List> lambdaSafeCustomizers = List .of((builder) -> applyMessageConsumerBuilderCustomizers(customizers, builder)); - return new DefaultReactivePulsarConsumerFactory<>(pulsarReactivePulsarClient, lambdaSafeCustomizers); + DefaultReactivePulsarConsumerFactory consumerFactory = + new DefaultReactivePulsarConsumerFactory<>(pulsarReactivePulsarClient, lambdaSafeCustomizers); + consumerFactory.setTopicBuilder(topicBuilder); + return consumerFactory; } @SuppressWarnings("unchecked") @@ -167,13 +174,16 @@ DefaultReactivePulsarListenerContainerFactory reactivePulsarListenerContainer @Bean @ConditionalOnMissingBean(ReactivePulsarReaderFactory.class) DefaultReactivePulsarReaderFactory reactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient, - ObjectProvider> customizersProvider) { + ObjectProvider> customizersProvider, + PulsarTopicBuilder topicBuilder) { List> customizers = new ArrayList<>(); customizers.add(this.propertiesMapper::customizeMessageReaderBuilder); customizers.addAll(customizersProvider.orderedStream().toList()); List> lambdaSafeCustomizers = List .of((builder) -> applyMessageReaderBuilderCustomizers(customizers, builder)); - return new DefaultReactivePulsarReaderFactory<>(reactivePulsarClient, lambdaSafeCustomizers); + DefaultReactivePulsarReaderFactory readerFactory = new DefaultReactivePulsarReaderFactory<>(reactivePulsarClient, lambdaSafeCustomizers); + readerFactory.setTopicBuilder(topicBuilder); + return readerFactory; } @SuppressWarnings("unchecked") diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java index 1b5e3fed0e4d..c35cf620260b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java @@ -67,6 +67,7 @@ import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarReaderFactory; import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.core.PulsarTopicBuilder; import org.springframework.pulsar.core.ReaderBuilderCustomizer; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; @@ -219,7 +220,8 @@ void injectsExpectedBeans() { "spring.pulsar.producer.cache.enabled=false") .run((context) -> assertThat(context).getBean(DefaultPulsarProducerFactory.class) .hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)) - .hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class))); + .hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class)) + .hasFieldOrPropertyWithValue("topicBuilder", context.getBean(PulsarTopicBuilder.class))); } @ParameterizedTest @@ -375,7 +377,8 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { @Test void injectsExpectedBeans() { this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarConsumerFactory.class) - .hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))); + .hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)) + .hasFieldOrPropertyWithValue("topicBuilder", context.getBean(PulsarTopicBuilder.class))); } @Test @@ -574,7 +577,8 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { @Test void injectsExpectedBeans() { this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarReaderFactory.class) - .hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))); + .hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)) + .hasFieldOrPropertyWithValue("topicBuilder", context.getBean(PulsarTopicBuilder.class))); } @Test diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java index ef775cab6336..64dc83e7942d 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java @@ -49,6 +49,7 @@ import org.springframework.pulsar.core.PulsarAdministration; import org.springframework.pulsar.core.PulsarClientBuilderCustomizer; import org.springframework.pulsar.core.PulsarClientFactory; +import org.springframework.pulsar.core.PulsarTopicBuilder; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer; import org.springframework.pulsar.core.TopicResolver; @@ -320,6 +321,34 @@ void whenHasDefaultsTypeMappingAddsToSchemaResolver() { } + @Nested + class TopicBuilderTests { + + private final ApplicationContextRunner contextRunner = PulsarConfigurationTests.this.contextRunner; + + @Test + void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { + PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class); + this.contextRunner.withBean("customPulsarTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder) + .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class).isSameAs(topicBuilder)); + } + + @Test + void whenHasDefaultsTenantAndNamespaceAppliedToTopicBuilder() { + List properties = new ArrayList<>(); + properties.add("spring.pulsar.defaults.tenant=my-tenant"); + properties.add("spring.pulsar.defaults.namespace=my-namespace"); + this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) + .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class) + .asInstanceOf(InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) + .satisfies((topicBuilder -> { + assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultTenant", "my-tenant"); + assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultNamespace", "my-namespace"); + }))); + } + + } + @Nested class FunctionAdministrationTests { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java index 53e90a2b954c..569c95b3084e 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java @@ -152,7 +152,7 @@ void bindAuthentication() { } @Nested - class DefaultsProperties { + class DefaultsTypeMappingProperties { @Test void bindWhenNoTypeMappings() { @@ -242,6 +242,29 @@ record TestMessage(String value) { } + @Nested + class DefaultsTenantNamespaceProperties { + + @Test + void bindWhenValuesNotSpecified() { + assertThat(new PulsarProperties().getDefaults()).satisfies((defaults) -> { + assertThat(defaults.getTenant()).isNull(); + assertThat(defaults.getNamespace()).isNull(); + }); + } + + @Test + void bindWhenValuesSpecified() { + Map map = new HashMap<>(); + map.put("spring.pulsar.defaults.tenant", "my-tenant"); + map.put("spring.pulsar.defaults.namespace", "my-namespace"); + PulsarProperties.Defaults properties = bindProperties(map).getDefaults(); + assertThat(properties.getTenant()).isEqualTo("my-tenant"); + assertThat(properties.getNamespace()).isEqualTo("my-namespace"); + } + + } + @Nested class FunctionProperties { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java index 4f3ab011ea2e..5de47a65dec8 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java @@ -48,6 +48,7 @@ import org.springframework.pulsar.core.DefaultSchemaResolver; import org.springframework.pulsar.core.DefaultTopicResolver; import org.springframework.pulsar.core.PulsarAdministration; +import org.springframework.pulsar.core.PulsarTopicBuilder; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory; @@ -177,6 +178,9 @@ void injectsExpectedBeans() { assertThat(senderFactory) .extracting("topicResolver", InstanceOfAssertFactories.type(TopicResolver.class)) .isSameAs(context.getBean(TopicResolver.class)); + assertThat(senderFactory) + .extracting("topicBuilder", InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) + .isSameAs(context.getBean(PulsarTopicBuilder.class)); }); } @@ -252,13 +256,19 @@ class ConsumerFactoryTests { @Test void injectsExpectedBeans() { ReactivePulsarClient client = mock(ReactivePulsarClient.class); - this.contextRunner.withBean("customReactivePulsarClient", ReactivePulsarClient.class, () -> client) + PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class); + this.contextRunner + .withBean("customReactivePulsarClient", ReactivePulsarClient.class, () -> client) + .withBean("customTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder) .run((context) -> { ReactivePulsarConsumerFactory consumerFactory = context .getBean(DefaultReactivePulsarConsumerFactory.class); assertThat(consumerFactory) .extracting("reactivePulsarClient", InstanceOfAssertFactories.type(ReactivePulsarClient.class)) .isSameAs(client); + assertThat(consumerFactory) + .extracting("topicBuilder", InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) + .isSameAs(topicBuilder); }); } @@ -362,14 +372,19 @@ class ReaderFactoryTests { @Test void injectsExpectedBeans() { ReactivePulsarClient client = mock(ReactivePulsarClient.class); + PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class); this.contextRunner.withPropertyValues("spring.pulsar.reader.name=test-reader") .withBean("customReactivePulsarClient", ReactivePulsarClient.class, () -> client) + .withBean("customPulsarTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder) .run((context) -> { DefaultReactivePulsarReaderFactory readerFactory = context .getBean(DefaultReactivePulsarReaderFactory.class); assertThat(readerFactory) .extracting("reactivePulsarClient", InstanceOfAssertFactories.type(ReactivePulsarClient.class)) .isSameAs(client); + assertThat(readerFactory) + .extracting("topicBuilder", InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) + .isSameAs(topicBuilder); }); } From aa2df469927b02d2c7e316d8c589bfcf2250ab31 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Wed, 14 Aug 2024 11:33:44 -0500 Subject: [PATCH 2/7] Make PulsarTopicBuilder bean prototype scope The builder is stateful and as such each injection point needs its own bean instance. --- .../boot/autoconfigure/pulsar/PulsarConfiguration.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java index ac6c2b7e3d8c..b8b5bda23649 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java @@ -35,6 +35,7 @@ import org.springframework.boot.util.LambdaSafe; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Scope; import org.springframework.pulsar.core.DefaultPulsarClientFactory; import org.springframework.pulsar.core.DefaultSchemaResolver; import org.springframework.pulsar.core.DefaultTopicResolver; @@ -179,7 +180,8 @@ PulsarFunctionAdministration pulsarFunctionAdministration(PulsarAdministration p } @Bean - @ConditionalOnMissingBean(PulsarTopicBuilder.class) + @Scope("prototype") + @ConditionalOnMissingBean PulsarTopicBuilder pulsarTopicBuilder() { return new PulsarTopicBuilder(TopicDomain.persistent, this.properties.getDefaults().getTenant(), this.properties.getDefaults().getNamespace()); From b288d82ce618649e73c25716f3b2724d5ead617c Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Wed, 14 Aug 2024 11:35:31 -0500 Subject: [PATCH 3/7] Fix formatting --- .../pulsar/PulsarAutoConfiguration.java | 29 +++++++++---------- .../PulsarReactiveAutoConfiguration.java | 7 +++-- .../pulsar/PulsarConfigurationTests.java | 16 +++++----- .../PulsarReactiveAutoConfigurationTests.java | 17 +++++------ 4 files changed, 34 insertions(+), 35 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java index f081be57bf87..7d466d571965 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java @@ -89,12 +89,11 @@ public class PulsarAutoConfiguration { @ConditionalOnMissingBean(PulsarProducerFactory.class) @ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "false") DefaultPulsarProducerFactory pulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver, - ObjectProvider> customizersProvider, - PulsarTopicBuilder topicBuilder) { + ObjectProvider> customizersProvider, PulsarTopicBuilder topicBuilder) { List> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers( customizersProvider); - DefaultPulsarProducerFactory producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(), - lambdaSafeCustomizers, topicResolver); + DefaultPulsarProducerFactory producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, + this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver); producerFactory.setTopicBuilder(topicBuilder); return producerFactory; } @@ -103,14 +102,14 @@ DefaultPulsarProducerFactory pulsarProducerFactory(PulsarClient pulsarClient, @ConditionalOnMissingBean(PulsarProducerFactory.class) @ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "true", matchIfMissing = true) CachingPulsarProducerFactory cachingPulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver, - ObjectProvider> customizersProvider, - PulsarTopicBuilder topicBuilder) { + ObjectProvider> customizersProvider, PulsarTopicBuilder topicBuilder) { PulsarProperties.Producer.Cache cacheProperties = this.properties.getProducer().getCache(); List> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers( customizersProvider); - CachingPulsarProducerFactory producerFactory = new CachingPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(), - lambdaSafeCustomizers, topicResolver, cacheProperties.getExpireAfterAccess(), - cacheProperties.getMaximumSize(), cacheProperties.getInitialCapacity()); + CachingPulsarProducerFactory producerFactory = new CachingPulsarProducerFactory<>(pulsarClient, + this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver, + cacheProperties.getExpireAfterAccess(), cacheProperties.getMaximumSize(), + cacheProperties.getInitialCapacity()); producerFactory.setTopicBuilder(topicBuilder); return producerFactory; } @@ -145,14 +144,14 @@ PulsarTemplate pulsarTemplate(PulsarProducerFactory pulsarProducerFactory, @Bean @ConditionalOnMissingBean(PulsarConsumerFactory.class) DefaultPulsarConsumerFactory pulsarConsumerFactory(PulsarClient pulsarClient, - ObjectProvider> customizersProvider, - PulsarTopicBuilder topicBuilder) { + ObjectProvider> customizersProvider, PulsarTopicBuilder topicBuilder) { List> customizers = new ArrayList<>(); customizers.add(this.propertiesMapper::customizeConsumerBuilder); customizers.addAll(customizersProvider.orderedStream().toList()); List> lambdaSafeCustomizers = List .of((builder) -> applyConsumerBuilderCustomizers(customizers, builder)); - DefaultPulsarConsumerFactory consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers); + DefaultPulsarConsumerFactory consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, + lambdaSafeCustomizers); consumerFactory.setTopicBuilder(topicBuilder); return consumerFactory; } @@ -191,14 +190,14 @@ ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory( @Bean @ConditionalOnMissingBean(PulsarReaderFactory.class) DefaultPulsarReaderFactory pulsarReaderFactory(PulsarClient pulsarClient, - ObjectProvider> customizersProvider, - PulsarTopicBuilder topicBuilder) { + ObjectProvider> customizersProvider, PulsarTopicBuilder topicBuilder) { List> customizers = new ArrayList<>(); customizers.add(this.propertiesMapper::customizeReaderBuilder); customizers.addAll(customizersProvider.orderedStream().toList()); List> lambdaSafeCustomizers = List .of((builder) -> applyReaderBuilderCustomizers(customizers, builder)); - DefaultPulsarReaderFactory readerFactory = new DefaultPulsarReaderFactory<>(pulsarClient, lambdaSafeCustomizers); + DefaultPulsarReaderFactory readerFactory = new DefaultPulsarReaderFactory<>(pulsarClient, + lambdaSafeCustomizers); readerFactory.setTopicBuilder(topicBuilder); return readerFactory; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java index c78a4d7a2275..6b983e932983 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java @@ -146,8 +146,8 @@ DefaultReactivePulsarConsumerFactory reactivePulsarConsumerFactory( customizers.addAll(customizersProvider.orderedStream().toList()); List> lambdaSafeCustomizers = List .of((builder) -> applyMessageConsumerBuilderCustomizers(customizers, builder)); - DefaultReactivePulsarConsumerFactory consumerFactory = - new DefaultReactivePulsarConsumerFactory<>(pulsarReactivePulsarClient, lambdaSafeCustomizers); + DefaultReactivePulsarConsumerFactory consumerFactory = new DefaultReactivePulsarConsumerFactory<>( + pulsarReactivePulsarClient, lambdaSafeCustomizers); consumerFactory.setTopicBuilder(topicBuilder); return consumerFactory; } @@ -181,7 +181,8 @@ DefaultReactivePulsarReaderFactory reactivePulsarReaderFactory(ReactivePulsar customizers.addAll(customizersProvider.orderedStream().toList()); List> lambdaSafeCustomizers = List .of((builder) -> applyMessageReaderBuilderCustomizers(customizers, builder)); - DefaultReactivePulsarReaderFactory readerFactory = new DefaultReactivePulsarReaderFactory<>(reactivePulsarClient, lambdaSafeCustomizers); + DefaultReactivePulsarReaderFactory readerFactory = new DefaultReactivePulsarReaderFactory<>( + reactivePulsarClient, lambdaSafeCustomizers); readerFactory.setTopicBuilder(topicBuilder); return readerFactory; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java index 64dc83e7942d..3d861f0c9621 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java @@ -330,7 +330,7 @@ class TopicBuilderTests { void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class); this.contextRunner.withBean("customPulsarTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder) - .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class).isSameAs(topicBuilder)); + .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class).isSameAs(topicBuilder)); } @Test @@ -339,16 +339,16 @@ void whenHasDefaultsTenantAndNamespaceAppliedToTopicBuilder() { properties.add("spring.pulsar.defaults.tenant=my-tenant"); properties.add("spring.pulsar.defaults.namespace=my-namespace"); this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) - .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class) - .asInstanceOf(InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) - .satisfies((topicBuilder -> { - assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultTenant", "my-tenant"); - assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultNamespace", "my-namespace"); - }))); + .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class) + .asInstanceOf(InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) + .satisfies((topicBuilder -> { + assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultTenant", "my-tenant"); + assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultNamespace", "my-namespace"); + }))); } } - + @Nested class FunctionAdministrationTests { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java index 5de47a65dec8..e03c08e6f29b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java @@ -179,8 +179,8 @@ void injectsExpectedBeans() { .extracting("topicResolver", InstanceOfAssertFactories.type(TopicResolver.class)) .isSameAs(context.getBean(TopicResolver.class)); assertThat(senderFactory) - .extracting("topicBuilder", InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) - .isSameAs(context.getBean(PulsarTopicBuilder.class)); + .extracting("topicBuilder", InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) + .isSameAs(context.getBean(PulsarTopicBuilder.class)); }); } @@ -257,9 +257,8 @@ class ConsumerFactoryTests { void injectsExpectedBeans() { ReactivePulsarClient client = mock(ReactivePulsarClient.class); PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class); - this.contextRunner - .withBean("customReactivePulsarClient", ReactivePulsarClient.class, () -> client) - .withBean("customTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder) + this.contextRunner.withBean("customReactivePulsarClient", ReactivePulsarClient.class, () -> client) + .withBean("customTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder) .run((context) -> { ReactivePulsarConsumerFactory consumerFactory = context .getBean(DefaultReactivePulsarConsumerFactory.class); @@ -267,8 +266,8 @@ void injectsExpectedBeans() { .extracting("reactivePulsarClient", InstanceOfAssertFactories.type(ReactivePulsarClient.class)) .isSameAs(client); assertThat(consumerFactory) - .extracting("topicBuilder", InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) - .isSameAs(topicBuilder); + .extracting("topicBuilder", InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) + .isSameAs(topicBuilder); }); } @@ -383,8 +382,8 @@ void injectsExpectedBeans() { .extracting("reactivePulsarClient", InstanceOfAssertFactories.type(ReactivePulsarClient.class)) .isSameAs(client); assertThat(readerFactory) - .extracting("topicBuilder", InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) - .isSameAs(topicBuilder); + .extracting("topicBuilder", InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) + .isSameAs(topicBuilder); }); } From d635c7c28acfb97be23aa740c10dcd83220d0185 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Wed, 14 Aug 2024 14:24:21 -0500 Subject: [PATCH 4/7] Fix formatting (take 2) --- .../boot/autoconfigure/pulsar/PulsarConfigurationTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java index 3d861f0c9621..f163abb8f5e5 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java @@ -341,10 +341,10 @@ void whenHasDefaultsTenantAndNamespaceAppliedToTopicBuilder() { this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class) .asInstanceOf(InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) - .satisfies((topicBuilder -> { + .satisfies((topicBuilder) -> { assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultTenant", "my-tenant"); assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultNamespace", "my-namespace"); - }))); + })); } } From f40986bc707732eff93a3e3cd03065744a5f83d9 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Thu, 15 Aug 2024 11:02:32 -0500 Subject: [PATCH 5/7] Move default tenant/namespace config prop under spring.pulsar.defaults.topic --- .../pulsar/PulsarConfiguration.java | 5 +- .../pulsar/PulsarProperties.java | 64 +++++++++++-------- ...itional-spring-configuration-metadata.json | 6 ++ .../pulsar/PulsarConfigurationTests.java | 10 ++- .../pulsar/PulsarPropertiesTests.java | 8 +-- 5 files changed, 60 insertions(+), 33 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java index b8b5bda23649..ea60717f7bdf 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java @@ -182,9 +182,10 @@ PulsarFunctionAdministration pulsarFunctionAdministration(PulsarAdministration p @Bean @Scope("prototype") @ConditionalOnMissingBean + @ConditionalOnProperty(name = "spring.pulsar.defaults.topic.enabled", havingValue = "true", matchIfMissing = true) PulsarTopicBuilder pulsarTopicBuilder() { - return new PulsarTopicBuilder(TopicDomain.persistent, this.properties.getDefaults().getTenant(), - this.properties.getDefaults().getNamespace()); + return new PulsarTopicBuilder(TopicDomain.persistent, this.properties.getDefaults().getTopic().getTenant(), + this.properties.getDefaults().getTopic().getNamespace()); } } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java index afb5e39f1da6..e4f6897f0b7b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java @@ -250,20 +250,6 @@ public Authentication getAuthentication() { public static class Defaults { - /** - * Default tenant to use when producing or consuming messages against a - * non-fully-qualified topic URL. When not specified Pulsar uses a default tenant - * of 'public'. - */ - private String tenant; - - /** - * Default namespace to use when producing or consuming messages against a - * non-fully-qualified topic URL. When not specified Pulsar uses a default - * namespace of 'default'. - */ - private String namespace; - /** * List of mappings from message type to topic name and schema info to use as a * defaults when a topic name and/or schema is not explicitly specified when @@ -271,20 +257,14 @@ public static class Defaults { */ private List typeMappings = new ArrayList<>(); - public String getTenant() { - return this.tenant; - } + private Topic topic = new Topic(); - public void setTenant(String tenant) { - this.tenant = tenant; + public Topic getTopic() { + return this.topic; } - public String getNamespace() { - return this.namespace; - } - - public void setNamespace(String namespace) { - this.namespace = namespace; + public void setTopic(Topic topic) { + this.topic = topic; } public List getTypeMappings() { @@ -331,6 +311,40 @@ public record SchemaInfo(SchemaType schemaType, Class messageKeyType) { } + public static class Topic { + + /** + * Default tenant to use when producing or consuming messages against a + * non-fully-qualified topic URL. When not specified Pulsar uses a default + * tenant of 'public'. + */ + private String tenant; + + /** + * Default namespace to use when producing or consuming messages against a + * non-fully-qualified topic URL. When not specified Pulsar uses a default + * namespace of 'default'. + */ + private String namespace; + + public String getTenant() { + return this.tenant; + } + + public void setTenant(String tenant) { + this.tenant = tenant; + } + + public String getNamespace() { + return this.namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + } + } public static class Function { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 8683a4ce0539..63dae453db94 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -2068,6 +2068,12 @@ "name": "spring.neo4j.uri", "defaultValue": "bolt://localhost:7687" }, + { + "name": "spring.pulsar.defaults.topic.enabled", + "type": "java.lang.Boolean", + "description": "Whether to enable default tenant and namespace support for topics.", + "defaultValue": true + }, { "name": "spring.pulsar.function.enabled", "type": "java.lang.Boolean", diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java index f163abb8f5e5..eadc25dc8486 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java @@ -333,11 +333,17 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class).isSameAs(topicBuilder)); } + @Test + void whenHasDefaultsTopicDisabledPropertyDoesNotCreateBean() { + this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false") + .run((context) -> assertThat(context).doesNotHaveBean(PulsarTopicBuilder.class)); + } + @Test void whenHasDefaultsTenantAndNamespaceAppliedToTopicBuilder() { List properties = new ArrayList<>(); - properties.add("spring.pulsar.defaults.tenant=my-tenant"); - properties.add("spring.pulsar.defaults.namespace=my-namespace"); + properties.add("spring.pulsar.defaults.topic.tenant=my-tenant"); + properties.add("spring.pulsar.defaults.topic.namespace=my-namespace"); this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class) .asInstanceOf(InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java index 569c95b3084e..f7dd1b7e5a5b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java @@ -247,7 +247,7 @@ class DefaultsTenantNamespaceProperties { @Test void bindWhenValuesNotSpecified() { - assertThat(new PulsarProperties().getDefaults()).satisfies((defaults) -> { + assertThat(new PulsarProperties().getDefaults().getTopic()).satisfies((defaults) -> { assertThat(defaults.getTenant()).isNull(); assertThat(defaults.getNamespace()).isNull(); }); @@ -256,9 +256,9 @@ void bindWhenValuesNotSpecified() { @Test void bindWhenValuesSpecified() { Map map = new HashMap<>(); - map.put("spring.pulsar.defaults.tenant", "my-tenant"); - map.put("spring.pulsar.defaults.namespace", "my-namespace"); - PulsarProperties.Defaults properties = bindProperties(map).getDefaults(); + map.put("spring.pulsar.defaults.topic.tenant", "my-tenant"); + map.put("spring.pulsar.defaults.topic.namespace", "my-namespace"); + PulsarProperties.Defaults.Topic properties = bindProperties(map).getDefaults().getTopic(); assertThat(properties.getTenant()).isEqualTo("my-tenant"); assertThat(properties.getNamespace()).isEqualTo("my-namespace"); } From df8db516e10e8ceeecdd75acbaab5a42d4433e36 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Thu, 15 Aug 2024 14:02:17 -0500 Subject: [PATCH 6/7] Adjust test for topic builder having prototype scope --- .../pulsar/PulsarAutoConfigurationTests.java | 10 ++++++---- .../autoconfigure/pulsar/PulsarConfigurationTests.java | 6 ++++++ 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java index c35cf620260b..b61f10fdae13 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java @@ -67,7 +67,6 @@ import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarReaderFactory; import org.springframework.pulsar.core.PulsarTemplate; -import org.springframework.pulsar.core.PulsarTopicBuilder; import org.springframework.pulsar.core.ReaderBuilderCustomizer; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; @@ -221,7 +220,8 @@ void injectsExpectedBeans() { .run((context) -> assertThat(context).getBean(DefaultPulsarProducerFactory.class) .hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)) .hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class)) - .hasFieldOrPropertyWithValue("topicBuilder", context.getBean(PulsarTopicBuilder.class))); + .extracting("topicBuilder") + .isNotNull()); // prototype so only check not-null } @ParameterizedTest @@ -378,7 +378,8 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { void injectsExpectedBeans() { this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarConsumerFactory.class) .hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)) - .hasFieldOrPropertyWithValue("topicBuilder", context.getBean(PulsarTopicBuilder.class))); + .extracting("topicBuilder") + .isNotNull()); // prototype so only check not-null } @Test @@ -578,7 +579,8 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { void injectsExpectedBeans() { this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarReaderFactory.class) .hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)) - .hasFieldOrPropertyWithValue("topicBuilder", context.getBean(PulsarTopicBuilder.class))); + .extracting("topicBuilder") + .isNotNull()); // prototype so only check not-null } @Test diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java index eadc25dc8486..c61777862e43 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java @@ -353,6 +353,12 @@ void whenHasDefaultsTenantAndNamespaceAppliedToTopicBuilder() { })); } + @Test + void beanHasScopePrototype() { + this.contextRunner.run((context) -> assertThat(context.getBean(PulsarTopicBuilder.class)) + .isNotSameAs(context.getBean(PulsarTopicBuilder.class))); + } + } @Nested From e65bdf1e96612bd4b091245eaddfceeb1056e443 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Thu, 15 Aug 2024 17:09:26 -0500 Subject: [PATCH 7/7] Adjust reactive test for topic builder having prototype scope --- .../pulsar/PulsarReactiveAutoConfigurationTests.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java index e03c08e6f29b..86fc67c9a024 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java @@ -178,9 +178,11 @@ void injectsExpectedBeans() { assertThat(senderFactory) .extracting("topicResolver", InstanceOfAssertFactories.type(TopicResolver.class)) .isSameAs(context.getBean(TopicResolver.class)); - assertThat(senderFactory) - .extracting("topicBuilder", InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) - .isSameAs(context.getBean(PulsarTopicBuilder.class)); + assertThat(senderFactory).extracting("topicBuilder").isNotNull(); // prototype + // so + // only + // check + // not-null }); }