Skip to content

Commit eac9e66

Browse files
committed
Fix memory leak when canceling read from AsynchronousFileChannel
This commit fixes a memory leak that occurs when reading from a AsynchronousFileChannel, and cancelling the subscription. Issue: SPR-17419
1 parent 28cf7b7 commit eac9e66

File tree

2 files changed

+38
-1
lines changed

2 files changed

+38
-1
lines changed

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,12 +139,14 @@ public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<Asynchronous
139139
DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
140140
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
141141

142+
142143
Flux<DataBuffer> result = Flux.using(channelSupplier,
143144
channel -> Flux.create(sink -> {
144-
CompletionHandler<Integer, DataBuffer> completionHandler =
145+
AsynchronousFileChannelReadCompletionHandler completionHandler =
145146
new AsynchronousFileChannelReadCompletionHandler(channel,
146147
sink, position, dataBufferFactory, bufferSize);
147148
channel.read(byteBuffer, position, dataBuffer, completionHandler);
149+
sink.onDispose(completionHandler::dispose);
148150
}),
149151
DataBufferUtils::closeChannel);
150152

@@ -545,6 +547,10 @@ public void failed(Throwable exc, DataBuffer dataBuffer) {
545547
release(dataBuffer);
546548
this.sink.error(exc);
547549
}
550+
551+
public void dispose() {
552+
this.disposed.set(true);
553+
}
548554
}
549555

550556

spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import reactor.core.publisher.Mono;
4141
import reactor.test.StepVerifier;
4242

43+
import org.springframework.core.io.ByteArrayResource;
4344
import org.springframework.core.io.ClassPathResource;
4445
import org.springframework.core.io.Resource;
4546
import org.springframework.core.io.buffer.support.DataBufferTestUtils;
@@ -142,6 +143,36 @@ public void readResourcePosition() throws Exception {
142143
.verify(Duration.ofSeconds(5));
143144
}
144145

146+
@Test
147+
public void readResourcePositionAndTakeUntil() throws Exception {
148+
Resource resource = new ClassPathResource("DataBufferUtilsTests.txt", getClass());
149+
Flux<DataBuffer> flux = DataBufferUtils.read(resource, 3, this.bufferFactory, 3);
150+
151+
flux = DataBufferUtils.takeUntilByteCount(flux, 5);
152+
153+
154+
StepVerifier.create(flux)
155+
.consumeNextWith(stringConsumer("bar"))
156+
.consumeNextWith(stringConsumer("ba"))
157+
.expectComplete()
158+
.verify(Duration.ofSeconds(5));
159+
}
160+
161+
@Test
162+
public void readByteArrayResourcePositionAndTakeUntil() throws Exception {
163+
Resource resource = new ByteArrayResource("foobarbazqux" .getBytes());
164+
Flux<DataBuffer> flux = DataBufferUtils.read(resource, 3, this.bufferFactory, 3);
165+
166+
flux = DataBufferUtils.takeUntilByteCount(flux, 5);
167+
168+
169+
StepVerifier.create(flux)
170+
.consumeNextWith(stringConsumer("bar"))
171+
.consumeNextWith(stringConsumer("ba"))
172+
.expectComplete()
173+
.verify(Duration.ofSeconds(5));
174+
}
175+
145176
@Test
146177
public void writeOutputStream() throws Exception {
147178
DataBuffer foo = stringBuffer("foo");

0 commit comments

Comments
 (0)