Skip to content

Commit 09fb4f7

Browse files
artembilangaryrussell
authored andcommitted
INT-2166: Add SecurityContext Propagation
JIRA: https://jira.spring.io/browse/INT-2166 * Introduce `ThreadStatePropagationChannelInterceptor` based on the `ExecutorChannelInterceptor` * Add `SecurityContextPropagationChannelInterceptor`,`SecurityContextCleanupChannelInterceptor` * Introduce `AbstractExecutorChannel` to utilize `ExecutorChannelInterceptor` logic * Introduce `MessageHandlingTaskDecorator` to avoid package tangle from `dispatcher` and `channel` * Introduce `SecurityContextCleanupAdvice` for those cases when we don't get deal with `MessageChannel`s already, but want to have proper way to cleanup `SecurityContext` * Make `GlobalChannelInterceptorProcessor` as `SmartInitializingSingleton` to avoid `phase` conflicts. * Fix `MessagingAnnotationPostProcessor` to use `beanFactory.initializeBean(endpoint, endpointBeanName);` instead of manual `start()` invocation bypassing the `phase` logic, hence having a bug, when endpoints have been started very early * Optimise `AbstractPollableChannel` to use `size` field from `ChannelInterceptorList` instead of `size()` from `Collection<?>` * Fix `AnnotatedEndpointActivationTests` extracting separate component for annotation configuration instead of using test class directly. This caused very late Messaging Annotations process on that class * Fix typo in the `spring-integration-jdbc-4.2.xsd` * Remove some `SOUT`s throughout the project TODO Docs PR Comments: * Remove redundant `AbstractExecutorChannel#executorInterceptors` and make logic based on the `super.interceptors` * Fix wrong imports order * JavaDocs for `ThreadStatePropagationChannelInterceptor` * Docs for `SecurityContext` propagation INT-3593: Fix FTP PartialSuccess Tests JIRA: https://jira.spring.io/browse/INT-3593 Sort the files for the MPUT tests. INT-2166: Add SecurityContext Propagation JIRA: https://jira.spring.io/browse/INT-2166 * Introduce `ThreadStatePropagationChannelInterceptor` based on the `ExecutorChannelInterceptor` * Add `SecurityContextPropagationChannelInterceptor`,`SecurityContextCleanupChannelInterceptor` * Introduce `AbstractExecutorChannel` to utilize `ExecutorChannelInterceptor` logic * Introduce `MessageHandlingTaskDecorator` to avoid package tangle from `dispatcher` and `channel` * Introduce `SecurityContextCleanupAdvice` for those cases when we don't get deal with `MessageChannel`s already, but want to have proper way to cleanup `SecurityContext` * Make `GlobalChannelInterceptorProcessor` as `SmartInitializingSingleton` to avoid `phase` conflicts. * Fix `MessagingAnnotationPostProcessor` to use `beanFactory.initializeBean(endpoint, endpointBeanName);` instead of manual `start()` invocation bypassing the `phase` logic, hence having a bug, when endpoints have been started very early * Optimise `AbstractPollableChannel` to use `size` field from `ChannelInterceptorList` instead of `size()` from `Collection<?>` * Fix `AnnotatedEndpointActivationTests` extracting separate component for annotation configuration instead of using test class directly. This caused very late Messaging Annotations process on that class * Fix typo in the `spring-integration-jdbc-4.2.xsd` * Remove some `SOUT`s throughout the project TODO Docs PR Comments: * Remove redundant `AbstractExecutorChannel#executorInterceptors` and make logic based on the `super.interceptors` * Fix wrong imports order * JavaDocs for `ThreadStatePropagationChannelInterceptor` * Docs for `SecurityContext` propagation Doc Polishing Address PR comments Address PR comments * Extract `ExecutorChannelInterceptor` logic in the `PollingConsumer` to have an ability to invoke `afterMessageHandled()` on the TaskScheduler's Thread for example for the `SecurityContext` clean up * Get rid of all that redundant "clean up" stuff * Docs polishing Fix `NPE` in the `PollingConsumer` Introduce `ExecutorChannelInterceptorAware` to avoid iterators on each message Polishing; Docs, Sonar
1 parent fd35d43 commit 09fb4f7

File tree

37 files changed

+1452
-281
lines changed

37 files changed

+1452
-281
lines changed

