Skip to content

Commit afd248d

Browse files
committed
MultipartHttpMessageWriter consumes source once only
The previous fix #09f1f7 did not actually address the issue but only moved it further down, so instead of the subscribe(), it was consuming it inside the MultipartHttpMessageWriter#write method which returned this.body.then(), and then again for the actual request body writing. In this commit MultipartHttpMessageWriter#write returns Mono.empty() since we don't actually want to write the part content from there, but only want to access it as soon as it is availabele, for writing to the request body. Issue: SPR-16402
1 parent 572c668 commit afd248d

File tree

2 files changed

+55
-8
lines changed

2 files changed

+55
-8
lines changed

spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriter.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -269,13 +269,18 @@ private <T> Flux<DataBuffer> encodePart(byte[] boundary, String name, T value) {
269269
Publisher<T> bodyPublisher =
270270
body instanceof Publisher ? (Publisher<T>) body : Mono.just(body);
271271

272-
Mono<Void> partWritten = ((HttpMessageWriter<T>) writer.get())
272+
// The writer will call MultipartHttpOutputMessage#write which doesn't actually write
273+
// but only stores the body Flux and returns Mono.empty().
274+
275+
Mono<Void> partContentReady = ((HttpMessageWriter<T>) writer.get())
273276
.write(bodyPublisher, resolvableType, contentType, outputMessage, Collections.emptyMap());
274277

275-
return Flux.concat(
276-
Mono.just(generateBoundaryLine(boundary)),
277-
partWritten.thenMany(Flux.defer(outputMessage::getBody)),
278-
Mono.just(generateNewLine()));
278+
// After partContentReady, we can access the part content from MultipartHttpOutputMessage
279+
// and use it for writing to the actual request body
280+
281+
Flux<DataBuffer> partContent = partContentReady.thenMany(Flux.defer(outputMessage::getBody));
282+
283+
return Flux.concat(Mono.just(generateBoundaryLine(boundary)), partContent, Mono.just(generateNewLine()));
279284
}
280285

281286

@@ -353,7 +358,9 @@ public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
353358
return Mono.error(new IllegalStateException("Multiple calls to writeWith() not supported"));
354359
}
355360
this.body = Flux.just(generateHeaders()).concatWith(body);
356-
return this.body.then();
361+
362+
// We don't actually want to write (just save the body Flux)
363+
return Mono.empty();
357364
}
358365

359366
private DataBuffer generateHeaders() {
@@ -387,8 +394,7 @@ public Flux<DataBuffer> getBody() {
387394

388395
@Override
389396
public Mono<Void> setComplete() {
390-
return (this.body != null ? this.body.then() :
391-
Mono.error(new IllegalStateException("Body has not been written yet")));
397+
return Mono.error(new UnsupportedOperationException());
392398
}
393399
}
394400

spring-web/src/test/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriterTests.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.http.codec.multipart;
1818

19+
import java.io.IOException;
1920
import java.time.Duration;
2021
import java.util.Collections;
2122
import java.util.List;
@@ -25,6 +26,7 @@
2526
import org.reactivestreams.Publisher;
2627
import reactor.core.publisher.Flux;
2728
import reactor.core.publisher.Mono;
29+
import reactor.core.publisher.UnicastProcessor;
2830

2931
import org.springframework.core.ResolvableType;
3032
import org.springframework.core.codec.StringDecoder;
@@ -146,9 +148,48 @@ public String getFilename() {
146148
Collections.emptyMap()).block(Duration.ZERO);
147149

148150
assertEquals("foobarbaz", value);
151+
}
152+
153+
@Test // SPR-16402
154+
public void singleSubscriberWithResource() throws IOException {
155+
UnicastProcessor<Resource> processor = UnicastProcessor.create();
156+
Resource logo = new ClassPathResource("/org/springframework/http/converter/logo.jpg");
157+
Mono.just(logo).subscribe(processor);
158+
159+
MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder();
160+
bodyBuilder.asyncPart("logo", processor, Resource.class);
161+
162+
Mono<MultiValueMap<String, HttpEntity<?>>> result = Mono.just(bodyBuilder.build());
163+
164+
MockServerHttpResponse response = new MockServerHttpResponse();
165+
Map<String, Object> hints = Collections.emptyMap();
166+
this.writer.write(result, null, MediaType.MULTIPART_FORM_DATA, response, hints).block();
167+
168+
MultiValueMap<String, Part> requestParts = parse(response, hints);
169+
assertEquals(1, requestParts.size());
149170

171+
Part part = requestParts.getFirst("logo");
172+
assertEquals("logo", part.name());
173+
// TODO: a Resource written as an async part doesn't have a file name in the contentDisposition
174+
// assertTrue(part instanceof FilePart);
175+
// assertEquals("logo.jpg", ((FilePart) part).filename());
176+
assertEquals(MediaType.IMAGE_JPEG, part.headers().getContentType());
177+
assertEquals(logo.getFile().length(), part.headers().getContentLength());
178+
}
150179

180+
@Test // SPR-16402
181+
public void singleSubscriberWithStrings() {
182+
UnicastProcessor<String> processor = UnicastProcessor.create();
183+
Flux.just("foo", "bar", "baz").subscribe(processor);
151184

185+
MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder();
186+
bodyBuilder.asyncPart("name", processor, String.class);
187+
188+
Mono<MultiValueMap<String, HttpEntity<?>>> result = Mono.just(bodyBuilder.build());
189+
190+
MockServerHttpResponse response = new MockServerHttpResponse();
191+
Map<String, Object> hints = Collections.emptyMap();
192+
this.writer.write(result, null, MediaType.MULTIPART_FORM_DATA, response, hints).block();
152193
}
153194

154195
private MultiValueMap<String, Part> parse(MockServerHttpResponse response, Map<String, Object> hints) {

0 commit comments

Comments
 (0)