Skip to content

Commit 03b6316

Browse files
David Turanskigaryrussell
David Turanski
authored andcommitted
GH-3027: RotatingServerAdvice enhancements
Fix checkstyle errors Polishing - wrap code at 120, javadocs at 90; don't use this. on method calls
1 parent baf875c commit 03b6316

File tree

5 files changed

+248
-150
lines changed

5 files changed

+248
-150
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.file.remote.aop;
18+
19+
import java.util.ArrayList;
20+
import java.util.Iterator;
21+
import java.util.List;
22+
23+
import org.apache.commons.logging.Log;
24+
import org.apache.commons.logging.LogFactory;
25+
26+
import org.springframework.integration.core.MessageSource;
27+
import org.springframework.integration.file.remote.session.DelegatingSessionFactory;
28+
import org.springframework.util.Assert;
29+
30+
/**
31+
* Standard rotation policy; iterates over key/directory pairs; when the end is reached,
32+
* starts again at the beginning. If the fair option is true the rotation occurs on every
33+
* poll, regardless of result. Otherwise rotation occurs when the current pair returns no
34+
* message.
35+
*
36+
* Subclasses implement {@code onRotation(MessageSource<?> source)} to configure the
37+
* {@link MessageSource} on each rotation.
38+
*
39+
* @author Gary Russell
40+
* @author Michael Forstner
41+
* @author Artem Bilan
42+
* @author David Turanski
43+
*
44+
* @since 5.1.8
45+
*/
46+
public abstract class AbstractStandardRotationPolicy implements RotationPolicy {
47+
protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final
48+
49+
private final DelegatingSessionFactory<?> factory; // NOSONAR final
50+
51+
private final List<KeyDirectory> keyDirectories = new ArrayList<>();
52+
53+
private final boolean fair;
54+
55+
private volatile Iterator<KeyDirectory> iterator;
56+
57+
private volatile KeyDirectory current;
58+
59+
private volatile boolean initialized;
60+
61+
protected AbstractStandardRotationPolicy(DelegatingSessionFactory<?> factory, List<KeyDirectory> keyDirectories,
62+
boolean fair) {
63+
64+
Assert.notNull(factory, "factory cannot be null");
65+
Assert.notNull(keyDirectories, "keyDirectories cannot be null");
66+
Assert.isTrue(keyDirectories.size() > 0, "At least one KeyDirectory is required");
67+
this.factory = factory;
68+
this.keyDirectories.addAll(keyDirectories);
69+
this.fair = fair;
70+
this.iterator = this.keyDirectories.iterator();
71+
}
72+
73+
@Override
74+
public void beforeReceive(MessageSource<?> source) {
75+
if (this.fair || !this.initialized) {
76+
configureSource(source);
77+
this.initialized = true;
78+
}
79+
if (this.logger.isTraceEnabled()) {
80+
this.logger.trace("Next poll is for " + this.current);
81+
}
82+
this.factory.setThreadKey(this.current.getKey());
83+
}
84+
85+
@Override
86+
public void afterReceive(boolean messageReceived, MessageSource<?> source) {
87+
if (this.logger.isTraceEnabled()) {
88+
this.logger.trace("Poll produced "
89+
+ (messageReceived ? "a" : "no")
90+
+ " message");
91+
}
92+
this.factory.clearThreadKey();
93+
if (!this.fair && !messageReceived) {
94+
configureSource(source);
95+
}
96+
}
97+
98+
@Override
99+
public KeyDirectory getCurrent() {
100+
return this.current;
101+
}
102+
103+
104+
protected DelegatingSessionFactory<?> getFactory() {
105+
return this.factory;
106+
}
107+
108+
protected List<KeyDirectory> getKeyDirectories() {
109+
return this.keyDirectories;
110+
}
111+
112+
protected boolean isFair() {
113+
return this.fair;
114+
}
115+
116+
protected Iterator<KeyDirectory> getIterator() {
117+
return this.iterator;
118+
}
119+
120+
protected boolean isInitialized() {
121+
return this.initialized;
122+
}
123+
124+
protected void configureSource(MessageSource<?> source) {
125+
126+
if (!this.iterator.hasNext()) {
127+
this.iterator = this.keyDirectories.iterator();
128+
}
129+
this.current = this.iterator.next();
130+
131+
onRotation(source);
132+
}
133+
134+
protected abstract void onRotation(MessageSource<?> source);
135+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.file.remote.aop;
18+
19+
import org.springframework.integration.file.remote.session.DelegatingSessionFactory;
20+
import org.springframework.util.Assert;
21+
22+
/**
23+
* A {@link DelegatingSessionFactory} key/directory pair.
24+
*/
25+
public class KeyDirectory {
26+
27+
private final Object key;
28+
29+
private final String directory;
30+
31+
public KeyDirectory(Object key, String directory) {
32+
Assert.notNull(key, "key cannot be null");
33+
Assert.notNull(directory, "directory cannot be null");
34+
this.key = key;
35+
this.directory = directory;
36+
}
37+
38+
public Object getKey() {
39+
return this.key;
40+
}
41+
42+
public String getDirectory() {
43+
return this.directory;
44+
}
45+
46+
@Override
47+
public String toString() {
48+
return "KeyDirectory [key=" + this.key.toString() + ", directory=" + this.directory + "]";
49+
}
50+
51+
}

spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/RotatingServerAdvice.java

Lines changed: 7 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,8 @@
1616

1717
package org.springframework.integration.file.remote.aop;
1818

19-
import java.util.ArrayList;
20-
import java.util.Iterator;
2119
import java.util.List;
2220

23-
import org.apache.commons.logging.Log;
24-
import org.apache.commons.logging.LogFactory;
25-
2621
import org.springframework.integration.aop.AbstractMessageSourceAdvice;
2722
import org.springframework.integration.core.MessageSource;
2823
import org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource;
@@ -37,6 +32,7 @@
3732
* @author Gary Russell
3833
* @author Michael Forstner
3934
* @author Artem Bilan
35+
* @author David Turanski
4036
*
4137
* @since 5.0.7
4238
*
@@ -88,167 +84,29 @@ public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
8884
return result;
8985
}
9086

