Skip to content

Commit 82593d9

Browse files
committed
Add more settings to Producer in PulsarProperties
1 parent 27b09d3 commit 82593d9

File tree

2 files changed

+162
-12
lines changed

2 files changed

+162
-12
lines changed

spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java

Lines changed: 142 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.time.Duration;
2020
import java.util.HashMap;
21+
import java.util.HashSet;
2122
import java.util.Map;
2223
import java.util.Set;
2324
import java.util.SortedMap;
@@ -37,6 +38,7 @@
3738

3839
import org.springframework.boot.context.properties.ConfigurationProperties;
3940
import org.springframework.boot.context.properties.PropertyMapper;
41+
import org.springframework.lang.Nullable;
4042
import org.springframework.pulsar.listener.AckMode;
4143
import org.springframework.util.CollectionUtils;
4244
import org.springframework.util.StringUtils;
@@ -507,11 +509,22 @@ public static class Producer {
507509
*/
508510
private Duration batchingMaxPublishDelay = Duration.ofMillis(1);
509511

512+
/**
513+
* Partition switch frequency while batching of messages is enabled and using
514+
* round-robin routing mode for non-keyed message.
515+
*/
516+
private Integer batchingPartitionSwitchFrequencyByPublishDelay = 10;
517+
510518
/**
511519
* Maximum number of messages to be batched.
512520
*/
513521
private Integer batchingMaxMessages = 1000;
514522

523+
/**
524+
* Maximum number of bytes permitted in a batch.
525+
*/
526+
private DataSize batchingMaxBytes = DataSize.ofKilobytes(128);
527+
515528
/**
516529
* Whether to automatically batch messages.
517530
*/
@@ -522,16 +535,53 @@ public static class Producer {
522535
*/
523536
private Boolean chunkingEnabled = false;
524537

538+
/**
539+
* Names of the public encryption keys to use when encrypting data.
540+
*/
541+
private Set<String> encryptionKeys = new HashSet<>();
542+
525543
/**
526544
* Message compression type.
527545
*/
528546
private CompressionType compressionType;
529547

548+
/**
549+
* Baseline for the sequence ids for messages published by the producer.
550+
*/
551+
@Nullable
552+
private Long initialSequenceId;
553+
554+
/**
555+
* Whether partitioned producer automatically discover new partitions at runtime.
556+
*/
557+
private Boolean autoUpdatePartitions = true;
558+
559+
/**
560+
* Interval of partitions discovery updates.
561+
*/
562+
private Duration autoUpdatePartitionsInterval = Duration.ofMinutes(1);
563+
564+
/**
565+
* Whether the multiple schema mode is enabled.
566+
*/
567+
private Boolean multiSchema = true;
568+
530569
/**
531570
* Type of access to the topic the producer requires.
532571
*/
533572
private ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared;
534573

574+
/**
575+
* Whether producers in Shared mode register and connect immediately to the owner
576+
* broker of each partition or start lazily on demand.
577+
*/
578+
private Boolean lazyStartPartitionedProducers = false;
579+
580+
/**
581+
* Map of properties to add to the producer.
582+
*/
583+
private Map<String, String> properties = new HashMap<>();
584+
535585
private Cache cache = new Cache();
536586

537587
public String getTopicName() {
@@ -558,7 +608,7 @@ public void setSendTimeout(Duration sendTimeout) {
558608
this.sendTimeout = sendTimeout;
559609
}
560610

561-
public Boolean isBlockIfQueueFull() {
611+
public Boolean getBlockIfQueueFull() {
562612
return this.blockIfQueueFull;
563613
}
564614

@@ -614,6 +664,15 @@ public void setBatchingMaxPublishDelay(Duration batchingMaxPublishDelay) {
614664
this.batchingMaxPublishDelay = batchingMaxPublishDelay;
615665
}
616666

667+
public Integer getBatchingPartitionSwitchFrequencyByPublishDelay() {
668+
return this.batchingPartitionSwitchFrequencyByPublishDelay;
669+
}
670+
671+
public void setBatchingPartitionSwitchFrequencyByPublishDelay(
672+
Integer batchingPartitionSwitchFrequencyByPublishDelay) {
673+
this.batchingPartitionSwitchFrequencyByPublishDelay = batchingPartitionSwitchFrequencyByPublishDelay;
674+
}
675+
617676
public Integer getBatchingMaxMessages() {
618677
return this.batchingMaxMessages;
619678
}
@@ -622,22 +681,38 @@ public void setBatchingMaxMessages(Integer batchingMaxMessages) {
622681
this.batchingMaxMessages = batchingMaxMessages;
623682
}
624683

625-
public Boolean isBatchingEnabled() {
684+
public DataSize getBatchingMaxBytes() {
685+
return this.batchingMaxBytes;
686+
}
687+
688+
public void setBatchingMaxBytes(DataSize batchingMaxBytes) {
689+
this.batchingMaxBytes = batchingMaxBytes;
690+
}
691+
692+
public Boolean getBatchingEnabled() {
626693
return this.batchingEnabled;
627694
}
628695

629696
public void setBatchingEnabled(Boolean batchingEnabled) {
630697
this.batchingEnabled = batchingEnabled;
631698
}
632699

633-
public Boolean isChunkingEnabled() {
700+
public Boolean getChunkingEnabled() {
634701
return this.chunkingEnabled;
635702
}
636703

637704
public void setChunkingEnabled(Boolean chunkingEnabled) {
638705
this.chunkingEnabled = chunkingEnabled;
639706
}
640707

708+
public Set<String> getEncryptionKeys() {
709+
return this.encryptionKeys;
710+
}
711+
712+
public void setEncryptionKeys(Set<String> encryptionKeys) {
713+
this.encryptionKeys = encryptionKeys;
714+
}
715+
641716
public CompressionType getCompressionType() {
642717
return this.compressionType;
643718
}
@@ -646,6 +721,39 @@ public void setCompressionType(CompressionType compressionType) {
646721
this.compressionType = compressionType;
647722
}
648723

724+
@Nullable
725+
public Long getInitialSequenceId() {
726+
return this.initialSequenceId;
727+
}
728+
729+
public void setInitialSequenceId(@Nullable Long initialSequenceId) {
730+
this.initialSequenceId = initialSequenceId;
731+
}
732+
733+
public Boolean getAutoUpdatePartitions() {
734+
return this.autoUpdatePartitions;
735+
}
736+
737+
public void setAutoUpdatePartitions(Boolean autoUpdatePartitions) {
738+
this.autoUpdatePartitions = autoUpdatePartitions;
739+
}
740+
741+
public Duration getAutoUpdatePartitionsInterval() {
742+
return this.autoUpdatePartitionsInterval;
743+
}
744+
745+
public void setAutoUpdatePartitionsInterval(Duration autoUpdatePartitionsInterval) {
746+
this.autoUpdatePartitionsInterval = autoUpdatePartitionsInterval;
747+
}
748+
749+
public Boolean getMultiSchema() {
750+
return this.multiSchema;
751+
}
752+
753+
public void setMultiSchema(Boolean multiSchema) {
754+
this.multiSchema = multiSchema;
755+
}
756+
649757
public ProducerAccessMode getProducerAccessMode() {
650758
return this.producerAccessMode;
651759
}
@@ -654,6 +762,22 @@ public void setProducerAccessMode(ProducerAccessMode producerAccessMode) {
654762
this.producerAccessMode = producerAccessMode;
655763
}
656764

765+
public Boolean getLazyStartPartitionedProducers() {
766+
return this.lazyStartPartitionedProducers;
767+
}
768+
769+
public void setLazyStartPartitionedProducers(Boolean lazyStartPartitionedProducers) {
770+
this.lazyStartPartitionedProducers = lazyStartPartitionedProducers;
771+
}
772+
773+
public Map<String, String> getProperties() {
774+
return this.properties;
775+
}
776+
777+
public void setProperties(Map<String, String> properties) {
778+
this.properties = properties;
779+
}
780+
657781
public Cache getCache() {
658782
return this.cache;
659783
}
@@ -665,8 +789,8 @@ public Map<String, Object> buildProperties() {
665789

666790
map.from(this::getTopicName).to(properties.in("topicName"));
667791
map.from(this::getProducerName).to(properties.in("producerName"));
668-
map.from(this::getSendTimeout).as(Duration::toMillis).to(properties.in("sendTimeoutMs"));
669-
map.from(this::isBlockIfQueueFull).to(properties.in("blockIfQueueFull"));
792+
map.from(this::getSendTimeout).asInt(Duration::toMillis).to(properties.in("sendTimeoutMs"));
793+
map.from(this::getBlockIfQueueFull).to(properties.in("blockIfQueueFull"));
670794
map.from(this::getMaxPendingMessages).to(properties.in("maxPendingMessages"));
671795
map.from(this::getMaxPendingMessagesAcrossPartitions)
672796
.to(properties.in("maxPendingMessagesAcrossPartitions"));
@@ -675,11 +799,22 @@ public Map<String, Object> buildProperties() {
675799
map.from(this::getCryptoFailureAction).to(properties.in("cryptoFailureAction"));
676800
map.from(this::getBatchingMaxPublishDelay).as(it -> it.toNanos() / 1000)
677801
.to(properties.in("batchingMaxPublishDelayMicros"));
802+
map.from(this::getBatchingPartitionSwitchFrequencyByPublishDelay)
803+
.to(properties.in("batchingPartitionSwitchFrequencyByPublishDelay"));
678804
map.from(this::getBatchingMaxMessages).to(properties.in("batchingMaxMessages"));
679-
map.from(this::isBatchingEnabled).to(properties.in("batchingEnabled"));
680-
map.from(this::isChunkingEnabled).to(properties.in("chunkingEnabled"));
805+
map.from(this::getBatchingMaxBytes).asInt(DataSize::toBytes).to(properties.in("batchingMaxBytes"));
806+
map.from(this::getBatchingEnabled).to(properties.in("batchingEnabled"));
807+
map.from(this::getChunkingEnabled).to(properties.in("chunkingEnabled"));
808+
map.from(this::getEncryptionKeys).to(properties.in("encryptionKeys"));
681809
map.from(this::getCompressionType).to(properties.in("compressionType"));
810+
map.from(this::getInitialSequenceId).to(properties.in("initialSequenceId"));
811+
map.from(this::getAutoUpdatePartitions).to(properties.in("autoUpdatePartitions"));
812+
map.from(this::getAutoUpdatePartitionsInterval).as(Duration::toMillis)
813+
.to(properties.in("autoUpdatePartitionsInterval"));
814+
map.from(this::getMultiSchema).to(properties.in("multiSchema"));
682815
map.from(this::getProducerAccessMode).to(properties.in("accessMode"));
816+
map.from(this::getLazyStartPartitionedProducers).to(properties.in("lazyStartPartitionedProducers"));
817+
map.from(this::getProperties).to(properties.in("properties"));
683818

684819
return properties;
685820
}

spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Collections;
2424
import java.util.HashMap;
2525
import java.util.Map;
26+
import java.util.Set;
2627

2728
import org.apache.pulsar.client.api.CompressionType;
2829
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
@@ -113,6 +114,7 @@ void authenticationNotAllowedUsingBothAuthParamsStringAndAuthenticationMap() {
113114
class ProducerPropertiesTests {
114115

115116
@Test
117+
@SuppressWarnings("unchecked")
116118
void producerProperties() {
117119
Map<String, String> props = new HashMap<>();
118120
props.put("spring.pulsar.producer.topic-name", "my-topic");
@@ -125,26 +127,39 @@ void producerProperties() {
125127
props.put("spring.pulsar.producer.hashing-scheme", "Murmur3_32Hash");
126128
props.put("spring.pulsar.producer.crypto-failure-action", "SEND");
127129
props.put("spring.pulsar.producer.batching-max-publish-delay", "5s");
128-
props.put("spring.pulsar.producer.batching-max-messages", "6");
130+
props.put("spring.pulsar.producer.batching-partition-switch-frequency-by-publish-delay", "6");
131+
props.put("spring.pulsar.producer.batching-max-messages", "7");
132+
props.put("spring.pulsar.producer.batching-max-bytes", "8");
129133
props.put("spring.pulsar.producer.batching-enabled", "false");
130134
props.put("spring.pulsar.producer.chunking-enabled", "true");
135+
props.put("spring.pulsar.producer.encryption-keys[0]", "my-key");
131136
props.put("spring.pulsar.producer.compression-type", "LZ4");
137+
props.put("spring.pulsar.producer.initial-sequence-id", "9");
132138
props.put("spring.pulsar.producer.producer-access-mode", "Exclusive");
139+
props.put("spring.pulsar.producer.lazy-start=partitioned-producers", "true");
140+
props.put("spring.pulsar.producer.properties[my-prop]", "my-prop-value");
133141

134142
bind(props);
135143
Map<String, Object> producerProps = properties.buildProducerProperties();
136144

137145
assertThat(producerProps).containsEntry("topicName", "my-topic")
138-
.containsEntry("producerName", "my-producer").containsEntry("sendTimeoutMs", 2_000L)
146+
.containsEntry("producerName", "my-producer").containsEntry("sendTimeoutMs", 2_000)
139147
.containsEntry("blockIfQueueFull", true).containsEntry("maxPendingMessages", 3)
140148
.containsEntry("maxPendingMessagesAcrossPartitions", 4)
141149
.containsEntry("messageRoutingMode", MessageRoutingMode.CustomPartition)
142150
.containsEntry("hashingScheme", HashingScheme.Murmur3_32Hash)
143151
.containsEntry("cryptoFailureAction", ProducerCryptoFailureAction.SEND)
144-
.containsEntry("batchingMaxPublishDelayMicros", 5_000_000L).containsEntry("batchingMaxMessages", 6)
152+
.containsEntry("batchingMaxPublishDelayMicros", 5_000_000L)
153+
.containsEntry("batchingPartitionSwitchFrequencyByPublishDelay", 6)
154+
.containsEntry("batchingMaxMessages", 7).containsEntry("batchingMaxBytes", 8)
145155
.containsEntry("batchingEnabled", false).containsEntry("chunkingEnabled", true)
146-
.containsEntry("compressionType", CompressionType.LZ4)
147-
.containsEntry("accessMode", ProducerAccessMode.Exclusive);
156+
.hasEntrySatisfying("encryptionKeys",
157+
keys -> assertThat(((Set<String>) keys)).containsExactly("my-key"))
158+
.containsEntry("compressionType", CompressionType.LZ4).containsEntry("initialSequenceId", 9L)
159+
.containsEntry("accessMode", ProducerAccessMode.Exclusive)
160+
.containsEntry("lazyStartPartitionedProducers", true)
161+
.hasEntrySatisfying("properties", properties -> assertThat(((Map<String, String>) properties))
162+
.containsEntry("my-prop", "my-prop-value"));
148163
}
149164

150165
}

0 commit comments

Comments
 (0)