Skip to content

Commit 8a48353

Browse files
committed
Use doOnDiscard to free internally queued data buffers
Issue: SPR-17246
1 parent 1756f83 commit 8a48353

File tree

5 files changed

+45
-15
lines changed

5 files changed

+45
-15
lines changed

spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ public static Flux<DataBuffer> readByteChannel(
101101
bufferSize);
102102
return Flux.generate(generator);
103103
},
104-
DataBufferUtils::closeChannel
105-
);
104+
DataBufferUtils::closeChannel)
105+
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
106106
}
107107

108108
/**
@@ -140,14 +140,16 @@ public static Flux<DataBuffer> readAsynchronousFileChannel(Callable<Asynchronous
140140
DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize);
141141
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize);
142142

143-
return Flux.using(channelSupplier,
143+
Flux<DataBuffer> result = Flux.using(channelSupplier,
144144
channel -> Flux.create(sink -> {
145-
CompletionHandler<Integer, DataBuffer> completionHandler =
146-
new AsynchronousFileChannelReadCompletionHandler(channel,
147-
sink, position, dataBufferFactory, bufferSize);
148-
channel.read(byteBuffer, position, dataBuffer, completionHandler);
149-
}),
145+
CompletionHandler<Integer, DataBuffer> completionHandler =
146+
new AsynchronousFileChannelReadCompletionHandler(channel,
147+
sink, position, dataBufferFactory, bufferSize);
148+
channel.read(byteBuffer, position, dataBuffer, completionHandler);
149+
}),
150150
DataBufferUtils::closeChannel);
151+
152+
return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
151153
}
152154

153155
/**
@@ -391,12 +393,19 @@ public static <T extends DataBuffer> T retain(T dataBuffer) {
391393
}
392394

393395
/**
394-
* Release the given data buffer, if it is a {@link PooledDataBuffer}.
396+
* Release the given data buffer, if it is a {@link PooledDataBuffer} and
397+
* has been {@linkplain PooledDataBuffer#isAllocated() allocated}.
395398
* @param dataBuffer the data buffer to release
396399
* @return {@code true} if the buffer was released; {@code false} otherwise.
397400
*/
398401
public static boolean release(@Nullable DataBuffer dataBuffer) {
399-
return (dataBuffer instanceof PooledDataBuffer && ((PooledDataBuffer) dataBuffer).release());
402+
if (dataBuffer instanceof PooledDataBuffer) {
403+
PooledDataBuffer pooledDataBuffer = (PooledDataBuffer) dataBuffer;
404+
if (pooledDataBuffer.isAllocated()) {
405+
return pooledDataBuffer.release();
406+
}
407+
}
408+
return false;
400409
}
401410

402411
/**

spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,11 @@ public OutputStream asOutputStream() {
259259
return new ByteBufOutputStream(this.byteBuf);
260260
}
261261

262+
@Override
263+
public boolean isAllocated() {
264+
return this.byteBuf.refCnt() > 0;
265+
}
266+
262267
@Override
263268
public PooledDataBuffer retain() {
264269
return new NettyDataBuffer(this.byteBuf.retain(), this.dataBufferFactory);

spring-core/src/main/java/org/springframework/core/io/buffer/PooledDataBuffer.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,16 +25,22 @@
2525
*/
2626
public interface PooledDataBuffer extends DataBuffer {
2727

28+
/**
29+
* Return {@code true} if this buffer is allocated; {@code false} if it has been deallocated.
30+
* @since 5.1
31+
*/
32+
boolean isAllocated();
33+
2834
/**
2935
* Increase the reference count for this buffer by one.
3036
* @return this buffer
3137
*/
3238
PooledDataBuffer retain();
3339

3440
/**
35-
* Decrease the reference count for this buffer by one, and release it
41+
* Decrease the reference count for this buffer by one, and deallocate it
3642
* once the count reaches zero.
37-
* @return {@code true} if the buffer was released; {@code false} otherwise.
43+
* @return {@code true} if the buffer was deallocated; {@code false} otherwise.
3844
*/
3945
boolean release();
4046

spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import reactor.netty.http.server.HttpServerRequest;
3030

3131
import org.springframework.core.io.buffer.DataBuffer;
32+
import org.springframework.core.io.buffer.DataBufferUtils;
3233
import org.springframework.core.io.buffer.NettyDataBufferFactory;
34+
import org.springframework.core.io.buffer.PooledDataBuffer;
3335
import org.springframework.http.HttpCookie;
3436
import org.springframework.http.HttpHeaders;
3537
import org.springframework.lang.Nullable;
@@ -153,6 +155,7 @@ public InetSocketAddress getRemoteAddress() {
153155
return this.request.remoteAddress();
154156
}
155157

158+
@Override
156159
@Nullable
157160
protected SslInfo initSslInfo() {
158161
SslHandler sslHandler = ((Connection) this.request).channel().pipeline().get(SslHandler.class);
@@ -165,7 +168,8 @@ protected SslInfo initSslInfo() {
165168

166169
@Override
167170
public Flux<DataBuffer> getBody() {
168-
return this.request.receive().retain().map(this.bufferFactory::wrap);
171+
Flux<DataBuffer> body = this.request.receive().retain().map(this.bufferFactory::wrap);
172+
return body.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
169173
}
170174

171175
@SuppressWarnings("unchecked")

spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ protected SslInfo initSslInfo() {
119119

120120
@Override
121121
public Flux<DataBuffer> getBody() {
122-
return Flux.from(this.body);
122+
return Flux.from(this.body)
123+
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
123124
}
124125

125126
@SuppressWarnings("unchecked")
@@ -216,6 +217,11 @@ public UndertowDataBuffer(DataBuffer dataBuffer, PooledByteBuffer pooledByteBuff
216217
this.pooledByteBuffer = pooledByteBuffer;
217218
}
218219

220+
@Override
221+
public boolean isAllocated() {
222+
return this.pooledByteBuffer.isOpen();
223+
}
224+
219225
@Override
220226
public PooledDataBuffer retain() {
221227
return this;

0 commit comments

Comments
 (0)