Skip to content

Commit 60df1c1

Browse files
committed
Add test for sender properties
1 parent bc2a4cc commit 60df1c1

File tree

1 file changed

+93
-0
lines changed

1 file changed

+93
-0
lines changed
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.autoconfigure;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.time.Duration;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
25+
import org.apache.pulsar.client.api.CompressionType;
26+
import org.apache.pulsar.client.api.HashingScheme;
27+
import org.apache.pulsar.client.api.MessageRoutingMode;
28+
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
29+
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
30+
import org.junit.jupiter.api.Nested;
31+
import org.junit.jupiter.api.Test;
32+
33+
import org.springframework.boot.context.properties.bind.Bindable;
34+
import org.springframework.boot.context.properties.bind.Binder;
35+
import org.springframework.boot.context.properties.source.ConfigurationPropertySource;
36+
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
37+
38+
39+
/**
40+
* Unit tests for {@link PulsarReactiveProperties}.
41+
*
42+
* @author Christophe Bornet
43+
*/
44+
public class PulsarReactivePropertiesTests {
45+
46+
private final PulsarReactiveProperties properties = new PulsarReactiveProperties();
47+
48+
private void bind(Map<String, String> map) {
49+
ConfigurationPropertySource source = new MapConfigurationPropertySource(map);
50+
new Binder(source).bind("spring.pulsar.reactive", Bindable.ofInstance(this.properties));
51+
}
52+
53+
@Nested
54+
class SenderPropertiesTests {
55+
56+
@Test
57+
void senderPropsToSenderSpec() {
58+
Map<String, String> props = new HashMap<>();
59+
props.put("spring.pulsar.reactive.sender.topic-name", "my-topic");
60+
props.put("spring.pulsar.reactive.sender.producer-name", "my-producer");
61+
props.put("spring.pulsar.reactive.sender.send-timeout", "2s");
62+
props.put("spring.pulsar.reactive.sender.max-pending-messages", "3");
63+
props.put("spring.pulsar.reactive.sender.max-pending-messages-across-partitions", "4");
64+
props.put("spring.pulsar.reactive.sender.message-routing-mode", "CustomPartition");
65+
props.put("spring.pulsar.reactive.sender.hashing-scheme", "Murmur3_32Hash");
66+
props.put("spring.pulsar.reactive.sender.crypto-failure-action", "SEND");
67+
props.put("spring.pulsar.reactive.sender.batching-max-publish-delay", "5s");
68+
props.put("spring.pulsar.reactive.sender.batching-max-messages", "6");
69+
props.put("spring.pulsar.reactive.sender.batching-enabled", "false");
70+
props.put("spring.pulsar.reactive.sender.chunking-enabled", "true");
71+
props.put("spring.pulsar.reactive.sender.compression-type", "LZ4");
72+
73+
bind(props);
74+
ReactiveMessageSenderSpec senderSpec = properties.buildReactiveMessageSenderSpec();
75+
76+
assertThat(senderSpec.getTopicName()).isEqualTo("my-topic");
77+
assertThat(senderSpec.getProducerName()).isEqualTo("my-producer");
78+
assertThat(senderSpec.getSendTimeout()).isEqualTo(Duration.ofSeconds(2));
79+
assertThat(senderSpec.getMaxPendingMessages()).isEqualTo(3);
80+
assertThat(senderSpec.getMaxPendingMessagesAcrossPartitions()).isEqualTo(4);
81+
assertThat(senderSpec.getMessageRoutingMode()).isEqualTo(MessageRoutingMode.CustomPartition);
82+
assertThat(senderSpec.getHashingScheme()).isEqualTo(HashingScheme.Murmur3_32Hash);
83+
assertThat(senderSpec.getCryptoFailureAction()).isEqualTo(ProducerCryptoFailureAction.SEND);
84+
assertThat(senderSpec.getBatchingMaxPublishDelay()).isEqualTo(Duration.ofSeconds(5));
85+
assertThat(senderSpec.getBatchingMaxMessages()).isEqualTo(6);
86+
assertThat(senderSpec.getBatchingEnabled()).isEqualTo(false);
87+
assertThat(senderSpec.getChunkingEnabled()).isEqualTo(true);
88+
assertThat(senderSpec.getCompressionType()).isEqualTo(CompressionType.LZ4);
89+
}
90+
91+
}
92+
93+
}

0 commit comments

Comments
 (0)