Skip to content

Commit 67e7c78

Browse files
committed
Use DataBufferUtils.compose and remove writeAggregator
Use DataBufferUtils.compose instead of writeAggregator to combine multiple data buffers into one, as the write aggregator would not work when the initial data buffer did not have enough capacity to contain all subsequent buffers. Removed writeAggregator, as it is no longer needed. Issue: SPR-16365
1 parent 384a399 commit 67e7c78

File tree

12 files changed

+20
-50
lines changed

12 files changed

+20
-50
lines changed

spring-core/src/main/java/org/springframework/core/codec/AbstractDataBufferDecoder.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ public Flux<T> decode(Publisher<DataBuffer> inputStream, ResolvableType elementT
6363
public Mono<T> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementType,
6464
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
6565

66-
return Flux.from(inputStream)
67-
.reduce(DataBufferUtils.writeAggregator())
66+
return DataBufferUtils.compose(inputStream)
6867
.map(buffer -> decodeDataBuffer(buffer, elementType, mimeType, hints));
6968
}
7069

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

-16
Original file line numberDiff line numberDiff line change
@@ -419,22 +419,6 @@ public static Consumer<DataBuffer> releaseConsumer() {
419419
return RELEASE_CONSUMER;
420420
}
421421

422-
/**
423-
* Return an aggregator function that can be used to {@linkplain Flux#reduce(BiFunction) reduce}
424-
* a {@code Flux} of data buffers into a single data buffer by writing all subsequent buffers
425-
* into the first buffer. All buffers except the first buffer are
426-
* {@linkplain #release(DataBuffer) released}.
427-
* <p>For example:
428-
* <pre class="code">
429-
* Flux&lt;DataBuffer&gt; flux = ...
430-
* Mono&lt;DataBuffer&gt; mono = flux.reduce(DataBufferUtils.writeAggregator());
431-
* </pre>
432-
* @see Flux#reduce(BiFunction)
433-
*/
434-
public static BinaryOperator<DataBuffer> writeAggregator() {
435-
return WRITE_AGGREGATOR;
436-
}
437-
438422
/**
439423
* Composes the buffers in the given {@link Publisher} into a single data buffer. Depending on
440424
* the {@code DataBuffer} implementation, the returned buffer may be a single buffer containing

spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java

-15
Original file line numberDiff line numberDiff line change
@@ -308,21 +308,6 @@ private static void assertReleased(DataBuffer dataBuffer) {
308308
}
309309
}
310310

311-
@Test
312-
public void writeAggregator() {
313-
DataBuffer foo = stringBuffer("foo");
314-
DataBuffer bar = stringBuffer("bar");
315-
DataBuffer baz = stringBuffer("baz");
316-
Flux<DataBuffer> flux = Flux.just(foo, bar, baz);
317-
318-
DataBuffer result =
319-
flux.reduce(DataBufferUtils.writeAggregator()).block(Duration.ofSeconds(1));
320-
321-
assertEquals("foobarbaz", DataBufferTestUtils.dumpString(result, StandardCharsets.UTF_8));
322-
323-
release(result);
324-
}
325-
326311
@Test
327312
public void SPR16070() throws Exception {
328313
ReadableByteChannel channel = mock(ReadableByteChannel.class);

spring-web/src/main/java/org/springframework/http/codec/FormHttpMessageReader.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public Mono<MultiValueMap<String, String>> readMono(ResolvableType elementType,
9696
MediaType contentType = message.getHeaders().getContentType();
9797
Charset charset = getMediaTypeCharset(contentType);
9898

99-
return message.getBody().reduce(DataBufferUtils.writeAggregator())
99+
return DataBufferUtils.compose(message.getBody())
100100
.map(buffer -> {
101101
CharBuffer charBuffer = charset.decode(buffer.asByteBuffer());
102102
String body = charBuffer.toString();

spring-web/src/main/java/org/springframework/http/codec/xml/XmlEventDecoder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public Flux<XMLEvent> decode(Publisher<DataBuffer> inputStream, ResolvableType e
103103
.doFinally(signalType -> aaltoMapper.endOfInput());
104104
}
105105
else {
106-
Mono<DataBuffer> singleBuffer = flux.reduce(DataBufferUtils.writeAggregator());
106+
Mono<DataBuffer> singleBuffer = DataBufferUtils.compose(flux);
107107
return singleBuffer.
108108
flatMapMany(dataBuffer -> {
109109
try {

spring-web/src/test/java/org/springframework/http/codec/multipart/SynchronossPartHttpMessageReaderTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public void resolveParts() throws IOException {
8989
assertTrue(part instanceof FilePart);
9090
assertEquals("fooPart", part.name());
9191
assertEquals("foo.txt", ((FilePart) part).filename());
92-
DataBuffer buffer = part.content().reduce(DataBufferUtils.writeAggregator()).block();
92+
DataBuffer buffer = DataBufferUtils.compose(part.content()).block();
9393
assertEquals(12, buffer.readableByteCount());
9494
byte[] byteContent = new byte[12];
9595
buffer.read(byteContent);

spring-web/src/test/java/org/springframework/http/server/reactive/MultipartIntegrationTests.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,7 @@ private void assertFooPart(Part part) {
103103
assertEquals("fooPart", part.name());
104104
assertTrue(part instanceof FilePart);
105105
assertEquals("foo.txt", ((FilePart) part).filename());
106-
DataBuffer buffer = part
107-
.content()
108-
.reduce(DataBufferUtils.writeAggregator())
109-
.block();
106+
DataBuffer buffer = DataBufferUtils.compose(part.content()).block();
110107
assertEquals(12, buffer.readableByteCount());
111108
byte[] byteContent = new byte[12];
112109
buffer.read(byteContent);

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -457,8 +457,7 @@ private <T extends Publisher<?>> T bodyToPublisher(ClientResponse response,
457457

458458
private static Mono<WebClientResponseException> createResponseException(ClientResponse response) {
459459

460-
return response.body(BodyExtractors.toDataBuffers())
461-
.reduce(DataBufferUtils.writeAggregator())
460+
return DataBufferUtils.compose(response.body(BodyExtractors.toDataBuffers()))
462461
.map(dataBuffer -> {
463462
byte[] bytes = new byte[dataBuffer.readableByteCount()];
464463
dataBuffer.read(bytes);

spring-webflux/src/main/java/org/springframework/web/reactive/resource/AppCacheManifestTransformer.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import reactor.core.publisher.SynchronousSink;
3535

3636
import org.springframework.core.io.Resource;
37+
import org.springframework.core.io.buffer.DataBuffer;
3738
import org.springframework.core.io.buffer.DataBufferFactory;
3839
import org.springframework.core.io.buffer.DataBufferUtils;
3940
import org.springframework.lang.Nullable;
@@ -109,8 +110,9 @@ public Mono<Resource> transform(ServerWebExchange exchange, Resource inputResour
109110
return Mono.just(outputResource);
110111
}
111112
DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
112-
return DataBufferUtils.read(outputResource, bufferFactory, StreamUtils.BUFFER_SIZE)
113-
.reduce(DataBufferUtils.writeAggregator())
113+
Flux<DataBuffer> flux = DataBufferUtils
114+
.read(outputResource, bufferFactory, StreamUtils.BUFFER_SIZE);
115+
return DataBufferUtils.compose(flux)
114116
.flatMap(dataBuffer -> {
115117
CharBuffer charBuffer = DEFAULT_CHARSET.decode(dataBuffer.asByteBuffer());
116118
DataBufferUtils.release(dataBuffer);

spring-webflux/src/main/java/org/springframework/web/reactive/resource/ContentVersionStrategy.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616

1717
package org.springframework.web.reactive.resource;
1818

19+
import reactor.core.publisher.Flux;
1920
import reactor.core.publisher.Mono;
2021

2122
import org.springframework.core.io.Resource;
23+
import org.springframework.core.io.buffer.DataBuffer;
2224
import org.springframework.core.io.buffer.DataBufferFactory;
2325
import org.springframework.core.io.buffer.DataBufferUtils;
2426
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
@@ -42,8 +44,9 @@ public class ContentVersionStrategy extends AbstractFileNameVersionStrategy {
4244

4345
@Override
4446
public Mono<String> getResourceVersion(Resource resource) {
45-
return DataBufferUtils.read(resource, dataBufferFactory, StreamUtils.BUFFER_SIZE)
46-
.reduce(DataBufferUtils.writeAggregator())
47+
Flux<DataBuffer> flux =
48+
DataBufferUtils.read(resource, dataBufferFactory, StreamUtils.BUFFER_SIZE);
49+
return DataBufferUtils.compose(flux)
4750
.map(buffer -> {
4851
byte[] result = new byte[buffer.readableByteCount()];
4952
buffer.read(result);

spring-webflux/src/main/java/org/springframework/web/reactive/resource/CssLinkResourceTransformer.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import reactor.core.publisher.Mono;
3434

3535
import org.springframework.core.io.Resource;
36+
import org.springframework.core.io.buffer.DataBuffer;
3637
import org.springframework.core.io.buffer.DataBufferFactory;
3738
import org.springframework.core.io.buffer.DataBufferUtils;
3839
import org.springframework.lang.Nullable;
@@ -86,8 +87,9 @@ public Mono<Resource> transform(ServerWebExchange exchange, Resource inputResour
8687
}
8788

8889
DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();
89-
return DataBufferUtils.read(ouptputResource, bufferFactory, StreamUtils.BUFFER_SIZE)
90-
.reduce(DataBufferUtils.writeAggregator())
90+
Flux<DataBuffer> flux = DataBufferUtils
91+
.read(ouptputResource, bufferFactory, StreamUtils.BUFFER_SIZE);
92+
return DataBufferUtils.compose(flux)
9193
.flatMap(dataBuffer -> {
9294
CharBuffer charBuffer = DEFAULT_CHARSET.decode(dataBuffer.asByteBuffer());
9395
DataBufferUtils.release(dataBuffer);

spring-webflux/src/test/java/org/springframework/web/reactive/function/BodyInsertersTests.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -332,8 +332,7 @@ public void fromMultipartDataWithMultipleValues() {
332332
Mono<Void> result = inserter.insert(request, this.context);
333333
StepVerifier.create(result).expectComplete().verify();
334334

335-
StepVerifier.create(request.getBody()
336-
.reduce(DataBufferUtils.writeAggregator()))
335+
StepVerifier.create(DataBufferUtils.compose(request.getBody()))
337336
.consumeNextWith(dataBuffer -> {
338337
byte[] resultBytes = new byte[dataBuffer.readableByteCount()];
339338
dataBuffer.read(resultBytes);

0 commit comments

Comments
 (0)