Skip to content

Commit 3d5f7db

Browse files
garyrussellartembilan
authored andcommitted
INT-4740: FileSplitter - Add Headers
JIRA: https://jira.spring.io/browse/INT-3740 For `File` and `String` payloads add `FileHeaders.ORIGINAL_FILE` and `FileHeaders.FILENAME` headers. Reworked to create headers once only. Fix typos and Java > 6 API usage
1 parent 6436aa6 commit 3d5f7db

File tree

3 files changed

+65
-4
lines changed

3 files changed

+65
-4
lines changed

spring-integration-core/src/main/java/org/springframework/integration/splitter/AbstractMessageSplitter.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,16 @@
1919
import java.util.Arrays;
2020
import java.util.Collection;
2121
import java.util.Collections;
22+
import java.util.HashMap;
2223
import java.util.Iterator;
24+
import java.util.Map;
2325
import java.util.concurrent.atomic.AtomicInteger;
2426

2527
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
2628
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
2729
import org.springframework.integration.util.Function;
2830
import org.springframework.integration.util.FunctionIterator;
2931
import org.springframework.messaging.Message;
30-
import org.springframework.messaging.MessageHeaders;
3132

3233
/**
3334
* Base class for Message-splitting handlers.
@@ -86,21 +87,28 @@ else if (result instanceof Iterator<?>) {
8687
return null;
8788
}
8889

89-
final MessageHeaders headers = message.getHeaders();
90-
final Object correlationId = headers.getId();
90+
Map<String, Object> messageHeaders = message.getHeaders();
91+
if (willAddHeaders(message)) {
92+
messageHeaders = new HashMap<String, Object>(messageHeaders);
93+
addHeaders(message, messageHeaders);
94+
}
95+
final Map<String, Object> headers = messageHeaders;
96+
final Object correlationId = message.getHeaders().getId();
9197
final AtomicInteger sequenceNumber = new AtomicInteger(1);
9298

9399
return new FunctionIterator<Object, AbstractIntegrationMessageBuilder<?>>(iterator,
94100
new Function<Object, AbstractIntegrationMessageBuilder<?>>() {
101+
95102
@Override
96103
public AbstractIntegrationMessageBuilder<?> apply(Object object) {
97104
return createBuilder(object, headers, correlationId, sequenceNumber.getAndIncrement(),
98105
sequenceSize);
99106
}
107+
100108
});
101109
}
102110

103-
private AbstractIntegrationMessageBuilder<?> createBuilder(Object item, MessageHeaders headers,
111+
private AbstractIntegrationMessageBuilder<?> createBuilder(Object item, Map<String, Object> headers,
104112
Object correlationId, int sequenceNumber, int sequenceSize) {
105113
AbstractIntegrationMessageBuilder<?> builder;
106114
if (item instanceof Message) {
@@ -116,6 +124,26 @@ private AbstractIntegrationMessageBuilder<?> createBuilder(Object item, MessageH
116124
return builder;
117125
}
118126

127+
/**
128+
* Return true if the subclass needs to add headers in the resulting splits.
129+
* If true, {@link #addHeaders} will be called.
130+
* @param message the message.
131+
* @return true
132+
*/
133+
protected boolean willAddHeaders(Message<?> message) {
134+
return false;
135+
}
136+
137+
/**
138+
* Allows subclasses to add extra headers to the output messages. Headers may not be
139+
* removed by this method.
140+
*
141+
* @param message the inbound message.
142+
* @param headers the headers to add messages to.
143+
*/
144+
protected void addHeaders(Message<?> message, Map<String, Object> headers) {
145+
}
146+
119147
@Override
120148
protected boolean shouldCopyRequestHeaders() {
121149
return false;

spring-integration-file/src/main/java/org/springframework/integration/file/splitter/FileSplitter.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@
3030
import java.util.ArrayList;
3131
import java.util.Iterator;
3232
import java.util.List;
33+
import java.util.Map;
3334
import java.util.NoSuchElementException;
3435

36+
import org.springframework.integration.file.FileHeaders;
3537
import org.springframework.integration.file.splitter.FileSplitter.FileMarker.Mark;
3638
import org.springframework.integration.splitter.AbstractMessageSplitter;
3739
import org.springframework.messaging.Message;
@@ -240,6 +242,32 @@ public Object next() {
240242
}
241243
}
242244

245+
246+
@Override
247+
protected boolean willAddHeaders(Message<?> message) {
248+
Object payload = message.getPayload();
249+
return payload instanceof File || payload instanceof String;
250+
}
251+
252+
@Override
253+
protected void addHeaders(Message<?> message, Map<String, Object> headers) {
254+
File file = null;
255+
if (message.getPayload() instanceof File) {
256+
file = (File) message.getPayload();
257+
}
258+
else if (message.getPayload() instanceof String) {
259+
file = new File((String) message.getPayload());
260+
}
261+
if (file != null) {
262+
if (!headers.containsKey(FileHeaders.ORIGINAL_FILE)) {
263+
headers.put(FileHeaders.ORIGINAL_FILE, file);
264+
}
265+
if (!headers.containsKey(FileHeaders.FILENAME)) {
266+
headers.put(FileHeaders.FILENAME, file.getName());
267+
}
268+
}
269+
}
270+
243271
public static class FileMarker implements Serializable {
244272

245273
private static final long serialVersionUID = 8514605438145748406L;

spring-integration-file/src/test/java/org/springframework/integration/file/splitter/FileSplitterTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.springframework.integration.annotation.Splitter;
5050
import org.springframework.integration.channel.QueueChannel;
5151
import org.springframework.integration.config.EnableIntegration;
52+
import org.springframework.integration.file.FileHeaders;
5253
import org.springframework.integration.file.splitter.FileSplitter.FileMarker;
5354
import org.springframework.messaging.Message;
5455
import org.springframework.messaging.MessageChannel;
@@ -109,6 +110,8 @@ public void testFileSplitter() throws Exception {
109110
receive = this.output.receive(10000);
110111
assertNotNull(receive); //äöüß
111112
assertEquals("äöüß", receive.getPayload());
113+
assertEquals(file, receive.getHeaders().get(FileHeaders.ORIGINAL_FILE));
114+
assertEquals(file.getName(), receive.getHeaders().get(FileHeaders.FILENAME));
112115
assertNull(this.output.receive(1));
113116

114117
this.input1.send(new GenericMessage<String>(file.getAbsolutePath()));
@@ -117,6 +120,8 @@ public void testFileSplitter() throws Exception {
117120
assertEquals(2, receive.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE));
118121
receive = this.output.receive(10000);
119122
assertNotNull(receive); //äöüß
123+
assertEquals(file, receive.getHeaders().get(FileHeaders.ORIGINAL_FILE));
124+
assertEquals(file.getName(), receive.getHeaders().get(FileHeaders.FILENAME));
120125
assertNull(this.output.receive(1));
121126

122127
this.input1.send(new GenericMessage<Reader>(new FileReader(file)));

0 commit comments

Comments
 (0)