diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java index 92c66e216824..7d466d571965 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java @@ -52,6 +52,7 @@ import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarReaderFactory; import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.core.PulsarTopicBuilder; import org.springframework.pulsar.core.ReaderBuilderCustomizer; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; @@ -88,24 +89,29 @@ public class PulsarAutoConfiguration { @ConditionalOnMissingBean(PulsarProducerFactory.class) @ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "false") DefaultPulsarProducerFactory pulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver, - ObjectProvider> customizersProvider) { + ObjectProvider> customizersProvider, PulsarTopicBuilder topicBuilder) { List> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers( customizersProvider); - return new DefaultPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(), - lambdaSafeCustomizers, topicResolver); + DefaultPulsarProducerFactory producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, + this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver); + producerFactory.setTopicBuilder(topicBuilder); + return producerFactory; } @Bean @ConditionalOnMissingBean(PulsarProducerFactory.class) @ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "true", matchIfMissing = true) CachingPulsarProducerFactory cachingPulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver, - ObjectProvider> customizersProvider) { + ObjectProvider> customizersProvider, PulsarTopicBuilder topicBuilder) { PulsarProperties.Producer.Cache cacheProperties = this.properties.getProducer().getCache(); List> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers( customizersProvider); - return new CachingPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(), - lambdaSafeCustomizers, topicResolver, cacheProperties.getExpireAfterAccess(), - cacheProperties.getMaximumSize(), cacheProperties.getInitialCapacity()); + CachingPulsarProducerFactory producerFactory = new CachingPulsarProducerFactory<>(pulsarClient, + this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver, + cacheProperties.getExpireAfterAccess(), cacheProperties.getMaximumSize(), + cacheProperties.getInitialCapacity()); + producerFactory.setTopicBuilder(topicBuilder); + return producerFactory; } private List> lambdaSafeProducerBuilderCustomizers( @@ -138,13 +144,16 @@ PulsarTemplate pulsarTemplate(PulsarProducerFactory pulsarProducerFactory, @Bean @ConditionalOnMissingBean(PulsarConsumerFactory.class) DefaultPulsarConsumerFactory pulsarConsumerFactory(PulsarClient pulsarClient, - ObjectProvider> customizersProvider) { + ObjectProvider> customizersProvider, PulsarTopicBuilder topicBuilder) { List> customizers = new ArrayList<>(); customizers.add(this.propertiesMapper::customizeConsumerBuilder); customizers.addAll(customizersProvider.orderedStream().toList()); List> lambdaSafeCustomizers = List .of((builder) -> applyConsumerBuilderCustomizers(customizers, builder)); - return new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers); + DefaultPulsarConsumerFactory consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, + lambdaSafeCustomizers); + consumerFactory.setTopicBuilder(topicBuilder); + return consumerFactory; } @Bean @@ -181,13 +190,16 @@ ConcurrentPulsarListenerContainerFactory pulsarListenerContainerFactory( @Bean @ConditionalOnMissingBean(PulsarReaderFactory.class) DefaultPulsarReaderFactory pulsarReaderFactory(PulsarClient pulsarClient, - ObjectProvider> customizersProvider) { + ObjectProvider> customizersProvider, PulsarTopicBuilder topicBuilder) { List> customizers = new ArrayList<>(); customizers.add(this.propertiesMapper::customizeReaderBuilder); customizers.addAll(customizersProvider.orderedStream().toList()); List> lambdaSafeCustomizers = List .of((builder) -> applyReaderBuilderCustomizers(customizers, builder)); - return new DefaultPulsarReaderFactory<>(pulsarClient, lambdaSafeCustomizers); + DefaultPulsarReaderFactory readerFactory = new DefaultPulsarReaderFactory<>(pulsarClient, + lambdaSafeCustomizers); + readerFactory.setTopicBuilder(topicBuilder); + return readerFactory; } @SuppressWarnings("unchecked") diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java index 0f5b860490f3..ea60717f7bdf 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java @@ -23,6 +23,7 @@ import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.schema.SchemaType; import org.springframework.beans.factory.ObjectProvider; @@ -34,6 +35,7 @@ import org.springframework.boot.util.LambdaSafe; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Scope; import org.springframework.pulsar.core.DefaultPulsarClientFactory; import org.springframework.pulsar.core.DefaultSchemaResolver; import org.springframework.pulsar.core.DefaultTopicResolver; @@ -41,6 +43,7 @@ import org.springframework.pulsar.core.PulsarAdministration; import org.springframework.pulsar.core.PulsarClientBuilderCustomizer; import org.springframework.pulsar.core.PulsarClientFactory; +import org.springframework.pulsar.core.PulsarTopicBuilder; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer; import org.springframework.pulsar.core.TopicResolver; @@ -176,4 +179,13 @@ PulsarFunctionAdministration pulsarFunctionAdministration(PulsarAdministration p properties.isFailFast(), properties.isPropagateFailures(), properties.isPropagateStopFailures()); } + @Bean + @Scope("prototype") + @ConditionalOnMissingBean + @ConditionalOnProperty(name = "spring.pulsar.defaults.topic.enabled", havingValue = "true", matchIfMissing = true) + PulsarTopicBuilder pulsarTopicBuilder() { + return new PulsarTopicBuilder(TopicDomain.persistent, this.properties.getDefaults().getTopic().getTenant(), + this.properties.getDefaults().getTopic().getNamespace()); + } + } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java index 458aebb814a2..e4f6897f0b7b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java @@ -257,6 +257,16 @@ public static class Defaults { */ private List typeMappings = new ArrayList<>(); + private Topic topic = new Topic(); + + public Topic getTopic() { + return this.topic; + } + + public void setTopic(Topic topic) { + this.topic = topic; + } + public List getTypeMappings() { return this.typeMappings; } @@ -301,6 +311,40 @@ public record SchemaInfo(SchemaType schemaType, Class messageKeyType) { } + public static class Topic { + + /** + * Default tenant to use when producing or consuming messages against a + * non-fully-qualified topic URL. When not specified Pulsar uses a default + * tenant of 'public'. + */ + private String tenant; + + /** + * Default namespace to use when producing or consuming messages against a + * non-fully-qualified topic URL. When not specified Pulsar uses a default + * namespace of 'default'. + */ + private String namespace; + + public String getTenant() { + return this.tenant; + } + + public void setTenant(String tenant) { + this.tenant = tenant; + } + + public String getNamespace() { + return this.namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + } + } public static class Function { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java index 4c2aeb172d52..6b983e932983 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java @@ -41,6 +41,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames; +import org.springframework.pulsar.core.PulsarTopicBuilder; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory; @@ -112,7 +113,8 @@ private ReactiveMessageSenderCache reactivePulsarMessageSenderCache(ProducerCach @ConditionalOnMissingBean(ReactivePulsarSenderFactory.class) DefaultReactivePulsarSenderFactory reactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient, ObjectProvider reactiveMessageSenderCache, TopicResolver topicResolver, - ObjectProvider> customizersProvider) { + ObjectProvider> customizersProvider, + PulsarTopicBuilder topicBuilder) { List> customizers = new ArrayList<>(); customizers.add(this.propertiesMapper::customizeMessageSenderBuilder); customizers.addAll(customizersProvider.orderedStream().toList()); @@ -122,6 +124,7 @@ DefaultReactivePulsarSenderFactory reactivePulsarSenderFactory(ReactivePulsar .withDefaultConfigCustomizers(lambdaSafeCustomizers) .withMessageSenderCache(reactiveMessageSenderCache.getIfAvailable()) .withTopicResolver(topicResolver) + .withTopicBuilder(topicBuilder) .build(); } @@ -136,13 +139,17 @@ private void applyMessageSenderBuilderCustomizers(List reactivePulsarConsumerFactory( ReactivePulsarClient pulsarReactivePulsarClient, - ObjectProvider> customizersProvider) { + ObjectProvider> customizersProvider, + PulsarTopicBuilder topicBuilder) { List> customizers = new ArrayList<>(); customizers.add(this.propertiesMapper::customizeMessageConsumerBuilder); customizers.addAll(customizersProvider.orderedStream().toList()); List> lambdaSafeCustomizers = List .of((builder) -> applyMessageConsumerBuilderCustomizers(customizers, builder)); - return new DefaultReactivePulsarConsumerFactory<>(pulsarReactivePulsarClient, lambdaSafeCustomizers); + DefaultReactivePulsarConsumerFactory consumerFactory = new DefaultReactivePulsarConsumerFactory<>( + pulsarReactivePulsarClient, lambdaSafeCustomizers); + consumerFactory.setTopicBuilder(topicBuilder); + return consumerFactory; } @SuppressWarnings("unchecked") @@ -167,13 +174,17 @@ DefaultReactivePulsarListenerContainerFactory reactivePulsarListenerContainer @Bean @ConditionalOnMissingBean(ReactivePulsarReaderFactory.class) DefaultReactivePulsarReaderFactory reactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient, - ObjectProvider> customizersProvider) { + ObjectProvider> customizersProvider, + PulsarTopicBuilder topicBuilder) { List> customizers = new ArrayList<>(); customizers.add(this.propertiesMapper::customizeMessageReaderBuilder); customizers.addAll(customizersProvider.orderedStream().toList()); List> lambdaSafeCustomizers = List .of((builder) -> applyMessageReaderBuilderCustomizers(customizers, builder)); - return new DefaultReactivePulsarReaderFactory<>(reactivePulsarClient, lambdaSafeCustomizers); + DefaultReactivePulsarReaderFactory readerFactory = new DefaultReactivePulsarReaderFactory<>( + reactivePulsarClient, lambdaSafeCustomizers); + readerFactory.setTopicBuilder(topicBuilder); + return readerFactory; } @SuppressWarnings("unchecked") diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 8683a4ce0539..63dae453db94 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -2068,6 +2068,12 @@ "name": "spring.neo4j.uri", "defaultValue": "bolt://localhost:7687" }, + { + "name": "spring.pulsar.defaults.topic.enabled", + "type": "java.lang.Boolean", + "description": "Whether to enable default tenant and namespace support for topics.", + "defaultValue": true + }, { "name": "spring.pulsar.function.enabled", "type": "java.lang.Boolean", diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java index 1b5e3fed0e4d..b61f10fdae13 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java @@ -219,7 +219,9 @@ void injectsExpectedBeans() { "spring.pulsar.producer.cache.enabled=false") .run((context) -> assertThat(context).getBean(DefaultPulsarProducerFactory.class) .hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)) - .hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class))); + .hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class)) + .extracting("topicBuilder") + .isNotNull()); // prototype so only check not-null } @ParameterizedTest @@ -375,7 +377,9 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { @Test void injectsExpectedBeans() { this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarConsumerFactory.class) - .hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))); + .hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)) + .extracting("topicBuilder") + .isNotNull()); // prototype so only check not-null } @Test @@ -574,7 +578,9 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { @Test void injectsExpectedBeans() { this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarReaderFactory.class) - .hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))); + .hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)) + .extracting("topicBuilder") + .isNotNull()); // prototype so only check not-null } @Test diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java index ef775cab6336..c61777862e43 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java @@ -49,6 +49,7 @@ import org.springframework.pulsar.core.PulsarAdministration; import org.springframework.pulsar.core.PulsarClientBuilderCustomizer; import org.springframework.pulsar.core.PulsarClientFactory; +import org.springframework.pulsar.core.PulsarTopicBuilder; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer; import org.springframework.pulsar.core.TopicResolver; @@ -320,6 +321,46 @@ void whenHasDefaultsTypeMappingAddsToSchemaResolver() { } + @Nested + class TopicBuilderTests { + + private final ApplicationContextRunner contextRunner = PulsarConfigurationTests.this.contextRunner; + + @Test + void whenHasUserDefinedBeanDoesNotAutoConfigureBean() { + PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class); + this.contextRunner.withBean("customPulsarTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder) + .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class).isSameAs(topicBuilder)); + } + + @Test + void whenHasDefaultsTopicDisabledPropertyDoesNotCreateBean() { + this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false") + .run((context) -> assertThat(context).doesNotHaveBean(PulsarTopicBuilder.class)); + } + + @Test + void whenHasDefaultsTenantAndNamespaceAppliedToTopicBuilder() { + List properties = new ArrayList<>(); + properties.add("spring.pulsar.defaults.topic.tenant=my-tenant"); + properties.add("spring.pulsar.defaults.topic.namespace=my-namespace"); + this.contextRunner.withPropertyValues(properties.toArray(String[]::new)) + .run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class) + .asInstanceOf(InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) + .satisfies((topicBuilder) -> { + assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultTenant", "my-tenant"); + assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultNamespace", "my-namespace"); + })); + } + + @Test + void beanHasScopePrototype() { + this.contextRunner.run((context) -> assertThat(context.getBean(PulsarTopicBuilder.class)) + .isNotSameAs(context.getBean(PulsarTopicBuilder.class))); + } + + } + @Nested class FunctionAdministrationTests { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java index 53e90a2b954c..f7dd1b7e5a5b 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java @@ -152,7 +152,7 @@ void bindAuthentication() { } @Nested - class DefaultsProperties { + class DefaultsTypeMappingProperties { @Test void bindWhenNoTypeMappings() { @@ -242,6 +242,29 @@ record TestMessage(String value) { } + @Nested + class DefaultsTenantNamespaceProperties { + + @Test + void bindWhenValuesNotSpecified() { + assertThat(new PulsarProperties().getDefaults().getTopic()).satisfies((defaults) -> { + assertThat(defaults.getTenant()).isNull(); + assertThat(defaults.getNamespace()).isNull(); + }); + } + + @Test + void bindWhenValuesSpecified() { + Map map = new HashMap<>(); + map.put("spring.pulsar.defaults.topic.tenant", "my-tenant"); + map.put("spring.pulsar.defaults.topic.namespace", "my-namespace"); + PulsarProperties.Defaults.Topic properties = bindProperties(map).getDefaults().getTopic(); + assertThat(properties.getTenant()).isEqualTo("my-tenant"); + assertThat(properties.getNamespace()).isEqualTo("my-namespace"); + } + + } + @Nested class FunctionProperties { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java index 4f3ab011ea2e..86fc67c9a024 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java @@ -48,6 +48,7 @@ import org.springframework.pulsar.core.DefaultSchemaResolver; import org.springframework.pulsar.core.DefaultTopicResolver; import org.springframework.pulsar.core.PulsarAdministration; +import org.springframework.pulsar.core.PulsarTopicBuilder; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory; @@ -177,6 +178,11 @@ void injectsExpectedBeans() { assertThat(senderFactory) .extracting("topicResolver", InstanceOfAssertFactories.type(TopicResolver.class)) .isSameAs(context.getBean(TopicResolver.class)); + assertThat(senderFactory).extracting("topicBuilder").isNotNull(); // prototype + // so + // only + // check + // not-null }); } @@ -252,13 +258,18 @@ class ConsumerFactoryTests { @Test void injectsExpectedBeans() { ReactivePulsarClient client = mock(ReactivePulsarClient.class); + PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class); this.contextRunner.withBean("customReactivePulsarClient", ReactivePulsarClient.class, () -> client) + .withBean("customTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder) .run((context) -> { ReactivePulsarConsumerFactory consumerFactory = context .getBean(DefaultReactivePulsarConsumerFactory.class); assertThat(consumerFactory) .extracting("reactivePulsarClient", InstanceOfAssertFactories.type(ReactivePulsarClient.class)) .isSameAs(client); + assertThat(consumerFactory) + .extracting("topicBuilder", InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) + .isSameAs(topicBuilder); }); } @@ -362,14 +373,19 @@ class ReaderFactoryTests { @Test void injectsExpectedBeans() { ReactivePulsarClient client = mock(ReactivePulsarClient.class); + PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class); this.contextRunner.withPropertyValues("spring.pulsar.reader.name=test-reader") .withBean("customReactivePulsarClient", ReactivePulsarClient.class, () -> client) + .withBean("customPulsarTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder) .run((context) -> { DefaultReactivePulsarReaderFactory readerFactory = context .getBean(DefaultReactivePulsarReaderFactory.class); assertThat(readerFactory) .extracting("reactivePulsarClient", InstanceOfAssertFactories.type(ReactivePulsarClient.class)) .isSameAs(client); + assertThat(readerFactory) + .extracting("topicBuilder", InstanceOfAssertFactories.type(PulsarTopicBuilder.class)) + .isSameAs(topicBuilder); }); }