diff --git a/lib/netty b/lib/netty index c9e5238ea61..1c6b3307bec 160000 --- a/lib/netty +++ b/lib/netty @@ -1 +1 @@ -Subproject commit c9e5238ea61b753f4ebcd84249f853857d8c1eae +Subproject commit 1c6b3307becc3b0c19fdbac6a7058bd731a4db2c diff --git a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientHandler.java b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientHandler.java index 9cd20ecab97..ad660dd1e1a 100644 --- a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientHandler.java +++ b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientHandler.java @@ -31,6 +31,7 @@ package com.google.net.stubby.transport.netty; +import com.google.common.base.Preconditions; import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; @@ -48,10 +49,8 @@ import io.netty.handler.codec.http2.Http2FrameReader; import io.netty.handler.codec.http2.Http2FrameWriter; import io.netty.handler.codec.http2.Http2Headers; -import io.netty.handler.codec.http2.Http2InboundFlowController; -import io.netty.handler.codec.http2.Http2OutboundFlowController; +import io.netty.handler.codec.http2.Http2LocalFlowController; import io.netty.handler.codec.http2.Http2Stream; -import io.netty.handler.codec.http2.Http2StreamException; import java.util.ArrayDeque; import java.util.Deque; @@ -81,15 +80,17 @@ public PendingStream(CreateStreamCommand command, ChannelPromise promise) { } private final Deque pendingStreams = new ArrayDeque(); + private final Http2LocalFlowController inboundFlow; private Throwable connectionError; private ChannelHandlerContext ctx; public NettyClientHandler(Http2Connection connection, Http2FrameReader frameReader, Http2FrameWriter frameWriter, - Http2InboundFlowController inboundFlow, - Http2OutboundFlowController outboundFlow) { - super(connection, frameReader, frameWriter, inboundFlow, outboundFlow, new LazyFrameListener()); + Http2LocalFlowController inboundFlow) { + super(connection, frameReader, frameWriter, new LazyFrameListener()); + this.inboundFlow = Preconditions.checkNotNull(inboundFlow, "inboundFlow"); + initListener(); // Disallow stream creation by the server. @@ -148,7 +149,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) void returnProcessedBytes(int streamId, int bytes) { try { Http2Stream http2Stream = connection().requireStream(streamId); - http2Stream.garbageCollector().returnProcessedBytes(ctx, bytes); + inboundFlow.consumeBytes(ctx, http2Stream, bytes); } catch (Http2Exception e) { throw new RuntimeException(e); } @@ -215,7 +216,7 @@ protected void onConnectionError(ChannelHandlerContext ctx, Throwable cause, @Override protected void onStreamError(ChannelHandlerContext ctx, Throwable cause, - Http2StreamException http2Ex) { + Http2Exception.StreamException http2Ex) { // Close the stream with a status that contains the cause. Http2Stream stream = connection().stream(http2Ex.streamId()); if (stream != null) { diff --git a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientTransport.java b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientTransport.java index 9f3de126bb9..88c4d6b5d11 100644 --- a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientTransport.java +++ b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientTransport.java @@ -56,8 +56,7 @@ import io.netty.handler.codec.http2.DefaultHttp2Connection; import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; -import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController; -import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController; +import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController; import io.netty.handler.codec.http2.DefaultHttp2StreamRemovalPolicy; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2FrameLogger; @@ -65,7 +64,6 @@ import io.netty.handler.codec.http2.Http2FrameWriter; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2InboundFrameLogger; -import io.netty.handler.codec.http2.Http2OutboundFlowController; import io.netty.handler.codec.http2.Http2OutboundFrameLogger; import io.netty.handler.ssl.SslContext; import io.netty.util.internal.logging.InternalLogLevel; @@ -250,10 +248,8 @@ private static NettyClientHandler newHandler() { frameReader = new Http2InboundFrameLogger(frameReader, frameLogger); frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger); - DefaultHttp2InboundFlowController inboundFlow = - new DefaultHttp2InboundFlowController(connection, frameWriter); - Http2OutboundFlowController outboundFlow = - new DefaultHttp2OutboundFlowController(connection, frameWriter); - return new NettyClientHandler(connection, frameReader, frameWriter, inboundFlow, outboundFlow); + DefaultHttp2LocalFlowController inboundFlow = + new DefaultHttp2LocalFlowController(connection, frameWriter); + return new NettyClientHandler(connection, frameReader, frameWriter, inboundFlow); } } diff --git a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerHandler.java b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerHandler.java index 2c3a4214ae0..5cc3721fda9 100644 --- a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerHandler.java +++ b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerHandler.java @@ -56,14 +56,13 @@ import io.netty.handler.codec.http2.Http2ConnectionHandler; import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2Exception; +import io.netty.handler.codec.http2.Http2Exception.StreamException; import io.netty.handler.codec.http2.Http2FrameAdapter; import io.netty.handler.codec.http2.Http2FrameReader; import io.netty.handler.codec.http2.Http2FrameWriter; import io.netty.handler.codec.http2.Http2Headers; -import io.netty.handler.codec.http2.Http2InboundFlowController; -import io.netty.handler.codec.http2.Http2OutboundFlowController; +import io.netty.handler.codec.http2.Http2LocalFlowController; import io.netty.handler.codec.http2.Http2Stream; -import io.netty.handler.codec.http2.Http2StreamException; import io.netty.util.ReferenceCountUtil; import java.util.logging.Level; @@ -82,6 +81,7 @@ class NettyServerHandler extends Http2ConnectionHandler { private static final Status GOAWAY_STATUS = Status.UNAVAILABLE; private final ServerTransportListener transportListener; + private final Http2LocalFlowController inboundFlow; private Throwable connectionError; private ChannelHandlerContext ctx; private boolean teWarningLogged; @@ -90,10 +90,10 @@ class NettyServerHandler extends Http2ConnectionHandler { Http2Connection connection, Http2FrameReader frameReader, Http2FrameWriter frameWriter, - Http2InboundFlowController inboundFlow, - Http2OutboundFlowController outboundFlow) { - super(connection, frameReader, frameWriter, inboundFlow, outboundFlow, new LazyFrameListener()); + Http2LocalFlowController inboundFlow) { + super(connection, frameReader, frameWriter, new LazyFrameListener()); this.transportListener = Preconditions.checkNotNull(transportListener, "transportListener"); + this.inboundFlow = Preconditions.checkNotNull(inboundFlow, "inboundFlow"); initListener(); connection.local().allowPushTo(false); } @@ -189,9 +189,9 @@ protected void onConnectionError(ChannelHandlerContext ctx, Throwable cause, @Override protected void onStreamError(ChannelHandlerContext ctx, Throwable cause, - Http2StreamException http2Ex) { + StreamException http2Ex) { logger.log(Level.WARNING, "Stream Error", cause); - Http2Stream stream = connection().stream(http2Ex.streamId()); + Http2Stream stream = connection().stream(http2Ex.streamId(http2Ex)); if (stream != null) { // Abort the stream with a status to help the client with debugging. // Don't need to send a RST_STREAM since the end-of-stream flag will @@ -240,7 +240,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) void returnProcessedBytes(int streamId, int bytes) { try { Http2Stream http2Stream = connection().requireStream(streamId); - http2Stream.garbageCollector().returnProcessedBytes(ctx, bytes); + inboundFlow.consumeBytes(ctx, http2Stream, bytes); } catch (Http2Exception e) { throw new RuntimeException(e); } @@ -305,25 +305,25 @@ public void operationComplete(ChannelFuture future) throws Exception { }); } - private String determineMethod(int streamId, Http2Headers headers) throws Http2StreamException { + private String determineMethod(int streamId, Http2Headers headers) throws Http2Exception { if (!HTTP_METHOD.equals(headers.method())) { - throw new Http2StreamException(streamId, Http2Error.REFUSED_STREAM, - String.format("Method '%s' is not supported", headers.method())); + throw Http2Exception.streamError(streamId, Http2Error.REFUSED_STREAM, + "Method '%s' is not supported", headers.method()); } checkHeader(streamId, headers, CONTENT_TYPE_HEADER, CONTENT_TYPE_GRPC); String methodName = TransportFrameUtil.getFullMethodNameFromPath(headers.path().toString()); if (methodName == null) { - throw new Http2StreamException(streamId, Http2Error.REFUSED_STREAM, - String.format("Malformatted path: %s", headers.path())); + throw Http2Exception.streamError(streamId, Http2Error.REFUSED_STREAM, + "Malformatted path: %s", headers.path()); } return methodName; } private static void checkHeader(int streamId, Http2Headers headers, - AsciiString header, AsciiString expectedValue) throws Http2StreamException { + AsciiString header, AsciiString expectedValue) throws Http2Exception { if (!expectedValue.equals(headers.get(header))) { - throw new Http2StreamException(streamId, Http2Error.REFUSED_STREAM, String.format( - "Header '%s'='%s', while '%s' is expected", header, headers.get(header), expectedValue)); + throw Http2Exception.streamError(streamId, Http2Error.REFUSED_STREAM, + "Header '%s'='%s', while '%s' is expected", header, headers.get(header), expectedValue); } } @@ -334,8 +334,8 @@ private NettyServerStream serverStream(Http2Stream stream) { return stream.getProperty(NettyServerStream.class); } - private Http2StreamException newStreamException(int streamId, Throwable cause) { - return new Http2StreamException(streamId, Http2Error.INTERNAL_ERROR, cause.getMessage(), cause); + private Http2Exception newStreamException(int streamId, Throwable cause) { + return Http2Exception.streamError(streamId, Http2Error.INTERNAL_ERROR, cause.getMessage(), cause); } private static class LazyFrameListener extends Http2FrameAdapter { diff --git a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerTransport.java b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerTransport.java index 1dfb17ac42c..ba4eea12d0b 100644 --- a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerTransport.java +++ b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerTransport.java @@ -42,15 +42,13 @@ import io.netty.handler.codec.http2.DefaultHttp2Connection; import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; -import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController; -import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController; +import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController; import io.netty.handler.codec.http2.DefaultHttp2StreamRemovalPolicy; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2FrameLogger; import io.netty.handler.codec.http2.Http2FrameReader; import io.netty.handler.codec.http2.Http2FrameWriter; import io.netty.handler.codec.http2.Http2InboundFrameLogger; -import io.netty.handler.codec.http2.Http2OutboundFlowController; import io.netty.handler.codec.http2.Http2OutboundFrameLogger; import io.netty.handler.ssl.SslContext; import io.netty.util.internal.logging.InternalLogLevel; @@ -129,15 +127,12 @@ private NettyServerHandler createHandler(ServerTransportListener transportListen Http2FrameWriter frameWriter = new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger); - DefaultHttp2InboundFlowController inboundFlow = - new DefaultHttp2InboundFlowController(connection, frameWriter); - Http2OutboundFlowController outboundFlow = - new DefaultHttp2OutboundFlowController(connection, frameWriter); + DefaultHttp2LocalFlowController inboundFlow = + new DefaultHttp2LocalFlowController(connection, frameWriter); return new NettyServerHandler(transportListener, connection, frameReader, frameWriter, - inboundFlow, - outboundFlow); + inboundFlow); } } diff --git a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientHandlerTest.java b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientHandlerTest.java index 615443f8473..b8961860f02 100644 --- a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientHandlerTest.java @@ -60,15 +60,13 @@ import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; import io.netty.handler.codec.http2.DefaultHttp2Headers; -import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController; -import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController; +import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController; import io.netty.handler.codec.http2.Http2CodecUtil; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2Error; import io.netty.handler.codec.http2.Http2FrameReader; import io.netty.handler.codec.http2.Http2FrameWriter; import io.netty.handler.codec.http2.Http2Headers; -import io.netty.handler.codec.http2.Http2OutboundFlowController; import io.netty.handler.codec.http2.Http2Settings; import org.junit.Before; @@ -314,15 +312,12 @@ private static NettyClientHandler newHandler() { Http2Connection connection = new DefaultHttp2Connection(false); Http2FrameReader frameReader = new DefaultHttp2FrameReader(); Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(); - DefaultHttp2InboundFlowController inboundFlow = - new DefaultHttp2InboundFlowController(connection, frameWriter); - Http2OutboundFlowController outboundFlow = - new DefaultHttp2OutboundFlowController(connection, frameWriter); + DefaultHttp2LocalFlowController inboundFlow = + new DefaultHttp2LocalFlowController(connection, frameWriter); return new NettyClientHandler(connection, frameReader, frameWriter, - inboundFlow, - outboundFlow); + inboundFlow); } private AsciiString as(String string) { diff --git a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerHandlerTest.java b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerHandlerTest.java index 0bfd8a65a3d..63327144d8e 100644 --- a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerHandlerTest.java @@ -38,7 +38,7 @@ import static com.google.net.stubby.transport.netty.Utils.TE_HEADER; import static com.google.net.stubby.transport.netty.Utils.TE_TRAILERS; import static io.netty.handler.codec.http2.Http2CodecUtil.toByteBuf; -import static io.netty.handler.codec.http2.Http2Exception.protocolError; +import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; @@ -68,15 +68,14 @@ import io.netty.handler.codec.http2.DefaultHttp2FrameReader; import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; import io.netty.handler.codec.http2.DefaultHttp2Headers; -import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController; -import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController; +import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController; import io.netty.handler.codec.http2.Http2CodecUtil; import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2Error; +import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2FrameReader; import io.netty.handler.codec.http2.Http2FrameWriter; import io.netty.handler.codec.http2.Http2Headers; -import io.netty.handler.codec.http2.Http2OutboundFlowController; import io.netty.handler.codec.http2.Http2Settings; import org.junit.Before; @@ -233,9 +232,10 @@ public void connectionErrorShouldCloseChannel() throws Exception { handler.channelRead(ctx, badFrame()); // Verify the expected GO_AWAY frame was written. - Exception e = protocolError("Frame length 0 incorrect size for ping."); + Exception e = connectionError(Http2Error.PROTOCOL_ERROR, + "Frame length 0 incorrect size for ping."); ByteBuf expected = - goAwayFrame(STREAM_ID, (int) Http2Error.PROTOCOL_ERROR.code(), toByteBuf(ctx, e)); + goAwayFrame(STREAM_ID, (int) Http2Error.FRAME_SIZE_ERROR.code(), toByteBuf(ctx, e)); ByteBuf actual = captureWrite(ctx); assertEquals(expected, actual); @@ -264,6 +264,7 @@ private void createStream() throws Exception { .path(new AsciiString("/foo.bar")); ByteBuf headersFrame = headersFrame(STREAM_ID, headers); handler.channelRead(ctx, headersFrame); + ArgumentCaptor streamCaptor = ArgumentCaptor.forClass(NettyServerStream.class); @SuppressWarnings("rawtypes") @@ -309,15 +310,12 @@ private static NettyServerHandler newHandler(ServerTransportListener transportLi Http2Connection connection = new DefaultHttp2Connection(true); Http2FrameReader frameReader = new DefaultHttp2FrameReader(); Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(); - DefaultHttp2InboundFlowController inboundFlow = - new DefaultHttp2InboundFlowController(connection, frameWriter); - Http2OutboundFlowController outboundFlow = - new DefaultHttp2OutboundFlowController(connection, frameWriter); + DefaultHttp2LocalFlowController inboundFlow = + new DefaultHttp2LocalFlowController(connection, frameWriter); return new NettyServerHandler(transportListener, connection, frameReader, frameWriter, - inboundFlow, - outboundFlow); + inboundFlow); } }