91-
/**
92-
* Implementations can reconfigure the message source before and/or after
93-
* a poll.
94-
*/
95-
public interface RotationPolicy {
96-
97-
/**
98-
* Invoked before the message source receive() method.
99-
* @param source the message source.
100-
*/
101-
void beforeReceive(MessageSource<?> source);
102-
103-
/**
104-
* Invoked after the message source receive() method.
105-
* @param messageReceived true if a message was received.
106-
* @param source the message source.
107-
*/
108-
void afterReceive(boolean messageReceived, MessageSource<?> source);
109-
110-
}
111-
112-
/**
113-
* Standard rotation policy; iterates over key/directory pairs; when the end
114-
* is reached, starts again at the beginning. If the fair option is true
115-
* the rotation occurs on every poll, regardless of result. Otherwise rotation
116-
* occurs when the current pair returns no message.
117-
*/
118-
public static class StandardRotationPolicy implements RotationPolicy {
119-
120-
protected final Log logger = LogFactory.getLog(getClass());
121-
122-
protected final DelegatingSessionFactory<?> factory;
123-
124-
private final List<KeyDirectory> keyDirectories = new ArrayList<>();
125-
126-
private final boolean fair;
127-
128-
private volatile Iterator<KeyDirectory> iterator;
129-
130-
private volatile KeyDirectory current;
87+
public static class StandardRotationPolicy extends AbstractStandardRotationPolicy {
13188

132-
private volatile boolean initialized;
13389

13490
public StandardRotationPolicy(DelegatingSessionFactory<?> factory, List<KeyDirectory> keyDirectories,
13591
boolean fair) {
136-
137-
Assert.notNull(factory, "factory cannot be null");
138-
Assert.notNull(keyDirectories, "keyDirectories cannot be null");
139-
Assert.isTrue(keyDirectories.size() > 0, "At least one KeyDirectory is required");
140-
this.factory = factory;
141-
this.keyDirectories.addAll(keyDirectories);
142-
this.fair = fair;
143-
this.iterator = this.keyDirectories.iterator();
144-
}
145-
146-
protected Iterator<KeyDirectory> getIterator() {
147-
return this.iterator;
148-
}
149-
150-
protected void setIterator(Iterator<KeyDirectory> iterator) {
151-
this.iterator = iterator;
152-
}
153-
154-
protected boolean isInitialized() {
155-
return this.initialized;
156-
}
157-
158-
protected void setInitialized(boolean initialized) {
159-
this.initialized = initialized;
160-
}
161-
162-
protected DelegatingSessionFactory<?> getFactory() {
163-
return this.factory;
164-
}
165-
166-
protected List<KeyDirectory> getKeyDirectories() {
167-
return this.keyDirectories;
168-
}
169-
170-
protected boolean isFair() {
171-
return this.fair;
172-
}
173-
174-
protected KeyDirectory getCurrent() {
175-
return this.current;
176-
}
177-
178-
@Override
179-
public void beforeReceive(MessageSource<?> source) {
180-
if (this.fair || !this.initialized) {
181-
configureSource(source);
182-
this.initialized = true;
183-
}
184-
if (this.logger.isTraceEnabled()) {
185-
this.logger.trace("Next poll is for " + this.current);
186-
}
187-
this.factory.setThreadKey(this.current.getKey());
92+
super(factory, keyDirectories, fair);
18893
}
18994

19095
@Override
191-
public void afterReceive(boolean messageReceived, MessageSource<?> source) {
192-
if (this.logger.isTraceEnabled()) {
193-
this.logger.trace("Poll produced "
194-
+ (messageReceived ? "a" : "no")
195-
+ " message");
196-
}
197-
this.factory.clearThreadKey();
198-
if (!this.fair && !messageReceived) {
199-
configureSource(source);
200-
}
201-
}
202-
203-
protected void configureSource(MessageSource<?> source) {
96+
protected void onRotation(MessageSource<?> source) {
20497
Assert.isTrue(source instanceof AbstractInboundFileSynchronizingMessageSource
20598
|| source instanceof AbstractRemoteFileStreamingMessageSource,
20699
"source must be an AbstractInboundFileSynchronizingMessageSource or a "
207100
+ "AbstractRemoteFileStreamingMessageSource");
208-
if (!this.iterator.hasNext()) {
209-
this.iterator = this.keyDirectories.iterator();
210-
}
211-
this.current = this.iterator.next();
101+
212102
if (source instanceof AbstractRemoteFileStreamingMessageSource) {
213-
((AbstractRemoteFileStreamingMessageSource<?>) source).setRemoteDirectory(this.current.getDirectory());
103+
((AbstractRemoteFileStreamingMessageSource<?>) source).setRemoteDirectory(getCurrent().getDirectory());
214104
}
215105
else {
216106
((AbstractInboundFileSynchronizingMessageSource<?>) source).getSynchronizer()
217-
.setRemoteDirectory(this.current.getDirectory());
107+
.setRemoteDirectory(getCurrent().getDirectory());
218108
}
219109
}
220110

221111
}
222-
223-
/**
224-
* A {@link DelegatingSessionFactory} key/directory pair.
225-
*/
226-
public static class KeyDirectory {
227-
228-
private final Object key;
229-
230-
private final String directory;
231-
232-
public KeyDirectory(Object key, String directory) {
233-
Assert.notNull(key, "key cannot be null");
234-
Assert.notNull(directory, "directory cannot be null");
235-
this.key = key;
236-
this.directory = directory;
237-
}
238-
239-
public Object getKey() {
240-
return this.key;
241-
}
242-
243-
public String getDirectory() {
244-
return this.directory;
245-
}
246-
247-
@Override
248-
public String toString() {
249-
return "KeyDirectory [key=" + this.key.toString() + ", directory=" + this.directory + "]";
250-
}
251-
252-
}
253-
254112
}

0 commit comments

Comments
 (0)