Skip to content

Commit 98da3aa

Browse files
onobcwilkinsona
authored andcommitted
Add support for Pulsar default tenant/namespace
This commit allows Pulsar users to configure a default tenant and/or namespace to be used when producing or consuming messages to topic URLs that are not fully-qualified. See gh-41851
1 parent 5c76189 commit 98da3aa

File tree

9 files changed

+191
-20
lines changed

9 files changed

+191
-20
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.springframework.pulsar.core.PulsarProducerFactory;
5353
import org.springframework.pulsar.core.PulsarReaderFactory;
5454
import org.springframework.pulsar.core.PulsarTemplate;
55+
import org.springframework.pulsar.core.PulsarTopicBuilder;
5556
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
5657
import org.springframework.pulsar.core.SchemaResolver;
5758
import org.springframework.pulsar.core.TopicResolver;
@@ -88,24 +89,29 @@ public class PulsarAutoConfiguration {
8889
@ConditionalOnMissingBean(PulsarProducerFactory.class)
8990
@ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "false")
9091
DefaultPulsarProducerFactory<?> pulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver,
91-
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider) {
92+
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider, PulsarTopicBuilder topicBuilder) {
9293
List<ProducerBuilderCustomizer<Object>> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers(
9394
customizersProvider);
94-
return new DefaultPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(),
95-
lambdaSafeCustomizers, topicResolver);
95+
DefaultPulsarProducerFactory<?> producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
96+
this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver);
97+
producerFactory.setTopicBuilder(topicBuilder);
98+
return producerFactory;
9699
}
97100

98101
@Bean
99102
@ConditionalOnMissingBean(PulsarProducerFactory.class)
100103
@ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "true", matchIfMissing = true)
101104
CachingPulsarProducerFactory<?> cachingPulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver,
102-
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider) {
105+
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider, PulsarTopicBuilder topicBuilder) {
103106
PulsarProperties.Producer.Cache cacheProperties = this.properties.getProducer().getCache();
104107
List<ProducerBuilderCustomizer<Object>> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers(
105108
customizersProvider);
106-
return new CachingPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(),
107-
lambdaSafeCustomizers, topicResolver, cacheProperties.getExpireAfterAccess(),
108-
cacheProperties.getMaximumSize(), cacheProperties.getInitialCapacity());
109+
CachingPulsarProducerFactory<?> producerFactory = new CachingPulsarProducerFactory<>(pulsarClient,
110+
this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver,
111+
cacheProperties.getExpireAfterAccess(), cacheProperties.getMaximumSize(),
112+
cacheProperties.getInitialCapacity());
113+
producerFactory.setTopicBuilder(topicBuilder);
114+
return producerFactory;
109115
}
110116

111117
private List<ProducerBuilderCustomizer<Object>> lambdaSafeProducerBuilderCustomizers(
@@ -138,13 +144,16 @@ PulsarTemplate<?> pulsarTemplate(PulsarProducerFactory<?> pulsarProducerFactory,
138144
@Bean
139145
@ConditionalOnMissingBean(PulsarConsumerFactory.class)
140146
DefaultPulsarConsumerFactory<?> pulsarConsumerFactory(PulsarClient pulsarClient,
141-
ObjectProvider<ConsumerBuilderCustomizer<?>> customizersProvider) {
147+
ObjectProvider<ConsumerBuilderCustomizer<?>> customizersProvider, PulsarTopicBuilder topicBuilder) {
142148
List<ConsumerBuilderCustomizer<?>> customizers = new ArrayList<>();
143149
customizers.add(this.propertiesMapper::customizeConsumerBuilder);
144150
customizers.addAll(customizersProvider.orderedStream().toList());
145151
List<ConsumerBuilderCustomizer<Object>> lambdaSafeCustomizers = List
146152
.of((builder) -> applyConsumerBuilderCustomizers(customizers, builder));
147-
return new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers);
153+
DefaultPulsarConsumerFactory<?> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
154+
lambdaSafeCustomizers);
155+
consumerFactory.setTopicBuilder(topicBuilder);
156+
return consumerFactory;
148157
}
149158

