Skip to content

Commit df89351

Browse files
committed
Merge pull request #41851 from
* gh-41851: Polish "Add support for Pulsar default tenant/namespace" Add support for Pulsar default tenant/namespace Closes gh-41851
2 parents 5c76189 + 3bbbef7 commit df89351

File tree

9 files changed

+256
-25
lines changed

9 files changed

+256
-25
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.springframework.pulsar.core.PulsarProducerFactory;
5353
import org.springframework.pulsar.core.PulsarReaderFactory;
5454
import org.springframework.pulsar.core.PulsarTemplate;
55+
import org.springframework.pulsar.core.PulsarTopicBuilder;
5556
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
5657
import org.springframework.pulsar.core.SchemaResolver;
5758
import org.springframework.pulsar.core.TopicResolver;
@@ -88,24 +89,31 @@ public class PulsarAutoConfiguration {
8889
@ConditionalOnMissingBean(PulsarProducerFactory.class)
8990
@ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "false")
9091
DefaultPulsarProducerFactory<?> pulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver,
91-
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider) {
92+
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider,
93+
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
9294
List<ProducerBuilderCustomizer<Object>> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers(
9395
customizersProvider);
94-
return new DefaultPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(),
95-
lambdaSafeCustomizers, topicResolver);
96+
DefaultPulsarProducerFactory<?> producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
97+
this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver);
98+
topicBuilderProvider.ifAvailable(producerFactory::setTopicBuilder);
99+
return producerFactory;
96100
}
97101

98102
@Bean
99103
@ConditionalOnMissingBean(PulsarProducerFactory.class)
100104
@ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "true", matchIfMissing = true)
101105
CachingPulsarProducerFactory<?> cachingPulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver,
102-
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider) {
106+
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider,
107+
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
103108
PulsarProperties.Producer.Cache cacheProperties = this.properties.getProducer().getCache();
104109
List<ProducerBuilderCustomizer<Object>> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers(
105110
customizersProvider);
106-
return new CachingPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(),
107-
lambdaSafeCustomizers, topicResolver, cacheProperties.getExpireAfterAccess(),
108-
cacheProperties.getMaximumSize(), cacheProperties.getInitialCapacity());
111+
CachingPulsarProducerFactory<?> producerFactory = new CachingPulsarProducerFactory<>(pulsarClient,
112+
this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver,
113+
cacheProperties.getExpireAfterAccess(), cacheProperties.getMaximumSize(),
114+
cacheProperties.getInitialCapacity());
115+
topicBuilderProvider.ifAvailable(producerFactory::setTopicBuilder);
116+
return producerFactory;
109117
}
110118

111119
private List<ProducerBuilderCustomizer<Object>> lambdaSafeProducerBuilderCustomizers(
@@ -138,13 +146,17 @@ PulsarTemplate<?> pulsarTemplate(PulsarProducerFactory<?> pulsarProducerFactory,
138146
@Bean
139147
@ConditionalOnMissingBean(PulsarConsumerFactory.class)
140148
DefaultPulsarConsumerFactory<?> pulsarConsumerFactory(PulsarClient pulsarClient,
141-
ObjectProvider<ConsumerBuilderCustomizer<?>> customizersProvider) {
149+
ObjectProvider<ConsumerBuilderCustomizer<?>> customizersProvider,
150+
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
142151
List<ConsumerBuilderCustomizer<?>> customizers = new ArrayList<>();
143152
customizers.add(this.propertiesMapper::customizeConsumerBuilder);
144153
customizers.addAll(customizersProvider.orderedStream().toList());
145154
List<ConsumerBuilderCustomizer<Object>> lambdaSafeCustomizers = List
146155
.of((builder) -> applyConsumerBuilderCustomizers(customizers, builder));
147-
return new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers);
156+
DefaultPulsarConsumerFactory<?> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
157+
lambdaSafeCustomizers);
158+
topicBuilderProvider.ifAvailable(consumerFactory::setTopicBuilder);
159+
return consumerFactory;
148160
}
149161

150162
@Bean
@@ -181,13 +193,17 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
181193
@Bean
182194
@ConditionalOnMissingBean(PulsarReaderFactory.class)
183195
DefaultPulsarReaderFactory<?> pulsarReaderFactory(PulsarClient pulsarClient,
184-
ObjectProvider<ReaderBuilderCustomizer<?>> customizersProvider) {
196+
ObjectProvider<ReaderBuilderCustomizer<?>> customizersProvider,
197+
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
185198
List<ReaderBuilderCustomizer<?>> customizers = new ArrayList<>();
186199
customizers.add(this.propertiesMapper::customizeReaderBuilder);
187200
customizers.addAll(customizersProvider.orderedStream().toList());
188201
List<ReaderBuilderCustomizer<Object>> lambdaSafeCustomizers = List
189202
.of((builder) -> applyReaderBuilderCustomizers(customizers, builder));
190-
return new DefaultPulsarReaderFactory<>(pulsarClient, lambdaSafeCustomizers);
203+
DefaultPulsarReaderFactory<?> readerFactory = new DefaultPulsarReaderFactory<>(pulsarClient,
204+
lambdaSafeCustomizers);
205+
topicBuilderProvider.ifAvailable(readerFactory::setTopicBuilder);
206+
return readerFactory;
191207
}
192208

