Skip to content

Add support for Pulsar default tenant/namespace #41851

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.PulsarReaderFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.PulsarTopicBuilder;
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
Expand Down Expand Up @@ -88,24 +89,29 @@ public class PulsarAutoConfiguration {
@ConditionalOnMissingBean(PulsarProducerFactory.class)
@ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "false")
DefaultPulsarProducerFactory<?> pulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver,
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider, PulsarTopicBuilder topicBuilder) {
List<ProducerBuilderCustomizer<Object>> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers(
customizersProvider);
return new DefaultPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(),
lambdaSafeCustomizers, topicResolver);
DefaultPulsarProducerFactory<?> producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver);
producerFactory.setTopicBuilder(topicBuilder);
return producerFactory;
}

@Bean
@ConditionalOnMissingBean(PulsarProducerFactory.class)
@ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "true", matchIfMissing = true)
CachingPulsarProducerFactory<?> cachingPulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver,
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider, PulsarTopicBuilder topicBuilder) {
PulsarProperties.Producer.Cache cacheProperties = this.properties.getProducer().getCache();
List<ProducerBuilderCustomizer<Object>> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers(
customizersProvider);
return new CachingPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(),
lambdaSafeCustomizers, topicResolver, cacheProperties.getExpireAfterAccess(),
cacheProperties.getMaximumSize(), cacheProperties.getInitialCapacity());
CachingPulsarProducerFactory<?> producerFactory = new CachingPulsarProducerFactory<>(pulsarClient,
this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver,
cacheProperties.getExpireAfterAccess(), cacheProperties.getMaximumSize(),
cacheProperties.getInitialCapacity());
producerFactory.setTopicBuilder(topicBuilder);
return producerFactory;
}

private List<ProducerBuilderCustomizer<Object>> lambdaSafeProducerBuilderCustomizers(
Expand Down Expand Up @@ -138,13 +144,16 @@ PulsarTemplate<?> pulsarTemplate(PulsarProducerFactory<?> pulsarProducerFactory,
@Bean
@ConditionalOnMissingBean(PulsarConsumerFactory.class)
DefaultPulsarConsumerFactory<?> pulsarConsumerFactory(PulsarClient pulsarClient,
ObjectProvider<ConsumerBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ConsumerBuilderCustomizer<?>> customizersProvider, PulsarTopicBuilder topicBuilder) {
List<ConsumerBuilderCustomizer<?>> customizers = new ArrayList<>();
customizers.add(this.propertiesMapper::customizeConsumerBuilder);
customizers.addAll(customizersProvider.orderedStream().toList());
List<ConsumerBuilderCustomizer<Object>> lambdaSafeCustomizers = List
.of((builder) -> applyConsumerBuilderCustomizers(customizers, builder));
return new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers);
DefaultPulsarConsumerFactory<?> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
lambdaSafeCustomizers);
consumerFactory.setTopicBuilder(topicBuilder);
return consumerFactory;
}

@Bean
Expand Down Expand Up @@ -181,13 +190,16 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
@Bean
@ConditionalOnMissingBean(PulsarReaderFactory.class)
DefaultPulsarReaderFactory<?> pulsarReaderFactory(PulsarClient pulsarClient,
ObjectProvider<ReaderBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ReaderBuilderCustomizer<?>> customizersProvider, PulsarTopicBuilder topicBuilder) {
List<ReaderBuilderCustomizer<?>> customizers = new ArrayList<>();
customizers.add(this.propertiesMapper::customizeReaderBuilder);
customizers.addAll(customizersProvider.orderedStream().toList());
List<ReaderBuilderCustomizer<Object>> lambdaSafeCustomizers = List
.of((builder) -> applyReaderBuilderCustomizers(customizers, builder));
return new DefaultPulsarReaderFactory<>(pulsarClient, lambdaSafeCustomizers);
DefaultPulsarReaderFactory<?> readerFactory = new DefaultPulsarReaderFactory<>(pulsarClient,
lambdaSafeCustomizers);
readerFactory.setTopicBuilder(topicBuilder);
return readerFactory;
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.schema.SchemaType;

import org.springframework.beans.factory.ObjectProvider;
Expand All @@ -34,13 +35,15 @@
import org.springframework.boot.util.LambdaSafe;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.pulsar.core.DefaultPulsarClientFactory;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.core.PulsarAdminBuilderCustomizer;
import org.springframework.pulsar.core.PulsarAdministration;
import org.springframework.pulsar.core.PulsarClientBuilderCustomizer;
import org.springframework.pulsar.core.PulsarClientFactory;
import org.springframework.pulsar.core.PulsarTopicBuilder;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer;
import org.springframework.pulsar.core.TopicResolver;
Expand Down Expand Up @@ -176,4 +179,13 @@ PulsarFunctionAdministration pulsarFunctionAdministration(PulsarAdministration p
properties.isFailFast(), properties.isPropagateFailures(), properties.isPropagateStopFailures());
}