150159
@Bean
@@ -181,13 +190,16 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
181190
@Bean
182191
@ConditionalOnMissingBean(PulsarReaderFactory.class)
183192
DefaultPulsarReaderFactory<?> pulsarReaderFactory(PulsarClient pulsarClient,
184-
ObjectProvider<ReaderBuilderCustomizer<?>> customizersProvider) {
193+
ObjectProvider<ReaderBuilderCustomizer<?>> customizersProvider, PulsarTopicBuilder topicBuilder) {
185194
List<ReaderBuilderCustomizer<?>> customizers = new ArrayList<>();
186195
customizers.add(this.propertiesMapper::customizeReaderBuilder);
187196
customizers.addAll(customizersProvider.orderedStream().toList());
188197
List<ReaderBuilderCustomizer<Object>> lambdaSafeCustomizers = List
189198
.of((builder) -> applyReaderBuilderCustomizers(customizers, builder));
190-
return new DefaultPulsarReaderFactory<>(pulsarClient, lambdaSafeCustomizers);
199+
DefaultPulsarReaderFactory<?> readerFactory = new DefaultPulsarReaderFactory<>(pulsarClient,
200+
lambdaSafeCustomizers);
201+
readerFactory.setTopicBuilder(topicBuilder);
202+
return readerFactory;
191203
}
192204

193205
@SuppressWarnings("unchecked")

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.pulsar.client.api.ClientBuilder;
2424
import org.apache.pulsar.client.api.PulsarClient;
2525
import org.apache.pulsar.client.api.Schema;
26+
import org.apache.pulsar.common.naming.TopicDomain;
2627
import org.apache.pulsar.common.schema.SchemaType;
2728

2829
import org.springframework.beans.factory.ObjectProvider;
@@ -34,13 +35,15 @@
3435
import org.springframework.boot.util.LambdaSafe;
3536
import org.springframework.context.annotation.Bean;
3637
import org.springframework.context.annotation.Configuration;
38+
import org.springframework.context.annotation.Scope;
3739
import org.springframework.pulsar.core.DefaultPulsarClientFactory;
3840
import org.springframework.pulsar.core.DefaultSchemaResolver;
3941
import org.springframework.pulsar.core.DefaultTopicResolver;
4042
import org.springframework.pulsar.core.PulsarAdminBuilderCustomizer;
4143
import org.springframework.pulsar.core.PulsarAdministration;
4244
import org.springframework.pulsar.core.PulsarClientBuilderCustomizer;
4345
import org.springframework.pulsar.core.PulsarClientFactory;
46+
import org.springframework.pulsar.core.PulsarTopicBuilder;
4447
import org.springframework.pulsar.core.SchemaResolver;
4548
import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer;
4649
import org.springframework.pulsar.core.TopicResolver;
@@ -176,4 +179,13 @@ PulsarFunctionAdministration pulsarFunctionAdministration(PulsarAdministration p
176179
properties.isFailFast(), properties.isPropagateFailures(), properties.isPropagateStopFailures());
177180
}
178181