193209
@SuppressWarnings("unchecked")

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.pulsar.client.api.ClientBuilder;
2424
import org.apache.pulsar.client.api.PulsarClient;
2525
import org.apache.pulsar.client.api.Schema;
26+
import org.apache.pulsar.common.naming.TopicDomain;
2627
import org.apache.pulsar.common.schema.SchemaType;
2728

2829
import org.springframework.beans.factory.ObjectProvider;
@@ -34,13 +35,15 @@
3435
import org.springframework.boot.util.LambdaSafe;
3536
import org.springframework.context.annotation.Bean;
3637
import org.springframework.context.annotation.Configuration;
38+
import org.springframework.context.annotation.Scope;
3739
import org.springframework.pulsar.core.DefaultPulsarClientFactory;
3840
import org.springframework.pulsar.core.DefaultSchemaResolver;
3941
import org.springframework.pulsar.core.DefaultTopicResolver;
4042
import org.springframework.pulsar.core.PulsarAdminBuilderCustomizer;
4143
import org.springframework.pulsar.core.PulsarAdministration;
4244
import org.springframework.pulsar.core.PulsarClientBuilderCustomizer;
4345
import org.springframework.pulsar.core.PulsarClientFactory;
46+
import org.springframework.pulsar.core.PulsarTopicBuilder;
4447
import org.springframework.pulsar.core.SchemaResolver;
4548
import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer;
4649
import org.springframework.pulsar.core.TopicResolver;
@@ -176,4 +179,13 @@ PulsarFunctionAdministration pulsarFunctionAdministration(PulsarAdministration p
176179
properties.isFailFast(), properties.isPropagateFailures(), properties.isPropagateStopFailures());
177180
}
178181

182+
@Bean
183+
@Scope("prototype")
184+
@ConditionalOnMissingBean
185+
@ConditionalOnProperty(name = "spring.pulsar.defaults.topic.enabled", havingValue = "true", matchIfMissing = true)
186+
PulsarTopicBuilder pulsarTopicBuilder() {
187+
return new PulsarTopicBuilder(TopicDomain.persistent, this.properties.getDefaults().getTopic().getTenant(),
188+
this.properties.getDefaults().getTopic().getNamespace());
189+
}
190+
179191
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,8 @@ public static class Defaults {
257257
*/
258258
private List<TypeMapping> typeMappings = new ArrayList<>();
259259

260+
private final Topic topic = new Topic();
261+
260262
public List<TypeMapping> getTypeMappings() {
261263
return this.typeMappings;
262264
}
@@ -265,6 +267,10 @@ public void setTypeMappings(List<TypeMapping> typeMappings) {
265267
this.typeMappings = typeMappings;
266268
}
267269

270+
public Topic getTopic() {
271+
return this.topic;
272+
}
273+
268274
/**
269275
* A mapping from message type to topic and/or schema info to use (at least one of
270276
* {@code topicName} or {@code schemaInfo} must be specified.
@@ -301,6 +307,40 @@ public record SchemaInfo(SchemaType schemaType, Class<?> messageKeyType) {
301307

302308
}
303309

310+
public static class Topic {
311+
312+
/**
313+
* Default tenant to use when producing or consuming messages against a
314+
* non-fully-qualified topic URL. When not specified Pulsar uses a default
315+
* tenant of 'public'.
316+
*/
317+
private String tenant;
318+
319+
/**
320+
* Default namespace to use when producing or consuming messages against a
321+
* non-fully-qualified topic URL. When not specified Pulsar uses a default
322+
* namespace of 'default'.
323+
*/
324+
private String namespace;
325+
326+
public String getTenant() {
327+
return this.tenant;
328+
}
329+
330+
public void setTenant(String tenant) {
331+
this.tenant = tenant;
332+
}
333+
334+
public String getNamespace() {
335+
return this.namespace;
336+
}
337+
338+
public void setNamespace(String namespace) {
339+
this.namespace = namespace;
340+
}
341+
342+
}
343+
304344
}
305345

