Skip to content

Commit 5c9f09c

Browse files
committed
Register lazy @JmsListener components
Support the creation and registration of message listener containers in a lazy manner, that is after the container initialization has completed. Such support brought an interesting brainstorming of the thread safety if JmsListenerEndpointRegistrar and JmsListenerEndpointRegistry so those have also been revisited as part of this commit. Issue: SPR-12774
1 parent fdd1f83 commit 5c9f09c

File tree

3 files changed

+99
-16
lines changed

3 files changed

+99
-16
lines changed

spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistrar.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
import org.springframework.beans.factory.BeanFactory;
2323
import org.springframework.beans.factory.BeanFactoryAware;
2424
import org.springframework.beans.factory.InitializingBean;
25+
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
2526
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
2627
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
2728
import org.springframework.util.Assert;
@@ -50,6 +51,10 @@ public class JmsListenerEndpointRegistrar implements BeanFactoryAware, Initializ
5051
private final List<JmsListenerEndpointDescriptor> endpointDescriptors =
5152
new ArrayList<JmsListenerEndpointDescriptor>();
5253

54+
private boolean startImmediately;
55+
56+
private Object mutex = endpointDescriptors;
57+
5358

5459
/**
5560
* Set the {@link JmsListenerEndpointRegistry} instance to use.
@@ -113,6 +118,10 @@ public void setContainerFactoryBeanName(String containerFactoryBeanName) {
113118
@Override
114119
public void setBeanFactory(BeanFactory beanFactory) {
115120
this.beanFactory = beanFactory;
121+
if (beanFactory instanceof ConfigurableBeanFactory) {
122+
ConfigurableBeanFactory cbf = (ConfigurableBeanFactory) beanFactory;
123+
this.mutex = cbf.getSingletonMutex();
124+
}
116125
}
117126

118127

@@ -122,8 +131,12 @@ public void afterPropertiesSet() {
122131
}
123132

124133
protected void registerAllEndpoints() {
125-
for (JmsListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
126-
this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor));
134+
synchronized (this.mutex) {
135+
for (JmsListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
136+
this.endpointRegistry.registerListenerContainer(
137+
descriptor.endpoint, resolveContainerFactory(descriptor));
138+
}
139+
startImmediately = true; // trigger immediate startup
127140
}
128141
}
129142

@@ -157,7 +170,16 @@ public void registerEndpoint(JmsListenerEndpoint endpoint, JmsListenerContainerF
157170
Assert.notNull(endpoint, "Endpoint must be set");
158171
Assert.hasText(endpoint.getId(), "Endpoint id must be set");
159172
// Factory may be null, we defer the resolution right before actually creating the container
160-
this.endpointDescriptors.add(new JmsListenerEndpointDescriptor(endpoint, factory));
173+
JmsListenerEndpointDescriptor descriptor = new JmsListenerEndpointDescriptor(endpoint, factory);
174+
synchronized (this.mutex) {
175+
if (startImmediately) { // Register and start immediately
176+
this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
177+
resolveContainerFactory(descriptor), true);
178+
}
179+
else {
180+
this.endpointDescriptors.add(descriptor);
181+
}
182+
}
161183
}
162184

163185
/**

spring-jms/src/main/java/org/springframework/jms/config/JmsListenerEndpointRegistry.java

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,8 +18,8 @@
1818

1919
import java.util.Collection;
2020
import java.util.Collections;
21-
import java.util.LinkedHashMap;
2221
import java.util.Map;
22+
import java.util.concurrent.ConcurrentHashMap;
2323
import java.util.concurrent.atomic.AtomicInteger;
2424

2525
import org.apache.commons.logging.Log;
@@ -57,7 +57,7 @@ public class JmsListenerEndpointRegistry implements DisposableBean, SmartLifecyc
5757
protected final Log logger = LogFactory.getLog(getClass());
5858

5959
private final Map<String, MessageListenerContainer> listenerContainers =
60-
new LinkedHashMap<String, MessageListenerContainer>();
60+
new ConcurrentHashMap<String, MessageListenerContainer>();
6161

6262
private int phase = Integer.MAX_VALUE;
6363

@@ -86,21 +86,43 @@ public Collection<MessageListenerContainer> getListenerContainers() {
8686
* Create a message listener container for the given {@link JmsListenerEndpoint}.
8787
* <p>This create the necessary infrastructure to honor that endpoint
8888
* with regards to its configuration.
89+
* <p>The {@code startImmediately} flag determines if the container should be
90+
* started immediately.
8991
* @param endpoint the endpoint to add
92+
* @param factory the listener factory to use
93+
* @param startImmediately start the container immediately if necessary
9094
* @see #getListenerContainers()
9195
* @see #getListenerContainer(String)
9296
*/
93-
public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory) {
97+
public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory,
98+
boolean startImmediately) {
99+
94100
Assert.notNull(endpoint, "Endpoint must not be null");
95101
Assert.notNull(factory, "Factory must not be null");
96102

97103
String id = endpoint.getId();
98104
Assert.notNull(id, "Endpoint id must not be null");
99-
Assert.state(!this.listenerContainers.containsKey(id),
100-
"Another endpoint is already registered with id '" + id + "'");
105+
synchronized (this.listenerContainers) {
106+
Assert.state(!this.listenerContainers.containsKey(id),
107+
"Another endpoint is already registered with id '" + id + "'");
108+
MessageListenerContainer container = createListenerContainer(endpoint, factory);
109+
this.listenerContainers.put(id, container);
110+
if (startImmediately) {
111+
startIfNecessary(container);
112+
}
113+
}
114+
}
101115