182+
@Bean
183+
@Scope("prototype")
184+
@ConditionalOnMissingBean
185+
@ConditionalOnProperty(name = "spring.pulsar.defaults.topic.enabled", havingValue = "true", matchIfMissing = true)
186+
PulsarTopicBuilder pulsarTopicBuilder() {
187+
return new PulsarTopicBuilder(TopicDomain.persistent, this.properties.getDefaults().getTopic().getTenant(),
188+
this.properties.getDefaults().getTopic().getNamespace());
189+
}
190+
179191
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,16 @@ public static class Defaults {
257257
*/
258258
private List<TypeMapping> typeMappings = new ArrayList<>();
259259

260+
private Topic topic = new Topic();
261+
262+
public Topic getTopic() {
263+
return this.topic;
264+
}
265+
266+
public void setTopic(Topic topic) {
267+
this.topic = topic;
268+
}
269+
260270
public List<TypeMapping> getTypeMappings() {
261271
return this.typeMappings;
262272
}
@@ -301,6 +311,40 @@ public record SchemaInfo(SchemaType schemaType, Class<?> messageKeyType) {
301311

302312
}
303313

314+
public static class Topic {
315+
316+
/**
317+
* Default tenant to use when producing or consuming messages against a
318+
* non-fully-qualified topic URL. When not specified Pulsar uses a default
319+
* tenant of 'public'.
320+
*/
321+
private String tenant;
322+
323+
/**
324+
* Default namespace to use when producing or consuming messages against a
325+
* non-fully-qualified topic URL. When not specified Pulsar uses a default
326+
* namespace of 'default'.
327+
*/
328+
private String namespace;
329+
330+
public String getTenant() {
331+
return this.tenant;
332+
}
333+
334+
public void setTenant(String tenant) {
335+
this.tenant = tenant;
336+
}
337+
338+
public String getNamespace() {
339+
return this.namespace;
340+
}
341+
342+
public void setNamespace(String namespace) {
343+
this.namespace = namespace;
344+
}
345+
346+
}
347+
304348
}
305349

