Skip to content

Commit a006073

Browse files
committed
Fixed DataBufferUtils.join leak for error in source
This commit fixes an issue where DataBufferUtils.join() would not release databuffers that preceded an error signal. Issue: SPR-17025 (cherry picked from commit 196c0ad)
1 parent 1d58fac commit a006073

File tree

2 files changed

+186
-5
lines changed

2 files changed

+186
-5
lines changed

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

Lines changed: 168 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.atomic.AtomicBoolean;
3333
import java.util.concurrent.atomic.AtomicLong;
3434
import java.util.function.Consumer;
35+
import java.util.function.IntPredicate;
3536

3637
import org.reactivestreams.Publisher;
3738
import org.reactivestreams.Subscription;
@@ -473,6 +474,9 @@ public static Consumer<DataBuffer> releaseConsumer() {
473474
* Depending on the {@link DataBuffer} implementation, the returned buffer may be a single
474475
* buffer containing all data of the provided buffers, or it may be a true composite that
475476
* contains references to the buffers.
477+
* <p>If {@code dataBuffers} contains an error signal, then all buffers that preceded the error
478+
* will be {@linkplain #release(DataBuffer) released}, and the error is stored in the
479+
* returned {@code Mono}.
476480
* @param dataBuffers the data buffers that are to be composed
477481
* @return a buffer that is composed from the {@code dataBuffers} argument
478482
* @since 5.0.3
@@ -481,14 +485,26 @@ public static Mono<DataBuffer> join(Publisher<DataBuffer> dataBuffers) {
481485
Assert.notNull(dataBuffers, "'dataBuffers' must not be null");
482486

483487
return Flux.from(dataBuffers)
488+
.onErrorResume(DataBufferUtils::exceptionDataBuffer)
484489
.collectList()
485490
.filter(list -> !list.isEmpty())
486-
.map(list -> {
491+
.flatMap(list -> {
492+
for (int i = 0; i < list.size(); i++) {
493+
DataBuffer dataBuffer = list.get(i);
494+
if (dataBuffer instanceof ExceptionDataBuffer) {
495+
list.subList(0, i).forEach(DataBufferUtils::release);
496+
return Mono.error(((ExceptionDataBuffer) dataBuffer).throwable());
497+
}
498+
}
487499
DataBufferFactory bufferFactory = list.get(0).factory();
488-
return bufferFactory.join(list);
500+
return Mono.just(bufferFactory.join(list));
489501
});
490502
}
491503

504+
private static Mono<DataBuffer> exceptionDataBuffer(Throwable throwable) {
505+
return Mono.just(new ExceptionDataBuffer(throwable));
506+
}
507+
492508

493509
private static class ReadableByteChannelGenerator implements Consumer<SynchronousSink<DataBuffer>> {
494510

@@ -658,4 +674,154 @@ public void failed(Throwable exc, ByteBuffer byteBuffer) {
658674
}
659675
}
660676

677+
678+
/**
679+
* DataBuffer implementation that holds a {@link Throwable}, used in {@link #join(Publisher)}.
680+
*/
681+
private static final class ExceptionDataBuffer implements DataBuffer {
682+
683+
private final Throwable throwable;
684+
685+
686+
public ExceptionDataBuffer(Throwable throwable) {
687+
this.throwable = throwable;
688+
}
689+
690+
public Throwable throwable() {
691+
return this.throwable;
692+
}
693+
694+
// Unsupported
695+
696+
@Override
697+
public DataBufferFactory factory() {
698+
throw new UnsupportedOperationException();
699+
}
700+
701+
@Override
702+
public int indexOf(IntPredicate predicate, int fromIndex) {
703+
throw new UnsupportedOperationException();
704+
}
705+
706+
@Override
707+
public int lastIndexOf(IntPredicate predicate, int fromIndex) {
708+
throw new UnsupportedOperationException();
709+
}
710+
711+
@Override
712+
public int readableByteCount() {
713+
throw new UnsupportedOperationException();
714+
}
715+
716+
@Override
717+
public int writableByteCount() {
718+
throw new UnsupportedOperationException();
719+
}
720+
721+
@Override
722+
public int capacity() {
723+
throw new UnsupportedOperationException();
724+
}
725+
726+
@Override
727+
public DataBuffer capacity(int capacity) {
728+
throw new UnsupportedOperationException();
729+
}
730+
731+
@Override
732+
public int readPosition() {
733+
throw new UnsupportedOperationException();
734+
}
735+
736+
@Override
737+
public DataBuffer readPosition(int readPosition) {
738+
throw new UnsupportedOperationException();
739+
}
740+
741+
@Override
742+
public int writePosition() {
743+
throw new UnsupportedOperationException();
744+
}
745+
746+
@Override
747+
public DataBuffer writePosition(int writePosition) {
748+
throw new UnsupportedOperationException();
749+
}
750+
751+
@Override
752+
public byte getByte(int index) {
753+
throw new UnsupportedOperationException();
754+
}
755+
756+
@Override
757+
public byte read() {
758+
throw new UnsupportedOperationException();
759+
}
760+
761+
@Override
762+
public DataBuffer read(byte[] destination) {
763+
throw new UnsupportedOperationException();
764+
}
765+
766+
@Override
767+
public DataBuffer read(byte[] destination, int offset, int length) {
768+
throw new UnsupportedOperationException();
769+
}
770+
771+
@Override
772+
public DataBuffer write(byte b) {
773+
throw new UnsupportedOperationException();
774+
}
775+
776+
@Override
777+
public DataBuffer write(byte[] source) {
778+
throw new UnsupportedOperationException();
779+
}
780+
781+
@Override
782+
public DataBuffer write(byte[] source, int offset, int length) {
783+
throw new UnsupportedOperationException();
784+
}
785+
786+
@Override
787+
public DataBuffer write(DataBuffer... buffers) {
788+
throw new UnsupportedOperationException();
789+
}
790+
791+
@Override
792+
public DataBuffer write(ByteBuffer... buffers) {
793+
throw new UnsupportedOperationException();
794+
}
795+
796+
@Override
797+
public DataBuffer slice(int index, int length) {
798+
throw new UnsupportedOperationException();
799+
}
800+
801+
@Override
802+
public ByteBuffer asByteBuffer() {
803+
throw new UnsupportedOperationException();
804+
}
805+
806+
@Override
807+
public ByteBuffer asByteBuffer(int index, int length) {
808+
throw new UnsupportedOperationException();
809+
}
810+
811+
@Override
812+
public InputStream asInputStream() {
813+
throw new UnsupportedOperationException();
814+
}
815+
816+
@Override
817+
public InputStream asInputStream(boolean releaseOnClose) {
818+
throw new UnsupportedOperationException();
819+
}
820+
821+
@Override
822+
public OutputStream asOutputStream() {
823+
throw new UnsupportedOperationException();
824+
}
825+
}
826+
661827
}

spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.junit.Test;
3636
import org.mockito.stubbing.Answer;
3737
import reactor.core.publisher.Flux;
38+
import reactor.core.publisher.Mono;
3839
import reactor.test.StepVerifier;
3940

4041
import org.springframework.core.io.ClassPathResource;
@@ -338,12 +339,26 @@ public void join() {
338339
DataBuffer bar = stringBuffer("bar");
339340
DataBuffer baz = stringBuffer("baz");
340341
Flux<DataBuffer> flux = Flux.just(foo, bar, baz);
342+
Mono<DataBuffer> result = DataBufferUtils.join(flux);
341343

342-
DataBuffer result = DataBufferUtils.join(flux).block(Duration.ofSeconds(5));
344+
StepVerifier.create(result)
345+
.consumeNextWith(dataBuffer -> {
346+
assertEquals("foobarbaz", DataBufferTestUtils.dumpString(dataBuffer, StandardCharsets.UTF_8));
347+
release(dataBuffer);
348+
})
349+
.verifyComplete();
350+
}
343351

344-
assertEquals("foobarbaz", DataBufferTestUtils.dumpString(result, StandardCharsets.UTF_8));
352+
@Test
353+
public void joinErrors() {
354+
DataBuffer foo = stringBuffer("foo");
355+
DataBuffer bar = stringBuffer("bar");
356+
Flux<DataBuffer> flux = Flux.just(foo, bar).concatWith(Flux.error(new RuntimeException()));
357+
Mono<DataBuffer> result = DataBufferUtils.join(flux);
345358

346-
release(result);
359+
StepVerifier.create(result)
360+
.expectError(RuntimeException.class)
361+
.verify();
347362
}
348363

349364
}

0 commit comments

Comments
 (0)