306346
public static class Function {

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2023 the original author or authors.
2+
* Copyright 2012-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,13 +41,15 @@
4141
import org.springframework.context.annotation.Configuration;
4242
import org.springframework.context.annotation.Import;
4343
import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames;
44+
import org.springframework.pulsar.core.PulsarTopicBuilder;
4445
import org.springframework.pulsar.core.SchemaResolver;
4546
import org.springframework.pulsar.core.TopicResolver;
4647
import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory;
4748
import org.springframework.pulsar.reactive.config.annotation.EnableReactivePulsar;
4849
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarConsumerFactory;
4950
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarReaderFactory;
5051
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory;
52+
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarSenderFactory.Builder;
5153
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
5254
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
5355
import org.springframework.pulsar.reactive.core.ReactiveMessageSenderBuilderCustomizer;
@@ -112,17 +114,19 @@ private ReactiveMessageSenderCache reactivePulsarMessageSenderCache(ProducerCach
112114
@ConditionalOnMissingBean(ReactivePulsarSenderFactory.class)
113115
DefaultReactivePulsarSenderFactory<?> reactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient,
114116
ObjectProvider<ReactiveMessageSenderCache> reactiveMessageSenderCache, TopicResolver topicResolver,
115-
ObjectProvider<ReactiveMessageSenderBuilderCustomizer<?>> customizersProvider) {
117+
ObjectProvider<ReactiveMessageSenderBuilderCustomizer<?>> customizersProvider,
118+
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
116119
List<ReactiveMessageSenderBuilderCustomizer<?>> customizers = new ArrayList<>();
117120
customizers.add(this.propertiesMapper::customizeMessageSenderBuilder);
118121
customizers.addAll(customizersProvider.orderedStream().toList());
119122
List<ReactiveMessageSenderBuilderCustomizer<Object>> lambdaSafeCustomizers = List
120123
.of((builder) -> applyMessageSenderBuilderCustomizers(customizers, builder));
121-
return DefaultReactivePulsarSenderFactory.builderFor(reactivePulsarClient)
124+
Builder<Object> senderFactoryBuilder = DefaultReactivePulsarSenderFactory.builderFor(reactivePulsarClient)
122125
.withDefaultConfigCustomizers(lambdaSafeCustomizers)
123126
.withMessageSenderCache(reactiveMessageSenderCache.getIfAvailable())
124-
.withTopicResolver(topicResolver)
125-
.build();
127+
.withTopicResolver(topicResolver);
128+
topicBuilderProvider.ifAvailable(senderFactoryBuilder::withTopicBuilder);
129+
return senderFactoryBuilder.build();
126130
}
127131

128132
@SuppressWarnings("unchecked")
@@ -136,13 +140,17 @@ private void applyMessageSenderBuilderCustomizers(List<ReactiveMessageSenderBuil
136140
@ConditionalOnMissingBean(ReactivePulsarConsumerFactory.class)
137141
DefaultReactivePulsarConsumerFactory<?> reactivePulsarConsumerFactory(
138142
ReactivePulsarClient pulsarReactivePulsarClient,
139-
ObjectProvider<ReactiveMessageConsumerBuilderCustomizer<?>> customizersProvider) {
143+
ObjectProvider<ReactiveMessageConsumerBuilderCustomizer<?>> customizersProvider,
144+
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
140145
List<ReactiveMessageConsumerBuilderCustomizer<?>> customizers = new ArrayList<>();
141146
customizers.add(this.propertiesMapper::customizeMessageConsumerBuilder);
142147
customizers.addAll(customizersProvider.orderedStream().toList());
143148
List<ReactiveMessageConsumerBuilderCustomizer<Object>> lambdaSafeCustomizers = List
144149
.of((builder) -> applyMessageConsumerBuilderCustomizers(customizers, builder));
145-
return new DefaultReactivePulsarConsumerFactory<>(pulsarReactivePulsarClient, lambdaSafeCustomizers);
150+
DefaultReactivePulsarConsumerFactory<?> consumerFactory = new DefaultReactivePulsarConsumerFactory<>(
151+
pulsarReactivePulsarClient, lambdaSafeCustomizers);
152+
topicBuilderProvider.ifAvailable(consumerFactory::setTopicBuilder);
153+
return consumerFactory;
146154
}
147155

148156
@SuppressWarnings("unchecked")
@@ -167,13 +175,17 @@ DefaultReactivePulsarListenerContainerFactory<?> reactivePulsarListenerContainer
167175
@Bean
168176
@ConditionalOnMissingBean(ReactivePulsarReaderFactory.class)
169177
DefaultReactivePulsarReaderFactory<?> reactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient,
170-
ObjectProvider<ReactiveMessageReaderBuilderCustomizer<?>> customizersProvider) {
178+
ObjectProvider<ReactiveMessageReaderBuilderCustomizer<?>> customizersProvider,
179+
ObjectProvider<PulsarTopicBuilder> topicBuilderProvider) {
171180
List<ReactiveMessageReaderBuilderCustomizer<?>> customizers = new ArrayList<>();
172181
customizers.add(this.propertiesMapper::customizeMessageReaderBuilder);
173182
customizers.addAll(customizersProvider.orderedStream().toList());
174183
List<ReactiveMessageReaderBuilderCustomizer<Object>> lambdaSafeCustomizers = List
175184
.of((builder) -> applyMessageReaderBuilderCustomizers(customizers, builder));
176-
return new DefaultReactivePulsarReaderFactory<>(reactivePulsarClient, lambdaSafeCustomizers);
185+
DefaultReactivePulsarReaderFactory<?> readerFactory = new DefaultReactivePulsarReaderFactory<>(
186+
reactivePulsarClient, lambdaSafeCustomizers);
187+
topicBuilderProvider.ifAvailable(readerFactory::setTopicBuilder);
188+
return readerFactory;
177189
}
178190

179191
@SuppressWarnings("unchecked")

spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2068,6 +2068,12 @@
20682068
"name": "spring.neo4j.uri",
20692069
"defaultValue": "bolt://localhost:7687"
20702070
},
2071+
{
2072+
"name": "spring.pulsar.defaults.topic.enabled",
2073+
"type": "java.lang.Boolean",
2074+
"description": "Whether to enable default tenant and namespace support for topics.",
2075+
"defaultValue": true
2076+
},
20712077
{
20722078
"name": "spring.pulsar.function.enabled",
20732079
"type": "java.lang.Boolean",

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.springframework.pulsar.core.PulsarProducerFactory;
6868
import org.springframework.pulsar.core.PulsarReaderFactory;
6969
import org.springframework.pulsar.core.PulsarTemplate;
70+
import org.springframework.pulsar.core.PulsarTopicBuilder;
7071
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
7172
import org.springframework.pulsar.core.SchemaResolver;
7273
import org.springframework.pulsar.core.TopicResolver;
@@ -126,6 +127,7 @@ void autoConfiguresBeans() {
126127
.hasSingleBean(PulsarConnectionDetails.class)
127128
.hasSingleBean(DefaultPulsarClientFactory.class)
128129
.hasSingleBean(PulsarClient.class)
130+
.hasSingleBean(PulsarTopicBuilder.class)
129131
.hasSingleBean(PulsarAdministration.class)
130132
.hasSingleBean(DefaultSchemaResolver.class)
131133
.hasSingleBean(DefaultTopicResolver.class)
@@ -141,6 +143,12 @@ void autoConfiguresBeans() {
141143
.hasSingleBean(PulsarReaderEndpointRegistry.class));
142144
}
143145

146+
@Test
147+
void topicDefaultsCanBeDisabled() {
148+
this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false")
149+
.run((context) -> assertThat(context).doesNotHaveBean(PulsarTopicBuilder.class));
150+
}
151+
144152
@Nested
145153
class ProducerFactoryTests {
146154

@@ -219,7 +227,17 @@ void injectsExpectedBeans() {
219227
"spring.pulsar.producer.cache.enabled=false")
220228
.run((context) -> assertThat(context).getBean(DefaultPulsarProducerFactory.class)
221229
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
222-
.hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class)));
230+
.hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class))
231+
.extracting("topicBuilder")
232+
.isNotNull());
233+
}
234+
235+
@Test
236+
void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() {
237+
this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false")
238+
.run((context) -> assertThat(context).getBean(DefaultPulsarProducerFactory.class)
239+
.extracting("topicBuilder")
240+
.isNull());
223241
}
224242

225243
@ParameterizedTest
@@ -375,7 +393,18 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
375393
@Test
376394
void injectsExpectedBeans() {
377395
this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarConsumerFactory.class)
378-
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)));
396+
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
397+
.extracting("topicBuilder")
398+
.isNotNull());
399+
}
400+
401+
@Test
402+
void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() {
403+
this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false")
404+
.run((context) -> assertThat(context).getBean(DefaultPulsarConsumerFactory.class)
405+
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
406+
.extracting("topicBuilder")
407+
.isNull());
379408
}
380409

381410
@Test
@@ -574,7 +603,17 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
574603
@Test
575604
void injectsExpectedBeans() {
576605
this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarReaderFactory.class)
577-
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)));
606+
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
607+
.extracting("topicBuilder")
608+
.isNotNull());
609+
}
610+
611+
@Test
612+
void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() {
613+
this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false")
614+
.run((context) -> assertThat(context).getBean(DefaultPulsarReaderFactory.class)
615+
.extracting("topicBuilder")
616+
.isNull());
578617
}
579618

580619
@Test

0 commit comments

Comments
 (0)