306350
public static class Function {

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.springframework.context.annotation.Configuration;
4242
import org.springframework.context.annotation.Import;
4343
import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames;
44+
import org.springframework.pulsar.core.PulsarTopicBuilder;
4445
import org.springframework.pulsar.core.SchemaResolver;
4546
import org.springframework.pulsar.core.TopicResolver;
4647
import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory;
@@ -112,7 +113,8 @@ private ReactiveMessageSenderCache reactivePulsarMessageSenderCache(ProducerCach
112113
@ConditionalOnMissingBean(ReactivePulsarSenderFactory.class)
113114
DefaultReactivePulsarSenderFactory<?> reactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient,
114115
ObjectProvider<ReactiveMessageSenderCache> reactiveMessageSenderCache, TopicResolver topicResolver,
115-
ObjectProvider<ReactiveMessageSenderBuilderCustomizer<?>> customizersProvider) {
116+
ObjectProvider<ReactiveMessageSenderBuilderCustomizer<?>> customizersProvider,
117+
PulsarTopicBuilder topicBuilder) {
116118
List<ReactiveMessageSenderBuilderCustomizer<?>> customizers = new ArrayList<>();
117119
customizers.add(this.propertiesMapper::customizeMessageSenderBuilder);
118120
customizers.addAll(customizersProvider.orderedStream().toList());
@@ -122,6 +124,7 @@ DefaultReactivePulsarSenderFactory<?> reactivePulsarSenderFactory(ReactivePulsar
122124
.withDefaultConfigCustomizers(lambdaSafeCustomizers)
123125
.withMessageSenderCache(reactiveMessageSenderCache.getIfAvailable())
124126
.withTopicResolver(topicResolver)
127+
.withTopicBuilder(topicBuilder)
125128
.build();
126129
}
127130

@@ -136,13 +139,17 @@ private void applyMessageSenderBuilderCustomizers(List<ReactiveMessageSenderBuil
136139
@ConditionalOnMissingBean(ReactivePulsarConsumerFactory.class)
137140
DefaultReactivePulsarConsumerFactory<?> reactivePulsarConsumerFactory(
138141
ReactivePulsarClient pulsarReactivePulsarClient,
139-
ObjectProvider<ReactiveMessageConsumerBuilderCustomizer<?>> customizersProvider) {
142+
ObjectProvider<ReactiveMessageConsumerBuilderCustomizer<?>> customizersProvider,
143+
PulsarTopicBuilder topicBuilder) {
140144
List<ReactiveMessageConsumerBuilderCustomizer<?>> customizers = new ArrayList<>();
141145
customizers.add(this.propertiesMapper::customizeMessageConsumerBuilder);
142146
customizers.addAll(customizersProvider.orderedStream().toList());
143147
List<ReactiveMessageConsumerBuilderCustomizer<Object>> lambdaSafeCustomizers = List
144148
.of((builder) -> applyMessageConsumerBuilderCustomizers(customizers, builder));
145-
return new DefaultReactivePulsarConsumerFactory<>(pulsarReactivePulsarClient, lambdaSafeCustomizers);
149+
DefaultReactivePulsarConsumerFactory<?> consumerFactory = new DefaultReactivePulsarConsumerFactory<>(
150+
pulsarReactivePulsarClient, lambdaSafeCustomizers);
151+
consumerFactory.setTopicBuilder(topicBuilder);
152+
return consumerFactory;
146153
}
147154

148155
@SuppressWarnings("unchecked")
@@ -167,13 +174,17 @@ DefaultReactivePulsarListenerContainerFactory<?> reactivePulsarListenerContainer
167174
@Bean
168175
@ConditionalOnMissingBean(ReactivePulsarReaderFactory.class)
169176
DefaultReactivePulsarReaderFactory<?> reactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient,
170-
ObjectProvider<ReactiveMessageReaderBuilderCustomizer<?>> customizersProvider) {
177+
ObjectProvider<ReactiveMessageReaderBuilderCustomizer<?>> customizersProvider,
178+
PulsarTopicBuilder topicBuilder) {
171179
List<ReactiveMessageReaderBuilderCustomizer<?>> customizers = new ArrayList<>();
172180
customizers.add(this.propertiesMapper::customizeMessageReaderBuilder);
173181
customizers.addAll(customizersProvider.orderedStream().toList());
174182
List<ReactiveMessageReaderBuilderCustomizer<Object>> lambdaSafeCustomizers = List
175183
.of((builder) -> applyMessageReaderBuilderCustomizers(customizers, builder));
176-
return new DefaultReactivePulsarReaderFactory<>(reactivePulsarClient, lambdaSafeCustomizers);
184+
DefaultReactivePulsarReaderFactory<?> readerFactory = new DefaultReactivePulsarReaderFactory<>(
185+
reactivePulsarClient, lambdaSafeCustomizers);
186+
readerFactory.setTopicBuilder(topicBuilder);
187+
return readerFactory;
177188
}
178189

179190
@SuppressWarnings("unchecked")

spring-boot-project/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2068,6 +2068,12 @@
20682068
"name": "spring.neo4j.uri",
20692069
"defaultValue": "bolt://localhost:7687"
20702070
},
2071+
{
2072+
"name": "spring.pulsar.defaults.topic.enabled",
2073+
"type": "java.lang.Boolean",
2074+
"description": "Whether to enable default tenant and namespace support for topics.",
2075+
"defaultValue": true
2076+
},
20712077
{
20722078
"name": "spring.pulsar.function.enabled",
20732079
"type": "java.lang.Boolean",

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,9 @@ void injectsExpectedBeans() {
219219
"spring.pulsar.producer.cache.enabled=false")
220220
.run((context) -> assertThat(context).getBean(DefaultPulsarProducerFactory.class)
221221
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
222-
.hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class)));
222+
.hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class))
223+
.extracting("topicBuilder")
224+
.isNotNull()); // prototype so only check not-null
223225
}
224226

225227
@ParameterizedTest
@@ -375,7 +377,9 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
375377
@Test
376378
void injectsExpectedBeans() {
377379
this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarConsumerFactory.class)
378-
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)));
380+
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
381+
.extracting("topicBuilder")
382+
.isNotNull()); // prototype so only check not-null
379383
}
380384

381385
@Test
@@ -574,7 +578,9 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
574578
@Test
575579
void injectsExpectedBeans() {
576580
this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarReaderFactory.class)
577-
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)));
581+
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
582+
.extracting("topicBuilder")
583+
.isNotNull()); // prototype so only check not-null
578584
}
579585

