Skip to content

Commit 6ffeee3

Browse files
committed
Basic integration tests with various listener container settings
See gh-26442
1 parent 1b458ae commit 6ffeee3

File tree

2 files changed

+120
-0
lines changed

2 files changed

+120
-0
lines changed

spring-jms/spring-jms.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@ dependencies {
1414
optional("com.fasterxml.jackson.core:jackson-databind")
1515
testCompile(testFixtures(project(":spring-beans")))
1616
testCompile(testFixtures(project(":spring-tx")))
17+
testCompile("org.apache.activemq:activemq-broker")
1718
testImplementation("javax.jms:javax.jms-api")
1819
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright 2002-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.jms.listener;
18+
19+
import java.util.HashSet;
20+
import java.util.Set;
21+
import java.util.concurrent.CopyOnWriteArraySet;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.TimeUnit;
24+
25+
import javax.jms.JMSException;
26+
import javax.jms.Session;
27+
import javax.jms.TextMessage;
28+
29+
import org.apache.activemq.ActiveMQConnectionFactory;
30+
import org.junit.jupiter.api.Test;
31+
32+
import org.springframework.jms.core.JmsTemplate;
33+
34+
import static org.assertj.core.api.Assertions.assertThat;
35+
36+
/**
37+
* @author Juergen Hoeller
38+
* @since 5.3.5
39+
*/
40+
public class MessageListenerContainerIntegrationTests {
41+
42+
@Test
43+
public void simpleMessageListenerContainer() throws InterruptedException {
44+
SimpleMessageListenerContainer mlc = new SimpleMessageListenerContainer();
45+
46+
testMessageListenerContainer(mlc);
47+
}
48+
49+
@Test
50+
public void defaultMessageListenerContainer() throws InterruptedException {
51+
DefaultMessageListenerContainer mlc = new DefaultMessageListenerContainer();
52+
53+
testMessageListenerContainer(mlc);
54+
}
55+
56+
@Test
57+
public void defaultMessageListenerContainerWithMaxMessagesPerTask() throws InterruptedException {
58+
DefaultMessageListenerContainer mlc = new DefaultMessageListenerContainer();
59+
mlc.setConcurrentConsumers(1);
60+
mlc.setMaxConcurrentConsumers(2);
61+
mlc.setMaxMessagesPerTask(1);
62+
63+
testMessageListenerContainer(mlc);
64+
}
65+
66+
@Test
67+
public void defaultMessageListenerContainerWithIdleReceivesPerTaskLimit() throws InterruptedException {
68+
DefaultMessageListenerContainer mlc = new DefaultMessageListenerContainer();
69+
mlc.setConcurrentConsumers(1);
70+
mlc.setMaxConcurrentConsumers(2);
71+
mlc.setIdleReceivesPerTaskLimit(1);
72+
73+
testMessageListenerContainer(mlc);
74+
}
75+
76+
private void testMessageListenerContainer(AbstractMessageListenerContainer mlc) throws InterruptedException {
77+
ActiveMQConnectionFactory aqcf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
78+
TestMessageListener tml = new TestMessageListener();
79+
80+
mlc.setConnectionFactory(aqcf);
81+
mlc.setMessageListener(tml);
82+
mlc.setDestinationName("test");
83+
mlc.afterPropertiesSet();
84+
mlc.start();
85+
86+
JmsTemplate jt = new JmsTemplate(aqcf);
87+
jt.setDefaultDestinationName("test");
88+
89+
Set<String> messages = new HashSet<>();
90+
messages.add("text1");
91+
messages.add("text2");
92+
for (String message : messages) {
93+
jt.convertAndSend(message);
94+
}
95+
assertThat(tml.result()).isEqualTo(messages);
96+
97+
mlc.destroy();
98+
}
99+
100+
101+
private static class TestMessageListener implements SessionAwareMessageListener<TextMessage> {
102+
103+
private final CountDownLatch latch = new CountDownLatch(2);
104+
105+
private final Set<String> messages = new CopyOnWriteArraySet<>();
106+
107+
@Override
108+
public void onMessage(TextMessage message, Session session) throws JMSException {
109+
this.messages.add(message.getText());
110+
this.latch.countDown();
111+
}
112+
113+
public Set<String> result() throws InterruptedException {
114+
assertThat(this.latch.await(5, TimeUnit.SECONDS)).isTrue();
115+
return this.messages;
116+
}
117+
}
118+
119+
}

0 commit comments

Comments
 (0)