@Bean
@Scope("prototype")
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "spring.pulsar.defaults.topic.enabled", havingValue = "true", matchIfMissing = true)
PulsarTopicBuilder pulsarTopicBuilder() {
return new PulsarTopicBuilder(TopicDomain.persistent, this.properties.getDefaults().getTopic().getTenant(),
this.properties.getDefaults().getTopic().getNamespace());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,16 @@ public static class Defaults {
*/
private List<TypeMapping> typeMappings = new ArrayList<>();

private Topic topic = new Topic();

public Topic getTopic() {
return this.topic;
}

public void setTopic(Topic topic) {
this.topic = topic;
}

public List<TypeMapping> getTypeMappings() {
return this.typeMappings;
}
Expand Down Expand Up @@ -301,6 +311,40 @@ public record SchemaInfo(SchemaType schemaType, Class<?> messageKeyType) {

}

public static class Topic {

/**
* Default tenant to use when producing or consuming messages against a
* non-fully-qualified topic URL. When not specified Pulsar uses a default
* tenant of 'public'.
*/
private String tenant;

/**
* Default namespace to use when producing or consuming messages against a
* non-fully-qualified topic URL. When not specified Pulsar uses a default
* namespace of 'default'.
*/
private String namespace;

public String getTenant() {
return this.tenant;
}

public void setTenant(String tenant) {
this.tenant = tenant;
}

public String getNamespace() {
return this.namespace;
}

public void setNamespace(String namespace) {
this.namespace = namespace;
}

}

}

public static class Function {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames;
import org.springframework.pulsar.core.PulsarTopicBuilder;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory;
Expand Down Expand Up @@ -112,7 +113,8 @@ private ReactiveMessageSenderCache reactivePulsarMessageSenderCache(ProducerCach
@ConditionalOnMissingBean(ReactivePulsarSenderFactory.class)
DefaultReactivePulsarSenderFactory<?> reactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient,
ObjectProvider<ReactiveMessageSenderCache> reactiveMessageSenderCache, TopicResolver topicResolver,
ObjectProvider<ReactiveMessageSenderBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ReactiveMessageSenderBuilderCustomizer<?>> customizersProvider,
PulsarTopicBuilder topicBuilder) {
List<ReactiveMessageSenderBuilderCustomizer<?>> customizers = new ArrayList<>();
customizers.add(this.propertiesMapper::customizeMessageSenderBuilder);
customizers.addAll(customizersProvider.orderedStream().toList());
Expand All @@ -122,6 +124,7 @@ DefaultReactivePulsarSenderFactory<?> reactivePulsarSenderFactory(ReactivePulsar
.withDefaultConfigCustomizers(lambdaSafeCustomizers)
.withMessageSenderCache(reactiveMessageSenderCache.getIfAvailable())
.withTopicResolver(topicResolver)
.withTopicBuilder(topicBuilder)
.build();
}

Expand All @@ -136,13 +139,17 @@ private void applyMessageSenderBuilderCustomizers(List<ReactiveMessageSenderBuil
@ConditionalOnMissingBean(ReactivePulsarConsumerFactory.class)
DefaultReactivePulsarConsumerFactory<?> reactivePulsarConsumerFactory(
ReactivePulsarClient pulsarReactivePulsarClient,
ObjectProvider<ReactiveMessageConsumerBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ReactiveMessageConsumerBuilderCustomizer<?>> customizersProvider,
PulsarTopicBuilder topicBuilder) {
List<ReactiveMessageConsumerBuilderCustomizer<?>> customizers = new ArrayList<>();
customizers.add(this.propertiesMapper::customizeMessageConsumerBuilder);
customizers.addAll(customizersProvider.orderedStream().toList());
List<ReactiveMessageConsumerBuilderCustomizer<Object>> lambdaSafeCustomizers = List
.of((builder) -> applyMessageConsumerBuilderCustomizers(customizers, builder));
return new DefaultReactivePulsarConsumerFactory<>(pulsarReactivePulsarClient, lambdaSafeCustomizers);
DefaultReactivePulsarConsumerFactory<?> consumerFactory = new DefaultReactivePulsarConsumerFactory<>(
pulsarReactivePulsarClient, lambdaSafeCustomizers);
consumerFactory.setTopicBuilder(topicBuilder);
return consumerFactory;
}

@SuppressWarnings("unchecked")
Expand All @@ -167,13 +174,17 @@ DefaultReactivePulsarListenerContainerFactory<?> reactivePulsarListenerContainer
@Bean
@ConditionalOnMissingBean(ReactivePulsarReaderFactory.class)
DefaultReactivePulsarReaderFactory<?> reactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient,
ObjectProvider<ReactiveMessageReaderBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ReactiveMessageReaderBuilderCustomizer<?>> customizersProvider,
PulsarTopicBuilder topicBuilder) {
List<ReactiveMessageReaderBuilderCustomizer<?>> customizers = new ArrayList<>();
customizers.add(this.propertiesMapper::customizeMessageReaderBuilder);
customizers.addAll(customizersProvider.orderedStream().toList());
List<ReactiveMessageReaderBuilderCustomizer<Object>> lambdaSafeCustomizers = List
.of((builder) -> applyMessageReaderBuilderCustomizers(customizers, builder));
return new DefaultReactivePulsarReaderFactory<>(reactivePulsarClient, lambdaSafeCustomizers);
DefaultReactivePulsarReaderFactory<?> readerFactory = new DefaultReactivePulsarReaderFactory<>(
reactivePulsarClient, lambdaSafeCustomizers);
readerFactory.setTopicBuilder(topicBuilder);
return readerFactory;
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2068,6 +2068,12 @@
"name": "spring.neo4j.uri",
"defaultValue": "bolt://localhost:7687"
},
{
"name": "spring.pulsar.defaults.topic.enabled",
"type": "java.lang.Boolean",
"description": "Whether to enable default tenant and namespace support for topics.",
"defaultValue": true
},
{
"name": "spring.pulsar.function.enabled",
"type": "java.lang.Boolean",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ void injectsExpectedBeans() {
"spring.pulsar.producer.cache.enabled=false")
.run((context) -> assertThat(context).getBean(DefaultPulsarProducerFactory.class)
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
.hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class)));
.hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class))
.extracting("topicBuilder")
.isNotNull()); // prototype so only check not-null
}

