Skip to content

Commit 759ab23

Browse files
committed
Release shared JMS Connection on Single/CachingConnectionFactory.stop()
See gh-30612
1 parent 1b62b6d commit 759ab23

File tree

2 files changed

+116
-47
lines changed

2 files changed

+116
-47
lines changed

spring-jms/src/main/java/org/springframework/jms/connection/SingleConnectionFactory.java

Lines changed: 76 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import org.springframework.beans.factory.DisposableBean;
4242
import org.springframework.beans.factory.InitializingBean;
43+
import org.springframework.context.Lifecycle;
4344
import org.springframework.lang.Nullable;
4445
import org.springframework.util.Assert;
4546
import org.springframework.util.ClassUtils;
@@ -85,7 +86,7 @@
8586
* @see org.springframework.jms.listener.DefaultMessageListenerContainer#setCacheLevel
8687
*/
8788
public class SingleConnectionFactory implements ConnectionFactory, QueueConnectionFactory,
88-
TopicConnectionFactory, ExceptionListener, InitializingBean, DisposableBean {
89+
TopicConnectionFactory, ExceptionListener, InitializingBean, DisposableBean, Lifecycle {
8990

9091
protected final Log logger = LogFactory.getLog(getClass());
9192

@@ -330,6 +331,67 @@ protected Connection getConnection() throws JMSException {
330331
}
331332
}
332333

334+
/**
335+
* Exception listener callback that renews the underlying single Connection.
336+
* @see #resetConnection()
337+
*/
338+
@Override
339+
public void onException(JMSException ex) {
340+
logger.info("Encountered a JMSException - resetting the underlying JMS Connection", ex);
341+
resetConnection();
342+
}
343+
344+
/**
345+
* Close the underlying shared connection.
346+
* The provider of this ConnectionFactory needs to care for proper shutdown.
347+
* <p>As this bean implements DisposableBean, a bean factory will
348+
* automatically invoke this on destruction of its cached singletons.
349+
* @see #resetConnection()
350+
*/
351+
@Override
352+
public void destroy() {
353+
resetConnection();
354+
}
355+
356+
/**
357+
* Initialize the underlying shared connection on start.
358+
* @since 6.1
359+
* @see #initConnection()
360+
*/
361+
@Override
362+
public void start() {
363+
try {
364+
initConnection();
365+
}
366+
catch (JMSException ex) {
367+
logger.info("Start attempt failed for shared JMS Connection", ex);
368+
}
369+
}
370+
371+
/**
372+
* Reset the underlying shared connection on stop.
373+
* @since 6.1
374+
* @see #resetConnection()
375+
*/
376+
@Override
377+
public void stop() {
378+
resetConnection();
379+
}
380+
381+
/**
382+
* Check whether there is currently an underlying connection.
383+
* @since 6.1
384+
* @see #start()
385+
* @see #stop()
386+
*/
387+
@Override
388+
public boolean isRunning() {
389+
synchronized (this.connectionMonitor) {
390+
return (this.connection != null);
391+
}
392+
}
393+
394+
333395
/**
334396
* Initialize the underlying shared Connection.
335397
* <p>Closes and reinitializes the Connection if an underlying
@@ -375,41 +437,6 @@ public void initConnection() throws JMSException {
375437
}
376438
}
377439

378-
/**
379-
* Exception listener callback that renews the underlying single Connection.
380-
* @see #resetConnection()
381-
*/
382-
@Override
383-
public void onException(JMSException ex) {
384-
logger.info("Encountered a JMSException - resetting the underlying JMS Connection", ex);
385-
resetConnection();
386-
}
387-
388-
/**
389-
* Close the underlying shared connection.
390-
* The provider of this ConnectionFactory needs to care for proper shutdown.
391-
* <p>As this bean implements DisposableBean, a bean factory will
392-
* automatically invoke this on destruction of its cached singletons.
393-
* @see #resetConnection()
394-
*/
395-
@Override
396-
public void destroy() {
397-
resetConnection();
398-
}
399-
400-
/**
401-
* Reset the underlying shared Connection, to be reinitialized on next access.
402-
* @see #closeConnection
403-
*/
404-
public void resetConnection() {
405-
synchronized (this.connectionMonitor) {
406-
if (this.connection != null) {
407-
closeConnection(this.connection);
408-
}
409-
this.connection = null;
410-
}
411-
}
412-
413440
/**
414441
* Create a JMS Connection via this template's ConnectionFactory.
415442
* @return the new JMS Connection
@@ -501,6 +528,19 @@ else if (Boolean.TRUE.equals(this.pubSubMode) && con instanceof TopicConnection
501528
}
502529
}
503530

531+
/**
532+
* Reset the underlying shared Connection, to be reinitialized on next access.
533+
* @see #closeConnection
534+
*/
535+
public void resetConnection() {
536+
synchronized (this.connectionMonitor) {
537+
if (this.connection != null) {
538+
closeConnection(this.connection);
539+
}
540+
this.connection = null;
541+
}
542+
}
543+
504544
/**
505545
* Close the given Connection.
506546
* @param con the Connection to close

spring-jms/src/test/java/org/springframework/jms/connection/SingleConnectionFactoryTests.java

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -367,13 +367,12 @@ public void setExceptionListener(ExceptionListener exceptionListener) throws JMS
367367
// Prepare base JMS ConnectionFactory
368368
// - createConnection(1st) -> TestConnection,
369369
// - createConnection(2nd and next) -> FailingTestConnection
370-
TestConnection testCon = new TestConnection();
371370
FailingTestConnection failingCon = new FailingTestConnection();
372371
AtomicInteger createConnectionMethodCounter = new AtomicInteger();
373372
ConnectionFactory cf = mock(ConnectionFactory.class);
374373
given(cf.createConnection()).willAnswer(invocation -> {
375374
int methodInvocationCounter = createConnectionMethodCounter.incrementAndGet();
376-
return methodInvocationCounter == 1 ? testCon : failingCon;
375+
return (methodInvocationCounter >= 4 ? failingCon : new TestConnection());
377376
});
378377

379378
// Prepare SingleConnectionFactory (setReconnectOnException())
@@ -382,18 +381,43 @@ public void setExceptionListener(ExceptionListener exceptionListener) throws JMS
382381
scf.setReconnectOnException(true);
383382
Field conField = ReflectionUtils.findField(SingleConnectionFactory.class, "connection");
384383
conField.setAccessible(true);
384+
assertThat(scf.isRunning()).isFalse();
385385

386386
// Get connection (1st)
387387
Connection con1 = scf.getConnection();
388388
assertThat(createConnectionMethodCounter.get()).isEqualTo(1);
389-
assertThat(con1).isNotNull();
390389
assertThat(con1.getExceptionListener()).isNotNull();
391-
assertThat(con1).isSameAs(testCon);
390+
assertThat(con1).isSameAs(conField.get(scf));
391+
assertThat(scf.isRunning()).isTrue();
392+
392393
// Get connection again, the same should be returned (shared connection till some problem)
393394
Connection con2 = scf.getConnection();
394395
assertThat(createConnectionMethodCounter.get()).isEqualTo(1);
395396
assertThat(con2.getExceptionListener()).isNotNull();
396397
assertThat(con2).isSameAs(con1);
398+
assertThat(scf.isRunning()).isTrue();
399+
400+
// Explicit stop should reset connection
401+
scf.stop();
402+
assertThat(conField.get(scf)).isNull();
403+
assertThat(scf.isRunning()).isFalse();
404+
Connection con3 = scf.getConnection();
405+
assertThat(createConnectionMethodCounter.get()).isEqualTo(2);
406+
assertThat(con3.getExceptionListener()).isNotNull();
407+
assertThat(con3).isNotSameAs(con2);
408+
assertThat(scf.isRunning()).isTrue();
409+
410+
// Explicit stop-and-restart should refresh connection
411+
scf.stop();
412+
assertThat(conField.get(scf)).isNull();
413+
assertThat(scf.isRunning()).isFalse();
414+
scf.start();
415+
assertThat(scf.isRunning()).isTrue();
416+
assertThat(conField.get(scf)).isNotNull();
417+
Connection con4 = scf.getConnection();
418+
assertThat(createConnectionMethodCounter.get()).isEqualTo(3);
419+
assertThat(con4.getExceptionListener()).isNotNull();
420+
assertThat(con4).isNotSameAs(con3);
397421

398422
// Invoke reset connection to simulate problem with connection
399423
// - SCF exception listener should be invoked -> connection should be set to null
@@ -405,16 +429,21 @@ public void setExceptionListener(ExceptionListener exceptionListener) throws JMS
405429
// - JMSException should be returned from FailingTestConnection
406430
// - connection should be still null (no new connection without exception listener like before fix)
407431
assertThatExceptionOfType(JMSException.class).isThrownBy(() -> scf.getConnection());
408-
assertThat(createConnectionMethodCounter.get()).isEqualTo(2);
432+
assertThat(createConnectionMethodCounter.get()).isEqualTo(4);
409433
assertThat(conField.get(scf)).isNull();
410434

411435
// Attempt to get connection again -> FailingTestConnection should be returned
412-
// - no JMSException is thrown, exception listener should be present
413-
Connection con3 = scf.getConnection();
414-
assertThat(createConnectionMethodCounter.get()).isEqualTo(3);
415-
assertThat(con3).isNotNull();
416-
assertThat(con3).isSameAs(failingCon);
417-
assertThat(con3.getExceptionListener()).isNotNull();
436+
// - no JMSException is thrown, exception listener should be present
437+
Connection con5 = scf.getConnection();
438+
assertThat(createConnectionMethodCounter.get()).isEqualTo(5);
439+
assertThat(con5).isNotNull();
440+
assertThat(con5).isSameAs(failingCon);
441+
assertThat(con5.getExceptionListener()).isNotNull();
442+
assertThat(con5).isNotSameAs(con4);
443+
444+
scf.destroy();
445+
assertThat(conField.get(scf)).isNull();
446+
assertThat(scf.isRunning()).isFalse();
418447
}
419448

420449
@Test

0 commit comments

Comments
 (0)