102-
MessageListenerContainer container = createListenerContainer(endpoint, factory);
103-
this.listenerContainers.put(id, container);
116+
/**
117+
* Create a message listener container for the given {@link JmsListenerEndpoint}.
118+
* <p>This create the necessary infrastructure to honor that endpoint
119+
* with regards to its configuration.
120+
* @param endpoint the endpoint to add
121+
* @param factory the listener factory to use
122+
* @see #registerListenerContainer(JmsListenerEndpoint, JmsListenerContainerFactory, boolean)
123+
*/
124+
public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory) {
125+
registerListenerContainer(endpoint, factory, false);
104126
}
105127

106128
/**
@@ -163,9 +185,7 @@ public boolean isAutoStartup() {
163185
@Override
164186
public void start() {
165187
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
166-
if (listenerContainer.isAutoStartup()) {
167-
listenerContainer.start();
168-
}
188+
startIfNecessary(listenerContainer);
169189
}
170190
}
171191

@@ -195,6 +215,17 @@ public boolean isRunning() {
195215
return false;
196216
}
197217

218+
/**
219+
* Start the specified {@link MessageListenerContainer} if it should be started
220+
* on startup.
221+
* @see MessageListenerContainer#isAutoStartup()
222+
*/
223+
private static void startIfNecessary(MessageListenerContainer listenerContainer) {
224+
if (listenerContainer.isAutoStartup()) {
225+
listenerContainer.start();
226+
}
227+
}
228+
198229

199230
private static class AggregatingCallback implements Runnable {
200231

spring-jms/src/test/java/org/springframework/jms/annotation/EnableJmsTests.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,17 +29,22 @@
2929
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
3030
import org.springframework.context.annotation.Bean;
3131
import org.springframework.context.annotation.Configuration;
32+
import org.springframework.context.annotation.Lazy;
3233
import org.springframework.context.annotation.PropertySource;
3334
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
3435
import org.springframework.jms.config.JmsListenerContainerTestFactory;
3536
import org.springframework.jms.config.JmsListenerEndpointRegistrar;
3637
import org.springframework.jms.config.JmsListenerEndpointRegistry;
38+
import org.springframework.jms.config.MessageListenerTestContainer;
3739
import org.springframework.jms.config.SimpleJmsListenerEndpoint;
3840
import org.springframework.jms.listener.adapter.ListenerExecutionFailedException;
3941
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
4042
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
4143
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
4244
import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException;
45+
import org.springframework.stereotype.Component;
46+
47+
import static org.junit.Assert.*;
4348

4449
/**
4550
* @author Stephane Nicoll
@@ -115,6 +120,22 @@ public void unknownFactory() {
115120
EnableJmsSampleConfig.class, CustomBean.class);
116121
}
117122

123+
@Test
124+
public void lazyComponent() {
125+
ConfigurableApplicationContext context = new AnnotationConfigApplicationContext(
126+
EnableJmsDefaultContainerFactoryConfig.class, LazyBean.class);
127+
JmsListenerContainerTestFactory defaultFactory =
128+
context.getBean("jmsListenerContainerFactory", JmsListenerContainerTestFactory.class);
129+
assertEquals(0, defaultFactory.getListenerContainers().size());
130+
131+
context.getBean(LazyBean.class); // trigger lazy resolution
132+
assertEquals(1, defaultFactory.getListenerContainers().size());
133+
MessageListenerTestContainer container = defaultFactory.getListenerContainers().get(0);
134+
assertTrue("Should have been started " + container, container.isStarted());
135+
context.close(); // Close and stop the listeners
136+
assertTrue("Should have been stopped " + container, container.isStopped());
137+
}
138+
118139
@EnableJms
119140
@Configuration
120141
static class EnableJmsSampleConfig {
@@ -240,4 +261,13 @@ public JmsListenerContainerTestFactory defaultFactory() {
240261
}
241262
}
242263

264+
@Component
265+
@Lazy
266+
static class LazyBean {
267+
268+
@JmsListener(destination = "myQueue")
269+
public void handle(String msg) {
270+
}
271+
}
272+
243273
}

0 commit comments

Comments
 (0)