Skip to content

Commit 384a399

Browse files
committed
Add DataBufferUtils.compose
Added a utility method that composes data buffers into a single buffer. Depending on the `DataBuffer` implementation, the returned buffer may be a single buffer containing all data of the provided buffers, or it may be a true composite that contains references to the buffers. Issue: SPR-16365
1 parent c60313d commit 384a399

File tree

6 files changed

+99
-3
lines changed

6 files changed

+99
-3
lines changed

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferFactory.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.core.io.buffer;
1818

1919
import java.nio.ByteBuffer;
20+
import java.util.List;
2021

2122
/**
2223
* A factory for {@link DataBuffer}s, allowing for allocation and wrapping of
@@ -61,4 +62,14 @@ public interface DataBufferFactory {
6162
*/
6263
DataBuffer wrap(byte[] bytes);
6364

65+
/**
66+
* Create a composite data buffer from the list of provided data buffers. Depending on the
67+
* implementation, the returned buffer may be a single buffer containing all data of the
68+
* provided buffers, or it may be a true composite that contains references to the buffers.
69+
* <p>Note that the given data buffers do <strong>not</strong> have to be released, as they are
70+
* released as part of the returned composite.
71+
* @param dataBuffers the data buffers to be composed
72+
* @return a buffer that composes {@code dataBuffers} into one
73+
*/
74+
DataBuffer compose(List<DataBuffer> dataBuffers);
6475
}

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import reactor.core.publisher.BaseSubscriber;
4040
import reactor.core.publisher.Flux;
4141
import reactor.core.publisher.FluxSink;
42+
import reactor.core.publisher.Mono;
4243
import reactor.core.publisher.SynchronousSink;
4344

4445
import org.springframework.core.io.Resource;
@@ -434,6 +435,27 @@ public static BinaryOperator<DataBuffer> writeAggregator() {
434435
return WRITE_AGGREGATOR;
435436
}
436437

438+
/**
439+
* Composes the buffers in the given {@link Publisher} into a single data buffer. Depending on
440+
* the {@code DataBuffer} implementation, the returned buffer may be a single buffer containing
441+
* all data of the provided buffers, or it may be a true composite that contains references to
442+
* the buffers.
443+
* @param publisher the data buffers that are to be composed
444+
* @return the composed data buffer
445+
*/
446+
public static Mono<DataBuffer> compose(Publisher<DataBuffer> publisher) {
447+
Assert.notNull(publisher, "'publisher' must not be null");
448+
449+
Flux<DataBuffer> source = Flux.from(publisher);
450+
451+
return source.collectList()
452+
.filter(dataBuffers -> !dataBuffers.isEmpty())
453+
.map(dataBuffers -> {
454+
DataBufferFactory bufferFactory = dataBuffers.get(0).factory();
455+
return bufferFactory.compose(dataBuffers);
456+
});
457+
}
458+
437459

438460
private static class ReadableByteChannelGenerator
439461
implements BiFunction<ReadableByteChannel, SynchronousSink<DataBuffer>, ReadableByteChannel> {

spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBufferFactory.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.core.io.buffer;
1818

1919
import java.nio.ByteBuffer;
20+
import java.util.List;
2021

2122
import org.springframework.util.Assert;
2223

@@ -102,6 +103,23 @@ public DataBuffer wrap(byte[] bytes) {
102103
return DefaultDataBuffer.fromFilledByteBuffer(this, wrapper);
103104
}
104105

106+
/**
107+
* {@inheritDoc}
108+
* <p>This implementation creates a single {@link DefaultDataBuffer} to contain the data
109+
* in {@code dataBuffers}.
110+
*/
111+
@Override
112+
public DataBuffer compose(List<DataBuffer> dataBuffers) {
113+
Assert.notEmpty(dataBuffers, "'dataBuffers' must not be empty");
114+
115+
int capacity = dataBuffers.stream()
116+
.mapToInt(DataBuffer::readableByteCount)
117+
.sum();
118+
DefaultDataBuffer dataBuffer = allocateBuffer(capacity);
119+
return dataBuffers.stream()
120+
.reduce(dataBuffer, DataBuffer::write);
121+
}
122+
105123
@Override
106124
public String toString() {
107125
return "DefaultDataBufferFactory (preferDirect=" + this.preferDirect + ")";

spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBufferFactory.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,9 +17,11 @@
1717
package org.springframework.core.io.buffer;
1818

1919
import java.nio.ByteBuffer;
20+
import java.util.List;
2021

2122
import io.netty.buffer.ByteBuf;
2223
import io.netty.buffer.ByteBufAllocator;
24+
import io.netty.buffer.CompositeByteBuf;
2325
import io.netty.buffer.Unpooled;
2426

2527
import org.springframework.util.Assert;
@@ -80,6 +82,22 @@ public DataBuffer wrap(byte[] bytes) {
8082
return new NettyDataBuffer(byteBuf, this);
8183
}
8284

85+
/**
86+
* {@inheritDoc}
87+
* <p>This implementation uses Netty's {@link CompositeByteBuf}.
88+
*/
89+
@Override
90+
public DataBuffer compose(List<DataBuffer> dataBuffers) {
91+
Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
92+
CompositeByteBuf composite = this.byteBufAllocator.compositeBuffer(dataBuffers.size());
93+
for (DataBuffer dataBuffer : dataBuffers) {
94+
Assert.isInstanceOf(NettyDataBuffer.class, dataBuffer);
95+
NettyDataBuffer nettyDataBuffer = (NettyDataBuffer) dataBuffer;
96+
composite.addComponent(true, nettyDataBuffer.getNativeBuffer());
97+
}
98+
return new NettyDataBuffer(composite, this);
99+
}
100+
83101
/**
84102
* Wrap the given Netty {@link ByteBuf} in a {@code NettyDataBuffer}.
85103
* @param byteBuf the Netty byte buffer to wrap

spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,5 +479,18 @@ public void spr16351() {
479479
release(buffer);
480480
}
481481

482+
@Test
483+
public void composite() {
484+
DataBuffer composite = this.bufferFactory.compose(Arrays.asList(stringBuffer("a"),
485+
stringBuffer("b"), stringBuffer("c")));
486+
assertEquals(3, composite.readableByteCount());
487+
byte[] bytes = new byte[3];
488+
composite.read(bytes);
489+
490+
assertArrayEquals(new byte[] {'a','b','c'}, bytes);
491+
492+
release(composite);
493+
}
494+
482495

483496
}

spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,5 +351,19 @@ private Answer<Integer> putByte(int b) {
351351
};
352352
}
353353

354+
@Test
355+
public void compose() {
356+
DataBuffer foo = stringBuffer("foo");
357+
DataBuffer bar = stringBuffer("bar");
358+
DataBuffer baz = stringBuffer("baz");
359+
Flux<DataBuffer> flux = Flux.just(foo, bar, baz);
360+
361+
DataBuffer result = DataBufferUtils.compose(flux).block(Duration.ofSeconds(5));
362+
363+
assertEquals("foobarbaz", DataBufferTestUtils.dumpString(result, StandardCharsets.UTF_8));
364+
365+
release(result);
366+
}
367+
354368

355369
}

0 commit comments

Comments
 (0)