Description
Affects: 5.2.7.RELEASE
I’m seeing a memory leak in a real application, which I tried to reproduce with this sample: https://github.com/EtienneMiret/reactor-netty-oom.
In this sample, I have a Flux of integers; I call the method below on each of them and then aggregate the results:
public Mono<Long> get (int i) {
    return webClient.get ()
        .uri ("/{index}", i)
        .exchange ()
        .delayElement (Duration.ofMillis (200))
        .flatMap (response -> response.bodyToMono (ByteArrayResource.class))
        .map (ByteArrayResource::contentLength);
}
When the server behaves properly, all is fine, but when it aborts the TCP connection, this code starts leaking memory. My guess is that the network error triggers an exception which cancels the Flux, thus preventing the response -> response.bodyToMono (ByteArrayResource.class) lambda from being called. The .delayElement () operator is an attempt at making it more likely that the ClientResponse is built but never consumed.
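To make the mechanism I suspect more concrete, here is a toy model in plain Java. It does not use any Netty, Reactor or Spring classes: CountedBuffer, ToyResponse and LeakSketch are hypothetical names I made up for illustration, and the "reset during the delay" flag is only my assumption about what happens. It shows how a buffer stays unreleased whenever the consuming lambda is skipped.

```java
import java.util.function.Function;

public class LeakSketch {

    /** Toy stand-in for a reference-counted buffer (NOT Netty's ByteBuf). */
    static final class CountedBuffer {
        private int refCount = 1;

        void release() {
            refCount--;
        }

        boolean isLeaked() {
            return refCount > 0;
        }
    }

    /** Toy stand-in for a response that holds a buffer until its body is read. */
    static final class ToyResponse {
        final CountedBuffer body = new CountedBuffer();

        /** Reading the body is the only thing that releases the buffer. */
        long readBodyLength() {
            body.release();
            return 42L; // arbitrary length, for illustration only
        }
    }

    /**
     * Simulates one request. The response is always built, but if the
     * connection is reset during the (assumed) delay, the consumer is
     * never invoked, which mimics the pipeline being cancelled.
     */
    static CountedBuffer request(boolean resetDuringDelay,
                                 Function<ToyResponse, Long> consumer) {
        ToyResponse response = new ToyResponse();
        if (!resetDuringDelay) {
            consumer.apply(response);
        }
        return response.body;
    }

    /** Runs several requests, resetting every resetEvery-th one; returns the leak count. */
    public static int countLeaks(int requests, int resetEvery) {
        int leaked = 0;
        for (int i = 1; i <= requests; i++) {
            boolean reset = (i % resetEvery == 0);
            CountedBuffer buffer = request(reset, ToyResponse::readBodyLength);
            if (buffer.isLeaked()) {
                leaked++;
            }
        }
        return leaked;
    }

    public static void main(String[] args) {
        // Requests 5 and 10 are reset, so two buffers are never released.
        System.out.println("leaked buffers: " + countLeaks(10, 5));
    }
}
```

In this model the leak only goes away if something other than my lambda releases the buffer when the pipeline is cancelled, which is what I expected Spring or Reactor Netty to do.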
If I understand ClientResponse’s Javadoc and #21801 correctly, my code consumes the response properly, and error cases (such as an IO error or pipeline cancellation) should be handled by Spring. Is this indeed the case?
After the first connection reset from the server, I get the stack trace below:
reactor.core.publisher.Operators Operator called default onErrorDropped
java.lang.NullPointerException: null
at reactor.core.publisher.MonoCollect$CollectSubscriber.onNext(MonoCollect.java:124) ~[reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Body from GET http://localhost:7890/52 [DefaultClientResponse]
Stack trace:
at reactor.core.publisher.MonoCollect$CollectSubscriber.onNext(MonoCollect.java:124) [reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) [reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192) [reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192) [reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) [reactor-core-3.3.6.RELEASE.jar:3.3.6.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:330) [reactor-netty-0.9.8.RELEASE.jar:0.9.8.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:353) [reactor-netty-0.9.8.RELEASE.jar:0.9.8.RELEASE]
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:605) [reactor-netty-0.9.8.RELEASE.jar:0.9.8.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) [reactor-netty-0.9.8.RELEASE.jar:0.9.8.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [netty-handler-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [netty-codec-4.1.50.Final.jar:4.1.50.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [netty-codec-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [netty-transport-4.1.50.Final.jar:4.1.50.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [netty-common-4.1.50.Final.jar:4.1.50.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.50.Final.jar:4.1.50.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.50.Final.jar:4.1.50.Final]
at java.lang.Thread.run(Thread.java:830) [?:?]
Later (presumably when the GC runs), I get a number of these messages:
io.netty.util.ResourceLeakDetector LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
See full error.log.
I’m still not sure where the issue is (my code? Spring? Reactor-netty?). Sorry if this turns out not to be a Spring bug.