build.gradle

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -551,9 +551,8 @@ project('spring-integration-security') {
551551
compile("org.springframework.security:spring-security-core:$springSecurityVersion") {
552552
exclude group: 'org.springframework', module: 'spring-support'
553553
}
554-
compile("org.springframework.security:spring-security-config:$springSecurityVersion") {
555-
exclude group: 'org.springframework', module: 'spring-support'
556-
}
554+
555+
testCompile "org.springframework.security:spring-security-config:$springSecurityVersion"
557556
}
558557
}
559558

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/PollableAmqpChannel.java

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,19 @@
1818

1919
import java.util.ArrayDeque;
2020
import java.util.Deque;
21+
import java.util.List;
2122

2223
import org.springframework.amqp.core.AmqpAdmin;
2324
import org.springframework.amqp.core.AmqpTemplate;
2425
import org.springframework.amqp.core.Queue;
2526
import org.springframework.amqp.rabbit.core.RabbitAdmin;
2627
import org.springframework.amqp.rabbit.core.RabbitTemplate;
28+
import org.springframework.integration.channel.ExecutorChannelInterceptorAware;
2729
import org.springframework.integration.channel.management.PollableChannelManagement;
2830
import org.springframework.messaging.Message;
2931
import org.springframework.messaging.PollableChannel;
3032
import org.springframework.messaging.support.ChannelInterceptor;
33+
import org.springframework.messaging.support.ExecutorChannelInterceptor;
3134
import org.springframework.util.Assert;
3235

