Skip to content

Commit d11bd64

Browse files
committed
TaskExecutorRegistration does not apply its default settings to a user-provided executor
Also, ChannelRegistration.setInterceptors is deprecated now: in favor of a fluently named interceptors(...) method which is documented to add the given interceptors to the channel's current list. Issue: SPR-15962 Issue: SPR-15976 (cherry picked from commit ac9cfef)
1 parent dedecb9 commit d11bd64

File tree

4 files changed

+91
-43
lines changed

4 files changed

+91
-43
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,15 @@ public ApplicationContext getApplicationContext() {
120120
public AbstractSubscribableChannel clientInboundChannel() {
121121
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor());
122122
ChannelRegistration reg = getClientInboundChannelRegistration();
123-
channel.setInterceptors(reg.getInterceptors());
123+
if (reg.hasInterceptors()) {
124+
channel.setInterceptors(reg.getInterceptors());
125+
}
124126
return channel;
125127
}
126128

127129
@Bean
128130
public ThreadPoolTaskExecutor clientInboundChannelExecutor() {
129-
TaskExecutorRegistration reg = getClientInboundChannelRegistration().getOrCreateTaskExecRegistration();
131+
TaskExecutorRegistration reg = getClientInboundChannelRegistration().taskExecutor();
130132
ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
131133
executor.setThreadNamePrefix("clientInboundChannel-");
132134
return executor;
@@ -136,7 +138,7 @@ protected final ChannelRegistration getClientInboundChannelRegistration() {
136138
if (this.clientInboundChannelRegistration == null) {
137139
ChannelRegistration registration = new ChannelRegistration();
138140
configureClientInboundChannel(registration);
139-
registration.setInterceptors(new ImmutableMessageChannelInterceptor());
141+
registration.interceptors(new ImmutableMessageChannelInterceptor());
140142
this.clientInboundChannelRegistration = registration;
141143
}
142144
return this.clientInboundChannelRegistration;
@@ -153,13 +155,15 @@ protected void configureClientInboundChannel(ChannelRegistration registration) {
153155
public AbstractSubscribableChannel clientOutboundChannel() {
154156
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor());
155157
ChannelRegistration reg = getClientOutboundChannelRegistration();
156-
channel.setInterceptors(reg.getInterceptors());
158+
if (reg.hasInterceptors()) {
159+
channel.setInterceptors(reg.getInterceptors());
160+
}
157161
return channel;
158162
}
159163

160164
@Bean
161165
public ThreadPoolTaskExecutor clientOutboundChannelExecutor() {
162-
TaskExecutorRegistration reg = getClientOutboundChannelRegistration().getOrCreateTaskExecRegistration();
166+
TaskExecutorRegistration reg = getClientOutboundChannelRegistration().taskExecutor();
163167
ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
164168
executor.setThreadNamePrefix("clientOutboundChannel-");
165169
return executor;
@@ -169,7 +173,7 @@ protected final ChannelRegistration getClientOutboundChannelRegistration() {
169173
if (this.clientOutboundChannelRegistration == null) {
170174
ChannelRegistration registration = new ChannelRegistration();
171175
configureClientOutboundChannel(registration);
172-
registration.setInterceptors(new ImmutableMessageChannelInterceptor());
176+
registration.interceptors(new ImmutableMessageChannelInterceptor());
173177
this.clientOutboundChannelRegistration = registration;
174178
}
175179
return this.clientOutboundChannelRegistration;
@@ -185,9 +189,9 @@ protected void configureClientOutboundChannel(ChannelRegistration registration)
185189
@Bean
186190
public AbstractSubscribableChannel brokerChannel() {
187191
ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration();
188-
ExecutorSubscribableChannel channel = reg.hasTaskExecutor() ?
189-
new ExecutorSubscribableChannel(brokerChannelExecutor()) : new ExecutorSubscribableChannel();
190-
reg.setInterceptors(new ImmutableMessageChannelInterceptor());
192+
ExecutorSubscribableChannel channel = (reg.hasTaskExecutor() ?
193+
new ExecutorSubscribableChannel(brokerChannelExecutor()) : new ExecutorSubscribableChannel());
194+
reg.interceptors(new ImmutableMessageChannelInterceptor());
191195
channel.setInterceptors(reg.getInterceptors());
192196
return channel;
193197
}

spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2017 the original author or authors.7
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828
* {@link org.springframework.messaging.MessageChannel}.
2929
*
3030
* @author Rossen Stoyanchev
31+
* @author Juergen Hoeller
3132
* @since 4.0
3233
*/
3334
public class ChannelRegistration {
@@ -41,26 +42,37 @@ public class ChannelRegistration {
4142
* Configure the thread pool backing this message channel.
4243
*/
4344
public TaskExecutorRegistration taskExecutor() {
44-
if (this.registration == null) {
45-
this.registration = new TaskExecutorRegistration();
46-
}
47-
return this.registration;
45+
return taskExecutor(null);
4846
}
4947

5048
/**
5149
* Configure the thread pool backing this message channel using a custom
5250
* ThreadPoolTaskExecutor.
51+
* @param taskExecutor the executor to use (or {@code null} for a default executor)
5352
*/
5453
public TaskExecutorRegistration taskExecutor(ThreadPoolTaskExecutor taskExecutor) {
5554
if (this.registration == null) {
56-
this.registration = new TaskExecutorRegistration(taskExecutor);
55+
this.registration = (taskExecutor != null ? new TaskExecutorRegistration(taskExecutor) :
56+
new TaskExecutorRegistration());
5757
}
5858
return this.registration;
5959
}
6060

61+
/**
62+
* Configure the given interceptors for this message channel,
63+
* adding them to the channel's current list of interceptors.
64+
* @since 4.3.12
65+
*/
66+
public ChannelRegistration interceptors(ChannelInterceptor... interceptors) {
67+
this.interceptors.addAll(Arrays.asList(interceptors));
68+
return this;
69+
}
70+
6171
/**
6272
* Configure interceptors for the message channel.
73+
* @deprecated as of 4.3.12, in favor of {@link #interceptors(ChannelInterceptor...)}
6374
*/
75+
@Deprecated
6476
public ChannelRegistration setInterceptors(ChannelInterceptor... interceptors) {
6577
if (interceptors != null) {
6678
this.interceptors.addAll(Arrays.asList(interceptors));
@@ -73,10 +85,18 @@ protected boolean hasTaskExecutor() {
7385
return (this.registration != null);
7486
}
7587

88+
/**
89+
* @deprecated as of 4.3.12 since it's not used anymore
90+
*/
91+
@Deprecated
7692
protected TaskExecutorRegistration getTaskExecRegistration() {
7793
return this.registration;
7894
}
7995

96+
/**
97+
* @deprecated as of 4.3.12 since it's not used anymore
98+
*/
99+
@Deprecated
80100
protected TaskExecutorRegistration getOrCreateTaskExecRegistration() {
81101
return taskExecutor();
82102
}
@@ -88,4 +108,5 @@ protected boolean hasInterceptors() {
88108
protected List<ChannelInterceptor> getInterceptors() {
89109
return this.interceptors;
90110
}
111+
91112
}

spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java

Lines changed: 47 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,33 +17,49 @@
1717
package org.springframework.messaging.simp.config;
1818

1919
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
20+
import org.springframework.util.Assert;
2021

2122
/**
2223
* A registration class for customizing the properties of {@link ThreadPoolTaskExecutor}.
2324
*
2425
* @author Rossen Stoyanchev
26+
* @author Juergen Hoeller
2527
* @since 4.0
2628
*/
2729
public class TaskExecutorRegistration {
2830

29-
private ThreadPoolTaskExecutor taskExecutor;
31+
private final ThreadPoolTaskExecutor taskExecutor;
3032

31-
private int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
33+
private Integer corePoolSize;
3234

33-
private int maxPoolSize = Integer.MAX_VALUE;
35+
private Integer maxPoolSize;
3436

35-
private int queueCapacity = Integer.MAX_VALUE;
37+
private Integer keepAliveSeconds;
3638

37-
private int keepAliveSeconds = 60;
39+
private Integer queueCapacity;
3840

3941

42+
/**
43+
* Create a new {@code TaskExecutorRegistration} for a default
44+
* {@link ThreadPoolTaskExecutor}.
45+
*/
4046
public TaskExecutorRegistration() {
47+
this.taskExecutor = new ThreadPoolTaskExecutor();
48+
this.taskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
49+
this.taskExecutor.setAllowCoreThreadTimeOut(true);
4150
}
4251

52+
/**
53+
* Create a new {@code TaskExecutorRegistration} for a given
54+
* {@link ThreadPoolTaskExecutor}.
55+
* @param taskExecutor the executor to use
56+
*/
4357
public TaskExecutorRegistration(ThreadPoolTaskExecutor taskExecutor) {
58+
Assert.notNull(taskExecutor, "ThreadPoolTaskExecutor must not be null");
4459
this.taskExecutor = taskExecutor;
4560
}
4661

62+
4763
/**
4864
* Set the core pool size of the ThreadPoolExecutor.
4965
* <p><strong>NOTE:</strong> The core pool size is effectively the max pool size
@@ -75,6 +91,18 @@ public TaskExecutorRegistration maxPoolSize(int maxPoolSize) {
7591
return this;
7692
}
7793

94+
/**
95+
* Set the time limit for which threads may remain idle before being terminated.
96+
* If there are more than the core number of threads currently in the pool,
97+
* after waiting this amount of time without processing a task, excess threads
98+
* will be terminated. This overrides any value set in the constructor.
99+
* <p>By default this is set to 60.
100+
*/
101+
public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds) {
102+
this.keepAliveSeconds = keepAliveSeconds;
103+
return this;
104+
}
105+
78106
/**
79107
* Set the queue capacity for the ThreadPoolExecutor.
80108
* <p><strong>NOTE:</strong> when an unbounded {@code queueCapacity} is configured
@@ -89,26 +117,21 @@ public TaskExecutorRegistration queueCapacity(int queueCapacity) {
89117
return this;
90118
}
91119

92-
/**
93-
* Set the time limit for which threads may remain idle before being terminated.
94-
* If there are more than the core number of threads currently in the pool,
95-
* after waiting this amount of time without processing a task, excess threads
96-
* will be terminated. This overrides any value set in the constructor.
97-
* <p>By default this is set to 60.
98-
*/
99-
public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds) {
100-
this.keepAliveSeconds = keepAliveSeconds;
101-
return this;
102-
}
103120

104121
protected ThreadPoolTaskExecutor getTaskExecutor() {
105-
ThreadPoolTaskExecutor executor = (this.taskExecutor != null ? this.taskExecutor : new ThreadPoolTaskExecutor());
106-
executor.setCorePoolSize(this.corePoolSize);
107-
executor.setMaxPoolSize(this.maxPoolSize);
108-
executor.setKeepAliveSeconds(this.keepAliveSeconds);
109-
executor.setQueueCapacity(this.queueCapacity);
110-
executor.setAllowCoreThreadTimeOut(true);
111-
return executor;
122+
if (this.corePoolSize != null) {
123+
this.taskExecutor.setCorePoolSize(this.corePoolSize);
124+
}
125+
if (this.maxPoolSize != null) {
126+
this.taskExecutor.setMaxPoolSize(this.maxPoolSize);
127+
}
128+
if (this.keepAliveSeconds != null) {
129+
this.taskExecutor.setKeepAliveSeconds(this.keepAliveSeconds);
130+
}
131+
if (this.queueCapacity != null) {
132+
this.taskExecutor.setQueueCapacity(this.queueCapacity);
133+
}
134+
return this.taskExecutor;
112135
}
113136

114137
}

spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -511,14 +511,14 @@ static class CustomConfig extends BaseTestMessageBrokerConfig {
511511

512512
@Override
513513
protected void configureClientInboundChannel(ChannelRegistration registration) {
514-
registration.setInterceptors(this.interceptor);
514+
registration.interceptors(this.interceptor);
515515
registration.taskExecutor(new CustomThreadPoolTaskExecutor())
516516
.corePoolSize(11).maxPoolSize(12).keepAliveSeconds(13).queueCapacity(14);
517517
}
518518

519519
@Override
520520
protected void configureClientOutboundChannel(ChannelRegistration registration) {
521-
registration.setInterceptors(this.interceptor, this.interceptor);
521+
registration.interceptors(this.interceptor, this.interceptor);
522522
registration.taskExecutor().corePoolSize(21).maxPoolSize(22).keepAliveSeconds(23).queueCapacity(24);
523523
}
524524

@@ -534,7 +534,7 @@ protected void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> retu
534534

535535
@Override
536536
protected void configureMessageBroker(MessageBrokerRegistry registry) {
537-
registry.configureBrokerChannel().setInterceptors(this.interceptor, this.interceptor, this.interceptor);
537+
registry.configureBrokerChannel().interceptors(this.interceptor, this.interceptor, this.interceptor);
538538
registry.configureBrokerChannel().taskExecutor().corePoolSize(31).maxPoolSize(32).keepAliveSeconds(33).queueCapacity(34);
539539
registry.setPathMatcher(new AntPathMatcher(".")).enableSimpleBroker("/topic", "/queue");
540540
registry.setCacheLimit(8192);

0 commit comments

Comments
 (0)