From ef5b32abb016df5fc1677941e84ad7a999bcf0fa Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Tue, 15 Jul 2025 15:00:24 -0700 Subject: [PATCH] netty: Associate netty stream eagerly to avoid client hang In #12185, RPCs were randomly hanging. In #12207 this was tracked down to the headers promise completing successfully, but the netty stream was null. This was because the headers write hadn't completed but stream.close() had been called by goingAway(). --- .../io/grpc/netty/NettyClientHandler.java | 13 ++++++++++++ .../io/grpc/netty/NettyClientHandlerTest.java | 20 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index a5fa0f80077..9e9e0359804 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -768,6 +768,19 @@ public void operationComplete(ChannelFuture future) throws Exception { } } }); + // When the HEADERS are not buffered because of MAX_CONCURRENT_STREAMS in + // StreamBufferingEncoder, the stream is created immediately even if the bytes of the HEADERS + // are delayed because the OS may have too much buffered and isn't accepting the write. The + // write promise is also delayed until flush(). However, we need to associate the netty stream + // with the transport state so that goingAway() and forcefulClose() and able to notify the + // stream of failures. + // + // This leaves a hole when MAX_CONCURRENT_STREAMS is reached, as http2Stream will be null, but + // it is better than nothing. + Http2Stream http2Stream = connection().stream(streamId); + if (http2Stream != null) { + http2Stream.setProperty(streamKey, stream); + } } /** diff --git a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java index f8fbeea9b82..ee458e177ac 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java @@ -453,6 +453,26 @@ public void receivedAbruptGoAwayShouldFailRacingQueuedStreamid() throws Exceptio assertTrue(future.isDone()); } + @Test + public void receivedAbruptGoAwayShouldFailRacingQueuedIoStreamid() throws Exception { + // Purposefully avoid flush(), since we want the write to not actually complete. + // EmbeddedChannel doesn't support flow control, so this is the next closest approximation. + ChannelFuture future = channel().write( + newCreateStreamCommand(grpcHeaders, streamTransportState)); + // Read a GOAWAY that indicates our stream can't be sent + channelRead(goAwayFrame(0, 0 /* NO_ERROR */, Unpooled.copiedBuffer("this is a test", UTF_8))); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); + verify(streamListener).closed(captor.capture(), same(REFUSED), + ArgumentMatchers.notNull()); + assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode()); + assertEquals( + "Abrupt GOAWAY closed sent stream. HTTP/2 error code: NO_ERROR, " + + "debug data: this is a test", + captor.getValue().getDescription()); + assertTrue(future.isDone()); + } + @Test public void receivedGoAway_shouldFailBufferedStreamsExceedingMaxConcurrentStreams() throws Exception {