@ParameterizedTest
Expand Down Expand Up @@ -375,7 +377,9 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
@Test
void injectsExpectedBeans() {
this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarConsumerFactory.class)
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)));
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
.extracting("topicBuilder")
.isNotNull()); // prototype so only check not-null
}

@Test
Expand Down Expand Up @@ -574,7 +578,9 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
@Test
void injectsExpectedBeans() {
this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarReaderFactory.class)
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)));
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
.extracting("topicBuilder")
.isNotNull()); // prototype so only check not-null
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.springframework.pulsar.core.PulsarAdministration;
import org.springframework.pulsar.core.PulsarClientBuilderCustomizer;
import org.springframework.pulsar.core.PulsarClientFactory;
import org.springframework.pulsar.core.PulsarTopicBuilder;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer;
import org.springframework.pulsar.core.TopicResolver;
Expand Down Expand Up @@ -320,6 +321,46 @@ void whenHasDefaultsTypeMappingAddsToSchemaResolver() {

}

@Nested
class TopicBuilderTests {

private final ApplicationContextRunner contextRunner = PulsarConfigurationTests.this.contextRunner;

@Test
void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class);
this.contextRunner.withBean("customPulsarTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder)
.run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class).isSameAs(topicBuilder));
}

@Test
void whenHasDefaultsTopicDisabledPropertyDoesNotCreateBean() {
this.contextRunner.withPropertyValues("spring.pulsar.defaults.topic.enabled=false")
.run((context) -> assertThat(context).doesNotHaveBean(PulsarTopicBuilder.class));
}

@Test
void whenHasDefaultsTenantAndNamespaceAppliedToTopicBuilder() {
List<String> properties = new ArrayList<>();
properties.add("spring.pulsar.defaults.topic.tenant=my-tenant");
properties.add("spring.pulsar.defaults.topic.namespace=my-namespace");
this.contextRunner.withPropertyValues(properties.toArray(String[]::new))
.run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class)
.asInstanceOf(InstanceOfAssertFactories.type(PulsarTopicBuilder.class))
.satisfies((topicBuilder) -> {
assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultTenant", "my-tenant");
assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultNamespace", "my-namespace");
}));
}

@Test
void beanHasScopePrototype() {
this.contextRunner.run((context) -> assertThat(context.getBean(PulsarTopicBuilder.class))
.isNotSameAs(context.getBean(PulsarTopicBuilder.class)));
}

}

@Nested
class FunctionAdministrationTests {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ void bindAuthentication() {
}

@Nested
class DefaultsProperties {
class DefaultsTypeMappingProperties {

@Test
void bindWhenNoTypeMappings() {
Expand Down Expand Up @@ -242,6 +242,29 @@ record TestMessage(String value) {

}

@Nested
class DefaultsTenantNamespaceProperties {

@Test
void bindWhenValuesNotSpecified() {
assertThat(new PulsarProperties().getDefaults().getTopic()).satisfies((defaults) -> {
assertThat(defaults.getTenant()).isNull();
assertThat(defaults.getNamespace()).isNull();
});
}

@Test
void bindWhenValuesSpecified() {
Map<String, String> map = new HashMap<>();
map.put("spring.pulsar.defaults.topic.tenant", "my-tenant");
map.put("spring.pulsar.defaults.topic.namespace", "my-namespace");
PulsarProperties.Defaults.Topic properties = bindProperties(map).getDefaults().getTopic();
assertThat(properties.getTenant()).isEqualTo("my-tenant");
assertThat(properties.getNamespace()).isEqualTo("my-namespace");
}

}

@Nested
class FunctionProperties {

Expand Down
Loading