Skip to content

Commit 5ddaf98

Browse files
committed
Add HttpProtocolNegotiationHandler
After SSL handshake and protocol negotiation, the client configure the pipeline based on the negotiation result.
1 parent 1cff196 commit 5ddaf98

File tree

6 files changed

+259
-38
lines changed

6 files changed

+259
-38
lines changed

driver/src/main/java/oracle/nosql/driver/http/NoSQLHandleImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,12 @@ private void configSslContext(NoSQLHandleConfig config) {
135135
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
136136
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
137137
ApplicationProtocolNames.HTTP_2));
138+
} else {
139+
builder.applicationProtocolConfig(
140+
new ApplicationProtocolConfig(ApplicationProtocolConfig.Protocol.ALPN,
141+
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
142+
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
143+
ApplicationProtocolNames.HTTP_1_1));
138144
}
139145
config.setSslContext(builder.build());
140146
} catch (SSLException se) {

driver/src/main/java/oracle/nosql/driver/httpclient/Http2SettingsHandler.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,19 @@
44
import io.netty.channel.ChannelPromise;
55
import io.netty.channel.SimpleChannelInboundHandler;
66
import io.netty.handler.codec.http2.Http2Settings;
7+
import io.netty.util.internal.RecyclableArrayList;
78

89
import java.util.concurrent.TimeUnit;
910

1011
public class Http2SettingsHandler extends SimpleChannelInboundHandler<Http2Settings> {
1112
private final ChannelPromise promise;
13+
RecyclableArrayList bufferedMessages;
14+
ChannelHandlerContext ctx;
1215

13-
public Http2SettingsHandler(ChannelPromise promise) {
14-
this.promise = promise;
16+
public Http2SettingsHandler(ChannelHandlerContext ctx, RecyclableArrayList bufferedMessages) {
17+
this.ctx = ctx;
18+
this.promise = ctx.newPromise();
19+
this.bufferedMessages = bufferedMessages;
1520
}
1621

1722
public void awaitSettings(long timeout, TimeUnit unit) throws Exception {
@@ -23,6 +28,19 @@ public void awaitSettings(long timeout, TimeUnit unit) throws Exception {
2328
@Override
2429
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Http2Settings http2Settings) throws Exception {
2530
promise.setSuccess();
31+
fireBufferedMessages();
2632
channelHandlerContext.pipeline().remove(this);
2733
}
34+
35+
private void fireBufferedMessages() {
36+
if (!this.bufferedMessages.isEmpty()) {
37+
for(int i = 0; i < this.bufferedMessages.size(); ++i) {
38+
Pair<Object, ChannelPromise> p = (Pair<Object, ChannelPromise>)this.bufferedMessages.get(i);
39+
this.ctx.channel().write(p.first, p.second);
40+
}
41+
42+
this.bufferedMessages.clear();
43+
}
44+
this.bufferedMessages.recycle();
45+
}
2846
}

driver/src/main/java/oracle/nosql/driver/httpclient/HttpClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.netty.channel.socket.nio.NioSocketChannel;
3232
import io.netty.handler.codec.http.DefaultFullHttpRequest;
3333
import io.netty.handler.codec.http.HttpRequest;
34+
import io.netty.handler.ssl.ApplicationProtocolNames;
3435
import io.netty.handler.ssl.SslContext;
3536
import io.netty.util.AttributeKey;
3637
import io.netty.util.concurrent.Future;
@@ -311,6 +312,10 @@ int getHandshakeTimeoutMs() {
311312
return handshakeTimeoutMs;
312313
}
313314

315+
public String getFallbackProtocol() {
316+
return ApplicationProtocolNames.HTTP_1_1;
317+
}
318+
314319
public int getMaxContentLength() {
315320
return maxContentLength;
316321
}

driver/src/main/java/oracle/nosql/driver/httpclient/HttpClientChannelPoolHandler.java

Lines changed: 6 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package oracle.nosql.driver.httpclient;
99

10-
import static io.netty.handler.logging.LogLevel.DEBUG;
1110
import static oracle.nosql.driver.util.LogUtil.logFine;
1211

1312
import java.net.InetSocketAddress;
@@ -23,9 +22,6 @@
2322
import io.netty.channel.EventLoop;
2423
import io.netty.channel.pool.ChannelHealthChecker;
2524
import io.netty.channel.pool.ChannelPoolHandler;
26-
import io.netty.handler.codec.http.HttpClientCodec;
27-
import io.netty.handler.codec.http.HttpObjectAggregator;
28-
import io.netty.handler.codec.http2.*;
2925
import io.netty.handler.proxy.HttpProxyHandler;
3026
import io.netty.handler.ssl.SslHandler;
3127
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
@@ -40,12 +36,6 @@
4036
public class HttpClientChannelPoolHandler implements ChannelPoolHandler,
4137
ChannelHealthChecker {
4238

43-
private static final Http2FrameLogger logger = new Http2FrameLogger(DEBUG, HttpClientChannelPoolHandler.class);
44-
45-
private static final String CODEC_HANDLER_NAME = "http-codec";
46-
private static final String AGG_HANDLER_NAME = "http-aggregator";
47-
private static final String HTTP_HANDLER_NAME = "http-response-handler";
48-
4939
private final HttpClient client;
5040

5141
/**
@@ -85,33 +75,13 @@ public void channelCreated(Channel ch) {
8575

8676
p.addLast(sslHandler);
8777
p.addLast(new ChannelLoggingHandler(client));
78+
// Handle ALPN protocol negotiation result, and configure the pipeline accordingly
79+
p.addLast(new HttpProtocolNegotiationHandler(
80+
client.getFallbackProtocol(), new HttpClientHandler(client.getLogger()), client.getMaxChunkSize(),
81+
client.getMaxContentLength(), client.getLogger()));
82+
} else {
83+
// TODO: H2C upgrade
8884
}
89-
if (client.isHttp2()) {
90-
Http2Connection connection = new DefaultHttp2Connection(false);
91-
HttpToHttp2ConnectionHandler connectionHandler = new HttpToHttp2ConnectionHandlerBuilder()
92-
.frameListener(new DelegatingDecompressorFrameListener(
93-
connection,
94-
new InboundHttp2ToHttpAdapterBuilder(connection)
95-
.maxContentLength(client.getMaxContentLength())
96-
.propagateSettings(true)
97-
.build()))
98-
.frameLogger(logger)
99-
.connection(connection)
100-
.build();
101-
Http2SettingsHandler settingsHandler = new Http2SettingsHandler(ch.newPromise());
102-
103-
p.addLast(connectionHandler);
104-
p.addLast(settingsHandler);
105-
} else { // http_1_1
106-
p.addLast(CODEC_HANDLER_NAME, new HttpClientCodec
107-
(4096, // initial line
108-
8192, // header size
109-
client.getMaxChunkSize()));
110-
p.addLast(AGG_HANDLER_NAME, new HttpObjectAggregator(
111-
client.getMaxContentLength()));
112-
}
113-
p.addLast(HTTP_HANDLER_NAME,
114-
new HttpClientHandler(client.getLogger()));
11585

11686
if (client.getProxyHost() != null) {
11787
InetSocketAddress sockAddr =
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*-
2+
* Copyright (c) 2011, 2022 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* Licensed under the Universal Permissive License v 1.0 as shown at
5+
* https://oss.oracle.com/licenses/upl/
6+
*/
7+
8+
package oracle.nosql.driver.httpclient;
9+
10+
import static io.netty.handler.logging.LogLevel.DEBUG;
11+
import static oracle.nosql.driver.util.LogUtil.logFine;
12+
13+
import java.net.SocketAddress;
14+
import java.util.Objects;
15+
import java.util.logging.Logger;
16+
17+
import io.netty.channel.ChannelHandlerContext;
18+
import io.netty.channel.ChannelOutboundHandler;
19+
import io.netty.channel.ChannelPipeline;
20+
import io.netty.channel.ChannelPromise;
21+
import io.netty.handler.codec.http.HttpClientCodec;
22+
import io.netty.handler.codec.http.HttpMessage;
23+
import io.netty.handler.codec.http.HttpObjectAggregator;
24+
import io.netty.handler.codec.http2.DefaultHttp2Connection;
25+
import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener;
26+
import io.netty.handler.codec.http2.Http2Connection;
27+
import io.netty.handler.codec.http2.Http2FrameLogger;
28+
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler;
29+
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandlerBuilder;
30+
import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapterBuilder;
31+
import io.netty.handler.ssl.ApplicationProtocolNames;
32+
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
33+
import io.netty.util.internal.RecyclableArrayList;
34+
35+
/**
36+
* Handle TLS protocol negotiation result, either Http1.1 or H2
37+
*
38+
* The channel initialization process:
39+
* 1. Channel aquired from {@link ConnectionPool} after channel is active.
40+
* 2. SSL negotiation started, pipeline is not ready.
41+
* 3. {@link HttpProtocolNegotiationHandler} holds all {@link HttpMessage} while waiting for the negotiation result.
42+
* 4. Negotiation finished, {@link HttpProtocolNegotiationHandler} changes the pipeline according to the protocol selected.
43+
* 5. {@link HttpProtocolNegotiationHandler} removes itself from the pipeline. Writes any buffered {@link HttpMessage} to the channel.
44+
*/
45+
public class HttpProtocolNegotiationHandler extends ApplicationProtocolNegotiationHandler implements ChannelOutboundHandler {
46+
private static final Http2FrameLogger frameLogger = new Http2FrameLogger(DEBUG, HttpProtocolNegotiationHandler.class);
47+
48+
private static final String CODEC_HANDLER_NAME = "http-codec";
49+
private static final String AGG_HANDLER_NAME = "http-aggregator";
50+
private static final String HTTP_HANDLER_NAME = "http-client-handler";
51+
52+
private final Logger logger;
53+
private final RecyclableArrayList bufferedMessages = RecyclableArrayList.newInstance();
54+
private final HttpClientHandler handler;
55+
private final int maxChunkSize;
56+
private final int maxContentLength;
57+
58+
public HttpProtocolNegotiationHandler(String fallbackProtocol, HttpClientHandler handler, int maxChunkSize, int maxContentLength, Logger logger) {
59+
super(fallbackProtocol);
60+
61+
this.logger = logger;
62+
this.handler = handler;
63+
this.maxChunkSize = maxChunkSize;
64+
this.maxContentLength = maxContentLength;
65+
}
66+
67+
private void writeBufferedMessages(ChannelHandlerContext ctx) {
68+
if (!this.bufferedMessages.isEmpty()) {
69+
for(int i = 0; i < this.bufferedMessages.size(); ++i) {
70+
Pair<Object, ChannelPromise> p = (Pair<Object, ChannelPromise>)this.bufferedMessages.get(i);
71+
ctx.channel().write(p.first, p.second);
72+
}
73+
74+
this.bufferedMessages.clear();
75+
}
76+
this.bufferedMessages.recycle();
77+
}
78+
79+
private void configureHttp1(ChannelHandlerContext ctx) {
80+
ChannelPipeline p = ctx.pipeline();
81+
82+
p.addLast(CODEC_HANDLER_NAME,
83+
new HttpClientCodec(4096, // initial line
84+
8192, // header size
85+
maxChunkSize)); // chunksize
86+
p.addLast(AGG_HANDLER_NAME,
87+
new HttpObjectAggregator(maxContentLength));
88+
}
89+
90+
private void configureHttp2(ChannelHandlerContext ctx) {
91+
ChannelPipeline p = ctx.pipeline();
92+
93+
Http2Connection connection = new DefaultHttp2Connection(false);
94+
HttpToHttp2ConnectionHandler connectionHandler = new HttpToHttp2ConnectionHandlerBuilder()
95+
.frameListener(new DelegatingDecompressorFrameListener(
96+
connection,
97+
new InboundHttp2ToHttpAdapterBuilder(connection)
98+
.maxContentLength(this.maxContentLength)
99+
.propagateSettings(false)
100+
.build()))
101+
.frameLogger(frameLogger)
102+
.connection(connection)
103+
.build();
104+
105+
p.addLast(connectionHandler);
106+
}
107+
108+
@Override
109+
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {
110+
if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
111+
configureHttp2(ctx);
112+
} else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
113+
configureHttp1(ctx);
114+
} else {
115+
throw new IllegalStateException("unknown http protocol: " + protocol);
116+
}
117+
logFine(this.logger, "HTTP protocol selected: " + protocol);
118+
ctx.pipeline().addLast(HTTP_HANDLER_NAME, handler);
119+
}
120+
121+
/*
122+
* User can write requests right after the channel is active, while protocol
123+
* negotiation is still in progress. At this stage the pipeline is not ready
124+
* to write http request so we must hold them here.
125+
*/
126+
@Override
127+
public void write(ChannelHandlerContext ctx, Object o, ChannelPromise channelPromise) throws Exception {
128+
if (o instanceof HttpMessage) {
129+
Pair<Object, ChannelPromise> p = Pair.of(o, channelPromise);
130+
this.bufferedMessages.add(p);
131+
return;
132+
}
133+
134+
// let non-http message to pass, so the HTTP2 preface and settings frame can be sent
135+
ctx.write(o, channelPromise);
136+
}
137+
138+
/*
139+
* Protocol negotiation finish, handler removed, the pipeline is
140+
* ready to handle http messages. Write previousely buffered http messages.
141+
*/
142+
@Override
143+
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
144+
super.handlerRemoved(ctx);
145+
this.writeBufferedMessages(ctx);
146+
}
147+
148+
@Override
149+
public void bind(ChannelHandlerContext ctx, SocketAddress socketAddress, ChannelPromise channelPromise) {
150+
ctx.bind(socketAddress, channelPromise);
151+
}
152+
153+
@Override
154+
public void connect(ChannelHandlerContext ctx, SocketAddress socketAddress, SocketAddress socketAddress1, ChannelPromise channelPromise) throws Exception {
155+
ctx.connect(socketAddress, socketAddress1, channelPromise);
156+
}
157+
158+
@Override
159+
public void disconnect(ChannelHandlerContext ctx, ChannelPromise channelPromise) {
160+
ctx.disconnect(channelPromise);
161+
}
162+
163+
@Override
164+
public void close(ChannelHandlerContext ctx, ChannelPromise channelPromise) throws Exception {
165+
ctx.close(channelPromise);
166+
}
167+
168+
@Override
169+
public void deregister(ChannelHandlerContext ctx, ChannelPromise channelPromise) {
170+
ctx.deregister(channelPromise);
171+
}
172+
173+
@Override
174+
public void read(ChannelHandlerContext ctx) throws Exception {
175+
ctx.read();
176+
}
177+
178+
@Override
179+
public void flush(ChannelHandlerContext ctx) {
180+
ctx.flush();
181+
}
182+
183+
private static class Pair<A, B> {
184+
185+
public final A first;
186+
public final B second;
187+
188+
public Pair(A fst, B snd) {
189+
this.first = fst;
190+
this.second = snd;
191+
}
192+
193+
public String toString() {
194+
return "Pair[" + first + "," + second + "]";
195+
}
196+
197+
public boolean equals(Object other) {
198+
if (other instanceof Pair<?, ?>) {
199+
return Objects.equals(first, ((Pair<?, ?>) other).first) &&
200+
Objects.equals(second, ((Pair<?, ?>) other).second);
201+
}
202+
return false;
203+
}
204+
205+
public int hashCode() {
206+
if (first == null)
207+
return (second == null) ? 0 : second.hashCode() + 1;
208+
else if (second == null)
209+
return first.hashCode() + 2;
210+
else
211+
return first.hashCode() * 17 + second.hashCode();
212+
}
213+
214+
public static <A, B> Pair<A, B> of(A a, B b) {
215+
return new Pair<>(a, b);
216+
}
217+
}
218+
}
219+

driver/src/test/java/oracle/nosql/driver/ProxyTestBase.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,9 @@ protected NoSQLHandle getHandle(NoSQLHandleConfig config) {
464464
logger.setLevel(Level.parse(level));
465465
config.setLogger(logger);
466466

467+
boolean useHttp2 = Boolean.getBoolean("test.http2");
468+
config.useHttp2(useHttp2);
469+
467470
/*
468471
* Open the handle
469472
*/

0 commit comments

Comments
 (0)