580586
@Test

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.springframework.pulsar.core.PulsarAdministration;
5050
import org.springframework.pulsar.core.PulsarClientBuilderCustomizer;
5151
import org.springframework.pulsar.core.PulsarClientFactory;
52+
import org.springframework.pulsar.core.PulsarTopicBuilder;
5253
import org.springframework.pulsar.core.SchemaResolver;
5354
import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer;
5455
import org.springframework.pulsar.core.TopicResolver;
@@ -320,6 +321,46 @@ void whenHasDefaultsTypeMappingAddsToSchemaResolver() {
320321

321322
}
322323

324+
@Nested
325+
class TopicBuilderTests {
326+
327+
private final ApplicationContextRunner contextRunner = PulsarConfigurationTests.this.contextRunner;
328+
329+
@Test
330+
void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
331+
PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class);
332+
this.contextRunner.withBean("customPulsarTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder)
333+
.run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class).isSameAs(topicBuilder));
334+
}
335+
336+
@Test
337+
void whenHasDefaultsTopicDisabledPropertyDoesNotCreateBean() {
338+
this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false")
339+
.run((context) -> assertThat(context).doesNotHaveBean(PulsarTopicBuilder.class));
340+
}
341+
342+
@Test
343+
void whenHasDefaultsTenantAndNamespaceAppliedToTopicBuilder() {
344+
List<String> properties = new ArrayList<>();
345+
properties.add("spring.pulsar.defaults.topic.tenant=my-tenant");
346+
properties.add("spring.pulsar.defaults.topic.namespace=my-namespace");
347+
this.contextRunner.withPropertyValues(properties.toArray(String[]::new))
348+
.run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class)
349+
.asInstanceOf(InstanceOfAssertFactories.type(PulsarTopicBuilder.class))
350+
.satisfies((topicBuilder) -> {
351+
assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultTenant", "my-tenant");
352+
assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultNamespace", "my-namespace");
353+
}));
354+
}
355+
356+
@Test
357+
void beanHasScopePrototype() {
358+
this.contextRunner.run((context) -> assertThat(context.getBean(PulsarTopicBuilder.class))
359+
.isNotSameAs(context.getBean(PulsarTopicBuilder.class)));
360+
}
361+
362+
}
363+
323364
@Nested
324365
class FunctionAdministrationTests {
325366

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ void bindAuthentication() {
152152
}
153153

154154
@Nested
155-
class DefaultsProperties {
155+
class DefaultsTypeMappingProperties {
156156

157157
@Test
158158
void bindWhenNoTypeMappings() {
@@ -242,6 +242,29 @@ record TestMessage(String value) {
242242

243243
}
244244

245+
@Nested
246+
class DefaultsTenantNamespaceProperties {
247+
248+
@Test
249+
void bindWhenValuesNotSpecified() {
250+
assertThat(new PulsarProperties().getDefaults().getTopic()).satisfies((defaults) -> {
251+
assertThat(defaults.getTenant()).isNull();
252+
assertThat(defaults.getNamespace()).isNull();
253+
});
254+
}
255+
256+
@Test
257+
void bindWhenValuesSpecified() {
258+
Map<String, String> map = new HashMap<>();
259+
map.put("spring.pulsar.defaults.topic.tenant", "my-tenant");
260+
map.put("spring.pulsar.defaults.topic.namespace", "my-namespace");
261+
PulsarProperties.Defaults.Topic properties = bindProperties(map).getDefaults().getTopic();
262+
assertThat(properties.getTenant()).isEqualTo("my-tenant");
263+
assertThat(properties.getNamespace()).isEqualTo("my-namespace");
264+
}
265+
266+
}
267+
245268
@Nested
246269
class FunctionProperties {
247270

0 commit comments

Comments
 (0)