Skip to content

Commit fd35d43

Browse files
garyrussellartembilan
authored andcommitted
INT-3784: Use acknowledge="transacted" by Default
JIRA: https://jira.spring.io/browse/INT-3784 Also - suppress WARN log when priority mapping fails - rework extract payload tests to use a single context Make a new `JmsMessageDrivenEndpoint` as `private` because it makes sense only for XML `BeanDefinition` variant.
1 parent 4cca684 commit fd35d43

File tree

11 files changed

+305
-154
lines changed

11 files changed

+305
-154
lines changed

spring-integration-jms/src/main/java/org/springframework/integration/jms/ChannelPublishingJmsMessageListener.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,9 @@ public void onMessage(javax.jms.Message jmsMessage, Session session) throws JMSE
341341
Message<?> replyMessage = this.gatewayDelegate.sendAndReceiveMessage(requestMessage);
342342
if (replyMessage != null) {
343343
Destination destination = this.getReplyDestination(jmsMessage, session);
344+
if (logger.isDebugEnabled()) {
345+
logger.debug("Reply destination: " + destination);
346+
}
344347
if (destination != null) {
345348
// convert SI Message to JMS Message
346349
Object replyResult = replyMessage;

spring-integration-jms/src/main/java/org/springframework/integration/jms/DefaultJmsHeaderMapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ public void fromHeaders(MessageHeaders headers, javax.jms.Message jmsMessage) {
150150
jmsMessage.setObjectProperty(propertyName, value);
151151
}
152152
catch (Exception e) {
153-
if (headerName.startsWith("JMSX")) {
153+
if (headerName.startsWith("JMSX")
154+
|| headerName.equals(IntegrationMessageHeaderAccessor.PRIORITY)) {
154155
if (logger.isTraceEnabled()) {
155156
logger.trace("skipping reserved header, it cannot be set by client: " + headerName);
156157
}

spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsMessageDrivenEndpoint.java

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121
import org.springframework.integration.endpoint.AbstractEndpoint;
2222
import org.springframework.integration.jms.util.JmsAdapterUtils;
2323
import org.springframework.jms.listener.AbstractMessageListenerContainer;
24+
import org.springframework.jms.listener.DefaultMessageListenerContainer;
2425
import org.springframework.util.Assert;
2526

2627
/**
@@ -35,12 +36,33 @@ public class JmsMessageDrivenEndpoint extends AbstractEndpoint implements Dispos
3536

3637
private final AbstractMessageListenerContainer listenerContainer;
3738

39+
private final boolean externalContainer;
40+
3841
private final ChannelPublishingJmsMessageListener listener;
39-
42+
4043
private volatile String sessionAcknowledgeMode;
41-
44+
45+
/**
46+
* Construct an instance with an externally configured container.
47+
* @param listenerContainer the container.
48+
* @param listener the listener.
49+
*/
4250
public JmsMessageDrivenEndpoint(AbstractMessageListenerContainer listenerContainer,
4351
ChannelPublishingJmsMessageListener listener) {
52+
this(listenerContainer, listener, true);
53+
}
54+
55+
/**
56+
* Construct an instance with an argument indicating whether the container's ack mode should
57+
* be overridden with {@link #setSessionAcknowledgeMode(String) sessionAcknowledgeMode}, default
58+
* 'transacted'.
59+
* @param listenerContainer the container.
60+
* @param listener the listener.
61+
* @param externalContainer true if the container is externally configured and should not have its ackmode
62+
* coerced when no sessionAcknowledgeMode was supplied.
63+
*/
64+
private JmsMessageDrivenEndpoint(AbstractMessageListenerContainer listenerContainer,
65+
ChannelPublishingJmsMessageListener listener, boolean externalContainer) {
4466
Assert.notNull(listenerContainer, "listener container must not be null");
4567
Assert.notNull(listener, "listener must not be null");
4668
if (logger.isWarnEnabled() && listenerContainer.getMessageListener() != null) {
@@ -51,12 +73,20 @@ public JmsMessageDrivenEndpoint(AbstractMessageListenerContainer listenerContain
5173
this.listener = listener;
5274
this.listenerContainer = listenerContainer;
5375
setPhase(Integer.MAX_VALUE / 2);
76+
this.externalContainer = externalContainer;
5477
}
5578

79+
/**
80+
* Set the session acknowledge mode on the listener container. It will override the
81+
* container setting even if an external container is provided. Defaults to null
82+
* (won't change container) if an external container is provided or `transacted` when
83+
* the framework creates an implicit {@link DefaultMessageListenerContainer}.
84+
* @param sessionAcknowledgeMode the acknowledge mode.
85+
*/
5686
public void setSessionAcknowledgeMode(String sessionAcknowledgeMode) {
5787
this.sessionAcknowledgeMode = sessionAcknowledgeMode;
5888
}
59-
89+
6090
@Override
6191
public String getComponentType() {
6292
return "jms:message-driven-channel-adapter";
@@ -68,7 +98,12 @@ protected void onInit() throws Exception {
6898
if (!this.listenerContainer.isActive()) {
6999
this.listenerContainer.afterPropertiesSet();
70100
}
71-
Integer acknowledgeMode = JmsAdapterUtils.parseAcknowledgeMode(this.sessionAcknowledgeMode);
101+
String sessionAcknowledgeMode = this.sessionAcknowledgeMode;
102+
if (sessionAcknowledgeMode == null && !this.externalContainer
103+
&& DefaultMessageListenerContainer.class.isAssignableFrom(this.listenerContainer.getClass())) {
104+
sessionAcknowledgeMode = JmsAdapterUtils.SESSION_TRANSACTED_STRING;
105+
}
106+
Integer acknowledgeMode = JmsAdapterUtils.parseAcknowledgeMode(sessionAcknowledgeMode);
72107
if (acknowledgeMode != null) {
73108
if (acknowledgeMode.intValue() == JmsAdapterUtils.SESSION_TRANSACTED) {
74109
this.listenerContainer.setSessionTransacted(true);
@@ -94,6 +129,7 @@ protected void doStop() {
94129
this.listener.stop();
95130
}
96131

132+
@Override
97133
public void destroy() throws Exception {
98134
if (this.isRunning()) {
99135
this.stop();
@@ -102,12 +138,14 @@ public void destroy() throws Exception {
102138
}
103139

104140

141+
@Override
105142
public int beforeShutdown() {
106143
this.stop();
107144
return 0;
108145
}
109146

110147

148+
@Override
111149
public int afterShutdown() {
112150
return 0;
113151
}

spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsOutboundGateway.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -770,6 +770,9 @@ private javax.jms.Message sendAndReceiveWithContainer(Message<?> requestMessage)
770770

771771
jmsRequest.setJMSReplyTo(replyTo);
772772
connection.start();
773+
if (logger.isDebugEnabled()) {
774+
logger.debug("ReplyTo: " + replyTo);
775+
}
773776

774777
Integer priority = new IntegrationMessageHeaderAccessor(requestMessage).getPriority();
775778
if (priority == null) {
@@ -824,6 +827,9 @@ private javax.jms.Message sendAndReceiveWithoutContainer(Message<?> requestMessa
824827
replyTo = this.determineReplyDestination(requestMessage, session);
825828
jmsRequest.setJMSReplyTo(replyTo);
826829
connection.start();
830+
if (logger.isDebugEnabled()) {
831+
logger.debug("ReplyTo: " + replyTo);
832+
}
827833

828834
Integer priority = new IntegrationMessageHeaderAccessor(requestMessage).getPriority();
829835
if (priority == null) {

spring-integration-jms/src/main/java/org/springframework/integration/jms/config/JmsMessageDrivenEndpointParser.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class JmsMessageDrivenEndpointParser extends AbstractSingleBeanDefinition
6363
JmsAdapterParserUtils.DESTINATION_NAME_ATTRIBUTE,
6464
"destination-resolver", "transaction-manager",
6565
"concurrent-consumers", "max-concurrent-consumers",
66+
"acknowledge",
6667
"max-messages-per-task", "selector",
6768
"receive-timeout", "recovery-interval",
6869
"idle-consumer-limit", "idle-task-execution-limit",
@@ -117,6 +118,7 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit
117118
String listenerBeanName = this.parseMessageListener(element, parserContext, builder.getRawBeanDefinition());
118119
builder.addConstructorArgReference(containerBeanName);
119120
builder.addConstructorArgReference(listenerBeanName);
121+
builder.addConstructorArgValue(hasExternalContainer(element));
120122
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, IntegrationNamespaceUtils.AUTO_STARTUP);
121123
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, IntegrationNamespaceUtils.PHASE);
122124
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "acknowledge", "sessionAcknowledgeMode");
@@ -126,7 +128,7 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit
126128
private String parseMessageListenerContainer(Element element, ParserContext parserContext,
127129
BeanDefinition adapterBeanDefinition) {
128130
String containerClass = element.getAttribute("container-class");
129-
if (element.hasAttribute("container")) {
131+
if (hasExternalContainer(element)) {
130132
if (StringUtils.hasText(containerClass)) {
131133
parserContext.getReaderContext().error("Cannot have both 'container' and 'container-class'", element);
132134
}
@@ -198,6 +200,11 @@ private String parseMessageListenerContainer(Element element, ParserContext pars
198200
return beanName;
199201
}
200202

203+
204+
private boolean hasExternalContainer(Element element) {
205+
return element.hasAttribute("container");
206+
}
207+
201208
private String parseMessageListener(Element element, ParserContext parserContext,
202209
BeanDefinition adapterBeanDefinition) {
203210
BeanDefinitionBuilder builder = BeanDefinitionBuilder

spring-integration-jms/src/main/java/org/springframework/integration/jms/util/JmsAdapterUtils.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014 the original author or authors.
2+
* Copyright 2014-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,11 +19,20 @@
1919

2020
/**
2121
* @author Liujiong
22+
* @author Gary Russell
2223
* @since 4.1
2324
*
2425
*/
2526
public abstract class JmsAdapterUtils {
26-
27+
28+
public static final String AUTO_ACKNOWLEDGE_STRING = "auto";
29+
30+
public static final String DUPS_OK_ACKNOWLEDGE_STRING = "dups-ok";
31+
32+
public static final String CLIENT_ACKNOWLEDGE_STRING = "client";
33+
34+
public static final String SESSION_TRANSACTED_STRING = "transacted";
35+
2736
public static final int SESSION_TRANSACTED = 0;
2837

2938
public static final int AUTO_ACKNOWLEDGE = 1;
@@ -35,16 +44,16 @@ public abstract class JmsAdapterUtils {
3544
public static Integer parseAcknowledgeMode(String acknowledge) {
3645
if (StringUtils.hasText(acknowledge)) {
3746
int acknowledgeMode = AUTO_ACKNOWLEDGE;
38-
if ("transacted".equals(acknowledge)) {
47+
if (SESSION_TRANSACTED_STRING.equals(acknowledge)) {
3948
acknowledgeMode = SESSION_TRANSACTED;
4049
}
41-
else if ("dups-ok".equals(acknowledge)) {
50+
else if (DUPS_OK_ACKNOWLEDGE_STRING.equals(acknowledge)) {
4251
acknowledgeMode = DUPS_OK_ACKNOWLEDGE;
4352
}
44-
else if ("client".equals(acknowledge)) {
53+
else if (CLIENT_ACKNOWLEDGE_STRING.equals(acknowledge)) {
4554
acknowledgeMode = CLIENT_ACKNOWLEDGE;
4655
}
47-
else if (!"auto".equals(acknowledge)) {
56+
else if (!AUTO_ACKNOWLEDGE_STRING.equals(acknowledge)) {
4857
throw new IllegalStateException("Invalid JMS 'acknowledge' setting: " +
4958
"only \"auto\", \"client\", \"dups-ok\" and \"transacted\" supported.");
5059
}

spring-integration-jms/src/main/resources/org/springframework/integration/jms/config/spring-integration-jms-4.2.xsd

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1317,7 +1317,10 @@
13171317
The native JMS acknowledge mode: "auto", "client", "dups-ok" or "transacted".
13181318
The latter effectively activates a locally transacted Session. 'transacted'
13191319
is not allowed on the inbound-channel-adapter; use 'session-transacted' instead.
1320-
acknowlege="transacted" is used on the message-driven-channel-adapter.
1320+
acknowlege="transacted" is used on the message-driven-channel-adapter and inbound gateway.
1321+
Defaults to "transacted" when an implicit message listener container is configured.
1322+
Not allowed when using an externally configured listener container; configure the container
1323+
instead.
13211324
]]></xsd:documentation>
13221325
</xsd:annotation>
13231326
<xsd:simpleType>

0 commit comments

Comments
 (0)