3336
/**
@@ -40,15 +43,16 @@
4043
* @author Gary Russell
4144
* @since 2.1
4245
*/
43-
public class PollableAmqpChannel extends AbstractAmqpChannel implements PollableChannel,
44-
PollableChannelManagement {
46+
public class PollableAmqpChannel extends AbstractAmqpChannel
47+
implements PollableChannel, PollableChannelManagement, ExecutorChannelInterceptorAware {
4548

4649
private final String channelName;
4750

4851
private volatile String queueName;
4952

5053
private volatile AmqpAdmin amqpAdmin;
5154

55+
private volatile int executorInterceptorsSize;
5256

5357
public PollableAmqpChannel(String channelName, AmqpTemplate amqpTemplate) {
5458
super(amqpTemplate);
@@ -184,4 +188,53 @@ public Message<?> receive(long timeout) {
184188
return this.receive();
185189
}
186190

191+
@Override
192+
public void setInterceptors(List<ChannelInterceptor> interceptors) {
193+
super.setInterceptors(interceptors);
194+
for (ChannelInterceptor interceptor : interceptors) {
195+
if (interceptor instanceof ExecutorChannelInterceptor) {
196+
this.executorInterceptorsSize++;
197+
}
198+
}
199+
}
200+
201+
@Override
202+
public void addInterceptor(ChannelInterceptor interceptor) {
203+
super.addInterceptor(interceptor);
204+
if (interceptor instanceof ExecutorChannelInterceptor) {
205+
this.executorInterceptorsSize++;
206+
}
207+
}
208+
209+
@Override
210+
public void addInterceptor(int index, ChannelInterceptor interceptor) {
211+
super.addInterceptor(index, interceptor);
212+
if (interceptor instanceof ExecutorChannelInterceptor) {
213+
this.executorInterceptorsSize++;
214+
}
215+
}
216+
217+
@Override
218+
public boolean removeInterceptor(ChannelInterceptor interceptor) {
219+
boolean removed = super.removeInterceptor(interceptor);
220+
if (removed && interceptor instanceof ExecutorChannelInterceptor) {
221+
this.executorInterceptorsSize--;
222+
}
223+
return removed;
224+
}
225+
226+
@Override
227+
public ChannelInterceptor removeInterceptor(int index) {
228+
ChannelInterceptor interceptor = super.removeInterceptor(index);
229+
if (interceptor != null && interceptor instanceof ExecutorChannelInterceptor) {
230+
this.executorInterceptorsSize--;
231+
}
232+
return interceptor;
233+
}
234+
235+
@Override
236+
public boolean hasExecutorInterceptors() {
237+
return this.executorInterceptorsSize > 0;
238+
}
239+
187240
}
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
/*
2+
* Copyright 2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.channel;
18+
19+
import java.util.ArrayDeque;
20+
import java.util.Deque;
21+
import java.util.Iterator;
22+
import java.util.List;
23+
import java.util.concurrent.Executor;
24+
25+
import org.springframework.integration.dispatcher.AbstractDispatcher;
26+
import org.springframework.messaging.Message;
27+
import org.springframework.messaging.MessageDeliveryException;
28+
import org.springframework.messaging.MessageHandler;
29+
import org.springframework.messaging.MessagingException;
30+
import org.springframework.messaging.support.ChannelInterceptor;
31+
import org.springframework.messaging.support.ExecutorChannelInterceptor;
32+
import org.springframework.messaging.support.MessageHandlingRunnable;
33+
import org.springframework.util.Assert;
34+
import org.springframework.util.CollectionUtils;
35+
36+
/**
37+
* The {@link AbstractSubscribableChannel} base implementation for those inheritors
38+
* which logic may be based on the {@link Executor}.
39+
* <p>
40+
* Utilizes common operations for the {@link AbstractDispatcher}.
41+
* <p>
42+
* Implements the {@link ExecutorChannelInterceptor}s logic when the message handling
43+
* is handed to the {@link Executor#execute(Runnable)}.
44+
*
45+
* @author Artem Bilan
46+
* @see ExecutorChannel
47+
* @see PublishSubscribeChannel
48+
* @since 4.2
49+
*/
50+
public abstract class AbstractExecutorChannel extends AbstractSubscribableChannel
51+
implements ExecutorChannelInterceptorAware {
52+
53+
protected volatile Executor executor;
54+
55+
protected volatile AbstractDispatcher dispatcher;
56+
57+
protected volatile Integer maxSubscribers;
58+
59+
protected volatile int executorInterceptorsSize;
60+
61+
public AbstractExecutorChannel(Executor executor) {
62+
this.executor = executor;
63+
}
64+
65+
/**
66+
* Specify the maximum number of subscribers supported by the
67+
* channel's dispatcher.
68+
*
69+
* @param maxSubscribers The maximum number of subscribers allowed.
70+
*/
71+
public void setMaxSubscribers(int maxSubscribers) {
72+
this.maxSubscribers = maxSubscribers;
73+
this.dispatcher.setMaxSubscribers(maxSubscribers);
74+
}
75+
76+
@Override
77+
public void setInterceptors(List<ChannelInterceptor> interceptors) {
78+
super.setInterceptors(interceptors);
79+
for (ChannelInterceptor interceptor : interceptors) {
80+
if (interceptor instanceof ExecutorChannelInterceptor) {
81+
this.executorInterceptorsSize++;
82+
}
83+
}
84+
}
85+
86+
@Override
87+
public void addInterceptor(ChannelInterceptor interceptor) {
88+
super.addInterceptor(interceptor);
89+
if (interceptor instanceof ExecutorChannelInterceptor) {
90+
this.executorInterceptorsSize++;
91+
}
92+
}
93+
94+
@Override
95+
public void addInterceptor(int index, ChannelInterceptor interceptor) {
96+
super.addInterceptor(index, interceptor);
97+
if (interceptor instanceof ExecutorChannelInterceptor) {
98+
this.executorInterceptorsSize++;
99+
}
100+
}
101+
102+
@Override
103+
public boolean removeInterceptor(ChannelInterceptor interceptor) {
104+
boolean removed = super.removeInterceptor(interceptor);
105+
if (removed && interceptor instanceof ExecutorChannelInterceptor) {
106+
this.executorInterceptorsSize--;
107+
}
108+
return removed;
109+
}
110+
111+
@Override
112+
public ChannelInterceptor removeInterceptor(int index) {
113+
ChannelInterceptor interceptor = super.removeInterceptor(index);
114+
if (interceptor != null && interceptor instanceof ExecutorChannelInterceptor) {
115+
this.executorInterceptorsSize--;
116+
}
117+
return interceptor;
118+
}
119+
120+
@Override
121+
public boolean hasExecutorInterceptors() {
122+
return this.executorInterceptorsSize > 0;
123+
}
124+
125+
protected class MessageHandlingTask implements Runnable {
126+
127+
private final MessageHandlingRunnable delegate;
128+
129+
public MessageHandlingTask(MessageHandlingRunnable task) {
130+
this.delegate = task;
131+
}
132+
133+
@Override
134+
public void run() {
135+
Message<?> message = this.delegate.getMessage();
136+
MessageHandler messageHandler = this.delegate.getMessageHandler();
137+
Assert.notNull(messageHandler, "'messageHandler' must not be null");
138+
Deque<ExecutorChannelInterceptor> interceptorStack = null;
139+
try {
140+
if (executorInterceptorsSize > 0) {
141+
interceptorStack = new ArrayDeque<ExecutorChannelInterceptor>();
142+
message = applyBeforeHandle(message, interceptorStack);
143+
if (message == null) {
144+
return;
145+
}
146+
}
147+
messageHandler.handleMessage(message);
148+
if (!CollectionUtils.isEmpty(interceptorStack)) {
149+
triggerAfterMessageHandled(message, null, interceptorStack);
150+
}
151+
}
152+
catch (Exception ex) {
153+
if (!CollectionUtils.isEmpty(interceptorStack)) {
154+
triggerAfterMessageHandled(message, ex, interceptorStack);
155+
}
156+
if (ex instanceof MessagingException) {
157+
throw (MessagingException) ex;
158+
}
159+
String description = "Failed to handle " + message + " to " + this + " in " + messageHandler;
160+
throw new MessageDeliveryException(message, description, ex);
161+
}
162+
catch (Error ex) {//NOSONAR - ok, we re-throw below
163+
if (!CollectionUtils.isEmpty(interceptorStack)) {
164+
String description = "Failed to handle " + message + " to " + this + " in " + messageHandler;
165+
triggerAfterMessageHandled(message, new MessageDeliveryException(message, description, ex),
166+
interceptorStack);
167+
}
168+
throw ex;
169+
}
170+
}
171+
172+
private Message<?> applyBeforeHandle(Message<?> message, Deque<ExecutorChannelInterceptor> interceptorStack) {
173+
for (ChannelInterceptor interceptor : AbstractExecutorChannel.this.interceptors.interceptors) {
174+
if (interceptor instanceof ExecutorChannelInterceptor) {
175+
ExecutorChannelInterceptor executorInterceptor = (ExecutorChannelInterceptor) interceptor;
176+
message = executorInterceptor.beforeHandle(message, AbstractExecutorChannel.this,
177+
this.delegate.getMessageHandler());
178+
if (message == null) {
179+
if (isLoggingEnabled() && logger.isDebugEnabled()) {
180+
logger.debug(executorInterceptor.getClass().getSimpleName()
181+
+ " returned null from beforeHandle, i.e. precluding the send.");
182+
}
183+
triggerAfterMessageHandled(null, null, interceptorStack);
184+
return null;
185+
}
186+
interceptorStack.add(executorInterceptor);
187+
}
188+
}
189+
return message;
190+
}
191+
192+
private void triggerAfterMessageHandled(Message<?> message, Exception ex,
193+
Deque<ExecutorChannelInterceptor> interceptorStack) {
194+
Iterator<ExecutorChannelInterceptor> iterator = interceptorStack.descendingIterator();
195+
while (iterator.hasNext()) {
196+
ExecutorChannelInterceptor interceptor = iterator.next();
197+
try {
198+
interceptor.afterMessageHandled(message, AbstractExecutorChannel.this,
199+
this.delegate.getMessageHandler(), ex);
200+
}
201+
catch (Throwable ex2) {//NOSONAR
202+
logger.error("Exception from afterMessageHandled in " + interceptor, ex2);
203+
}
204+
}
205+
}
206+
207+
}
208+
209+
}

spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public abstract class AbstractMessageChannel extends IntegrationObjectSupport
6565
implements MessageChannel, TrackableComponent, ChannelInterceptorAware, MessageChannelMetrics,
6666
ConfigurableMetricsAware<AbstractMessageChannelMetrics> {
6767

68-
private final ChannelInterceptorList interceptors;
68+
protected final ChannelInterceptorList interceptors;
6969

7070
private final Comparator<Object> orderComparator = new OrderComparator();
7171

@@ -520,7 +520,7 @@ protected static class ChannelInterceptorList {
520520

521521
private final Log logger;
522522

523-
private final List<ChannelInterceptor> interceptors = new CopyOnWriteArrayList<ChannelInterceptor>();
523+
protected final List<ChannelInterceptor> interceptors = new CopyOnWriteArrayList<ChannelInterceptor>();
524524

525525
private volatile int size;
526526

0 commit comments

Comments
 (0)