Skip to content

Commit 196c0ad

Browse files
committed
Fixed DataBufferUtils.join leak for error in source
This commit fixes an issue where DataBufferUtils.join() would not release databuffers that preceded an error signal. Issue: SPR-17025
1 parent 3067682 commit 196c0ad

File tree

2 files changed

+185
-5
lines changed

2 files changed

+185
-5
lines changed

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

Lines changed: 167 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.atomic.AtomicBoolean;
3333
import java.util.concurrent.atomic.AtomicLong;
3434
import java.util.function.Consumer;
35+
import java.util.function.IntPredicate;
3536

3637
import org.reactivestreams.Publisher;
3738
import org.reactivestreams.Subscription;
@@ -473,6 +474,9 @@ public static Consumer<DataBuffer> releaseConsumer() {
473474
* Depending on the {@link DataBuffer} implementation, the returned buffer may be a single
474475
* buffer containing all data of the provided buffers, or it may be a true composite that
475476
* contains references to the buffers.
477+
* <p>If {@code dataBuffers} contains an error signal, then all buffers that preceded the error
478+
* will be {@linkplain #release(DataBuffer) released}, and the error is stored in the
479+
* returned {@code Mono}.
476480
* @param dataBuffers the data buffers that are to be composed
477481
* @return a buffer that is composed from the {@code dataBuffers} argument
478482
* @since 5.0.3
@@ -481,14 +485,26 @@ public static Mono<DataBuffer> join(Publisher<DataBuffer> dataBuffers) {
481485
Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
482486

483487
return Flux.from(dataBuffers)
488+
.onErrorResume(DataBufferUtils::exceptionDataBuffer)
484489
.collectList()
485490
.filter(list -> !list.isEmpty())
486-
.map(list -> {
491+
.flatMap(list -> {
492+
for (int i = 0; i < list.size(); i++) {
493+
DataBuffer dataBuffer = list.get(i);
494+
if (dataBuffer instanceof ExceptionDataBuffer) {
495+
list.subList(0, i).forEach(DataBufferUtils::release);
496+
return Mono.error(((ExceptionDataBuffer) dataBuffer).throwable());
497+
}
498+
}
487499
DataBufferFactory bufferFactory = list.get(0).factory();
488-
return bufferFactory.join(list);
500+
return Mono.just(bufferFactory.join(list));
489501
});
490502
}
491503

504+
private static Mono<DataBuffer> exceptionDataBuffer(Throwable throwable) {
505+
return Mono.just(new ExceptionDataBuffer(throwable));
506+
}
507+
492508

493509
private static class ReadableByteChannelGenerator implements Consumer<SynchronousSink<DataBuffer>> {
494510

@@ -658,4 +674,153 @@ public void failed(Throwable exc, ByteBuffer byteBuffer) {
658674
}
659675
}
660676

677+
/**
678+
* DataBuffer implementation that holds a {@link Throwable}, used in {@link #join(Publisher)}.
679+
*/
680+
private static final class ExceptionDataBuffer implements DataBuffer {
681+
682+
private final Throwable throwable;
683+
684+
685+
public ExceptionDataBuffer(Throwable throwable) {
686+
this.throwable = throwable;
687+
}
688+
689+
public Throwable throwable() {
690+
return this.throwable;
691+
}
692+
693+
// Unsupported
694+
695+
@Override
696+
public DataBufferFactory factory() {
697+
throw new UnsupportedOperationException();
698+
}
699+
700+
@Override
701+
public int indexOf(IntPredicate predicate, int fromIndex) {
702+
throw new UnsupportedOperationException();
703+
}
704+
705+
@Override
706+
public int lastIndexOf(IntPredicate predicate, int fromIndex) {
707+
throw new UnsupportedOperationException();
708+
}
709+
710+
@Override
711+
public int readableByteCount() {
712+
throw new UnsupportedOperationException();
713+
}
714+
715+
@Override
716+
public int writableByteCount() {
717+
throw new UnsupportedOperationException();
718+
}
719+
720+
@Override
721+
public int capacity() {
722+
throw new UnsupportedOperationException();
723+
}
724+
725+
@Override
726+
public DataBuffer capacity(int capacity) {
727+
throw new UnsupportedOperationException();
728+
}
729+
730+
@Override
731+
public int readPosition() {
732+
throw new UnsupportedOperationException();
733+
}
734+
735+
@Override
736+
public DataBuffer readPosition(int readPosition) {
737+
throw new UnsupportedOperationException();
738+
}
739+
740+
@Override
741+
public int writePosition() {
742+
throw new UnsupportedOperationException();
743+
}
744+
745+
@Override
746+
public DataBuffer writePosition(int writePosition) {
747+
throw new UnsupportedOperationException();
748+
}
749+
750+
@Override
751+
public byte getByte(int index) {
752+
throw new UnsupportedOperationException();
753+
}
754+
755+
@Override
756+
public byte read() {
757+
throw new UnsupportedOperationException();
758+
}
759+
760+
@Override
761+
public DataBuffer read(byte[] destination) {
762+
throw new UnsupportedOperationException();
763+
}
764+
765+
@Override
766+
public DataBuffer read(byte[] destination, int offset, int length) {
767+
throw new UnsupportedOperationException();
768+
}
769+
770+
@Override
771+
public DataBuffer write(byte b) {
772+
throw new UnsupportedOperationException();
773+
}
774+
775+
@Override
776+
public DataBuffer write(byte[] source) {
777+
throw new UnsupportedOperationException();
778+
}
779+
780+
@Override
781+
public DataBuffer write(byte[] source, int offset, int length) {
782+
throw new UnsupportedOperationException();
783+
}
784+
785+
@Override
786+
public DataBuffer write(DataBuffer... buffers) {
787+
throw new UnsupportedOperationException();
788+
}
789+
790+
@Override
791+
public DataBuffer write(ByteBuffer... buffers) {
792+
throw new UnsupportedOperationException();
793+
}
794+
795+
@Override
796+
public DataBuffer slice(int index, int length) {
797+
throw new UnsupportedOperationException();
798+
}
799+
800+
@Override
801+
public ByteBuffer asByteBuffer() {
802+
throw new UnsupportedOperationException();
803+
}
804+
805+
@Override
806+
public ByteBuffer asByteBuffer(int index, int length) {
807+
throw new UnsupportedOperationException();
808+
}
809+
810+
@Override
811+
public InputStream asInputStream() {
812+
throw new UnsupportedOperationException();
813+
}
814+
815+
@Override
816+
public InputStream asInputStream(boolean releaseOnClose) {
817+
throw new UnsupportedOperationException();
818+
}
819+
820+
@Override
821+
public OutputStream asOutputStream() {
822+
throw new UnsupportedOperationException();
823+
}
824+
}
825+
661826
}

spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.junit.Test;
3636
import org.mockito.stubbing.Answer;
3737
import reactor.core.publisher.Flux;
38+
import reactor.core.publisher.Mono;
3839
import reactor.test.StepVerifier;
3940

4041
import org.springframework.core.io.ClassPathResource;
@@ -338,12 +339,26 @@ public void join() {
338339
DataBuffer bar = stringBuffer("bar");
339340
DataBuffer baz = stringBuffer("baz");
340341
Flux<DataBuffer> flux = Flux.just(foo, bar, baz);
342+
Mono<DataBuffer> result = DataBufferUtils.join(flux);
341343

342-
DataBuffer result = DataBufferUtils.join(flux).block(Duration.ofSeconds(5));
344+
StepVerifier.create(result)
345+
.consumeNextWith(dataBuffer -> {
346+
assertEquals("foobarbaz", DataBufferTestUtils.dumpString(dataBuffer, StandardCharsets.UTF_8));
347+
release(dataBuffer);
348+
})
349+
.verifyComplete();
350+
}
343351

344-
assertEquals("foobarbaz", DataBufferTestUtils.dumpString(result, StandardCharsets.UTF_8));
352+
@Test
353+
public void joinErrors() {
354+
DataBuffer foo = stringBuffer("foo");
355+
DataBuffer bar = stringBuffer("bar");
356+
Flux<DataBuffer> flux = Flux.just(foo, bar).mergeWith(Flux.error(new RuntimeException()));
357+
Mono<DataBuffer> result = DataBufferUtils.join(flux);
345358

346-
release(result);
359+
StepVerifier.create(result)
360+
.expectError(RuntimeException.class)
361+
.verify();
347362
}
348363

349364
}

0 commit comments

Comments
 (0)